mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
remote remoteworksController in interpreter.java
This commit is contained in:
parent
8d42c166a3
commit
8cbe46cbc8
54 changed files with 36 additions and 476 deletions
|
|
@ -86,15 +86,6 @@ public class AlluxioInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
String[] lines = splitAndRemoveEmpty(st, "\n");
|
||||
|
|
|
|||
|
|
@ -48,15 +48,6 @@ public class AngularInterpreter extends Interpreter {
|
|||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
return new InterpreterResult(Code.SUCCESS, Type.ANGULAR, st);
|
||||
|
|
|
|||
|
|
@ -287,15 +287,6 @@ public class BigQueryInterpreter extends Interpreter {
|
|||
service = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String sql, InterpreterContext contextInterpreter) {
|
||||
logger.info("Run SQL command '{}'", sql);
|
||||
|
|
|
|||
|
|
@ -192,15 +192,6 @@ public class CassandraInterpreter extends Interpreter {
|
|||
cluster.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
return helper.interpret(session, st, context);
|
||||
|
|
|
|||
|
|
@ -150,15 +150,6 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext interpreterContext) {
|
||||
logger.info("Run Elasticsearch command '" + cmd + "'");
|
||||
|
|
|
|||
|
|
@ -101,16 +101,6 @@ public abstract class FileInterpreter extends Interpreter {
|
|||
|
||||
// Handle the command handling uniformly across all file systems
|
||||
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.info("Run File command '" + cmd + "'");
|
||||
|
|
|
|||
|
|
@ -238,15 +238,6 @@ public class FlinkInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext context) {
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
|
|
|
|||
|
|
@ -110,15 +110,6 @@ public class HbaseInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -275,15 +275,6 @@ public class IgniteInterpreter extends Interpreter {
|
|||
public void cancel(InterpreterContext context) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
private InterpreterResult interpret(String[] lines) {
|
||||
String[] linesToRun = new String[lines.length + 1];
|
||||
System.arraycopy(lines, 0, linesToRun, 0, lines.length);
|
||||
|
|
|
|||
|
|
@ -113,15 +113,6 @@ public class IgniteSqlInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
if (connEx != null) {
|
||||
|
|
|
|||
|
|
@ -438,15 +438,6 @@ public class JDBCInterpreter extends Interpreter {
|
|||
return (!isTableResponseType) ? str : str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.info("Run SQL command '{}'", cmd);
|
||||
|
|
|
|||
|
|
@ -66,15 +66,6 @@ public class KylinInterpreter extends Interpreter {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -244,15 +244,6 @@ public class LensInterpreter extends Interpreter {
|
|||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String input, InterpreterContext context) {
|
||||
if (input == null || input.length() == 0) {
|
||||
|
|
|
|||
|
|
@ -55,15 +55,6 @@ public class LivyPySparkInterpreter extends Interpreter {
|
|||
livyHelper.closeSession(userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -71,15 +71,6 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
livyHelper.closeSession(userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -55,15 +55,6 @@ public class LivySparkRInterpreter extends Interpreter {
|
|||
livyHelper.closeSession(userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -54,15 +54,6 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
livyHelper.closeSession(userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -85,15 +85,6 @@ public class Markdown extends Interpreter {
|
|||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String markdownText, InterpreterContext interpreterContext) {
|
||||
String html;
|
||||
|
|
|
|||
|
|
@ -70,15 +70,6 @@ public class PigInterpreter extends BasePigInterpreter {
|
|||
pigServer = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
// remember the origial stdout, because we will redirect stdout to capture
|
||||
|
|
|
|||
|
|
@ -63,15 +63,6 @@ public class PigQueryInterpreter extends BasePigInterpreter {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
// '-' is invalid for pig alias
|
||||
|
|
|
|||
|
|
@ -280,15 +280,6 @@ public class PostgreSqlInterpreter extends Interpreter {
|
|||
return (!isTableResponseType) ? str : str.replace(TAB, WhITESPACE).replace(NEWLINE, WhITESPACE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.info("Run SQL command '{}'", cmd);
|
||||
|
|
|
|||
|
|
@ -129,15 +129,6 @@ public class PythonInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
if (cmd == null || cmd.isEmpty()) {
|
||||
|
|
|
|||
|
|
@ -86,15 +86,6 @@ public class PythonInterpreterPandasSql extends Interpreter {
|
|||
python.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
|
||||
|
|
|
|||
|
|
@ -116,15 +116,6 @@ public class ScaldingInterpreter extends Interpreter {
|
|||
interpreter.intp().close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
String user = contextInterpreter.getAuthenticationInfo().getUser();
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner
|
|||
import com.spotify.scio.repl.{ScioILoop, ScioReplClassLoader}
|
||||
import org.apache.zeppelin.interpreter.Interpreter.FormType
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
|
||||
import org.apache.zeppelin.interpreter.{Interpreter, InterpreterContext, InterpreterResult, RemoteWorksController}
|
||||
import org.apache.zeppelin.interpreter.{Interpreter, InterpreterContext, InterpreterResult}
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import scala.reflect.io.File
|
||||
|
|
@ -153,10 +153,6 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
|
|||
REPL.closeInterpreter()
|
||||
}
|
||||
|
||||
override def getRemoteZeppelinServerController: RemoteWorksController = null
|
||||
|
||||
override def setRemoteZeppelinServerController(zServer: RemoteWorksController): Unit = {}
|
||||
|
||||
override def interpret(code: String, context: InterpreterContext): InterpreterResult = {
|
||||
val paragraphId = context.getParagraphId
|
||||
|
||||
|
|
|
|||
|
|
@ -70,15 +70,6 @@ public class ShellInterpreter extends Interpreter {
|
|||
@Override
|
||||
public void close() {}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
LOGGER.debug("Run shell command '" + cmd + "'");
|
||||
|
|
|
|||
|
|
@ -192,15 +192,6 @@ public class DepInterpreter extends Interpreter {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
private Results.Result interpret(String line) {
|
||||
return (Results.Result) Utils.invokeMethod(
|
||||
intp,
|
||||
|
|
|
|||
|
|
@ -307,15 +307,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
outputStream.getInterpreterOutput().write(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
SparkInterpreter sparkInterpreter = getSparkInterpreter();
|
||||
|
|
|
|||
|
|
@ -818,7 +818,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
hooks = getInterpreterGroup().getInterpreterHookRegistry();
|
||||
|
||||
z = new ZeppelinContext(sc, sqlc, null, dep, hooks, getRemoteZeppelinServerController(),
|
||||
z = new ZeppelinContext(sc, sqlc, null, dep, hooks,
|
||||
Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
|
||||
|
||||
interpret("@transient val _binder = new java.util.HashMap[String, Object]()");
|
||||
|
|
@ -1065,16 +1065,6 @@ public class SparkInterpreter extends Interpreter {
|
|||
return resultCompletionText;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
this.remoteWorksController = zServer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return this.remoteWorksController;
|
||||
}
|
||||
|
||||
/*
|
||||
* this method doesn't work in scala 2.11
|
||||
* Somehow intp.valueOfTerm returns scala.None always with -Yrepl-class-based option
|
||||
|
|
|
|||
|
|
@ -94,15 +94,6 @@ public class SparkRInterpreter extends Interpreter {
|
|||
renderOptions = getProperty("zeppelin.R.render.options");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) {
|
||||
|
||||
|
|
|
|||
|
|
@ -81,15 +81,6 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
@Override
|
||||
public void close() {}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
SQLContext sqlc = null;
|
||||
|
|
|
|||
|
|
@ -76,14 +76,12 @@ public class ZeppelinContext {
|
|||
InterpreterContext interpreterContext,
|
||||
SparkDependencyResolver dep,
|
||||
InterpreterHookRegistry hooks,
|
||||
RemoteWorksController remoteWorksController,
|
||||
int maxResult) {
|
||||
this.sc = sc;
|
||||
this.sqlContext = sql;
|
||||
this.interpreterContext = interpreterContext;
|
||||
this.dep = dep;
|
||||
this.hooks = hooks;
|
||||
this.remoteWorksController = remoteWorksController;
|
||||
this.maxResult = maxResult;
|
||||
this.supportedClasses = new ArrayList<>();
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -50,15 +50,6 @@ public class ClassloaderInterpreter
|
|||
return cl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
|
||||
|
|
|
|||
|
|
@ -196,19 +196,6 @@ public abstract class Interpreter {
|
|||
return this.interpreterGroup;
|
||||
}
|
||||
|
||||
/**
|
||||
* setup Remote Zeppelin Server Controller interface
|
||||
* @param zServer
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public abstract void setRemoteZeppelinServerController(RemoteWorksController zServer);
|
||||
|
||||
/**
|
||||
* get Remote Zeppelin Server Controller interface
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public abstract RemoteWorksController getRemoteZeppelinServerController();
|
||||
|
||||
public URL[] getClassloaderUrls() {
|
||||
return classloaderUrls;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -88,15 +88,6 @@ public class LazyOpenInterpreter
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
open();
|
||||
|
|
|
|||
|
|
@ -74,15 +74,6 @@ public class DevInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
this.context = context;
|
||||
|
|
|
|||
|
|
@ -45,7 +45,6 @@ import com.google.gson.reflect.TypeToken;
|
|||
public class RemoteInterpreter extends Interpreter {
|
||||
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
|
||||
private final ApplicationEventListener applicationEventListener;
|
||||
private final RemoteWorksController remoteWorksController;
|
||||
Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
|
||||
Gson gson = new Gson();
|
||||
private String interpreterRunner;
|
||||
|
|
@ -196,15 +195,13 @@ public class RemoteInterpreter extends Interpreter {
|
|||
connectTimeout,
|
||||
remoteInterpreterProcessListener,
|
||||
applicationEventListener,
|
||||
remoteWorksController,
|
||||
host,
|
||||
port);
|
||||
} else {
|
||||
// create new remote process
|
||||
remoteProcess = new RemoteInterpreterManagedProcess(
|
||||
interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
|
||||
remoteInterpreterProcessListener, applicationEventListener,
|
||||
remoteWorksController);
|
||||
remoteInterpreterProcessListener, applicationEventListener);
|
||||
}
|
||||
|
||||
intpGroup.setRemoteInterpreterProcess(remoteProcess);
|
||||
|
|
@ -310,15 +307,6 @@ public class RemoteInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
|
|
|||
|
|
@ -62,15 +62,12 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
|
||||
private RemoteInterpreterProcess interpreterProcess;
|
||||
private InterpreterGroup interpreterGroup;
|
||||
private RemoteWorksController remoteWorkController;
|
||||
|
||||
public RemoteInterpreterEventPoller(
|
||||
RemoteInterpreterProcessListener listener,
|
||||
ApplicationEventListener appListener,
|
||||
RemoteWorksController remoteWorkController) {
|
||||
ApplicationEventListener appListener) {
|
||||
this.listener = listener;
|
||||
this.appListener = appListener;
|
||||
this.remoteWorkController = remoteWorkController;
|
||||
shutdown = false;
|
||||
}
|
||||
|
||||
|
|
@ -82,14 +79,6 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
this.interpreterGroup = interpreterGroup;
|
||||
}
|
||||
|
||||
public RemoteWorksController getRemoteWorkController() {
|
||||
return remoteWorkController;
|
||||
}
|
||||
|
||||
public void setRemoteWorkController(RemoteWorksController remoteWorkController) {
|
||||
this.remoteWorkController = remoteWorkController;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Client client = null;
|
||||
|
|
@ -243,29 +232,30 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
List<InterpreterContextRunner> interpreterContextRunners = new LinkedList<>();
|
||||
List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>();
|
||||
if (event.getType() == RemoteZeppelinServerControlEvent.REQ_RESOURCE_PARAGRAPH_RUN_CONTEXT) {
|
||||
ZeppelinServerResourceParagraphRunner runner = gson.fromJson(
|
||||
event.getMsg(), ZeppelinServerResourceParagraphRunner.class);
|
||||
|
||||
RemoteZeppelinServerController resResource = new RemoteZeppelinServerController();
|
||||
resResource.setType(RemoteZeppelinServerControlEvent.RES_RESOURCE_PARAGRAPH_RUN_CONTEXT);
|
||||
resResource.setEventOwnerKey(eventOwnerKey);
|
||||
if (runner.getParagraphId() != null) {
|
||||
interpreterContextRunners = remoteWorkController.getRemoteContextRunner(
|
||||
runner.getNoteId(), runner.getParagraphId());
|
||||
} else {
|
||||
interpreterContextRunners = remoteWorkController.getRemoteContextRunner(
|
||||
runner.getNoteId());
|
||||
}
|
||||
|
||||
for (InterpreterContextRunner r : interpreterContextRunners) {
|
||||
remoteRunners.add(
|
||||
new ZeppelinServerResourceParagraphRunner(r.getNoteId(), r.getParagraphId())
|
||||
);
|
||||
}
|
||||
|
||||
resResource.setMsg(gson.toJson(remoteRunners));
|
||||
|
||||
interpreterServer.remoteZeppelinServerControlFeedback(resResource);
|
||||
// ZeppelinServerResourceParagraphRunner runner = gson.fromJson(
|
||||
// event.getMsg(), ZeppelinServerResourceParagraphRunner.class);
|
||||
//
|
||||
// RemoteZeppelinServerController resResource = new RemoteZeppelinServerController();
|
||||
// resResource.setType(RemoteZeppelinServerControlEvent.RES_RESOURCE_PARAGRAPH_RUN_CONTEXT);
|
||||
// resResource.setEventOwnerKey(eventOwnerKey);
|
||||
// if (runner.getParagraphId() != null) {
|
||||
//
|
||||
// interpreterContextRunners = remoteWorkController.getRemoteContextRunner(
|
||||
// runner.getNoteId(), runner.getParagraphId());
|
||||
// } else {
|
||||
// interpreterContextRunners = remoteWorkController.getRemoteContextRunner(
|
||||
// runner.getNoteId());
|
||||
// }
|
||||
//
|
||||
// for (InterpreterContextRunner r : interpreterContextRunners) {
|
||||
// remoteRunners.add(
|
||||
// new ZeppelinServerResourceParagraphRunner(r.getNoteId(), r.getParagraphId())
|
||||
// );
|
||||
// }
|
||||
//
|
||||
// resResource.setMsg(gson.toJson(remoteRunners));
|
||||
//
|
||||
// interpreterServer.remoteZeppelinServerControlFeedback(resResource);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
|
|
|
|||
|
|
@ -44,7 +44,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
private int port = -1;
|
||||
private final String interpreterDir;
|
||||
private final String localRepoDir;
|
||||
private RemoteWorksController remoteWorksController;
|
||||
|
||||
private Map<String, String> env;
|
||||
|
||||
|
|
@ -55,15 +54,13 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
Map<String, String> env,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener listener,
|
||||
ApplicationEventListener appListener,
|
||||
RemoteWorksController remoteWorksController) {
|
||||
super(new RemoteInterpreterEventPoller(listener, appListener, remoteWorksController),
|
||||
ApplicationEventListener appListener) {
|
||||
super(new RemoteInterpreterEventPoller(listener, appListener),
|
||||
connectTimeout);
|
||||
this.interpreterRunner = intpRunner;
|
||||
this.env = env;
|
||||
this.interpreterDir = intpDir;
|
||||
this.localRepoDir = localRepoDir;
|
||||
this.remoteWorksController = remoteWorksController;
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -79,7 +76,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
this.env = env;
|
||||
this.interpreterDir = intpDir;
|
||||
this.localRepoDir = localRepoDir;
|
||||
this.remoteWorksController = remoteInterpreterEventPoller.getRemoteWorkController();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ import org.apache.zeppelin.helium.ApplicationEventListener;
|
|||
import org.apache.zeppelin.interpreter.Constants;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -49,9 +48,8 @@ public abstract class RemoteInterpreterProcess {
|
|||
public RemoteInterpreterProcess(
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener listener,
|
||||
ApplicationEventListener appListener,
|
||||
RemoteWorksController remoteWorksController) {
|
||||
this(new RemoteInterpreterEventPoller(listener, appListener, remoteWorksController),
|
||||
ApplicationEventListener appListener) {
|
||||
this(new RemoteInterpreterEventPoller(listener, appListener),
|
||||
connectTimeout);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -33,11 +33,10 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
|
|||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener listener,
|
||||
ApplicationEventListener appListener,
|
||||
RemoteWorksController remoteWorksController,
|
||||
String host,
|
||||
int port
|
||||
) {
|
||||
super(connectTimeout, listener, appListener, remoteWorksController);
|
||||
super(connectTimeout, listener, appListener);
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,7 +68,6 @@ public class RemoteInterpreterServer
|
|||
InterpreterHookRegistry hookRegistry;
|
||||
DistributedResourcePool resourcePool;
|
||||
private ApplicationLoader appLoader;
|
||||
private RemoteWorksController remoteServerController;
|
||||
|
||||
Gson gson = new Gson();
|
||||
|
||||
|
|
@ -93,7 +92,6 @@ public class RemoteInterpreterServer
|
|||
server = new TThreadPoolServer(
|
||||
new TThreadPoolServer.Args(serverTransport).processor(processor));
|
||||
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
|
||||
remoteServerController = new ZeppelinRemoteWorksController(this, remoteWorksResponsePool);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -184,7 +182,6 @@ public class RemoteInterpreterServer
|
|||
replClass.getConstructor(new Class[] {Properties.class});
|
||||
Interpreter repl = constructor.newInstance(p);
|
||||
repl.setClassloaderUrls(new URL[]{});
|
||||
repl.setRemoteZeppelinServerController(remoteServerController);
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ public class RemoteInterpreterProcessTest {
|
|||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
|
||||
INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
|
||||
10 * 1000, null, null, null);
|
||||
10 * 1000, null, null);
|
||||
assertFalse(rip.isRunning());
|
||||
assertEquals(0, rip.referenceCount());
|
||||
assertEquals(1, rip.reference(intpGroup, "anonymous", false));
|
||||
|
|
|
|||
|
|
@ -56,15 +56,6 @@ public class MockInterpreterA extends Interpreter {
|
|||
return lastSt;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -53,15 +53,6 @@ public class MockInterpreterAngular extends Interpreter {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
String[] stmt = st.split(" ");
|
||||
|
|
|
|||
|
|
@ -48,15 +48,6 @@ public class MockInterpreterB extends Interpreter {
|
|||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
MockInterpreterA intpA = getInterpreterA();
|
||||
|
|
|
|||
|
|
@ -48,15 +48,6 @@ public class MockInterpreterEnv extends Interpreter {
|
|||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
String[] cmd = st.split(" ");
|
||||
|
|
|
|||
|
|
@ -57,15 +57,6 @@ public class MockInterpreterOutputStream extends Interpreter {
|
|||
return lastSt;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
String[] ret = st.split(":");
|
||||
|
|
|
|||
|
|
@ -56,15 +56,6 @@ public class MockInterpreterResourcePool extends Interpreter {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
String[] stmt = st.split(" ");
|
||||
|
|
|
|||
|
|
@ -116,11 +116,11 @@ public class DistributedResourcePoolTest {
|
|||
intp1.open();
|
||||
intp2.open();
|
||||
|
||||
eventPoller1 = new RemoteInterpreterEventPoller(null, null, null);
|
||||
eventPoller1 = new RemoteInterpreterEventPoller(null, null);
|
||||
eventPoller1.setInterpreterGroup(intpGroup1);
|
||||
eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess());
|
||||
|
||||
eventPoller2 = new RemoteInterpreterEventPoller(null, null, null);
|
||||
eventPoller2 = new RemoteInterpreterEventPoller(null, null);
|
||||
eventPoller2.setInterpreterGroup(intpGroup2);
|
||||
eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess());
|
||||
|
||||
|
|
|
|||
|
|
@ -44,15 +44,6 @@ public class MockInterpreter1 extends Interpreter{
|
|||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: "+st);
|
||||
|
|
|
|||
|
|
@ -54,15 +54,6 @@ Map<String, Object> vars = new HashMap<>();
|
|||
return open;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
InterpreterResult result;
|
||||
|
|
|
|||
|
|
@ -45,15 +45,6 @@ public class MockInterpreter11 extends Interpreter{
|
|||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl11: "+st);
|
||||
|
|
|
|||
|
|
@ -53,15 +53,6 @@ public class MockInterpreter2 extends Interpreter{
|
|||
return open;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
InterpreterResult result;
|
||||
|
|
|
|||
Loading…
Reference in a new issue