[ZEPPELIN-4480]. Move the ipython code into a general jupyter kernel bridge

This commit is contained in:
Jeff Zhang 2019-12-12 17:14:16 +08:00
parent 9320d1493f
commit 78453473e6
18 changed files with 915 additions and 533 deletions

View file

@ -58,8 +58,8 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
}
@Override
protected Map<String, String> setupIPythonEnv() throws IOException {
Map<String, String> envs = super.setupIPythonEnv();
protected Map<String, String> setupKernelEnv() throws IOException {
Map<String, String> envs = super.setupKernelEnv();
String pythonPath = envs.getOrDefault("PYTHONPATH", "");
String pyflinkPythonPath = PyFlinkInterpreter.getPyFlinkPythonPath(properties);
envs.put("PYTHONPATH", pythonPath + ":" + pyflinkPythonPath);

View file

@ -58,6 +58,7 @@
<module>zeppelin-zengine</module>
<module>zeppelin-display</module>
<module>rlang</module>
<module>zeppelin-jupyter-adapter</module>
<module>kotlin</module>
<module>groovy</module>
<module>spark</module>
@ -1086,7 +1087,7 @@
<exclude>**/R/rzeppelin/DESCRIPTION</exclude>
<exclude>**/R/rzeppelin/NAMESPACE</exclude>
<exclude>python/src/main/resources/grpc/**/*</exclude>
<exclude>zeppelin-jupyter-adapter/src/main/resources/grpc/**/*</exclude>
</excludes>
</configuration>

View file

@ -41,6 +41,12 @@
</properties>
<dependencies>
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-jupyter-adapter</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
@ -110,25 +116,6 @@
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>

View file

@ -17,44 +17,23 @@
package org.apache.zeppelin.python;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ManagedChannelBuilder;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.JupyterKernelInterpreter;
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteResponse;
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteStatus;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.python.proto.CancelRequest;
import org.apache.zeppelin.python.proto.CompletionRequest;
import org.apache.zeppelin.python.proto.CompletionResponse;
import org.apache.zeppelin.python.proto.ExecuteRequest;
import org.apache.zeppelin.python.proto.ExecuteResponse;
import org.apache.zeppelin.python.proto.ExecuteStatus;
import org.apache.zeppelin.python.proto.IPythonStatus;
import org.apache.zeppelin.python.proto.StatusRequest;
import org.apache.zeppelin.python.proto.StatusResponse;
import org.apache.zeppelin.python.proto.StopRequest;
import org.apache.zeppelin.interpreter.util.ProcessLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -62,43 +41,49 @@ import java.util.Properties;
/**
* IPython Interpreter for Zeppelin
*/
public class IPythonInterpreter extends Interpreter {
public class IPythonInterpreter extends JupyterKernelInterpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreter.class);
private IPythonProcessLauncher iPythonProcessLauncher;
private IPythonClient ipythonClient;
// GatewayServer in jvm side to communicate with python kernel process.
private GatewayServer gatewayServer;
protected BaseZeppelinContext zeppelinContext;
private String pythonExecutable;
private int ipythonLaunchTimeout;
// allow to set PYTHONPATH
private String additionalPythonPath;
private String additionalPythonInitFile;
private boolean useBuiltinPy4j = true;
private boolean usePy4JAuth = true;
private String secret;
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
private String py4jGatewaySecret;
public IPythonInterpreter(Properties properties) {
super(properties);
}
@Override
public String getKernelName() {
return "python";
}
@Override
public List<String> getRequiredPackages() {
List<String> requiredPackages = super.getRequiredPackages();
requiredPackages.add("ipython");
requiredPackages.add("ipykernel");
return requiredPackages;
}
/**
* Sub class can customize the interpreter by adding more python packages under PYTHONPATH.
* e.g. PySparkInterpreter
* e.g. IPySparkInterpreter
*
* @param additionalPythonPath
*/
public void setAdditionalPythonPath(String additionalPythonPath) {
LOGGER.info("setAdditionalPythonPath: " + additionalPythonPath);
this.additionalPythonPath = additionalPythonPath;
}
/**
* Sub class can customize the interpreter by running additional python init code.
* e.g. PySparkInterpreter
* e.g. IPySparkInterpreter
*
* @param additionalPythonInitFile
*/
@ -106,10 +91,11 @@ public class IPythonInterpreter extends Interpreter {
this.additionalPythonInitFile = additionalPythonInitFile;
}
public void setAddBulitinPy4j(boolean add) {
this.useBuiltinPy4j = add;
public void setUseBuiltinPy4j(boolean useBuiltinPy4j) {
this.useBuiltinPy4j = useBuiltinPy4j;
}
@Override
public BaseZeppelinContext buildZeppelinContext() {
return new PythonZeppelinContext(
getInterpreterGroup().getInterpreterHookRegistry(),
@ -118,119 +104,48 @@ public class IPythonInterpreter extends Interpreter {
@Override
public void open() throws InterpreterException {
super.open();
try {
if (ipythonClient != null) {
// IPythonInterpreter might already been opened by PythonInterpreter
return;
}
pythonExecutable = getProperty("zeppelin.python", "python");
LOGGER.info("Python Exec: " + pythonExecutable);
String checkPrerequisiteResult = checkIPythonPrerequisite(pythonExecutable);
if (!StringUtils.isEmpty(checkPrerequisiteResult)) {
throw new InterpreterException("IPython prerequisite is not meet: " +
checkPrerequisiteResult);
}
ipythonLaunchTimeout = Integer.parseInt(
getProperty("zeppelin.ipython.launch.timeout", "30000"));
this.zeppelinContext = buildZeppelinContext();
int ipythonPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
int jvmGatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
int message_size = Integer.parseInt(getProperty("zeppelin.ipython.grpc.message_size",
32 * 1024 * 1024 + ""));
ipythonClient = new IPythonClient(ManagedChannelBuilder.forAddress("127.0.0.1", ipythonPort)
.usePlaintext(true).maxInboundMessageSize(message_size));
this.usePy4JAuth = Boolean.parseBoolean(getProperty("zeppelin.py4j.useAuth", "true"));
this.secret = PythonUtils.createSecret(256);
launchIPythonKernel(ipythonPort);
setupJVMGateway(jvmGatewayPort);
String gatewayHost = PythonUtils.getLocalIP(properties);
int gatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
setupJVMGateway(gatewayHost, gatewayPort);
initPythonInterpreter(gatewayHost, gatewayPort);
} catch (Exception e) {
throw new InterpreterException("Fail to open IPythonInterpreter\n" +
ExceptionUtils.getStackTrace(e), e);
LOGGER.error("Fail to open IPythonInterpreter", e);
throw new InterpreterException(e);
}
}
/**
* non-empty return value mean the errors when checking ipython prerequisite.
* empty value mean IPython prerequisite is meet.
*
* @param pythonExec
* @return
*/
public String checkIPythonPrerequisite(String pythonExec) {
ProcessBuilder processBuilder = new ProcessBuilder(pythonExec, "-m", "pip", "freeze");
File stderrFile = null;
File stdoutFile = null;
try {
stderrFile = File.createTempFile("zeppelin", ".txt");
processBuilder.redirectError(stderrFile);
stdoutFile = File.createTempFile("zeppelin", ".txt");
processBuilder.redirectOutput(stdoutFile);
Process proc = processBuilder.start();
int ret = proc.waitFor();
if (ret != 0) {
try (FileInputStream in = new FileInputStream(stderrFile)) {
return "Fail to run pip freeze.\n" + IOUtils.toString(in);
}
}
try (FileInputStream in = new FileInputStream(stdoutFile)) {
String freezeOutput = IOUtils.toString(in);
if (!freezeOutput.contains("jupyter-client=")) {
return "jupyter-client is not installed.";
}
if (!freezeOutput.contains("ipykernel=")) {
return "ipykernel is not installed";
}
if (!freezeOutput.contains("ipython=")) {
return "ipython is not installed";
}
if (!freezeOutput.contains("grpcio=")) {
return "grpcio is not installed";
}
if (!freezeOutput.contains("protobuf=")) {
return "protobuf is not installed";
}
LOGGER.info("IPython prerequisite is met");
}
} catch (Exception e) {
LOGGER.warn("Fail to checkIPythonPrerequisite", e);
return "Fail to checkIPythonPrerequisite: " + ExceptionUtils.getStackTrace(e);
} finally {
FileUtils.deleteQuietly(stderrFile);
FileUtils.deleteQuietly(stdoutFile);
}
return "";
}
private void setupJVMGateway(int jvmGatewayPort) throws IOException {
String serverAddress = PythonUtils.getLocalIP(properties);
this.gatewayServer =
PythonUtils.createGatewayServer(this, serverAddress, jvmGatewayPort, secret, usePy4JAuth);
private void setupJVMGateway(String gatewayHost, int gatewayPort) throws IOException {
this.gatewayServer = PythonUtils.createGatewayServer(this, gatewayHost,
gatewayPort, py4jGatewaySecret, usePy4JAuth);
gatewayServer.start();
}
private void initPythonInterpreter(String gatewayHost, int gatewayPort) throws IOException {
InputStream input =
getClass().getClassLoader().getResourceAsStream("grpc/python/zeppelin_python.py");
getClass().getClassLoader().getResourceAsStream("python/zeppelin_ipython.py");
List<String> lines = IOUtils.readLines(input);
ExecuteResponse response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
.setCode(StringUtils.join(lines, System.lineSeparator())
.replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "")
.replace("${JVM_GATEWAY_ADDRESS}", serverAddress)).build());
ExecuteResponse response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
.setCode(StringUtils.join(lines, System.lineSeparator())
.replace("${JVM_GATEWAY_PORT}", gatewayPort + "")
.replace("${JVM_GATEWAY_ADDRESS}", gatewayHost)).build());
if (response.getStatus() != ExecuteStatus.SUCCESS) {
throw new IOException("Fail to setup JVMGateway\n" + response.getOutput());
}
input =
getClass().getClassLoader().getResourceAsStream("python/zeppelin_context.py");
getClass().getClassLoader().getResourceAsStream("python/zeppelin_context.py");
lines = IOUtils.readLines(input);
response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
.setCode(StringUtils.join(lines, System.lineSeparator())).build());
response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
.setCode(StringUtils.join(lines, System.lineSeparator())).build());
if (response.getStatus() != ExecuteStatus.SUCCESS) {
throw new IOException("Fail to import ZeppelinContext\n" + response.getOutput());
}
response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
.setCode("z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)")
.build());
response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
.setCode("z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)")
.build());
if (response.getStatus() != ExecuteStatus.SUCCESS) {
throw new IOException("Fail to setup ZeppelinContext\n" + response.getOutput());
}
@ -238,65 +153,34 @@ public class IPythonInterpreter extends Interpreter {
if (additionalPythonInitFile != null) {
input = getClass().getClassLoader().getResourceAsStream(additionalPythonInitFile);
lines = IOUtils.readLines(input);
response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
.setCode(StringUtils.join(lines, System.lineSeparator())
.replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "")
.replace("${JVM_GATEWAY_ADDRESS}", serverAddress)).build());
response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
.setCode(StringUtils.join(lines, System.lineSeparator())
.replace("${JVM_GATEWAY_PORT}", gatewayPort + "")
.replace("${JVM_GATEWAY_ADDRESS}", gatewayHost)).build());
if (response.getStatus() != ExecuteStatus.SUCCESS) {
LOGGER.error("Fail to run additional Python init file\n" + response.getOutput());
throw new IOException("Fail to run additional Python init file: "
+ additionalPythonInitFile + "\n" + response.getOutput());
+ additionalPythonInitFile + "\n" + response.getOutput());
}
}
}
private void launchIPythonKernel(int ipythonPort)
throws IOException {
LOGGER.info("Launching IPython Kernel at port: " + ipythonPort);
// copy the python scripts to a temp directory, then launch ipython kernel in that folder
File pythonWorkDir = Files.createTempDirectory("zeppelin_ipython").toFile();
String[] ipythonScripts = {"ipython_server.py", "ipython_pb2.py", "ipython_pb2_grpc.py"};
for (String ipythonScript : ipythonScripts) {
URL url = getClass().getClassLoader().getResource("grpc/python"
+ "/" + ipythonScript);
FileUtils.copyURLToFile(url, new File(pythonWorkDir, ipythonScript));
}
CommandLine cmd = CommandLine.parse(pythonExecutable);
cmd.addArgument(pythonWorkDir.getAbsolutePath() + "/ipython_server.py");
cmd.addArgument(ipythonPort + "");
@Override
protected Map<String, String> setupKernelEnv() throws IOException {
Map<String, String> envs = super.setupKernelEnv();
if (useBuiltinPy4j) {
//TODO(zjffdu) don't do hard code on py4j here
File py4jDestFile = new File(pythonWorkDir, "py4j-src-0.10.7.zip");
File py4jDestFile = new File(kernelWorkDir, "py4j-src-0.10.7.zip");
FileUtils.copyURLToFile(getClass().getClassLoader().getResource(
"python/py4j-src-0.10.7.zip"), py4jDestFile);
"python/py4j-src-0.10.7.zip"), py4jDestFile);
if (additionalPythonPath != null) {
// put the py4j at the end, because additionalPythonPath may already contain py4j.
// e.g. PySparkInterpreter
// e.g. IPySparkInterpreter
additionalPythonPath = additionalPythonPath + ":" + py4jDestFile.getAbsolutePath();
} else {
additionalPythonPath = py4jDestFile.getAbsolutePath();
}
}
Map<String, String> envs = setupIPythonEnv();
iPythonProcessLauncher = new IPythonProcessLauncher(cmd, envs);
iPythonProcessLauncher.launch();
iPythonProcessLauncher.waitForReady(ipythonLaunchTimeout);
if (iPythonProcessLauncher.isLaunchTimeout()) {
throw new IOException("Fail to launch IPython Kernel in " + ipythonLaunchTimeout / 1000
+ " seconds.\n" + iPythonProcessLauncher.getErrorMessage());
}
if (!iPythonProcessLauncher.isRunning()) {
throw new IOException("Fail to launch IPython Kernel as the python process is failed.\n"
+ iPythonProcessLauncher.getErrorMessage());
}
}
protected Map<String, String> setupIPythonEnv() throws IOException {
Map<String, String> envs = EnvironmentUtils.getProcEnvironment();
if (envs.containsKey("PYTHONPATH")) {
if (additionalPythonPath != null) {
envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH"));
@ -304,153 +188,23 @@ public class IPythonInterpreter extends Interpreter {
} else {
envs.put("PYTHONPATH", additionalPythonPath);
}
this.usePy4JAuth = Boolean.parseBoolean(getProperty("zeppelin.py4j.useAuth", "true"));
this.py4jGatewaySecret = PythonUtils.createSecret(256);
if (usePy4JAuth) {
envs.put("PY4J_GATEWAY_SECRET", secret);
envs.put("PY4J_GATEWAY_SECRET", this.py4jGatewaySecret);
}
LOGGER.info("PYTHONPATH:" + envs.get("PYTHONPATH"));
return envs;
}
@VisibleForTesting
public IPythonProcessLauncher getIPythonProcessLauncher() {
return iPythonProcessLauncher;
}
@Override
public void close() throws InterpreterException {
if (iPythonProcessLauncher != null) {
LOGGER.info("Kill IPython Process");
if (iPythonProcessLauncher.isRunning()) {
ipythonClient.stop(StopRequest.newBuilder().build());
try {
ipythonClient.shutdown();
} catch (InterruptedException e) {
LOGGER.warn("Exception happens when shutting down ipythonClient", e);
}
}
iPythonProcessLauncher.stop();
iPythonProcessLauncher = null;
}
super.close();
if (gatewayServer != null) {
LOGGER.info("Shutdown Py4j GatewayServer");
gatewayServer.shutdown();
gatewayServer = null;
}
}
@Override
public InterpreterResult interpret(String st,
InterpreterContext context) throws InterpreterException {
zeppelinContext.setGui(context.getGui());
zeppelinContext.setNoteGui(context.getNoteGui());
zeppelinContext.setInterpreterContext(context);
interpreterOutput.setInterpreterOutput(context.out);
try {
ExecuteResponse response =
ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
interpreterOutput);
interpreterOutput.getInterpreterOutput().flush();
// It is not known which method is called first (ipythonClient.stream_execute
// or onProcessFailed) when ipython kernel process is exited. Because they are in
// 2 different threads. So here we would check ipythonClient's status and sleep 1 second
// if ipython kernel is maybe terminated.
if (iPythonProcessLauncher.isRunning() && !ipythonClient.isMaybeIPythonFailed()) {
return new InterpreterResult(
InterpreterResult.Code.valueOf(response.getStatus().name()));
} else {
if (ipythonClient.isMaybeIPythonFailed()) {
Thread.sleep(1000);
}
if (iPythonProcessLauncher.isRunning()) {
return new InterpreterResult(
InterpreterResult.Code.valueOf(response.getStatus().name()));
} else {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"IPython kernel is abnormally exited, please check your code and log.");
}
}
} catch (Exception e) {
throw new InterpreterException("Fail to interpret python code", e);
}
}
@Override
public void cancel(InterpreterContext context) throws InterpreterException {
ipythonClient.cancel(CancelRequest.newBuilder().build());
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) throws InterpreterException {
return 0;
}
@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
LOGGER.debug("Call completion for: " + buf + ", cursor: " + cursor);
List<InterpreterCompletion> completions = new ArrayList<>();
CompletionResponse response =
ipythonClient.complete(
CompletionRequest.getDefaultInstance().newBuilder().setCode(buf)
.setCursor(cursor).build());
for (int i = 0; i < response.getMatchesCount(); i++) {
String match = response.getMatches(i);
int lastIndexOfDot = match.lastIndexOf(".");
if (lastIndexOfDot != -1) {
match = match.substring(lastIndexOfDot + 1);
}
LOGGER.debug("Candidate completion: " + match);
completions.add(new InterpreterCompletion(match, match, ""));
}
return completions;
}
public BaseZeppelinContext getZeppelinContext() {
return zeppelinContext;
}
class IPythonProcessLauncher extends ProcessLauncher {
IPythonProcessLauncher(CommandLine commandLine,
Map<String, String> envs) {
super(commandLine, envs);
}
@Override
public void waitForReady(int timeout) {
// wait until IPython kernel is started or timeout
long startTime = System.currentTimeMillis();
while (state == State.LAUNCHED) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("Interrupted by something", e);
}
try {
StatusResponse response = ipythonClient.status(StatusRequest.newBuilder().build());
if (response.getStatus() == IPythonStatus.RUNNING) {
LOGGER.info("IPython Kernel is Running");
onProcessRunning();
break;
} else {
LOGGER.info("Wait for IPython Kernel to be started");
}
} catch (Exception e) {
// ignore the exception, because is may happen when grpc server has not started yet.
LOGGER.info("Wait for IPython Kernel to be started");
}
if ((System.currentTimeMillis() - startTime) > timeout) {
onTimeout();
break;
}
}
}
}
}

View file

@ -83,7 +83,7 @@ public class PythonInterpreter extends Interpreter {
iPythonInterpreter = getIPythonInterpreter();
if (getProperty("zeppelin.python.useIPython", "true").equals("true") &&
StringUtils.isEmpty(
iPythonInterpreter.checkIPythonPrerequisite(getPythonExec()))) {
iPythonInterpreter.checkKernelPrerequisite(getPythonExec()))) {
try {
iPythonInterpreter.open();
LOGGER.info("IPython is available, Use IPythonInterpreter to replace PythonInterpreter");

View file

@ -384,7 +384,7 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
tearDown();
Properties properties = initIntpProperties();
properties.setProperty("zeppelin.ipython.grpc.message_size", "4000");
properties.setProperty("zeppelin.jupyter.kernel.grpc.message_size", "4000");
startInterpreter(properties);
@ -443,7 +443,7 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
Thread.sleep(3000);
IPythonInterpreter iPythonInterpreter = (IPythonInterpreter)
((LazyOpenInterpreter) interpreter).getInnerInterpreter();
iPythonInterpreter.getIPythonProcessLauncher().stop();
iPythonInterpreter.getKernelProcessLauncher().stop();
waiter.await(3000);
}

View file

@ -59,7 +59,7 @@ public class IPySparkInterpreter extends IPythonInterpreter {
!conf.get("spark.submit.deployMode").equals("cluster")) {
setAdditionalPythonPath(PythonUtils.sparkPythonPath());
}
setAddBulitinPy4j(false);
setUseBuiltinPy4j(false);
setAdditionalPythonInitFile("python/zeppelin_ipyspark.py");
setProperty("zeppelin.py4j.useAuth",
sparkInterpreter.getSparkVersion().isSecretSocketSupported() + "");
@ -67,8 +67,8 @@ public class IPySparkInterpreter extends IPythonInterpreter {
}
@Override
protected Map<String, String> setupIPythonEnv() throws IOException {
Map<String, String> env = super.setupIPythonEnv();
protected Map<String, String> setupKernelEnv() throws IOException {
Map<String, String> env = super.setupKernelEnv();
// set PYSPARK_PYTHON
SparkConf conf = sparkInterpreter.getSparkContext().getConf();
if (conf.contains("spark.pyspark.python")) {

View file

@ -73,7 +73,7 @@ public class InterpreterModeActionsIT extends AbstractZeppelinIT {
static String interpreterOptionPath = "";
static String originalInterpreterOption = "";
static String cmdPsPython = "ps aux | grep 'zeppelin_ipython' | grep -v 'grep' | wc -l";
static String cmdPsPython = "ps aux | grep 'kernel_server.py' | grep -v 'grep' | wc -l";
static String cmdPsInterpreter = "ps aux | grep 'zeppelin/interpreter/python/*' |" +
" sed -E '/grep|local-repo/d' | wc -l";

View file

@ -0,0 +1,191 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.9.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-jupyter-adapter</artifactId>
<packaging>jar</packaging>
<version>0.9.0-SNAPSHOT</version>
<name>Zeppelin: Jupyter Adapter</name>
<properties>
<interpreter.name>python</interpreter.name>
<python.py4j.version>0.10.7</python.py4j.version>
<grpc.version>1.15.0</grpc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>${python.py4j.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- test libraries -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.jodah</groupId>
<artifactId>concurrentunit</artifactId>
<version>0.4.4</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<skip>false</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>

View file

@ -15,25 +15,25 @@
* limitations under the License.
*/
package org.apache.zeppelin.python;
package org.apache.zeppelin.interpreter;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.jupyter.proto.JupyterKernelGrpc;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.python.proto.CancelRequest;
import org.apache.zeppelin.python.proto.CancelResponse;
import org.apache.zeppelin.python.proto.CompletionRequest;
import org.apache.zeppelin.python.proto.CompletionResponse;
import org.apache.zeppelin.python.proto.ExecuteRequest;
import org.apache.zeppelin.python.proto.ExecuteResponse;
import org.apache.zeppelin.python.proto.ExecuteStatus;
import org.apache.zeppelin.python.proto.IPythonGrpc;
import org.apache.zeppelin.python.proto.OutputType;
import org.apache.zeppelin.python.proto.StatusRequest;
import org.apache.zeppelin.python.proto.StatusResponse;
import org.apache.zeppelin.python.proto.StopRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.CancelRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.CancelResponse;
import org.apache.zeppelin.interpreter.jupyter.proto.CompletionRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.CompletionResponse;
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteResponse;
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteStatus;
import org.apache.zeppelin.interpreter.jupyter.proto.OutputType;
import org.apache.zeppelin.interpreter.jupyter.proto.StatusRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.StatusResponse;
import org.apache.zeppelin.interpreter.jupyter.proto.StopRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -44,33 +44,33 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Grpc client for IPython kernel
* Grpc client for Jupyter kernel
*/
public class IPythonClient {
public class JupyterKernelClient {
private static final Logger LOGGER = LoggerFactory.getLogger(IPythonClient.class.getName());
private static final Logger LOGGER = LoggerFactory.getLogger(JupyterKernelClient.class.getName());
private final ManagedChannel channel;
private final IPythonGrpc.IPythonBlockingStub blockingStub;
private final IPythonGrpc.IPythonStub asyncStub;
private volatile boolean maybeIPythonFailed = false;
private final JupyterKernelGrpc.JupyterKernelBlockingStub blockingStub;
private final JupyterKernelGrpc.JupyterKernelStub asyncStub;
private volatile boolean maybeKernelFailed = false;
private SecureRandom random = new SecureRandom();
/**
* Construct client for accessing RouteGuide server at {@code host:port}.
*/
public IPythonClient(String host, int port) {
public JupyterKernelClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(true));
}
/**
* Construct client for accessing RouteGuide server using the existing channel.
*/
public IPythonClient(ManagedChannelBuilder<?> channelBuilder) {
public JupyterKernelClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = IPythonGrpc.newBlockingStub(channel);
asyncStub = IPythonGrpc.newStub(channel);
blockingStub = JupyterKernelGrpc.newBlockingStub(channel);
asyncStub = JupyterKernelGrpc.newStub(channel);
}
public void shutdown() throws InterruptedException {
@ -84,7 +84,7 @@ public class IPythonClient {
final ExecuteResponse.Builder finalResponseBuilder = ExecuteResponse.newBuilder()
.setStatus(ExecuteStatus.SUCCESS);
final AtomicBoolean completedFlag = new AtomicBoolean(false);
maybeIPythonFailed = false;
maybeKernelFailed = false;
LOGGER.debug("stream_execute code:\n" + request.getCode());
asyncStub.execute(request, new StreamObserver<ExecuteResponse>() {
OutputType lastOutputType = null;
@ -97,7 +97,7 @@ public class IPythonClient {
case TEXT:
try {
if (executeResponse.getOutput().startsWith("%")) {
// the output from ipython kernel maybe specify format already.
// the output from jupyter kernel maybe specify format already.
interpreterOutput.write((executeResponse.getOutput()).getBytes());
} else {
// only add %text when the previous output type is not TEXT.
@ -154,7 +154,7 @@ public class IPythonClient {
}
LOGGER.error("Fail to call IPython grpc", throwable);
finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
maybeIPythonFailed = true;
maybeKernelFailed = true;
completedFlag.set(true);
synchronized (completedFlag) {
completedFlag.notify();
@ -221,12 +221,12 @@ public class IPythonClient {
asyncStub.stop(request, null);
}
public boolean isMaybeIPythonFailed() {
return maybeIPythonFailed;
public boolean isMaybeKernelFailed() {
return maybeKernelFailed;
}
public static void main(String[] args) {
IPythonClient client = new IPythonClient("localhost", 50053);
JupyterKernelClient client = new JupyterKernelClient("localhost", 50053);
client.status(StatusRequest.newBuilder().build());
ExecuteResponse response = client.block_execute(ExecuteRequest.newBuilder().

View file

@ -0,0 +1,338 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ManagedChannelBuilder;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.jupyter.proto.CancelRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.CompletionRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.CompletionResponse;
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteResponse;
import org.apache.zeppelin.interpreter.jupyter.proto.KernelStatus;
import org.apache.zeppelin.interpreter.jupyter.proto.StatusRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.StatusResponse;
import org.apache.zeppelin.interpreter.jupyter.proto.StopRequest;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.interpreter.util.ProcessLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* Jupyter Kernel adapter for Zeppelin. All the jupyter kernel could be used by Zeppelin
* by extending this class.
*/
public abstract class JupyterKernelInterpreter extends Interpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(JupyterKernelInterpreter.class);
private JupyterKernelProcessLauncher jupyterKernelProcessLauncher;
protected JupyterKernelClient jupyterKernelClient;
protected BaseZeppelinContext zeppelinContext;
// working directory of jupyter kernel
protected File kernelWorkDir;
// python executable file for launching the jupyter kernel
private String pythonExecutable;
private int kernelLaunchTimeout;
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
public JupyterKernelInterpreter(Properties properties) {
super(properties);
}
public abstract String getKernelName();
public List<String> getRequiredPackages() {
List<String> requiredPackages = new ArrayList<>();
requiredPackages.add("jupyter-client");
requiredPackages.add("grpcio");
requiredPackages.add("protobuf");
return requiredPackages;
}
public abstract BaseZeppelinContext buildZeppelinContext();
@Override
public void open() throws InterpreterException {
try {
if (jupyterKernelClient != null) {
// JupyterKernelInterpreter might already been opened
return;
}
pythonExecutable = getProperty("zeppelin.python", "python");
LOGGER.info("Python Exec: " + pythonExecutable);
String checkPrerequisiteResult = checkKernelPrerequisite(pythonExecutable);
if (!StringUtils.isEmpty(checkPrerequisiteResult)) {
throw new InterpreterException("Kernel prerequisite is not meet: " +
checkPrerequisiteResult);
}
kernelLaunchTimeout = Integer.parseInt(
getProperty("zeppelin.jupyter.kernel.launch.timeout", "30000"));
this.zeppelinContext = buildZeppelinContext();
int kernelPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
int message_size = Integer.parseInt(getProperty("zeppelin.jupyter.kernel.grpc.message_size",
32 * 1024 * 1024 + ""));
jupyterKernelClient = new JupyterKernelClient(ManagedChannelBuilder.forAddress("127.0.0.1",
kernelPort)
.usePlaintext(true).maxInboundMessageSize(message_size));
launchJupyterKernel(kernelPort);
} catch (Exception e) {
throw new InterpreterException("Fail to open JupyterKernelInterpreter:\n" +
ExceptionUtils.getStackTrace(e), e);
}
}
/**
* non-empty return value mean the errors when checking kernel prerequisite.
* empty value mean kernel prerequisite is met.
*
* @return check result of checking kernel prerequisite.
*/
public String checkKernelPrerequisite(String pythonExec) {
ProcessBuilder processBuilder = new ProcessBuilder(pythonExec, "-m", "pip", "freeze");
File stderrFile = null;
File stdoutFile = null;
try {
stderrFile = File.createTempFile("zeppelin", ".txt");
processBuilder.redirectError(stderrFile);
stdoutFile = File.createTempFile("zeppelin", ".txt");
processBuilder.redirectOutput(stdoutFile);
Process proc = processBuilder.start();
int ret = proc.waitFor();
if (ret != 0) {
try (FileInputStream in = new FileInputStream(stderrFile)) {
return "Fail to run pip freeze.\n" + IOUtils.toString(in);
}
}
try (FileInputStream in = new FileInputStream(stdoutFile)) {
String freezeOutput = IOUtils.toString(in);
for (String packageName : getRequiredPackages()) {
if (!freezeOutput.contains(packageName + "=")) {
return packageName + " is not installed.";
}
}
LOGGER.info("Prerequisite for kernel " + getKernelName() + " is met");
}
} catch (Exception e) {
LOGGER.warn("Fail to checkKernelPrerequisite", e);
return "Fail to checkKernelPrerequisite: " + ExceptionUtils.getStackTrace(e);
} finally {
FileUtils.deleteQuietly(stderrFile);
FileUtils.deleteQuietly(stdoutFile);
}
return "";
}
private void launchJupyterKernel(int kernelPort)
throws IOException {
LOGGER.info("Launching Jupyter Kernel at port: " + kernelPort);
// copy the python scripts to a temp directory, then launch jupyter kernel in that folder
this.kernelWorkDir = Files.createTempDirectory(
"zeppelin_jupyter_kernel_" + getKernelName()).toFile();
String[] kernelScripts = {"kernel_server.py", "kernel_pb2.py", "kernel_pb2_grpc.py"};
for (String kernelScript : kernelScripts) {
URL url = getClass().getClassLoader().getResource("grpc/jupyter"
+ "/" + kernelScript);
FileUtils.copyURLToFile(url, new File(kernelWorkDir, kernelScript));
}
CommandLine cmd = CommandLine.parse(pythonExecutable);
cmd.addArgument(kernelWorkDir.getAbsolutePath() + "/kernel_server.py");
cmd.addArgument(getKernelName());
cmd.addArgument(kernelPort + "");
Map<String, String> envs = setupKernelEnv();
jupyterKernelProcessLauncher = new JupyterKernelProcessLauncher(cmd, envs);
jupyterKernelProcessLauncher.launch();
jupyterKernelProcessLauncher.waitForReady(kernelLaunchTimeout);
if (jupyterKernelProcessLauncher.isLaunchTimeout()) {
throw new IOException("Fail to launch Jupyter Kernel in " + kernelLaunchTimeout / 1000
+ " seconds.\n" + jupyterKernelProcessLauncher.getErrorMessage());
}
if (!jupyterKernelProcessLauncher.isRunning()) {
throw new IOException("Fail to launch Jupyter Kernel as the python process is failed.\n"
+ jupyterKernelProcessLauncher.getErrorMessage());
}
}
protected Map<String, String> setupKernelEnv() throws IOException {
return EnvironmentUtils.getProcEnvironment();
}
@VisibleForTesting
public JupyterKernelProcessLauncher getKernelProcessLauncher() {
return jupyterKernelProcessLauncher;
}
@Override
public void close() throws InterpreterException {
if (jupyterKernelProcessLauncher != null) {
LOGGER.info("Killing Jupyter Kernel Process");
if (jupyterKernelProcessLauncher.isRunning()) {
jupyterKernelClient.stop(StopRequest.newBuilder().build());
try {
jupyterKernelClient.shutdown();
} catch (InterruptedException e) {
LOGGER.warn("Exception happens when shutting down jupyter kernel client", e);
}
}
jupyterKernelProcessLauncher.stop();
jupyterKernelProcessLauncher = null;
LOGGER.info("Jupyter Kernel is killed");
}
}
@Override
public InterpreterResult interpret(String st,
InterpreterContext context) throws InterpreterException {
zeppelinContext.setGui(context.getGui());
zeppelinContext.setNoteGui(context.getNoteGui());
zeppelinContext.setInterpreterContext(context);
interpreterOutput.setInterpreterOutput(context.out);
try {
ExecuteResponse response =
jupyterKernelClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
interpreterOutput);
interpreterOutput.getInterpreterOutput().flush();
// It is not known which method is called first (JupyterKernelClient.stream_execute
// or onProcessFailed) when jupyter kernel process is exited. Because they are in
// 2 different threads. So here we would check JupyterKernelClient's status and sleep 1 second
// if jupyter kernel is maybe terminated.
if (jupyterKernelProcessLauncher.isRunning() && !jupyterKernelClient.isMaybeKernelFailed()) {
return new InterpreterResult(
InterpreterResult.Code.valueOf(response.getStatus().name()));
} else {
if (jupyterKernelClient.isMaybeKernelFailed()) {
Thread.sleep(1000);
}
if (jupyterKernelProcessLauncher.isRunning()) {
return new InterpreterResult(
InterpreterResult.Code.valueOf(response.getStatus().name()));
} else {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"IPython kernel is abnormally exited, please check your code and log.");
}
}
} catch (Exception e) {
throw new InterpreterException("Fail to interpret python code", e);
}
}
@Override
public void cancel(InterpreterContext context) throws InterpreterException {
jupyterKernelClient.cancel(CancelRequest.newBuilder().build());
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) throws InterpreterException {
return 0;
}
@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
LOGGER.debug("Call completion for: " + buf + ", cursor: " + cursor);
List<InterpreterCompletion> completions = new ArrayList<>();
CompletionResponse response =
jupyterKernelClient.complete(
CompletionRequest.getDefaultInstance().newBuilder().setCode(buf)
.setCursor(cursor).build());
for (int i = 0; i < response.getMatchesCount(); i++) {
String match = response.getMatches(i);
int lastIndexOfDot = match.lastIndexOf(".");
if (lastIndexOfDot != -1) {
match = match.substring(lastIndexOfDot + 1);
}
LOGGER.debug("Candidate completion: " + match);
completions.add(new InterpreterCompletion(match, match, ""));
}
return completions;
}
public BaseZeppelinContext getZeppelinContext() {
return zeppelinContext;
}
public class JupyterKernelProcessLauncher extends ProcessLauncher {
JupyterKernelProcessLauncher(CommandLine commandLine,
Map<String, String> envs) {
super(commandLine, envs);
}
@Override
public void waitForReady(int timeout) {
// wait until jupyter kernel is started or timeout
long startTime = System.currentTimeMillis();
while (state == State.LAUNCHED) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("Interrupted by something", e);
}
try {
StatusResponse response = jupyterKernelClient.status(StatusRequest.newBuilder().build());
if (response.getStatus() == KernelStatus.RUNNING) {
LOGGER.info("Jupyter Kernel is Running");
onProcessRunning();
break;
} else {
LOGGER.info("Wait for Jupyter Kernel to be started");
}
} catch (Exception e) {
// ignore the exception, because is may happen when grpc server has not started yet.
LOGGER.info("Wait for Jupyter Kernel to be started");
}
if ((System.currentTimeMillis() - startTime) > timeout) {
onTimeout();
break;
}
}
}
}
}

View file

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.apache.zeppelin.interpreter.jupyter.proto";
option java_outer_classname = "JupyterKernelProto";
option objc_class_prefix = "JupyterKernel";
package jupyter;
// The JupyterKernel service definition.
service JupyterKernel {
// Sends code
rpc execute (ExecuteRequest) returns (stream ExecuteResponse) {}
// Get completion
rpc complete (CompletionRequest) returns (CompletionResponse) {}
// Cancel the running statement
rpc cancel (CancelRequest) returns (CancelResponse) {}
// Get jupyter kernel status
rpc status (StatusRequest) returns (StatusResponse) {}
// Stop jupyter kernel
rpc stop(StopRequest) returns (StopResponse) {}
}
enum ExecuteStatus {
SUCCESS = 0;
ERROR = 1;
}
enum KernelStatus {
STARTING = 0;
RUNNING = 1;
}
enum OutputType {
TEXT = 0;
PNG = 1;
JPEG = 2;
HTML = 3;
SVG = 4;
JSON = 5;
LaTeX = 6;
}
// The request message containing the code
message ExecuteRequest {
string code = 1;
}
// The response message containing the execution result.
message ExecuteResponse {
ExecuteStatus status = 1;
OutputType type = 2;
string output = 3;
}
message CancelRequest {
}
message CancelResponse {
}
message CompletionRequest {
string code = 1;
int32 cursor = 2;
}
message CompletionResponse {
repeated string matches = 1;
}
message StatusRequest {
}
message StatusResponse {
KernelStatus status = 1;
}
message StopRequest {
}
message StopResponse {
}

View file

@ -15,4 +15,4 @@
#!/usr/bin/env bash
python -m grpc_tools.protoc -I../../proto --python_out=python --grpc_python_out=python ../../proto/ipython.proto
python -m grpc_tools.protoc -I../../proto --python_out=jupyter --grpc_python_out=jupyter ../../proto/kernel.proto

View file

@ -16,19 +16,19 @@
import grpc
import ipython_pb2
import ipython_pb2_grpc
import kernel_pb2
import kernel_pb2_grpc
def run():
channel = grpc.insecure_channel('localhost:50053')
stub = ipython_pb2_grpc.IPythonStub(channel)
response = stub.execute(ipython_pb2.ExecuteRequest(code="import time\nfor i in range(1,4):\n\ttime.sleep(1)\n\tprint(i)\n" +
stub = kernel_pb2_grpc.JupyterKernelStub(channel)
response = stub.execute(kernel_pb2.ExecuteRequest(code="import time\nfor i in range(1,4):\n\ttime.sleep(1)\n\tprint(i)\n" +
"%matplotlib inline\nimport matplotlib.pyplot as plt\ndata=[1,1,2,3,4]\nplt.figure()\nplt.plot(data)"))
for r in response:
print("output:" + r.output)
response = stub.execute(ipython_pb2.ExecuteRequest(code="range?"))
response = stub.execute(kernel_pb2.ExecuteRequest(code="range?"))
for r in response:
print(r)

View file

@ -13,8 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: ipython.proto
# source: kernel.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
@ -31,16 +32,16 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='ipython.proto',
package='ipython',
name='kernel.proto',
package='jupyter',
syntax='proto3',
serialized_options=_b('\n org.apache.zeppelin.python.protoB\014IPythonProtoP\001\242\002\007IPython'),
serialized_pb=_b('\n\ripython.proto\x12\x07ipython\"\x1e\n\x0e\x45xecuteRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\"l\n\x0f\x45xecuteResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.ipython.ExecuteStatus\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.ipython.OutputType\x12\x0e\n\x06output\x18\x03 \x01(\t\"\x0f\n\rCancelRequest\"\x10\n\x0e\x43\x61ncelResponse\"1\n\x11\x43ompletionRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\x12\x0e\n\x06\x63ursor\x18\x02 \x01(\x05\"%\n\x12\x43ompletionResponse\x12\x0f\n\x07matches\x18\x01 \x03(\t\"\x0f\n\rStatusRequest\"8\n\x0eStatusResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.ipython.IPythonStatus\"\r\n\x0bStopRequest\"\x0e\n\x0cStopResponse*\'\n\rExecuteStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01**\n\rIPythonStatus\x12\x0c\n\x08STARTING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01*Q\n\nOutputType\x12\x08\n\x04TEXT\x10\x00\x12\x07\n\x03PNG\x10\x01\x12\x08\n\x04JPEG\x10\x02\x12\x08\n\x04HTML\x10\x03\x12\x07\n\x03SVG\x10\x04\x12\x08\n\x04JSON\x10\x05\x12\t\n\x05LaTeX\x10\x06\x32\xc3\x02\n\x07IPython\x12@\n\x07\x65xecute\x12\x17.ipython.ExecuteRequest\x1a\x18.ipython.ExecuteResponse\"\x00\x30\x01\x12\x45\n\x08\x63omplete\x12\x1a.ipython.CompletionRequest\x1a\x1b.ipython.CompletionResponse\"\x00\x12;\n\x06\x63\x61ncel\x12\x16.ipython.CancelRequest\x1a\x17.ipython.CancelResponse\"\x00\x12;\n\x06status\x12\x16.ipython.StatusRequest\x1a\x17.ipython.StatusResponse\"\x00\x12\x35\n\x04stop\x12\x14.ipython.StopRequest\x1a\x15.ipython.StopResponse\"\x00\x42<\n org.apache.zeppelin.python.protoB\x0cIPythonProtoP\x01\xa2\x02\x07IPythonb\x06proto3')
serialized_options=_b('\n-org.apache.zeppelin.interpreter.jupyter.protoB\022JupyterKernelProtoP\001\242\002\rJupyterKernel'),
serialized_pb=_b('\n\x0ckernel.proto\x12\x07jupyter\"\x1e\n\x0e\x45xecuteRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\"l\n\x0f\x45xecuteResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.jupyter.ExecuteStatus\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.jupyter.OutputType\x12\x0e\n\x06output\x18\x03 \x01(\t\"\x0f\n\rCancelRequest\"\x10\n\x0e\x43\x61ncelResponse\"1\n\x11\x43ompletionRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\x12\x0e\n\x06\x63ursor\x18\x02 \x01(\x05\"%\n\x12\x43ompletionResponse\x12\x0f\n\x07matches\x18\x01 \x03(\t\"\x0f\n\rStatusRequest\"7\n\x0eStatusResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.jupyter.KernelStatus\"\r\n\x0bStopRequest\"\x0e\n\x0cStopResponse*\'\n\rExecuteStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01*)\n\x0cKernelStatus\x12\x0c\n\x08STARTING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01*Q\n\nOutputType\x12\x08\n\x04TEXT\x10\x00\x12\x07\n\x03PNG\x10\x01\x12\x08\n\x04JPEG\x10\x02\x12\x08\n\x04HTML\x10\x03\x12\x07\n\x03SVG\x10\x04\x12\x08\n\x04JSON\x10\x05\x12\t\n\x05LaTeX\x10\x06\x32\xc9\x02\n\rJupyterKernel\x12@\n\x07\x65xecute\x12\x17.jupyter.ExecuteRequest\x1a\x18.jupyter.ExecuteResponse\"\x00\x30\x01\x12\x45\n\x08\x63omplete\x12\x1a.jupyter.CompletionRequest\x1a\x1b.jupyter.CompletionResponse\"\x00\x12;\n\x06\x63\x61ncel\x12\x16.jupyter.CancelRequest\x1a\x17.jupyter.CancelResponse\"\x00\x12;\n\x06status\x12\x16.jupyter.StatusRequest\x1a\x17.jupyter.StatusResponse\"\x00\x12\x35\n\x04stop\x12\x14.jupyter.StopRequest\x1a\x15.jupyter.StopResponse\"\x00\x42U\n-org.apache.zeppelin.interpreter.jupyter.protoB\x12JupyterKernelProtoP\x01\xa2\x02\rJupyterKernelb\x06proto3')
)
_EXECUTESTATUS = _descriptor.EnumDescriptor(
name='ExecuteStatus',
full_name='ipython.ExecuteStatus',
full_name='jupyter.ExecuteStatus',
filename=None,
file=DESCRIPTOR,
values=[
@ -55,15 +56,15 @@ _EXECUTESTATUS = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
serialized_start=399,
serialized_end=438,
serialized_start=397,
serialized_end=436,
)
_sym_db.RegisterEnumDescriptor(_EXECUTESTATUS)
ExecuteStatus = enum_type_wrapper.EnumTypeWrapper(_EXECUTESTATUS)
_IPYTHONSTATUS = _descriptor.EnumDescriptor(
name='IPythonStatus',
full_name='ipython.IPythonStatus',
_KERNELSTATUS = _descriptor.EnumDescriptor(
name='KernelStatus',
full_name='jupyter.KernelStatus',
filename=None,
file=DESCRIPTOR,
values=[
@ -78,15 +79,15 @@ _IPYTHONSTATUS = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
serialized_start=440,
serialized_end=482,
serialized_start=438,
serialized_end=479,
)
_sym_db.RegisterEnumDescriptor(_IPYTHONSTATUS)
_sym_db.RegisterEnumDescriptor(_KERNELSTATUS)
IPythonStatus = enum_type_wrapper.EnumTypeWrapper(_IPYTHONSTATUS)
KernelStatus = enum_type_wrapper.EnumTypeWrapper(_KERNELSTATUS)
_OUTPUTTYPE = _descriptor.EnumDescriptor(
name='OutputType',
full_name='ipython.OutputType',
full_name='jupyter.OutputType',
filename=None,
file=DESCRIPTOR,
values=[
@ -121,8 +122,8 @@ _OUTPUTTYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
serialized_start=484,
serialized_end=565,
serialized_start=481,
serialized_end=562,
)
_sym_db.RegisterEnumDescriptor(_OUTPUTTYPE)
@ -143,13 +144,13 @@ LaTeX = 6
_EXECUTEREQUEST = _descriptor.Descriptor(
name='ExecuteRequest',
full_name='ipython.ExecuteRequest',
full_name='jupyter.ExecuteRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='code', full_name='ipython.ExecuteRequest.code', index=0,
name='code', full_name='jupyter.ExecuteRequest.code', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
@ -167,34 +168,34 @@ _EXECUTEREQUEST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=26,
serialized_end=56,
serialized_start=25,
serialized_end=55,
)
_EXECUTERESPONSE = _descriptor.Descriptor(
name='ExecuteResponse',
full_name='ipython.ExecuteResponse',
full_name='jupyter.ExecuteResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='status', full_name='ipython.ExecuteResponse.status', index=0,
name='status', full_name='jupyter.ExecuteResponse.status', index=0,
number=1, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='type', full_name='ipython.ExecuteResponse.type', index=1,
name='type', full_name='jupyter.ExecuteResponse.type', index=1,
number=2, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='output', full_name='ipython.ExecuteResponse.output', index=2,
name='output', full_name='jupyter.ExecuteResponse.output', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
@ -212,14 +213,14 @@ _EXECUTERESPONSE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=58,
serialized_end=166,
serialized_start=57,
serialized_end=165,
)
_CANCELREQUEST = _descriptor.Descriptor(
name='CancelRequest',
full_name='ipython.CancelRequest',
full_name='jupyter.CancelRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
@ -236,14 +237,14 @@ _CANCELREQUEST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=168,
serialized_end=183,
serialized_start=167,
serialized_end=182,
)
_CANCELRESPONSE = _descriptor.Descriptor(
name='CancelResponse',
full_name='ipython.CancelResponse',
full_name='jupyter.CancelResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
@ -260,27 +261,27 @@ _CANCELRESPONSE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=185,
serialized_end=201,
serialized_start=184,
serialized_end=200,
)
_COMPLETIONREQUEST = _descriptor.Descriptor(
name='CompletionRequest',
full_name='ipython.CompletionRequest',
full_name='jupyter.CompletionRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='code', full_name='ipython.CompletionRequest.code', index=0,
name='code', full_name='jupyter.CompletionRequest.code', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='cursor', full_name='ipython.CompletionRequest.cursor', index=1,
name='cursor', full_name='jupyter.CompletionRequest.cursor', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
@ -298,20 +299,20 @@ _COMPLETIONREQUEST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=203,
serialized_end=252,
serialized_start=202,
serialized_end=251,
)
_COMPLETIONRESPONSE = _descriptor.Descriptor(
name='CompletionResponse',
full_name='ipython.CompletionResponse',
full_name='jupyter.CompletionResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='matches', full_name='ipython.CompletionResponse.matches', index=0,
name='matches', full_name='jupyter.CompletionResponse.matches', index=0,
number=1, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
@ -329,14 +330,14 @@ _COMPLETIONRESPONSE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=254,
serialized_end=291,
serialized_start=253,
serialized_end=290,
)
_STATUSREQUEST = _descriptor.Descriptor(
name='StatusRequest',
full_name='ipython.StatusRequest',
full_name='jupyter.StatusRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
@ -353,20 +354,20 @@ _STATUSREQUEST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=293,
serialized_end=308,
serialized_start=292,
serialized_end=307,
)
_STATUSRESPONSE = _descriptor.Descriptor(
name='StatusResponse',
full_name='ipython.StatusResponse',
full_name='jupyter.StatusResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='status', full_name='ipython.StatusResponse.status', index=0,
name='status', full_name='jupyter.StatusResponse.status', index=0,
number=1, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
@ -384,14 +385,14 @@ _STATUSRESPONSE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=310,
serialized_end=366,
serialized_start=309,
serialized_end=364,
)
_STOPREQUEST = _descriptor.Descriptor(
name='StopRequest',
full_name='ipython.StopRequest',
full_name='jupyter.StopRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
@ -408,14 +409,14 @@ _STOPREQUEST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=368,
serialized_end=381,
serialized_start=366,
serialized_end=379,
)
_STOPRESPONSE = _descriptor.Descriptor(
name='StopResponse',
full_name='ipython.StopResponse',
full_name='jupyter.StopResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
@ -432,13 +433,13 @@ _STOPRESPONSE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=383,
serialized_end=397,
serialized_start=381,
serialized_end=395,
)
_EXECUTERESPONSE.fields_by_name['status'].enum_type = _EXECUTESTATUS
_EXECUTERESPONSE.fields_by_name['type'].enum_type = _OUTPUTTYPE
_STATUSRESPONSE.fields_by_name['status'].enum_type = _IPYTHONSTATUS
_STATUSRESPONSE.fields_by_name['status'].enum_type = _KERNELSTATUS
DESCRIPTOR.message_types_by_name['ExecuteRequest'] = _EXECUTEREQUEST
DESCRIPTOR.message_types_by_name['ExecuteResponse'] = _EXECUTERESPONSE
DESCRIPTOR.message_types_by_name['CancelRequest'] = _CANCELREQUEST
@ -450,95 +451,95 @@ DESCRIPTOR.message_types_by_name['StatusResponse'] = _STATUSRESPONSE
DESCRIPTOR.message_types_by_name['StopRequest'] = _STOPREQUEST
DESCRIPTOR.message_types_by_name['StopResponse'] = _STOPRESPONSE
DESCRIPTOR.enum_types_by_name['ExecuteStatus'] = _EXECUTESTATUS
DESCRIPTOR.enum_types_by_name['IPythonStatus'] = _IPYTHONSTATUS
DESCRIPTOR.enum_types_by_name['KernelStatus'] = _KERNELSTATUS
DESCRIPTOR.enum_types_by_name['OutputType'] = _OUTPUTTYPE
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
ExecuteRequest = _reflection.GeneratedProtocolMessageType('ExecuteRequest', (_message.Message,), dict(
DESCRIPTOR = _EXECUTEREQUEST,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.ExecuteRequest)
__module__ = 'kernel_pb2'
# @@protoc_insertion_point(class_scope:jupyter.ExecuteRequest)
))
_sym_db.RegisterMessage(ExecuteRequest)
ExecuteResponse = _reflection.GeneratedProtocolMessageType('ExecuteResponse', (_message.Message,), dict(
DESCRIPTOR = _EXECUTERESPONSE,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.ExecuteResponse)
__module__ = 'kernel_pb2'
# @@protoc_insertion_point(class_scope:jupyter.ExecuteResponse)
))
_sym_db.RegisterMessage(ExecuteResponse)
CancelRequest = _reflection.GeneratedProtocolMessageType('CancelRequest', (_message.Message,), dict(
DESCRIPTOR = _CANCELREQUEST,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.CancelRequest)
__module__ = 'kernel_pb2'
# @@protoc_insertion_point(class_scope:jupyter.CancelRequest)
))
_sym_db.RegisterMessage(CancelRequest)
CancelResponse = _reflection.GeneratedProtocolMessageType('CancelResponse', (_message.Message,), dict(
DESCRIPTOR = _CANCELRESPONSE,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.CancelResponse)
__module__ = 'kernel_pb2'
# @@protoc_insertion_point(class_scope:jupyter.CancelResponse)
))
_sym_db.RegisterMessage(CancelResponse)
CompletionRequest = _reflection.GeneratedProtocolMessageType('CompletionRequest', (_message.Message,), dict(
DESCRIPTOR = _COMPLETIONREQUEST,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.CompletionRequest)
__module__ = 'kernel_pb2'
# @@protoc_insertion_point(class_scope:jupyter.CompletionRequest)
))
_sym_db.RegisterMessage(CompletionRequest)
CompletionResponse = _reflection.GeneratedProtocolMessageType('CompletionResponse', (_message.Message,), dict(
DESCRIPTOR = _COMPLETIONRESPONSE,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.CompletionResponse)
__module__ = 'kernel_pb2'
# @@protoc_insertion_point(class_scope:jupyter.CompletionResponse)
))
_sym_db.RegisterMessage(CompletionResponse)
StatusRequest = _reflection.GeneratedProtocolMessageType('StatusRequest', (_message.Message,), dict(
DESCRIPTOR = _STATUSREQUEST,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.StatusRequest)
__module__ = 'kernel_pb2'
# @@protoc_insertion_point(class_scope:jupyter.StatusRequest)
))
_sym_db.RegisterMessage(StatusRequest)
StatusResponse = _reflection.GeneratedProtocolMessageType('StatusResponse', (_message.Message,), dict(
DESCRIPTOR = _STATUSRESPONSE,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.StatusResponse)
__module__ = 'kernel_pb2'
# @@protoc_insertion_point(class_scope:jupyter.StatusResponse)
))
_sym_db.RegisterMessage(StatusResponse)
StopRequest = _reflection.GeneratedProtocolMessageType('StopRequest', (_message.Message,), dict(
DESCRIPTOR = _STOPREQUEST,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.StopRequest)
__module__ = 'kernel_pb2'
# @@protoc_insertion_point(class_scope:jupyter.StopRequest)
))
_sym_db.RegisterMessage(StopRequest)
StopResponse = _reflection.GeneratedProtocolMessageType('StopResponse', (_message.Message,), dict(
DESCRIPTOR = _STOPRESPONSE,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.StopResponse)
__module__ = 'kernel_pb2'
# @@protoc_insertion_point(class_scope:jupyter.StopResponse)
))
_sym_db.RegisterMessage(StopResponse)
DESCRIPTOR._options = None
_IPYTHON = _descriptor.ServiceDescriptor(
name='IPython',
full_name='ipython.IPython',
_JUPYTERKERNEL = _descriptor.ServiceDescriptor(
name='JupyterKernel',
full_name='jupyter.JupyterKernel',
file=DESCRIPTOR,
index=0,
serialized_options=None,
serialized_start=568,
serialized_end=891,
serialized_start=565,
serialized_end=894,
methods=[
_descriptor.MethodDescriptor(
name='execute',
full_name='ipython.IPython.execute',
full_name='jupyter.JupyterKernel.execute',
index=0,
containing_service=None,
input_type=_EXECUTEREQUEST,
@ -547,7 +548,7 @@ _IPYTHON = _descriptor.ServiceDescriptor(
),
_descriptor.MethodDescriptor(
name='complete',
full_name='ipython.IPython.complete',
full_name='jupyter.JupyterKernel.complete',
index=1,
containing_service=None,
input_type=_COMPLETIONREQUEST,
@ -556,7 +557,7 @@ _IPYTHON = _descriptor.ServiceDescriptor(
),
_descriptor.MethodDescriptor(
name='cancel',
full_name='ipython.IPython.cancel',
full_name='jupyter.JupyterKernel.cancel',
index=2,
containing_service=None,
input_type=_CANCELREQUEST,
@ -565,7 +566,7 @@ _IPYTHON = _descriptor.ServiceDescriptor(
),
_descriptor.MethodDescriptor(
name='status',
full_name='ipython.IPython.status',
full_name='jupyter.JupyterKernel.status',
index=3,
containing_service=None,
input_type=_STATUSREQUEST,
@ -574,7 +575,7 @@ _IPYTHON = _descriptor.ServiceDescriptor(
),
_descriptor.MethodDescriptor(
name='stop',
full_name='ipython.IPython.stop',
full_name='jupyter.JupyterKernel.stop',
index=4,
containing_service=None,
input_type=_STOPREQUEST,
@ -582,8 +583,8 @@ _IPYTHON = _descriptor.ServiceDescriptor(
serialized_options=None,
),
])
_sym_db.RegisterServiceDescriptor(_IPYTHON)
_sym_db.RegisterServiceDescriptor(_JUPYTERKERNEL)
DESCRIPTOR.services_by_name['IPython'] = _IPYTHON
DESCRIPTOR.services_by_name['JupyterKernel'] = _JUPYTERKERNEL
# @@protoc_insertion_point(module_scope)

View file

@ -13,14 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
import ipython_pb2 as ipython__pb2
import kernel_pb2 as kernel__pb2
class IPythonStub(object):
"""The IPython service definition.
class JupyterKernelStub(object):
"""The JupyterKernel service definition.
"""
def __init__(self, channel):
@ -30,34 +31,34 @@ class IPythonStub(object):
channel: A grpc.Channel.
"""
self.execute = channel.unary_stream(
'/ipython.IPython/execute',
request_serializer=ipython__pb2.ExecuteRequest.SerializeToString,
response_deserializer=ipython__pb2.ExecuteResponse.FromString,
'/jupyter.JupyterKernel/execute',
request_serializer=kernel__pb2.ExecuteRequest.SerializeToString,
response_deserializer=kernel__pb2.ExecuteResponse.FromString,
)
self.complete = channel.unary_unary(
'/ipython.IPython/complete',
request_serializer=ipython__pb2.CompletionRequest.SerializeToString,
response_deserializer=ipython__pb2.CompletionResponse.FromString,
'/jupyter.JupyterKernel/complete',
request_serializer=kernel__pb2.CompletionRequest.SerializeToString,
response_deserializer=kernel__pb2.CompletionResponse.FromString,
)
self.cancel = channel.unary_unary(
'/ipython.IPython/cancel',
request_serializer=ipython__pb2.CancelRequest.SerializeToString,
response_deserializer=ipython__pb2.CancelResponse.FromString,
'/jupyter.JupyterKernel/cancel',
request_serializer=kernel__pb2.CancelRequest.SerializeToString,
response_deserializer=kernel__pb2.CancelResponse.FromString,
)
self.status = channel.unary_unary(
'/ipython.IPython/status',
request_serializer=ipython__pb2.StatusRequest.SerializeToString,
response_deserializer=ipython__pb2.StatusResponse.FromString,
'/jupyter.JupyterKernel/status',
request_serializer=kernel__pb2.StatusRequest.SerializeToString,
response_deserializer=kernel__pb2.StatusResponse.FromString,
)
self.stop = channel.unary_unary(
'/ipython.IPython/stop',
request_serializer=ipython__pb2.StopRequest.SerializeToString,
response_deserializer=ipython__pb2.StopResponse.FromString,
'/jupyter.JupyterKernel/stop',
request_serializer=kernel__pb2.StopRequest.SerializeToString,
response_deserializer=kernel__pb2.StopResponse.FromString,
)
class IPythonServicer(object):
"""The IPython service definition.
class JupyterKernelServicer(object):
"""The JupyterKernel service definition.
"""
def execute(self, request, context):
@ -82,48 +83,48 @@ class IPythonServicer(object):
raise NotImplementedError('Method not implemented!')
def status(self, request, context):
"""Get ipython kernel status
"""Get jupyter kernel status
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def stop(self, request, context):
# missing associated documentation comment in .proto file
pass
"""Stop jupyter kernel
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_IPythonServicer_to_server(servicer, server):
def add_JupyterKernelServicer_to_server(servicer, server):
rpc_method_handlers = {
'execute': grpc.unary_stream_rpc_method_handler(
servicer.execute,
request_deserializer=ipython__pb2.ExecuteRequest.FromString,
response_serializer=ipython__pb2.ExecuteResponse.SerializeToString,
request_deserializer=kernel__pb2.ExecuteRequest.FromString,
response_serializer=kernel__pb2.ExecuteResponse.SerializeToString,
),
'complete': grpc.unary_unary_rpc_method_handler(
servicer.complete,
request_deserializer=ipython__pb2.CompletionRequest.FromString,
response_serializer=ipython__pb2.CompletionResponse.SerializeToString,
request_deserializer=kernel__pb2.CompletionRequest.FromString,
response_serializer=kernel__pb2.CompletionResponse.SerializeToString,
),
'cancel': grpc.unary_unary_rpc_method_handler(
servicer.cancel,
request_deserializer=ipython__pb2.CancelRequest.FromString,
response_serializer=ipython__pb2.CancelResponse.SerializeToString,
request_deserializer=kernel__pb2.CancelRequest.FromString,
response_serializer=kernel__pb2.CancelResponse.SerializeToString,
),
'status': grpc.unary_unary_rpc_method_handler(
servicer.status,
request_deserializer=ipython__pb2.StatusRequest.FromString,
response_serializer=ipython__pb2.StatusResponse.SerializeToString,
request_deserializer=kernel__pb2.StatusRequest.FromString,
response_serializer=kernel__pb2.StatusResponse.SerializeToString,
),
'stop': grpc.unary_unary_rpc_method_handler(
servicer.stop,
request_deserializer=ipython__pb2.StopRequest.FromString,
response_serializer=ipython__pb2.StopResponse.SerializeToString,
request_deserializer=kernel__pb2.StopRequest.FromString,
response_serializer=kernel__pb2.StopResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'ipython.IPython', rpc_method_handlers)
'jupyter.JupyterKernel', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))

View file

@ -23,8 +23,8 @@ import time
from concurrent import futures
import grpc
import ipython_pb2
import ipython_pb2_grpc
import kernel_pb2
import kernel_pb2_grpc
is_py2 = sys.version[0] == '2'
if is_py2:
@ -33,11 +33,12 @@ else:
import queue as queue
class IPython(ipython_pb2_grpc.IPythonServicer):
class KernelServer(kernel_pb2_grpc.JupyterKernelServicer):
def __init__(self, server):
self._status = ipython_pb2.STARTING
def __init__(self, server, kernel_name):
self._status = kernel_pb2.STARTING
self._server = server
self._kernel_name = kernel_name
# issue with execute_interactive and auto completion: https://github.com/jupyter/jupyter_client/issues/429
# in all case because ipython does not support run and auto completion at the same time: https://github.com/jupyter/notebook/issues/3763
# For now we will lock to ensure that there is no concurrent bug that can "hang" the kernel
@ -46,8 +47,8 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
def start(self):
print("starting...")
sys.stdout.flush()
self._km, self._kc = jupyter_client.manager.start_new_kernel(kernel_name='python')
self._status = ipython_pb2.RUNNING
self._km, self._kc = jupyter_client.manager.start_new_kernel(kernel_name=self._kernel_name)
self._status = kernel_pb2.RUNNING
def execute(self, request, context):
print("execute code:\n")
@ -59,42 +60,42 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
msg_type = msg['header']['msg_type']
content = msg['content']
print("******************* CONTENT ******************")
outStatus, outType, output = ipython_pb2.SUCCESS, None, None
outStatus, outType, output = kernel_pb2.SUCCESS, None, None
# prepare the reply
if msg_type == 'stream':
outType = ipython_pb2.TEXT
outType = kernel_pb2.TEXT
output = content['text']
elif msg_type in ('display_data', 'execute_result'):
print(content['data'])
# The if-else order matters, can not be changed. Because ipython may provide multiple output.
# TEXT is the last resort type.
if 'text/html' in content['data']:
outType = ipython_pb2.HTML
outType = kernel_pb2.HTML
output = content['data']['text/html']
elif 'image/jpeg' in content['data']:
outType = ipython_pb2.JPEG
outType = kernel_pb2.JPEG
output = content['data']['image/jpeg']
elif 'image/png' in content['data']:
outType = ipython_pb2.PNG
outType = kernel_pb2.PNG
output = content['data']['image/png']
elif 'application/javascript' in content['data']:
outType = ipython_pb2.HTML
outType = kernel_pb2.HTML
output = '<script> ' + content['data']['application/javascript'] + ' </script>\n'
elif 'application/vnd.holoviews_load.v0+json' in content['data']:
outType = ipython_pb2.HTML
outType = kernel_pb2.HTML
output = '<script> ' + content['data']['application/vnd.holoviews_load.v0+json'] + ' </script>\n'
elif 'text/plain' in content['data']:
outType = ipython_pb2.TEXT
outType = kernel_pb2.TEXT
output = content['data']['text/plain']
elif msg_type == 'error':
outStatus = ipython_pb2.ERROR
outType = ipython_pb2.TEXT
outStatus = kernel_pb2.ERROR
outType = kernel_pb2.TEXT
output = '\n'.join(content['traceback'])
# send reply if we supported the output type
if outType is not None:
stream_reply_queue.put(
ipython_pb2.ExecuteResponse(status=outStatus,
kernel_pb2.ExecuteResponse(status=outStatus,
type=outType,
output=output))
def execute_worker():
@ -121,8 +122,8 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
# if kernel is not alive or thread is still alive, it means that we face an issue.
if not self.isKernelAlive() or t.is_alive():
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
type=ipython_pb2.TEXT,
yield kernel_pb2.ExecuteResponse(status=kernel_pb2.ERROR,
type=kernel_pb2.TEXT,
output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
if payload_reply:
result = []
@ -130,21 +131,21 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
if payload['data']['text/plain']:
result.append(payload['data']['text/plain'])
if result:
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
type=ipython_pb2.TEXT,
yield kernel_pb2.ExecuteResponse(status=kernel_pb2.SUCCESS,
type=kernel_pb2.TEXT,
output='\n'.join(result))
def cancel(self, request, context):
self._km.interrupt_kernel()
return ipython_pb2.CancelResponse()
return kernel_pb2.CancelResponse()
def complete(self, request, context):
with self._lock:
reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=None)
return ipython_pb2.CompletionResponse(matches=reply['content']['matches'])
return kernel_pb2.CompletionResponse(matches=reply['content']['matches'])
def status(self, request, context):
return ipython_pb2.StatusResponse(status = self._status)
return kernel_pb2.StatusResponse(status = self._status)
def isKernelAlive(self):
return self._km.is_alive()
@ -154,18 +155,18 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
def stop(self, request, context):
self.terminate()
return ipython_pb2.StopResponse()
return kernel_pb2.StopResponse()
def serve(port):
def serve(kernel_name, port):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
ipython = IPython(server)
ipython_pb2_grpc.add_IPythonServicer_to_server(ipython, server)
kernel = KernelServer(server, kernel_name)
kernel_pb2_grpc.add_JupyterKernelServicer_to_server(kernel, server)
server.add_insecure_port('[::]:' + port)
server.start()
ipython.start()
kernel.start()
try:
while ipython.isKernelAlive():
while kernel.isKernelAlive():
time.sleep(5)
except KeyboardInterrupt:
print("interrupted")
@ -173,8 +174,8 @@ def serve(port):
print("shutdown")
# we let 2 sc for all request to be complete
server.stop(2)
ipython.terminate()
kernel.terminate()
os._exit(0)
if __name__ == '__main__':
serve(sys.argv[1])
serve(sys.argv[1], sys.argv[2])