ZEPPELIN-3236. Make grpc framesize configurable

This commit is contained in:
Jeff Zhang 2018-02-15 10:30:10 +08:00
parent 2be8f35065
commit ffce774768
4 changed files with 56 additions and 4 deletions

View file

@ -20,6 +20,7 @@ package org.apache.zeppelin.python;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.python.proto.CancelRequest;
import org.apache.zeppelin.python.proto.CancelResponse;
@ -131,11 +132,18 @@ public class IPythonClient {
@Override
public void onError(Throwable throwable) {
try {
interpreterOutput.getInterpreterOutput().write(ExceptionUtils.getStackTrace(throwable));
interpreterOutput.getInterpreterOutput().flush();
} catch (IOException e) {
LOGGER.error("Unexpected IOException", e);
}
LOGGER.error("Fail to call IPython grpc", throwable);
finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
completedFlag.set(true);
synchronized (completedFlag) {
completedFlag.notify();
}
}
@Override

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.python;
import io.grpc.ManagedChannelBuilder;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
@ -137,7 +138,10 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
int jvmGatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
LOGGER.info("Launching IPython Kernel at port: " + ipythonPort);
LOGGER.info("Launching JVM Gateway at port: " + jvmGatewayPort);
ipythonClient = new IPythonClient("127.0.0.1", ipythonPort);
int framesize = Integer.parseInt(getProperty("zeppelin.ipython.grpc.framesize",
32 * 1024 * 1024 + ""));
ipythonClient = new IPythonClient(ManagedChannelBuilder.forAddress("127.0.0.1", ipythonPort)
.usePlaintext(true).maxInboundMessageSize(framesize));
launchIPythonKernel(ipythonPort);
setupJVMGateway(jvmGatewayPort);
} catch (Exception e) {

View file

@ -40,6 +40,12 @@
"defaultValue": "30000",
"description": "time out for ipython launch",
"type": "number"
},
"zeppelin.ipython.grpc.framesize": {
"propertyName": "zeppelin.ipython.grpc.framesize",
"defaultValue": "33554432",
"description": "grpc framesize, default is 32M",
"type": "number"
}
},
"editor": {

View file

@ -56,9 +56,7 @@ public class IPythonInterpreterTest {
private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreterTest.class);
private IPythonInterpreter interpreter;
@Before
public void setUp() throws InterpreterException {
Properties properties = new Properties();
public void startInterpreter(Properties properties) throws InterpreterException {
interpreter = new IPythonInterpreter(properties);
InterpreterGroup mockInterpreterGroup = mock(InterpreterGroup.class);
interpreter.setInterpreterGroup(mockInterpreterGroup);
@ -73,9 +71,45 @@ public class IPythonInterpreterTest {
@Test
public void testIPython() throws IOException, InterruptedException, InterpreterException {
startInterpreter(new Properties());
testInterpreter(interpreter);
}
@Test
public void testGrpcFrameSize() throws InterpreterException, IOException {
Properties properties = new Properties();
properties.setProperty("zeppelin.ipython.grpc.framesize", "4");
startInterpreter(properties);
// to make this test can run under both python2 and python3
InterpreterResult result = interpreter.interpret("from __future__ import print_function", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
InterpreterContext context = getInterpreterContext();
result = interpreter.interpret("print(11111111111111111111111111111)", context);
assertEquals(InterpreterResult.Code.ERROR, result.code());
List<InterpreterResultMessage> interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(1, interpreterResultMessages.size());
assertTrue(interpreterResultMessages.get(0).getData().contains("Frame size 32 exceeds maximum: 4"));
// next call continue work
result = interpreter.interpret("print(1)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
close();
// increase framesize to make it work
properties.setProperty("zeppelin.ipython.grpc.framesize", "40");
startInterpreter(properties);
// to make this test can run under both python2 and python3
result = interpreter.interpret("from __future__ import print_function", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
context = getInterpreterContext();
result = interpreter.interpret("print(11111111111111111111111111111)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}
public static void testInterpreter(final Interpreter interpreter) throws IOException, InterruptedException, InterpreterException {
// to make this test can run under both python2 and python3
InterpreterResult result = interpreter.interpret("from __future__ import print_function", getInterpreterContext());