[ZEPPELIN-2753] Basic Implementation of IPython Interpreter

This commit is contained in:
Jeff Zhang 2017-07-02 15:51:14 +08:00
parent 30bfcae0c0
commit b0b5c95a68
48 changed files with 3258 additions and 96 deletions

View file

@ -37,7 +37,7 @@ addons:
env:
global:
# Interpreters that are not required by the zeppelin-server integration tests
- INTERPRETERS='!hbase,!pig,!jdbc,!file,!flink,!ignite,!kylin,!python,!lens,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy'
- INTERPRETERS='!hbase,!pig,!jdbc,!file,!flink,!ignite,!kylin,!lens,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy'
matrix:
include:
@ -53,7 +53,7 @@ matrix:
sudo: false
dist: trusty
jdk: "oraclejdk8"
env: WEB_E2E="true" SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pscala-2.11" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_MODULES="-pl zeppelin-web" TEST_PROJECTS="-Pweb-e2e"
env: PYTHON="2" WEB_E2E="true" SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pscala-2.11" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_MODULES="-pl zeppelin-web" TEST_PROJECTS="-Pweb-e2e"
addons:
apt:
sources:
@ -68,54 +68,54 @@ matrix:
# After issues are fixed these tests need to be included back by removing them from the "-Dtests.to.exclude" property
- jdk: "oraclejdk8"
dist: precise
env: SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.2 -Pweb-ci -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/ZeppelinSparkClusterTest.java,**/org.apache.zeppelin.spark.*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.2 -Pweb-ci -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/ZeppelinSparkClusterTest.java,**/org.apache.zeppelin.spark.*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
# Test selenium with spark module for 1.6.3
- jdk: "oraclejdk7"
dist: precise
env: TEST_SELENIUM="true" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-1.6 -Phadoop-2.6 -Phelium-dev -Pexamples" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.AbstractFunctionalSuite -DfailIfNoTests=false"
env: PYTHON="2" TEST_SELENIUM="true" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-1.6 -Phadoop-2.6 -Phelium-dev -Pexamples" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark,python -Dtest=org.apache.zeppelin.AbstractFunctionalSuite -DfailIfNoTests=false"
# Test interpreter modules
- jdk: "oraclejdk7"
dist: precise
env: SCALA_VER="2.10" PROFILE="-Pscalding" BUILD_FLAG="package -DskipTests -DskipRat -Pr" TEST_FLAG="test -DskipRat" MODULES="-pl $(echo .,zeppelin-interpreter,${INTERPRETERS} | sed 's/!//g')" TEST_PROJECTS=""
env: PYTHON="3" SCALA_VER="2.10" PROFILE="-Pscalding" BUILD_FLAG="install -DskipTests -DskipRat -Pr" TEST_FLAG="test -DskipRat" MODULES="-pl $(echo .,zeppelin-interpreter,${INTERPRETERS} | sed 's/!//g')" TEST_PROJECTS=""
# Test spark module for 2.2.0 with scala 2.11, livy
- jdk: "oraclejdk8"
dist: precise
env: SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-2.2 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark,livy" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.*,org.apache.zeppelin.livy.* -DfailIfNoTests=false"
env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-2.2 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark,python,livy" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.*,org.apache.zeppelin.livy.* -DfailIfNoTests=false"
# Test spark module for 2.1.0 with scala 2.11, livy
- jdk: "oraclejdk7"
dist: precise
env: SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-2.1 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark,livy" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.*,org.apache.zeppelin.livy.* -DfailIfNoTests=false"
env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-2.1 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark,python,livy" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.*,org.apache.zeppelin.livy.* -DfailIfNoTests=false"
# Test spark module for 2.0.2 with scala 2.11
- jdk: "oraclejdk7"
dist: precise
env: SCALA_VER="2.11" SPARK_VER="2.0.2" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-2.0 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.0.2" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-2.0 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark,python" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
# Test spark module for 1.6.3 with scala 2.10
- jdk: "oraclejdk7"
dist: precise
env: SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-1.6 -Phadoop-2.6 -Pscala-2.10" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.*,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
env: PYTHON="3" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-1.6 -Phadoop-2.6 -Pscala-2.10" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark,python" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.*,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
# Test spark module for 1.6.3 with scala 2.11
- jdk: "oraclejdk7"
dist: precise
env: SCALA_VER="2.11" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-1.6 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-1.6 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark,python" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
# Test python/pyspark with python 2, livy 0.2
- sudo: required
dist: precise
jdk: "oraclejdk7"
env: PYTHON="2" SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.6" LIVY_VER="0.2.0" PROFILE="-Pspark-1.6 -Phadoop-2.6 -Plivy-0.2" BUILD_FLAG="package -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-display,spark-dependencies,spark,python,livy" TEST_PROJECTS="-Dtest=LivySQLInterpreterTest,org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false"
env: PYTHON="2" SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.6" LIVY_VER="0.2.0" PROFILE="-Pspark-1.6 -Phadoop-2.6 -Plivy-0.2 -Pscala-2.10" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-display,spark-dependencies,spark,python,livy" TEST_PROJECTS="-Dtest=LivySQLInterpreterTest,org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false"
# Test python/pyspark with python 3, livy 0.3
- sudo: required
dist: precise
jdk: "oraclejdk7"
env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.6" LIVY_VER="0.3.0" PROFILE="-Pspark-2.0 -Phadoop-2.6 -Pscala-2.11 -Plivy-0.3" BUILD_FLAG="package -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-display,spark-dependencies,spark,python,livy" TEST_PROJECTS="-Dtest=LivySQLInterpreterTest,org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false"
env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.6" LIVY_VER="0.3.0" PROFILE="-Pspark-2.0 -Phadoop-2.6 -Pscala-2.11 -Plivy-0.3" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-display,spark-dependencies,spark,python,livy" TEST_PROJECTS="-Dtest=LivySQLInterpreterTest,org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false"
before_install:
# check files included in commit range, clear bower_components if a bower.json file has changed.

View file

@ -47,6 +47,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
<dependency>

View file

@ -232,6 +232,70 @@ SELECT * FROM rates WHERE age < 40
Otherwise it can be referred to as `%python.sql`
## IPython Support
IPython is more powerful than the default python interpreter and provides extra functionality. You can use IPython with Python 2 or Python 3, depending on which python executable `zeppelin.python` points to.
**Prerequisites**
- Jupyter `pip install jupyter`
- grpcio `pip install grpcio`
If you have already installed anaconda, then you only need to install `grpcio`, because Jupyter is already included in anaconda.
Besides the basic functions of the python interpreter, you can use all the advanced IPython features just as you would in Jupyter Notebook.
e.g.
Use IPython magic
```
%python.ipython
#python help
range?
#timeit
%timeit range(100)
```
Use matplotlib
```
%python.ipython
%matplotlib inline
import matplotlib.pyplot as plt
print("hello world")
data=[1,2,3,4]
plt.figure()
plt.plot(data)
```
We also make `ZeppelinContext` available in IPython Interpreter. You can use `ZeppelinContext` to create dynamic forms and display pandas DataFrame.
e.g.
Create dynamic form
```
z.input(name='my_name', defaultValue='hello')
```
Show pandas dataframe
```
import pandas as pd
df = pd.DataFrame({'id':[1,2,3], 'name':['a','b','c']})
z.show(df)
```
By default, Zeppelin uses IPython in `%python.python` if IPython is available. Otherwise it falls back to the original Python implementation.
If you don't want to use IPython, you can set `zeppelin.python.useIPython` to `false` in the interpreter setting.
## Technical description
For in-depth technical details on current implementation please refer to [python/README.md](https://github.com/apache/zeppelin/blob/master/python/README.md).

View file

@ -414,6 +414,12 @@ You can choose one of `shared`, `scoped` and `isolated` options wheh you configu
Spark interpreter creates separated Scala compiler per each notebook but share a single SparkContext in `scoped` mode (experimental).
It creates separated SparkContext per each notebook in `isolated` mode.
## IPython support
By default, Zeppelin uses IPython in `pyspark` when IPython is available. Otherwise it falls back to the original PySpark implementation.
If you don't want to use IPython, you can set `zeppelin.spark.useIPython` to `false` in the interpreter setting. For the IPython features, refer to the
[Python Interpreter](python.html) documentation.
## Setting up Zeppelin with Kerberos
Logical setup with Zeppelin, Kerberos Key Distribution Center (KDC), and Spark on YARN:

13
pom.xml
View file

@ -101,7 +101,6 @@
<libthrift.version>0.9.2</libthrift.version>
<gson.version>2.2</gson.version>
<gson-extras.version>0.2.1</gson-extras.version>
<guava.version>15.0</guava.version>
<jetty.version>9.2.15.v20160210</jetty.version>
<httpcomponents.core.version>4.4.1</httpcomponents.core.version>
<httpcomponents.client.version>4.5.1</httpcomponents.client.version>
@ -246,12 +245,6 @@
<version>${commons.cli.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- Apache Shiro -->
<dependency>
<groupId>org.apache.shiro</groupId>
@ -404,7 +397,7 @@
</goals>
<configuration>
<failOnViolation>true</failOnViolation>
<excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/scio/avro/*</excludes>
<excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/scio/avro/*,org/apache/zeppelin/python/proto/*</excludes>
</configuration>
</execution>
<execution>
@ -414,7 +407,7 @@
<goal>checkstyle-aggregate</goal>
</goals>
<configuration>
<excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/scio/avro/*</excludes>
<excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/scio/avro/*,org/apache/zeppelin/python/proto/*</excludes>
</configuration>
</execution>
</executions>
@ -1082,6 +1075,8 @@
<!--The following files are mechanical-->
<exclude>**/R/rzeppelin/DESCRIPTION</exclude>
<exclude>**/R/rzeppelin/NAMESPACE</exclude>
<exclude>python/src/main/resources/grpc/**/*</exclude>
</excludes>
</configuration>

View file

@ -50,3 +50,22 @@ mvn -Dpython.test.exclude='' test -pl python -am
* Matplotlib figures are displayed inline with the notebook automatically using a built-in backend for zeppelin in conjunction with a post-execute hook.
* `%python.sql` support for Pandas DataFrames is optional and is provided by https://github.com/yhat/pandasql if the user has it installed
# IPython Overview
IPython interpreter for Apache Zeppelin
# IPython Requirements
You need to install the following python packages to make the IPython interpreter work.
* jupyter 5.x
* IPython
* ipykernel
* grpcio
If you have installed anaconda, then you only need to install `grpcio`.
# IPython Architecture
The current interpreter delegates all of the work to the IPython kernel via `jupyter_client`. Zeppelin launches a python process which hosts the IPython kernel.
The Zeppelin interpreter process communicates with that python process via `grpc`. Ideally, every feature that works in IPython should work in Zeppelin as well.

View file

@ -41,6 +41,7 @@
</python.test.exclude>
<pypi.repo.url>https://pypi.python.org/packages</pypi.repo.url>
<python.py4j.repo.folder>/64/5c/01e13b68e8caafece40d549f232c9b5677ad1016071a48d04cc3895acaa3</python.py4j.repo.folder>
<grpc.version>1.4.0</grpc.version>
</properties>
<dependencies>
@ -73,6 +74,28 @@
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<!-- test libraries -->
<dependency>
<groupId>junit</groupId>
@ -88,7 +111,36 @@
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
@ -136,6 +188,19 @@
</executions>
</plugin>
<!-- publish test jar as well so that spark module can use it -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>

View file

@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.python.proto.CancelRequest;
import org.apache.zeppelin.python.proto.CancelResponse;
import org.apache.zeppelin.python.proto.CompletionRequest;
import org.apache.zeppelin.python.proto.CompletionResponse;
import org.apache.zeppelin.python.proto.ExecuteRequest;
import org.apache.zeppelin.python.proto.ExecuteResponse;
import org.apache.zeppelin.python.proto.ExecuteStatus;
import org.apache.zeppelin.python.proto.IPythonGrpc;
import org.apache.zeppelin.python.proto.OutputType;
import org.apache.zeppelin.python.proto.StatusRequest;
import org.apache.zeppelin.python.proto.StatusResponse;
import org.apache.zeppelin.python.proto.StopRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Grpc client for IPython kernel
*/
public class IPythonClient {

  private static final Logger LOGGER = LoggerFactory.getLogger(IPythonClient.class.getName());

  // Channel to the IPython kernel grpc server; both stubs below share it.
  private final ManagedChannel channel;
  // Stub used for calls that block the calling thread until the server answers.
  private final IPythonGrpc.IPythonBlockingStub blockingStub;
  // Stub used for calls whose responses are delivered through a StreamObserver.
  private final IPythonGrpc.IPythonStub asyncStub;

  private Random random = new Random();

  /**
   * Construct client for accessing the IPython grpc server at {@code host:port}.
   * The channel is created in plaintext mode.
   *
   * @param host host name or address of the IPython grpc server
   * @param port port of the IPython grpc server
   */
  public IPythonClient(String host, int port) {
    this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(true));
  }

  /**
   * Construct client for accessing the IPython grpc server using a pre-configured
   * channel builder.
   *
   * @param channelBuilder builder the channel is built from
   */
  public IPythonClient(ManagedChannelBuilder<?> channelBuilder) {
    channel = channelBuilder.build();
    blockingStub = IPythonGrpc.newBlockingStub(channel);
    asyncStub = IPythonGrpc.newStub(channel);
  }

  /**
   * Shut down the underlying channel, waiting at most 5 seconds for it to terminate.
   *
   * @throws InterruptedException if interrupted while waiting for termination
   */
  public void shutdown() throws InterruptedException {
    channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
  }

  /**
   * Execute the code and stream the output by writing each response to
   * {@code interpreterOutput} as soon as it arrives. The calling thread blocks until the
   * response stream terminates, either normally or with an error.
   *
   * @param request           the code to execute
   * @param interpreterOutput stream that receives text output and {@code %img} image output
   * @return a response without output whose status is ERROR if any streamed response
   *         reported ERROR or the grpc call itself failed, SUCCESS otherwise
   */
  public ExecuteResponse stream_execute(ExecuteRequest request,
                                        final InterpreterOutputStream interpreterOutput) {
    final ExecuteResponse.Builder finalResponseBuilder = ExecuteResponse.newBuilder()
        .setStatus(ExecuteStatus.SUCCESS);
    // Doubles as the monitor the calling thread waits on until the stream terminates.
    final AtomicBoolean completedFlag = new AtomicBoolean(false);
    LOGGER.debug("stream_execute code:\n" + request.getCode());
    asyncStub.execute(request, new StreamObserver<ExecuteResponse>() {
      int index = 0;
      boolean isPreviousOutputImage = false;

      @Override
      public void onNext(ExecuteResponse executeResponse) {
        if (executeResponse.getType() == OutputType.TEXT) {
          try {
            LOGGER.debug("Interpreter Streaming Output: " + executeResponse.getOutput());
            if (isPreviousOutputImage) {
              // add '\n' when switching from image to text
              interpreterOutput.write("\n".getBytes());
            }
            isPreviousOutputImage = false;
            interpreterOutput.write(executeResponse.getOutput().getBytes());
            interpreterOutput.getInterpreterOutput().flush();
          } catch (IOException e) {
            LOGGER.error("Unexpected IOException", e);
          }
        }
        if (executeResponse.getType() == OutputType.IMAGE) {
          try {
            LOGGER.debug("Interpreter Streaming Output: IMAGE_DATA");
            if (index != 0) {
              // add '\n' if this is not the first element, otherwise the image would be
              // mixed with the preceding text
              interpreterOutput.write("\n".getBytes());
            }
            interpreterOutput.write(("%img " + executeResponse.getOutput()).getBytes());
            interpreterOutput.getInterpreterOutput().flush();
            isPreviousOutputImage = true;
          } catch (IOException e) {
            LOGGER.error("Unexpected IOException", e);
          }
        }
        if (executeResponse.getStatus() == ExecuteStatus.ERROR) {
          // set the finalResponse to ERROR if any ERROR happens, otherwise the finalResponse
          // would be SUCCESS.
          finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
        }
        index++;
      }

      @Override
      public void onError(Throwable throwable) {
        try {
          interpreterOutput.getInterpreterOutput().flush();
        } catch (IOException e) {
          LOGGER.error("Unexpected IOException", e);
        }
        LOGGER.error("Fail to call IPython grpc", throwable);
        // onError terminates the stream, so onCompleted will not follow. Report the
        // failure and release the waiting caller, otherwise it would block forever.
        synchronized (completedFlag) {
          finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
          completedFlag.set(true);
          completedFlag.notifyAll();
        }
      }

      @Override
      public void onCompleted() {
        synchronized (completedFlag) {
          try {
            LOGGER.debug("stream_execute is completed");
            interpreterOutput.getInterpreterOutput().flush();
          } catch (IOException e) {
            LOGGER.error("Unexpected IOException", e);
          }
          completedFlag.set(true);
          completedFlag.notifyAll();
        }
      }
    });

    synchronized (completedFlag) {
      // Loop rather than a single check so that a spurious wakeup cannot return early.
      while (!completedFlag.get()) {
        try {
          completedFlag.wait();
        } catch (InterruptedException e) {
          // As before, an interruption is logged and ends the wait.
          LOGGER.error("Unexpected Interruption", e);
          break;
        }
      }
    }
    return finalResponseBuilder.build();
  }

  /**
   * Execute the code and block until all responses are received.
   *
   * @param request the code to execute
   * @return a response whose output is the concatenation of the output of every streamed
   *         response and whose status is ERROR if any of them reported ERROR
   */
  public ExecuteResponse block_execute(ExecuteRequest request) {
    ExecuteResponse.Builder responseBuilder = ExecuteResponse.newBuilder();
    responseBuilder.setStatus(ExecuteStatus.SUCCESS);
    Iterator<ExecuteResponse> iter = blockingStub.execute(request);
    StringBuilder outputBuilder = new StringBuilder();
    while (iter.hasNext()) {
      ExecuteResponse nextResponse = iter.next();
      if (nextResponse.getStatus() == ExecuteStatus.ERROR) {
        responseBuilder.setStatus(ExecuteStatus.ERROR);
      }
      outputBuilder.append(nextResponse.getOutput());
    }
    responseBuilder.setOutput(outputBuilder.toString());
    return responseBuilder.build();
  }

  /** Forward a cancel request to the server and block for its response. */
  public CancelResponse cancel(CancelRequest request) {
    return blockingStub.cancel(request);
  }

  /** Forward a completion request to the server and block for its response. */
  public CompletionResponse complete(CompletionRequest request) {
    return blockingStub.complete(request);
  }

  /** Forward a status request to the server and block for its response. */
  public StatusResponse status(StatusRequest request) {
    return blockingStub.status(request);
  }

  /**
   * Send a stop request without waiting for the response.
   */
  public void stop(StopRequest request) {
    // NOTE(review): a null response observer is passed here; confirm that the grpc
    // runtime tolerates it when the response (or an error) comes back.
    asyncStub.stop(request, null);
  }

  /**
   * Manual smoke test against a server that is already listening on localhost:50053.
   */
  public static void main(String[] args) {
    IPythonClient client = new IPythonClient("localhost", 50053);
    client.status(StatusRequest.newBuilder().build());

    ExecuteResponse response = client.block_execute(ExecuteRequest.newBuilder().
        setCode("abcd=2").build());
    System.out.println(response.getOutput());
  }
}

View file

@ -0,0 +1,359 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.LogOutputStream;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.python.proto.CancelRequest;
import org.apache.zeppelin.python.proto.CompletionRequest;
import org.apache.zeppelin.python.proto.CompletionResponse;
import org.apache.zeppelin.python.proto.ExecuteRequest;
import org.apache.zeppelin.python.proto.ExecuteResponse;
import org.apache.zeppelin.python.proto.ExecuteStatus;
import org.apache.zeppelin.python.proto.IPythonStatus;
import org.apache.zeppelin.python.proto.StatusRequest;
import org.apache.zeppelin.python.proto.StatusResponse;
import org.apache.zeppelin.python.proto.StopRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* IPython Interpreter for Zeppelin
*/
public class IPythonInterpreter extends Interpreter implements ExecuteResultHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreter.class);
// Watchdog attached to the executor that runs the IPython kernel process.
private ExecuteWatchdog watchDog;
// grpc client for the kernel; a non-null value is what open() treats as "already opened".
private IPythonClient ipythonClient;
// py4j gateway created in setupJVMGateway() with this interpreter as its entry point.
private GatewayServer gatewayServer;
private PythonZeppelinContext zeppelinContext;
// Value of the "zeppelin.python" property (default "python"), used to launch the kernel.
private String pythonExecutable;
// Value of the "zeppelin.ipython.launch.timeout" property (default "30000").
// NOTE(review): presumably milliseconds - confirm against the wait loop in
// launchIPythonKernel().
private long ipythonLaunchTimeout;
// Extra PYTHONPATH entries supplied by a sub class; see setAdditionalPythonPath().
private String additionalPythonPath;
// Classpath resource with extra python init code; see setAdditionalPythonInitFile().
private String additionalPythonInitFile;
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
/**
* Create the interpreter. Nothing is launched here; the kernel process and the JVM
* gateway are started in {@link #open()}.
*
* @param properties interpreter properties, passed to the super class
*/
public IPythonInterpreter(Properties properties) {
super(properties);
}
/**
* Sub class can customize the interpreter by adding more python packages under PYTHONPATH.
* e.g. PySparkInterpreter
* <p>
* The value is read when the IPython kernel is launched from {@link #open()}: it is joined
* with the py4j library path using ':' and placed in front of any PYTHONPATH already present
* in the process environment, so it only takes effect if set before open() is called.
*
* @param additionalPythonPath extra PYTHONPATH entries for the kernel process
*/
public void setAdditionalPythonPath(String additionalPythonPath) {
this.additionalPythonPath = additionalPythonPath;
}
/**
* Sub class can customize the interpreter by running additional python init code.
* e.g. PySparkInterpreter
* <p>
* The file is loaded as a classpath resource through this class's class loader and executed
* in the kernel right after the built-in init script, with every
* <code>${JVM_GATEWAY_PORT}</code> placeholder replaced by the JVM gateway port.
*
* @param additionalPythonInitFile classpath resource name of the python init script
*/
public void setAdditionalPythonInitFile(String additionalPythonInitFile) {
this.additionalPythonInitFile = additionalPythonInitFile;
}
/**
 * Start the IPython kernel process and the JVM gateway, each on a randomly chosen free
 * port. Returns immediately if a client already exists. Any failure is rethrown as a
 * RuntimeException.
 */
@Override
public void open() {
  if (ipythonClient != null) {
    // IPythonInterpreter might already been opened by PythonInterpreter
    return;
  }
  try {
    Properties props = getProperty();
    pythonExecutable = props.getProperty("zeppelin.python", "python");
    String launchTimeout = props.getProperty("zeppelin.ipython.launch.timeout", "30000");
    ipythonLaunchTimeout = Long.parseLong(launchTimeout);
    String maxResult = props.getProperty("zeppelin.python.maxResult", "1000");
    this.zeppelinContext = new PythonZeppelinContext(
        getInterpreterGroup().getInterpreterHookRegistry(),
        Integer.parseInt(maxResult));

    int kernelPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
    int gatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
    LOGGER.info("Launching IPython Kernel at port: " + kernelPort);
    LOGGER.info("Launching JVM Gateway at port: " + gatewayPort);

    // The client is created before the kernel process is launched.
    ipythonClient = new IPythonClient("127.0.0.1", kernelPort);
    launchIPythonKernel(kernelPort);
    setupJVMGateway(gatewayPort);
  } catch (Exception e) {
    throw new RuntimeException("Fail to open IPythonInterpreter", e);
  }
}
/**
 * Check whether the python packages needed by the IPython interpreter are installed, by
 * running {@code pip freeze} and searching its output for jupyter-client, ipykernel,
 * ipython and grpcio.
 * <p>
 * The first missing package is reported on the output of the current InterpreterContext.
 * The temp files that capture the output of pip are always deleted before returning.
 *
 * @return true if all required packages are listed by {@code pip freeze}; false if one is
 *         missing, if pip exits with a non-zero code, or if the check itself fails
 */
public boolean checkIPythonPrerequisite() {
  final String[] requiredPackages = {"jupyter-client", "ipykernel", "ipython", "grpcio"};
  ProcessBuilder processBuilder = new ProcessBuilder("pip", "freeze");
  File stderrFile = null;
  File stdoutFile = null;
  try {
    stderrFile = File.createTempFile("zeppelin", ".txt");
    processBuilder.redirectError(stderrFile);
    stdoutFile = File.createTempFile("zeppelin", ".txt");
    processBuilder.redirectOutput(stdoutFile);

    Process proc = processBuilder.start();
    int ret = proc.waitFor();
    if (ret != 0) {
      try (FileInputStream stderrInput = new FileInputStream(stderrFile)) {
        LOGGER.warn("Fail to run pip freeze.\n" + IOUtils.toString(stderrInput));
      }
      return false;
    }

    String freezeOutput;
    try (FileInputStream stdoutInput = new FileInputStream(stdoutFile)) {
      freezeOutput = IOUtils.toString(stdoutInput);
    }
    for (String requiredPackage : requiredPackages) {
      // The original check looks for "<name>=", i.e. the start of a version specifier.
      if (!freezeOutput.contains(requiredPackage + "=")) {
        InterpreterContext.get().out.write(
            (requiredPackage + " is not installed\n").getBytes());
        return false;
      }
    }
    LOGGER.info("IPython prerequisite is meet");
    return true;
  } catch (Exception e) {
    LOGGER.warn("Fail to checkIPythonPrerequisite", e);
    return false;
  } finally {
    // deleteQuietly is null-safe and never throws, so cleanup cannot mask the result.
    FileUtils.deleteQuietly(stderrFile);
    FileUtils.deleteQuietly(stdoutFile);
  }
}
/**
 * Start the py4j gateway server on the given port, then run the python init code in the
 * IPython kernel: first the built-in {@code grpc/python/zeppelin_python.py}, then the
 * additional init file if a sub class configured one.
 *
 * @param jvmGatewayPort port the gateway server listens on; also substituted into the
 *                       init scripts
 * @throws IOException if an init script cannot be found or read, or if executing it in
 *                     the kernel reports an error
 */
private void setupJVMGateway(int jvmGatewayPort) throws IOException {
  gatewayServer = new GatewayServer(this, jvmGatewayPort);
  gatewayServer.start();

  runPythonInitResource("grpc/python/zeppelin_python.py", jvmGatewayPort,
      "Fail to setup JVMGateway\n");
  if (additionalPythonInitFile != null) {
    runPythonInitResource(additionalPythonInitFile, jvmGatewayPort,
        "Fail to run additional Python init file: " + additionalPythonInitFile + "\n");
  }
}

/**
 * Load a python script from the classpath, replace its ${JVM_GATEWAY_PORT} placeholder
 * and execute it in the IPython kernel.
 *
 * @param resourcePath   classpath location of the python script
 * @param jvmGatewayPort value substituted for the ${JVM_GATEWAY_PORT} placeholder
 * @param errorPrefix    text put in front of the kernel output when execution fails
 * @throws IOException if the resource is missing or unreadable, or if execution fails
 */
private void runPythonInitResource(String resourcePath, int jvmGatewayPort,
                                   String errorPrefix) throws IOException {
  InputStream resource = getClass().getClassLoader().getResourceAsStream(resourcePath);
  if (resource == null) {
    // Fail with a clear message instead of a NullPointerException from readLines.
    throw new IOException("Python init resource is not found in classpath: "
        + resourcePath);
  }
  List<String> lines;
  try (InputStream input = resource) {
    lines = IOUtils.readLines(input);
  }
  ExecuteResponse response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
      .setCode(StringUtils.join(lines, System.lineSeparator())
          .replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "")).build());
  if (response.getStatus() == ExecuteStatus.ERROR) {
    throw new IOException(errorPrefix + response.getOutput());
  }
}
/**
 * Copies the bundled IPython server scripts into a fresh temp directory, launches
 * {@code ipython_server.py} with the configured python executable, and blocks until the
 * kernel reports {@link IPythonStatus#RUNNING} or {@code ipythonLaunchTimeout}
 * (milliseconds) has elapsed.
 *
 * <p>The child process gets a {@code PYTHONPATH} made of {@code additionalPythonPath},
 * the py4j library path and any {@code PYTHONPATH} inherited from this JVM's environment.
 * Note that {@code additionalPythonPath} is updated in place to include the py4j path.
 *
 * @param ipythonPort port passed to {@code ipython_server.py} as its argument
 * @throws IOException if the scripts cannot be copied, the process cannot be started, the
 *         calling thread is interrupted while waiting, or the kernel does not come up
 *         within the timeout
 * @throws URISyntaxException declared for callers; not raised by the code in this method
 */
private void launchIPythonKernel(int ipythonPort)
    throws IOException, URISyntaxException {
  // copy the python scripts to a temp directory, then launch ipython kernel in that folder
  File tmpPythonScriptFolder = Files.createTempDirectory("zeppelin_ipython").toFile();
  String[] ipythonScripts = {"ipython_server.py", "ipython_pb2.py", "ipython_pb2_grpc.py"};
  for (String ipythonScript : ipythonScripts) {
    URL url = getClass().getClassLoader().getResource("grpc/python"
        + "/" + ipythonScript);
    FileUtils.copyURLToFile(url, new File(tmpPythonScriptFolder, ipythonScript));
  }

  CommandLine cmd = CommandLine.parse(pythonExecutable);
  cmd.addArgument(tmpPythonScriptFolder.getAbsolutePath() + "/ipython_server.py");
  cmd.addArgument(ipythonPort + "");
  DefaultExecutor executor = new DefaultExecutor();
  // forward everything the python process prints to the interpreter's logger
  ProcessLogOutputStream processOutput = new ProcessLogOutputStream(LOGGER);
  executor.setStreamHandler(new PumpStreamHandler(processOutput));
  watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
  executor.setWatchdog(watchDog);

  String py4jLibPath = null;
  if (System.getenv("ZEPPELIN_HOME") != null) {
    py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator
        + PythonInterpreter.ZEPPELIN_PY4JPATH;
  } else {
    Path workingPath = Paths.get("..").toAbsolutePath();
    py4jLibPath = workingPath + File.separator + PythonInterpreter.ZEPPELIN_PY4JPATH;
  }
  // PYTHONPATH entries are separated by the platform path separator (':' on Unix,
  // ';' on Windows), so use File.pathSeparator rather than a hard-coded ':'.
  if (additionalPythonPath != null) {
    // put the py4j at the end, because additionalPythonPath may already contain py4j.
    // e.g. PySparkInterpreter
    additionalPythonPath = additionalPythonPath + File.pathSeparator + py4jLibPath;
  } else {
    additionalPythonPath = py4jLibPath;
  }
  Map<String, String> envs = EnvironmentUtils.getProcEnvironment();
  if (envs.containsKey("PYTHONPATH")) {
    envs.put("PYTHONPATH",
        additionalPythonPath + File.pathSeparator + envs.get("PYTHONPATH"));
  } else {
    envs.put("PYTHONPATH", additionalPythonPath);
  }

  LOGGER.debug("PYTHONPATH: " + envs.get("PYTHONPATH"));
  // asynchronous launch; this object receives the process completion/failure callbacks
  executor.execute(cmd, envs, this);

  // wait until IPython kernel is started or timeout
  long startTime = System.currentTimeMillis();
  while (true) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      // Do not swallow the interrupt: restore the flag for callers and stop waiting.
      // (Continuing would make every following sleep() fail immediately.)
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for IPython Kernel to be started", e);
    }

    try {
      StatusResponse response = ipythonClient.status(StatusRequest.newBuilder().build());
      if (response.getStatus() == IPythonStatus.RUNNING) {
        LOGGER.info("IPython Kernel is Running");
        break;
      } else {
        LOGGER.info("Wait for IPython Kernel to be started");
      }
    } catch (Exception e) {
      // ignore the exception, because it may happen when grpc server has not started yet.
      LOGGER.info("Wait for IPython Kernel to be started");
    }

    if ((System.currentTimeMillis() - startTime) > ipythonLaunchTimeout) {
      throw new IOException("Fail to launch IPython Kernel in " + ipythonLaunchTimeout / 1000
          + " seconds");
    }
  }
}
/**
 * Shuts the IPython kernel down. Does nothing when no kernel process was ever launched
 * (i.e. {@code watchDog} is still null).
 *
 * <p>The kernel is first asked to stop over gRPC; regardless of whether that request
 * succeeds, the python process is destroyed and the py4j gateway (if one was created)
 * is shut down.
 */
@Override
public void close() {
  if (watchDog != null) {
    LOGGER.debug("Kill IPython Process");
    try {
      ipythonClient.stop(StopRequest.newBuilder().build());
    } catch (Exception e) {
      // The kernel may already be gone or unreachable; that must not prevent the
      // process and the gateway from being cleaned up below.
      LOGGER.warn("Fail to stop IPython Kernel gracefully, destroying the process", e);
    } finally {
      watchDog.destroyProcess();
      // gatewayServer is created in setupJVMGateway, which may never have run if the
      // kernel launch itself failed.
      if (gatewayServer != null) {
        gatewayServer.shutdown();
      }
    }
  }
}
/**
 * Runs a paragraph of code in the IPython kernel, streaming its output to the
 * paragraph's {@code InterpreterOutput}.
 *
 * @param st the code to execute
 * @param context context of the paragraph; supplies the GUI and the output sink
 * @return a result whose code is the {@code InterpreterResult.Code} constant with the
 *         same name as the kernel's {@code ExecuteStatus}
 * @throws RuntimeException if flushing the paragraph output fails
 */
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
  zeppelinContext.setGui(context.getGui());
  interpreterOutput.setInterpreterOutput(context.out);

  ExecuteRequest request = ExecuteRequest.newBuilder().setCode(st).build();
  ExecuteResponse response = ipythonClient.stream_execute(request, interpreterOutput);

  try {
    interpreterOutput.getInterpreterOutput().flush();
  } catch (IOException e) {
    throw new RuntimeException("Fail to write output", e);
  }

  // ExecuteStatus and InterpreterResult.Code are matched by constant name
  String statusName = response.getStatus().name();
  return new InterpreterResult(InterpreterResult.Code.valueOf(statusName));
}
/**
 * Sends a cancel request to the IPython kernel.
 *
 * @param context context of the paragraph being cancelled (not used here)
 */
@Override
public void cancel(InterpreterContext context) {
  CancelRequest cancelRequest = CancelRequest.newBuilder().build();
  ipythonClient.cancel(cancelRequest);
}
/**
 * Reports which kind of dynamic form this interpreter uses.
 *
 * @return always {@link FormType#SIMPLE}
 */
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
/**
 * Progress reporting is not implemented for the IPython interpreter.
 *
 * @param context context of the running paragraph (ignored)
 * @return always 0
 */
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
/**
 * Asks the IPython kernel for completion candidates.
 *
 * @param buf the code to complete
 * @param cursor cursor value forwarded to the kernel together with {@code buf}
 * @param interpreterContext context of the paragraph (not used here)
 * @return one {@link InterpreterCompletion} per match returned by the kernel, using the
 *         match text as both of the first two constructor arguments and an empty string
 *         as the third; empty when the kernel returns no matches
 */
@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
    InterpreterContext interpreterContext) {
  // newBuilder() is a static factory; call it on the class rather than through
  // getDefaultInstance(), which only obscured that fact.
  CompletionRequest request =
      CompletionRequest.newBuilder().setCode(buf).setCursor(cursor).build();
  CompletionResponse response = ipythonClient.complete(request);

  List<InterpreterCompletion> completions = new ArrayList<>(response.getMatchesCount());
  for (String match : response.getMatchesList()) {
    completions.add(new InterpreterCompletion(match, match, ""));
  }
  return completions;
}
/**
 * @return the {@link PythonZeppelinContext} held by this interpreter (the same object
 *         whose GUI is refreshed on every {@code interpret} call)
 */
public PythonZeppelinContext getZeppelinContext() {
return zeppelinContext;
}
/**
 * Process-completion callback; this object is handed to the executor as the result
 * handler when the python process is launched. Only logs the exit value.
 *
 * @param exitValue exit value reported for the python process
 */
@Override
public void onProcessComplete(int exitValue) {
LOGGER.warn("Python Process is completed with exitValue: " + exitValue);
}
/**
 * Process-failure callback; this object is handed to the executor as the result
 * handler when the python process is launched. Only logs the exception.
 *
 * @param e the exception reported for the python process
 */
@Override
public void onProcessFailed(ExecuteException e) {
LOGGER.warn("Exception happens in Python Process", e);
}
/**
 * {@link LogOutputStream} that writes every line it receives to an slf4j logger at
 * debug level, prefixed with "Process Output: ".
 */
private static class ProcessLogOutputStream extends LogOutputStream {

  /** Destination for the captured lines. */
  private final Logger logger;

  public ProcessLogOutputStream(Logger logger) {
    this.logger = logger;
  }

  /**
   * @param line one line of captured output
   * @param logLevel level supplied by {@link LogOutputStream}; ignored, everything is
   *        logged at debug
   */
  @Override
  protected void processLine(String line, int logLevel) {
    logger.debug("Process Output: " + line);
  }
}
}

View file

@ -91,6 +91,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
private static final int MAX_TIMEOUT_SEC = 10;
private long pythonPid = 0;
private IPythonInterpreter iPythonInterpreter;
Integer statementSetNotifier = new Integer(0);
@ -219,6 +220,37 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
@Override
public void open() {
// try IPythonInterpreter first. If it is not available, we will fallback to the original
// python interpreter implementation.
iPythonInterpreter = getIPythonInterpreter();
if (getProperty().getProperty("zeppelin.python.useIPython", "true").equals("true") &&
iPythonInterpreter.checkIPythonPrerequisite()) {
try {
iPythonInterpreter.open();
if (InterpreterContext.get() != null) {
InterpreterContext.get().out.write(("IPython is available, " +
"use IPython for PythonInterpreter\n")
.getBytes());
}
LOG.info("Use IPythonInterpreter to replace PythonInterpreter");
return;
} catch (Exception e) {
iPythonInterpreter = null;
}
}
// reset iPythonInterpreter to null
iPythonInterpreter = null;
try {
if (InterpreterContext.get() != null) {
InterpreterContext.get().out.write(("IPython is not available, " +
"use the native PythonInterpreter\n")
.getBytes());
}
} catch (IOException e) {
LOG.warn("Fail to write InterpreterOutput", e.getMessage());
}
// Add matplotlib display hook
InterpreterGroup intpGroup = getInterpreterGroup();
if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
@ -232,8 +264,27 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
}
}
/**
 * Looks up the {@link IPythonInterpreter} that lives in the same session and strips any
 * {@link WrappedInterpreter} layers around it.
 *
 * @return the unwrapped interpreter, or {@code null} if the session lookup returned
 *         {@code null}
 */
private IPythonInterpreter getIPythonInterpreter() {
  Interpreter p = getInterpreterInTheSameSessionByClassName(IPythonInterpreter.class.getName());
  // Unwrap until the innermost interpreter is reached. The original also tracked the
  // LazyOpenInterpreter layer in a local that was never read; that dead code is removed.
  while (p instanceof WrappedInterpreter) {
    p = ((WrappedInterpreter) p).getInnerInterpreter();
  }
  return (IPythonInterpreter) p;
}
@Override
public void close() {
if (iPythonInterpreter != null) {
iPythonInterpreter.close();
return;
}
pythonscriptRunning = false;
pythonScriptInitialized = false;
@ -319,6 +370,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
if (iPythonInterpreter != null) {
return iPythonInterpreter.interpret(cmd, contextInterpreter);
}
if (cmd == null || cmd.isEmpty()) {
return new InterpreterResult(Code.SUCCESS, "");
}
@ -411,6 +465,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
@Override
public void cancel(InterpreterContext context) {
if (iPythonInterpreter != null) {
iPythonInterpreter.cancel(context);
}
try {
interrupt();
} catch (IOException e) {
@ -425,11 +482,17 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
/**
 * Reports paragraph progress, delegating to the IPython interpreter when it is in use.
 *
 * @param context context of the running paragraph
 * @return the delegate's progress, or 0 when the native python implementation is active
 */
@Override
public int getProgress(InterpreterContext context) {
  return iPythonInterpreter != null ? iPythonInterpreter.getProgress(context) : 0;
}
/**
 * Chooses the scheduler for this interpreter.
 *
 * @return the IPython interpreter's scheduler when it is in use; otherwise a FIFO
 *         scheduler created or fetched under a name built from this class name and
 *         this instance's hash code
 */
@Override
public Scheduler getScheduler() {
  if (iPythonInterpreter == null) {
    String schedulerName = PythonInterpreter.class.getName() + this.hashCode();
    return SchedulerFactory.singleton().createOrGetFIFOScheduler(schedulerName);
  }
  return iPythonInterpreter.getScheduler();
}
@ -437,6 +500,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
/**
 * Code completion; only available through the IPython interpreter.
 *
 * @param buf the code to complete
 * @param cursor cursor value forwarded to the delegate
 * @param interpreterContext context of the paragraph
 * @return the delegate's completions, or {@code null} when the native python
 *         implementation is active
 */
@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
    InterpreterContext interpreterContext) {
  if (iPythonInterpreter == null) {
    return null;
  }
  return iPythonInterpreter.completion(buf, cursor, interpreterContext);
}

View file

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import java.util.List;
import java.util.Map;
/**
 * ZeppelinContext for Python.
 *
 * <p>Minimal {@link BaseZeppelinContext} subclass: the constructor only forwards its
 * arguments to the base class and every overridden hook currently returns {@code null}.
 */
public class PythonZeppelinContext extends BaseZeppelinContext {
/**
 * @param hooks hook registry, passed straight to the base class
 * @param maxResult passed straight to the base class
 */
public PythonZeppelinContext(InterpreterHookRegistry hooks, int maxResult) {
super(hooks, maxResult);
}
/**
 * @return always {@code null}
 */
// NOTE(review): confirm that BaseZeppelinContext tolerates a null map here.
@Override
public Map<String, String> getInterpreterClassMap() {
return null;
}
/**
 * @return always {@code null}
 */
// NOTE(review): confirm that BaseZeppelinContext tolerates a null list here.
@Override
public List<Class> getSupportedClasses() {
return null;
}
/**
 * @param obj the object to display (ignored)
 * @return always {@code null}
 */
@Override
protected String showData(Object obj) {
return null;
}
}

View file

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// gRPC contract between the Java side (generated into
// org.apache.zeppelin.python.proto) and the python IPython server.
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.apache.zeppelin.python.proto";
option java_outer_classname = "IPythonProto";
option objc_class_prefix = "IPython";
package ipython;
// The IPython service definition.
service IPython {
// Sends code for execution. Server-streaming: the reply is a stream of
// ExecuteResponse messages rather than a single message.
rpc execute (ExecuteRequest) returns (stream ExecuteResponse) {}
// Get completion candidates for the given code and cursor.
rpc complete (CompletionRequest) returns (CompletionResponse) {}
// Cancel the running statement
rpc cancel (CancelRequest) returns (CancelResponse) {}
// Get ipython kernel status
rpc status (StatusRequest) returns (StatusResponse) {}
// Stop request. NOTE(review): presumably shuts the kernel/server down - confirm
// against the server implementation.
rpc stop(StopRequest) returns (StopResponse) {}
}
// Outcome carried by an ExecuteResponse.
enum ExecuteStatus {
SUCCESS = 0;
ERROR = 1;
}
// Kernel state carried by a StatusResponse.
enum IPythonStatus {
STARTING = 0;
RUNNING = 1;
}
// Kind of payload carried in ExecuteResponse.output.
enum OutputType {
TEXT = 0;
IMAGE = 1;
}
// The request message containing the code
message ExecuteRequest {
string code = 1;
}
// The response message containing the execution result.
message ExecuteResponse {
ExecuteStatus status = 1;
OutputType type = 2;
string output = 3;
}
// Empty request of the cancel rpc.
message CancelRequest {
}
// Empty reply of the cancel rpc.
message CancelResponse {
}
// Request of the complete rpc.
message CompletionRequest {
string code = 1;
// NOTE(review): presumably the cursor offset within code - confirm unit/origin.
int32 cursor = 2;
}
// Reply of the complete rpc: zero or more candidate strings.
message CompletionResponse {
repeated string matches = 1;
}
// Empty request of the status rpc.
message StatusRequest {
}
// Reply of the status rpc.
message StatusResponse {
IPythonStatus status = 1;
}
// Empty request of the stop rpc.
message StopRequest {
}
// Empty reply of the stop rpc.
message StopResponse {
}

View file

@ -0,0 +1,18 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Regenerates the python protobuf/gRPC modules from ipython.proto using
# grpc_tools.protoc. The shebang must be the very first line of the file to take
# effect, so it is placed above the license header.
#
# All paths are relative to the current working directory: the proto is read from
# ../../proto and the generated files are written to ./python.

python -m grpc_tools.protoc -I../../proto --python_out=python --grpc_python_out=python ../../proto/ipython.proto

View file

@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc
import ipython_pb2
import ipython_pb2_grpc
def run(address='localhost:50053'):
    """Small manual smoke test for the IPython gRPC server.

    Opens an insecure channel to ``address``, sends two ``execute`` requests and
    prints every streamed response:

    1. a snippet that prints 1..3 with one-second pauses and then draws a
       matplotlib plot (each response's ``output`` field is printed), and
    2. the IPython introspection query ``range?`` (each response message is
       printed as a whole).

    Args:
        address: ``host:port`` of the server. Defaults to the previously
            hard-coded ``localhost:50053`` so existing callers are unaffected.
    """
    channel = grpc.insecure_channel(address)
    stub = ipython_pb2_grpc.IPythonStub(channel)

    # execute() returns an iterator over the streamed ExecuteResponse messages
    response = stub.execute(ipython_pb2.ExecuteRequest(code="import time\nfor i in range(1,4):\n\ttime.sleep(1)\n\tprint(i)\n" +
                                                            "%matplotlib inline\nimport matplotlib.pyplot as plt\ndata=[1,1,2,3,4]\nplt.figure()\nplt.plot(data)"))
    for r in response:
        print("output:" + r.output)

    response = stub.execute(ipython_pb2.ExecuteRequest(code="range?"))
    for r in response:
        print(r)


# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    run()

View file

@ -0,0 +1,751 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: ipython.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf.internal import enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='ipython.proto',
package='ipython',
syntax='proto3',
serialized_pb=_b('\n\ripython.proto\x12\x07ipython\"\x1e\n\x0e\x45xecuteRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\"l\n\x0f\x45xecuteResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.ipython.ExecuteStatus\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.ipython.OutputType\x12\x0e\n\x06output\x18\x03 \x01(\t\"\x0f\n\rCancelRequest\"\x10\n\x0e\x43\x61ncelResponse\"1\n\x11\x43ompletionRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\x12\x0e\n\x06\x63ursor\x18\x02 \x01(\x05\"%\n\x12\x43ompletionResponse\x12\x0f\n\x07matches\x18\x01 \x03(\t\"\x0f\n\rStatusRequest\"8\n\x0eStatusResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.ipython.IPythonStatus\"\r\n\x0bStopRequest\"\x0e\n\x0cStopResponse*\'\n\rExecuteStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01**\n\rIPythonStatus\x12\x0c\n\x08STARTING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01*!\n\nOutputType\x12\x08\n\x04TEXT\x10\x00\x12\t\n\x05IMAGE\x10\x01\x32\xc3\x02\n\x07IPython\x12@\n\x07\x65xecute\x12\x17.ipython.ExecuteRequest\x1a\x18.ipython.ExecuteResponse\"\x00\x30\x01\x12\x45\n\x08\x63omplete\x12\x1a.ipython.CompletionRequest\x1a\x1b.ipython.CompletionResponse\"\x00\x12;\n\x06\x63\x61ncel\x12\x16.ipython.CancelRequest\x1a\x17.ipython.CancelResponse\"\x00\x12;\n\x06status\x12\x16.ipython.StatusRequest\x1a\x17.ipython.StatusResponse\"\x00\x12\x35\n\x04stop\x12\x14.ipython.StopRequest\x1a\x15.ipython.StopResponse\"\x00\x42<\n org.apache.zeppelin.python.protoB\x0cIPythonProtoP\x01\xa2\x02\x07IPythonb\x06proto3')
)
_EXECUTESTATUS = _descriptor.EnumDescriptor(
name='ExecuteStatus',
full_name='ipython.ExecuteStatus',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='SUCCESS', index=0, number=0,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='ERROR', index=1, number=1,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=399,
serialized_end=438,
)
_sym_db.RegisterEnumDescriptor(_EXECUTESTATUS)
ExecuteStatus = enum_type_wrapper.EnumTypeWrapper(_EXECUTESTATUS)
_IPYTHONSTATUS = _descriptor.EnumDescriptor(
name='IPythonStatus',
full_name='ipython.IPythonStatus',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='STARTING', index=0, number=0,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='RUNNING', index=1, number=1,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=440,
serialized_end=482,
)
_sym_db.RegisterEnumDescriptor(_IPYTHONSTATUS)
IPythonStatus = enum_type_wrapper.EnumTypeWrapper(_IPYTHONSTATUS)
_OUTPUTTYPE = _descriptor.EnumDescriptor(
name='OutputType',
full_name='ipython.OutputType',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='TEXT', index=0, number=0,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='IMAGE', index=1, number=1,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=484,
serialized_end=517,
)
_sym_db.RegisterEnumDescriptor(_OUTPUTTYPE)
OutputType = enum_type_wrapper.EnumTypeWrapper(_OUTPUTTYPE)
SUCCESS = 0
ERROR = 1
STARTING = 0
RUNNING = 1
TEXT = 0
IMAGE = 1
_EXECUTEREQUEST = _descriptor.Descriptor(
name='ExecuteRequest',
full_name='ipython.ExecuteRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='code', full_name='ipython.ExecuteRequest.code', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=26,
serialized_end=56,
)
_EXECUTERESPONSE = _descriptor.Descriptor(
name='ExecuteResponse',
full_name='ipython.ExecuteResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='status', full_name='ipython.ExecuteResponse.status', index=0,
number=1, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='type', full_name='ipython.ExecuteResponse.type', index=1,
number=2, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='output', full_name='ipython.ExecuteResponse.output', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=58,
serialized_end=166,
)
_CANCELREQUEST = _descriptor.Descriptor(
name='CancelRequest',
full_name='ipython.CancelRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=168,
serialized_end=183,
)
_CANCELRESPONSE = _descriptor.Descriptor(
name='CancelResponse',
full_name='ipython.CancelResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=185,
serialized_end=201,
)
_COMPLETIONREQUEST = _descriptor.Descriptor(
name='CompletionRequest',
full_name='ipython.CompletionRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='code', full_name='ipython.CompletionRequest.code', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='cursor', full_name='ipython.CompletionRequest.cursor', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=203,
serialized_end=252,
)
_COMPLETIONRESPONSE = _descriptor.Descriptor(
name='CompletionResponse',
full_name='ipython.CompletionResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='matches', full_name='ipython.CompletionResponse.matches', index=0,
number=1, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=254,
serialized_end=291,
)
_STATUSREQUEST = _descriptor.Descriptor(
name='StatusRequest',
full_name='ipython.StatusRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=293,
serialized_end=308,
)
_STATUSRESPONSE = _descriptor.Descriptor(
name='StatusResponse',
full_name='ipython.StatusResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='status', full_name='ipython.StatusResponse.status', index=0,
number=1, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=310,
serialized_end=366,
)
_STOPREQUEST = _descriptor.Descriptor(
name='StopRequest',
full_name='ipython.StopRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=368,
serialized_end=381,
)
_STOPRESPONSE = _descriptor.Descriptor(
name='StopResponse',
full_name='ipython.StopResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=383,
serialized_end=397,
)
_EXECUTERESPONSE.fields_by_name['status'].enum_type = _EXECUTESTATUS
_EXECUTERESPONSE.fields_by_name['type'].enum_type = _OUTPUTTYPE
_STATUSRESPONSE.fields_by_name['status'].enum_type = _IPYTHONSTATUS
DESCRIPTOR.message_types_by_name['ExecuteRequest'] = _EXECUTEREQUEST
DESCRIPTOR.message_types_by_name['ExecuteResponse'] = _EXECUTERESPONSE
DESCRIPTOR.message_types_by_name['CancelRequest'] = _CANCELREQUEST
DESCRIPTOR.message_types_by_name['CancelResponse'] = _CANCELRESPONSE
DESCRIPTOR.message_types_by_name['CompletionRequest'] = _COMPLETIONREQUEST
DESCRIPTOR.message_types_by_name['CompletionResponse'] = _COMPLETIONRESPONSE
DESCRIPTOR.message_types_by_name['StatusRequest'] = _STATUSREQUEST
DESCRIPTOR.message_types_by_name['StatusResponse'] = _STATUSRESPONSE
DESCRIPTOR.message_types_by_name['StopRequest'] = _STOPREQUEST
DESCRIPTOR.message_types_by_name['StopResponse'] = _STOPRESPONSE
DESCRIPTOR.enum_types_by_name['ExecuteStatus'] = _EXECUTESTATUS
DESCRIPTOR.enum_types_by_name['IPythonStatus'] = _IPYTHONSTATUS
DESCRIPTOR.enum_types_by_name['OutputType'] = _OUTPUTTYPE
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
ExecuteRequest = _reflection.GeneratedProtocolMessageType('ExecuteRequest', (_message.Message,), dict(
DESCRIPTOR = _EXECUTEREQUEST,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.ExecuteRequest)
))
_sym_db.RegisterMessage(ExecuteRequest)
ExecuteResponse = _reflection.GeneratedProtocolMessageType('ExecuteResponse', (_message.Message,), dict(
DESCRIPTOR = _EXECUTERESPONSE,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.ExecuteResponse)
))
_sym_db.RegisterMessage(ExecuteResponse)
CancelRequest = _reflection.GeneratedProtocolMessageType('CancelRequest', (_message.Message,), dict(
DESCRIPTOR = _CANCELREQUEST,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.CancelRequest)
))
_sym_db.RegisterMessage(CancelRequest)
CancelResponse = _reflection.GeneratedProtocolMessageType('CancelResponse', (_message.Message,), dict(
DESCRIPTOR = _CANCELRESPONSE,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.CancelResponse)
))
_sym_db.RegisterMessage(CancelResponse)
CompletionRequest = _reflection.GeneratedProtocolMessageType('CompletionRequest', (_message.Message,), dict(
DESCRIPTOR = _COMPLETIONREQUEST,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.CompletionRequest)
))
_sym_db.RegisterMessage(CompletionRequest)
CompletionResponse = _reflection.GeneratedProtocolMessageType('CompletionResponse', (_message.Message,), dict(
DESCRIPTOR = _COMPLETIONRESPONSE,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.CompletionResponse)
))
_sym_db.RegisterMessage(CompletionResponse)
StatusRequest = _reflection.GeneratedProtocolMessageType('StatusRequest', (_message.Message,), dict(
DESCRIPTOR = _STATUSREQUEST,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.StatusRequest)
))
_sym_db.RegisterMessage(StatusRequest)
StatusResponse = _reflection.GeneratedProtocolMessageType('StatusResponse', (_message.Message,), dict(
DESCRIPTOR = _STATUSRESPONSE,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.StatusResponse)
))
_sym_db.RegisterMessage(StatusResponse)
StopRequest = _reflection.GeneratedProtocolMessageType('StopRequest', (_message.Message,), dict(
DESCRIPTOR = _STOPREQUEST,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.StopRequest)
))
_sym_db.RegisterMessage(StopRequest)
StopResponse = _reflection.GeneratedProtocolMessageType('StopResponse', (_message.Message,), dict(
DESCRIPTOR = _STOPRESPONSE,
__module__ = 'ipython_pb2'
# @@protoc_insertion_point(class_scope:ipython.StopResponse)
))
_sym_db.RegisterMessage(StopResponse)
DESCRIPTOR.has_options = True
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n org.apache.zeppelin.python.protoB\014IPythonProtoP\001\242\002\007IPython'))
try:
# THESE ELEMENTS WILL BE DEPRECATED.
# Please use the generated *_pb2_grpc.py files instead.
import grpc
from grpc.beta import implementations as beta_implementations
from grpc.beta import interfaces as beta_interfaces
from grpc.framework.common import cardinality
from grpc.framework.interfaces.face import utilities as face_utilities
class IPythonStub(object):
"""The IPython service definition.
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.execute = channel.unary_stream(
'/ipython.IPython/execute',
request_serializer=ExecuteRequest.SerializeToString,
response_deserializer=ExecuteResponse.FromString,
)
self.complete = channel.unary_unary(
'/ipython.IPython/complete',
request_serializer=CompletionRequest.SerializeToString,
response_deserializer=CompletionResponse.FromString,
)
self.cancel = channel.unary_unary(
'/ipython.IPython/cancel',
request_serializer=CancelRequest.SerializeToString,
response_deserializer=CancelResponse.FromString,
)
self.status = channel.unary_unary(
'/ipython.IPython/status',
request_serializer=StatusRequest.SerializeToString,
response_deserializer=StatusResponse.FromString,
)
self.stop = channel.unary_unary(
'/ipython.IPython/stop',
request_serializer=StopRequest.SerializeToString,
response_deserializer=StopResponse.FromString,
)
class IPythonServicer(object):
"""The IPython service definition.
"""
def execute(self, request, context):
"""Sends code
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def complete(self, request, context):
"""Get completion
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def cancel(self, request, context):
"""Cancel the running statement
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def status(self, request, context):
"""Get ipython kernel status
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def stop(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_IPythonServicer_to_server(servicer, server):
rpc_method_handlers = {
'execute': grpc.unary_stream_rpc_method_handler(
servicer.execute,
request_deserializer=ExecuteRequest.FromString,
response_serializer=ExecuteResponse.SerializeToString,
),
'complete': grpc.unary_unary_rpc_method_handler(
servicer.complete,
request_deserializer=CompletionRequest.FromString,
response_serializer=CompletionResponse.SerializeToString,
),
'cancel': grpc.unary_unary_rpc_method_handler(
servicer.cancel,
request_deserializer=CancelRequest.FromString,
response_serializer=CancelResponse.SerializeToString,
),
'status': grpc.unary_unary_rpc_method_handler(
servicer.status,
request_deserializer=StatusRequest.FromString,
response_serializer=StatusResponse.SerializeToString,
),
'stop': grpc.unary_unary_rpc_method_handler(
servicer.stop,
request_deserializer=StopRequest.FromString,
response_serializer=StopResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'ipython.IPython', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
class BetaIPythonServicer(object):
  """The IPython service definition (Beta API servicer).

  The Beta API is deprecated for 0.15.0 and later.
  It is recommended to use the GA API (classes and functions in this
  file not marked beta) for all further purposes. This class was generated
  only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0.

  Every method here only reports UNIMPLEMENTED through ``context.code``.
  """

  def execute(self, request, context):
    """Sends code."""
    context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)

  def complete(self, request, context):
    """Get completion."""
    context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)

  def cancel(self, request, context):
    """Cancel the running statement."""
    context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)

  def status(self, request, context):
    """Get ipython kernel status."""
    context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)

  def stop(self, request, context):
    # No documentation comment is associated with this RPC in the .proto file.
    context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
class BetaIPythonStub(object):
  """The IPython service definition (Beta API stub).

  The Beta API is deprecated for 0.15.0 and later.
  It is recommended to use the GA API (classes and functions in this
  file not marked beta) for all further purposes. This class was generated
  only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0.

  Every method raises ``NotImplementedError``. The four unary methods also
  carry a ``future`` attribute set to ``None``; ``execute`` does not.
  """

  def execute(self, request, timeout, metadata=None, with_call=False, protocol_options=None):
    """Sends code."""
    raise NotImplementedError()

  def complete(self, request, timeout, metadata=None, with_call=False, protocol_options=None):
    """Get completion."""
    raise NotImplementedError()
  complete.future = None

  def cancel(self, request, timeout, metadata=None, with_call=False, protocol_options=None):
    """Cancel the running statement."""
    raise NotImplementedError()
  cancel.future = None

  def status(self, request, timeout, metadata=None, with_call=False, protocol_options=None):
    """Get ipython kernel status."""
    raise NotImplementedError()
  status.future = None

  def stop(self, request, timeout, metadata=None, with_call=False, protocol_options=None):
    # No documentation comment is associated with this RPC in the .proto file.
    raise NotImplementedError()
  stop.future = None
def beta_create_IPython_server(servicer, pool=None, pool_size=None, default_timeout=None, maximum_timeout=None):
  """Build a Beta-API server exposing ``servicer`` as 'ipython.IPython'.

  The Beta API is deprecated for 0.15.0 and later.
  It is recommended to use the GA API (classes and functions in this
  file not marked beta) for all further purposes. This function was
  generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0
  """
  service = 'ipython.IPython'
  # (rpc name, request message class, response message class)
  message_types = (
      ('cancel', CancelRequest, CancelResponse),
      ('complete', CompletionRequest, CompletionResponse),
      ('execute', ExecuteRequest, ExecuteResponse),
      ('status', StatusRequest, StatusResponse),
      ('stop', StopRequest, StopResponse),
  )
  request_deserializers = {
      (service, rpc): request_cls.FromString
      for rpc, request_cls, _ in message_types
  }
  response_serializers = {
      (service, rpc): response_cls.SerializeToString
      for rpc, _, response_cls in message_types
  }
  # Only `execute` streams its responses; every other RPC is unary/unary.
  inline_unary = face_utilities.unary_unary_inline
  method_implementations = {
      (service, 'cancel'): inline_unary(servicer.cancel),
      (service, 'complete'): inline_unary(servicer.complete),
      (service, 'execute'): face_utilities.unary_stream_inline(servicer.execute),
      (service, 'status'): inline_unary(servicer.status),
      (service, 'stop'): inline_unary(servicer.stop),
  }
  server_options = beta_implementations.server_options(
      request_deserializers=request_deserializers,
      response_serializers=response_serializers,
      thread_pool=pool,
      thread_pool_size=pool_size,
      default_timeout=default_timeout,
      maximum_timeout=maximum_timeout)
  return beta_implementations.server(method_implementations, options=server_options)
def beta_create_IPython_stub(channel, host=None, metadata_transformer=None, pool=None, pool_size=None):
  """Build a Beta-API dynamic stub for 'ipython.IPython' on ``channel``.

  The Beta API is deprecated for 0.15.0 and later.
  It is recommended to use the GA API (classes and functions in this
  file not marked beta) for all further purposes. This function was
  generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0
  """
  service = 'ipython.IPython'
  unary_unary = cardinality.Cardinality.UNARY_UNARY
  # (rpc name, request message class, response message class, cardinality)
  rpcs = (
      ('cancel', CancelRequest, CancelResponse, unary_unary),
      ('complete', CompletionRequest, CompletionResponse, unary_unary),
      ('execute', ExecuteRequest, ExecuteResponse, cardinality.Cardinality.UNARY_STREAM),
      ('status', StatusRequest, StatusResponse, unary_unary),
      ('stop', StopRequest, StopResponse, unary_unary),
  )
  request_serializers = {
      (service, rpc): request_cls.SerializeToString
      for rpc, request_cls, _, _ in rpcs
  }
  response_deserializers = {
      (service, rpc): response_cls.FromString
      for rpc, _, response_cls, _ in rpcs
  }
  cardinalities = {rpc: kind for rpc, _, _, kind in rpcs}
  stub_options = beta_implementations.stub_options(
      host=host,
      metadata_transformer=metadata_transformer,
      request_serializers=request_serializers,
      response_deserializers=response_deserializers,
      thread_pool=pool,
      thread_pool_size=pool_size)
  return beta_implementations.dynamic_stub(channel, service, cardinalities, options=stub_options)
except ImportError:
pass
# @@protoc_insertion_point(module_scope)

View file

@ -0,0 +1,129 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
import ipython_pb2 as ipython__pb2
class IPythonStub(object):
  """Client-side stub for the IPython service definition."""

  def __init__(self, channel):
    """Bind one callable attribute per RPC method to ``channel``.

    Args:
      channel: A grpc.Channel.
    """
    pb = ipython__pb2
    # `execute` is the only RPC with a streaming response.
    self.execute = channel.unary_stream(
        '/ipython.IPython/execute',
        request_serializer=pb.ExecuteRequest.SerializeToString,
        response_deserializer=pb.ExecuteResponse.FromString)
    # (attribute name, RPC path, request message class, response message class)
    unary_rpcs = (
        ('complete', '/ipython.IPython/complete', pb.CompletionRequest, pb.CompletionResponse),
        ('cancel', '/ipython.IPython/cancel', pb.CancelRequest, pb.CancelResponse),
        ('status', '/ipython.IPython/status', pb.StatusRequest, pb.StatusResponse),
        ('stop', '/ipython.IPython/stop', pb.StopRequest, pb.StopResponse),
    )
    for attr, path, request_cls, response_cls in unary_rpcs:
      setattr(self, attr, channel.unary_unary(
          path,
          request_serializer=request_cls.SerializeToString,
          response_deserializer=response_cls.FromString))
class IPythonServicer(object):
  """Server-side base class for the IPython service definition.

  Each method below flags the call as UNIMPLEMENTED on ``context`` and then
  raises ``NotImplementedError``.
  """

  def execute(self, request, context):
    """Sends code."""
    not_implemented = 'Method not implemented!'
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details(not_implemented)
    raise NotImplementedError(not_implemented)

  def complete(self, request, context):
    """Get completion."""
    not_implemented = 'Method not implemented!'
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details(not_implemented)
    raise NotImplementedError(not_implemented)

  def cancel(self, request, context):
    """Cancel the running statement."""
    not_implemented = 'Method not implemented!'
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details(not_implemented)
    raise NotImplementedError(not_implemented)

  def status(self, request, context):
    """Get ipython kernel status."""
    not_implemented = 'Method not implemented!'
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details(not_implemented)
    raise NotImplementedError(not_implemented)

  def stop(self, request, context):
    # No documentation comment is associated with this RPC in the .proto file.
    not_implemented = 'Method not implemented!'
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details(not_implemented)
    raise NotImplementedError(not_implemented)
def add_IPythonServicer_to_server(servicer, server):
  """Register the five RPC methods of ``servicer`` on ``server``.

  The handlers are grouped under the service name 'ipython.IPython'.
  ``execute`` is registered as a unary-request / streaming-response method;
  ``complete``, ``cancel``, ``status`` and ``stop`` are unary / unary.
  """
  pb = ipython__pb2
  unary_unary = grpc.unary_unary_rpc_method_handler
  handlers = {}
  handlers['execute'] = grpc.unary_stream_rpc_method_handler(
      servicer.execute,
      request_deserializer=pb.ExecuteRequest.FromString,
      response_serializer=pb.ExecuteResponse.SerializeToString)
  handlers['complete'] = unary_unary(
      servicer.complete,
      request_deserializer=pb.CompletionRequest.FromString,
      response_serializer=pb.CompletionResponse.SerializeToString)
  handlers['cancel'] = unary_unary(
      servicer.cancel,
      request_deserializer=pb.CancelRequest.FromString,
      response_serializer=pb.CancelResponse.SerializeToString)
  handlers['status'] = unary_unary(
      servicer.status,
      request_deserializer=pb.StatusRequest.FromString,
      response_serializer=pb.StatusResponse.SerializeToString)
  handlers['stop'] = unary_unary(
      servicer.stop,
      request_deserializer=pb.StopRequest.FromString,
      response_serializer=pb.StopResponse.SerializeToString)
  service_handler = grpc.method_handlers_generic_handler(
      'ipython.IPython', handlers)
  server.add_generic_rpc_handlers((service_handler,))

View file

@ -0,0 +1,155 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import jupyter_client
import sys
import threading
import time
from concurrent import futures
import grpc
import ipython_pb2
import ipython_pb2_grpc
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
is_py2 = sys.version[0] == '2'
if is_py2:
import Queue as queue
else:
import queue as queue
TIMEOUT = 30
class IPython(ipython_pb2_grpc.IPythonServicer):
    """gRPC servicer that forwards requests to a Jupyter 'python' kernel.

    The kernel manager/client pair (``_km`` / ``_kc``) is created by
    :meth:`start`; until then ``_status`` is ``ipython_pb2.STARTING``.
    """

    # Seconds the response loop waits for the worker thread between two
    # passes over the output queues.
    _POLL_INTERVAL = 0.05

    def __init__(self, server):
        """Remember the gRPC ``server`` so that :meth:`stop` can shut it down."""
        self._status = ipython_pb2.STARTING
        self._server = server

    def start(self):
        """Start a new 'python' kernel and mark the servicer as RUNNING."""
        print("starting...")
        sys.stdout.flush()
        self._km, self._kc = jupyter_client.manager.start_new_kernel(kernel_name='python')
        self._status = ipython_pb2.RUNNING

    @staticmethod
    def _drain(pending, status, output_type):
        """Yield one ExecuteResponse for every item currently in ``pending``."""
        while not pending.empty():
            output = pending.get()
            yield ipython_pb2.ExecuteResponse(status=status,
                                              type=output_type,
                                              output=output)

    def execute(self, request, context):
        """Run ``request.code`` in the kernel and stream its output.

        The code is executed on a worker thread; kernel messages are pushed
        onto bounded queues by the output hook and turned into
        ExecuteResponse messages here: stream/display/result text as
        SUCCESS/TEXT, 'image/png' data as SUCCESS/IMAGE, error tracebacks as
        ERROR/TEXT. After the worker finishes, 'text/plain' payload entries of
        the execute reply are joined with newlines and sent as one final
        SUCCESS/TEXT response.
        """
        print("execute code: " + request.code)
        sys.stdout.flush()
        stdout_queue = queue.Queue(maxsize = 10)
        stderr_queue = queue.Queue(maxsize = 10)
        image_queue = queue.Queue(maxsize = 5)

        def _output_hook(msg):
            msg_type = msg['header']['msg_type']
            content = msg['content']
            if msg_type == 'stream':
                stdout_queue.put(content['text'])
            elif msg_type in ('display_data', 'execute_result'):
                stdout_queue.put(content['data'].get('text/plain', ''))
                if 'image/png' in content['data']:
                    image_queue.put(content['data']['image/png'])
            elif msg_type == 'error':
                stderr_queue.put('\n'.join(content['traceback']))

        payload_reply = []

        def execute_worker():
            try:
                reply = self._kc.execute_interactive(request.code,
                                                     output_hook=_output_hook,
                                                     timeout=TIMEOUT)
                payload_reply.append(reply)
            except Exception:
                # An exception escaping the worker (for example the timeout
                # raised by execute_interactive) would otherwise be lost and
                # the stream would end as if the execution had succeeded.
                # Report it to the client as an ERROR response instead.
                import traceback
                stderr_queue.put(traceback.format_exc())

        def pending_responses():
            # Same order as before: stdout first, then stderr, then images.
            for response in self._drain(stdout_queue, ipython_pb2.SUCCESS, ipython_pb2.TEXT):
                yield response
            for response in self._drain(stderr_queue, ipython_pb2.ERROR, ipython_pb2.TEXT):
                yield response
            for response in self._drain(image_queue, ipython_pb2.SUCCESS, ipython_pb2.IMAGE):
                yield response

        t = threading.Thread(name="ConsumerThread", target=execute_worker)
        t.start()

        while t.is_alive():
            for response in pending_responses():
                yield response
            # Wait briefly for the worker instead of spinning at full CPU;
            # join() returns immediately once the worker has finished.
            t.join(self._POLL_INTERVAL)

        # The worker may have queued more output after the last pass above.
        for response in pending_responses():
            yield response

        if payload_reply:
            result = []
            # Not every reply carries 'payload', and not every payload carries
            # a 'data' mimebundle, so look the keys up defensively.
            for payload in payload_reply[0]['content'].get('payload', []):
                text = payload.get('data', {}).get('text/plain')
                if text:
                    result.append(text)
            if result:
                yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
                                                  type=ipython_pb2.TEXT,
                                                  output='\n'.join(result))

    def cancel(self, request, context):
        """Interrupt the kernel through the kernel manager."""
        self._km.interrupt_kernel()
        return ipython_pb2.CancelResponse()

    def complete(self, request, context):
        """Return the kernel's completion matches for ``request.code`` at ``request.cursor``."""
        reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=TIMEOUT)
        return ipython_pb2.CompletionResponse(matches=reply['content']['matches'])

    def status(self, request, context):
        """Report the current status (STARTING until start() has completed, then RUNNING)."""
        return ipython_pb2.StatusResponse(status = self._status)

    def stop(self, request, context):
        """Stop the gRPC server immediately and call ``sys.exit(0)``."""
        self._server.stop(0)
        # NOTE(review): sys.exit() only raises SystemExit in the calling
        # thread; when this runs on a gRPC worker thread it may not end the
        # process - confirm that the launcher also terminates it.
        sys.exit(0)
def serve(port):
    """Run the IPython gRPC service on ``port`` until interrupted.

    ``port`` is a string; it is appended to '[::]:' to form the insecure
    listen address. The gRPC server is started before the kernel, then the
    calling thread sleeps in a loop until a KeyboardInterrupt stops the server.
    """
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    servicer = IPython(grpc_server)
    ipython_pb2_grpc.add_IPythonServicer_to_server(servicer, grpc_server)
    grpc_server.add_insecure_port('[::]:' + port)
    grpc_server.start()
    servicer.start()
    try:
        # Keep the calling thread alive while the server is running.
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpc_server.stop(0)
if __name__ == '__main__':
    # The first command-line argument is the port handed to serve().
    port_arg = sys.argv[1]
    serve(port_arg)

View file

@ -0,0 +1,107 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
from io import BytesIO
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
class PyZeppelinContext(object):
    """ A context impl that uses Py4j to communicate to JVM

    Wraps the JVM-side context object ``z`` and exposes helpers for building
    dynamic forms (input / select / checkbox) and for printing pandas
    DataFrames in the ``%table`` display format.
    """

    def __init__(self, z):
        """Store the JVM context and cache the Java classes used by the form helpers."""
        self.z = z
        self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption
        self.javaList = gateway.jvm.java.util.ArrayList
        # Maximum number of DataFrame rows printed by show_dataframe().
        self.max_result = z.getMaxResult()

    def input(self, name, defaultValue=""):
        """Create a text input form and return the value the JVM side reports for it."""
        return self.z.getGui().input(name, defaultValue)

    def _to_java_options(self, options):
        """Convert an iterable of (value, display name) pairs into a Java ParamOption array."""
        javaOptions = gateway.new_array(self.paramOption, len(options))
        for i, option in enumerate(options):
            javaOptions[i] = self.paramOption(option[0], option[1])
        return javaOptions

    def select(self, name, options, defaultValue=""):
        """Create a select form from (value, display name) pairs."""
        return self.z.getGui().select(name, defaultValue, self._to_java_options(options))

    def checkbox(self, name, options, defaultChecked=[]):
        """Create a checkbox form from (value, display name) pairs.

        ``defaultChecked`` is only iterated, never mutated, so the shared
        default list is safe here.
        """
        javaOptions = self._to_java_options(options)
        javaDefaultCheck = self.javaList()
        for check in defaultChecked:
            javaDefaultCheck.append(check)
        return self.z.getGui().checkbox(name, javaDefaultCheck, javaOptions)

    def show(self, p, **kwargs):
        """Display ``p``: DataFrames go to show_dataframe(), other callables are invoked."""
        if type(p).__name__ == "DataFrame": # does not play well with sub-classes
            # `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
            # and so a dependency on pandas
            self.show_dataframe(p, **kwargs)
        elif hasattr(p, '__call__'):
            p() #error reporting

    def show_dataframe(self, df, show_index=False, **kwargs):
        """Pretty prints DF using Table Display System

        Prints "%table " followed by a tab-separated header line and one
        tab-separated line per row; at most ``self.max_result`` rows are
        printed. With ``show_index`` the index name (or an empty string) leads
        the header and every row starts with its index value wrapped in
        ``%html <strong>...</strong>``. A DataFrame without columns prints
        empty lines instead of raising IndexError.
        """
        limit = len(df) > self.max_result

        header_cells = [str(col) for col in df.columns]
        if show_index:
            idx_name = str(df.index.name) if df.index.name is not None else ""
            header_cells.insert(0, idx_name)
        header = "\t".join(header_cells) + "\n"

        rows = df.head(self.max_result).values if limit else df.values
        index = df.index.values
        body_lines = []
        # zip() stops at the shorter sequence, so a truncated `rows` also
        # truncates the index values that are used.
        for idx, row in zip(index, rows):
            cells = [str(cell) for cell in row]
            if show_index:
                cells.insert(0, "%html <strong>{}</strong>".format(idx))
            body_lines.append("\t".join(cells) + "\n")

        #TODO(bzz): fix it, so it shows red notice, as in Spark
        print("%table " + header + "".join(body_lines)) # +
        #      ("\n<font color=red>Results are limited by {}.</font>" \
        #          .format(self.max_result) if limit else "")
        #)
# start JVM gateway
# NOTE(review): ${JVM_GATEWAY_PORT} is a template placeholder, not Python
# syntax; presumably it is replaced with the real port number before this
# script is executed - confirm against the code that loads this file.
client = GatewayClient(address='127.0.0.1', port=${JVM_GATEWAY_PORT})
gateway = JavaGateway(client)
java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
# The gateway entry point provides getZeppelinContext(), which is wrapped below.
intp = gateway.entry_point
# `z` and `__zeppelin__` are two names bound to the same PyZeppelinContext instance.
z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext())

View file

@ -17,6 +17,29 @@
"defaultValue": "1000",
"description": "Max number of dataframe rows to display.",
"type": "number"
},
"zeppelin.python.useIPython": {
"propertyName": "zeppelin.python.useIPython",
"defaultValue": true,
"description": "Whether to use IPython when it is available",
"type": "checkbox"
}
},
"editor": {
"language": "python",
"editOnDblClick": false
}
},
{
"group": "python",
"name": "ipython",
"className": "org.apache.zeppelin.python.IPythonInterpreter",
"properties": {
"zeppelin.ipython.launch.timeout": {
"propertyName": "zeppelin.ipython.launch.timeout",
"defaultValue": "30000",
"description": "Timeout for launching IPython",
"type": "number"
}
},
"editor": {

View file

@ -0,0 +1,402 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.ui.CheckBox;
import org.apache.zeppelin.display.ui.Select;
import org.apache.zeppelin.display.ui.TextBox;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
public class IPythonInterpreterTest {
private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreterTest.class);
private IPythonInterpreter interpreter;
/**
 * Creates an IPythonInterpreter with empty properties, attaches a mocked
 * InterpreterGroup to it and opens it before each test.
 */
@Before
public void setUp() {
  interpreter = new IPythonInterpreter(new Properties());
  interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
  interpreter.open();
}
/** Closes the interpreter opened in {@link #setUp()} after each test. */
@After
public void close() {
  this.interpreter.close();
}
/** Runs the shared interpreter scenario against the IPython interpreter. */
@Test
public void testIPython() throws IOException, InterruptedException {
  IPythonInterpreterTest.testInterpreter(this.interpreter);
}
public static void testInterpreter(final Interpreter interpreter) throws IOException, InterruptedException {
// to make this test can run under both python2 and python3
InterpreterResult result = interpreter.interpret("from __future__ import print_function", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// single output without print
InterpreterContext context = getInterpreterContext();
result = interpreter.interpret("'hello world'", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
List<InterpreterResultMessage> interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(1, interpreterResultMessages.size());
assertEquals("'hello world'", interpreterResultMessages.get(0).getData());
// only the last statement is printed
context = getInterpreterContext();
result = interpreter.interpret("'hello world'\n'hello world2'", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(1, interpreterResultMessages.size());
assertEquals("'hello world2'", interpreterResultMessages.get(0).getData());
// single output
context = getInterpreterContext();
result = interpreter.interpret("print('hello world')", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(1, interpreterResultMessages.size());
assertEquals("hello world\n", interpreterResultMessages.get(0).getData());
// multiple output
context = getInterpreterContext();
result = interpreter.interpret("print('hello world')\nprint('hello world2')", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(1, interpreterResultMessages.size());
assertEquals("hello world\nhello world2\n", interpreterResultMessages.get(0).getData());
// assignment
context = getInterpreterContext();
result = interpreter.interpret("abc=1",context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(0, interpreterResultMessages.size());
// if block
context = getInterpreterContext();
result = interpreter.interpret("if abc > 0:\n\tprint('True')\nelse:\n\tprint('False')", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(1, interpreterResultMessages.size());
assertEquals("True\n", interpreterResultMessages.get(0).getData());
// for loop
context = getInterpreterContext();
result = interpreter.interpret("for i in range(3):\n\tprint(i)", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(1, interpreterResultMessages.size());
assertEquals("0\n1\n2\n", interpreterResultMessages.get(0).getData());
// syntax error
context = getInterpreterContext();
result = interpreter.interpret("print(unknown)", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.ERROR, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(1, interpreterResultMessages.size());
assertTrue(interpreterResultMessages.get(0).getData().contains("name 'unknown' is not defined"));
// raise runtime exception
context = getInterpreterContext();
result = interpreter.interpret("1/0", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.ERROR, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(1, interpreterResultMessages.size());
assertTrue(interpreterResultMessages.get(0).getData().contains("ZeroDivisionError"));
// ZEPPELIN-1133
context = getInterpreterContext();
result = interpreter.interpret("def greet(name):\n" +
" print('Hello', name)\n" +
"greet('Jack')", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(1, interpreterResultMessages.size());
assertEquals("Hello Jack\n",interpreterResultMessages.get(0).getData());
// ZEPPELIN-1114
context = getInterpreterContext();
result = interpreter.interpret("print('there is no Error: ok')", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(1, interpreterResultMessages.size());
assertEquals("there is no Error: ok\n", interpreterResultMessages.get(0).getData());
// completion
context = getInterpreterContext();
List<InterpreterCompletion> completions = interpreter.completion("ab", 2, context);
assertEquals(2, completions.size());
assertEquals("abc", completions.get(0).getValue());
assertEquals("abs", completions.get(1).getValue());
context = getInterpreterContext();
interpreter.interpret("import sys", context);
completions = interpreter.completion("sys.", 4, context);
assertFalse(completions.isEmpty());
context = getInterpreterContext();
completions = interpreter.completion("sys.std", 7, context);
assertEquals(3, completions.size());
assertEquals("sys.stderr", completions.get(0).getValue());
assertEquals("sys.stdin", completions.get(1).getValue());
assertEquals("sys.stdout", completions.get(2).getValue());
// there's no completion for 'a.' because it is not recognized by compiler for now.
context = getInterpreterContext();
String st = "a='hello'\na.";
completions = interpreter.completion(st, st.length(), context);
assertEquals(0, completions.size());
// define `a` first
context = getInterpreterContext();
st = "a='hello'";
result = interpreter.interpret(st, context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(0, interpreterResultMessages.size());
// now we can get the completion for `a.`
context = getInterpreterContext();
st = "a.";
completions = interpreter.completion(st, st.length(), context);
// it is different for python2 and python3 and may even different for different minor version
// so only verify it is larger than 20
assertTrue(completions.size() > 20);
context = getInterpreterContext();
st = "a.co";
completions = interpreter.completion(st, st.length(), context);
assertEquals(1, completions.size());
assertEquals("a.count", completions.get(0).getValue());
// cursor is in the middle of code
context = getInterpreterContext();
st = "a.co\b='hello";
completions = interpreter.completion(st, 4, context);
assertEquals(1, completions.size());
assertEquals("a.count", completions.get(0).getValue());
// ipython help
context = getInterpreterContext();
result = interpreter.interpret("range?", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertTrue(interpreterResultMessages.get(0).getData().contains("range(stop)"));
// timeit
context = getInterpreterContext();
result = interpreter.interpret("%timeit range(100)", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertTrue(interpreterResultMessages.get(0).getData().contains("loops"));
// cancel
final InterpreterContext context2 = getInterpreterContext();
new Thread() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
interpreter.cancel(context2);
}
}.start();
result = interpreter.interpret("import time\ntime.sleep(10)", context2);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.ERROR, result.code());
interpreterResultMessages = context2.out.getInterpreterResultMessages();
assertTrue(interpreterResultMessages.get(0).getData().contains("KeyboardInterrupt"));
// matplotlib
context = getInterpreterContext();
result = interpreter.interpret("%matplotlib inline\nimport matplotlib.pyplot as plt\ndata=[1,1,2,3,4]\nplt.figure()\nplt.plot(data)", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
// the order of IMAGE and TEXT is not determined
// check there must be one IMAGE output
boolean hasImageOutput = false;
for (InterpreterResultMessage msg : interpreterResultMessages) {
if (msg.getType() == InterpreterResult.Type.IMG) {
hasImageOutput = true;
}
}
assertTrue("No Image Output", hasImageOutput);
// bokeh
// bokeh initialization
context = getInterpreterContext();
result = interpreter.interpret("from bokeh.io import output_notebook, show\n" +
"from bokeh.plotting import figure\n" +
"output_notebook(notebook_type='zeppelin')", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(2, interpreterResultMessages.size());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
assertTrue(interpreterResultMessages.get(0).getData().contains("Loading BokehJS"));
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
assertTrue(interpreterResultMessages.get(1).getData().contains("BokehJS is being loaded"));
// bokeh plotting
context = getInterpreterContext();
result = interpreter.interpret("from bokeh.plotting import figure, output_file, show\n" +
"x = [1, 2, 3, 4, 5]\n" +
"y = [6, 7, 2, 4, 5]\n" +
"p = figure(title=\"simple line example\", x_axis_label='x', y_axis_label='y')\n" +
"p.line(x, y, legend=\"Temp.\", line_width=2)\n" +
"show(p)", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(1, interpreterResultMessages.size());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
// docs_json is the source data of plotting which bokeh would use to render the plotting.
assertTrue(interpreterResultMessages.get(0).getData().contains("docs_json"));
// ggplot
context = getInterpreterContext();
result = interpreter.interpret("from ggplot import *\n" +
"ggplot(diamonds, aes(x='price', fill='cut')) +\\\n" +
" geom_density(alpha=0.25) +\\\n" +
" facet_wrap(\"clarity\")", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
// the order of IMAGE and TEXT is not determined
// check there must be one IMAGE output
hasImageOutput = false;
for (InterpreterResultMessage msg : interpreterResultMessages) {
if (msg.getType() == InterpreterResult.Type.IMG) {
hasImageOutput = true;
}
}
assertTrue("No Image Output", hasImageOutput);
// ZeppelinContext
// TextBox
context = getInterpreterContext();
result = interpreter.interpret("z.input(name='text_1', defaultValue='value_1')", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertTrue(interpreterResultMessages.get(0).getData().contains("'value_1'"));
assertEquals(1, context.getGui().getForms().size());
assertTrue(context.getGui().getForms().get("text_1") instanceof TextBox);
TextBox textbox = (TextBox) context.getGui().getForms().get("text_1");
assertEquals("text_1", textbox.getName());
assertEquals("value_1", textbox.getDefaultValue());
// Select
context = getInterpreterContext();
result = interpreter.interpret("z.select(name='select_1', options=[('value_1', 'name_1'), ('value_2', 'name_2')])", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, context.getGui().getForms().size());
assertTrue(context.getGui().getForms().get("select_1") instanceof Select);
Select select = (Select) context.getGui().getForms().get("select_1");
assertEquals("select_1", select.getName());
assertEquals(2, select.getOptions().length);
assertEquals("name_1", select.getOptions()[0].getDisplayName());
assertEquals("value_1", select.getOptions()[0].getValue());
// CheckBox
context = getInterpreterContext();
result = interpreter.interpret("z.checkbox(name='checkbox_1', options=[('value_1', 'name_1'), ('value_2', 'name_2')])", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, context.getGui().getForms().size());
assertTrue(context.getGui().getForms().get("checkbox_1") instanceof CheckBox);
CheckBox checkbox = (CheckBox) context.getGui().getForms().get("checkbox_1");
assertEquals("checkbox_1", checkbox.getName());
assertEquals(2, checkbox.getOptions().length);
assertEquals("name_1", checkbox.getOptions()[0].getDisplayName());
assertEquals("value_1", checkbox.getOptions()[0].getValue());
// Pandas DataFrame
context = getInterpreterContext();
result = interpreter.interpret("import pandas as pd\ndf = pd.DataFrame({'id':[1,2,3], 'name':['a','b','c']})\nz.show(df)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(InterpreterResult.Type.TABLE, interpreterResultMessages.get(0).getType());
assertEquals("id\tname\n1\ta\n2\tb\n3\tc\n", interpreterResultMessages.get(0).getData());
}
/**
 * Creates a fresh {@link InterpreterContext} for a single interpret call, so that
 * output and GUI forms of one statement never leak into the assertions of the next.
 *
 * The three {@code null} arguments occupy the slots where other tests in this change
 * pass an AngularObjectRegistry, a LocalResourcePool and a list of
 * InterpreterContextRunner; they are left unset here.
 *
 * @return a new context with placeholder ids, an empty GUI and a listener-less output
 */
private static InterpreterContext getInterpreterContext() {
  AuthenticationInfo authInfo = new AuthenticationInfo();
  HashMap<String, Object> config = new HashMap<String, Object>();
  GUI gui = new GUI();
  // no output listener is registered; tests read the messages back from context.out
  InterpreterOutput output = new InterpreterOutput(null);
  return new InterpreterContext(
      "noteId", "paragraphId", "replName", "paragraphTitle", "paragraphText",
      authInfo, config, gui,
      null, null, null,
      output);
}
}

View file

@ -53,6 +53,7 @@ public class PythonInterpreterMatplotlibTest implements InterpreterOutputListene
Properties p = new Properties();
p.setProperty("zeppelin.python", "python");
p.setProperty("zeppelin.python.maxResult", "100");
p.setProperty("zeppelin.python.useIPython", "false");
intpGroup = new InterpreterGroup();

View file

@ -73,9 +73,21 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener
Properties p = new Properties();
p.setProperty("zeppelin.python", "python");
p.setProperty("zeppelin.python.maxResult", "100");
p.setProperty("zeppelin.python.useIPython", "false");
intpGroup = new InterpreterGroup();
out = new InterpreterOutput(this);
context = new InterpreterContext("note", "id", null, "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
out);
InterpreterContext.set(context);
python = new PythonInterpreter(p);
python.setInterpreterGroup(intpGroup);
python.open();
@ -85,16 +97,7 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener
intpGroup.put("note", Arrays.asList(python, sql));
out = new InterpreterOutput(this);
context = new InterpreterContext("note", "id", null, "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
out);
// to make sure python is running.
InterpreterResult ret = python.interpret("\n", context);
@ -140,10 +143,10 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener
ret = sql.interpret("select name, age from df2 where age < 40", context);
//then
assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(new String(out.getOutputAt(0).toByteArray()), Type.TABLE, out.getOutputAt(0).getType());
assertTrue(new String(out.getOutputAt(0).toByteArray()).indexOf("moon\t33") > 0);
assertTrue(new String(out.getOutputAt(0).toByteArray()).indexOf("park\t34") > 0);
assertEquals(new String(out.getOutputAt(1).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(new String(out.getOutputAt(1).toByteArray()), Type.TABLE, out.getOutputAt(1).getType());
assertTrue(new String(out.getOutputAt(1).toByteArray()).indexOf("moon\t33") > 0);
assertTrue(new String(out.getOutputAt(1).toByteArray()).indexOf("park\t34") > 0);
assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from df2", context).code());
}
@ -156,7 +159,6 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener
//then
assertNotNull("Interpreter returned 'null'", ret);
assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code());
assertTrue(out.toInterpreterResultMessage().size() == 0);
}
@Test
@ -176,10 +178,10 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener
// then
assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(new String(out.getOutputAt(0).toByteArray()), Type.TABLE, out.getOutputAt(0).getType());
assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("index_name"));
assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("nan"));
assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("6.7"));
assertEquals(new String(out.getOutputAt(1).toByteArray()), Type.TABLE, out.getOutputAt(1).getType());
assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("index_name"));
assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("nan"));
assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("6.7"));
}
@Override

View file

@ -59,6 +59,7 @@ public class PythonInterpreterTest implements InterpreterOutputListener {
Properties p = new Properties();
p.setProperty(ZEPPELIN_PYTHON, DEFAULT_ZEPPELIN_PYTHON);
p.setProperty(MAX_RESULT, "1000");
p.setProperty("zeppelin.python.useIPython", "false");
return p;
}
@ -85,6 +86,7 @@ public class PythonInterpreterTest implements InterpreterOutputListener {
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
out);
InterpreterContext.set(context);
pythonInterpreter.open();
}

View file

@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Active pattern: timestamp, level, logger name, line number, message
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
# Alternative conversion patterns, kept commented out for reference:
#log4j.appender.stdout.layout.ConversionPattern=
#%5p [%t] (%F:%L) - %m%n
#%-4r [%t] %-5p %c %x - %m%n
#
# Root logger option
log4j.rootLogger=INFO, stdout
# DEBUG-level logging for the IPython interpreter and its client classes
log4j.logger.org.apache.zeppelin.python.IPythonInterpreter=DEBUG
log4j.logger.org.apache.zeppelin.python.IPythonClient=DEBUG

View file

@ -71,6 +71,32 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-python</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-python</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@ -95,12 +121,6 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- Aether :: maven dependency resolution -->
<dependency>
<groupId>org.apache.maven</groupId>
@ -355,6 +375,12 @@
<exclude>**/SparkRInterpreterTest.java</exclude>
<exclude>${pyspark.test.exclude}</exclude>
</excludes>
<environmentVariables>
<!-- Local PySpark execution needs PYTHONPATH to be set; otherwise the Python daemon
on the executor side fails, e.g. when running sc.range(1,10).sum()
-->
<PYTHONPATH>../interpreter/spark/pyspark/pyspark.zip:../interpreter/spark/pyspark/py4j-${spark.py4j.version}-src.zip:../interpreter/lib/python</PYTHONPATH>
</environmentVariables>
</configuration>
</plugin>
@ -379,6 +405,19 @@
<resource>reference.conf</resource>
</transformer>
</transformers>
<relocations>
<!-- shade guava and proto-buf, because it might conflict with those of spark -->
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.apache.zeppelin.com.google</shadedPattern>
</relocation>
<!-- shade netty, because it might conflict with that of spark-->
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.zeppelin.io.netty</shadedPattern>
</relocation>
</relocations>
</configuration>
<executions>
<execution>

View file

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.python.IPythonInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
/**
 * PySpark interpreter that runs on top of {@link IPythonInterpreter}.
 *
 * It reuses the {@link SparkInterpreter} living in the same session: the Python
 * executable and the additional PYTHONPATH are derived from it before the IPython
 * kernel is started, and the Spark entry points (JavaSparkContext, SQLContext,
 * SparkSession) are exposed through getters that the Python init file
 * {@code python/zeppelin_ipyspark.py} is configured to use.
 */
public class IPySparkInterpreter extends IPythonInterpreter {

  private static final Logger LOGGER = LoggerFactory.getLogger(IPySparkInterpreter.class);

  /** Zeppelin's python library directory, always appended to the additional PYTHONPATH. */
  private static final String ZEPPELIN_PYTHON_LIB = "../interpreter/lib/python";

  /** SparkInterpreter of the same session; null until {@link #open()} has resolved it. */
  private SparkInterpreter sparkInterpreter;

  public IPySparkInterpreter(Properties property) {
    super(property);
  }

  /**
   * Resolves the python executable and PYTHONPATH from the Spark side, registers the
   * PySpark init script and then starts the underlying IPython interpreter.
   */
  @Override
  public void open() {
    getProperty().setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property));
    sparkInterpreter = getSparkInterpreter();
    SparkConf conf = sparkInterpreter.getSparkContext().getConf();
    // SparkConf.get(key) throws NoSuchElementException when the key is absent, so use
    // the defaulted overload and only prepend the pyFiles entries when there are any
    // (an empty leading path element would put the working directory on PYTHONPATH).
    String pyFiles = conf.get("spark.submit.pyFiles", "");
    String additionalPythonPath = pyFiles.isEmpty()
        ? ZEPPELIN_PYTHON_LIB
        : pyFiles.replaceAll(",", ":") + ":" + ZEPPELIN_PYTHON_LIB;
    setAdditionalPythonPath(additionalPythonPath);
    setAdditionalPythonInitFile("python/zeppelin_ipyspark.py");
    super.open();
  }

  /**
   * Looks up the SparkInterpreter of this session, unwrapping any
   * {@link WrappedInterpreter} layers, and opens it through its
   * {@link LazyOpenInterpreter} wrapper if one was found on the way.
   *
   * @return the unwrapped SparkInterpreter
   */
  private SparkInterpreter getSparkInterpreter() {
    LazyOpenInterpreter lazy = null;
    SparkInterpreter spark = null;
    Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());

    while (p instanceof WrappedInterpreter) {
      if (p instanceof LazyOpenInterpreter) {
        lazy = (LazyOpenInterpreter) p;
      }
      p = ((WrappedInterpreter) p).getInnerInterpreter();
    }
    spark = (SparkInterpreter) p;

    if (lazy != null) {
      lazy.open();
    }
    return spark;
  }

  /** Cancels the IPython execution first, then the Spark jobs of the given context. */
  @Override
  public void cancel(InterpreterContext context) {
    super.cancel(context);
    // guard like close(): sparkInterpreter is only set once open() has run
    if (sparkInterpreter != null) {
      sparkInterpreter.cancel(context);
    }
  }

  /** Closes the IPython interpreter and, if it was resolved, the SparkInterpreter. */
  @Override
  public void close() {
    super.close();
    if (sparkInterpreter != null) {
      sparkInterpreter.close();
    }
  }

  /**
   * @return the progress reported by the SparkInterpreter, or 0 when it has not been
   *         resolved yet
   */
  @Override
  public int getProgress(InterpreterContext context) {
    if (sparkInterpreter == null) {
      return 0;
    }
    return sparkInterpreter.getProgress(context);
  }

  /** @return true when the Spark version is 2.0.0 or newer. */
  public boolean isSpark2() {
    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_0_0);
  }

  public JavaSparkContext getJavaSparkContext() {
    return sparkInterpreter.getJavaSparkContext();
  }

  public Object getSQLContext() {
    return sparkInterpreter.getSQLContext();
  }

  public Object getSparkSession() {
    return sparkInterpreter.getSparkSession();
  }
}

View file

@ -76,6 +76,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
private static final int MAX_TIMEOUT_SEC = 10;
private long pythonPid;
private IPySparkInterpreter iPySparkInterpreter;
public PySparkInterpreter(Properties property) {
super(property);
@ -111,6 +113,37 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
@Override
public void open() {
// try IPySparkInterpreter first
iPySparkInterpreter = getIPySparkInterpreter();
if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true") &&
iPySparkInterpreter.checkIPythonPrerequisite()) {
try {
iPySparkInterpreter.open();
if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true")) {
// don't print it when it is in testing, just for easy output check in test.
InterpreterContext.get().out.write(("IPython is available, " +
"use IPython for PySparkInterpreter\n")
.getBytes());
}
LOGGER.info("Use IPySparkInterpreter to replace PySparkInterpreter");
return;
} catch (Exception e) {
LOGGER.warn("Fail to open IPySparkInterpreter", e);
}
}
iPySparkInterpreter = null;
if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true")) {
// don't print it when it is in testing, just for easy output check in test.
try {
InterpreterContext.get().out.write(("IPython is not available, " +
"use the native PySparkInterpreter\n")
.getBytes());
} catch (IOException e) {
LOGGER.warn("Fail to write InterpreterOutput", e.getMessage());
}
}
// Add matplotlib display hook
InterpreterGroup intpGroup = getInterpreterGroup();
if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
@ -190,9 +223,24 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
LOGGER.debug("PYTHONPATH: " + env.get("PYTHONPATH"));
return env;
}
/**
 * Resolves the python executable used to run PySpark.
 *
 * Precedence, highest first:
 * PYSPARK_DRIVER_PYTHON (env) > PYSPARK_PYTHON (env) > zeppelin.pyspark.python (property),
 * falling back to "python" when none of them is set.
 *
 * @param properties interpreter properties holding zeppelin.pyspark.python
 * @return the python command to launch
 */
public static String getPythonExec(Properties properties) {
  // the property is read first, exactly as before, so the lookup order is unchanged
  final String configured = properties.getProperty("zeppelin.pyspark.python", "python");
  final String fromPysparkPython = System.getenv("PYSPARK_PYTHON");
  final String fromDriverPython = System.getenv("PYSPARK_DRIVER_PYTHON");

  if (fromDriverPython != null) {
    return fromDriverPython;
  }
  return fromPysparkPython != null ? fromPysparkPython : configured;
}
private void createGatewayServerAndStartScript() {
// create python script
createPythonScript();
@ -202,16 +250,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
gatewayServer = new GatewayServer(this, port);
gatewayServer.start();
// Run python shell
// Choose python in the order of
// PYSPARK_DRIVER_PYTHON > PYSPARK_PYTHON > zeppelin.pyspark.python
String pythonExec = getProperty("zeppelin.pyspark.python");
if (System.getenv("PYSPARK_PYTHON") != null) {
pythonExec = System.getenv("PYSPARK_PYTHON");
}
if (System.getenv("PYSPARK_DRIVER_PYTHON") != null) {
pythonExec = System.getenv("PYSPARK_DRIVER_PYTHON");
}
String pythonExec = getPythonExec(property);
CommandLine cmd = CommandLine.parse(pythonExec);
cmd.addArgument(scriptPath, false);
cmd.addArgument(Integer.toString(port), false);
@ -263,6 +302,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
@Override
public void close() {
if (iPySparkInterpreter != null) {
iPySparkInterpreter.close();
return;
}
executor.getWatchdog().destroyProcess();
new File(scriptPath).delete();
gatewayServer.shutdown();
@ -353,6 +396,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
}
if (iPySparkInterpreter != null) {
return iPySparkInterpreter.interpret(st, context);
}
if (!pythonscriptRunning) {
return new InterpreterResult(Code.ERROR, "python process not running"
+ outputStream.toString());
@ -448,6 +495,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
@Override
public void cancel(InterpreterContext context) {
if (iPySparkInterpreter != null) {
iPySparkInterpreter.cancel(context);
return;
}
SparkInterpreter sparkInterpreter = getSparkInterpreter();
sparkInterpreter.cancel(context);
try {
@ -464,6 +515,9 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
/**
 * Reports job progress, delegating to the IPySparkInterpreter when it is in use and
 * to the SparkInterpreter of this session otherwise.
 */
@Override
public int getProgress(InterpreterContext context) {
  return iPySparkInterpreter != null
      ? iPySparkInterpreter.getProgress(context)
      : getSparkInterpreter().getProgress(context);
}
@ -472,6 +526,9 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
if (iPySparkInterpreter != null) {
return iPySparkInterpreter.completion(buf, cursor, interpreterContext);
}
if (buf.length() < cursor) {
cursor = buf.length();
}
@ -588,6 +645,21 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
return spark;
}
/**
 * Looks up the IPySparkInterpreter registered in the same session and strips every
 * {@link WrappedInterpreter} layer around it.
 *
 * Unlike getSparkInterpreter(), the interpreter is deliberately NOT opened here:
 * open() decides whether IPython is usable and opens it explicitly.
 *
 * @return the unwrapped IPySparkInterpreter, or null if the lookup yields none
 */
private IPySparkInterpreter getIPySparkInterpreter() {
  Interpreter p = getInterpreterInTheSameSessionByClassName(IPySparkInterpreter.class.getName());
  while (p instanceof WrappedInterpreter) {
    p = ((WrappedInterpreter) p).getInnerInterpreter();
  }
  return (IPySparkInterpreter) p;
}
public SparkZeppelinContext getZeppelinContext() {
SparkInterpreter sparkIntp = getSparkInterpreter();
if (sparkIntp != null) {

View file

@ -149,11 +149,28 @@
"defaultValue": "python",
"description": "Python command to run pyspark with",
"type": "string"
},
"zeppelin.spark.useIPython": {
"envName": null,
"propertyName": "zeppelin.spark.useIPython",
"defaultValue": true,
"description": "whether use IPython when it is available",
"type": "checkbox"
}
},
"editor": {
"language": "python",
"editOnDblClick": false
}
},
{
"group": "spark",
"name": "ipyspark",
"className": "org.apache.zeppelin.spark.IPySparkInterpreter",
"properties": {},
"editor": {
"language": "python",
"editOnDblClick": false
}
}
]

View file

@ -0,0 +1,53 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Bootstrap script for the IPython-based PySpark interpreter: it connects to the JVM
# through py4j and defines the Spark entry points (sc, sqlContext/sqlc, spark) as
# Python globals.
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
# for back compatibility
from pyspark.sql import SQLContext
# Connect to the JVM gateway. ${JVM_GATEWAY_PORT} is a template placeholder, not
# Python syntax -- presumably replaced with the real port before this script is
# executed (TODO confirm where the substitution happens).
client = GatewayClient(port=${JVM_GATEWAY_PORT})
gateway = JavaGateway(client, auto_convert=True)
# Make the Spark JVM classes addressable through gateway.jvm
java_import(gateway.jvm, "org.apache.spark.SparkEnv")
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
# The gateway entry point must provide getJavaSparkContext(), isSpark2(),
# getSparkSession() and getSQLContext(), all of which are called below.
intp = gateway.entry_point
jsc = intp.getJavaSparkContext()
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
java_import(gateway.jvm, "scala.Tuple2")
# Wrap the existing JVM SparkContext instead of creating a new one
jconf = jsc.getConf()
conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
# Spark 2.x: wrap the JVM SparkSession and take the SQLContext from the session's
# _wrapped attribute; otherwise build the SQLContext from the JVM SQLContext.
if intp.isSpark2():
from pyspark.sql import SparkSession
spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
else:
sqlContext = sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())

View file

@ -189,5 +189,16 @@
"editor": {
"language": "r"
}
},
{
"group": "spark",
"name": "ipyspark",
"className": "org.apache.zeppelin.spark.IPySparkInterpreter",
"properties": {},
"editor": {
"language": "python",
"editOnDblClick": false
}
}
]

View file

@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;
import com.google.common.io.Files;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.python.IPythonInterpreterTest;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class IPySparkInterpreterTest {
private IPySparkInterpreter iPySparkInterpreter;
private InterpreterGroup intpGroup;
@Before
public void setup() {
Properties p = new Properties();
p.setProperty("spark.master", "local[4]");
p.setProperty("master", "local[4]");
p.setProperty("spark.app.name", "Zeppelin Test");
p.setProperty("zeppelin.spark.useHiveContext", "true");
p.setProperty("zeppelin.spark.maxResult", "1000");
p.setProperty("zeppelin.spark.importImplicit", "true");
p.setProperty("zeppelin.pyspark.python", "python");
p.setProperty("zeppelin.dep.localrepo", Files.createTempDir().getAbsolutePath());
intpGroup = new InterpreterGroup();
intpGroup.put("session_1", new LinkedList<Interpreter>());
SparkInterpreter sparkInterpreter = new SparkInterpreter(p);
intpGroup.get("session_1").add(sparkInterpreter);
sparkInterpreter.setInterpreterGroup(intpGroup);
sparkInterpreter.open();
iPySparkInterpreter = new IPySparkInterpreter(p);
intpGroup.get("session_1").add(iPySparkInterpreter);
iPySparkInterpreter.setInterpreterGroup(intpGroup);
iPySparkInterpreter.open();
}
@After
public void tearDown() {
if (iPySparkInterpreter != null) {
iPySparkInterpreter.close();
}
}
@Test
public void testBasics() throws InterruptedException, IOException {
// all the ipython test should pass too.
IPythonInterpreterTest.testInterpreter(iPySparkInterpreter);
// rdd
InterpreterContext context = getInterpreterContext();
InterpreterResult result = iPySparkInterpreter.interpret("sc.range(1,10).sum()", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
List<InterpreterResultMessage> interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals("45", interpreterResultMessages.get(0).getData());
context = getInterpreterContext();
result = iPySparkInterpreter.interpret("sc.version", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
// spark sql
context = getInterpreterContext();
if (interpreterResultMessages.get(0).getData().startsWith("'1.") ||
interpreterResultMessages.get(0).getData().startsWith("u'1.")) {
result = iPySparkInterpreter.interpret("df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(
"+---+---+\n" +
"| _1| _2|\n" +
"+---+---+\n" +
"| 1| a|\n" +
"| 2| b|\n" +
"+---+---+\n\n", interpreterResultMessages.get(0).getData());
} else {
result = iPySparkInterpreter.interpret("df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(
"+---+---+\n" +
"| _1| _2|\n" +
"+---+---+\n" +
"| 1| a|\n" +
"| 2| b|\n" +
"+---+---+\n\n", interpreterResultMessages.get(0).getData());
}
// cancel
final InterpreterContext context2 = getInterpreterContext();
Thread thread = new Thread(){
@Override
public void run() {
InterpreterResult result = iPySparkInterpreter.interpret("import time\nsc.range(1,10).foreach(lambda x: time.sleep(1))", context2);
assertEquals(InterpreterResult.Code.ERROR, result.code());
List<InterpreterResultMessage> interpreterResultMessages = null;
try {
interpreterResultMessages = context2.out.getInterpreterResultMessages();
assertTrue(interpreterResultMessages.get(0).getData().contains("cancelled"));
} catch (IOException e) {
e.printStackTrace();
}
}
};
thread.start();
// sleep 1 second to wait for the spark job starts
Thread.sleep(1000);
iPySparkInterpreter.cancel(context);
thread.join();
// completions
List<InterpreterCompletion> completions = iPySparkInterpreter.completion("sc.ran", 6, getInterpreterContext());
assertEquals(1, completions.size());
assertEquals("sc.range", completions.get(0).getValue());
// pyspark streaming
context = getInterpreterContext();
result = iPySparkInterpreter.interpret(
"from pyspark.streaming import StreamingContext\n" +
"import time\n" +
"ssc = StreamingContext(sc, 1)\n" +
"rddQueue = []\n" +
"for i in range(5):\n" +
" rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]\n" +
"inputStream = ssc.queueStream(rddQueue)\n" +
"mappedStream = inputStream.map(lambda x: (x % 10, 1))\n" +
"reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)\n" +
"reducedStream.pprint()\n" +
"ssc.start()\n" +
"time.sleep(6)\n" +
"ssc.stop(stopSparkContext=False, stopGraceFully=True)", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(1, interpreterResultMessages.size());
assertTrue(interpreterResultMessages.get(0).getData().contains("(0, 100)"));
}
private InterpreterContext getInterpreterContext() {
return new InterpreterContext(
"noteId",
"paragraphId",
"replName",
"paragraphTitle",
"paragraphText",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),
null,
null,
null,
new InterpreterOutput(null));
}
}

View file

@ -89,6 +89,7 @@ public class PySparkInterpreterMatplotlibTest {
p.setProperty("zeppelin.spark.importImplicit", "true");
p.setProperty("zeppelin.pyspark.python", "python");
p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
p.setProperty("zeppelin.spark.useIPython", "false");
return p;
}
@ -110,6 +111,15 @@ public class PySparkInterpreterMatplotlibTest {
public static void setUp() throws Exception {
intpGroup = new InterpreterGroup();
intpGroup.put("note", new LinkedList<Interpreter>());
context = new InterpreterContext("note", "id", null, "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(null));
InterpreterContext.set(context);
sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
intpGroup.get("note").add(sparkInterpreter);
@ -121,14 +131,6 @@ public class PySparkInterpreterMatplotlibTest {
pyspark.setInterpreterGroup(intpGroup);
pyspark.open();
context = new InterpreterContext("note", "id", null, "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(null));
}
@AfterClass

View file

@ -59,6 +59,7 @@ public class PySparkInterpreterTest {
p.setProperty("zeppelin.spark.importImplicit", "true");
p.setProperty("zeppelin.pyspark.python", "python");
p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
p.setProperty("zeppelin.spark.useIPython", "false");
return p;
}
@ -81,6 +82,16 @@ public class PySparkInterpreterTest {
intpGroup = new InterpreterGroup();
intpGroup.put("note", new LinkedList<Interpreter>());
context = new InterpreterContext("note", "id", null, "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(null));
InterpreterContext.set(context);
sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
intpGroup.get("note").add(sparkInterpreter);
sparkInterpreter.setInterpreterGroup(intpGroup);
@ -91,14 +102,7 @@ public class PySparkInterpreterTest {
pySparkInterpreter.setInterpreterGroup(intpGroup);
pySparkInterpreter.open();
context = new InterpreterContext("note", "id", null, "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(null));
}
@AfterClass
@ -113,6 +117,22 @@ public class PySparkInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS,
pySparkInterpreter.interpret("a = 1\n", context).code());
}
InterpreterResult result = pySparkInterpreter.interpret(
"from pyspark.streaming import StreamingContext\n" +
"import time\n" +
"ssc = StreamingContext(sc, 1)\n" +
"rddQueue = []\n" +
"for i in range(5):\n" +
" rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]\n" +
"inputStream = ssc.queueStream(rddQueue)\n" +
"mappedStream = inputStream.map(lambda x: (x % 10, 1))\n" +
"reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)\n" +
"reducedStream.pprint()\n" +
"ssc.start()\n" +
"time.sleep(6)\n" +
"ssc.stop(stopSparkContext=False, stopGraceFully=True)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}
@Test

View file

@ -45,3 +45,5 @@ log4j.logger.org.hibernate.type=ALL
log4j.logger.org.apache.zeppelin.interpreter=DEBUG
log4j.logger.org.apache.zeppelin.spark=DEBUG
log4j.logger.org.apache.zeppelin.python.IPythonInterpreter=DEBUG
log4j.logger.org.apache.zeppelin.python.IPythonClient=DEBUG

View file

@ -44,5 +44,6 @@ if [[ -n "$PYTHON" ]] ; then
conda update -q conda
conda info -a
conda config --add channels conda-forge
conda install -q matplotlib pandasql
conda install -q matplotlib pandasql ipython jupyter_client ipykernel matplotlib bokeh
pip install grpcio ggplot
fi

View file

@ -214,11 +214,6 @@
<version>${jline.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View file

@ -123,6 +123,10 @@ public abstract class BaseZeppelinContext {
this.gui = o;
}
/**
 * Accessor for the {@code gui} field of this object.
 *
 * @return the current value of the {@code gui} field (may be whatever was last
 *         assigned to it, including {@code null} if nothing was assigned)
 */
public GUI getGui() {
  return this.gui;
}
// NOTE(review): as visible here the body is empty, so invoking this method
// performs no work — confirm whether restart logic is intended to live here.
private void restartInterpreter() {
}

View file

@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@ -115,6 +116,16 @@ public class InterpreterOutput extends OutputStream {
};
}
/**
 * Converts every entry of {@code resultMessageOutputs} into an
 * {@link InterpreterResultMessage} by calling
 * {@code toInterpreterResultMessage()} on it.
 *
 * <p>The iteration runs while holding the monitor of
 * {@code resultMessageOutputs}; the returned list is a new {@link ArrayList}
 * that is independent of that collection.
 *
 * @return a new list holding one message per entry, in iteration order
 * @throws IOException declared on the signature; presumably raised by the
 *         per-entry conversion — verify against
 *         {@code InterpreterResultMessageOutput}
 */
public List<InterpreterResultMessage> getInterpreterResultMessages() throws IOException {
  List<InterpreterResultMessage> converted = new ArrayList<>();
  synchronized (resultMessageOutputs) {
    for (InterpreterResultMessageOutput each : resultMessageOutputs) {
      InterpreterResultMessage message = each.toInterpreterResultMessage();
      converted.add(message);
    }
  }
  return converted;
}
public InterpreterResultMessageOutput getCurrentOutput() {
synchronized (resultMessageOutputs) {
return currentOut;

View file

@ -513,6 +513,13 @@ public class RemoteInterpreterServer
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
resultMessages.addAll(result.message());
for (InterpreterResultMessage msg : resultMessages) {
if (msg.getType() == InterpreterResult.Type.IMG) {
logger.debug("InterpreterResultMessage: IMAGE_DATA");
} else {
logger.debug("InterpreterResultMessage: " + msg.toString());
}
}
// put result into resource pool
if (resultMessages.size() > 0) {
int lastMessageIndex = resultMessages.size() - 1;

View file

@ -29,7 +29,7 @@ import java.io.IOException;
*/
public class InterpreterOutputStream extends LogOutputStream {
private Logger logger;
InterpreterOutput interpreterOutput;
volatile InterpreterOutput interpreterOutput;
boolean ignoreLeadingNewLinesFromScalaReporter = false;
public InterpreterOutputStream(Logger logger) {
@ -78,7 +78,7 @@ public class InterpreterOutputStream extends LogOutputStream {
@Override
protected void processLine(String s, int i) {
logger.debug("Interpreter output:" + s);
// logger.debug("Interpreter output:" + s);
}
@Override

View file

@ -52,6 +52,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
<dependency>

View file

@ -87,7 +87,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-zengine</artifactId>
<version>${project.version}</version>
<version>0.8.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
@ -275,6 +275,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>

View file

@ -141,6 +141,7 @@ public class WebDriverManager {
fail();
}
driver.manage().window().maximize();
return driver;
}

View file

@ -72,7 +72,7 @@ public class InterpreterModeActionsIT extends AbstractZeppelinIT {
static String interpreterOptionPath = "";
static String originalInterpreterOption = "";
static String cmdPsPython = "ps aux | grep 'zeppelin_python-' | grep -v 'grep' | wc -l";
static String cmdPsPython = "ps aux | grep 'zeppelin_ipython' | grep -v 'grep' | wc -l";
static String cmdPsInterpreter = "ps aux | grep 'zeppelin/interpreter/python/*' |" +
" sed -E '/grep|local-repo/d' | wc -l";
@ -145,19 +145,19 @@ public class InterpreterModeActionsIT extends AbstractZeppelinIT {
}
private void logoutUser(String userName) throws URISyntaxException {
pollingWait(By.xpath("//div[contains(@class, 'navbar-collapse')]//li[contains(.,'" +
userName + "')]"), MAX_BROWSER_TIMEOUT_SEC).click();
pollingWait(By.xpath("//div[contains(@class, 'navbar-collapse')]//li[contains(.,'" +
userName + "')]//a[@ng-click='navbar.logout()']"), MAX_BROWSER_TIMEOUT_SEC).click();
By locator = By.xpath("//*[@id='loginModal']//div[contains(@class, 'modal-header')]/button");
WebElement element = (new WebDriverWait(driver, MAX_BROWSER_TIMEOUT_SEC))
.until(ExpectedConditions.visibilityOfElementLocated(locator));
if (element.isDisplayed()) {
ZeppelinITUtils.sleep(500, false);
driver.findElement(By.xpath("//div[contains(@class, 'navbar-collapse')]//li[contains(.,'" +
userName + "')]")).click();
ZeppelinITUtils.sleep(500, false);
driver.findElement(By.xpath("//div[contains(@class, 'navbar-collapse')]//li[contains(.,'" +
userName + "')]//a[@ng-click='navbar.logout()']")).click();
ZeppelinITUtils.sleep(2000, false);
if (driver.findElement(By.xpath("//*[@id='loginModal']//div[contains(@class, 'modal-header')]/button"))
.isDisplayed()) {
driver.findElement(By.xpath("//*[@id='loginModal']//div[contains(@class, 'modal-header')]/button")).click();
}
driver.get(new URI(driver.getCurrentUrl()).resolve("/#/").toString());
ZeppelinITUtils.sleep(1000, false);
ZeppelinITUtils.sleep(500, false);
}
private void setPythonParagraph(int num, String text) {
@ -199,7 +199,6 @@ public class InterpreterModeActionsIT extends AbstractZeppelinIT {
"//div[@class='modal-dialog']//div[@class='bootstrap-dialog-footer-buttons']//button[contains(., 'OK')]"));
clickAndWait(By.xpath("//a[@class='navbar-brand navbar-title'][contains(@href, '#/')]"));
interpreterModeActionsIT.logoutUser("admin");
//step 2: (user1) login, create a new note, run two paragraphs with 'python', check result, check process, logout
//paragraph: Check if the result is 'user1' in the second paragraph
//System: Check if the number of python interpreter process is '1'

View file

@ -138,7 +138,7 @@ public class SparkParagraphIT extends AbstractZeppelinIT {
WebElement paragraph1Result = driver.findElement(By.xpath(
getParagraphXPath(1) + "//div[contains(@id,\"_text\")]"));
collector.checkThat("Paragraph from SparkParagraphIT of testPySpark result: ",
paragraph1Result.getText().toString(), CoreMatchers.equalTo("test loop 0\ntest loop 1\ntest loop 2")
paragraph1Result.getText().toString(), CoreMatchers.containsString("test loop 0\ntest loop 1\ntest loop 2")
);
// the last statement's evaluation result is printed

View file

@ -211,6 +211,7 @@ public abstract class AbstractTestRestApi {
// set spark home for pyspark
sparkProperties.put("spark.home",
new InterpreterProperty("spark.home", getSparkHome(), InterpreterPropertyType.TEXTAREA.getValue()));
sparkProperties.put("zeppelin.spark.useIPython", new InterpreterProperty("zeppelin.spark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
sparkIntpSetting.setProperties(sparkProperties);
pySpark = true;
@ -233,6 +234,8 @@ public abstract class AbstractTestRestApi {
new InterpreterProperty("spark.home", sparkHome, InterpreterPropertyType.TEXTAREA.getValue()));
sparkProperties.put("zeppelin.spark.useHiveContext",
new InterpreterProperty("zeppelin.spark.useHiveContext", false, InterpreterPropertyType.CHECKBOX.getValue()));
sparkProperties.put("zeppelin.spark.useIPython", new InterpreterProperty("zeppelin.spark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
pySpark = true;
sparkR = true;
}

View file

@ -271,7 +271,8 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[Row(len=u'3')]\n", p.getResult().message().get(0).getData());
assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) ||
"[Row(len='3')]\n".equals(p.getResult().message().get(0).getData()));
// test exception
p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@ -321,7 +322,8 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[Row(len=u'3')]\n", p.getResult().message().get(0).getData());
assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) ||
"[Row(len='3')]\n".equals(p.getResult().message().get(0).getData()));
}
}
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);

View file

@ -57,7 +57,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<version>0.8.0-SNAPSHOT</version>
</dependency>
<dependency>
@ -171,6 +171,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
<dependency>
@ -262,6 +263,12 @@
<artifactId>truth</artifactId>
<version>${google.truth.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -365,6 +372,10 @@
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>