Merge remote-tracking branch 'origin/master' into ZEPPELIN-212

This commit is contained in:
Lee moon soo 2016-11-29 19:18:56 -08:00
commit f5034b8c6e
43 changed files with 3304 additions and 1119 deletions

View file

@ -66,16 +66,22 @@ matrix:
- jdk: "oraclejdk7"
env: TEST_SELENIUM="true" SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Phadoop-2.3 -Ppyspark -Pexamples" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.AbstractFunctionalSuite -DfailIfNoTests=false"
# Test python/pyspark with python 2
- jdk: "oraclejdk7"
env: PYTHON="2" SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -pl spark,python -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-display,spark-dependencies,spark,python -Dtest=org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false"
# Test python/pyspark with python 3
- jdk: "oraclejdk7"
env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.3" PROFILE="-Pspark-2.0 -Phadoop-2.3 -Ppyspark -Pscala-2.11" BUILD_FLAG="package -pl spark,python -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-display,spark-dependencies,spark,python -Dtest=org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false"
before_install:
- echo "MAVEN_OPTS='-Xms1024M -Xmx2048M -XX:MaxPermSize=1024m -XX:-UseGCOverheadLimit'" >> ~/.mavenrc
- "ls -la .spark-dist ${HOME}/.m2/repository/.cache/maven-download-plugin"
- ./testing/install_external_dependencies.sh
- ls -la .spark-dist ${HOME}/.m2/repository/.cache/maven-download-plugin || true
- ls .node_modules && cp -r .node_modules zeppelin-web/node_modules || echo "node_modules are not cached"
- mkdir -p ~/R
- echo 'R_LIBS=~/R' > ~/.Renviron
- R -e "install.packages('knitr', repos = 'http://cran.us.r-project.org', lib='~/R')"
- export R_LIBS='~/R'
- "/sbin/start-stop-daemon --start --quiet --pidfile /tmp/custom_xvfb_99.pid --make-pidfile --background --exec /usr/bin/Xvfb -- :99 -ac -screen 0 1600x1024x16"
- ./dev/change_scala_version.sh $SCALA_VER
- source ~/.environ
install:
- mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=warn $BUILD_FLAG $PROFILE -B
@ -99,4 +105,3 @@ after_failure:
- cat zeppelin-distribution/target/zeppelin-*-SNAPSHOT/zeppelin-*-SNAPSHOT/logs/zeppelin*.out
- cat zeppelin-web/npm-debug.log
- cat spark-*/logs/*

View file

@ -26,7 +26,7 @@ user3 = password4, role2
# Sample LDAP configuration, for user Authentication, currently tested for single Realm
[main]
### A sample for configuring Active Directory Realm
#activeDirectoryRealm = org.apache.zeppelin.server.ActiveDirectoryGroupRealm
#activeDirectoryRealm = org.apache.zeppelin.realm.ActiveDirectoryGroupRealm
#activeDirectoryRealm.systemUsername = userNameA
#use either systemPassword or hadoopSecurityCredentialPath, more details in http://zeppelin.apache.org/docs/latest/security/shiroauthentication.html
@ -38,7 +38,7 @@ user3 = password4, role2
#activeDirectoryRealm.authorizationCachingEnabled = false
### A sample for configuring LDAP Directory Realm
#ldapRealm = org.apache.zeppelin.server.LdapGroupRealm
#ldapRealm = org.apache.zeppelin.realm.LdapGroupRealm
## search base for ldap groups (only relevant for LdapGroupRealm):
#ldapRealm.contextFactory.environment[ldap.searchBase] = dc=COMPANY,dc=COM
#ldapRealm.contextFactory.url = ldap://ldap.test.com:389
@ -74,7 +74,7 @@ admin = *
[urls]
# This section is used for url-based security.
# You can secure interpreter, configuration and credential information by urls. Comment or uncomment the below urls that you want to hide.
# You can secure interpreter, configuration and credential information by urls. Comment or uncomment the below urls that you want to hide.
# anon means the access is anonymous.
# authc means Form based Auth Security
# To enfore security, comment the line below and uncomment the next one

View file

@ -150,7 +150,17 @@ How does the front-end AngularJS API compares to the [back-end API](./back-end-a
<td>Executing Paragraph</td>
<td>z.runParagraph(paragraphId)</td>
<td>z.run(paragraphId)</td>
</tr>
</tr>
<tr>
<td>Executing Paragraph (Specific paragraphs in other notes) (</td>
<td></td>
<td>z.run(noteid, paragraphId)</td>
</tr>
<tr>
<td>Executing note</td>
<td></td>
<td>z.runNote(noteId)</td>
</tr>
<tbody>
<tbody>
</table>

View file

@ -25,7 +25,11 @@ limitations under the License.
## Overview
JDBC interpreter lets you create a JDBC connection to any data sources seamlessly. By now, it has been tested with:
JDBC interpreter lets you create a JDBC connection to any data sources seamlessly.
Inserts, Updates, and Upserts are applied immediately after running each statement.
By now, it has been tested with:
<div class="row" style="margin: 30px auto;">
<div class="col-md-6">

View file

@ -93,6 +93,7 @@ class FigureCanvasZInline(FigureCanvasAgg):
# Express the image as bytes
buf = BytesIO()
self.print_figure(buf, **kwargs)
fmt = fmt.encode()
byte_str = b"data:image/%s;base64," %fmt
byte_str += base64.b64encode(buf.getvalue())

View file

@ -462,6 +462,8 @@ public class JDBCInterpreter extends Interpreter {
msg.append(UPDATE_COUNT_HEADER).append(NEWLINE);
msg.append(updateCount).append(NEWLINE);
}
//In case user ran an insert/update/upsert statement
if (connection.getAutoCommit() != true) connection.commit();
} finally {
if (resultSet != null) {
try {

View file

@ -477,7 +477,7 @@
<version>1.0.1</version>
<executions>
<execution>
<phase>validate</phase>
<phase>verify</phase>
<goals>
<goal>validate</goal>
</goals>

View file

@ -57,6 +57,7 @@ public class PythonInterpreter extends Interpreter {
private Boolean py4JisInstalled = false;
private InterpreterContext context;
private Pattern errorInLastLine = Pattern.compile(".*(Error|Exception): .*$");
private String pythonPath;
private int maxResult;
PythonProcess process = null;
@ -74,6 +75,8 @@ public class PythonInterpreter extends Interpreter {
registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
}
// Add zeppelin-bundled libs to PYTHONPATH
setPythonPath("../interpreter/lib/python:$PYTHONPATH");
LOG.info("Starting Python interpreter ---->");
LOG.info("Python path is set to:" + property.getProperty(ZEPPELIN_PYTHON));
@ -198,13 +201,17 @@ public class PythonInterpreter extends Interpreter {
return null;
}
public void setPythonPath(String pythonPath) {
this.pythonPath = pythonPath;
}
public PythonProcess getPythonProcess() {
if (process == null) {
String binPath = getProperty(ZEPPELIN_PYTHON);
if (pythonCommand != null) {
binPath = pythonCommand;
}
return new PythonProcess(binPath);
return new PythonProcess(binPath, pythonPath);
} else {
return process;
}

View file

@ -42,10 +42,12 @@ public class PythonProcess {
Process process;
private String binPath;
private String pythonPath;
private long pid;
public PythonProcess(String binPath) {
public PythonProcess(String binPath, String pythonPath) {
this.binPath = binPath;
this.pythonPath = pythonPath;
}
public void open() throws IOException {
@ -65,6 +67,9 @@ public class PythonProcess {
cmd = binPath + " -iu";
}
builder = new ProcessBuilder("bash", "-c", cmd);
if (pythonPath != null) {
builder.environment().put("PYTHONPATH", pythonPath);
}
}
builder.redirectErrorStream(true);

View file

@ -102,7 +102,7 @@ public class PythonInterpreterWithPythonInstalledTest {
realPython.open();
//when
InterpreterResult ret1 = realPython.interpret("print \"...\"", null);
InterpreterResult ret1 = realPython.interpret("print(\"...\")", null);
//then
//System.out.println("\nInterpreter response: \n" + ret.message());

View file

@ -161,7 +161,7 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
innerOut.setInterpreterOutput(context.out)
try {
import tools.nsc.interpreter.Results._
import scala.tools.nsc.interpreter.Results._
REPL.interpret(code) match {
case Success => {
logger.debug(s"Successfully executed `$code` in $paragraphId")

View file

@ -38,6 +38,12 @@
<mockito.version>1.10.19</mockito.version>
<powermock.version>1.6.4</powermock.version>
<spark.version>2.0.1</spark.version>
<pyspark.test.exclude>
**/PySparkInterpreterMatplotlibTest.java
</pyspark.test.exclude>
<pyspark.test.include>
**/*Test.*
</pyspark.test.include>
</properties>
<dependencies>
@ -322,6 +328,7 @@
<argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
<excludes>
<exclude>**/SparkRInterpreterTest.java</exclude>
<exclude>${pyspark.test.exclude}</exclude>
</excludes>
</configuration>
</plugin>
@ -431,11 +438,35 @@
<exclude>**/SparkRInterpreter.java</exclude>
</excludes>
<testExcludes>
<testExclude>${pyspark.test.exclude}</testExclude>
<testExclude>**/SparkRInterpreterTest.java</testExclude>
<testExclude>**/ZeppelinRTest.java</testExclude>
</testExcludes>
</configuration>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/ZeppelinR.scala</exclude>
<exclude>**/SparkRBackend.scala</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>${pyspark.test.include}</include>
</includes>
<excludes>
<exclude>${pyspark.test.exclude}</exclude>
<exclude>**/SparkRInterpreterTest.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
@ -580,7 +611,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes combine.self="override"></excludes>
<testExcludes combine.self="override"></testExcludes>
<testExcludes combine.self="override">
<testExclude>${pyspark.test.exclude}</testExclude>
</testExcludes>
</configuration>
</plugin>
<plugin>
@ -596,6 +629,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes combine.self="override">
<exclude>${pyspark.test.exclude}</exclude>
</excludes>
</configuration>
</plugin>

View file

@ -171,7 +171,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
Map env = EnvironmentUtils.getProcEnvironment();
if (!env.containsKey("PYTHONPATH")) {
SparkConf conf = getSparkConf();
env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":"));
env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
":../interpreter/lib/python");
}
return env;
}

View file

@ -45,6 +45,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
@ -300,32 +301,103 @@ public class ZeppelinContext {
/**
* Run paragraph by id
* @param id
* @param noteId
* @param paragraphId
*/
@ZeppelinApi
public void run(String id) {
run(id, interpreterContext);
public void run(String noteId, String paragraphId) {
run(noteId, paragraphId, interpreterContext);
}
/**
* Run paragraph by id
* @param id
* @param paragraphId
*/
@ZeppelinApi
public void run(String paragraphId) {
String noteId = interpreterContext.getNoteId();
run(noteId, paragraphId, interpreterContext);
}
/**
* Run paragraph by id
* @param noteId
* @param context
*/
@ZeppelinApi
public void run(String id, InterpreterContext context) {
if (id.equals(context.getParagraphId())) {
public void run(String noteId, String paragraphId, InterpreterContext context) {
if (paragraphId.equals(context.getParagraphId())) {
throw new InterpreterException("Can not run current Paragraph");
}
for (InterpreterContextRunner r : context.getRunners()) {
if (id.equals(r.getParagraphId())) {
r.run();
return;
}
List<InterpreterContextRunner> runners =
getInterpreterContextRunner(noteId, paragraphId, context);
if (runners.size() <= 0) {
throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size());
}
throw new InterpreterException("Paragraph " + id + " not found");
for (InterpreterContextRunner r : runners) {
r.run();
}
}
public void runNote(String noteId) {
runNote(noteId, interpreterContext);
}
public void runNote(String noteId, InterpreterContext context) {
String runningNoteId = context.getNoteId();
String runningParagraphId = context.getParagraphId();
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
if (runners.size() <= 0) {
throw new InterpreterException("Note " + noteId + " not found " + runners.size());
}
for (InterpreterContextRunner r : runners) {
if (r.getNoteId().equals(runningNoteId) && r.getParagraphId().equals(runningParagraphId)) {
continue;
}
r.run();
}
}
/**
* get Zeppelin Paragraph Runner from zeppelin server
* @param noteId
*/
@ZeppelinApi
public List<InterpreterContextRunner> getInterpreterContextRunner(
String noteId, InterpreterContext interpreterContext) {
List<InterpreterContextRunner> runners = new LinkedList<>();
RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();
if (remoteWorksController != null) {
runners = remoteWorksController.getRemoteContextRunner(noteId);
}
return runners;
}
/**
* get Zeppelin Paragraph Runner from zeppelin server
* @param noteId
* @param paragraphId
*/
@ZeppelinApi
public List<InterpreterContextRunner> getInterpreterContextRunner(
String noteId, String paragraphId, InterpreterContext interpreterContext) {
List<InterpreterContextRunner> runners = new LinkedList<>();
RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();
if (remoteWorksController != null) {
runners = remoteWorksController.getRemoteContextRunner(noteId, paragraphId);
}
return runners;
}
/**
@ -334,7 +406,8 @@ public class ZeppelinContext {
*/
@ZeppelinApi
public void run(int idx) {
run(idx, interpreterContext);
String noteId = interpreterContext.getNoteId();
run(noteId, idx, interpreterContext);
}
/**
@ -342,12 +415,13 @@ public class ZeppelinContext {
* @param idx index starting from 0
* @param context interpreter context
*/
public void run(int idx, InterpreterContext context) {
if (idx >= context.getRunners().size()) {
public void run(String noteId, int idx, InterpreterContext context) {
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
if (idx >= runners.size()) {
throw new InterpreterException("Index out of bound");
}
InterpreterContextRunner runner = context.getRunners().get(idx);
InterpreterContextRunner runner = runners.get(idx);
if (runner.getParagraphId().equals(context.getParagraphId())) {
throw new InterpreterException("Can not run current Paragraph");
}
@ -366,13 +440,14 @@ public class ZeppelinContext {
*/
@ZeppelinApi
public void run(List<Object> paragraphIdOrIdx, InterpreterContext context) {
String noteId = context.getNoteId();
for (Object idOrIdx : paragraphIdOrIdx) {
if (idOrIdx instanceof String) {
String id = (String) idOrIdx;
run(id, context);
String paragraphId = (String) idOrIdx;
run(noteId, paragraphId, context);
} else if (idOrIdx instanceof Integer) {
Integer idx = (Integer) idOrIdx;
run(idx, context);
run(noteId, idx, context);
} else {
throw new InterpreterException("Paragraph " + idOrIdx + " not found");
}
@ -389,13 +464,7 @@ public class ZeppelinContext {
*/
@ZeppelinApi
public void runAll(InterpreterContext context) {
for (InterpreterContextRunner r : context.getRunners()) {
if (r.getParagraphId().equals(context.getParagraphId())) {
// skip itself
continue;
}
r.run();
}
runNote(context.getNoteId());
}
@ZeppelinApi

View file

@ -29,6 +29,7 @@ from pyspark.broadcast import Broadcast
from pyspark.serializers import MarshalSerializer, PickleSerializer
import ast
import traceback
import warnings
# for back compatibility
from pyspark.sql import SQLContext, HiveContext, Row

View file

@ -0,0 +1,256 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import static org.junit.Assert.*;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class PySparkInterpreterMatplotlibTest {
public static SparkInterpreter sparkInterpreter;
public static PySparkInterpreter pyspark;
public static InterpreterGroup intpGroup;
private File tmpDir;
public static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterTest.class);
private InterpreterContext context;
public static class AltPySparkInterpreter extends PySparkInterpreter {
/**
* Since pyspark output is sent to an outputstream rather than
* being directly provided by interpret(), this subclass is created to
* override interpret() to append the result from the outputStream
* for the sake of convenience in testing.
*/
public AltPySparkInterpreter(Properties property) {
super(property);
}
/**
* This code is mainly copied from RemoteInterpreterServer.java which
* normally handles this in real use cases.
*/
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
context.out.clear();
InterpreterResult result = super.interpret(st, context);
List<InterpreterResultMessage> resultMessages = null;
try {
context.out.flush();
resultMessages = context.out.toInterpreterResultMessage();
} catch (IOException e) {
e.printStackTrace();
}
resultMessages.addAll(result.message());
return new InterpreterResult(result.code(), resultMessages);
}
}
public static Properties getPySparkTestProperties() {
Properties p = new Properties();
p.setProperty("master", "local[*]");
p.setProperty("spark.app.name", "Zeppelin Test");
p.setProperty("zeppelin.spark.useHiveContext", "true");
p.setProperty("zeppelin.spark.maxResult", "1000");
p.setProperty("zeppelin.spark.importImplicit", "true");
p.setProperty("zeppelin.pyspark.python", "python");
return p;
}
/**
* Get spark version number as a numerical value.
* eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
*/
public static int getSparkVersionNumber() {
if (sparkInterpreter == null) {
return 0;
}
String[] split = sparkInterpreter.getSparkContext().version().split("\\.");
int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
return version;
}
@Before
public void setUp() throws Exception {
tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo");
tmpDir.mkdirs();
intpGroup = new InterpreterGroup();
intpGroup.put("note", new LinkedList<Interpreter>());
if (sparkInterpreter == null) {
sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
intpGroup.get("note").add(sparkInterpreter);
sparkInterpreter.setInterpreterGroup(intpGroup);
sparkInterpreter.open();
}
if (pyspark == null) {
pyspark = new AltPySparkInterpreter(getPySparkTestProperties());
intpGroup.get("note").add(pyspark);
pyspark.setInterpreterGroup(intpGroup);
pyspark.open();
}
context = new InterpreterContext("note", "id", null, "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(null));
}
@After
public void tearDown() throws Exception {
delete(tmpDir);
}
private void delete(File file) {
if (file.isFile()) file.delete();
else if (file.isDirectory()) {
File[] files = file.listFiles();
if (files != null && files.length > 0) {
for (File f : files) {
delete(f);
}
}
file.delete();
}
}
@Test
public void dependenciesAreInstalled() {
// matplotlib
InterpreterResult ret = pyspark.interpret("import matplotlib", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
// inline backend
ret = pyspark.interpret("import backend_zinline", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
}
@Test
public void showPlot() {
// Simple plot test
InterpreterResult ret;
ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
ret = pyspark.interpret("plt.close()", context);
ret = pyspark.interpret("z.configure_mpl(interactive=False)", context);
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
ret = pyspark.interpret("plt.show()", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().toString(), Type.HTML, ret.message().get(0).getType());
assertTrue(ret.message().get(0).getData().contains("data:image/png;base64"));
assertTrue(ret.message().get(0).getData().contains("<div>"));
}
@Test
// Test for when configuration is set to auto-close figures after show().
public void testClose() {
InterpreterResult ret;
InterpreterResult ret1;
InterpreterResult ret2;
ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
ret = pyspark.interpret("plt.close()", context);
ret = pyspark.interpret("z.configure_mpl(interactive=False, close=True, angular=False)", context);
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
ret1 = pyspark.interpret("plt.show()", context);
// Second call to show() should print nothing, and Type should be TEXT.
// This is because when close=True, there should be no living instances
// of FigureManager, causing show() to return before setting the output
// type to HTML.
ret = pyspark.interpret("plt.show()", context);
assertEquals(0, ret.message().size());
// Now test that new plot is drawn. It should be identical to the
// previous one.
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
ret2 = pyspark.interpret("plt.show()", context);
assertEquals(ret1.message().get(0).getType(), ret2.message().get(0).getType());
assertEquals(ret1.message().get(0).getData(), ret2.message().get(0).getData());
}
@Test
// Test for when configuration is set to not auto-close figures after show().
public void testNoClose() {
InterpreterResult ret;
InterpreterResult ret1;
InterpreterResult ret2;
ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
ret = pyspark.interpret("plt.close()", context);
ret = pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=False)", context);
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
ret1 = pyspark.interpret("plt.show()", context);
// Second call to show() should print nothing, and Type should be HTML.
// This is because when close=False, there should be living instances
// of FigureManager, causing show() to set the output
// type to HTML even though the figure is inactive.
ret = pyspark.interpret("plt.show()", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
// Now test that plot can be reshown if it is updated. It should be
// different from the previous one because it will plot the same line
// again but in a different color.
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
ret2 = pyspark.interpret("plt.show()", context);
assertNotSame(ret1.message().get(1).getData(), ret2.message().get(1).getData());
}
@Test
// Test angular mode
public void testAngular() {
InterpreterResult ret;
ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
ret = pyspark.interpret("plt.close()", context);
ret = pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=True)", context);
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
ret = pyspark.interpret("plt.show()", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().toString(), Type.ANGULAR, ret.message().get(1).getType());
// Check if the figure data is in the Angular Object Registry
AngularObjectRegistry registry = context.getAngularObjectRegistry();
String figureData = registry.getAll("note", null).get(0).toString();
assertTrue(figureData.contains("data:image/png;base64"));
}
}

View file

@ -0,0 +1,46 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script for installing R / Python dependencies for Travis CI
set -ev
touch ~/.environ
# Install R dependencies if R profiles are used
if [[ ${PROFILE/"-Pr "} != $PROFILE ]] || [[ ${PROFILE/"-Psparkr "} != $PROFILE ]] ; then
echo "R_LIBS=~/R" > ~/.Renviron
echo "export R_LIBS=~/R" >> ~/.environ
source ~/.environ
if [[ ! -d "$HOME/R/knitr" ]] ; then
mkdir -p ~/R
R -e "install.packages('knitr', repos = 'http://cran.us.r-project.org', lib='~/R')"
fi
fi
# Install Python dependencies for Python specific tests
if [[ -n "$PYTHON" ]] ; then
wget https://repo.continuum.io/miniconda/Miniconda${PYTHON}-latest-Linux-x86_64.sh -O miniconda.sh
bash miniconda.sh -b -p $HOME/miniconda
echo "export PATH='$HOME/miniconda/bin:$PATH'" >> ~/.environ
source ~/.environ
hash -r
conda config --set always_yes yes --set changeps1 no
conda update -q conda
conda info -a
conda config --add channels conda-forge
conda install -q matplotlib pandasql
fi

View file

@ -61,6 +61,7 @@ public class InterpreterContext {
private List<InterpreterContextRunner> runners;
private String className;
private RemoteEventClientWrapper client;
private RemoteWorksController remoteWorksController;
public InterpreterContext(String noteId,
String paragraphId,
@ -75,6 +76,24 @@ public class InterpreterContext {
List<InterpreterContextRunner> runners,
InterpreterOutput out
) {
this(noteId, paragraphId, replName, paragraphTitle, paragraphText, authenticationInfo,
config, gui, angularObjectRegistry, resourcePool, runners, out, null);
}
public InterpreterContext(String noteId,
String paragraphId,
String replName,
String paragraphTitle,
String paragraphText,
AuthenticationInfo authenticationInfo,
Map<String, Object> config,
GUI gui,
AngularObjectRegistry angularObjectRegistry,
ResourcePool resourcePool,
List<InterpreterContextRunner> runners,
InterpreterOutput out,
RemoteWorksController remoteWorksController
) {
this.noteId = noteId;
this.paragraphId = paragraphId;
this.replName = replName;
@ -87,6 +106,7 @@ public class InterpreterContext {
this.resourcePool = resourcePool;
this.runners = runners;
this.out = out;
this.remoteWorksController = remoteWorksController;
}
public InterpreterContext(String noteId,
@ -101,9 +121,11 @@ public class InterpreterContext {
ResourcePool resourcePool,
List<InterpreterContextRunner> contextRunners,
InterpreterOutput output,
RemoteWorksController remoteWorksController,
RemoteInterpreterEventClient eventClient) {
this(noteId, paragraphId, replName, paragraphTitle, paragraphText, authenticationInfo,
config, gui, angularObjectRegistry, resourcePool, contextRunners, output);
config, gui, angularObjectRegistry, resourcePool, contextRunners, output,
remoteWorksController);
this.client = new RemoteEventClient(eventClient);
}
@ -162,4 +184,12 @@ public class InterpreterContext {
public RemoteEventClientWrapper getClient() {
return client;
}
public RemoteWorksController getRemoteWorksController() {
return remoteWorksController;
}
public void setRemoteWorksController(RemoteWorksController remoteWorksController) {
this.remoteWorksController = remoteWorksController;
}
}

View file

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import java.util.List;
/**
* zeppelin job for Remote works controller by interpreter
*
*/
public interface RemoteWorksController {
List<InterpreterContextRunner> getRemoteContextRunner(String noteId);
List<InterpreterContextRunner> getRemoteContextRunner(String noteId, String paragraphId);
}

View file

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
/**
* Remote Zeppelin Server Resource
*/
public class RemoteZeppelinServerResource {
/**
* Resource Type for Zeppelin Server
*/
public enum Type{
PARAGRAPH_RUNNERS
}
private String ownerKey;
private Type resourceType;
private Object data;
public Type getResourceType() {
return resourceType;
}
public String getOwnerKey() {
return ownerKey;
}
public void setOwnerKey(String ownerKey) {
this.ownerKey = ownerKey;
}
public void setResourceType(Type resourceType) {
this.resourceType = resourceType;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
}

View file

@ -21,8 +21,10 @@ import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
import org.apache.zeppelin.resource.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -50,6 +52,22 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
private final Map<ResourceId, Object> getResourceResponse = new HashMap<>();
private final Gson gson = new Gson();
/**
* Run paragraph
* @param runner
*/
public void getZeppelinServerNoteRunner(
String eventOwnerKey, ZeppelinServerResourceParagraphRunner runner) {
RemoteZeppelinServerResource eventBody = new RemoteZeppelinServerResource();
eventBody.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
eventBody.setOwnerKey(eventOwnerKey);
eventBody.setData(runner);
sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.REMOTE_ZEPPELIN_SERVER_RESOURCE,
gson.toJson(eventBody)));
}
/**
* Run paragraph
* @param runner

View file

@ -26,9 +26,11 @@ import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourceId;
import org.apache.zeppelin.resource.ResourcePool;
@ -146,8 +148,9 @@ public class RemoteInterpreterEventPoller extends Thread {
InterpreterContextRunner runnerFromRemote = gson.fromJson(
event.getData(), RemoteInterpreterContextRunner.class);
interpreterProcess.getInterpreterContextRunnerPool().run(
listener.onRemoteRunParagraph(
runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
} else if (event.getType() == RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL) {
ResourceSet resourceSet = getAllResourcePoolExcept();
sendResourcePoolResponseGetAll(resourceSet);
@ -222,6 +225,12 @@ public class RemoteInterpreterEventPoller extends Thread {
String status = appStatusUpdate.get("status");
appListener.onStatusChange(noteId, paragraphId, appId, status);
} else if (event.getType() == RemoteInterpreterEventType.REMOTE_ZEPPELIN_SERVER_RESOURCE) {
RemoteZeppelinServerResource reqResourceBody = gson.fromJson(
event.getData(), RemoteZeppelinServerResource.class);
progressRemoteZeppelinControlEvent(
reqResourceBody.getResourceType(), listener, reqResourceBody);
} else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
Map<String, String> metaInfos = gson.fromJson(event.getData(),
new TypeToken<Map<String, String>>() {
@ -241,6 +250,82 @@ public class RemoteInterpreterEventPoller extends Thread {
}
}
private void progressRemoteZeppelinControlEvent(
RemoteZeppelinServerResource.Type resourceType,
RemoteInterpreterProcessListener remoteWorksEventListener,
RemoteZeppelinServerResource reqResourceBody) throws Exception {
boolean broken = false;
final Gson gson = new Gson();
final String eventOwnerKey = reqResourceBody.getOwnerKey();
Client interpreterServerMain = null;
try {
interpreterServerMain = interpreterProcess.getClient();
final Client eventClient = interpreterServerMain;
if (resourceType == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) {
final List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>();
ZeppelinServerResourceParagraphRunner reqRunnerContext =
new ZeppelinServerResourceParagraphRunner();
Map<String, Object> reqResourceMap = (Map<String, Object>) reqResourceBody.getData();
String noteId = (String) reqResourceMap.get("noteId");
String paragraphId = (String) reqResourceMap.get("paragraphId");
reqRunnerContext.setNoteId(noteId);
reqRunnerContext.setParagraphId(paragraphId);
RemoteInterpreterProcessListener.RemoteWorksEventListener callBackEvent =
new RemoteInterpreterProcessListener.RemoteWorksEventListener() {
@Override
public void onFinished(Object resultObject) {
boolean clientBroken = false;
if (resultObject != null && resultObject instanceof List) {
List<InterpreterContextRunner> runnerList =
(List<InterpreterContextRunner>) resultObject;
for (InterpreterContextRunner r : runnerList) {
remoteRunners.add(
new ZeppelinServerResourceParagraphRunner(r.getNoteId(), r.getParagraphId())
);
}
final RemoteZeppelinServerResource resResource =
new RemoteZeppelinServerResource();
resResource.setOwnerKey(eventOwnerKey);
resResource.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
resResource.setData(remoteRunners);
try {
eventClient.onReceivedZeppelinResource(gson.toJson(resResource));
} catch (Exception e) {
clientBroken = true;
logger.error("Can't get RemoteInterpreterEvent", e);
waitQuietly();
} finally {
interpreterProcess.releaseClient(eventClient, clientBroken);
}
}
}
@Override
public void onError() {
logger.info("onGetParagraphRunners onError");
}
};
remoteWorksEventListener.onGetParagraphRunners(
reqRunnerContext.getNoteId(), reqRunnerContext.getParagraphId(), callBackEvent);
}
} catch (Exception e) {
broken = true;
logger.error("Can't get RemoteInterpreterEvent", e);
waitQuietly();
} finally {
interpreterProcess.releaseClient(interpreterServerMain, broken);
}
}
private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) {
Client client = null;
boolean broken = false;

View file

@ -29,4 +29,15 @@ public interface RemoteInterpreterProcessListener {
String noteId, String paragraphId, int index, InterpreterResult.Type type, String output);
public void onOutputClear(String noteId, String paragraphId);
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception;
public void onGetParagraphRunners(
String noteId, String paragraphId, RemoteWorksEventListener callback);
/**
* Remote works for Interpreter callback listener
*/
public interface RemoteWorksEventListener {
public void onFinished(Object resultObject);
public void onError();
}
}

View file

@ -23,7 +23,10 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.rmi.server.RemoteServer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
@ -78,6 +81,9 @@ public class RemoteInterpreterServer
private final Map<String, RunningApplication> runningApplications =
Collections.synchronizedMap(new HashMap<String, RunningApplication>());
private Map<String, Object> remoteWorksResponsePool;
private ZeppelinRemoteWorksController remoteWorksController;
public RemoteInterpreterServer(int port) throws TTransportException {
this.port = port;
@ -85,6 +91,8 @@ public class RemoteInterpreterServer
TServerSocket serverTransport = new TServerSocket(port);
server = new TThreadPoolServer(
new TThreadPoolServer.Args(serverTransport).processor(processor));
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
remoteWorksController = new ZeppelinRemoteWorksController(this, remoteWorksResponsePool);
}
@Override
@ -174,7 +182,6 @@ public class RemoteInterpreterServer
Constructor<Interpreter> constructor =
replClass.getConstructor(new Class[] {Properties.class});
Interpreter repl = constructor.newInstance(p);
repl.setClassloaderUrls(new URL[]{});
synchronized (interpreterGroup) {
@ -335,6 +342,42 @@ public class RemoteInterpreterServer
context.getGui());
}
@Override
public void onReceivedZeppelinResource(String responseJson) throws TException {
RemoteZeppelinServerResource response = gson.fromJson(
responseJson, RemoteZeppelinServerResource.class);
if (response == null) {
throw new TException("Bad response for remote resource");
}
try {
if (response.getResourceType() == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) {
List<InterpreterContextRunner> intpContextRunners = new LinkedList<>();
List<Map<String, Object>> remoteRunnersMap =
(List<Map<String, Object>>) response.getData();
String noteId = null;
String paragraphId = null;
for (Map<String, Object> runnerItem : remoteRunnersMap) {
noteId = (String) runnerItem.get("noteId");
paragraphId = (String) runnerItem.get("paragraphId");
intpContextRunners.add(
new ParagraphRunner(this, noteId, paragraphId)
);
}
synchronized (this.remoteWorksResponsePool) {
this.remoteWorksResponsePool.put(
response.getOwnerKey(),
intpContextRunners);
}
}
} catch (Exception e) {
throw e;
}
}
class InterpretJobListener implements JobListener {
@ -540,7 +583,7 @@ public class RemoteInterpreterServer
gson.fromJson(ric.getGui(), GUI.class),
interpreterGroup.getAngularObjectRegistry(),
interpreterGroup.getResourcePool(),
contextRunners, output, eventClient);
contextRunners, output, remoteWorksController, eventClient);
}
@ -582,7 +625,7 @@ public class RemoteInterpreterServer
static class ParagraphRunner extends InterpreterContextRunner {
Logger logger = LoggerFactory.getLogger(ParagraphRunner.class);
private transient RemoteInterpreterServer server;
public ParagraphRunner(RemoteInterpreterServer server, String noteId, String paragraphId) {
@ -596,6 +639,78 @@ public class RemoteInterpreterServer
}
}
static class ZeppelinRemoteWorksController implements RemoteWorksController{
Logger logger = LoggerFactory.getLogger(ZeppelinRemoteWorksController.class);
private final long DEFAULT_TIMEOUT_VALUE = 300000;
private final Map<String, Object> remoteWorksResponsePool;
private RemoteInterpreterServer server;
public ZeppelinRemoteWorksController(
RemoteInterpreterServer server, Map<String, Object> remoteWorksResponsePool) {
this.remoteWorksResponsePool = remoteWorksResponsePool;
this.server = server;
}
public String generateOwnerKey() {
String hashKeyText = new String("ownerKey" + System.currentTimeMillis());
String hashKey = String.valueOf(hashKeyText.hashCode());
return hashKey;
}
public boolean waitForEvent(String eventOwnerKey) throws InterruptedException {
return waitForEvent(eventOwnerKey, DEFAULT_TIMEOUT_VALUE);
}
public boolean waitForEvent(String eventOwnerKey, long timeout) throws InterruptedException {
boolean wasGetData = false;
long now = System.currentTimeMillis();
long endTime = System.currentTimeMillis() + timeout;
while (endTime >= now) {
synchronized (this.remoteWorksResponsePool) {
wasGetData = this.remoteWorksResponsePool.containsKey(eventOwnerKey);
}
if (wasGetData == true) {
break;
}
now = System.currentTimeMillis();
sleep(500);
}
return wasGetData;
}
@Override
public List<InterpreterContextRunner> getRemoteContextRunner(String noteId) {
return getRemoteContextRunner(noteId, null);
}
public List<InterpreterContextRunner> getRemoteContextRunner(
String noteId, String paragraphID) {
List<InterpreterContextRunner> runners = null;
String ownerKey = generateOwnerKey();
ZeppelinServerResourceParagraphRunner resource = new ZeppelinServerResourceParagraphRunner();
resource.setNoteId(noteId);
resource.setParagraphId(paragraphID);
server.eventClient.getZeppelinServerNoteRunner(ownerKey, resource);
try {
this.waitForEvent(ownerKey);
} catch (Exception e) {
return new LinkedList<>();
}
synchronized (this.remoteWorksResponsePool) {
runners = (List<InterpreterContextRunner>) this.remoteWorksResponsePool.get(ownerKey);
this.remoteWorksResponsePool.remove(ownerKey);
}
return runners;
}
}
private RemoteInterpreterResult convert(InterpreterResult result,
Map<String, Object> config, GUI gui) {

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-24")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
public class InterpreterCompletion implements org.apache.thrift.TBase<InterpreterCompletion, InterpreterCompletion._Fields>, java.io.Serializable, Cloneable, Comparable<InterpreterCompletion> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-24")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-24")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-24")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");

View file

@ -41,7 +41,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
OUTPUT_UPDATE_ALL(10),
ANGULAR_REGISTRY_PUSH(11),
APP_STATUS_UPDATE(12),
META_INFOS(13);
META_INFOS(13),
REMOTE_ZEPPELIN_SERVER_RESOURCE(14);
private final int value;
@ -88,6 +89,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
return APP_STATUS_UPDATE;
case 13:
return META_INFOS;
case 14:
return REMOTE_ZEPPELIN_SERVER_RESOURCE;
default:
return null;
}

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-24")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-24")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
public class RemoteInterpreterResultMessage implements org.apache.thrift.TBase<RemoteInterpreterResultMessage, RemoteInterpreterResultMessage._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResultMessage> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResultMessage");

View file

@ -0,0 +1,520 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
package org.apache.zeppelin.interpreter.thrift;
import org.apache.thrift.scheme.IScheme;
import org.apache.thrift.scheme.SchemeFactory;
import org.apache.thrift.scheme.StandardScheme;
import org.apache.thrift.scheme.TupleScheme;
import org.apache.thrift.protocol.TTupleProtocol;
import org.apache.thrift.protocol.TProtocolException;
import org.apache.thrift.EncodingUtils;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.server.AbstractNonblockingServer.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.EnumMap;
import java.util.Set;
import java.util.HashSet;
import java.util.EnumSet;
import java.util.Collections;
import java.util.BitSet;
import java.nio.ByteBuffer;
import java.util.Arrays;
import javax.annotation.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.TBase<ZeppelinServerResourceParagraphRunner, ZeppelinServerResourceParagraphRunner._Fields>, java.io.Serializable, Cloneable, Comparable<ZeppelinServerResourceParagraphRunner> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ZeppelinServerResourceParagraphRunner");
private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)1);
private static final org.apache.thrift.protocol.TField PARAGRAPH_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphId", org.apache.thrift.protocol.TType.STRING, (short)2);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
schemes.put(StandardScheme.class, new ZeppelinServerResourceParagraphRunnerStandardSchemeFactory());
schemes.put(TupleScheme.class, new ZeppelinServerResourceParagraphRunnerTupleSchemeFactory());
}
public String noteId; // required
public String paragraphId; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
NOTE_ID((short)1, "noteId"),
PARAGRAPH_ID((short)2, "paragraphId");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
static {
for (_Fields field : EnumSet.allOf(_Fields.class)) {
byName.put(field.getFieldName(), field);
}
}
/**
* Find the _Fields constant that matches fieldId, or null if its not found.
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
case 1: // NOTE_ID
return NOTE_ID;
case 2: // PARAGRAPH_ID
return PARAGRAPH_ID;
default:
return null;
}
}
/**
* Find the _Fields constant that matches fieldId, throwing an exception
* if it is not found.
*/
public static _Fields findByThriftIdOrThrow(int fieldId) {
_Fields fields = findByThriftId(fieldId);
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
return fields;
}
/**
* Find the _Fields constant that matches name, or null if its not found.
*/
public static _Fields findByName(String name) {
return byName.get(name);
}
private final short _thriftId;
private final String _fieldName;
_Fields(short thriftId, String fieldName) {
_thriftId = thriftId;
_fieldName = fieldName;
}
public short getThriftFieldId() {
return _thriftId;
}
public String getFieldName() {
return _fieldName;
}
}
// isset id assignments
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.NOTE_ID, new org.apache.thrift.meta_data.FieldMetaData("noteId", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.PARAGRAPH_ID, new org.apache.thrift.meta_data.FieldMetaData("paragraphId", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ZeppelinServerResourceParagraphRunner.class, metaDataMap);
}
public ZeppelinServerResourceParagraphRunner() {
}
public ZeppelinServerResourceParagraphRunner(
String noteId,
String paragraphId)
{
this();
this.noteId = noteId;
this.paragraphId = paragraphId;
}
/**
* Performs a deep copy on <i>other</i>.
*/
public ZeppelinServerResourceParagraphRunner(ZeppelinServerResourceParagraphRunner other) {
if (other.isSetNoteId()) {
this.noteId = other.noteId;
}
if (other.isSetParagraphId()) {
this.paragraphId = other.paragraphId;
}
}
public ZeppelinServerResourceParagraphRunner deepCopy() {
return new ZeppelinServerResourceParagraphRunner(this);
}
@Override
public void clear() {
this.noteId = null;
this.paragraphId = null;
}
public String getNoteId() {
return this.noteId;
}
public ZeppelinServerResourceParagraphRunner setNoteId(String noteId) {
this.noteId = noteId;
return this;
}
public void unsetNoteId() {
this.noteId = null;
}
/** Returns true if field noteId is set (has been assigned a value) and false otherwise */
public boolean isSetNoteId() {
return this.noteId != null;
}
public void setNoteIdIsSet(boolean value) {
if (!value) {
this.noteId = null;
}
}
public String getParagraphId() {
return this.paragraphId;
}
public ZeppelinServerResourceParagraphRunner setParagraphId(String paragraphId) {
this.paragraphId = paragraphId;
return this;
}
public void unsetParagraphId() {
this.paragraphId = null;
}
/** Returns true if field paragraphId is set (has been assigned a value) and false otherwise */
public boolean isSetParagraphId() {
return this.paragraphId != null;
}
public void setParagraphIdIsSet(boolean value) {
if (!value) {
this.paragraphId = null;
}
}
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case NOTE_ID:
if (value == null) {
unsetNoteId();
} else {
setNoteId((String)value);
}
break;
case PARAGRAPH_ID:
if (value == null) {
unsetParagraphId();
} else {
setParagraphId((String)value);
}
break;
}
}
public Object getFieldValue(_Fields field) {
switch (field) {
case NOTE_ID:
return getNoteId();
case PARAGRAPH_ID:
return getParagraphId();
}
throw new IllegalStateException();
}
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
public boolean isSet(_Fields field) {
if (field == null) {
throw new IllegalArgumentException();
}
switch (field) {
case NOTE_ID:
return isSetNoteId();
case PARAGRAPH_ID:
return isSetParagraphId();
}
throw new IllegalStateException();
}
@Override
public boolean equals(Object that) {
if (that == null)
return false;
if (that instanceof ZeppelinServerResourceParagraphRunner)
return this.equals((ZeppelinServerResourceParagraphRunner)that);
return false;
}
public boolean equals(ZeppelinServerResourceParagraphRunner that) {
if (that == null)
return false;
boolean this_present_noteId = true && this.isSetNoteId();
boolean that_present_noteId = true && that.isSetNoteId();
if (this_present_noteId || that_present_noteId) {
if (!(this_present_noteId && that_present_noteId))
return false;
if (!this.noteId.equals(that.noteId))
return false;
}
boolean this_present_paragraphId = true && this.isSetParagraphId();
boolean that_present_paragraphId = true && that.isSetParagraphId();
if (this_present_paragraphId || that_present_paragraphId) {
if (!(this_present_paragraphId && that_present_paragraphId))
return false;
if (!this.paragraphId.equals(that.paragraphId))
return false;
}
return true;
}
@Override
public int hashCode() {
List<Object> list = new ArrayList<Object>();
boolean present_noteId = true && (isSetNoteId());
list.add(present_noteId);
if (present_noteId)
list.add(noteId);
boolean present_paragraphId = true && (isSetParagraphId());
list.add(present_paragraphId);
if (present_paragraphId)
list.add(paragraphId);
return list.hashCode();
}
@Override
public int compareTo(ZeppelinServerResourceParagraphRunner other) {
if (!getClass().equals(other.getClass())) {
return getClass().getName().compareTo(other.getClass().getName());
}
int lastComparison = 0;
lastComparison = Boolean.valueOf(isSetNoteId()).compareTo(other.isSetNoteId());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetNoteId()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.noteId, other.noteId);
if (lastComparison != 0) {
return lastComparison;
}
}
lastComparison = Boolean.valueOf(isSetParagraphId()).compareTo(other.isSetParagraphId());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetParagraphId()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.paragraphId, other.paragraphId);
if (lastComparison != 0) {
return lastComparison;
}
}
return 0;
}
public _Fields fieldForId(int fieldId) {
return _Fields.findByThriftId(fieldId);
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("ZeppelinServerResourceParagraphRunner(");
boolean first = true;
sb.append("noteId:");
if (this.noteId == null) {
sb.append("null");
} else {
sb.append(this.noteId);
}
first = false;
if (!first) sb.append(", ");
sb.append("paragraphId:");
if (this.paragraphId == null) {
sb.append("null");
} else {
sb.append(this.paragraphId);
}
first = false;
sb.append(")");
return sb.toString();
}
public void validate() throws org.apache.thrift.TException {
// check for required fields
// check for sub-struct validity
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
try {
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private static class ZeppelinServerResourceParagraphRunnerStandardSchemeFactory implements SchemeFactory {
public ZeppelinServerResourceParagraphRunnerStandardScheme getScheme() {
return new ZeppelinServerResourceParagraphRunnerStandardScheme();
}
}
private static class ZeppelinServerResourceParagraphRunnerStandardScheme extends StandardScheme<ZeppelinServerResourceParagraphRunner> {
public void read(org.apache.thrift.protocol.TProtocol iprot, ZeppelinServerResourceParagraphRunner struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
case 1: // NOTE_ID
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.noteId = iprot.readString();
struct.setNoteIdIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 2: // PARAGRAPH_ID
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.paragraphId = iprot.readString();
struct.setParagraphIdIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd();
// check for required fields of primitive type, which can't be checked in the validate method
struct.validate();
}
public void write(org.apache.thrift.protocol.TProtocol oprot, ZeppelinServerResourceParagraphRunner struct) throws org.apache.thrift.TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
if (struct.noteId != null) {
oprot.writeFieldBegin(NOTE_ID_FIELD_DESC);
oprot.writeString(struct.noteId);
oprot.writeFieldEnd();
}
if (struct.paragraphId != null) {
oprot.writeFieldBegin(PARAGRAPH_ID_FIELD_DESC);
oprot.writeString(struct.paragraphId);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
}
private static class ZeppelinServerResourceParagraphRunnerTupleSchemeFactory implements SchemeFactory {
public ZeppelinServerResourceParagraphRunnerTupleScheme getScheme() {
return new ZeppelinServerResourceParagraphRunnerTupleScheme();
}
}
private static class ZeppelinServerResourceParagraphRunnerTupleScheme extends TupleScheme<ZeppelinServerResourceParagraphRunner> {
@Override
public void write(org.apache.thrift.protocol.TProtocol prot, ZeppelinServerResourceParagraphRunner struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
BitSet optionals = new BitSet();
if (struct.isSetNoteId()) {
optionals.set(0);
}
if (struct.isSetParagraphId()) {
optionals.set(1);
}
oprot.writeBitSet(optionals, 2);
if (struct.isSetNoteId()) {
oprot.writeString(struct.noteId);
}
if (struct.isSetParagraphId()) {
oprot.writeString(struct.paragraphId);
}
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, ZeppelinServerResourceParagraphRunner struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
BitSet incoming = iprot.readBitSet(2);
if (incoming.get(0)) {
struct.noteId = iprot.readString();
struct.setNoteIdIsSet(true);
}
if (incoming.get(1)) {
struct.paragraphId = iprot.readString();
struct.setParagraphIdIsSet(true);
}
}
}
}

View file

@ -19,7 +19,7 @@
namespace java org.apache.zeppelin.interpreter.thrift
struct RemoteInterpreterContext {
1: string sessionKey,
1: string noteId,
2: string paragraphId,
3: string replName,
4: string paragraphTitle,
@ -54,9 +54,11 @@ enum RemoteInterpreterEventType {
OUTPUT_UPDATE_ALL = 10,
ANGULAR_REGISTRY_PUSH = 11,
APP_STATUS_UPDATE = 12,
META_INFOS = 13
META_INFOS = 13,
REMOTE_ZEPPELIN_SERVER_RESOURCE = 14
}
struct RemoteInterpreterEvent {
1: RemoteInterpreterEventType type,
2: string data // json serialized data
@ -67,6 +69,11 @@ struct RemoteApplicationResult {
2: string msg
}
struct ZeppelinServerResourceParagraphRunner {
1: string noteId,
2: string paragraphId
}
/*
* The below variables(name, value) will be connected to getCompletions in paragraph.controller.js
*
@ -114,4 +121,6 @@ service RemoteInterpreterService {
RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string sessionKey, 4: string paragraphId);
RemoteApplicationResult unloadApplication(1: string applicationInstanceId);
RemoteApplicationResult runApplication(1: string applicationInstanceId);
void onReceivedZeppelinResource(1: string object);
}

View file

@ -171,4 +171,16 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
}
@Override
public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
if (callback != null) {
callback.onFinished(new LinkedList<>());
}
}
@Override
public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception {
}
}

View file

@ -82,16 +82,16 @@ public class RemoteInterpreterTest {
private RemoteInterpreter createMockInterpreterA(Properties p, String noteId) {
return new RemoteInterpreter(
p,
noteId,
MockInterpreterA.class.getName(),
new File(INTERPRETER_SCRIPT).getAbsolutePath(),
"fake",
"fakeRepo",
env,
10 * 1000,
null,
null,
p,
noteId,
MockInterpreterA.class.getName(),
new File(INTERPRETER_SCRIPT).getAbsolutePath(),
"fake",
"fakeRepo",
env,
10 * 1000,
null,
null,
"anonymous",
false);
}
@ -102,16 +102,16 @@ public class RemoteInterpreterTest {
private RemoteInterpreter createMockInterpreterB(Properties p, String noteId) {
return new RemoteInterpreter(
p,
noteId,
MockInterpreterB.class.getName(),
new File(INTERPRETER_SCRIPT).getAbsolutePath(),
"fake",
"fakeRepo",
env,
10 * 1000,
null,
null,
p,
noteId,
MockInterpreterB.class.getName(),
new File(INTERPRETER_SCRIPT).getAbsolutePath(),
"fake",
"fakeRepo",
env,
10 * 1000,
null,
null,
"anonymous",
false);
}
@ -218,7 +218,6 @@ public class RemoteInterpreterTest {
"anonymous",
false);
intpGroup.get("note").add(intpA);
intpA.setInterpreterGroup(intpGroup);

View file

@ -313,4 +313,16 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
}
@Override
public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
if (callback != null) {
callback.onFinished(new LinkedList<>());
}
}
@Override
public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception {
}
}

View file

@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.http.HttpServletRequest;
import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@ -41,7 +42,12 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@ -56,6 +62,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.apache.zeppelin.notebook.socket.WatcherMessage;
import org.apache.zeppelin.rest.exception.ForbiddenException;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
@ -1529,6 +1536,72 @@ public class NotebookServer extends WebSocketServlet implements
broadcast(noteId, msg);
}
@Override
public void onGetParagraphRunners(
String noteId, String paragraphId, RemoteWorksEventListener callback) {
Notebook notebookIns = notebook();
List<InterpreterContextRunner> runner = new LinkedList<>();
if (notebookIns == null) {
LOG.info("intepreter request notebook instance is null");
callback.onFinished(notebookIns);
}
try {
Note note = notebookIns.getNote(noteId);
if (note != null) {
if (paragraphId != null) {
Paragraph paragraph = note.getParagraph(paragraphId);
if (paragraph != null) {
runner.add(paragraph.getInterpreterContextRunner());
}
} else {
for (Paragraph p : note.getParagraphs()) {
runner.add(p.getInterpreterContextRunner());
}
}
}
callback.onFinished(runner);
} catch (NullPointerException e) {
LOG.warn(e.getMessage());
callback.onError();
}
}
@Override
public void onRemoteRunParagraph(String noteId, String paragraphId) throws Exception {
Notebook notebookIns = notebook();
try {
if (notebookIns == null) {
throw new Exception("onRemoteRunParagraph notebook instance is null");
}
Note noteIns = notebookIns.getNote(noteId);
if (noteIns == null) {
throw new Exception(String.format("Can't found note id %s", noteId));
}
Paragraph paragraph = noteIns.getParagraph(paragraphId);
if (paragraph == null) {
throw new Exception(String.format("Can't found paragraph %s %s", noteId, paragraphId));
}
Set<String> userAndRoles = Sets.newHashSet();
userAndRoles.add(SecurityUtils.getPrincipal());
userAndRoles.addAll(SecurityUtils.getRoles());
if (!notebookIns.getNotebookAuthorization().hasWriteAuthorization(userAndRoles, noteId)) {
throw new ForbiddenException(String.format("can't execute note %s", noteId));
}
AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
paragraph.setAuthenticationInfo(subject);
noteIns.run(paragraphId);
} catch (Exception e) {
throw e;
}
}
/**
* Notebook Information Change event
*/

View file

@ -137,7 +137,7 @@ public class ZeppelinIT extends AbstractZeppelinIT {
* z.run(2, context)
* }
*/
setTextOfParagraph(4, "z.angularWatch(\"myVar\", (before:Object, after:Object, context:org.apache.zeppelin.interpreter.InterpreterContext)=>{ z.run(2, context)})");
setTextOfParagraph(4, "z.angularWatch(\"myVar\", (before:Object, after:Object, context:org.apache.zeppelin.interpreter.InterpreterContext)=>{ z.run(2)})");
runParagraph(4);
waitForParagraph(4, "FINISHED");

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.rest;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
@ -346,6 +347,34 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
assertEquals(Status.FINISHED, p2.getStatus());
assertEquals("10", p2.getResult().message().get(0).getData());
Paragraph p3 = note.addParagraph();
Map config3 = p3.getConfig();
config3.put("enabled", true);
p3.setConfig(config3);
p3.setText("%spark println(new java.util.Date())");
p3.setAuthenticationInfo(anonymous);
p0.setText(String.format("%%spark z.runNote(\"%s\")", note.getId()));
note.run(p0.getId());
waitForFinish(p0);
waitForFinish(p1);
waitForFinish(p2);
waitForFinish(p3);
assertEquals(Status.FINISHED, p3.getStatus());
String p3result = p3.getResult().message().get(0).getData();
assertNotEquals(null, p3result);
assertNotEquals("", p3result);
p0.setText(String.format("%%spark z.run(\"%s\", \"%s\")", note.getId(), p3.getId()));
p3.setText("%%spark println(\"END\")");
note.run(p0.getId());
waitForFinish(p0);
waitForFinish(p3);
assertNotEquals(p3result, p3.getResult().message());
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
}

View file

@ -172,7 +172,7 @@ a.navbar-brand:hover {
height: 28px;
width: 200px;
font-size: 14px;
font-family: 'Helvetica Neue', Helvetica, Arial, 'FontAwesome', sans-serif;
font-family: 'FontAwesome', 'Helvetica Neue', Helvetica, Arial, sans-serif;
}
.dropdown-submenu {

View file

@ -29,7 +29,6 @@ limitations under the License.
<select class="form-control input-sm"
ng-if="paragraph.settings.forms[formulaire.name].options && paragraph.settings.forms[formulaire.name].type != 'checkbox'"
ng-change="runParagraph(getEditorValue())"
ng-model="paragraph.settings.params[formulaire.name]"
ng-class="{'disable': paragraph.status == 'RUNNING' || paragraph.status == 'PENDING' }"
name="{{formulaire.name}}"

View file

@ -476,6 +476,11 @@ public class Paragraph extends Job implements Serializable, Cloneable {
return interpreterContext;
}
public InterpreterContextRunner getInterpreterContextRunner() {
return new ParagraphRunner(note, note.getId(), getId());
}
static class ParagraphRunner extends InterpreterContextRunner {
private transient Note note;