mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
[ZEPPELIN-4480]. Move the ipython code into a general jupyter kernel bridge
This commit is contained in:
parent
9320d1493f
commit
78453473e6
18 changed files with 915 additions and 533 deletions
|
|
@ -58,8 +58,8 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, String> setupIPythonEnv() throws IOException {
|
||||
Map<String, String> envs = super.setupIPythonEnv();
|
||||
protected Map<String, String> setupKernelEnv() throws IOException {
|
||||
Map<String, String> envs = super.setupKernelEnv();
|
||||
String pythonPath = envs.getOrDefault("PYTHONPATH", "");
|
||||
String pyflinkPythonPath = PyFlinkInterpreter.getPyFlinkPythonPath(properties);
|
||||
envs.put("PYTHONPATH", pythonPath + ":" + pyflinkPythonPath);
|
||||
|
|
|
|||
3
pom.xml
3
pom.xml
|
|
@ -58,6 +58,7 @@
|
|||
<module>zeppelin-zengine</module>
|
||||
<module>zeppelin-display</module>
|
||||
<module>rlang</module>
|
||||
<module>zeppelin-jupyter-adapter</module>
|
||||
<module>kotlin</module>
|
||||
<module>groovy</module>
|
||||
<module>spark</module>
|
||||
|
|
@ -1086,7 +1087,7 @@
|
|||
<exclude>**/R/rzeppelin/DESCRIPTION</exclude>
|
||||
<exclude>**/R/rzeppelin/NAMESPACE</exclude>
|
||||
|
||||
<exclude>python/src/main/resources/grpc/**/*</exclude>
|
||||
<exclude>zeppelin-jupyter-adapter/src/main/resources/grpc/**/*</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
|
||||
|
|
|
|||
|
|
@ -41,6 +41,12 @@
|
|||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-jupyter-adapter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-exec</artifactId>
|
||||
|
|
@ -110,25 +116,6 @@
|
|||
|
||||
<plugins>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.xolstice.maven.plugins</groupId>
|
||||
<artifactId>protobuf-maven-plugin</artifactId>
|
||||
<version>0.5.0</version>
|
||||
<configuration>
|
||||
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}</protocArtifact>
|
||||
<pluginId>grpc-java</pluginId>
|
||||
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}</pluginArtifact>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>compile</goal>
|
||||
<goal>compile-custom</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-antrun-plugin</artifactId>
|
||||
<version>1.7</version>
|
||||
|
|
|
|||
|
|
@ -17,44 +17,23 @@
|
|||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.JupyterKernelInterpreter;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteRequest;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteResponse;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteStatus;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
import org.apache.zeppelin.python.proto.CancelRequest;
|
||||
import org.apache.zeppelin.python.proto.CompletionRequest;
|
||||
import org.apache.zeppelin.python.proto.CompletionResponse;
|
||||
import org.apache.zeppelin.python.proto.ExecuteRequest;
|
||||
import org.apache.zeppelin.python.proto.ExecuteResponse;
|
||||
import org.apache.zeppelin.python.proto.ExecuteStatus;
|
||||
import org.apache.zeppelin.python.proto.IPythonStatus;
|
||||
import org.apache.zeppelin.python.proto.StatusRequest;
|
||||
import org.apache.zeppelin.python.proto.StatusResponse;
|
||||
import org.apache.zeppelin.python.proto.StopRequest;
|
||||
import org.apache.zeppelin.interpreter.util.ProcessLauncher;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import py4j.GatewayServer;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
|
@ -62,43 +41,49 @@ import java.util.Properties;
|
|||
/**
|
||||
* IPython Interpreter for Zeppelin
|
||||
*/
|
||||
public class IPythonInterpreter extends Interpreter {
|
||||
public class IPythonInterpreter extends JupyterKernelInterpreter {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreter.class);
|
||||
|
||||
private IPythonProcessLauncher iPythonProcessLauncher;
|
||||
private IPythonClient ipythonClient;
|
||||
// GatewayServer in jvm side to communicate with python kernel process.
|
||||
private GatewayServer gatewayServer;
|
||||
|
||||
protected BaseZeppelinContext zeppelinContext;
|
||||
private String pythonExecutable;
|
||||
private int ipythonLaunchTimeout;
|
||||
// allow to set PYTHONPATH
|
||||
private String additionalPythonPath;
|
||||
private String additionalPythonInitFile;
|
||||
private boolean useBuiltinPy4j = true;
|
||||
private boolean usePy4JAuth = true;
|
||||
private String secret;
|
||||
|
||||
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
|
||||
private String py4jGatewaySecret;
|
||||
|
||||
public IPythonInterpreter(Properties properties) {
|
||||
super(properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKernelName() {
|
||||
return "python";
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getRequiredPackages() {
|
||||
List<String> requiredPackages = super.getRequiredPackages();
|
||||
requiredPackages.add("ipython");
|
||||
requiredPackages.add("ipykernel");
|
||||
return requiredPackages;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sub class can customize the interpreter by adding more python packages under PYTHONPATH.
|
||||
* e.g. PySparkInterpreter
|
||||
* e.g. IPySparkInterpreter
|
||||
*
|
||||
* @param additionalPythonPath
|
||||
*/
|
||||
public void setAdditionalPythonPath(String additionalPythonPath) {
|
||||
LOGGER.info("setAdditionalPythonPath: " + additionalPythonPath);
|
||||
this.additionalPythonPath = additionalPythonPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sub class can customize the interpreter by running additional python init code.
|
||||
* e.g. PySparkInterpreter
|
||||
* e.g. IPySparkInterpreter
|
||||
*
|
||||
* @param additionalPythonInitFile
|
||||
*/
|
||||
|
|
@ -106,10 +91,11 @@ public class IPythonInterpreter extends Interpreter {
|
|||
this.additionalPythonInitFile = additionalPythonInitFile;
|
||||
}
|
||||
|
||||
public void setAddBulitinPy4j(boolean add) {
|
||||
this.useBuiltinPy4j = add;
|
||||
public void setUseBuiltinPy4j(boolean useBuiltinPy4j) {
|
||||
this.useBuiltinPy4j = useBuiltinPy4j;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BaseZeppelinContext buildZeppelinContext() {
|
||||
return new PythonZeppelinContext(
|
||||
getInterpreterGroup().getInterpreterHookRegistry(),
|
||||
|
|
@ -118,119 +104,48 @@ public class IPythonInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void open() throws InterpreterException {
|
||||
super.open();
|
||||
try {
|
||||
if (ipythonClient != null) {
|
||||
// IPythonInterpreter might already been opened by PythonInterpreter
|
||||
return;
|
||||
}
|
||||
pythonExecutable = getProperty("zeppelin.python", "python");
|
||||
LOGGER.info("Python Exec: " + pythonExecutable);
|
||||
String checkPrerequisiteResult = checkIPythonPrerequisite(pythonExecutable);
|
||||
if (!StringUtils.isEmpty(checkPrerequisiteResult)) {
|
||||
throw new InterpreterException("IPython prerequisite is not meet: " +
|
||||
checkPrerequisiteResult);
|
||||
}
|
||||
ipythonLaunchTimeout = Integer.parseInt(
|
||||
getProperty("zeppelin.ipython.launch.timeout", "30000"));
|
||||
this.zeppelinContext = buildZeppelinContext();
|
||||
int ipythonPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
int jvmGatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
int message_size = Integer.parseInt(getProperty("zeppelin.ipython.grpc.message_size",
|
||||
32 * 1024 * 1024 + ""));
|
||||
ipythonClient = new IPythonClient(ManagedChannelBuilder.forAddress("127.0.0.1", ipythonPort)
|
||||
.usePlaintext(true).maxInboundMessageSize(message_size));
|
||||
this.usePy4JAuth = Boolean.parseBoolean(getProperty("zeppelin.py4j.useAuth", "true"));
|
||||
this.secret = PythonUtils.createSecret(256);
|
||||
launchIPythonKernel(ipythonPort);
|
||||
setupJVMGateway(jvmGatewayPort);
|
||||
String gatewayHost = PythonUtils.getLocalIP(properties);
|
||||
int gatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
setupJVMGateway(gatewayHost, gatewayPort);
|
||||
initPythonInterpreter(gatewayHost, gatewayPort);
|
||||
} catch (Exception e) {
|
||||
throw new InterpreterException("Fail to open IPythonInterpreter\n" +
|
||||
ExceptionUtils.getStackTrace(e), e);
|
||||
LOGGER.error("Fail to open IPythonInterpreter", e);
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* non-empty return value mean the errors when checking ipython prerequisite.
|
||||
* empty value mean IPython prerequisite is meet.
|
||||
*
|
||||
* @param pythonExec
|
||||
* @return
|
||||
*/
|
||||
public String checkIPythonPrerequisite(String pythonExec) {
|
||||
ProcessBuilder processBuilder = new ProcessBuilder(pythonExec, "-m", "pip", "freeze");
|
||||
File stderrFile = null;
|
||||
File stdoutFile = null;
|
||||
try {
|
||||
stderrFile = File.createTempFile("zeppelin", ".txt");
|
||||
processBuilder.redirectError(stderrFile);
|
||||
stdoutFile = File.createTempFile("zeppelin", ".txt");
|
||||
processBuilder.redirectOutput(stdoutFile);
|
||||
|
||||
Process proc = processBuilder.start();
|
||||
int ret = proc.waitFor();
|
||||
if (ret != 0) {
|
||||
try (FileInputStream in = new FileInputStream(stderrFile)) {
|
||||
return "Fail to run pip freeze.\n" + IOUtils.toString(in);
|
||||
}
|
||||
}
|
||||
try (FileInputStream in = new FileInputStream(stdoutFile)) {
|
||||
String freezeOutput = IOUtils.toString(in);
|
||||
if (!freezeOutput.contains("jupyter-client=")) {
|
||||
return "jupyter-client is not installed.";
|
||||
}
|
||||
if (!freezeOutput.contains("ipykernel=")) {
|
||||
return "ipykernel is not installed";
|
||||
}
|
||||
if (!freezeOutput.contains("ipython=")) {
|
||||
return "ipython is not installed";
|
||||
}
|
||||
if (!freezeOutput.contains("grpcio=")) {
|
||||
return "grpcio is not installed";
|
||||
}
|
||||
if (!freezeOutput.contains("protobuf=")) {
|
||||
return "protobuf is not installed";
|
||||
}
|
||||
LOGGER.info("IPython prerequisite is met");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Fail to checkIPythonPrerequisite", e);
|
||||
return "Fail to checkIPythonPrerequisite: " + ExceptionUtils.getStackTrace(e);
|
||||
} finally {
|
||||
FileUtils.deleteQuietly(stderrFile);
|
||||
FileUtils.deleteQuietly(stdoutFile);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
private void setupJVMGateway(int jvmGatewayPort) throws IOException {
|
||||
String serverAddress = PythonUtils.getLocalIP(properties);
|
||||
this.gatewayServer =
|
||||
PythonUtils.createGatewayServer(this, serverAddress, jvmGatewayPort, secret, usePy4JAuth);
|
||||
private void setupJVMGateway(String gatewayHost, int gatewayPort) throws IOException {
|
||||
this.gatewayServer = PythonUtils.createGatewayServer(this, gatewayHost,
|
||||
gatewayPort, py4jGatewaySecret, usePy4JAuth);
|
||||
gatewayServer.start();
|
||||
}
|
||||
|
||||
private void initPythonInterpreter(String gatewayHost, int gatewayPort) throws IOException {
|
||||
InputStream input =
|
||||
getClass().getClassLoader().getResourceAsStream("grpc/python/zeppelin_python.py");
|
||||
getClass().getClassLoader().getResourceAsStream("python/zeppelin_ipython.py");
|
||||
List<String> lines = IOUtils.readLines(input);
|
||||
ExecuteResponse response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
|
||||
.setCode(StringUtils.join(lines, System.lineSeparator())
|
||||
.replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "")
|
||||
.replace("${JVM_GATEWAY_ADDRESS}", serverAddress)).build());
|
||||
ExecuteResponse response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
|
||||
.setCode(StringUtils.join(lines, System.lineSeparator())
|
||||
.replace("${JVM_GATEWAY_PORT}", gatewayPort + "")
|
||||
.replace("${JVM_GATEWAY_ADDRESS}", gatewayHost)).build());
|
||||
if (response.getStatus() != ExecuteStatus.SUCCESS) {
|
||||
throw new IOException("Fail to setup JVMGateway\n" + response.getOutput());
|
||||
}
|
||||
|
||||
input =
|
||||
getClass().getClassLoader().getResourceAsStream("python/zeppelin_context.py");
|
||||
getClass().getClassLoader().getResourceAsStream("python/zeppelin_context.py");
|
||||
lines = IOUtils.readLines(input);
|
||||
response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
|
||||
.setCode(StringUtils.join(lines, System.lineSeparator())).build());
|
||||
response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
|
||||
.setCode(StringUtils.join(lines, System.lineSeparator())).build());
|
||||
if (response.getStatus() != ExecuteStatus.SUCCESS) {
|
||||
throw new IOException("Fail to import ZeppelinContext\n" + response.getOutput());
|
||||
}
|
||||
|
||||
response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
|
||||
.setCode("z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)")
|
||||
.build());
|
||||
response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
|
||||
.setCode("z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)")
|
||||
.build());
|
||||
if (response.getStatus() != ExecuteStatus.SUCCESS) {
|
||||
throw new IOException("Fail to setup ZeppelinContext\n" + response.getOutput());
|
||||
}
|
||||
|
|
@ -238,65 +153,34 @@ public class IPythonInterpreter extends Interpreter {
|
|||
if (additionalPythonInitFile != null) {
|
||||
input = getClass().getClassLoader().getResourceAsStream(additionalPythonInitFile);
|
||||
lines = IOUtils.readLines(input);
|
||||
response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
|
||||
.setCode(StringUtils.join(lines, System.lineSeparator())
|
||||
.replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "")
|
||||
.replace("${JVM_GATEWAY_ADDRESS}", serverAddress)).build());
|
||||
response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
|
||||
.setCode(StringUtils.join(lines, System.lineSeparator())
|
||||
.replace("${JVM_GATEWAY_PORT}", gatewayPort + "")
|
||||
.replace("${JVM_GATEWAY_ADDRESS}", gatewayHost)).build());
|
||||
if (response.getStatus() != ExecuteStatus.SUCCESS) {
|
||||
LOGGER.error("Fail to run additional Python init file\n" + response.getOutput());
|
||||
throw new IOException("Fail to run additional Python init file: "
|
||||
+ additionalPythonInitFile + "\n" + response.getOutput());
|
||||
+ additionalPythonInitFile + "\n" + response.getOutput());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void launchIPythonKernel(int ipythonPort)
|
||||
throws IOException {
|
||||
LOGGER.info("Launching IPython Kernel at port: " + ipythonPort);
|
||||
// copy the python scripts to a temp directory, then launch ipython kernel in that folder
|
||||
File pythonWorkDir = Files.createTempDirectory("zeppelin_ipython").toFile();
|
||||
String[] ipythonScripts = {"ipython_server.py", "ipython_pb2.py", "ipython_pb2_grpc.py"};
|
||||
for (String ipythonScript : ipythonScripts) {
|
||||
URL url = getClass().getClassLoader().getResource("grpc/python"
|
||||
+ "/" + ipythonScript);
|
||||
FileUtils.copyURLToFile(url, new File(pythonWorkDir, ipythonScript));
|
||||
}
|
||||
|
||||
CommandLine cmd = CommandLine.parse(pythonExecutable);
|
||||
cmd.addArgument(pythonWorkDir.getAbsolutePath() + "/ipython_server.py");
|
||||
cmd.addArgument(ipythonPort + "");
|
||||
|
||||
@Override
|
||||
protected Map<String, String> setupKernelEnv() throws IOException {
|
||||
Map<String, String> envs = super.setupKernelEnv();
|
||||
if (useBuiltinPy4j) {
|
||||
//TODO(zjffdu) don't do hard code on py4j here
|
||||
File py4jDestFile = new File(pythonWorkDir, "py4j-src-0.10.7.zip");
|
||||
File py4jDestFile = new File(kernelWorkDir, "py4j-src-0.10.7.zip");
|
||||
FileUtils.copyURLToFile(getClass().getClassLoader().getResource(
|
||||
"python/py4j-src-0.10.7.zip"), py4jDestFile);
|
||||
"python/py4j-src-0.10.7.zip"), py4jDestFile);
|
||||
if (additionalPythonPath != null) {
|
||||
// put the py4j at the end, because additionalPythonPath may already contain py4j.
|
||||
// e.g. PySparkInterpreter
|
||||
// e.g. IPySparkInterpreter
|
||||
additionalPythonPath = additionalPythonPath + ":" + py4jDestFile.getAbsolutePath();
|
||||
} else {
|
||||
additionalPythonPath = py4jDestFile.getAbsolutePath();
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, String> envs = setupIPythonEnv();
|
||||
iPythonProcessLauncher = new IPythonProcessLauncher(cmd, envs);
|
||||
iPythonProcessLauncher.launch();
|
||||
iPythonProcessLauncher.waitForReady(ipythonLaunchTimeout);
|
||||
|
||||
if (iPythonProcessLauncher.isLaunchTimeout()) {
|
||||
throw new IOException("Fail to launch IPython Kernel in " + ipythonLaunchTimeout / 1000
|
||||
+ " seconds.\n" + iPythonProcessLauncher.getErrorMessage());
|
||||
}
|
||||
if (!iPythonProcessLauncher.isRunning()) {
|
||||
throw new IOException("Fail to launch IPython Kernel as the python process is failed.\n"
|
||||
+ iPythonProcessLauncher.getErrorMessage());
|
||||
}
|
||||
}
|
||||
|
||||
protected Map<String, String> setupIPythonEnv() throws IOException {
|
||||
Map<String, String> envs = EnvironmentUtils.getProcEnvironment();
|
||||
if (envs.containsKey("PYTHONPATH")) {
|
||||
if (additionalPythonPath != null) {
|
||||
envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH"));
|
||||
|
|
@ -304,153 +188,23 @@ public class IPythonInterpreter extends Interpreter {
|
|||
} else {
|
||||
envs.put("PYTHONPATH", additionalPythonPath);
|
||||
}
|
||||
|
||||
this.usePy4JAuth = Boolean.parseBoolean(getProperty("zeppelin.py4j.useAuth", "true"));
|
||||
this.py4jGatewaySecret = PythonUtils.createSecret(256);
|
||||
if (usePy4JAuth) {
|
||||
envs.put("PY4J_GATEWAY_SECRET", secret);
|
||||
envs.put("PY4J_GATEWAY_SECRET", this.py4jGatewaySecret);
|
||||
}
|
||||
LOGGER.info("PYTHONPATH:" + envs.get("PYTHONPATH"));
|
||||
return envs;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public IPythonProcessLauncher getIPythonProcessLauncher() {
|
||||
return iPythonProcessLauncher;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws InterpreterException {
|
||||
if (iPythonProcessLauncher != null) {
|
||||
LOGGER.info("Kill IPython Process");
|
||||
if (iPythonProcessLauncher.isRunning()) {
|
||||
ipythonClient.stop(StopRequest.newBuilder().build());
|
||||
try {
|
||||
ipythonClient.shutdown();
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.warn("Exception happens when shutting down ipythonClient", e);
|
||||
}
|
||||
}
|
||||
iPythonProcessLauncher.stop();
|
||||
iPythonProcessLauncher = null;
|
||||
}
|
||||
super.close();
|
||||
if (gatewayServer != null) {
|
||||
LOGGER.info("Shutdown Py4j GatewayServer");
|
||||
gatewayServer.shutdown();
|
||||
gatewayServer = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st,
|
||||
InterpreterContext context) throws InterpreterException {
|
||||
zeppelinContext.setGui(context.getGui());
|
||||
zeppelinContext.setNoteGui(context.getNoteGui());
|
||||
zeppelinContext.setInterpreterContext(context);
|
||||
interpreterOutput.setInterpreterOutput(context.out);
|
||||
try {
|
||||
ExecuteResponse response =
|
||||
ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
|
||||
interpreterOutput);
|
||||
interpreterOutput.getInterpreterOutput().flush();
|
||||
// It is not known which method is called first (ipythonClient.stream_execute
|
||||
// or onProcessFailed) when ipython kernel process is exited. Because they are in
|
||||
// 2 different threads. So here we would check ipythonClient's status and sleep 1 second
|
||||
// if ipython kernel is maybe terminated.
|
||||
if (iPythonProcessLauncher.isRunning() && !ipythonClient.isMaybeIPythonFailed()) {
|
||||
return new InterpreterResult(
|
||||
InterpreterResult.Code.valueOf(response.getStatus().name()));
|
||||
} else {
|
||||
if (ipythonClient.isMaybeIPythonFailed()) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
if (iPythonProcessLauncher.isRunning()) {
|
||||
return new InterpreterResult(
|
||||
InterpreterResult.Code.valueOf(response.getStatus().name()));
|
||||
} else {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
"IPython kernel is abnormally exited, please check your code and log.");
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new InterpreterException("Fail to interpret python code", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) throws InterpreterException {
|
||||
ipythonClient.cancel(CancelRequest.newBuilder().build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) throws InterpreterException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<InterpreterCompletion> completion(String buf, int cursor,
|
||||
InterpreterContext interpreterContext) {
|
||||
LOGGER.debug("Call completion for: " + buf + ", cursor: " + cursor);
|
||||
List<InterpreterCompletion> completions = new ArrayList<>();
|
||||
CompletionResponse response =
|
||||
ipythonClient.complete(
|
||||
CompletionRequest.getDefaultInstance().newBuilder().setCode(buf)
|
||||
.setCursor(cursor).build());
|
||||
for (int i = 0; i < response.getMatchesCount(); i++) {
|
||||
String match = response.getMatches(i);
|
||||
int lastIndexOfDot = match.lastIndexOf(".");
|
||||
if (lastIndexOfDot != -1) {
|
||||
match = match.substring(lastIndexOfDot + 1);
|
||||
}
|
||||
LOGGER.debug("Candidate completion: " + match);
|
||||
completions.add(new InterpreterCompletion(match, match, ""));
|
||||
}
|
||||
return completions;
|
||||
}
|
||||
|
||||
public BaseZeppelinContext getZeppelinContext() {
|
||||
return zeppelinContext;
|
||||
}
|
||||
|
||||
class IPythonProcessLauncher extends ProcessLauncher {
|
||||
|
||||
IPythonProcessLauncher(CommandLine commandLine,
|
||||
Map<String, String> envs) {
|
||||
super(commandLine, envs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void waitForReady(int timeout) {
|
||||
// wait until IPython kernel is started or timeout
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (state == State.LAUNCHED) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error("Interrupted by something", e);
|
||||
}
|
||||
|
||||
try {
|
||||
StatusResponse response = ipythonClient.status(StatusRequest.newBuilder().build());
|
||||
if (response.getStatus() == IPythonStatus.RUNNING) {
|
||||
LOGGER.info("IPython Kernel is Running");
|
||||
onProcessRunning();
|
||||
break;
|
||||
} else {
|
||||
LOGGER.info("Wait for IPython Kernel to be started");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// ignore the exception, because is may happen when grpc server has not started yet.
|
||||
LOGGER.info("Wait for IPython Kernel to be started");
|
||||
}
|
||||
|
||||
if ((System.currentTimeMillis() - startTime) > timeout) {
|
||||
onTimeout();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ public class PythonInterpreter extends Interpreter {
|
|||
iPythonInterpreter = getIPythonInterpreter();
|
||||
if (getProperty("zeppelin.python.useIPython", "true").equals("true") &&
|
||||
StringUtils.isEmpty(
|
||||
iPythonInterpreter.checkIPythonPrerequisite(getPythonExec()))) {
|
||||
iPythonInterpreter.checkKernelPrerequisite(getPythonExec()))) {
|
||||
try {
|
||||
iPythonInterpreter.open();
|
||||
LOGGER.info("IPython is available, Use IPythonInterpreter to replace PythonInterpreter");
|
||||
|
|
|
|||
|
|
@ -384,7 +384,7 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
|
|||
tearDown();
|
||||
|
||||
Properties properties = initIntpProperties();
|
||||
properties.setProperty("zeppelin.ipython.grpc.message_size", "4000");
|
||||
properties.setProperty("zeppelin.jupyter.kernel.grpc.message_size", "4000");
|
||||
|
||||
startInterpreter(properties);
|
||||
|
||||
|
|
@ -443,7 +443,7 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
|
|||
Thread.sleep(3000);
|
||||
IPythonInterpreter iPythonInterpreter = (IPythonInterpreter)
|
||||
((LazyOpenInterpreter) interpreter).getInnerInterpreter();
|
||||
iPythonInterpreter.getIPythonProcessLauncher().stop();
|
||||
iPythonInterpreter.getKernelProcessLauncher().stop();
|
||||
waiter.await(3000);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ public class IPySparkInterpreter extends IPythonInterpreter {
|
|||
!conf.get("spark.submit.deployMode").equals("cluster")) {
|
||||
setAdditionalPythonPath(PythonUtils.sparkPythonPath());
|
||||
}
|
||||
setAddBulitinPy4j(false);
|
||||
setUseBuiltinPy4j(false);
|
||||
setAdditionalPythonInitFile("python/zeppelin_ipyspark.py");
|
||||
setProperty("zeppelin.py4j.useAuth",
|
||||
sparkInterpreter.getSparkVersion().isSecretSocketSupported() + "");
|
||||
|
|
@ -67,8 +67,8 @@ public class IPySparkInterpreter extends IPythonInterpreter {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, String> setupIPythonEnv() throws IOException {
|
||||
Map<String, String> env = super.setupIPythonEnv();
|
||||
protected Map<String, String> setupKernelEnv() throws IOException {
|
||||
Map<String, String> env = super.setupKernelEnv();
|
||||
// set PYSPARK_PYTHON
|
||||
SparkConf conf = sparkInterpreter.getSparkContext().getConf();
|
||||
if (conf.contains("spark.pyspark.python")) {
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ public class InterpreterModeActionsIT extends AbstractZeppelinIT {
|
|||
static String interpreterOptionPath = "";
|
||||
static String originalInterpreterOption = "";
|
||||
|
||||
static String cmdPsPython = "ps aux | grep 'zeppelin_ipython' | grep -v 'grep' | wc -l";
|
||||
static String cmdPsPython = "ps aux | grep 'kernel_server.py' | grep -v 'grep' | wc -l";
|
||||
static String cmdPsInterpreter = "ps aux | grep 'zeppelin/interpreter/python/*' |" +
|
||||
" sed -E '/grep|local-repo/d' | wc -l";
|
||||
|
||||
|
|
|
|||
191
zeppelin-jupyter-adapter/pom.xml
Normal file
191
zeppelin-jupyter-adapter/pom.xml
Normal file
|
|
@ -0,0 +1,191 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
~ contributor license agreements. See the NOTICE file distributed with
|
||||
~ this work for additional information regarding copyright ownership.
|
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
~ (the "License"); you may not use this file except in compliance with
|
||||
~ the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>zeppelin</artifactId>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<version>0.9.0-SNAPSHOT</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-jupyter-adapter</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.9.0-SNAPSHOT</version>
|
||||
<name>Zeppelin: Jupyter Adapter</name>
|
||||
|
||||
<properties>
|
||||
<interpreter.name>python</interpreter.name>
|
||||
<python.py4j.version>0.10.7</python.py4j.version>
|
||||
<grpc.version>1.15.0</grpc.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-exec</artifactId>
|
||||
<version>1.3</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>net.sf.py4j</groupId>
|
||||
<artifactId>py4j</artifactId>
|
||||
<version>${python.py4j.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.grpc</groupId>
|
||||
<artifactId>grpc-netty</artifactId>
|
||||
<version>${grpc.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.grpc</groupId>
|
||||
<artifactId>grpc-protobuf</artifactId>
|
||||
<version>${grpc.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.grpc</groupId>
|
||||
<artifactId>grpc-stub</artifactId>
|
||||
<version>${grpc.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>commons-lang</groupId>
|
||||
<artifactId>commons-lang</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>commons-codec</groupId>
|
||||
<artifactId>commons-codec</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>zeppelin-interpreter-api</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<!-- test libraries -->
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>net.jodah</groupId>
|
||||
<artifactId>concurrentunit</artifactId>
|
||||
<version>0.4.4</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
<extensions>
|
||||
<extension>
|
||||
<groupId>kr.motd.maven</groupId>
|
||||
<artifactId>os-maven-plugin</artifactId>
|
||||
<version>1.4.1.Final</version>
|
||||
</extension>
|
||||
</extensions>
|
||||
|
||||
<plugins>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.xolstice.maven.plugins</groupId>
|
||||
<artifactId>protobuf-maven-plugin</artifactId>
|
||||
<version>0.5.0</version>
|
||||
<configuration>
|
||||
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}</protocArtifact>
|
||||
<pluginId>grpc-java</pluginId>
|
||||
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}</pluginArtifact>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>compile</goal>
|
||||
<goal>compile-custom</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-antrun-plugin</artifactId>
|
||||
<version>1.7</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>run</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-resources-plugin</artifactId>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<configuration>
|
||||
<skip>false</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
|
|
@ -15,25 +15,25 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.python;
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.JupyterKernelGrpc;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
import org.apache.zeppelin.python.proto.CancelRequest;
|
||||
import org.apache.zeppelin.python.proto.CancelResponse;
|
||||
import org.apache.zeppelin.python.proto.CompletionRequest;
|
||||
import org.apache.zeppelin.python.proto.CompletionResponse;
|
||||
import org.apache.zeppelin.python.proto.ExecuteRequest;
|
||||
import org.apache.zeppelin.python.proto.ExecuteResponse;
|
||||
import org.apache.zeppelin.python.proto.ExecuteStatus;
|
||||
import org.apache.zeppelin.python.proto.IPythonGrpc;
|
||||
import org.apache.zeppelin.python.proto.OutputType;
|
||||
import org.apache.zeppelin.python.proto.StatusRequest;
|
||||
import org.apache.zeppelin.python.proto.StatusResponse;
|
||||
import org.apache.zeppelin.python.proto.StopRequest;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.CancelRequest;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.CancelResponse;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.CompletionRequest;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.CompletionResponse;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteRequest;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteResponse;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteStatus;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.OutputType;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.StatusRequest;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.StatusResponse;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.StopRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -44,33 +44,33 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* Grpc client for IPython kernel
|
||||
* Grpc client for Jupyter kernel
|
||||
*/
|
||||
public class IPythonClient {
|
||||
public class JupyterKernelClient {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(IPythonClient.class.getName());
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(JupyterKernelClient.class.getName());
|
||||
|
||||
private final ManagedChannel channel;
|
||||
private final IPythonGrpc.IPythonBlockingStub blockingStub;
|
||||
private final IPythonGrpc.IPythonStub asyncStub;
|
||||
private volatile boolean maybeIPythonFailed = false;
|
||||
private final JupyterKernelGrpc.JupyterKernelBlockingStub blockingStub;
|
||||
private final JupyterKernelGrpc.JupyterKernelStub asyncStub;
|
||||
private volatile boolean maybeKernelFailed = false;
|
||||
|
||||
private SecureRandom random = new SecureRandom();
|
||||
|
||||
/**
|
||||
* Construct client for accessing RouteGuide server at {@code host:port}.
|
||||
*/
|
||||
public IPythonClient(String host, int port) {
|
||||
public JupyterKernelClient(String host, int port) {
|
||||
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(true));
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct client for accessing RouteGuide server using the existing channel.
|
||||
*/
|
||||
public IPythonClient(ManagedChannelBuilder<?> channelBuilder) {
|
||||
public JupyterKernelClient(ManagedChannelBuilder<?> channelBuilder) {
|
||||
channel = channelBuilder.build();
|
||||
blockingStub = IPythonGrpc.newBlockingStub(channel);
|
||||
asyncStub = IPythonGrpc.newStub(channel);
|
||||
blockingStub = JupyterKernelGrpc.newBlockingStub(channel);
|
||||
asyncStub = JupyterKernelGrpc.newStub(channel);
|
||||
}
|
||||
|
||||
public void shutdown() throws InterruptedException {
|
||||
|
|
@ -84,7 +84,7 @@ public class IPythonClient {
|
|||
final ExecuteResponse.Builder finalResponseBuilder = ExecuteResponse.newBuilder()
|
||||
.setStatus(ExecuteStatus.SUCCESS);
|
||||
final AtomicBoolean completedFlag = new AtomicBoolean(false);
|
||||
maybeIPythonFailed = false;
|
||||
maybeKernelFailed = false;
|
||||
LOGGER.debug("stream_execute code:\n" + request.getCode());
|
||||
asyncStub.execute(request, new StreamObserver<ExecuteResponse>() {
|
||||
OutputType lastOutputType = null;
|
||||
|
|
@ -97,7 +97,7 @@ public class IPythonClient {
|
|||
case TEXT:
|
||||
try {
|
||||
if (executeResponse.getOutput().startsWith("%")) {
|
||||
// the output from ipython kernel maybe specify format already.
|
||||
// the output from jupyter kernel maybe specify format already.
|
||||
interpreterOutput.write((executeResponse.getOutput()).getBytes());
|
||||
} else {
|
||||
// only add %text when the previous output type is not TEXT.
|
||||
|
|
@ -154,7 +154,7 @@ public class IPythonClient {
|
|||
}
|
||||
LOGGER.error("Fail to call IPython grpc", throwable);
|
||||
finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
|
||||
maybeIPythonFailed = true;
|
||||
maybeKernelFailed = true;
|
||||
completedFlag.set(true);
|
||||
synchronized (completedFlag) {
|
||||
completedFlag.notify();
|
||||
|
|
@ -221,12 +221,12 @@ public class IPythonClient {
|
|||
asyncStub.stop(request, null);
|
||||
}
|
||||
|
||||
public boolean isMaybeIPythonFailed() {
|
||||
return maybeIPythonFailed;
|
||||
public boolean isMaybeKernelFailed() {
|
||||
return maybeKernelFailed;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
IPythonClient client = new IPythonClient("localhost", 50053);
|
||||
JupyterKernelClient client = new JupyterKernelClient("localhost", 50053);
|
||||
client.status(StatusRequest.newBuilder().build());
|
||||
|
||||
ExecuteResponse response = client.block_execute(ExecuteRequest.newBuilder().
|
||||
|
|
@ -0,0 +1,338 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.CancelRequest;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.CompletionRequest;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.CompletionResponse;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteRequest;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteResponse;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.KernelStatus;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.StatusRequest;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.StatusResponse;
|
||||
import org.apache.zeppelin.interpreter.jupyter.proto.StopRequest;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
import org.apache.zeppelin.interpreter.util.ProcessLauncher;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Jupyter Kernel adapter for Zeppelin. All the jupyter kernel could be used by Zeppelin
|
||||
* by extending this class.
|
||||
*/
|
||||
public abstract class JupyterKernelInterpreter extends Interpreter {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(JupyterKernelInterpreter.class);
|
||||
|
||||
private JupyterKernelProcessLauncher jupyterKernelProcessLauncher;
|
||||
protected JupyterKernelClient jupyterKernelClient;
|
||||
|
||||
protected BaseZeppelinContext zeppelinContext;
|
||||
// working directory of jupyter kernel
|
||||
protected File kernelWorkDir;
|
||||
// python executable file for launching the jupyter kernel
|
||||
private String pythonExecutable;
|
||||
private int kernelLaunchTimeout;
|
||||
|
||||
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
|
||||
|
||||
public JupyterKernelInterpreter(Properties properties) {
|
||||
super(properties);
|
||||
}
|
||||
|
||||
public abstract String getKernelName();
|
||||
|
||||
public List<String> getRequiredPackages() {
|
||||
List<String> requiredPackages = new ArrayList<>();
|
||||
requiredPackages.add("jupyter-client");
|
||||
requiredPackages.add("grpcio");
|
||||
requiredPackages.add("protobuf");
|
||||
return requiredPackages;
|
||||
}
|
||||
|
||||
public abstract BaseZeppelinContext buildZeppelinContext();
|
||||
|
||||
@Override
|
||||
public void open() throws InterpreterException {
|
||||
try {
|
||||
if (jupyterKernelClient != null) {
|
||||
// JupyterKernelInterpreter might already been opened
|
||||
return;
|
||||
}
|
||||
pythonExecutable = getProperty("zeppelin.python", "python");
|
||||
LOGGER.info("Python Exec: " + pythonExecutable);
|
||||
String checkPrerequisiteResult = checkKernelPrerequisite(pythonExecutable);
|
||||
if (!StringUtils.isEmpty(checkPrerequisiteResult)) {
|
||||
throw new InterpreterException("Kernel prerequisite is not meet: " +
|
||||
checkPrerequisiteResult);
|
||||
}
|
||||
kernelLaunchTimeout = Integer.parseInt(
|
||||
getProperty("zeppelin.jupyter.kernel.launch.timeout", "30000"));
|
||||
this.zeppelinContext = buildZeppelinContext();
|
||||
int kernelPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
int message_size = Integer.parseInt(getProperty("zeppelin.jupyter.kernel.grpc.message_size",
|
||||
32 * 1024 * 1024 + ""));
|
||||
jupyterKernelClient = new JupyterKernelClient(ManagedChannelBuilder.forAddress("127.0.0.1",
|
||||
kernelPort)
|
||||
.usePlaintext(true).maxInboundMessageSize(message_size));
|
||||
|
||||
launchJupyterKernel(kernelPort);
|
||||
} catch (Exception e) {
|
||||
throw new InterpreterException("Fail to open JupyterKernelInterpreter:\n" +
|
||||
ExceptionUtils.getStackTrace(e), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* non-empty return value mean the errors when checking kernel prerequisite.
|
||||
* empty value mean kernel prerequisite is met.
|
||||
*
|
||||
* @return check result of checking kernel prerequisite.
|
||||
*/
|
||||
public String checkKernelPrerequisite(String pythonExec) {
|
||||
ProcessBuilder processBuilder = new ProcessBuilder(pythonExec, "-m", "pip", "freeze");
|
||||
File stderrFile = null;
|
||||
File stdoutFile = null;
|
||||
try {
|
||||
stderrFile = File.createTempFile("zeppelin", ".txt");
|
||||
processBuilder.redirectError(stderrFile);
|
||||
stdoutFile = File.createTempFile("zeppelin", ".txt");
|
||||
processBuilder.redirectOutput(stdoutFile);
|
||||
|
||||
Process proc = processBuilder.start();
|
||||
int ret = proc.waitFor();
|
||||
if (ret != 0) {
|
||||
try (FileInputStream in = new FileInputStream(stderrFile)) {
|
||||
return "Fail to run pip freeze.\n" + IOUtils.toString(in);
|
||||
}
|
||||
}
|
||||
try (FileInputStream in = new FileInputStream(stdoutFile)) {
|
||||
String freezeOutput = IOUtils.toString(in);
|
||||
for (String packageName : getRequiredPackages()) {
|
||||
if (!freezeOutput.contains(packageName + "=")) {
|
||||
return packageName + " is not installed.";
|
||||
}
|
||||
}
|
||||
LOGGER.info("Prerequisite for kernel " + getKernelName() + " is met");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Fail to checkKernelPrerequisite", e);
|
||||
return "Fail to checkKernelPrerequisite: " + ExceptionUtils.getStackTrace(e);
|
||||
} finally {
|
||||
FileUtils.deleteQuietly(stderrFile);
|
||||
FileUtils.deleteQuietly(stdoutFile);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
private void launchJupyterKernel(int kernelPort)
|
||||
throws IOException {
|
||||
LOGGER.info("Launching Jupyter Kernel at port: " + kernelPort);
|
||||
// copy the python scripts to a temp directory, then launch jupyter kernel in that folder
|
||||
this.kernelWorkDir = Files.createTempDirectory(
|
||||
"zeppelin_jupyter_kernel_" + getKernelName()).toFile();
|
||||
String[] kernelScripts = {"kernel_server.py", "kernel_pb2.py", "kernel_pb2_grpc.py"};
|
||||
for (String kernelScript : kernelScripts) {
|
||||
URL url = getClass().getClassLoader().getResource("grpc/jupyter"
|
||||
+ "/" + kernelScript);
|
||||
FileUtils.copyURLToFile(url, new File(kernelWorkDir, kernelScript));
|
||||
}
|
||||
|
||||
CommandLine cmd = CommandLine.parse(pythonExecutable);
|
||||
cmd.addArgument(kernelWorkDir.getAbsolutePath() + "/kernel_server.py");
|
||||
cmd.addArgument(getKernelName());
|
||||
cmd.addArgument(kernelPort + "");
|
||||
|
||||
Map<String, String> envs = setupKernelEnv();
|
||||
jupyterKernelProcessLauncher = new JupyterKernelProcessLauncher(cmd, envs);
|
||||
jupyterKernelProcessLauncher.launch();
|
||||
jupyterKernelProcessLauncher.waitForReady(kernelLaunchTimeout);
|
||||
|
||||
if (jupyterKernelProcessLauncher.isLaunchTimeout()) {
|
||||
throw new IOException("Fail to launch Jupyter Kernel in " + kernelLaunchTimeout / 1000
|
||||
+ " seconds.\n" + jupyterKernelProcessLauncher.getErrorMessage());
|
||||
}
|
||||
if (!jupyterKernelProcessLauncher.isRunning()) {
|
||||
throw new IOException("Fail to launch Jupyter Kernel as the python process is failed.\n"
|
||||
+ jupyterKernelProcessLauncher.getErrorMessage());
|
||||
}
|
||||
}
|
||||
|
||||
protected Map<String, String> setupKernelEnv() throws IOException {
|
||||
return EnvironmentUtils.getProcEnvironment();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public JupyterKernelProcessLauncher getKernelProcessLauncher() {
|
||||
return jupyterKernelProcessLauncher;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws InterpreterException {
|
||||
if (jupyterKernelProcessLauncher != null) {
|
||||
LOGGER.info("Killing Jupyter Kernel Process");
|
||||
if (jupyterKernelProcessLauncher.isRunning()) {
|
||||
jupyterKernelClient.stop(StopRequest.newBuilder().build());
|
||||
try {
|
||||
jupyterKernelClient.shutdown();
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.warn("Exception happens when shutting down jupyter kernel client", e);
|
||||
}
|
||||
}
|
||||
jupyterKernelProcessLauncher.stop();
|
||||
jupyterKernelProcessLauncher = null;
|
||||
LOGGER.info("Jupyter Kernel is killed");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st,
|
||||
InterpreterContext context) throws InterpreterException {
|
||||
zeppelinContext.setGui(context.getGui());
|
||||
zeppelinContext.setNoteGui(context.getNoteGui());
|
||||
zeppelinContext.setInterpreterContext(context);
|
||||
interpreterOutput.setInterpreterOutput(context.out);
|
||||
try {
|
||||
ExecuteResponse response =
|
||||
jupyterKernelClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
|
||||
interpreterOutput);
|
||||
interpreterOutput.getInterpreterOutput().flush();
|
||||
// It is not known which method is called first (JupyterKernelClient.stream_execute
|
||||
// or onProcessFailed) when jupyter kernel process is exited. Because they are in
|
||||
// 2 different threads. So here we would check JupyterKernelClient's status and sleep 1 second
|
||||
// if jupyter kernel is maybe terminated.
|
||||
if (jupyterKernelProcessLauncher.isRunning() && !jupyterKernelClient.isMaybeKernelFailed()) {
|
||||
return new InterpreterResult(
|
||||
InterpreterResult.Code.valueOf(response.getStatus().name()));
|
||||
} else {
|
||||
if (jupyterKernelClient.isMaybeKernelFailed()) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
if (jupyterKernelProcessLauncher.isRunning()) {
|
||||
return new InterpreterResult(
|
||||
InterpreterResult.Code.valueOf(response.getStatus().name()));
|
||||
} else {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
"IPython kernel is abnormally exited, please check your code and log.");
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new InterpreterException("Fail to interpret python code", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) throws InterpreterException {
|
||||
jupyterKernelClient.cancel(CancelRequest.newBuilder().build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) throws InterpreterException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<InterpreterCompletion> completion(String buf, int cursor,
|
||||
InterpreterContext interpreterContext) {
|
||||
LOGGER.debug("Call completion for: " + buf + ", cursor: " + cursor);
|
||||
List<InterpreterCompletion> completions = new ArrayList<>();
|
||||
CompletionResponse response =
|
||||
jupyterKernelClient.complete(
|
||||
CompletionRequest.getDefaultInstance().newBuilder().setCode(buf)
|
||||
.setCursor(cursor).build());
|
||||
for (int i = 0; i < response.getMatchesCount(); i++) {
|
||||
String match = response.getMatches(i);
|
||||
int lastIndexOfDot = match.lastIndexOf(".");
|
||||
if (lastIndexOfDot != -1) {
|
||||
match = match.substring(lastIndexOfDot + 1);
|
||||
}
|
||||
LOGGER.debug("Candidate completion: " + match);
|
||||
completions.add(new InterpreterCompletion(match, match, ""));
|
||||
}
|
||||
return completions;
|
||||
}
|
||||
|
||||
public BaseZeppelinContext getZeppelinContext() {
|
||||
return zeppelinContext;
|
||||
}
|
||||
|
||||
public class JupyterKernelProcessLauncher extends ProcessLauncher {
|
||||
|
||||
JupyterKernelProcessLauncher(CommandLine commandLine,
|
||||
Map<String, String> envs) {
|
||||
super(commandLine, envs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void waitForReady(int timeout) {
|
||||
// wait until jupyter kernel is started or timeout
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (state == State.LAUNCHED) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error("Interrupted by something", e);
|
||||
}
|
||||
|
||||
try {
|
||||
StatusResponse response = jupyterKernelClient.status(StatusRequest.newBuilder().build());
|
||||
if (response.getStatus() == KernelStatus.RUNNING) {
|
||||
LOGGER.info("Jupyter Kernel is Running");
|
||||
onProcessRunning();
|
||||
break;
|
||||
} else {
|
||||
LOGGER.info("Wait for Jupyter Kernel to be started");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// ignore the exception, because is may happen when grpc server has not started yet.
|
||||
LOGGER.info("Wait for Jupyter Kernel to be started");
|
||||
}
|
||||
|
||||
if ((System.currentTimeMillis() - startTime) > timeout) {
|
||||
onTimeout();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
108
zeppelin-jupyter-adapter/src/main/proto/kernel.proto
Normal file
108
zeppelin-jupyter-adapter/src/main/proto/kernel.proto
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
option java_multiple_files = true;
|
||||
option java_package = "org.apache.zeppelin.interpreter.jupyter.proto";
|
||||
option java_outer_classname = "JupyterKernelProto";
|
||||
option objc_class_prefix = "JupyterKernel";
|
||||
|
||||
package jupyter;
|
||||
|
||||
// The JupyterKernel service definition.
|
||||
service JupyterKernel {
|
||||
// Sends code
|
||||
rpc execute (ExecuteRequest) returns (stream ExecuteResponse) {}
|
||||
|
||||
// Get completion
|
||||
rpc complete (CompletionRequest) returns (CompletionResponse) {}
|
||||
|
||||
// Cancel the running statement
|
||||
rpc cancel (CancelRequest) returns (CancelResponse) {}
|
||||
|
||||
// Get jupyter kernel status
|
||||
rpc status (StatusRequest) returns (StatusResponse) {}
|
||||
|
||||
// Stop jupyter kernel
|
||||
rpc stop(StopRequest) returns (StopResponse) {}
|
||||
}
|
||||
|
||||
enum ExecuteStatus {
|
||||
SUCCESS = 0;
|
||||
ERROR = 1;
|
||||
}
|
||||
|
||||
enum KernelStatus {
|
||||
STARTING = 0;
|
||||
RUNNING = 1;
|
||||
}
|
||||
|
||||
enum OutputType {
|
||||
TEXT = 0;
|
||||
PNG = 1;
|
||||
JPEG = 2;
|
||||
HTML = 3;
|
||||
SVG = 4;
|
||||
JSON = 5;
|
||||
LaTeX = 6;
|
||||
}
|
||||
|
||||
// The request message containing the code
|
||||
message ExecuteRequest {
|
||||
string code = 1;
|
||||
}
|
||||
|
||||
// The response message containing the execution result.
|
||||
message ExecuteResponse {
|
||||
ExecuteStatus status = 1;
|
||||
OutputType type = 2;
|
||||
string output = 3;
|
||||
}
|
||||
|
||||
message CancelRequest {
|
||||
|
||||
}
|
||||
|
||||
message CancelResponse {
|
||||
|
||||
}
|
||||
|
||||
message CompletionRequest {
|
||||
string code = 1;
|
||||
int32 cursor = 2;
|
||||
}
|
||||
|
||||
message CompletionResponse {
|
||||
repeated string matches = 1;
|
||||
}
|
||||
|
||||
message StatusRequest {
|
||||
|
||||
}
|
||||
|
||||
message StatusResponse {
|
||||
KernelStatus status = 1;
|
||||
}
|
||||
|
||||
message StopRequest {
|
||||
|
||||
}
|
||||
|
||||
message StopResponse {
|
||||
|
||||
}
|
||||
|
|
@ -15,4 +15,4 @@
|
|||
|
||||
#!/usr/bin/env bash
|
||||
|
||||
python -m grpc_tools.protoc -I../../proto --python_out=python --grpc_python_out=python ../../proto/ipython.proto
|
||||
python -m grpc_tools.protoc -I../../proto --python_out=jupyter --grpc_python_out=jupyter ../../proto/kernel.proto
|
||||
|
|
@ -16,19 +16,19 @@
|
|||
|
||||
import grpc
|
||||
|
||||
import ipython_pb2
|
||||
import ipython_pb2_grpc
|
||||
import kernel_pb2
|
||||
import kernel_pb2_grpc
|
||||
|
||||
|
||||
def run():
|
||||
channel = grpc.insecure_channel('localhost:50053')
|
||||
stub = ipython_pb2_grpc.IPythonStub(channel)
|
||||
response = stub.execute(ipython_pb2.ExecuteRequest(code="import time\nfor i in range(1,4):\n\ttime.sleep(1)\n\tprint(i)\n" +
|
||||
stub = kernel_pb2_grpc.JupyterKernelStub(channel)
|
||||
response = stub.execute(kernel_pb2.ExecuteRequest(code="import time\nfor i in range(1,4):\n\ttime.sleep(1)\n\tprint(i)\n" +
|
||||
"%matplotlib inline\nimport matplotlib.pyplot as plt\ndata=[1,1,2,3,4]\nplt.figure()\nplt.plot(data)"))
|
||||
for r in response:
|
||||
print("output:" + r.output)
|
||||
|
||||
response = stub.execute(ipython_pb2.ExecuteRequest(code="range?"))
|
||||
response = stub.execute(kernel_pb2.ExecuteRequest(code="range?"))
|
||||
for r in response:
|
||||
print(r)
|
||||
|
||||
|
|
@ -13,8 +13,9 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
# source: ipython.proto
|
||||
# source: kernel.proto
|
||||
|
||||
import sys
|
||||
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
|
||||
|
|
@ -31,16 +32,16 @@ _sym_db = _symbol_database.Default()
|
|||
|
||||
|
||||
DESCRIPTOR = _descriptor.FileDescriptor(
|
||||
name='ipython.proto',
|
||||
package='ipython',
|
||||
name='kernel.proto',
|
||||
package='jupyter',
|
||||
syntax='proto3',
|
||||
serialized_options=_b('\n org.apache.zeppelin.python.protoB\014IPythonProtoP\001\242\002\007IPython'),
|
||||
serialized_pb=_b('\n\ripython.proto\x12\x07ipython\"\x1e\n\x0e\x45xecuteRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\"l\n\x0f\x45xecuteResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.ipython.ExecuteStatus\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.ipython.OutputType\x12\x0e\n\x06output\x18\x03 \x01(\t\"\x0f\n\rCancelRequest\"\x10\n\x0e\x43\x61ncelResponse\"1\n\x11\x43ompletionRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\x12\x0e\n\x06\x63ursor\x18\x02 \x01(\x05\"%\n\x12\x43ompletionResponse\x12\x0f\n\x07matches\x18\x01 \x03(\t\"\x0f\n\rStatusRequest\"8\n\x0eStatusResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.ipython.IPythonStatus\"\r\n\x0bStopRequest\"\x0e\n\x0cStopResponse*\'\n\rExecuteStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01**\n\rIPythonStatus\x12\x0c\n\x08STARTING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01*Q\n\nOutputType\x12\x08\n\x04TEXT\x10\x00\x12\x07\n\x03PNG\x10\x01\x12\x08\n\x04JPEG\x10\x02\x12\x08\n\x04HTML\x10\x03\x12\x07\n\x03SVG\x10\x04\x12\x08\n\x04JSON\x10\x05\x12\t\n\x05LaTeX\x10\x06\x32\xc3\x02\n\x07IPython\x12@\n\x07\x65xecute\x12\x17.ipython.ExecuteRequest\x1a\x18.ipython.ExecuteResponse\"\x00\x30\x01\x12\x45\n\x08\x63omplete\x12\x1a.ipython.CompletionRequest\x1a\x1b.ipython.CompletionResponse\"\x00\x12;\n\x06\x63\x61ncel\x12\x16.ipython.CancelRequest\x1a\x17.ipython.CancelResponse\"\x00\x12;\n\x06status\x12\x16.ipython.StatusRequest\x1a\x17.ipython.StatusResponse\"\x00\x12\x35\n\x04stop\x12\x14.ipython.StopRequest\x1a\x15.ipython.StopResponse\"\x00\x42<\n org.apache.zeppelin.python.protoB\x0cIPythonProtoP\x01\xa2\x02\x07IPythonb\x06proto3')
|
||||
serialized_options=_b('\n-org.apache.zeppelin.interpreter.jupyter.protoB\022JupyterKernelProtoP\001\242\002\rJupyterKernel'),
|
||||
serialized_pb=_b('\n\x0ckernel.proto\x12\x07jupyter\"\x1e\n\x0e\x45xecuteRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\"l\n\x0f\x45xecuteResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.jupyter.ExecuteStatus\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.jupyter.OutputType\x12\x0e\n\x06output\x18\x03 \x01(\t\"\x0f\n\rCancelRequest\"\x10\n\x0e\x43\x61ncelResponse\"1\n\x11\x43ompletionRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\x12\x0e\n\x06\x63ursor\x18\x02 \x01(\x05\"%\n\x12\x43ompletionResponse\x12\x0f\n\x07matches\x18\x01 \x03(\t\"\x0f\n\rStatusRequest\"7\n\x0eStatusResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.jupyter.KernelStatus\"\r\n\x0bStopRequest\"\x0e\n\x0cStopResponse*\'\n\rExecuteStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01*)\n\x0cKernelStatus\x12\x0c\n\x08STARTING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01*Q\n\nOutputType\x12\x08\n\x04TEXT\x10\x00\x12\x07\n\x03PNG\x10\x01\x12\x08\n\x04JPEG\x10\x02\x12\x08\n\x04HTML\x10\x03\x12\x07\n\x03SVG\x10\x04\x12\x08\n\x04JSON\x10\x05\x12\t\n\x05LaTeX\x10\x06\x32\xc9\x02\n\rJupyterKernel\x12@\n\x07\x65xecute\x12\x17.jupyter.ExecuteRequest\x1a\x18.jupyter.ExecuteResponse\"\x00\x30\x01\x12\x45\n\x08\x63omplete\x12\x1a.jupyter.CompletionRequest\x1a\x1b.jupyter.CompletionResponse\"\x00\x12;\n\x06\x63\x61ncel\x12\x16.jupyter.CancelRequest\x1a\x17.jupyter.CancelResponse\"\x00\x12;\n\x06status\x12\x16.jupyter.StatusRequest\x1a\x17.jupyter.StatusResponse\"\x00\x12\x35\n\x04stop\x12\x14.jupyter.StopRequest\x1a\x15.jupyter.StopResponse\"\x00\x42U\n-org.apache.zeppelin.interpreter.jupyter.protoB\x12JupyterKernelProtoP\x01\xa2\x02\rJupyterKernelb\x06proto3')
|
||||
)
|
||||
|
||||
_EXECUTESTATUS = _descriptor.EnumDescriptor(
|
||||
name='ExecuteStatus',
|
||||
full_name='ipython.ExecuteStatus',
|
||||
full_name='jupyter.ExecuteStatus',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
values=[
|
||||
|
|
@ -55,15 +56,15 @@ _EXECUTESTATUS = _descriptor.EnumDescriptor(
|
|||
],
|
||||
containing_type=None,
|
||||
serialized_options=None,
|
||||
serialized_start=399,
|
||||
serialized_end=438,
|
||||
serialized_start=397,
|
||||
serialized_end=436,
|
||||
)
|
||||
_sym_db.RegisterEnumDescriptor(_EXECUTESTATUS)
|
||||
|
||||
ExecuteStatus = enum_type_wrapper.EnumTypeWrapper(_EXECUTESTATUS)
|
||||
_IPYTHONSTATUS = _descriptor.EnumDescriptor(
|
||||
name='IPythonStatus',
|
||||
full_name='ipython.IPythonStatus',
|
||||
_KERNELSTATUS = _descriptor.EnumDescriptor(
|
||||
name='KernelStatus',
|
||||
full_name='jupyter.KernelStatus',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
values=[
|
||||
|
|
@ -78,15 +79,15 @@ _IPYTHONSTATUS = _descriptor.EnumDescriptor(
|
|||
],
|
||||
containing_type=None,
|
||||
serialized_options=None,
|
||||
serialized_start=440,
|
||||
serialized_end=482,
|
||||
serialized_start=438,
|
||||
serialized_end=479,
|
||||
)
|
||||
_sym_db.RegisterEnumDescriptor(_IPYTHONSTATUS)
|
||||
_sym_db.RegisterEnumDescriptor(_KERNELSTATUS)
|
||||
|
||||
IPythonStatus = enum_type_wrapper.EnumTypeWrapper(_IPYTHONSTATUS)
|
||||
KernelStatus = enum_type_wrapper.EnumTypeWrapper(_KERNELSTATUS)
|
||||
_OUTPUTTYPE = _descriptor.EnumDescriptor(
|
||||
name='OutputType',
|
||||
full_name='ipython.OutputType',
|
||||
full_name='jupyter.OutputType',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
values=[
|
||||
|
|
@ -121,8 +122,8 @@ _OUTPUTTYPE = _descriptor.EnumDescriptor(
|
|||
],
|
||||
containing_type=None,
|
||||
serialized_options=None,
|
||||
serialized_start=484,
|
||||
serialized_end=565,
|
||||
serialized_start=481,
|
||||
serialized_end=562,
|
||||
)
|
||||
_sym_db.RegisterEnumDescriptor(_OUTPUTTYPE)
|
||||
|
||||
|
|
@ -143,13 +144,13 @@ LaTeX = 6
|
|||
|
||||
_EXECUTEREQUEST = _descriptor.Descriptor(
|
||||
name='ExecuteRequest',
|
||||
full_name='ipython.ExecuteRequest',
|
||||
full_name='jupyter.ExecuteRequest',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
containing_type=None,
|
||||
fields=[
|
||||
_descriptor.FieldDescriptor(
|
||||
name='code', full_name='ipython.ExecuteRequest.code', index=0,
|
||||
name='code', full_name='jupyter.ExecuteRequest.code', index=0,
|
||||
number=1, type=9, cpp_type=9, label=1,
|
||||
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
|
|
@ -167,34 +168,34 @@ _EXECUTEREQUEST = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=26,
|
||||
serialized_end=56,
|
||||
serialized_start=25,
|
||||
serialized_end=55,
|
||||
)
|
||||
|
||||
|
||||
_EXECUTERESPONSE = _descriptor.Descriptor(
|
||||
name='ExecuteResponse',
|
||||
full_name='ipython.ExecuteResponse',
|
||||
full_name='jupyter.ExecuteResponse',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
containing_type=None,
|
||||
fields=[
|
||||
_descriptor.FieldDescriptor(
|
||||
name='status', full_name='ipython.ExecuteResponse.status', index=0,
|
||||
name='status', full_name='jupyter.ExecuteResponse.status', index=0,
|
||||
number=1, type=14, cpp_type=8, label=1,
|
||||
has_default_value=False, default_value=0,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
serialized_options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='type', full_name='ipython.ExecuteResponse.type', index=1,
|
||||
name='type', full_name='jupyter.ExecuteResponse.type', index=1,
|
||||
number=2, type=14, cpp_type=8, label=1,
|
||||
has_default_value=False, default_value=0,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
serialized_options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='output', full_name='ipython.ExecuteResponse.output', index=2,
|
||||
name='output', full_name='jupyter.ExecuteResponse.output', index=2,
|
||||
number=3, type=9, cpp_type=9, label=1,
|
||||
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
|
|
@ -212,14 +213,14 @@ _EXECUTERESPONSE = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=58,
|
||||
serialized_end=166,
|
||||
serialized_start=57,
|
||||
serialized_end=165,
|
||||
)
|
||||
|
||||
|
||||
_CANCELREQUEST = _descriptor.Descriptor(
|
||||
name='CancelRequest',
|
||||
full_name='ipython.CancelRequest',
|
||||
full_name='jupyter.CancelRequest',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
containing_type=None,
|
||||
|
|
@ -236,14 +237,14 @@ _CANCELREQUEST = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=168,
|
||||
serialized_end=183,
|
||||
serialized_start=167,
|
||||
serialized_end=182,
|
||||
)
|
||||
|
||||
|
||||
_CANCELRESPONSE = _descriptor.Descriptor(
|
||||
name='CancelResponse',
|
||||
full_name='ipython.CancelResponse',
|
||||
full_name='jupyter.CancelResponse',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
containing_type=None,
|
||||
|
|
@ -260,27 +261,27 @@ _CANCELRESPONSE = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=185,
|
||||
serialized_end=201,
|
||||
serialized_start=184,
|
||||
serialized_end=200,
|
||||
)
|
||||
|
||||
|
||||
_COMPLETIONREQUEST = _descriptor.Descriptor(
|
||||
name='CompletionRequest',
|
||||
full_name='ipython.CompletionRequest',
|
||||
full_name='jupyter.CompletionRequest',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
containing_type=None,
|
||||
fields=[
|
||||
_descriptor.FieldDescriptor(
|
||||
name='code', full_name='ipython.CompletionRequest.code', index=0,
|
||||
name='code', full_name='jupyter.CompletionRequest.code', index=0,
|
||||
number=1, type=9, cpp_type=9, label=1,
|
||||
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
serialized_options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='cursor', full_name='ipython.CompletionRequest.cursor', index=1,
|
||||
name='cursor', full_name='jupyter.CompletionRequest.cursor', index=1,
|
||||
number=2, type=5, cpp_type=1, label=1,
|
||||
has_default_value=False, default_value=0,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
|
|
@ -298,20 +299,20 @@ _COMPLETIONREQUEST = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=203,
|
||||
serialized_end=252,
|
||||
serialized_start=202,
|
||||
serialized_end=251,
|
||||
)
|
||||
|
||||
|
||||
_COMPLETIONRESPONSE = _descriptor.Descriptor(
|
||||
name='CompletionResponse',
|
||||
full_name='ipython.CompletionResponse',
|
||||
full_name='jupyter.CompletionResponse',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
containing_type=None,
|
||||
fields=[
|
||||
_descriptor.FieldDescriptor(
|
||||
name='matches', full_name='ipython.CompletionResponse.matches', index=0,
|
||||
name='matches', full_name='jupyter.CompletionResponse.matches', index=0,
|
||||
number=1, type=9, cpp_type=9, label=3,
|
||||
has_default_value=False, default_value=[],
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
|
|
@ -329,14 +330,14 @@ _COMPLETIONRESPONSE = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=254,
|
||||
serialized_end=291,
|
||||
serialized_start=253,
|
||||
serialized_end=290,
|
||||
)
|
||||
|
||||
|
||||
_STATUSREQUEST = _descriptor.Descriptor(
|
||||
name='StatusRequest',
|
||||
full_name='ipython.StatusRequest',
|
||||
full_name='jupyter.StatusRequest',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
containing_type=None,
|
||||
|
|
@ -353,20 +354,20 @@ _STATUSREQUEST = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=293,
|
||||
serialized_end=308,
|
||||
serialized_start=292,
|
||||
serialized_end=307,
|
||||
)
|
||||
|
||||
|
||||
_STATUSRESPONSE = _descriptor.Descriptor(
|
||||
name='StatusResponse',
|
||||
full_name='ipython.StatusResponse',
|
||||
full_name='jupyter.StatusResponse',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
containing_type=None,
|
||||
fields=[
|
||||
_descriptor.FieldDescriptor(
|
||||
name='status', full_name='ipython.StatusResponse.status', index=0,
|
||||
name='status', full_name='jupyter.StatusResponse.status', index=0,
|
||||
number=1, type=14, cpp_type=8, label=1,
|
||||
has_default_value=False, default_value=0,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
|
|
@ -384,14 +385,14 @@ _STATUSRESPONSE = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=310,
|
||||
serialized_end=366,
|
||||
serialized_start=309,
|
||||
serialized_end=364,
|
||||
)
|
||||
|
||||
|
||||
_STOPREQUEST = _descriptor.Descriptor(
|
||||
name='StopRequest',
|
||||
full_name='ipython.StopRequest',
|
||||
full_name='jupyter.StopRequest',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
containing_type=None,
|
||||
|
|
@ -408,14 +409,14 @@ _STOPREQUEST = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=368,
|
||||
serialized_end=381,
|
||||
serialized_start=366,
|
||||
serialized_end=379,
|
||||
)
|
||||
|
||||
|
||||
_STOPRESPONSE = _descriptor.Descriptor(
|
||||
name='StopResponse',
|
||||
full_name='ipython.StopResponse',
|
||||
full_name='jupyter.StopResponse',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
containing_type=None,
|
||||
|
|
@ -432,13 +433,13 @@ _STOPRESPONSE = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=383,
|
||||
serialized_end=397,
|
||||
serialized_start=381,
|
||||
serialized_end=395,
|
||||
)
|
||||
|
||||
_EXECUTERESPONSE.fields_by_name['status'].enum_type = _EXECUTESTATUS
|
||||
_EXECUTERESPONSE.fields_by_name['type'].enum_type = _OUTPUTTYPE
|
||||
_STATUSRESPONSE.fields_by_name['status'].enum_type = _IPYTHONSTATUS
|
||||
_STATUSRESPONSE.fields_by_name['status'].enum_type = _KERNELSTATUS
|
||||
DESCRIPTOR.message_types_by_name['ExecuteRequest'] = _EXECUTEREQUEST
|
||||
DESCRIPTOR.message_types_by_name['ExecuteResponse'] = _EXECUTERESPONSE
|
||||
DESCRIPTOR.message_types_by_name['CancelRequest'] = _CANCELREQUEST
|
||||
|
|
@ -450,95 +451,95 @@ DESCRIPTOR.message_types_by_name['StatusResponse'] = _STATUSRESPONSE
|
|||
DESCRIPTOR.message_types_by_name['StopRequest'] = _STOPREQUEST
|
||||
DESCRIPTOR.message_types_by_name['StopResponse'] = _STOPRESPONSE
|
||||
DESCRIPTOR.enum_types_by_name['ExecuteStatus'] = _EXECUTESTATUS
|
||||
DESCRIPTOR.enum_types_by_name['IPythonStatus'] = _IPYTHONSTATUS
|
||||
DESCRIPTOR.enum_types_by_name['KernelStatus'] = _KERNELSTATUS
|
||||
DESCRIPTOR.enum_types_by_name['OutputType'] = _OUTPUTTYPE
|
||||
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
|
||||
|
||||
ExecuteRequest = _reflection.GeneratedProtocolMessageType('ExecuteRequest', (_message.Message,), dict(
|
||||
DESCRIPTOR = _EXECUTEREQUEST,
|
||||
__module__ = 'ipython_pb2'
|
||||
# @@protoc_insertion_point(class_scope:ipython.ExecuteRequest)
|
||||
__module__ = 'kernel_pb2'
|
||||
# @@protoc_insertion_point(class_scope:jupyter.ExecuteRequest)
|
||||
))
|
||||
_sym_db.RegisterMessage(ExecuteRequest)
|
||||
|
||||
ExecuteResponse = _reflection.GeneratedProtocolMessageType('ExecuteResponse', (_message.Message,), dict(
|
||||
DESCRIPTOR = _EXECUTERESPONSE,
|
||||
__module__ = 'ipython_pb2'
|
||||
# @@protoc_insertion_point(class_scope:ipython.ExecuteResponse)
|
||||
__module__ = 'kernel_pb2'
|
||||
# @@protoc_insertion_point(class_scope:jupyter.ExecuteResponse)
|
||||
))
|
||||
_sym_db.RegisterMessage(ExecuteResponse)
|
||||
|
||||
CancelRequest = _reflection.GeneratedProtocolMessageType('CancelRequest', (_message.Message,), dict(
|
||||
DESCRIPTOR = _CANCELREQUEST,
|
||||
__module__ = 'ipython_pb2'
|
||||
# @@protoc_insertion_point(class_scope:ipython.CancelRequest)
|
||||
__module__ = 'kernel_pb2'
|
||||
# @@protoc_insertion_point(class_scope:jupyter.CancelRequest)
|
||||
))
|
||||
_sym_db.RegisterMessage(CancelRequest)
|
||||
|
||||
CancelResponse = _reflection.GeneratedProtocolMessageType('CancelResponse', (_message.Message,), dict(
|
||||
DESCRIPTOR = _CANCELRESPONSE,
|
||||
__module__ = 'ipython_pb2'
|
||||
# @@protoc_insertion_point(class_scope:ipython.CancelResponse)
|
||||
__module__ = 'kernel_pb2'
|
||||
# @@protoc_insertion_point(class_scope:jupyter.CancelResponse)
|
||||
))
|
||||
_sym_db.RegisterMessage(CancelResponse)
|
||||
|
||||
CompletionRequest = _reflection.GeneratedProtocolMessageType('CompletionRequest', (_message.Message,), dict(
|
||||
DESCRIPTOR = _COMPLETIONREQUEST,
|
||||
__module__ = 'ipython_pb2'
|
||||
# @@protoc_insertion_point(class_scope:ipython.CompletionRequest)
|
||||
__module__ = 'kernel_pb2'
|
||||
# @@protoc_insertion_point(class_scope:jupyter.CompletionRequest)
|
||||
))
|
||||
_sym_db.RegisterMessage(CompletionRequest)
|
||||
|
||||
CompletionResponse = _reflection.GeneratedProtocolMessageType('CompletionResponse', (_message.Message,), dict(
|
||||
DESCRIPTOR = _COMPLETIONRESPONSE,
|
||||
__module__ = 'ipython_pb2'
|
||||
# @@protoc_insertion_point(class_scope:ipython.CompletionResponse)
|
||||
__module__ = 'kernel_pb2'
|
||||
# @@protoc_insertion_point(class_scope:jupyter.CompletionResponse)
|
||||
))
|
||||
_sym_db.RegisterMessage(CompletionResponse)
|
||||
|
||||
StatusRequest = _reflection.GeneratedProtocolMessageType('StatusRequest', (_message.Message,), dict(
|
||||
DESCRIPTOR = _STATUSREQUEST,
|
||||
__module__ = 'ipython_pb2'
|
||||
# @@protoc_insertion_point(class_scope:ipython.StatusRequest)
|
||||
__module__ = 'kernel_pb2'
|
||||
# @@protoc_insertion_point(class_scope:jupyter.StatusRequest)
|
||||
))
|
||||
_sym_db.RegisterMessage(StatusRequest)
|
||||
|
||||
StatusResponse = _reflection.GeneratedProtocolMessageType('StatusResponse', (_message.Message,), dict(
|
||||
DESCRIPTOR = _STATUSRESPONSE,
|
||||
__module__ = 'ipython_pb2'
|
||||
# @@protoc_insertion_point(class_scope:ipython.StatusResponse)
|
||||
__module__ = 'kernel_pb2'
|
||||
# @@protoc_insertion_point(class_scope:jupyter.StatusResponse)
|
||||
))
|
||||
_sym_db.RegisterMessage(StatusResponse)
|
||||
|
||||
StopRequest = _reflection.GeneratedProtocolMessageType('StopRequest', (_message.Message,), dict(
|
||||
DESCRIPTOR = _STOPREQUEST,
|
||||
__module__ = 'ipython_pb2'
|
||||
# @@protoc_insertion_point(class_scope:ipython.StopRequest)
|
||||
__module__ = 'kernel_pb2'
|
||||
# @@protoc_insertion_point(class_scope:jupyter.StopRequest)
|
||||
))
|
||||
_sym_db.RegisterMessage(StopRequest)
|
||||
|
||||
StopResponse = _reflection.GeneratedProtocolMessageType('StopResponse', (_message.Message,), dict(
|
||||
DESCRIPTOR = _STOPRESPONSE,
|
||||
__module__ = 'ipython_pb2'
|
||||
# @@protoc_insertion_point(class_scope:ipython.StopResponse)
|
||||
__module__ = 'kernel_pb2'
|
||||
# @@protoc_insertion_point(class_scope:jupyter.StopResponse)
|
||||
))
|
||||
_sym_db.RegisterMessage(StopResponse)
|
||||
|
||||
|
||||
DESCRIPTOR._options = None
|
||||
|
||||
_IPYTHON = _descriptor.ServiceDescriptor(
|
||||
name='IPython',
|
||||
full_name='ipython.IPython',
|
||||
_JUPYTERKERNEL = _descriptor.ServiceDescriptor(
|
||||
name='JupyterKernel',
|
||||
full_name='jupyter.JupyterKernel',
|
||||
file=DESCRIPTOR,
|
||||
index=0,
|
||||
serialized_options=None,
|
||||
serialized_start=568,
|
||||
serialized_end=891,
|
||||
serialized_start=565,
|
||||
serialized_end=894,
|
||||
methods=[
|
||||
_descriptor.MethodDescriptor(
|
||||
name='execute',
|
||||
full_name='ipython.IPython.execute',
|
||||
full_name='jupyter.JupyterKernel.execute',
|
||||
index=0,
|
||||
containing_service=None,
|
||||
input_type=_EXECUTEREQUEST,
|
||||
|
|
@ -547,7 +548,7 @@ _IPYTHON = _descriptor.ServiceDescriptor(
|
|||
),
|
||||
_descriptor.MethodDescriptor(
|
||||
name='complete',
|
||||
full_name='ipython.IPython.complete',
|
||||
full_name='jupyter.JupyterKernel.complete',
|
||||
index=1,
|
||||
containing_service=None,
|
||||
input_type=_COMPLETIONREQUEST,
|
||||
|
|
@ -556,7 +557,7 @@ _IPYTHON = _descriptor.ServiceDescriptor(
|
|||
),
|
||||
_descriptor.MethodDescriptor(
|
||||
name='cancel',
|
||||
full_name='ipython.IPython.cancel',
|
||||
full_name='jupyter.JupyterKernel.cancel',
|
||||
index=2,
|
||||
containing_service=None,
|
||||
input_type=_CANCELREQUEST,
|
||||
|
|
@ -565,7 +566,7 @@ _IPYTHON = _descriptor.ServiceDescriptor(
|
|||
),
|
||||
_descriptor.MethodDescriptor(
|
||||
name='status',
|
||||
full_name='ipython.IPython.status',
|
||||
full_name='jupyter.JupyterKernel.status',
|
||||
index=3,
|
||||
containing_service=None,
|
||||
input_type=_STATUSREQUEST,
|
||||
|
|
@ -574,7 +575,7 @@ _IPYTHON = _descriptor.ServiceDescriptor(
|
|||
),
|
||||
_descriptor.MethodDescriptor(
|
||||
name='stop',
|
||||
full_name='ipython.IPython.stop',
|
||||
full_name='jupyter.JupyterKernel.stop',
|
||||
index=4,
|
||||
containing_service=None,
|
||||
input_type=_STOPREQUEST,
|
||||
|
|
@ -582,8 +583,8 @@ _IPYTHON = _descriptor.ServiceDescriptor(
|
|||
serialized_options=None,
|
||||
),
|
||||
])
|
||||
_sym_db.RegisterServiceDescriptor(_IPYTHON)
|
||||
_sym_db.RegisterServiceDescriptor(_JUPYTERKERNEL)
|
||||
|
||||
DESCRIPTOR.services_by_name['IPython'] = _IPYTHON
|
||||
DESCRIPTOR.services_by_name['JupyterKernel'] = _JUPYTERKERNEL
|
||||
|
||||
# @@protoc_insertion_point(module_scope)
|
||||
|
|
@ -13,14 +13,15 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
|
||||
import grpc
|
||||
|
||||
import ipython_pb2 as ipython__pb2
|
||||
import kernel_pb2 as kernel__pb2
|
||||
|
||||
|
||||
class IPythonStub(object):
|
||||
"""The IPython service definition.
|
||||
class JupyterKernelStub(object):
|
||||
"""The JupyterKernel service definition.
|
||||
"""
|
||||
|
||||
def __init__(self, channel):
|
||||
|
|
@ -30,34 +31,34 @@ class IPythonStub(object):
|
|||
channel: A grpc.Channel.
|
||||
"""
|
||||
self.execute = channel.unary_stream(
|
||||
'/ipython.IPython/execute',
|
||||
request_serializer=ipython__pb2.ExecuteRequest.SerializeToString,
|
||||
response_deserializer=ipython__pb2.ExecuteResponse.FromString,
|
||||
'/jupyter.JupyterKernel/execute',
|
||||
request_serializer=kernel__pb2.ExecuteRequest.SerializeToString,
|
||||
response_deserializer=kernel__pb2.ExecuteResponse.FromString,
|
||||
)
|
||||
self.complete = channel.unary_unary(
|
||||
'/ipython.IPython/complete',
|
||||
request_serializer=ipython__pb2.CompletionRequest.SerializeToString,
|
||||
response_deserializer=ipython__pb2.CompletionResponse.FromString,
|
||||
'/jupyter.JupyterKernel/complete',
|
||||
request_serializer=kernel__pb2.CompletionRequest.SerializeToString,
|
||||
response_deserializer=kernel__pb2.CompletionResponse.FromString,
|
||||
)
|
||||
self.cancel = channel.unary_unary(
|
||||
'/ipython.IPython/cancel',
|
||||
request_serializer=ipython__pb2.CancelRequest.SerializeToString,
|
||||
response_deserializer=ipython__pb2.CancelResponse.FromString,
|
||||
'/jupyter.JupyterKernel/cancel',
|
||||
request_serializer=kernel__pb2.CancelRequest.SerializeToString,
|
||||
response_deserializer=kernel__pb2.CancelResponse.FromString,
|
||||
)
|
||||
self.status = channel.unary_unary(
|
||||
'/ipython.IPython/status',
|
||||
request_serializer=ipython__pb2.StatusRequest.SerializeToString,
|
||||
response_deserializer=ipython__pb2.StatusResponse.FromString,
|
||||
'/jupyter.JupyterKernel/status',
|
||||
request_serializer=kernel__pb2.StatusRequest.SerializeToString,
|
||||
response_deserializer=kernel__pb2.StatusResponse.FromString,
|
||||
)
|
||||
self.stop = channel.unary_unary(
|
||||
'/ipython.IPython/stop',
|
||||
request_serializer=ipython__pb2.StopRequest.SerializeToString,
|
||||
response_deserializer=ipython__pb2.StopResponse.FromString,
|
||||
'/jupyter.JupyterKernel/stop',
|
||||
request_serializer=kernel__pb2.StopRequest.SerializeToString,
|
||||
response_deserializer=kernel__pb2.StopResponse.FromString,
|
||||
)
|
||||
|
||||
|
||||
class IPythonServicer(object):
|
||||
"""The IPython service definition.
|
||||
class JupyterKernelServicer(object):
|
||||
"""The JupyterKernel service definition.
|
||||
"""
|
||||
|
||||
def execute(self, request, context):
|
||||
|
|
@ -82,48 +83,48 @@ class IPythonServicer(object):
|
|||
raise NotImplementedError('Method not implemented!')
|
||||
|
||||
def status(self, request, context):
|
||||
"""Get ipython kernel status
|
||||
"""Get jupyter kernel status
|
||||
"""
|
||||
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
||||
context.set_details('Method not implemented!')
|
||||
raise NotImplementedError('Method not implemented!')
|
||||
|
||||
def stop(self, request, context):
|
||||
# missing associated documentation comment in .proto file
|
||||
pass
|
||||
"""Stop jupyter kernel
|
||||
"""
|
||||
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
||||
context.set_details('Method not implemented!')
|
||||
raise NotImplementedError('Method not implemented!')
|
||||
|
||||
|
||||
def add_IPythonServicer_to_server(servicer, server):
|
||||
def add_JupyterKernelServicer_to_server(servicer, server):
|
||||
rpc_method_handlers = {
|
||||
'execute': grpc.unary_stream_rpc_method_handler(
|
||||
servicer.execute,
|
||||
request_deserializer=ipython__pb2.ExecuteRequest.FromString,
|
||||
response_serializer=ipython__pb2.ExecuteResponse.SerializeToString,
|
||||
request_deserializer=kernel__pb2.ExecuteRequest.FromString,
|
||||
response_serializer=kernel__pb2.ExecuteResponse.SerializeToString,
|
||||
),
|
||||
'complete': grpc.unary_unary_rpc_method_handler(
|
||||
servicer.complete,
|
||||
request_deserializer=ipython__pb2.CompletionRequest.FromString,
|
||||
response_serializer=ipython__pb2.CompletionResponse.SerializeToString,
|
||||
request_deserializer=kernel__pb2.CompletionRequest.FromString,
|
||||
response_serializer=kernel__pb2.CompletionResponse.SerializeToString,
|
||||
),
|
||||
'cancel': grpc.unary_unary_rpc_method_handler(
|
||||
servicer.cancel,
|
||||
request_deserializer=ipython__pb2.CancelRequest.FromString,
|
||||
response_serializer=ipython__pb2.CancelResponse.SerializeToString,
|
||||
request_deserializer=kernel__pb2.CancelRequest.FromString,
|
||||
response_serializer=kernel__pb2.CancelResponse.SerializeToString,
|
||||
),
|
||||
'status': grpc.unary_unary_rpc_method_handler(
|
||||
servicer.status,
|
||||
request_deserializer=ipython__pb2.StatusRequest.FromString,
|
||||
response_serializer=ipython__pb2.StatusResponse.SerializeToString,
|
||||
request_deserializer=kernel__pb2.StatusRequest.FromString,
|
||||
response_serializer=kernel__pb2.StatusResponse.SerializeToString,
|
||||
),
|
||||
'stop': grpc.unary_unary_rpc_method_handler(
|
||||
servicer.stop,
|
||||
request_deserializer=ipython__pb2.StopRequest.FromString,
|
||||
response_serializer=ipython__pb2.StopResponse.SerializeToString,
|
||||
request_deserializer=kernel__pb2.StopRequest.FromString,
|
||||
response_serializer=kernel__pb2.StopResponse.SerializeToString,
|
||||
),
|
||||
}
|
||||
generic_handler = grpc.method_handlers_generic_handler(
|
||||
'ipython.IPython', rpc_method_handlers)
|
||||
'jupyter.JupyterKernel', rpc_method_handlers)
|
||||
server.add_generic_rpc_handlers((generic_handler,))
|
||||
|
|
@ -23,8 +23,8 @@ import time
|
|||
from concurrent import futures
|
||||
|
||||
import grpc
|
||||
import ipython_pb2
|
||||
import ipython_pb2_grpc
|
||||
import kernel_pb2
|
||||
import kernel_pb2_grpc
|
||||
|
||||
is_py2 = sys.version[0] == '2'
|
||||
if is_py2:
|
||||
|
|
@ -33,11 +33,12 @@ else:
|
|||
import queue as queue
|
||||
|
||||
|
||||
class IPython(ipython_pb2_grpc.IPythonServicer):
|
||||
class KernelServer(kernel_pb2_grpc.JupyterKernelServicer):
|
||||
|
||||
def __init__(self, server):
|
||||
self._status = ipython_pb2.STARTING
|
||||
def __init__(self, server, kernel_name):
|
||||
self._status = kernel_pb2.STARTING
|
||||
self._server = server
|
||||
self._kernel_name = kernel_name
|
||||
# issue with execute_interactive and auto completion: https://github.com/jupyter/jupyter_client/issues/429
|
||||
# in all case because ipython does not support run and auto completion at the same time: https://github.com/jupyter/notebook/issues/3763
|
||||
# For now we will lock to ensure that there is no concurrent bug that can "hang" the kernel
|
||||
|
|
@ -46,8 +47,8 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
|
|||
def start(self):
|
||||
print("starting...")
|
||||
sys.stdout.flush()
|
||||
self._km, self._kc = jupyter_client.manager.start_new_kernel(kernel_name='python')
|
||||
self._status = ipython_pb2.RUNNING
|
||||
self._km, self._kc = jupyter_client.manager.start_new_kernel(kernel_name=self._kernel_name)
|
||||
self._status = kernel_pb2.RUNNING
|
||||
|
||||
def execute(self, request, context):
|
||||
print("execute code:\n")
|
||||
|
|
@ -59,42 +60,42 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
|
|||
msg_type = msg['header']['msg_type']
|
||||
content = msg['content']
|
||||
print("******************* CONTENT ******************")
|
||||
outStatus, outType, output = ipython_pb2.SUCCESS, None, None
|
||||
outStatus, outType, output = kernel_pb2.SUCCESS, None, None
|
||||
# prepare the reply
|
||||
if msg_type == 'stream':
|
||||
outType = ipython_pb2.TEXT
|
||||
outType = kernel_pb2.TEXT
|
||||
output = content['text']
|
||||
elif msg_type in ('display_data', 'execute_result'):
|
||||
print(content['data'])
|
||||
# The if-else order matters, can not be changed. Because ipython may provide multiple output.
|
||||
# TEXT is the last resort type.
|
||||
if 'text/html' in content['data']:
|
||||
outType = ipython_pb2.HTML
|
||||
outType = kernel_pb2.HTML
|
||||
output = content['data']['text/html']
|
||||
elif 'image/jpeg' in content['data']:
|
||||
outType = ipython_pb2.JPEG
|
||||
outType = kernel_pb2.JPEG
|
||||
output = content['data']['image/jpeg']
|
||||
elif 'image/png' in content['data']:
|
||||
outType = ipython_pb2.PNG
|
||||
outType = kernel_pb2.PNG
|
||||
output = content['data']['image/png']
|
||||
elif 'application/javascript' in content['data']:
|
||||
outType = ipython_pb2.HTML
|
||||
outType = kernel_pb2.HTML
|
||||
output = '<script> ' + content['data']['application/javascript'] + ' </script>\n'
|
||||
elif 'application/vnd.holoviews_load.v0+json' in content['data']:
|
||||
outType = ipython_pb2.HTML
|
||||
outType = kernel_pb2.HTML
|
||||
output = '<script> ' + content['data']['application/vnd.holoviews_load.v0+json'] + ' </script>\n'
|
||||
elif 'text/plain' in content['data']:
|
||||
outType = ipython_pb2.TEXT
|
||||
outType = kernel_pb2.TEXT
|
||||
output = content['data']['text/plain']
|
||||
elif msg_type == 'error':
|
||||
outStatus = ipython_pb2.ERROR
|
||||
outType = ipython_pb2.TEXT
|
||||
outStatus = kernel_pb2.ERROR
|
||||
outType = kernel_pb2.TEXT
|
||||
output = '\n'.join(content['traceback'])
|
||||
|
||||
# send reply if we supported the output type
|
||||
if outType is not None:
|
||||
stream_reply_queue.put(
|
||||
ipython_pb2.ExecuteResponse(status=outStatus,
|
||||
kernel_pb2.ExecuteResponse(status=outStatus,
|
||||
type=outType,
|
||||
output=output))
|
||||
def execute_worker():
|
||||
|
|
@ -121,8 +122,8 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
|
|||
|
||||
# if kernel is not alive or thread is still alive, it means that we face an issue.
|
||||
if not self.isKernelAlive() or t.is_alive():
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
|
||||
type=ipython_pb2.TEXT,
|
||||
yield kernel_pb2.ExecuteResponse(status=kernel_pb2.ERROR,
|
||||
type=kernel_pb2.TEXT,
|
||||
output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
|
||||
if payload_reply:
|
||||
result = []
|
||||
|
|
@ -130,21 +131,21 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
|
|||
if payload['data']['text/plain']:
|
||||
result.append(payload['data']['text/plain'])
|
||||
if result:
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.TEXT,
|
||||
yield kernel_pb2.ExecuteResponse(status=kernel_pb2.SUCCESS,
|
||||
type=kernel_pb2.TEXT,
|
||||
output='\n'.join(result))
|
||||
|
||||
def cancel(self, request, context):
|
||||
self._km.interrupt_kernel()
|
||||
return ipython_pb2.CancelResponse()
|
||||
return kernel_pb2.CancelResponse()
|
||||
|
||||
def complete(self, request, context):
|
||||
with self._lock:
|
||||
reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=None)
|
||||
return ipython_pb2.CompletionResponse(matches=reply['content']['matches'])
|
||||
return kernel_pb2.CompletionResponse(matches=reply['content']['matches'])
|
||||
|
||||
def status(self, request, context):
|
||||
return ipython_pb2.StatusResponse(status = self._status)
|
||||
return kernel_pb2.StatusResponse(status = self._status)
|
||||
|
||||
def isKernelAlive(self):
|
||||
return self._km.is_alive()
|
||||
|
|
@ -154,18 +155,18 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
|
|||
|
||||
def stop(self, request, context):
|
||||
self.terminate()
|
||||
return ipython_pb2.StopResponse()
|
||||
return kernel_pb2.StopResponse()
|
||||
|
||||
|
||||
def serve(port):
|
||||
def serve(kernel_name, port):
|
||||
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
|
||||
ipython = IPython(server)
|
||||
ipython_pb2_grpc.add_IPythonServicer_to_server(ipython, server)
|
||||
kernel = KernelServer(server, kernel_name)
|
||||
kernel_pb2_grpc.add_JupyterKernelServicer_to_server(kernel, server)
|
||||
server.add_insecure_port('[::]:' + port)
|
||||
server.start()
|
||||
ipython.start()
|
||||
kernel.start()
|
||||
try:
|
||||
while ipython.isKernelAlive():
|
||||
while kernel.isKernelAlive():
|
||||
time.sleep(5)
|
||||
except KeyboardInterrupt:
|
||||
print("interrupted")
|
||||
|
|
@ -173,8 +174,8 @@ def serve(port):
|
|||
print("shutdown")
|
||||
# we let 2 sc for all request to be complete
|
||||
server.stop(2)
|
||||
ipython.terminate()
|
||||
kernel.terminate()
|
||||
os._exit(0)
|
||||
|
||||
if __name__ == '__main__':
|
||||
serve(sys.argv[1])
|
||||
serve(sys.argv[1], sys.argv[2])
|
||||
Loading…
Reference in a new issue