mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge remote-tracking branch 'origin/master' into ZEPPELIN-212
This commit is contained in:
commit
f5034b8c6e
43 changed files with 3304 additions and 1119 deletions
17
.travis.yml
17
.travis.yml
|
|
@ -66,16 +66,22 @@ matrix:
|
|||
- jdk: "oraclejdk7"
|
||||
env: TEST_SELENIUM="true" SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Phadoop-2.3 -Ppyspark -Pexamples" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.AbstractFunctionalSuite -DfailIfNoTests=false"
|
||||
|
||||
# Test python/pyspark with python 2
|
||||
- jdk: "oraclejdk7"
|
||||
env: PYTHON="2" SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -pl spark,python -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-display,spark-dependencies,spark,python -Dtest=org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false"
|
||||
|
||||
# Test python/pyspark with python 3
|
||||
- jdk: "oraclejdk7"
|
||||
env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.3" PROFILE="-Pspark-2.0 -Phadoop-2.3 -Ppyspark -Pscala-2.11" BUILD_FLAG="package -pl spark,python -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-display,spark-dependencies,spark,python -Dtest=org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false"
|
||||
|
||||
before_install:
|
||||
- echo "MAVEN_OPTS='-Xms1024M -Xmx2048M -XX:MaxPermSize=1024m -XX:-UseGCOverheadLimit'" >> ~/.mavenrc
|
||||
- "ls -la .spark-dist ${HOME}/.m2/repository/.cache/maven-download-plugin"
|
||||
- ./testing/install_external_dependencies.sh
|
||||
- ls -la .spark-dist ${HOME}/.m2/repository/.cache/maven-download-plugin || true
|
||||
- ls .node_modules && cp -r .node_modules zeppelin-web/node_modules || echo "node_modules are not cached"
|
||||
- mkdir -p ~/R
|
||||
- echo 'R_LIBS=~/R' > ~/.Renviron
|
||||
- R -e "install.packages('knitr', repos = 'http://cran.us.r-project.org', lib='~/R')"
|
||||
- export R_LIBS='~/R'
|
||||
- "/sbin/start-stop-daemon --start --quiet --pidfile /tmp/custom_xvfb_99.pid --make-pidfile --background --exec /usr/bin/Xvfb -- :99 -ac -screen 0 1600x1024x16"
|
||||
- ./dev/change_scala_version.sh $SCALA_VER
|
||||
- source ~/.environ
|
||||
|
||||
install:
|
||||
- mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=warn $BUILD_FLAG $PROFILE -B
|
||||
|
|
@ -99,4 +105,3 @@ after_failure:
|
|||
- cat zeppelin-distribution/target/zeppelin-*-SNAPSHOT/zeppelin-*-SNAPSHOT/logs/zeppelin*.out
|
||||
- cat zeppelin-web/npm-debug.log
|
||||
- cat spark-*/logs/*
|
||||
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ user3 = password4, role2
|
|||
# Sample LDAP configuration, for user Authentication, currently tested for single Realm
|
||||
[main]
|
||||
### A sample for configuring Active Directory Realm
|
||||
#activeDirectoryRealm = org.apache.zeppelin.server.ActiveDirectoryGroupRealm
|
||||
#activeDirectoryRealm = org.apache.zeppelin.realm.ActiveDirectoryGroupRealm
|
||||
#activeDirectoryRealm.systemUsername = userNameA
|
||||
|
||||
#use either systemPassword or hadoopSecurityCredentialPath, more details in http://zeppelin.apache.org/docs/latest/security/shiroauthentication.html
|
||||
|
|
@ -38,7 +38,7 @@ user3 = password4, role2
|
|||
#activeDirectoryRealm.authorizationCachingEnabled = false
|
||||
|
||||
### A sample for configuring LDAP Directory Realm
|
||||
#ldapRealm = org.apache.zeppelin.server.LdapGroupRealm
|
||||
#ldapRealm = org.apache.zeppelin.realm.LdapGroupRealm
|
||||
## search base for ldap groups (only relevant for LdapGroupRealm):
|
||||
#ldapRealm.contextFactory.environment[ldap.searchBase] = dc=COMPANY,dc=COM
|
||||
#ldapRealm.contextFactory.url = ldap://ldap.test.com:389
|
||||
|
|
@ -74,7 +74,7 @@ admin = *
|
|||
|
||||
[urls]
|
||||
# This section is used for url-based security.
|
||||
# You can secure interpreter, configuration and credential information by urls. Comment or uncomment the below urls that you want to hide.
|
||||
# You can secure interpreter, configuration and credential information by urls. Comment or uncomment the below urls that you want to hide.
|
||||
# anon means the access is anonymous.
|
||||
# authc means Form based Auth Security
|
||||
# To enfore security, comment the line below and uncomment the next one
|
||||
|
|
|
|||
|
|
@ -150,7 +150,17 @@ How does the front-end AngularJS API compares to the [back-end API](./back-end-a
|
|||
<td>Executing Paragraph</td>
|
||||
<td>z.runParagraph(paragraphId)</td>
|
||||
<td>z.run(paragraphId)</td>
|
||||
</tr>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Executing Paragraph (Specific paragraphs in other notes) (</td>
|
||||
<td></td>
|
||||
<td>z.run(noteid, paragraphId)</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Executing note</td>
|
||||
<td></td>
|
||||
<td>z.runNote(noteId)</td>
|
||||
</tr>
|
||||
<tbody>
|
||||
<tbody>
|
||||
</table>
|
||||
|
|
|
|||
|
|
@ -25,7 +25,11 @@ limitations under the License.
|
|||
|
||||
## Overview
|
||||
|
||||
JDBC interpreter lets you create a JDBC connection to any data sources seamlessly. By now, it has been tested with:
|
||||
JDBC interpreter lets you create a JDBC connection to any data sources seamlessly.
|
||||
|
||||
Inserts, Updates, and Upserts are applied immediately after running each statement.
|
||||
|
||||
By now, it has been tested with:
|
||||
|
||||
<div class="row" style="margin: 30px auto;">
|
||||
<div class="col-md-6">
|
||||
|
|
|
|||
|
|
@ -93,6 +93,7 @@ class FigureCanvasZInline(FigureCanvasAgg):
|
|||
# Express the image as bytes
|
||||
buf = BytesIO()
|
||||
self.print_figure(buf, **kwargs)
|
||||
fmt = fmt.encode()
|
||||
byte_str = b"data:image/%s;base64," %fmt
|
||||
byte_str += base64.b64encode(buf.getvalue())
|
||||
|
||||
|
|
|
|||
|
|
@ -462,6 +462,8 @@ public class JDBCInterpreter extends Interpreter {
|
|||
msg.append(UPDATE_COUNT_HEADER).append(NEWLINE);
|
||||
msg.append(updateCount).append(NEWLINE);
|
||||
}
|
||||
//In case user ran an insert/update/upsert statement
|
||||
if (connection.getAutoCommit() != true) connection.commit();
|
||||
} finally {
|
||||
if (resultSet != null) {
|
||||
try {
|
||||
|
|
|
|||
2
pom.xml
2
pom.xml
|
|
@ -477,7 +477,7 @@
|
|||
<version>1.0.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>validate</phase>
|
||||
<phase>verify</phase>
|
||||
<goals>
|
||||
<goal>validate</goal>
|
||||
</goals>
|
||||
|
|
|
|||
|
|
@ -57,6 +57,7 @@ public class PythonInterpreter extends Interpreter {
|
|||
private Boolean py4JisInstalled = false;
|
||||
private InterpreterContext context;
|
||||
private Pattern errorInLastLine = Pattern.compile(".*(Error|Exception): .*$");
|
||||
private String pythonPath;
|
||||
private int maxResult;
|
||||
|
||||
PythonProcess process = null;
|
||||
|
|
@ -74,6 +75,8 @@ public class PythonInterpreter extends Interpreter {
|
|||
registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
|
||||
}
|
||||
|
||||
// Add zeppelin-bundled libs to PYTHONPATH
|
||||
setPythonPath("../interpreter/lib/python:$PYTHONPATH");
|
||||
LOG.info("Starting Python interpreter ---->");
|
||||
LOG.info("Python path is set to:" + property.getProperty(ZEPPELIN_PYTHON));
|
||||
|
||||
|
|
@ -198,13 +201,17 @@ public class PythonInterpreter extends Interpreter {
|
|||
return null;
|
||||
}
|
||||
|
||||
public void setPythonPath(String pythonPath) {
|
||||
this.pythonPath = pythonPath;
|
||||
}
|
||||
|
||||
public PythonProcess getPythonProcess() {
|
||||
if (process == null) {
|
||||
String binPath = getProperty(ZEPPELIN_PYTHON);
|
||||
if (pythonCommand != null) {
|
||||
binPath = pythonCommand;
|
||||
}
|
||||
return new PythonProcess(binPath);
|
||||
return new PythonProcess(binPath, pythonPath);
|
||||
} else {
|
||||
return process;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,10 +42,12 @@ public class PythonProcess {
|
|||
Process process;
|
||||
|
||||
private String binPath;
|
||||
private String pythonPath;
|
||||
private long pid;
|
||||
|
||||
public PythonProcess(String binPath) {
|
||||
public PythonProcess(String binPath, String pythonPath) {
|
||||
this.binPath = binPath;
|
||||
this.pythonPath = pythonPath;
|
||||
}
|
||||
|
||||
public void open() throws IOException {
|
||||
|
|
@ -65,6 +67,9 @@ public class PythonProcess {
|
|||
cmd = binPath + " -iu";
|
||||
}
|
||||
builder = new ProcessBuilder("bash", "-c", cmd);
|
||||
if (pythonPath != null) {
|
||||
builder.environment().put("PYTHONPATH", pythonPath);
|
||||
}
|
||||
}
|
||||
|
||||
builder.redirectErrorStream(true);
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ public class PythonInterpreterWithPythonInstalledTest {
|
|||
realPython.open();
|
||||
|
||||
//when
|
||||
InterpreterResult ret1 = realPython.interpret("print \"...\"", null);
|
||||
InterpreterResult ret1 = realPython.interpret("print(\"...\")", null);
|
||||
|
||||
//then
|
||||
//System.out.println("\nInterpreter response: \n" + ret.message());
|
||||
|
|
|
|||
|
|
@ -161,7 +161,7 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
|
|||
innerOut.setInterpreterOutput(context.out)
|
||||
|
||||
try {
|
||||
import tools.nsc.interpreter.Results._
|
||||
import scala.tools.nsc.interpreter.Results._
|
||||
REPL.interpret(code) match {
|
||||
case Success => {
|
||||
logger.debug(s"Successfully executed `$code` in $paragraphId")
|
||||
|
|
|
|||
|
|
@ -38,6 +38,12 @@
|
|||
<mockito.version>1.10.19</mockito.version>
|
||||
<powermock.version>1.6.4</powermock.version>
|
||||
<spark.version>2.0.1</spark.version>
|
||||
<pyspark.test.exclude>
|
||||
**/PySparkInterpreterMatplotlibTest.java
|
||||
</pyspark.test.exclude>
|
||||
<pyspark.test.include>
|
||||
**/*Test.*
|
||||
</pyspark.test.include>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
|
@ -322,6 +328,7 @@
|
|||
<argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
|
||||
<excludes>
|
||||
<exclude>**/SparkRInterpreterTest.java</exclude>
|
||||
<exclude>${pyspark.test.exclude}</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
|
@ -431,11 +438,35 @@
|
|||
<exclude>**/SparkRInterpreter.java</exclude>
|
||||
</excludes>
|
||||
<testExcludes>
|
||||
<testExclude>${pyspark.test.exclude}</testExclude>
|
||||
<testExclude>**/SparkRInterpreterTest.java</testExclude>
|
||||
<testExclude>**/ZeppelinRTest.java</testExclude>
|
||||
</testExcludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.scala-tools</groupId>
|
||||
<artifactId>maven-scala-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes>
|
||||
<exclude>**/ZeppelinR.scala</exclude>
|
||||
<exclude>**/SparkRBackend.scala</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<includes>
|
||||
<include>${pyspark.test.include}</include>
|
||||
</includes>
|
||||
<excludes>
|
||||
<exclude>${pyspark.test.exclude}</exclude>
|
||||
<exclude>**/SparkRInterpreterTest.java</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
|
@ -580,7 +611,9 @@
|
|||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes combine.self="override"></excludes>
|
||||
<testExcludes combine.self="override"></testExcludes>
|
||||
<testExcludes combine.self="override">
|
||||
<testExclude>${pyspark.test.exclude}</testExclude>
|
||||
</testExcludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
|
|
@ -596,6 +629,7 @@
|
|||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes combine.self="override">
|
||||
<exclude>${pyspark.test.exclude}</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
|
|
|||
|
|
@ -171,7 +171,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
Map env = EnvironmentUtils.getProcEnvironment();
|
||||
if (!env.containsKey("PYTHONPATH")) {
|
||||
SparkConf conf = getSparkConf();
|
||||
env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":"));
|
||||
env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
|
||||
":../interpreter/lib/python");
|
||||
}
|
||||
return env;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
|
|||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
|
||||
import org.apache.zeppelin.resource.Resource;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
|
|
@ -300,32 +301,103 @@ public class ZeppelinContext {
|
|||
|
||||
/**
|
||||
* Run paragraph by id
|
||||
* @param id
|
||||
* @param noteId
|
||||
* @param paragraphId
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public void run(String id) {
|
||||
run(id, interpreterContext);
|
||||
public void run(String noteId, String paragraphId) {
|
||||
run(noteId, paragraphId, interpreterContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run paragraph by id
|
||||
* @param id
|
||||
* @param paragraphId
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public void run(String paragraphId) {
|
||||
String noteId = interpreterContext.getNoteId();
|
||||
run(noteId, paragraphId, interpreterContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run paragraph by id
|
||||
* @param noteId
|
||||
* @param context
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public void run(String id, InterpreterContext context) {
|
||||
if (id.equals(context.getParagraphId())) {
|
||||
public void run(String noteId, String paragraphId, InterpreterContext context) {
|
||||
if (paragraphId.equals(context.getParagraphId())) {
|
||||
throw new InterpreterException("Can not run current Paragraph");
|
||||
}
|
||||
|
||||
for (InterpreterContextRunner r : context.getRunners()) {
|
||||
if (id.equals(r.getParagraphId())) {
|
||||
r.run();
|
||||
return;
|
||||
}
|
||||
List<InterpreterContextRunner> runners =
|
||||
getInterpreterContextRunner(noteId, paragraphId, context);
|
||||
|
||||
if (runners.size() <= 0) {
|
||||
throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size());
|
||||
}
|
||||
|
||||
throw new InterpreterException("Paragraph " + id + " not found");
|
||||
for (InterpreterContextRunner r : runners) {
|
||||
r.run();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void runNote(String noteId) {
|
||||
runNote(noteId, interpreterContext);
|
||||
}
|
||||
|
||||
public void runNote(String noteId, InterpreterContext context) {
|
||||
String runningNoteId = context.getNoteId();
|
||||
String runningParagraphId = context.getParagraphId();
|
||||
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
|
||||
|
||||
if (runners.size() <= 0) {
|
||||
throw new InterpreterException("Note " + noteId + " not found " + runners.size());
|
||||
}
|
||||
|
||||
for (InterpreterContextRunner r : runners) {
|
||||
if (r.getNoteId().equals(runningNoteId) && r.getParagraphId().equals(runningParagraphId)) {
|
||||
continue;
|
||||
}
|
||||
r.run();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* get Zeppelin Paragraph Runner from zeppelin server
|
||||
* @param noteId
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public List<InterpreterContextRunner> getInterpreterContextRunner(
|
||||
String noteId, InterpreterContext interpreterContext) {
|
||||
List<InterpreterContextRunner> runners = new LinkedList<>();
|
||||
RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();
|
||||
|
||||
if (remoteWorksController != null) {
|
||||
runners = remoteWorksController.getRemoteContextRunner(noteId);
|
||||
}
|
||||
|
||||
return runners;
|
||||
}
|
||||
|
||||
/**
|
||||
* get Zeppelin Paragraph Runner from zeppelin server
|
||||
* @param noteId
|
||||
* @param paragraphId
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public List<InterpreterContextRunner> getInterpreterContextRunner(
|
||||
String noteId, String paragraphId, InterpreterContext interpreterContext) {
|
||||
List<InterpreterContextRunner> runners = new LinkedList<>();
|
||||
RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();
|
||||
|
||||
if (remoteWorksController != null) {
|
||||
runners = remoteWorksController.getRemoteContextRunner(noteId, paragraphId);
|
||||
}
|
||||
|
||||
return runners;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -334,7 +406,8 @@ public class ZeppelinContext {
|
|||
*/
|
||||
@ZeppelinApi
|
||||
public void run(int idx) {
|
||||
run(idx, interpreterContext);
|
||||
String noteId = interpreterContext.getNoteId();
|
||||
run(noteId, idx, interpreterContext);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -342,12 +415,13 @@ public class ZeppelinContext {
|
|||
* @param idx index starting from 0
|
||||
* @param context interpreter context
|
||||
*/
|
||||
public void run(int idx, InterpreterContext context) {
|
||||
if (idx >= context.getRunners().size()) {
|
||||
public void run(String noteId, int idx, InterpreterContext context) {
|
||||
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
|
||||
if (idx >= runners.size()) {
|
||||
throw new InterpreterException("Index out of bound");
|
||||
}
|
||||
|
||||
InterpreterContextRunner runner = context.getRunners().get(idx);
|
||||
InterpreterContextRunner runner = runners.get(idx);
|
||||
if (runner.getParagraphId().equals(context.getParagraphId())) {
|
||||
throw new InterpreterException("Can not run current Paragraph");
|
||||
}
|
||||
|
|
@ -366,13 +440,14 @@ public class ZeppelinContext {
|
|||
*/
|
||||
@ZeppelinApi
|
||||
public void run(List<Object> paragraphIdOrIdx, InterpreterContext context) {
|
||||
String noteId = context.getNoteId();
|
||||
for (Object idOrIdx : paragraphIdOrIdx) {
|
||||
if (idOrIdx instanceof String) {
|
||||
String id = (String) idOrIdx;
|
||||
run(id, context);
|
||||
String paragraphId = (String) idOrIdx;
|
||||
run(noteId, paragraphId, context);
|
||||
} else if (idOrIdx instanceof Integer) {
|
||||
Integer idx = (Integer) idOrIdx;
|
||||
run(idx, context);
|
||||
run(noteId, idx, context);
|
||||
} else {
|
||||
throw new InterpreterException("Paragraph " + idOrIdx + " not found");
|
||||
}
|
||||
|
|
@ -389,13 +464,7 @@ public class ZeppelinContext {
|
|||
*/
|
||||
@ZeppelinApi
|
||||
public void runAll(InterpreterContext context) {
|
||||
for (InterpreterContextRunner r : context.getRunners()) {
|
||||
if (r.getParagraphId().equals(context.getParagraphId())) {
|
||||
// skip itself
|
||||
continue;
|
||||
}
|
||||
r.run();
|
||||
}
|
||||
runNote(context.getNoteId());
|
||||
}
|
||||
|
||||
@ZeppelinApi
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ from pyspark.broadcast import Broadcast
|
|||
from pyspark.serializers import MarshalSerializer, PickleSerializer
|
||||
import ast
|
||||
import traceback
|
||||
import warnings
|
||||
|
||||
# for back compatibility
|
||||
from pyspark.sql import SQLContext, HiveContext, Row
|
||||
|
|
|
|||
|
|
@ -0,0 +1,256 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.spark;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
|
||||
import org.apache.zeppelin.resource.LocalResourcePool;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.Test;
|
||||
import org.junit.runners.MethodSorters;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||
public class PySparkInterpreterMatplotlibTest {
|
||||
public static SparkInterpreter sparkInterpreter;
|
||||
public static PySparkInterpreter pyspark;
|
||||
public static InterpreterGroup intpGroup;
|
||||
private File tmpDir;
|
||||
public static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterTest.class);
|
||||
private InterpreterContext context;
|
||||
|
||||
public static class AltPySparkInterpreter extends PySparkInterpreter {
|
||||
/**
|
||||
* Since pyspark output is sent to an outputstream rather than
|
||||
* being directly provided by interpret(), this subclass is created to
|
||||
* override interpret() to append the result from the outputStream
|
||||
* for the sake of convenience in testing.
|
||||
*/
|
||||
public AltPySparkInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
/**
|
||||
* This code is mainly copied from RemoteInterpreterServer.java which
|
||||
* normally handles this in real use cases.
|
||||
*/
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
context.out.clear();
|
||||
InterpreterResult result = super.interpret(st, context);
|
||||
List<InterpreterResultMessage> resultMessages = null;
|
||||
try {
|
||||
context.out.flush();
|
||||
resultMessages = context.out.toInterpreterResultMessage();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
resultMessages.addAll(result.message());
|
||||
|
||||
return new InterpreterResult(result.code(), resultMessages);
|
||||
}
|
||||
}
|
||||
|
||||
public static Properties getPySparkTestProperties() {
|
||||
Properties p = new Properties();
|
||||
p.setProperty("master", "local[*]");
|
||||
p.setProperty("spark.app.name", "Zeppelin Test");
|
||||
p.setProperty("zeppelin.spark.useHiveContext", "true");
|
||||
p.setProperty("zeppelin.spark.maxResult", "1000");
|
||||
p.setProperty("zeppelin.spark.importImplicit", "true");
|
||||
p.setProperty("zeppelin.pyspark.python", "python");
|
||||
return p;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get spark version number as a numerical value.
|
||||
* eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
|
||||
*/
|
||||
public static int getSparkVersionNumber() {
|
||||
if (sparkInterpreter == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
String[] split = sparkInterpreter.getSparkContext().version().split("\\.");
|
||||
int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
|
||||
return version;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
|
||||
System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo");
|
||||
tmpDir.mkdirs();
|
||||
|
||||
intpGroup = new InterpreterGroup();
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
||||
if (sparkInterpreter == null) {
|
||||
sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
|
||||
intpGroup.get("note").add(sparkInterpreter);
|
||||
sparkInterpreter.setInterpreterGroup(intpGroup);
|
||||
sparkInterpreter.open();
|
||||
}
|
||||
|
||||
if (pyspark == null) {
|
||||
pyspark = new AltPySparkInterpreter(getPySparkTestProperties());
|
||||
intpGroup.get("note").add(pyspark);
|
||||
pyspark.setInterpreterGroup(intpGroup);
|
||||
pyspark.open();
|
||||
}
|
||||
|
||||
context = new InterpreterContext("note", "id", null, "title", "text",
|
||||
new AuthenticationInfo(),
|
||||
new HashMap<String, Object>(),
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
new LocalResourcePool("id"),
|
||||
new LinkedList<InterpreterContextRunner>(),
|
||||
new InterpreterOutput(null));
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
delete(tmpDir);
|
||||
}
|
||||
|
||||
private void delete(File file) {
|
||||
if (file.isFile()) file.delete();
|
||||
else if (file.isDirectory()) {
|
||||
File[] files = file.listFiles();
|
||||
if (files != null && files.length > 0) {
|
||||
for (File f : files) {
|
||||
delete(f);
|
||||
}
|
||||
}
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dependenciesAreInstalled() {
|
||||
// matplotlib
|
||||
InterpreterResult ret = pyspark.interpret("import matplotlib", context);
|
||||
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
|
||||
// inline backend
|
||||
ret = pyspark.interpret("import backend_zinline", context);
|
||||
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void showPlot() {
|
||||
// Simple plot test
|
||||
InterpreterResult ret;
|
||||
ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
|
||||
ret = pyspark.interpret("plt.close()", context);
|
||||
ret = pyspark.interpret("z.configure_mpl(interactive=False)", context);
|
||||
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
|
||||
ret = pyspark.interpret("plt.show()", context);
|
||||
|
||||
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
assertEquals(ret.message().toString(), Type.HTML, ret.message().get(0).getType());
|
||||
assertTrue(ret.message().get(0).getData().contains("data:image/png;base64"));
|
||||
assertTrue(ret.message().get(0).getData().contains("<div>"));
|
||||
}
|
||||
|
||||
@Test
|
||||
// Test for when configuration is set to auto-close figures after show().
|
||||
public void testClose() {
|
||||
InterpreterResult ret;
|
||||
InterpreterResult ret1;
|
||||
InterpreterResult ret2;
|
||||
ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
|
||||
ret = pyspark.interpret("plt.close()", context);
|
||||
ret = pyspark.interpret("z.configure_mpl(interactive=False, close=True, angular=False)", context);
|
||||
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
|
||||
ret1 = pyspark.interpret("plt.show()", context);
|
||||
|
||||
// Second call to show() should print nothing, and Type should be TEXT.
|
||||
// This is because when close=True, there should be no living instances
|
||||
// of FigureManager, causing show() to return before setting the output
|
||||
// type to HTML.
|
||||
ret = pyspark.interpret("plt.show()", context);
|
||||
assertEquals(0, ret.message().size());
|
||||
|
||||
// Now test that new plot is drawn. It should be identical to the
|
||||
// previous one.
|
||||
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
|
||||
ret2 = pyspark.interpret("plt.show()", context);
|
||||
assertEquals(ret1.message().get(0).getType(), ret2.message().get(0).getType());
|
||||
assertEquals(ret1.message().get(0).getData(), ret2.message().get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
// Test for when configuration is set to not auto-close figures after show().
|
||||
public void testNoClose() {
|
||||
InterpreterResult ret;
|
||||
InterpreterResult ret1;
|
||||
InterpreterResult ret2;
|
||||
ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
|
||||
ret = pyspark.interpret("plt.close()", context);
|
||||
ret = pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=False)", context);
|
||||
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
|
||||
ret1 = pyspark.interpret("plt.show()", context);
|
||||
|
||||
// Second call to show() should print nothing, and Type should be HTML.
|
||||
// This is because when close=False, there should be living instances
|
||||
// of FigureManager, causing show() to set the output
|
||||
// type to HTML even though the figure is inactive.
|
||||
ret = pyspark.interpret("plt.show()", context);
|
||||
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
|
||||
// Now test that plot can be reshown if it is updated. It should be
|
||||
// different from the previous one because it will plot the same line
|
||||
// again but in a different color.
|
||||
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
|
||||
ret2 = pyspark.interpret("plt.show()", context);
|
||||
assertNotSame(ret1.message().get(1).getData(), ret2.message().get(1).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
// Test angular mode
|
||||
public void testAngular() {
|
||||
InterpreterResult ret;
|
||||
ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
|
||||
ret = pyspark.interpret("plt.close()", context);
|
||||
ret = pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=True)", context);
|
||||
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
|
||||
ret = pyspark.interpret("plt.show()", context);
|
||||
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
assertEquals(ret.message().toString(), Type.ANGULAR, ret.message().get(1).getType());
|
||||
|
||||
// Check if the figure data is in the Angular Object Registry
|
||||
AngularObjectRegistry registry = context.getAngularObjectRegistry();
|
||||
String figureData = registry.getAll("note", null).get(0).toString();
|
||||
assertTrue(figureData.contains("data:image/png;base64"));
|
||||
}
|
||||
}
|
||||
46
testing/install_external_dependencies.sh
Executable file
46
testing/install_external_dependencies.sh
Executable file
|
|
@ -0,0 +1,46 @@
|
|||
#!/bin/bash
|
||||
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
# Script for installing R / Python dependencies for Travis CI
|
||||
set -ev
|
||||
touch ~/.environ
|
||||
|
||||
# Install R dependencies if R profiles are used
|
||||
if [[ ${PROFILE/"-Pr "} != $PROFILE ]] || [[ ${PROFILE/"-Psparkr "} != $PROFILE ]] ; then
|
||||
echo "R_LIBS=~/R" > ~/.Renviron
|
||||
echo "export R_LIBS=~/R" >> ~/.environ
|
||||
source ~/.environ
|
||||
if [[ ! -d "$HOME/R/knitr" ]] ; then
|
||||
mkdir -p ~/R
|
||||
R -e "install.packages('knitr', repos = 'http://cran.us.r-project.org', lib='~/R')"
|
||||
fi
|
||||
fi
|
||||
|
||||
# Install Python dependencies for Python specific tests
|
||||
if [[ -n "$PYTHON" ]] ; then
|
||||
wget https://repo.continuum.io/miniconda/Miniconda${PYTHON}-latest-Linux-x86_64.sh -O miniconda.sh
|
||||
bash miniconda.sh -b -p $HOME/miniconda
|
||||
echo "export PATH='$HOME/miniconda/bin:$PATH'" >> ~/.environ
|
||||
source ~/.environ
|
||||
hash -r
|
||||
conda config --set always_yes yes --set changeps1 no
|
||||
conda update -q conda
|
||||
conda info -a
|
||||
conda config --add channels conda-forge
|
||||
conda install -q matplotlib pandasql
|
||||
fi
|
||||
|
|
@ -61,6 +61,7 @@ public class InterpreterContext {
|
|||
private List<InterpreterContextRunner> runners;
|
||||
private String className;
|
||||
private RemoteEventClientWrapper client;
|
||||
private RemoteWorksController remoteWorksController;
|
||||
|
||||
public InterpreterContext(String noteId,
|
||||
String paragraphId,
|
||||
|
|
@ -75,6 +76,24 @@ public class InterpreterContext {
|
|||
List<InterpreterContextRunner> runners,
|
||||
InterpreterOutput out
|
||||
) {
|
||||
this(noteId, paragraphId, replName, paragraphTitle, paragraphText, authenticationInfo,
|
||||
config, gui, angularObjectRegistry, resourcePool, runners, out, null);
|
||||
}
|
||||
|
||||
public InterpreterContext(String noteId,
|
||||
String paragraphId,
|
||||
String replName,
|
||||
String paragraphTitle,
|
||||
String paragraphText,
|
||||
AuthenticationInfo authenticationInfo,
|
||||
Map<String, Object> config,
|
||||
GUI gui,
|
||||
AngularObjectRegistry angularObjectRegistry,
|
||||
ResourcePool resourcePool,
|
||||
List<InterpreterContextRunner> runners,
|
||||
InterpreterOutput out,
|
||||
RemoteWorksController remoteWorksController
|
||||
) {
|
||||
this.noteId = noteId;
|
||||
this.paragraphId = paragraphId;
|
||||
this.replName = replName;
|
||||
|
|
@ -87,6 +106,7 @@ public class InterpreterContext {
|
|||
this.resourcePool = resourcePool;
|
||||
this.runners = runners;
|
||||
this.out = out;
|
||||
this.remoteWorksController = remoteWorksController;
|
||||
}
|
||||
|
||||
public InterpreterContext(String noteId,
|
||||
|
|
@ -101,9 +121,11 @@ public class InterpreterContext {
|
|||
ResourcePool resourcePool,
|
||||
List<InterpreterContextRunner> contextRunners,
|
||||
InterpreterOutput output,
|
||||
RemoteWorksController remoteWorksController,
|
||||
RemoteInterpreterEventClient eventClient) {
|
||||
this(noteId, paragraphId, replName, paragraphTitle, paragraphText, authenticationInfo,
|
||||
config, gui, angularObjectRegistry, resourcePool, contextRunners, output);
|
||||
config, gui, angularObjectRegistry, resourcePool, contextRunners, output,
|
||||
remoteWorksController);
|
||||
this.client = new RemoteEventClient(eventClient);
|
||||
}
|
||||
|
||||
|
|
@ -162,4 +184,12 @@ public class InterpreterContext {
|
|||
public RemoteEventClientWrapper getClient() {
|
||||
return client;
|
||||
}
|
||||
|
||||
public RemoteWorksController getRemoteWorksController() {
|
||||
return remoteWorksController;
|
||||
}
|
||||
|
||||
public void setRemoteWorksController(RemoteWorksController remoteWorksController) {
|
||||
this.remoteWorksController = remoteWorksController;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* zeppelin job for Remote works controller by interpreter
|
||||
*
|
||||
*/
|
||||
public interface RemoteWorksController {
|
||||
List<InterpreterContextRunner> getRemoteContextRunner(String noteId);
|
||||
List<InterpreterContextRunner> getRemoteContextRunner(String noteId, String paragraphId);
|
||||
}
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
/**
|
||||
* Remote Zeppelin Server Resource
|
||||
*/
|
||||
public class RemoteZeppelinServerResource {
|
||||
/**
|
||||
* Resource Type for Zeppelin Server
|
||||
*/
|
||||
public enum Type{
|
||||
PARAGRAPH_RUNNERS
|
||||
}
|
||||
|
||||
private String ownerKey;
|
||||
private Type resourceType;
|
||||
private Object data;
|
||||
|
||||
public Type getResourceType() {
|
||||
return resourceType;
|
||||
}
|
||||
|
||||
public String getOwnerKey() {
|
||||
return ownerKey;
|
||||
}
|
||||
|
||||
public void setOwnerKey(String ownerKey) {
|
||||
this.ownerKey = ownerKey;
|
||||
}
|
||||
|
||||
public void setResourceType(Type resourceType) {
|
||||
this.resourceType = resourceType;
|
||||
}
|
||||
|
||||
public Object getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public void setData(Object data) {
|
||||
this.data = data;
|
||||
}
|
||||
}
|
||||
|
|
@ -21,8 +21,10 @@ import org.apache.zeppelin.display.AngularObject;
|
|||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
|
||||
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
|
||||
import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
|
||||
import org.apache.zeppelin.resource.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -50,6 +52,22 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
|
|||
private final Map<ResourceId, Object> getResourceResponse = new HashMap<>();
|
||||
private final Gson gson = new Gson();
|
||||
|
||||
/**
|
||||
* Run paragraph
|
||||
* @param runner
|
||||
*/
|
||||
public void getZeppelinServerNoteRunner(
|
||||
String eventOwnerKey, ZeppelinServerResourceParagraphRunner runner) {
|
||||
RemoteZeppelinServerResource eventBody = new RemoteZeppelinServerResource();
|
||||
eventBody.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
|
||||
eventBody.setOwnerKey(eventOwnerKey);
|
||||
eventBody.setData(runner);
|
||||
|
||||
sendEvent(new RemoteInterpreterEvent(
|
||||
RemoteInterpreterEventType.REMOTE_ZEPPELIN_SERVER_RESOURCE,
|
||||
gson.toJson(eventBody)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Run paragraph
|
||||
* @param runner
|
||||
|
|
|
|||
|
|
@ -26,9 +26,11 @@ import org.apache.zeppelin.helium.ApplicationEventListener;
|
|||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
|
||||
import org.apache.zeppelin.resource.Resource;
|
||||
import org.apache.zeppelin.resource.ResourceId;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
|
|
@ -146,8 +148,9 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
InterpreterContextRunner runnerFromRemote = gson.fromJson(
|
||||
event.getData(), RemoteInterpreterContextRunner.class);
|
||||
|
||||
interpreterProcess.getInterpreterContextRunnerPool().run(
|
||||
listener.onRemoteRunParagraph(
|
||||
runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
|
||||
|
||||
} else if (event.getType() == RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL) {
|
||||
ResourceSet resourceSet = getAllResourcePoolExcept();
|
||||
sendResourcePoolResponseGetAll(resourceSet);
|
||||
|
|
@ -222,6 +225,12 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
String status = appStatusUpdate.get("status");
|
||||
|
||||
appListener.onStatusChange(noteId, paragraphId, appId, status);
|
||||
} else if (event.getType() == RemoteInterpreterEventType.REMOTE_ZEPPELIN_SERVER_RESOURCE) {
|
||||
RemoteZeppelinServerResource reqResourceBody = gson.fromJson(
|
||||
event.getData(), RemoteZeppelinServerResource.class);
|
||||
progressRemoteZeppelinControlEvent(
|
||||
reqResourceBody.getResourceType(), listener, reqResourceBody);
|
||||
|
||||
} else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
|
||||
Map<String, String> metaInfos = gson.fromJson(event.getData(),
|
||||
new TypeToken<Map<String, String>>() {
|
||||
|
|
@ -241,6 +250,82 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
}
|
||||
}
|
||||
|
||||
private void progressRemoteZeppelinControlEvent(
|
||||
RemoteZeppelinServerResource.Type resourceType,
|
||||
RemoteInterpreterProcessListener remoteWorksEventListener,
|
||||
RemoteZeppelinServerResource reqResourceBody) throws Exception {
|
||||
boolean broken = false;
|
||||
final Gson gson = new Gson();
|
||||
final String eventOwnerKey = reqResourceBody.getOwnerKey();
|
||||
Client interpreterServerMain = null;
|
||||
try {
|
||||
interpreterServerMain = interpreterProcess.getClient();
|
||||
final Client eventClient = interpreterServerMain;
|
||||
if (resourceType == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) {
|
||||
final List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>();
|
||||
|
||||
ZeppelinServerResourceParagraphRunner reqRunnerContext =
|
||||
new ZeppelinServerResourceParagraphRunner();
|
||||
|
||||
Map<String, Object> reqResourceMap = (Map<String, Object>) reqResourceBody.getData();
|
||||
String noteId = (String) reqResourceMap.get("noteId");
|
||||
String paragraphId = (String) reqResourceMap.get("paragraphId");
|
||||
|
||||
reqRunnerContext.setNoteId(noteId);
|
||||
reqRunnerContext.setParagraphId(paragraphId);
|
||||
|
||||
RemoteInterpreterProcessListener.RemoteWorksEventListener callBackEvent =
|
||||
new RemoteInterpreterProcessListener.RemoteWorksEventListener() {
|
||||
|
||||
@Override
|
||||
public void onFinished(Object resultObject) {
|
||||
boolean clientBroken = false;
|
||||
if (resultObject != null && resultObject instanceof List) {
|
||||
List<InterpreterContextRunner> runnerList =
|
||||
(List<InterpreterContextRunner>) resultObject;
|
||||
for (InterpreterContextRunner r : runnerList) {
|
||||
remoteRunners.add(
|
||||
new ZeppelinServerResourceParagraphRunner(r.getNoteId(), r.getParagraphId())
|
||||
);
|
||||
}
|
||||
|
||||
final RemoteZeppelinServerResource resResource =
|
||||
new RemoteZeppelinServerResource();
|
||||
resResource.setOwnerKey(eventOwnerKey);
|
||||
resResource.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
|
||||
resResource.setData(remoteRunners);
|
||||
|
||||
try {
|
||||
eventClient.onReceivedZeppelinResource(gson.toJson(resResource));
|
||||
} catch (Exception e) {
|
||||
clientBroken = true;
|
||||
logger.error("Can't get RemoteInterpreterEvent", e);
|
||||
waitQuietly();
|
||||
} finally {
|
||||
interpreterProcess.releaseClient(eventClient, clientBroken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError() {
|
||||
logger.info("onGetParagraphRunners onError");
|
||||
}
|
||||
};
|
||||
|
||||
remoteWorksEventListener.onGetParagraphRunners(
|
||||
reqRunnerContext.getNoteId(), reqRunnerContext.getParagraphId(), callBackEvent);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
broken = true;
|
||||
logger.error("Can't get RemoteInterpreterEvent", e);
|
||||
waitQuietly();
|
||||
|
||||
} finally {
|
||||
interpreterProcess.releaseClient(interpreterServerMain, broken);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) {
|
||||
Client client = null;
|
||||
boolean broken = false;
|
||||
|
|
|
|||
|
|
@ -29,4 +29,15 @@ public interface RemoteInterpreterProcessListener {
|
|||
String noteId, String paragraphId, int index, InterpreterResult.Type type, String output);
|
||||
public void onOutputClear(String noteId, String paragraphId);
|
||||
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
|
||||
public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception;
|
||||
public void onGetParagraphRunners(
|
||||
String noteId, String paragraphId, RemoteWorksEventListener callback);
|
||||
|
||||
/**
|
||||
* Remote works for Interpreter callback listener
|
||||
*/
|
||||
public interface RemoteWorksEventListener {
|
||||
public void onFinished(Object resultObject);
|
||||
public void onError();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,7 +23,10 @@ import java.lang.reflect.Constructor;
|
|||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.rmi.server.RemoteServer;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.server.TThreadPoolServer;
|
||||
|
|
@ -78,6 +81,9 @@ public class RemoteInterpreterServer
|
|||
private final Map<String, RunningApplication> runningApplications =
|
||||
Collections.synchronizedMap(new HashMap<String, RunningApplication>());
|
||||
|
||||
private Map<String, Object> remoteWorksResponsePool;
|
||||
private ZeppelinRemoteWorksController remoteWorksController;
|
||||
|
||||
public RemoteInterpreterServer(int port) throws TTransportException {
|
||||
this.port = port;
|
||||
|
||||
|
|
@ -85,6 +91,8 @@ public class RemoteInterpreterServer
|
|||
TServerSocket serverTransport = new TServerSocket(port);
|
||||
server = new TThreadPoolServer(
|
||||
new TThreadPoolServer.Args(serverTransport).processor(processor));
|
||||
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
|
||||
remoteWorksController = new ZeppelinRemoteWorksController(this, remoteWorksResponsePool);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -174,7 +182,6 @@ public class RemoteInterpreterServer
|
|||
Constructor<Interpreter> constructor =
|
||||
replClass.getConstructor(new Class[] {Properties.class});
|
||||
Interpreter repl = constructor.newInstance(p);
|
||||
|
||||
repl.setClassloaderUrls(new URL[]{});
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
|
|
@ -335,6 +342,42 @@ public class RemoteInterpreterServer
|
|||
context.getGui());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceivedZeppelinResource(String responseJson) throws TException {
|
||||
RemoteZeppelinServerResource response = gson.fromJson(
|
||||
responseJson, RemoteZeppelinServerResource.class);
|
||||
|
||||
if (response == null) {
|
||||
throw new TException("Bad response for remote resource");
|
||||
}
|
||||
|
||||
try {
|
||||
if (response.getResourceType() == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) {
|
||||
List<InterpreterContextRunner> intpContextRunners = new LinkedList<>();
|
||||
List<Map<String, Object>> remoteRunnersMap =
|
||||
(List<Map<String, Object>>) response.getData();
|
||||
|
||||
String noteId = null;
|
||||
String paragraphId = null;
|
||||
|
||||
for (Map<String, Object> runnerItem : remoteRunnersMap) {
|
||||
noteId = (String) runnerItem.get("noteId");
|
||||
paragraphId = (String) runnerItem.get("paragraphId");
|
||||
intpContextRunners.add(
|
||||
new ParagraphRunner(this, noteId, paragraphId)
|
||||
);
|
||||
}
|
||||
|
||||
synchronized (this.remoteWorksResponsePool) {
|
||||
this.remoteWorksResponsePool.put(
|
||||
response.getOwnerKey(),
|
||||
intpContextRunners);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
class InterpretJobListener implements JobListener {
|
||||
|
||||
|
|
@ -540,7 +583,7 @@ public class RemoteInterpreterServer
|
|||
gson.fromJson(ric.getGui(), GUI.class),
|
||||
interpreterGroup.getAngularObjectRegistry(),
|
||||
interpreterGroup.getResourcePool(),
|
||||
contextRunners, output, eventClient);
|
||||
contextRunners, output, remoteWorksController, eventClient);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -582,7 +625,7 @@ public class RemoteInterpreterServer
|
|||
|
||||
|
||||
static class ParagraphRunner extends InterpreterContextRunner {
|
||||
|
||||
Logger logger = LoggerFactory.getLogger(ParagraphRunner.class);
|
||||
private transient RemoteInterpreterServer server;
|
||||
|
||||
public ParagraphRunner(RemoteInterpreterServer server, String noteId, String paragraphId) {
|
||||
|
|
@ -596,6 +639,78 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
}
|
||||
|
||||
static class ZeppelinRemoteWorksController implements RemoteWorksController{
|
||||
Logger logger = LoggerFactory.getLogger(ZeppelinRemoteWorksController.class);
|
||||
|
||||
private final long DEFAULT_TIMEOUT_VALUE = 300000;
|
||||
private final Map<String, Object> remoteWorksResponsePool;
|
||||
private RemoteInterpreterServer server;
|
||||
public ZeppelinRemoteWorksController(
|
||||
RemoteInterpreterServer server, Map<String, Object> remoteWorksResponsePool) {
|
||||
this.remoteWorksResponsePool = remoteWorksResponsePool;
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
public String generateOwnerKey() {
|
||||
String hashKeyText = new String("ownerKey" + System.currentTimeMillis());
|
||||
String hashKey = String.valueOf(hashKeyText.hashCode());
|
||||
return hashKey;
|
||||
}
|
||||
|
||||
public boolean waitForEvent(String eventOwnerKey) throws InterruptedException {
|
||||
return waitForEvent(eventOwnerKey, DEFAULT_TIMEOUT_VALUE);
|
||||
}
|
||||
|
||||
public boolean waitForEvent(String eventOwnerKey, long timeout) throws InterruptedException {
|
||||
boolean wasGetData = false;
|
||||
long now = System.currentTimeMillis();
|
||||
long endTime = System.currentTimeMillis() + timeout;
|
||||
|
||||
while (endTime >= now) {
|
||||
synchronized (this.remoteWorksResponsePool) {
|
||||
wasGetData = this.remoteWorksResponsePool.containsKey(eventOwnerKey);
|
||||
}
|
||||
if (wasGetData == true) {
|
||||
break;
|
||||
}
|
||||
now = System.currentTimeMillis();
|
||||
sleep(500);
|
||||
}
|
||||
|
||||
return wasGetData;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<InterpreterContextRunner> getRemoteContextRunner(String noteId) {
|
||||
return getRemoteContextRunner(noteId, null);
|
||||
}
|
||||
|
||||
public List<InterpreterContextRunner> getRemoteContextRunner(
|
||||
String noteId, String paragraphID) {
|
||||
|
||||
List<InterpreterContextRunner> runners = null;
|
||||
String ownerKey = generateOwnerKey();
|
||||
|
||||
ZeppelinServerResourceParagraphRunner resource = new ZeppelinServerResourceParagraphRunner();
|
||||
resource.setNoteId(noteId);
|
||||
resource.setParagraphId(paragraphID);
|
||||
server.eventClient.getZeppelinServerNoteRunner(ownerKey, resource);
|
||||
|
||||
try {
|
||||
this.waitForEvent(ownerKey);
|
||||
} catch (Exception e) {
|
||||
return new LinkedList<>();
|
||||
}
|
||||
synchronized (this.remoteWorksResponsePool) {
|
||||
runners = (List<InterpreterContextRunner>) this.remoteWorksResponsePool.get(ownerKey);
|
||||
this.remoteWorksResponsePool.remove(ownerKey);
|
||||
}
|
||||
return runners;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
private RemoteInterpreterResult convert(InterpreterResult result,
|
||||
Map<String, Object> config, GUI gui) {
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-24")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
|
||||
public class InterpreterCompletion implements org.apache.thrift.TBase<InterpreterCompletion, InterpreterCompletion._Fields>, java.io.Serializable, Cloneable, Comparable<InterpreterCompletion> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-24")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
|
||||
public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-24")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
|
||||
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-24")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
|
||||
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
|
||||
|
||||
|
|
|
|||
|
|
@ -41,7 +41,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
|
|||
OUTPUT_UPDATE_ALL(10),
|
||||
ANGULAR_REGISTRY_PUSH(11),
|
||||
APP_STATUS_UPDATE(12),
|
||||
META_INFOS(13);
|
||||
META_INFOS(13),
|
||||
REMOTE_ZEPPELIN_SERVER_RESOURCE(14);
|
||||
|
||||
private final int value;
|
||||
|
||||
|
|
@ -88,6 +89,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
|
|||
return APP_STATUS_UPDATE;
|
||||
case 13:
|
||||
return META_INFOS;
|
||||
case 14:
|
||||
return REMOTE_ZEPPELIN_SERVER_RESOURCE;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-24")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
|
||||
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-24")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
|
||||
public class RemoteInterpreterResultMessage implements org.apache.thrift.TBase<RemoteInterpreterResultMessage, RemoteInterpreterResultMessage._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResultMessage> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResultMessage");
|
||||
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -0,0 +1,520 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* Autogenerated by Thrift Compiler (0.9.2)
|
||||
*
|
||||
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||
* @generated
|
||||
*/
|
||||
package org.apache.zeppelin.interpreter.thrift;
|
||||
|
||||
import org.apache.thrift.scheme.IScheme;
|
||||
import org.apache.thrift.scheme.SchemeFactory;
|
||||
import org.apache.thrift.scheme.StandardScheme;
|
||||
|
||||
import org.apache.thrift.scheme.TupleScheme;
|
||||
import org.apache.thrift.protocol.TTupleProtocol;
|
||||
import org.apache.thrift.protocol.TProtocolException;
|
||||
import org.apache.thrift.EncodingUtils;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.async.AsyncMethodCallback;
|
||||
import org.apache.thrift.server.AbstractNonblockingServer.*;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.EnumMap;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Collections;
|
||||
import java.util.BitSet;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import javax.annotation.Generated;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
|
||||
public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.TBase<ZeppelinServerResourceParagraphRunner, ZeppelinServerResourceParagraphRunner._Fields>, java.io.Serializable, Cloneable, Comparable<ZeppelinServerResourceParagraphRunner> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ZeppelinServerResourceParagraphRunner");
|
||||
|
||||
private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)1);
|
||||
private static final org.apache.thrift.protocol.TField PARAGRAPH_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphId", org.apache.thrift.protocol.TType.STRING, (short)2);
|
||||
|
||||
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
|
||||
static {
|
||||
schemes.put(StandardScheme.class, new ZeppelinServerResourceParagraphRunnerStandardSchemeFactory());
|
||||
schemes.put(TupleScheme.class, new ZeppelinServerResourceParagraphRunnerTupleSchemeFactory());
|
||||
}
|
||||
|
||||
public String noteId; // required
|
||||
public String paragraphId; // required
|
||||
|
||||
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
|
||||
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
|
||||
NOTE_ID((short)1, "noteId"),
|
||||
PARAGRAPH_ID((short)2, "paragraphId");
|
||||
|
||||
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
|
||||
|
||||
static {
|
||||
for (_Fields field : EnumSet.allOf(_Fields.class)) {
|
||||
byName.put(field.getFieldName(), field);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches fieldId, or null if its not found.
|
||||
*/
|
||||
public static _Fields findByThriftId(int fieldId) {
|
||||
switch(fieldId) {
|
||||
case 1: // NOTE_ID
|
||||
return NOTE_ID;
|
||||
case 2: // PARAGRAPH_ID
|
||||
return PARAGRAPH_ID;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches fieldId, throwing an exception
|
||||
* if it is not found.
|
||||
*/
|
||||
public static _Fields findByThriftIdOrThrow(int fieldId) {
|
||||
_Fields fields = findByThriftId(fieldId);
|
||||
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
|
||||
return fields;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches name, or null if its not found.
|
||||
*/
|
||||
public static _Fields findByName(String name) {
|
||||
return byName.get(name);
|
||||
}
|
||||
|
||||
private final short _thriftId;
|
||||
private final String _fieldName;
|
||||
|
||||
_Fields(short thriftId, String fieldName) {
|
||||
_thriftId = thriftId;
|
||||
_fieldName = fieldName;
|
||||
}
|
||||
|
||||
public short getThriftFieldId() {
|
||||
return _thriftId;
|
||||
}
|
||||
|
||||
public String getFieldName() {
|
||||
return _fieldName;
|
||||
}
|
||||
}
|
||||
|
||||
// isset id assignments
|
||||
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
|
||||
static {
|
||||
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
|
||||
tmpMap.put(_Fields.NOTE_ID, new org.apache.thrift.meta_data.FieldMetaData("noteId", org.apache.thrift.TFieldRequirementType.DEFAULT,
|
||||
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
|
||||
tmpMap.put(_Fields.PARAGRAPH_ID, new org.apache.thrift.meta_data.FieldMetaData("paragraphId", org.apache.thrift.TFieldRequirementType.DEFAULT,
|
||||
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
|
||||
metaDataMap = Collections.unmodifiableMap(tmpMap);
|
||||
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ZeppelinServerResourceParagraphRunner.class, metaDataMap);
|
||||
}
|
||||
|
||||
public ZeppelinServerResourceParagraphRunner() {
|
||||
}
|
||||
|
||||
public ZeppelinServerResourceParagraphRunner(
|
||||
String noteId,
|
||||
String paragraphId)
|
||||
{
|
||||
this();
|
||||
this.noteId = noteId;
|
||||
this.paragraphId = paragraphId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs a deep copy on <i>other</i>.
|
||||
*/
|
||||
public ZeppelinServerResourceParagraphRunner(ZeppelinServerResourceParagraphRunner other) {
|
||||
if (other.isSetNoteId()) {
|
||||
this.noteId = other.noteId;
|
||||
}
|
||||
if (other.isSetParagraphId()) {
|
||||
this.paragraphId = other.paragraphId;
|
||||
}
|
||||
}
|
||||
|
||||
public ZeppelinServerResourceParagraphRunner deepCopy() {
|
||||
return new ZeppelinServerResourceParagraphRunner(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
this.noteId = null;
|
||||
this.paragraphId = null;
|
||||
}
|
||||
|
||||
public String getNoteId() {
|
||||
return this.noteId;
|
||||
}
|
||||
|
||||
public ZeppelinServerResourceParagraphRunner setNoteId(String noteId) {
|
||||
this.noteId = noteId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void unsetNoteId() {
|
||||
this.noteId = null;
|
||||
}
|
||||
|
||||
/** Returns true if field noteId is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSetNoteId() {
|
||||
return this.noteId != null;
|
||||
}
|
||||
|
||||
public void setNoteIdIsSet(boolean value) {
|
||||
if (!value) {
|
||||
this.noteId = null;
|
||||
}
|
||||
}
|
||||
|
||||
public String getParagraphId() {
|
||||
return this.paragraphId;
|
||||
}
|
||||
|
||||
public ZeppelinServerResourceParagraphRunner setParagraphId(String paragraphId) {
|
||||
this.paragraphId = paragraphId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void unsetParagraphId() {
|
||||
this.paragraphId = null;
|
||||
}
|
||||
|
||||
/** Returns true if field paragraphId is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSetParagraphId() {
|
||||
return this.paragraphId != null;
|
||||
}
|
||||
|
||||
public void setParagraphIdIsSet(boolean value) {
|
||||
if (!value) {
|
||||
this.paragraphId = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void setFieldValue(_Fields field, Object value) {
|
||||
switch (field) {
|
||||
case NOTE_ID:
|
||||
if (value == null) {
|
||||
unsetNoteId();
|
||||
} else {
|
||||
setNoteId((String)value);
|
||||
}
|
||||
break;
|
||||
|
||||
case PARAGRAPH_ID:
|
||||
if (value == null) {
|
||||
unsetParagraphId();
|
||||
} else {
|
||||
setParagraphId((String)value);
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public Object getFieldValue(_Fields field) {
|
||||
switch (field) {
|
||||
case NOTE_ID:
|
||||
return getNoteId();
|
||||
|
||||
case PARAGRAPH_ID:
|
||||
return getParagraphId();
|
||||
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSet(_Fields field) {
|
||||
if (field == null) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
||||
switch (field) {
|
||||
case NOTE_ID:
|
||||
return isSetNoteId();
|
||||
case PARAGRAPH_ID:
|
||||
return isSetParagraphId();
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object that) {
|
||||
if (that == null)
|
||||
return false;
|
||||
if (that instanceof ZeppelinServerResourceParagraphRunner)
|
||||
return this.equals((ZeppelinServerResourceParagraphRunner)that);
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean equals(ZeppelinServerResourceParagraphRunner that) {
|
||||
if (that == null)
|
||||
return false;
|
||||
|
||||
boolean this_present_noteId = true && this.isSetNoteId();
|
||||
boolean that_present_noteId = true && that.isSetNoteId();
|
||||
if (this_present_noteId || that_present_noteId) {
|
||||
if (!(this_present_noteId && that_present_noteId))
|
||||
return false;
|
||||
if (!this.noteId.equals(that.noteId))
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean this_present_paragraphId = true && this.isSetParagraphId();
|
||||
boolean that_present_paragraphId = true && that.isSetParagraphId();
|
||||
if (this_present_paragraphId || that_present_paragraphId) {
|
||||
if (!(this_present_paragraphId && that_present_paragraphId))
|
||||
return false;
|
||||
if (!this.paragraphId.equals(that.paragraphId))
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
List<Object> list = new ArrayList<Object>();
|
||||
|
||||
boolean present_noteId = true && (isSetNoteId());
|
||||
list.add(present_noteId);
|
||||
if (present_noteId)
|
||||
list.add(noteId);
|
||||
|
||||
boolean present_paragraphId = true && (isSetParagraphId());
|
||||
list.add(present_paragraphId);
|
||||
if (present_paragraphId)
|
||||
list.add(paragraphId);
|
||||
|
||||
return list.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(ZeppelinServerResourceParagraphRunner other) {
|
||||
if (!getClass().equals(other.getClass())) {
|
||||
return getClass().getName().compareTo(other.getClass().getName());
|
||||
}
|
||||
|
||||
int lastComparison = 0;
|
||||
|
||||
lastComparison = Boolean.valueOf(isSetNoteId()).compareTo(other.isSetNoteId());
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
if (isSetNoteId()) {
|
||||
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.noteId, other.noteId);
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
}
|
||||
lastComparison = Boolean.valueOf(isSetParagraphId()).compareTo(other.isSetParagraphId());
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
if (isSetParagraphId()) {
|
||||
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.paragraphId, other.paragraphId);
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public _Fields fieldForId(int fieldId) {
|
||||
return _Fields.findByThriftId(fieldId);
|
||||
}
|
||||
|
||||
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
|
||||
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
|
||||
}
|
||||
|
||||
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
|
||||
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder("ZeppelinServerResourceParagraphRunner(");
|
||||
boolean first = true;
|
||||
|
||||
sb.append("noteId:");
|
||||
if (this.noteId == null) {
|
||||
sb.append("null");
|
||||
} else {
|
||||
sb.append(this.noteId);
|
||||
}
|
||||
first = false;
|
||||
if (!first) sb.append(", ");
|
||||
sb.append("paragraphId:");
|
||||
if (this.paragraphId == null) {
|
||||
sb.append("null");
|
||||
} else {
|
||||
sb.append(this.paragraphId);
|
||||
}
|
||||
first = false;
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public void validate() throws org.apache.thrift.TException {
|
||||
// check for required fields
|
||||
// check for sub-struct validity
|
||||
}
|
||||
|
||||
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
|
||||
try {
|
||||
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
|
||||
} catch (org.apache.thrift.TException te) {
|
||||
throw new java.io.IOException(te);
|
||||
}
|
||||
}
|
||||
|
||||
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
|
||||
try {
|
||||
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
|
||||
} catch (org.apache.thrift.TException te) {
|
||||
throw new java.io.IOException(te);
|
||||
}
|
||||
}
|
||||
|
||||
private static class ZeppelinServerResourceParagraphRunnerStandardSchemeFactory implements SchemeFactory {
|
||||
public ZeppelinServerResourceParagraphRunnerStandardScheme getScheme() {
|
||||
return new ZeppelinServerResourceParagraphRunnerStandardScheme();
|
||||
}
|
||||
}
|
||||
|
||||
private static class ZeppelinServerResourceParagraphRunnerStandardScheme extends StandardScheme<ZeppelinServerResourceParagraphRunner> {
|
||||
|
||||
public void read(org.apache.thrift.protocol.TProtocol iprot, ZeppelinServerResourceParagraphRunner struct) throws org.apache.thrift.TException {
|
||||
org.apache.thrift.protocol.TField schemeField;
|
||||
iprot.readStructBegin();
|
||||
while (true)
|
||||
{
|
||||
schemeField = iprot.readFieldBegin();
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
|
||||
break;
|
||||
}
|
||||
switch (schemeField.id) {
|
||||
case 1: // NOTE_ID
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
|
||||
struct.noteId = iprot.readString();
|
||||
struct.setNoteIdIsSet(true);
|
||||
} else {
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
break;
|
||||
case 2: // PARAGRAPH_ID
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
|
||||
struct.paragraphId = iprot.readString();
|
||||
struct.setParagraphIdIsSet(true);
|
||||
} else {
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
iprot.readFieldEnd();
|
||||
}
|
||||
iprot.readStructEnd();
|
||||
|
||||
// check for required fields of primitive type, which can't be checked in the validate method
|
||||
struct.validate();
|
||||
}
|
||||
|
||||
public void write(org.apache.thrift.protocol.TProtocol oprot, ZeppelinServerResourceParagraphRunner struct) throws org.apache.thrift.TException {
|
||||
struct.validate();
|
||||
|
||||
oprot.writeStructBegin(STRUCT_DESC);
|
||||
if (struct.noteId != null) {
|
||||
oprot.writeFieldBegin(NOTE_ID_FIELD_DESC);
|
||||
oprot.writeString(struct.noteId);
|
||||
oprot.writeFieldEnd();
|
||||
}
|
||||
if (struct.paragraphId != null) {
|
||||
oprot.writeFieldBegin(PARAGRAPH_ID_FIELD_DESC);
|
||||
oprot.writeString(struct.paragraphId);
|
||||
oprot.writeFieldEnd();
|
||||
}
|
||||
oprot.writeFieldStop();
|
||||
oprot.writeStructEnd();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class ZeppelinServerResourceParagraphRunnerTupleSchemeFactory implements SchemeFactory {
|
||||
public ZeppelinServerResourceParagraphRunnerTupleScheme getScheme() {
|
||||
return new ZeppelinServerResourceParagraphRunnerTupleScheme();
|
||||
}
|
||||
}
|
||||
|
||||
private static class ZeppelinServerResourceParagraphRunnerTupleScheme extends TupleScheme<ZeppelinServerResourceParagraphRunner> {
|
||||
|
||||
@Override
|
||||
public void write(org.apache.thrift.protocol.TProtocol prot, ZeppelinServerResourceParagraphRunner struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol oprot = (TTupleProtocol) prot;
|
||||
BitSet optionals = new BitSet();
|
||||
if (struct.isSetNoteId()) {
|
||||
optionals.set(0);
|
||||
}
|
||||
if (struct.isSetParagraphId()) {
|
||||
optionals.set(1);
|
||||
}
|
||||
oprot.writeBitSet(optionals, 2);
|
||||
if (struct.isSetNoteId()) {
|
||||
oprot.writeString(struct.noteId);
|
||||
}
|
||||
if (struct.isSetParagraphId()) {
|
||||
oprot.writeString(struct.paragraphId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void read(org.apache.thrift.protocol.TProtocol prot, ZeppelinServerResourceParagraphRunner struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol iprot = (TTupleProtocol) prot;
|
||||
BitSet incoming = iprot.readBitSet(2);
|
||||
if (incoming.get(0)) {
|
||||
struct.noteId = iprot.readString();
|
||||
struct.setNoteIdIsSet(true);
|
||||
}
|
||||
if (incoming.get(1)) {
|
||||
struct.paragraphId = iprot.readString();
|
||||
struct.setParagraphIdIsSet(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -19,7 +19,7 @@
|
|||
namespace java org.apache.zeppelin.interpreter.thrift
|
||||
|
||||
struct RemoteInterpreterContext {
|
||||
1: string sessionKey,
|
||||
1: string noteId,
|
||||
2: string paragraphId,
|
||||
3: string replName,
|
||||
4: string paragraphTitle,
|
||||
|
|
@ -54,9 +54,11 @@ enum RemoteInterpreterEventType {
|
|||
OUTPUT_UPDATE_ALL = 10,
|
||||
ANGULAR_REGISTRY_PUSH = 11,
|
||||
APP_STATUS_UPDATE = 12,
|
||||
META_INFOS = 13
|
||||
META_INFOS = 13,
|
||||
REMOTE_ZEPPELIN_SERVER_RESOURCE = 14
|
||||
}
|
||||
|
||||
|
||||
struct RemoteInterpreterEvent {
|
||||
1: RemoteInterpreterEventType type,
|
||||
2: string data // json serialized data
|
||||
|
|
@ -67,6 +69,11 @@ struct RemoteApplicationResult {
|
|||
2: string msg
|
||||
}
|
||||
|
||||
struct ZeppelinServerResourceParagraphRunner {
|
||||
1: string noteId,
|
||||
2: string paragraphId
|
||||
}
|
||||
|
||||
/*
|
||||
* The below variables(name, value) will be connected to getCompletions in paragraph.controller.js
|
||||
*
|
||||
|
|
@ -114,4 +121,6 @@ service RemoteInterpreterService {
|
|||
RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string sessionKey, 4: string paragraphId);
|
||||
RemoteApplicationResult unloadApplication(1: string applicationInstanceId);
|
||||
RemoteApplicationResult runApplication(1: string applicationInstanceId);
|
||||
|
||||
void onReceivedZeppelinResource(1: string object);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -171,4 +171,16 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
|
|||
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
|
||||
if (callback != null) {
|
||||
callback.onFinished(new LinkedList<>());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -82,16 +82,16 @@ public class RemoteInterpreterTest {
|
|||
|
||||
private RemoteInterpreter createMockInterpreterA(Properties p, String noteId) {
|
||||
return new RemoteInterpreter(
|
||||
p,
|
||||
noteId,
|
||||
MockInterpreterA.class.getName(),
|
||||
new File(INTERPRETER_SCRIPT).getAbsolutePath(),
|
||||
"fake",
|
||||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null,
|
||||
p,
|
||||
noteId,
|
||||
MockInterpreterA.class.getName(),
|
||||
new File(INTERPRETER_SCRIPT).getAbsolutePath(),
|
||||
"fake",
|
||||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null,
|
||||
"anonymous",
|
||||
false);
|
||||
}
|
||||
|
|
@ -102,16 +102,16 @@ public class RemoteInterpreterTest {
|
|||
|
||||
private RemoteInterpreter createMockInterpreterB(Properties p, String noteId) {
|
||||
return new RemoteInterpreter(
|
||||
p,
|
||||
noteId,
|
||||
MockInterpreterB.class.getName(),
|
||||
new File(INTERPRETER_SCRIPT).getAbsolutePath(),
|
||||
"fake",
|
||||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null,
|
||||
p,
|
||||
noteId,
|
||||
MockInterpreterB.class.getName(),
|
||||
new File(INTERPRETER_SCRIPT).getAbsolutePath(),
|
||||
"fake",
|
||||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null,
|
||||
"anonymous",
|
||||
false);
|
||||
}
|
||||
|
|
@ -218,7 +218,6 @@ public class RemoteInterpreterTest {
|
|||
"anonymous",
|
||||
false);
|
||||
|
||||
|
||||
intpGroup.get("note").add(intpA);
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
||||
|
|
|
|||
|
|
@ -313,4 +313,16 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
|
|||
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
|
||||
if (callback != null) {
|
||||
callback.onFinished(new LinkedList<>());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.vfs2.FileSystemException;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
|
|
@ -41,7 +42,12 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
|
|||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.helium.HeliumPackage;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
|
|
@ -56,6 +62,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
|
|||
import org.apache.zeppelin.notebook.socket.Message;
|
||||
import org.apache.zeppelin.notebook.socket.Message.OP;
|
||||
import org.apache.zeppelin.notebook.socket.WatcherMessage;
|
||||
import org.apache.zeppelin.rest.exception.ForbiddenException;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.apache.zeppelin.server.ZeppelinServer;
|
||||
|
|
@ -1529,6 +1536,72 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
broadcast(noteId, msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGetParagraphRunners(
|
||||
String noteId, String paragraphId, RemoteWorksEventListener callback) {
|
||||
Notebook notebookIns = notebook();
|
||||
List<InterpreterContextRunner> runner = new LinkedList<>();
|
||||
|
||||
if (notebookIns == null) {
|
||||
LOG.info("intepreter request notebook instance is null");
|
||||
callback.onFinished(notebookIns);
|
||||
}
|
||||
|
||||
try {
|
||||
Note note = notebookIns.getNote(noteId);
|
||||
if (note != null) {
|
||||
if (paragraphId != null) {
|
||||
Paragraph paragraph = note.getParagraph(paragraphId);
|
||||
if (paragraph != null) {
|
||||
runner.add(paragraph.getInterpreterContextRunner());
|
||||
}
|
||||
} else {
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
runner.add(p.getInterpreterContextRunner());
|
||||
}
|
||||
}
|
||||
}
|
||||
callback.onFinished(runner);
|
||||
} catch (NullPointerException e) {
|
||||
LOG.warn(e.getMessage());
|
||||
callback.onError();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteRunParagraph(String noteId, String paragraphId) throws Exception {
|
||||
Notebook notebookIns = notebook();
|
||||
try {
|
||||
if (notebookIns == null) {
|
||||
throw new Exception("onRemoteRunParagraph notebook instance is null");
|
||||
}
|
||||
Note noteIns = notebookIns.getNote(noteId);
|
||||
if (noteIns == null) {
|
||||
throw new Exception(String.format("Can't found note id %s", noteId));
|
||||
}
|
||||
|
||||
Paragraph paragraph = noteIns.getParagraph(paragraphId);
|
||||
if (paragraph == null) {
|
||||
throw new Exception(String.format("Can't found paragraph %s %s", noteId, paragraphId));
|
||||
}
|
||||
|
||||
Set<String> userAndRoles = Sets.newHashSet();
|
||||
userAndRoles.add(SecurityUtils.getPrincipal());
|
||||
userAndRoles.addAll(SecurityUtils.getRoles());
|
||||
if (!notebookIns.getNotebookAuthorization().hasWriteAuthorization(userAndRoles, noteId)) {
|
||||
throw new ForbiddenException(String.format("can't execute note %s", noteId));
|
||||
}
|
||||
|
||||
AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
|
||||
paragraph.setAuthenticationInfo(subject);
|
||||
|
||||
noteIns.run(paragraphId);
|
||||
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Notebook Information Change event
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ public class ZeppelinIT extends AbstractZeppelinIT {
|
|||
* z.run(2, context)
|
||||
* }
|
||||
*/
|
||||
setTextOfParagraph(4, "z.angularWatch(\"myVar\", (before:Object, after:Object, context:org.apache.zeppelin.interpreter.InterpreterContext)=>{ z.run(2, context)})");
|
||||
setTextOfParagraph(4, "z.angularWatch(\"myVar\", (before:Object, after:Object, context:org.apache.zeppelin.interpreter.InterpreterContext)=>{ z.run(2)})");
|
||||
runParagraph(4);
|
||||
waitForParagraph(4, "FINISHED");
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.zeppelin.rest;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
|
|
@ -346,6 +347,34 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
assertEquals(Status.FINISHED, p2.getStatus());
|
||||
assertEquals("10", p2.getResult().message().get(0).getData());
|
||||
|
||||
Paragraph p3 = note.addParagraph();
|
||||
Map config3 = p3.getConfig();
|
||||
config3.put("enabled", true);
|
||||
p3.setConfig(config3);
|
||||
p3.setText("%spark println(new java.util.Date())");
|
||||
p3.setAuthenticationInfo(anonymous);
|
||||
|
||||
p0.setText(String.format("%%spark z.runNote(\"%s\")", note.getId()));
|
||||
note.run(p0.getId());
|
||||
waitForFinish(p0);
|
||||
waitForFinish(p1);
|
||||
waitForFinish(p2);
|
||||
waitForFinish(p3);
|
||||
|
||||
assertEquals(Status.FINISHED, p3.getStatus());
|
||||
String p3result = p3.getResult().message().get(0).getData();
|
||||
assertNotEquals(null, p3result);
|
||||
assertNotEquals("", p3result);
|
||||
|
||||
p0.setText(String.format("%%spark z.run(\"%s\", \"%s\")", note.getId(), p3.getId()));
|
||||
p3.setText("%%spark println(\"END\")");
|
||||
|
||||
note.run(p0.getId());
|
||||
waitForFinish(p0);
|
||||
waitForFinish(p3);
|
||||
|
||||
assertNotEquals(p3result, p3.getResult().message());
|
||||
|
||||
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -172,7 +172,7 @@ a.navbar-brand:hover {
|
|||
height: 28px;
|
||||
width: 200px;
|
||||
font-size: 14px;
|
||||
font-family: 'Helvetica Neue', Helvetica, Arial, 'FontAwesome', sans-serif;
|
||||
font-family: 'FontAwesome', 'Helvetica Neue', Helvetica, Arial, sans-serif;
|
||||
}
|
||||
|
||||
.dropdown-submenu {
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ limitations under the License.
|
|||
|
||||
<select class="form-control input-sm"
|
||||
ng-if="paragraph.settings.forms[formulaire.name].options && paragraph.settings.forms[formulaire.name].type != 'checkbox'"
|
||||
ng-change="runParagraph(getEditorValue())"
|
||||
ng-model="paragraph.settings.params[formulaire.name]"
|
||||
ng-class="{'disable': paragraph.status == 'RUNNING' || paragraph.status == 'PENDING' }"
|
||||
name="{{formulaire.name}}"
|
||||
|
|
|
|||
|
|
@ -476,6 +476,11 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
return interpreterContext;
|
||||
}
|
||||
|
||||
public InterpreterContextRunner getInterpreterContextRunner() {
|
||||
|
||||
return new ParagraphRunner(note, note.getId(), getId());
|
||||
}
|
||||
|
||||
static class ParagraphRunner extends InterpreterContextRunner {
|
||||
private transient Note note;
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue