Implement getParagraphRunner transaction.

This commit is contained in:
CloverHearts 2016-11-15 19:49:51 +09:00
parent 25232387d1
commit 3d34f9e0bb
48 changed files with 575 additions and 173 deletions

View file

@ -23,10 +23,7 @@ import java.io.PrintStream;
import java.io.ByteArrayOutputStream;
import java.util.*;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.slf4j.Logger;
@ -89,6 +86,15 @@ public class AlluxioInterpreter extends Interpreter {
}
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
String[] lines = splitAndRemoveEmpty(st, "\n");

View file

@ -26,6 +26,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -47,6 +48,15 @@ public class AngularInterpreter extends Interpreter {
public void close() {
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
return new InterpreterResult(Code.SUCCESS, Type.ANGULAR, st);

View file

@ -51,10 +51,7 @@ import java.sql.SQLException;
import java.util.Properties;
import java.util.Set;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
@ -290,6 +287,15 @@ public class BigQueryInterpreter extends Interpreter {
service = null;
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String sql, InterpreterContext contextInterpreter) {
logger.info("Run SQL command '{}'", sql);

View file

@ -19,10 +19,7 @@ package org.apache.zeppelin.cassandra;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ProtocolOptions.Compression;
import com.datastax.driver.core.Session;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -195,6 +192,15 @@ public class CassandraInterpreter extends Interpreter {
cluster.close();
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
return helper.interpret(session, st, context);

View file

@ -37,6 +37,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
@ -149,6 +150,15 @@ public class ElasticsearchInterpreter extends Interpreter {
}
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext interpreterContext) {
logger.info("Run Elasticsearch command '" + cmd + "'");

View file

@ -23,6 +23,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -100,6 +101,16 @@ public abstract class FileInterpreter extends Interpreter {
// Handle the command handling uniformly across all file systems
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
logger.info("Run File command '" + cmd + "'");

View file

@ -35,12 +35,8 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -242,6 +238,15 @@ public class FlinkInterpreter extends Interpreter {
}
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String line, InterpreterContext context) {
if (line == null || line.trim().length() == 0) {

View file

@ -110,6 +110,15 @@ public class HbaseInterpreter extends Interpreter {
}
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext interpreterContext) {
try {

View file

@ -275,6 +275,15 @@ public class IgniteInterpreter extends Interpreter {
public void cancel(InterpreterContext context) {
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
private InterpreterResult interpret(String[] lines) {
String[] linesToRun = new String[lines.length + 1];
System.arraycopy(lines, 0, linesToRun, 0, lines.length);

View file

@ -16,11 +16,7 @@
*/
package org.apache.zeppelin.ignite;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
@ -117,6 +113,15 @@ public class IgniteSqlInterpreter extends Interpreter {
}
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
if (connEx != null) {

View file

@ -30,10 +30,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.jdbc.security.JDBCSecurityImpl;
@ -441,6 +438,15 @@ public class JDBCInterpreter extends Interpreter {
return (!isTableResponseType) ? str : str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE);
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
logger.info("Run SQL command '{}'", cmd);

View file

@ -24,10 +24,7 @@ import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -69,6 +66,15 @@ public class KylinInterpreter extends Interpreter {
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
try {

View file

@ -31,10 +31,7 @@ import org.apache.lens.client.LensClient;
import org.apache.lens.client.LensClientConfig;
import org.apache.lens.client.LensClientSingletonWrapper;
import org.apache.lens.cli.commands.BaseLensCommand;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
@ -247,6 +244,15 @@ public class LensInterpreter extends Interpreter {
return sb.toString();
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String input, InterpreterContext context) {
if (input == null || input.length() == 0) {

View file

@ -55,6 +55,15 @@ public class LivyPySparkInterpreter extends Interpreter {
livyHelper.closeSession(userSessionMap);
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
try {

View file

@ -71,6 +71,15 @@ public class LivySparkInterpreter extends Interpreter {
livyHelper.closeSession(userSessionMap);
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
try {

View file

@ -55,6 +55,15 @@ public class LivySparkRInterpreter extends Interpreter {
livyHelper.closeSession(userSessionMap);
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
try {

View file

@ -54,6 +54,15 @@ public class LivySparkSQLInterpreter extends Interpreter {
livyHelper.closeSession(userSessionMap);
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
try {

View file

@ -21,12 +21,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -84,6 +80,15 @@ public class Markdown extends Interpreter {
@Override
public void close() {}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String markdownText, InterpreterContext interpreterContext) {
String html;

View file

@ -25,6 +25,7 @@ import org.apache.pig.tools.pigstats.*;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -69,6 +70,14 @@ public class PigInterpreter extends BasePigInterpreter {
pigServer = null;
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {

View file

@ -63,6 +63,15 @@ public class PigQueryInterpreter extends BasePigInterpreter {
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
// '-' is invalid for pig alias

View file

@ -28,10 +28,7 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
@ -283,6 +280,15 @@ public class PostgreSqlInterpreter extends Interpreter {
return (!isTableResponseType) ? str : str.replace(TAB, WhITESPACE).replace(NEWLINE, WhITESPACE);
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
logger.info("Run SQL command '{}'", cmd);

View file

@ -28,12 +28,9 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
@ -132,6 +129,15 @@ public class PythonInterpreter extends Interpreter {
}
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
if (cmd == null || cmd.isEmpty()) {

View file

@ -20,11 +20,7 @@ package org.apache.zeppelin.python;
import java.io.IOException;
import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -90,6 +86,15 @@ public class PythonInterpreterPandasSql extends Interpreter {
python.close();
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);

View file

@ -36,6 +36,7 @@ import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -69,6 +70,14 @@ public class ShellInterpreter extends Interpreter {
@Override
public void close() {}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {

View file

@ -34,12 +34,8 @@ import java.util.Properties;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import org.apache.spark.repl.SparkILoop;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
@ -196,6 +192,15 @@ public class DepInterpreter extends Interpreter {
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
private Results.Result interpret(String line) {
return (Results.Result) Utils.invokeMethod(
intp,

View file

@ -45,14 +45,8 @@ import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
@ -312,6 +306,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
outputStream.getInterpreterOutput().write(message);
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
SparkInterpreter sparkInterpreter = getSparkInterpreter();

View file

@ -46,15 +46,9 @@ import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Pool;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.InterpreterProperty;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.thrift.RemoteZeppelinServerController;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.WellKnownResourceName;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@ -113,6 +107,8 @@ public class SparkInterpreter extends Interpreter {
private SparkOutputStream out;
private SparkDependencyResolver dep;
private RemoteWorksController remoteWorksController;
/**
* completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
*/
@ -819,7 +815,7 @@ public class SparkInterpreter extends Interpreter {
hooks = getInterpreterGroup().getInterpreterHookRegistry();
z = new ZeppelinContext(sc, sqlc, null, dep, hooks,
z = new ZeppelinContext(sc, sqlc, null, dep, hooks, getRemoteZeppelinServerController(),
Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
interpret("@transient val _binder = new java.util.HashMap[String, Object]()");
@ -1045,10 +1041,20 @@ public class SparkInterpreter extends Interpreter {
return resultCompletionText;
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
this.remoteWorksController = zServer;
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return this.remoteWorksController;
}
/*
* this method doesn't work in scala 2.11
* Somehow intp.valueOfTerm returns scala.None always with -Yrepl-class-based option
*/
* this method doesn't work in scala 2.11
* Somehow intp.valueOfTerm returns scala.None always with -Yrepl-class-based option
*/
public Object getValue(String name) {
Object ret = Utils.invokeMethod(
intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name});

View file

@ -25,13 +25,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -86,6 +81,15 @@ public class SparkSqlInterpreter extends Interpreter {
@Override
public void close() {}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
SQLContext sqlc = null;

View file

@ -41,10 +41,7 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectWatcher;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input.ParamOption;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
@ -73,17 +70,20 @@ public class ZeppelinContext {
private int maxResult;
private List<Class> supportedClasses;
private InterpreterHookRegistry hooks;
private RemoteWorksController remoteWorksController;
public ZeppelinContext(SparkContext sc, SQLContext sql,
InterpreterContext interpreterContext,
SparkDependencyResolver dep,
InterpreterHookRegistry hooks,
RemoteWorksController remoteWorksController,
int maxResult) {
this.sc = sc;
this.sqlContext = sql;
this.interpreterContext = interpreterContext;
this.dep = dep;
this.hooks = hooks;
this.remoteWorksController = remoteWorksController;
this.maxResult = maxResult;
this.supportedClasses = new ArrayList<>();
try {
@ -298,45 +298,79 @@ public class ZeppelinContext {
return msg.toString();
}
/**
* Run paragraph by id
* @param id
* @param noteId
* @param paragraphId
*/
@ZeppelinApi
public void run(String id) {
run(id, interpreterContext);
public void run(String noteId, String paragraphId) {
run(noteId, paragraphId, interpreterContext);
}
/**
* Run paragraph by id
* @param id
* @param paragraphId
*/
@ZeppelinApi
public void run(String paragraphId) {
String noteId = interpreterContext.getNoteId();
run(noteId, paragraphId, interpreterContext);
}
/**
* Run paragraph by id
* @param noteId
* @param context
*/
@ZeppelinApi
public void run(String id, InterpreterContext context) {
if (id.equals(context.getParagraphId())) {
public void run(String noteId, String paragraphId, InterpreterContext context) {
if (paragraphId.equals(context.getParagraphId())) {
throw new InterpreterException("Can not run current Paragraph");
}
for (InterpreterContextRunner r : context.getRunners()) {
if (id.equals(r.getParagraphId())) {
r.run();
return;
}
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, paragraphId);
if (runners.size() <= 0) {
throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size());
}
throw new InterpreterException("Paragraph " + id + " not found");
for (InterpreterContextRunner r : runners) {
r.run();
}
}
/**
* get Zeppelin Paragraph Runner from zeppelin server
* @param noteId
*/
@ZeppelinApi
public List<InterpreterContextRunner> getInterpreterContextRunner(String noteId) {
List<InterpreterContextRunner> runners = new LinkedList<>();
if (remoteWorksController != null) {
runners = remoteWorksController.getRemoteContextRunner(noteId);
}
return runners;
}
/**
* get Zeppelin Paragraph Runner from zeppelin server
* @param noteId
* @param paragraphId
*/
@ZeppelinApi
public List<InterpreterContextRunner> getInterpreterContextRunner(
String noteId, String paragraphId) {
List<InterpreterContextRunner> runners = new LinkedList<>();
if (remoteWorksController != null) {
runners = remoteWorksController.getRemoteContextRunner(noteId, paragraphId);
}
return runners;
}
@ -347,7 +381,8 @@ public class ZeppelinContext {
*/
@ZeppelinApi
public void run(int idx) {
run(idx, interpreterContext);
String noteId = interpreterContext.getNoteId();
run(noteId, idx, interpreterContext);
}
/**
@ -355,12 +390,13 @@ public class ZeppelinContext {
* @param idx index starting from 0
* @param context interpreter context
*/
public void run(int idx, InterpreterContext context) {
if (idx >= context.getRunners().size()) {
public void run(String noteId, int idx, InterpreterContext context) {
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId);
if (idx >= runners.size()) {
throw new InterpreterException("Index out of bound");
}
InterpreterContextRunner runner = context.getRunners().get(idx);
InterpreterContextRunner runner = runners.get(idx);
if (runner.getParagraphId().equals(context.getParagraphId())) {
throw new InterpreterException("Can not run current Paragraph");
}
@ -379,13 +415,14 @@ public class ZeppelinContext {
*/
@ZeppelinApi
public void run(List<Object> paragraphIdOrIdx, InterpreterContext context) {
String noteId = context.getNoteId();
for (Object idOrIdx : paragraphIdOrIdx) {
if (idOrIdx instanceof String) {
String id = (String) idOrIdx;
run(id, context);
String paragraphId = (String) idOrIdx;
run(noteId, paragraphId, context);
} else if (idOrIdx instanceof Integer) {
Integer idx = (Integer) idOrIdx;
run(idx, context);
run(noteId, idx, context);
} else {
throw new InterpreterException("Paragraph " + idOrIdx + " not found");
}

View file

@ -50,6 +50,15 @@ public class ClassloaderInterpreter
return cl;
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();

View file

@ -196,6 +196,19 @@ public abstract class Interpreter {
return this.interpreterGroup;
}
/**
* setup Remote Zeppelin Server Controller interface
* @param zServer
*/
@ZeppelinApi
public abstract void setRemoteZeppelinServerController(RemoteWorksController zServer);
/**
* get Remote Zeppelin Server Controller interface
*/
@ZeppelinApi
public abstract RemoteWorksController getRemoteZeppelinServerController();
public URL[] getClassloaderUrls() {
return classloaderUrls;
}

View file

@ -88,6 +88,15 @@ public class LazyOpenInterpreter
}
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
open();

View file

@ -24,6 +24,6 @@ import java.util.List;
*
*/
public interface RemoteWorksController {
List<InterpreterContextRunner> getRunner(String noteId);
InterpreterContextRunner getRunner(String noteId, String paragraphId);
List<InterpreterContextRunner> getRemoteContextRunner(String noteId);
List<InterpreterContextRunner> getRemoteContextRunner(String noteId, String paragraphId);
}

View file

@ -20,12 +20,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
/**
@ -79,6 +74,15 @@ public class DevInterpreter extends Interpreter {
}
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
this.context = context;

View file

@ -302,6 +302,15 @@ public class RemoteInterpreter extends Interpreter {
}
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
if (logger.isDebugEnabled()) {

View file

@ -247,11 +247,11 @@ public class RemoteInterpreterEventPoller extends Thread {
resResource.setType(RemoteZeppelinServerControlEvent.RES_RESOURCE_PARAGRAPH_RUN_CONTEXT);
resResource.setEventOwnerKey(eventOwnerKey);
if (runner.getParagraphId() != null) {
InterpreterContextRunner intpRunner = remoteWorkController.getRunner(
interpreterContextRunners = remoteWorkController.getRemoteContextRunner(
runner.getNoteId(), runner.getParagraphId());
interpreterContextRunners.add(intpRunner);
} else {
interpreterContextRunners = remoteWorkController.getRunner(runner.getNoteId());
interpreterContextRunners = remoteWorkController.getRemoteContextRunner(
runner.getNoteId());
}
logger.info("clover remotework count 1 {}", interpreterContextRunners.size());

View file

@ -68,6 +68,7 @@ public class RemoteInterpreterServer
InterpreterHookRegistry hookRegistry;
DistributedResourcePool resourcePool;
private ApplicationLoader appLoader;
private RemoteWorksController remoteServerController;
Gson gson = new Gson();
@ -82,8 +83,7 @@ public class RemoteInterpreterServer
private final Map<String, RunningApplication> runningApplications =
Collections.synchronizedMap(new HashMap<String, RunningApplication>());
private Map<String, Object> remoteWorksResponsePool =
Collections.synchronizedMap(new HashMap<String, Object>());
private Map<String, Object> remoteWorksResponsePool;
public RemoteInterpreterServer(int port) throws TTransportException {
this.port = port;
@ -92,6 +92,8 @@ public class RemoteInterpreterServer
TServerSocket serverTransport = new TServerSocket(port);
server = new TThreadPoolServer(
new TThreadPoolServer.Args(serverTransport).processor(processor));
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
remoteServerController = new ZeppelinRemoteWorksController(this, remoteWorksResponsePool);
}
@Override
@ -181,8 +183,8 @@ public class RemoteInterpreterServer
Constructor<Interpreter> constructor =
replClass.getConstructor(new Class[] {Properties.class});
Interpreter repl = constructor.newInstance(p);
repl.setClassloaderUrls(new URL[]{});
repl.setRemoteZeppelinServerController(remoteServerController);
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(noteId);
@ -347,16 +349,20 @@ public class RemoteInterpreterServer
RemoteZeppelinServerController response) throws TException {
logger.info("clover remote zeppelin server controller feedback {}", response);
logger.info("clover remote zeppelin server conteroller body {}", response.getMsg());
if (response.getType() == RemoteZeppelinServerControlEvent.RES_RESOURCE_PARAGRAPH_RUN_CONTEXT) {
List<InterpreterContextRunner> intpContextRunners = new LinkedList<>();
List<ZeppelinServerResourceParagraphRunner> runners = gson.fromJson(response.getMsg(),
new TypeToken<List<ZeppelinServerResourceParagraphRunner>>() {}.getType());
logger.info("clover get runner size " + runners.size());
for (ZeppelinServerResourceParagraphRunner r : runners) {
logger.info("clover runner nid {} pid {}", r.getNoteId(), r.getParagraphId());
logger.info("clover runner nid " + r.getNoteId() + " pid " + r.getParagraphId());
intpContextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
}
remoteWorksResponsePool.put(response.getEventOwnerKey(), intpContextRunners);
synchronized (this.remoteWorksResponsePool) {
this.remoteWorksResponsePool.put(response.getEventOwnerKey(), intpContextRunners);
logger.info("clover feedback remoteWorks {} count {}",
response.getEventOwnerKey(), remoteWorksResponsePool.size());
}
}
}
@ -598,7 +604,7 @@ public class RemoteInterpreterServer
static class ParagraphRunner extends InterpreterContextRunner {
Logger logger = LoggerFactory.getLogger(ParagraphRunner.class);
private transient RemoteInterpreterServer server;
public ParagraphRunner(RemoteInterpreterServer server, String noteId, String paragraphId) {
@ -608,20 +614,28 @@ public class RemoteInterpreterServer
@Override
public void run() {
ZeppelinServerResourceParagraphRunner test = new ZeppelinServerResourceParagraphRunner();
test.setNoteId(getNoteId());
test.setParagraphId(getParagraphId());
server.eventClient.getZeppelinServerNoteRunner("IamOWNER", test);
logger.info("clover call run");
server.eventClient.run(this);
}
}
static class RemoteServerController {
static class ZeppelinRemoteWorksController implements RemoteWorksController{
//clover
private final long DEFAULT_TIMEOUT_VALUE = 30000;
Logger logger = LoggerFactory.getLogger(ZeppelinRemoteWorksController.class);
private final long DEFAULT_TIMEOUT_VALUE = 300000;
private Map<String, Object> remoteWorksResponsePool;
public RemoteServerController(Map<String, Object> remoteWorksResponsePool) {
private RemoteInterpreterServer server;
public ZeppelinRemoteWorksController(
RemoteInterpreterServer server, Map<String, Object> remoteWorksResponsePool) {
this.remoteWorksResponsePool = remoteWorksResponsePool;
this.server = server;
}
public String generateOwnerKey() {
String hashKeyText = new String("ownerKey" + System.currentTimeMillis());
String hashKey = String.valueOf(hashKeyText.hashCode());
return hashKey;
}
public boolean waitForEvent(String eventOwnerKey) throws InterruptedException {
@ -633,18 +647,55 @@ public class RemoteInterpreterServer
long now = System.currentTimeMillis();
long endTime = System.currentTimeMillis() + timeout;
while (endTime <= now) {
if (this.remoteWorksResponsePool.containsKey(eventOwnerKey) == true) {
wasGetData = true;
while (endTime >= now) {
logger.info("clover sleep... size {} key {}",
this.remoteWorksResponsePool.size(), eventOwnerKey);
synchronized (this.remoteWorksResponsePool) {
wasGetData = this.remoteWorksResponsePool.containsKey(eventOwnerKey);
}
if (wasGetData == true) {
logger.info("clover found!!!!!!!!!!!!!");
break;
}
now = System.currentTimeMillis();
sleep(500);
}
logger.info("clover out {}", wasGetData);
return wasGetData;
}
@Override
public List<InterpreterContextRunner> getRemoteContextRunner(String noteId) {
return getRemoteContextRunner(noteId, null);
}
public List<InterpreterContextRunner> getRemoteContextRunner(
String noteId, String paragraphID) {
List<InterpreterContextRunner> runners = null;
String ownerKey = generateOwnerKey();
logger.info("clover request owner key {}", ownerKey);
ZeppelinServerResourceParagraphRunner resource = new ZeppelinServerResourceParagraphRunner();
resource.setNoteId(noteId);
resource.setParagraphId(paragraphID);
server.eventClient.getZeppelinServerNoteRunner(ownerKey, resource);
try {
this.waitForEvent(ownerKey);
} catch (Exception e) {
logger.info("clover timeout Interrupt getRemoteContextRunner ", e);
return new LinkedList<>();
}
synchronized (this.remoteWorksResponsePool) {
runners = (List<InterpreterContextRunner>) this.remoteWorksResponsePool.get(ownerKey);
this.remoteWorksResponsePool.remove(ownerKey);
}
logger.info("clover out time {} {}", runners.size(), System.currentTimeMillis());
return runners;
}
}

View file

@ -20,11 +20,7 @@ package org.apache.zeppelin.interpreter.remote.mock;
import java.util.List;
import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
@ -60,6 +56,15 @@ public class MockInterpreterA extends Interpreter {
return lastSt;
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
try {

View file

@ -23,10 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectWatcher;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@ -56,6 +53,15 @@ public class MockInterpreterAngular extends Interpreter {
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
String[] stmt = st.split(" ");

View file

@ -20,14 +20,8 @@ package org.apache.zeppelin.interpreter.remote.mock;
import java.util.List;
import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
@ -54,6 +48,15 @@ public class MockInterpreterB extends Interpreter {
public void close() {
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
MockInterpreterA intpA = getInterpreterA();

View file

@ -48,6 +48,15 @@ public class MockInterpreterEnv extends Interpreter {
public void close() {
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
String[] cmd = st.split(" ");

View file

@ -57,6 +57,15 @@ public class MockInterpreterOutputStream extends Interpreter {
return lastSt;
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
String[] ret = st.split(":");

View file

@ -24,10 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.gson.Gson;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectWatcher;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.resource.Resource;
@ -59,6 +56,15 @@ public class MockInterpreterResourcePool extends Interpreter {
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
String[] stmt = st.split(" ");

View file

@ -24,6 +24,7 @@ import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -43,6 +44,15 @@ public class MockInterpreter1 extends Interpreter{
public void close() {
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: "+st);

View file

@ -63,30 +63,25 @@ public class RemoteWorksManager {
return notebook;
}
public List<InterpreterContextRunner> getRunner(String noteId) {
List<InterpreterContextRunner> runners = new LinkedList<>();
try {
Note note = getNotebook().getNote(noteId);
if (note != null) {
for (Paragraph paragraph : note.getParagraphs()) {
runners.add(paragraph.getInterpreterContextRunner());
}
}
} catch (NullPointerException e) {
LOG.warn(e.getMessage());
}
return runners;
public List<InterpreterContextRunner> getRemoteContextRunner(String noteId) {
return getRemoteContextRunner(noteId, null);
}
public InterpreterContextRunner getRunner(String noteId, String paragraphId) {
InterpreterContextRunner runner = null;
public List<InterpreterContextRunner> getRemoteContextRunner(
String noteId, String paragraphId) {
List<InterpreterContextRunner> runner = new LinkedList<>();
try {
Note note = getNotebook().getNote(noteId);
if (note != null) {
Paragraph paragraph = note.getParagraph(paragraphId);
if (paragraph != null) {
runner = paragraph.getInterpreterContextRunner();
if (paragraphId != null) {
Paragraph paragraph = note.getParagraph(paragraphId);
if (paragraph != null) {
runner.add(paragraph.getInterpreterContextRunner());
}
} else {
for (Paragraph p : note.getParagraphs()) {
runner.add(p.getInterpreterContextRunner());
}
}
}
} catch (NullPointerException e) {

View file

@ -25,6 +25,7 @@ import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -53,6 +54,15 @@ Map<String, Object> vars = new HashMap<>();
return open;
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
InterpreterResult result;

View file

@ -25,6 +25,7 @@ import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -44,6 +45,15 @@ public class MockInterpreter11 extends Interpreter{
public void close() {
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl11: "+st);

View file

@ -25,6 +25,7 @@ import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -52,6 +53,14 @@ public class MockInterpreter2 extends Interpreter{
return open;
}
@Override
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
}
@Override
public RemoteWorksController getRemoteZeppelinServerController() {
return null;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {