mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-3236. Make grpc framesize configurable
This commit is contained in:
parent
2be8f35065
commit
ffce774768
4 changed files with 56 additions and 4 deletions
|
|
@ -20,6 +20,7 @@ package org.apache.zeppelin.python;
|
|||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
import org.apache.zeppelin.python.proto.CancelRequest;
|
||||
import org.apache.zeppelin.python.proto.CancelResponse;
|
||||
|
|
@ -131,11 +132,18 @@ public class IPythonClient {
|
|||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
try {
|
||||
interpreterOutput.getInterpreterOutput().write(ExceptionUtils.getStackTrace(throwable));
|
||||
interpreterOutput.getInterpreterOutput().flush();
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Unexpected IOException", e);
|
||||
}
|
||||
LOGGER.error("Fail to call IPython grpc", throwable);
|
||||
finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
|
||||
|
||||
completedFlag.set(true);
|
||||
synchronized (completedFlag) {
|
||||
completedFlag.notify();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
|
|
@ -137,7 +138,10 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
int jvmGatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
LOGGER.info("Launching IPython Kernel at port: " + ipythonPort);
|
||||
LOGGER.info("Launching JVM Gateway at port: " + jvmGatewayPort);
|
||||
ipythonClient = new IPythonClient("127.0.0.1", ipythonPort);
|
||||
int framesize = Integer.parseInt(getProperty("zeppelin.ipython.grpc.framesize",
|
||||
32 * 1024 * 1024 + ""));
|
||||
ipythonClient = new IPythonClient(ManagedChannelBuilder.forAddress("127.0.0.1", ipythonPort)
|
||||
.usePlaintext(true).maxInboundMessageSize(framesize));
|
||||
launchIPythonKernel(ipythonPort);
|
||||
setupJVMGateway(jvmGatewayPort);
|
||||
} catch (Exception e) {
|
||||
|
|
|
|||
|
|
@ -40,6 +40,12 @@
|
|||
"defaultValue": "30000",
|
||||
"description": "time out for ipython launch",
|
||||
"type": "number"
|
||||
},
|
||||
"zeppelin.ipython.grpc.framesize": {
|
||||
"propertyName": "zeppelin.ipython.grpc.framesize",
|
||||
"defaultValue": "33554432",
|
||||
"description": "grpc framesize, default is 32M",
|
||||
"type": "number"
|
||||
}
|
||||
},
|
||||
"editor": {
|
||||
|
|
|
|||
|
|
@ -56,9 +56,7 @@ public class IPythonInterpreterTest {
|
|||
private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreterTest.class);
|
||||
private IPythonInterpreter interpreter;
|
||||
|
||||
@Before
|
||||
public void setUp() throws InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
public void startInterpreter(Properties properties) throws InterpreterException {
|
||||
interpreter = new IPythonInterpreter(properties);
|
||||
InterpreterGroup mockInterpreterGroup = mock(InterpreterGroup.class);
|
||||
interpreter.setInterpreterGroup(mockInterpreterGroup);
|
||||
|
|
@ -73,9 +71,45 @@ public class IPythonInterpreterTest {
|
|||
|
||||
@Test
|
||||
public void testIPython() throws IOException, InterruptedException, InterpreterException {
|
||||
startInterpreter(new Properties());
|
||||
testInterpreter(interpreter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGrpcFrameSize() throws InterpreterException, IOException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("zeppelin.ipython.grpc.framesize", "4");
|
||||
startInterpreter(properties);
|
||||
|
||||
// to make this test can run under both python2 and python3
|
||||
InterpreterResult result = interpreter.interpret("from __future__ import print_function", getInterpreterContext());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
||||
InterpreterContext context = getInterpreterContext();
|
||||
result = interpreter.interpret("print(11111111111111111111111111111)", context);
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
List<InterpreterResultMessage> interpreterResultMessages = context.out.getInterpreterResultMessages();
|
||||
assertEquals(1, interpreterResultMessages.size());
|
||||
assertTrue(interpreterResultMessages.get(0).getData().contains("Frame size 32 exceeds maximum: 4"));
|
||||
|
||||
// next call continue work
|
||||
result = interpreter.interpret("print(1)", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
||||
close();
|
||||
|
||||
// increase framesize to make it work
|
||||
properties.setProperty("zeppelin.ipython.grpc.framesize", "40");
|
||||
startInterpreter(properties);
|
||||
// to make this test can run under both python2 and python3
|
||||
result = interpreter.interpret("from __future__ import print_function", getInterpreterContext());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
||||
context = getInterpreterContext();
|
||||
result = interpreter.interpret("print(11111111111111111111111111111)", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
}
|
||||
|
||||
public static void testInterpreter(final Interpreter interpreter) throws IOException, InterruptedException, InterpreterException {
|
||||
// to make this test can run under both python2 and python3
|
||||
InterpreterResult result = interpreter.interpret("from __future__ import print_function", getInterpreterContext());
|
||||
|
|
|
|||
Loading…
Reference in a new issue