mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Implement getParagraphRunner transaction.
This commit is contained in:
parent
25232387d1
commit
3d34f9e0bb
48 changed files with 575 additions and 173 deletions
|
|
@ -23,10 +23,7 @@ import java.io.PrintStream;
|
|||
import java.io.ByteArrayOutputStream;
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -89,6 +86,15 @@ public class AlluxioInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
String[] lines = splitAndRemoveEmpty(st, "\n");
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
|
|||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -47,6 +48,15 @@ public class AngularInterpreter extends Interpreter {
|
|||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
return new InterpreterResult(Code.SUCCESS, Type.ANGULAR, st);
|
||||
|
|
|
|||
|
|
@ -51,10 +51,7 @@ import java.sql.SQLException;
|
|||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
|
|
@ -290,6 +287,15 @@ public class BigQueryInterpreter extends Interpreter {
|
|||
service = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String sql, InterpreterContext contextInterpreter) {
|
||||
logger.info("Run SQL command '{}'", sql);
|
||||
|
|
|
|||
|
|
@ -19,10 +19,7 @@ package org.apache.zeppelin.cassandra;
|
|||
import com.datastax.driver.core.Cluster;
|
||||
import com.datastax.driver.core.ProtocolOptions.Compression;
|
||||
import com.datastax.driver.core.Session;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -195,6 +192,15 @@ public class CassandraInterpreter extends Interpreter {
|
|||
cluster.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
return helper.interpret(session, st, context);
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ import org.apache.commons.lang.StringUtils;
|
|||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
|
|
@ -149,6 +150,15 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext interpreterContext) {
|
||||
logger.info("Run Elasticsearch command '" + cmd + "'");
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
|
|||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -100,6 +101,16 @@ public abstract class FileInterpreter extends Interpreter {
|
|||
|
||||
// Handle the command handling uniformly across all file systems
|
||||
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.info("Run File command '" + cmd + "'");
|
||||
|
|
|
|||
|
|
@ -35,12 +35,8 @@ import org.apache.flink.runtime.instance.ActorGateway;
|
|||
import org.apache.flink.runtime.messages.JobManagerMessages;
|
||||
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
|
||||
import org.apache.flink.runtime.util.EnvironmentInformation;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -242,6 +238,15 @@ public class FlinkInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext context) {
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
|
|
|
|||
|
|
@ -110,6 +110,15 @@ public class HbaseInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -275,6 +275,15 @@ public class IgniteInterpreter extends Interpreter {
|
|||
public void cancel(InterpreterContext context) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
private InterpreterResult interpret(String[] lines) {
|
||||
String[] linesToRun = new String[lines.length + 1];
|
||||
System.arraycopy(lines, 0, linesToRun, 0, lines.length);
|
||||
|
|
|
|||
|
|
@ -16,11 +16,7 @@
|
|||
*/
|
||||
package org.apache.zeppelin.ignite;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
|
|
@ -117,6 +113,15 @@ public class IgniteSqlInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
if (connEx != null) {
|
||||
|
|
|
|||
|
|
@ -30,10 +30,7 @@ import org.apache.commons.lang3.StringUtils;
|
|||
import org.apache.commons.pool2.ObjectPool;
|
||||
import org.apache.commons.pool2.impl.GenericObjectPool;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.jdbc.security.JDBCSecurityImpl;
|
||||
|
|
@ -441,6 +438,15 @@ public class JDBCInterpreter extends Interpreter {
|
|||
return (!isTableResponseType) ? str : str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.info("Run SQL command '{}'", cmd);
|
||||
|
|
|
|||
|
|
@ -24,10 +24,7 @@ import org.apache.http.client.HttpClient;
|
|||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -69,6 +66,15 @@ public class KylinInterpreter extends Interpreter {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -31,10 +31,7 @@ import org.apache.lens.client.LensClient;
|
|||
import org.apache.lens.client.LensClientConfig;
|
||||
import org.apache.lens.client.LensClientSingletonWrapper;
|
||||
import org.apache.lens.cli.commands.BaseLensCommand;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
|
|
@ -247,6 +244,15 @@ public class LensInterpreter extends Interpreter {
|
|||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String input, InterpreterContext context) {
|
||||
if (input == null || input.length() == 0) {
|
||||
|
|
|
|||
|
|
@ -55,6 +55,15 @@ public class LivyPySparkInterpreter extends Interpreter {
|
|||
livyHelper.closeSession(userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -71,6 +71,15 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
livyHelper.closeSession(userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -55,6 +55,15 @@ public class LivySparkRInterpreter extends Interpreter {
|
|||
livyHelper.closeSession(userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -54,6 +54,15 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
livyHelper.closeSession(userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -21,12 +21,8 @@ import java.io.IOException;
|
|||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -84,6 +80,15 @@ public class Markdown extends Interpreter {
|
|||
@Override
|
||||
public void close() {}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String markdownText, InterpreterContext interpreterContext) {
|
||||
String html;
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import org.apache.pig.tools.pigstats.*;
|
|||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -69,6 +70,14 @@ public class PigInterpreter extends BasePigInterpreter {
|
|||
pigServer = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
|
|
|
|||
|
|
@ -63,6 +63,15 @@ public class PigQueryInterpreter extends BasePigInterpreter {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
// '-' is invalid for pig alias
|
||||
|
|
|
|||
|
|
@ -28,10 +28,7 @@ import java.util.List;
|
|||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
|
|
@ -283,6 +280,15 @@ public class PostgreSqlInterpreter extends Interpreter {
|
|||
return (!isTableResponseType) ? str : str.replace(TAB, WhITESPACE).replace(NEWLINE, WhITESPACE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.info("Run SQL command '{}'", cmd);
|
||||
|
|
|
|||
|
|
@ -28,12 +28,9 @@ import java.util.regex.Matcher;
|
|||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
|
|
@ -132,6 +129,15 @@ public class PythonInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
if (cmd == null || cmd.isEmpty()) {
|
||||
|
|
|
|||
|
|
@ -20,11 +20,7 @@ package org.apache.zeppelin.python;
|
|||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -90,6 +86,15 @@ public class PythonInterpreterPandasSql extends Interpreter {
|
|||
python.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import org.apache.zeppelin.interpreter.Interpreter;
|
|||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -69,6 +70,14 @@ public class ShellInterpreter extends Interpreter {
|
|||
@Override
|
||||
public void close() {}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
|
|
|
|||
|
|
@ -34,12 +34,8 @@ import java.util.Properties;
|
|||
import com.google.common.reflect.TypeToken;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.spark.repl.SparkILoop;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
|
||||
|
|
@ -196,6 +192,15 @@ public class DepInterpreter extends Interpreter {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
private Results.Result interpret(String line) {
|
||||
return (Results.Result) Utils.invokeMethod(
|
||||
intp,
|
||||
|
|
|
|||
|
|
@ -45,14 +45,8 @@ import org.apache.commons.exec.environment.EnvironmentUtils;
|
|||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
|
||||
|
|
@ -312,6 +306,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
outputStream.getInterpreterOutput().write(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
SparkInterpreter sparkInterpreter = getSparkInterpreter();
|
||||
|
|
|
|||
|
|
@ -46,15 +46,9 @@ import org.apache.spark.scheduler.DAGScheduler;
|
|||
import org.apache.spark.scheduler.Pool;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.spark.ui.jobs.JobProgressListener;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
|
||||
import org.apache.zeppelin.interpreter.InterpreterProperty;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteZeppelinServerController;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.resource.WellKnownResourceName;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
|
|
@ -113,6 +107,8 @@ public class SparkInterpreter extends Interpreter {
|
|||
private SparkOutputStream out;
|
||||
private SparkDependencyResolver dep;
|
||||
|
||||
private RemoteWorksController remoteWorksController;
|
||||
|
||||
/**
|
||||
* completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
|
||||
*/
|
||||
|
|
@ -819,7 +815,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
hooks = getInterpreterGroup().getInterpreterHookRegistry();
|
||||
|
||||
z = new ZeppelinContext(sc, sqlc, null, dep, hooks,
|
||||
z = new ZeppelinContext(sc, sqlc, null, dep, hooks, getRemoteZeppelinServerController(),
|
||||
Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
|
||||
|
||||
interpret("@transient val _binder = new java.util.HashMap[String, Object]()");
|
||||
|
|
@ -1045,10 +1041,20 @@ public class SparkInterpreter extends Interpreter {
|
|||
return resultCompletionText;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
this.remoteWorksController = zServer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return this.remoteWorksController;
|
||||
}
|
||||
|
||||
/*
|
||||
* this method doesn't work in scala 2.11
|
||||
* Somehow intp.valueOfTerm returns scala.None always with -Yrepl-class-based option
|
||||
*/
|
||||
* this method doesn't work in scala 2.11
|
||||
* Somehow intp.valueOfTerm returns scala.None always with -Yrepl-class-based option
|
||||
*/
|
||||
public Object getValue(String name) {
|
||||
Object ret = Utils.invokeMethod(
|
||||
intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name});
|
||||
|
|
|
|||
|
|
@ -25,13 +25,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -86,6 +81,15 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
@Override
|
||||
public void close() {}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
SQLContext sqlc = null;
|
||||
|
|
|
|||
|
|
@ -41,10 +41,7 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
|
|||
import org.apache.zeppelin.display.AngularObjectWatcher;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.display.Input.ParamOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
|
||||
import org.apache.zeppelin.resource.Resource;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
|
|
@ -73,17 +70,20 @@ public class ZeppelinContext {
|
|||
private int maxResult;
|
||||
private List<Class> supportedClasses;
|
||||
private InterpreterHookRegistry hooks;
|
||||
private RemoteWorksController remoteWorksController;
|
||||
|
||||
public ZeppelinContext(SparkContext sc, SQLContext sql,
|
||||
InterpreterContext interpreterContext,
|
||||
SparkDependencyResolver dep,
|
||||
InterpreterHookRegistry hooks,
|
||||
RemoteWorksController remoteWorksController,
|
||||
int maxResult) {
|
||||
this.sc = sc;
|
||||
this.sqlContext = sql;
|
||||
this.interpreterContext = interpreterContext;
|
||||
this.dep = dep;
|
||||
this.hooks = hooks;
|
||||
this.remoteWorksController = remoteWorksController;
|
||||
this.maxResult = maxResult;
|
||||
this.supportedClasses = new ArrayList<>();
|
||||
try {
|
||||
|
|
@ -298,45 +298,79 @@ public class ZeppelinContext {
|
|||
return msg.toString();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Run paragraph by id
|
||||
* @param id
|
||||
* @param noteId
|
||||
* @param paragraphId
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public void run(String id) {
|
||||
run(id, interpreterContext);
|
||||
public void run(String noteId, String paragraphId) {
|
||||
run(noteId, paragraphId, interpreterContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run paragraph by id
|
||||
* @param id
|
||||
* @param paragraphId
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public void run(String paragraphId) {
|
||||
String noteId = interpreterContext.getNoteId();
|
||||
run(noteId, paragraphId, interpreterContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run paragraph by id
|
||||
* @param noteId
|
||||
* @param context
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public void run(String id, InterpreterContext context) {
|
||||
if (id.equals(context.getParagraphId())) {
|
||||
public void run(String noteId, String paragraphId, InterpreterContext context) {
|
||||
if (paragraphId.equals(context.getParagraphId())) {
|
||||
throw new InterpreterException("Can not run current Paragraph");
|
||||
}
|
||||
|
||||
for (InterpreterContextRunner r : context.getRunners()) {
|
||||
if (id.equals(r.getParagraphId())) {
|
||||
r.run();
|
||||
return;
|
||||
}
|
||||
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, paragraphId);
|
||||
|
||||
if (runners.size() <= 0) {
|
||||
throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size());
|
||||
}
|
||||
|
||||
throw new InterpreterException("Paragraph " + id + " not found");
|
||||
for (InterpreterContextRunner r : runners) {
|
||||
r.run();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* get Zeppelin Paragraph Runner from zeppelin server
|
||||
* @param noteId
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public List<InterpreterContextRunner> getInterpreterContextRunner(String noteId) {
|
||||
List<InterpreterContextRunner> runners = new LinkedList<>();
|
||||
|
||||
if (remoteWorksController != null) {
|
||||
runners = remoteWorksController.getRemoteContextRunner(noteId);
|
||||
}
|
||||
|
||||
return runners;
|
||||
}
|
||||
|
||||
/**
|
||||
* get Zeppelin Paragraph Runner from zeppelin server
|
||||
* @param noteId
|
||||
* @param paragraphId
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public List<InterpreterContextRunner> getInterpreterContextRunner(
|
||||
String noteId, String paragraphId) {
|
||||
List<InterpreterContextRunner> runners = new LinkedList<>();
|
||||
|
||||
|
||||
if (remoteWorksController != null) {
|
||||
runners = remoteWorksController.getRemoteContextRunner(noteId, paragraphId);
|
||||
}
|
||||
|
||||
return runners;
|
||||
}
|
||||
|
|
@ -347,7 +381,8 @@ public class ZeppelinContext {
|
|||
*/
|
||||
@ZeppelinApi
|
||||
public void run(int idx) {
|
||||
run(idx, interpreterContext);
|
||||
String noteId = interpreterContext.getNoteId();
|
||||
run(noteId, idx, interpreterContext);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -355,12 +390,13 @@ public class ZeppelinContext {
|
|||
* @param idx index starting from 0
|
||||
* @param context interpreter context
|
||||
*/
|
||||
public void run(int idx, InterpreterContext context) {
|
||||
if (idx >= context.getRunners().size()) {
|
||||
public void run(String noteId, int idx, InterpreterContext context) {
|
||||
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId);
|
||||
if (idx >= runners.size()) {
|
||||
throw new InterpreterException("Index out of bound");
|
||||
}
|
||||
|
||||
InterpreterContextRunner runner = context.getRunners().get(idx);
|
||||
InterpreterContextRunner runner = runners.get(idx);
|
||||
if (runner.getParagraphId().equals(context.getParagraphId())) {
|
||||
throw new InterpreterException("Can not run current Paragraph");
|
||||
}
|
||||
|
|
@ -379,13 +415,14 @@ public class ZeppelinContext {
|
|||
*/
|
||||
@ZeppelinApi
|
||||
public void run(List<Object> paragraphIdOrIdx, InterpreterContext context) {
|
||||
String noteId = context.getNoteId();
|
||||
for (Object idOrIdx : paragraphIdOrIdx) {
|
||||
if (idOrIdx instanceof String) {
|
||||
String id = (String) idOrIdx;
|
||||
run(id, context);
|
||||
String paragraphId = (String) idOrIdx;
|
||||
run(noteId, paragraphId, context);
|
||||
} else if (idOrIdx instanceof Integer) {
|
||||
Integer idx = (Integer) idOrIdx;
|
||||
run(idx, context);
|
||||
run(noteId, idx, context);
|
||||
} else {
|
||||
throw new InterpreterException("Paragraph " + idOrIdx + " not found");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,6 +50,15 @@ public class ClassloaderInterpreter
|
|||
return cl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
|
||||
|
|
|
|||
|
|
@ -196,6 +196,19 @@ public abstract class Interpreter {
|
|||
return this.interpreterGroup;
|
||||
}
|
||||
|
||||
/**
|
||||
* setup Remote Zeppelin Server Controller interface
|
||||
* @param zServer
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public abstract void setRemoteZeppelinServerController(RemoteWorksController zServer);
|
||||
|
||||
/**
|
||||
* get Remote Zeppelin Server Controller interface
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public abstract RemoteWorksController getRemoteZeppelinServerController();
|
||||
|
||||
public URL[] getClassloaderUrls() {
|
||||
return classloaderUrls;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -88,6 +88,15 @@ public class LazyOpenInterpreter
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
open();
|
||||
|
|
|
|||
|
|
@ -24,6 +24,6 @@ import java.util.List;
|
|||
*
|
||||
*/
|
||||
public interface RemoteWorksController {
|
||||
List<InterpreterContextRunner> getRunner(String noteId);
|
||||
InterpreterContextRunner getRunner(String noteId, String paragraphId);
|
||||
List<InterpreterContextRunner> getRemoteContextRunner(String noteId);
|
||||
List<InterpreterContextRunner> getRemoteContextRunner(String noteId, String paragraphId);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,12 +20,7 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
|
||||
/**
|
||||
|
|
@ -79,6 +74,15 @@ public class DevInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
this.context = context;
|
||||
|
|
|
|||
|
|
@ -302,6 +302,15 @@ public class RemoteInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
|
|
|||
|
|
@ -247,11 +247,11 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
resResource.setType(RemoteZeppelinServerControlEvent.RES_RESOURCE_PARAGRAPH_RUN_CONTEXT);
|
||||
resResource.setEventOwnerKey(eventOwnerKey);
|
||||
if (runner.getParagraphId() != null) {
|
||||
InterpreterContextRunner intpRunner = remoteWorkController.getRunner(
|
||||
interpreterContextRunners = remoteWorkController.getRemoteContextRunner(
|
||||
runner.getNoteId(), runner.getParagraphId());
|
||||
interpreterContextRunners.add(intpRunner);
|
||||
} else {
|
||||
interpreterContextRunners = remoteWorkController.getRunner(runner.getNoteId());
|
||||
interpreterContextRunners = remoteWorkController.getRemoteContextRunner(
|
||||
runner.getNoteId());
|
||||
}
|
||||
|
||||
logger.info("clover remotework count 1 {}", interpreterContextRunners.size());
|
||||
|
|
|
|||
|
|
@ -68,6 +68,7 @@ public class RemoteInterpreterServer
|
|||
InterpreterHookRegistry hookRegistry;
|
||||
DistributedResourcePool resourcePool;
|
||||
private ApplicationLoader appLoader;
|
||||
private RemoteWorksController remoteServerController;
|
||||
|
||||
Gson gson = new Gson();
|
||||
|
||||
|
|
@ -82,8 +83,7 @@ public class RemoteInterpreterServer
|
|||
private final Map<String, RunningApplication> runningApplications =
|
||||
Collections.synchronizedMap(new HashMap<String, RunningApplication>());
|
||||
|
||||
private Map<String, Object> remoteWorksResponsePool =
|
||||
Collections.synchronizedMap(new HashMap<String, Object>());
|
||||
private Map<String, Object> remoteWorksResponsePool;
|
||||
|
||||
public RemoteInterpreterServer(int port) throws TTransportException {
|
||||
this.port = port;
|
||||
|
|
@ -92,6 +92,8 @@ public class RemoteInterpreterServer
|
|||
TServerSocket serverTransport = new TServerSocket(port);
|
||||
server = new TThreadPoolServer(
|
||||
new TThreadPoolServer.Args(serverTransport).processor(processor));
|
||||
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
|
||||
remoteServerController = new ZeppelinRemoteWorksController(this, remoteWorksResponsePool);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -181,8 +183,8 @@ public class RemoteInterpreterServer
|
|||
Constructor<Interpreter> constructor =
|
||||
replClass.getConstructor(new Class[] {Properties.class});
|
||||
Interpreter repl = constructor.newInstance(p);
|
||||
|
||||
repl.setClassloaderUrls(new URL[]{});
|
||||
repl.setRemoteZeppelinServerController(remoteServerController);
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
|
|
@ -347,16 +349,20 @@ public class RemoteInterpreterServer
|
|||
RemoteZeppelinServerController response) throws TException {
|
||||
logger.info("clover remote zeppelin server controller feedback {}", response);
|
||||
logger.info("clover remote zeppelin server conteroller body {}", response.getMsg());
|
||||
|
||||
if (response.getType() == RemoteZeppelinServerControlEvent.RES_RESOURCE_PARAGRAPH_RUN_CONTEXT) {
|
||||
List<InterpreterContextRunner> intpContextRunners = new LinkedList<>();
|
||||
List<ZeppelinServerResourceParagraphRunner> runners = gson.fromJson(response.getMsg(),
|
||||
new TypeToken<List<ZeppelinServerResourceParagraphRunner>>() {}.getType());
|
||||
logger.info("clover get runner size " + runners.size());
|
||||
for (ZeppelinServerResourceParagraphRunner r : runners) {
|
||||
logger.info("clover runner nid {} pid {}", r.getNoteId(), r.getParagraphId());
|
||||
logger.info("clover runner nid " + r.getNoteId() + " pid " + r.getParagraphId());
|
||||
intpContextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
|
||||
}
|
||||
remoteWorksResponsePool.put(response.getEventOwnerKey(), intpContextRunners);
|
||||
synchronized (this.remoteWorksResponsePool) {
|
||||
this.remoteWorksResponsePool.put(response.getEventOwnerKey(), intpContextRunners);
|
||||
logger.info("clover feedback remoteWorks {} count {}",
|
||||
response.getEventOwnerKey(), remoteWorksResponsePool.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -598,7 +604,7 @@ public class RemoteInterpreterServer
|
|||
|
||||
|
||||
static class ParagraphRunner extends InterpreterContextRunner {
|
||||
|
||||
Logger logger = LoggerFactory.getLogger(ParagraphRunner.class);
|
||||
private transient RemoteInterpreterServer server;
|
||||
|
||||
public ParagraphRunner(RemoteInterpreterServer server, String noteId, String paragraphId) {
|
||||
|
|
@ -608,20 +614,28 @@ public class RemoteInterpreterServer
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
ZeppelinServerResourceParagraphRunner test = new ZeppelinServerResourceParagraphRunner();
|
||||
test.setNoteId(getNoteId());
|
||||
test.setParagraphId(getParagraphId());
|
||||
server.eventClient.getZeppelinServerNoteRunner("IamOWNER", test);
|
||||
logger.info("clover call run");
|
||||
server.eventClient.run(this);
|
||||
}
|
||||
}
|
||||
|
||||
static class RemoteServerController {
|
||||
static class ZeppelinRemoteWorksController implements RemoteWorksController{
|
||||
//clover
|
||||
private final long DEFAULT_TIMEOUT_VALUE = 30000;
|
||||
Logger logger = LoggerFactory.getLogger(ZeppelinRemoteWorksController.class);
|
||||
|
||||
private final long DEFAULT_TIMEOUT_VALUE = 300000;
|
||||
private Map<String, Object> remoteWorksResponsePool;
|
||||
public RemoteServerController(Map<String, Object> remoteWorksResponsePool) {
|
||||
private RemoteInterpreterServer server;
|
||||
public ZeppelinRemoteWorksController(
|
||||
RemoteInterpreterServer server, Map<String, Object> remoteWorksResponsePool) {
|
||||
this.remoteWorksResponsePool = remoteWorksResponsePool;
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
public String generateOwnerKey() {
|
||||
String hashKeyText = new String("ownerKey" + System.currentTimeMillis());
|
||||
String hashKey = String.valueOf(hashKeyText.hashCode());
|
||||
return hashKey;
|
||||
}
|
||||
|
||||
public boolean waitForEvent(String eventOwnerKey) throws InterruptedException {
|
||||
|
|
@ -633,18 +647,55 @@ public class RemoteInterpreterServer
|
|||
long now = System.currentTimeMillis();
|
||||
long endTime = System.currentTimeMillis() + timeout;
|
||||
|
||||
while (endTime <= now) {
|
||||
if (this.remoteWorksResponsePool.containsKey(eventOwnerKey) == true) {
|
||||
wasGetData = true;
|
||||
while (endTime >= now) {
|
||||
logger.info("clover sleep... size {} key {}",
|
||||
this.remoteWorksResponsePool.size(), eventOwnerKey);
|
||||
synchronized (this.remoteWorksResponsePool) {
|
||||
wasGetData = this.remoteWorksResponsePool.containsKey(eventOwnerKey);
|
||||
}
|
||||
if (wasGetData == true) {
|
||||
logger.info("clover found!!!!!!!!!!!!!");
|
||||
break;
|
||||
}
|
||||
now = System.currentTimeMillis();
|
||||
sleep(500);
|
||||
}
|
||||
logger.info("clover out {}", wasGetData);
|
||||
|
||||
return wasGetData;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<InterpreterContextRunner> getRemoteContextRunner(String noteId) {
|
||||
return getRemoteContextRunner(noteId, null);
|
||||
}
|
||||
|
||||
public List<InterpreterContextRunner> getRemoteContextRunner(
|
||||
String noteId, String paragraphID) {
|
||||
|
||||
List<InterpreterContextRunner> runners = null;
|
||||
String ownerKey = generateOwnerKey();
|
||||
|
||||
logger.info("clover request owner key {}", ownerKey);
|
||||
ZeppelinServerResourceParagraphRunner resource = new ZeppelinServerResourceParagraphRunner();
|
||||
resource.setNoteId(noteId);
|
||||
resource.setParagraphId(paragraphID);
|
||||
server.eventClient.getZeppelinServerNoteRunner(ownerKey, resource);
|
||||
|
||||
try {
|
||||
this.waitForEvent(ownerKey);
|
||||
} catch (Exception e) {
|
||||
logger.info("clover timeout Interrupt getRemoteContextRunner ", e);
|
||||
return new LinkedList<>();
|
||||
}
|
||||
synchronized (this.remoteWorksResponsePool) {
|
||||
runners = (List<InterpreterContextRunner>) this.remoteWorksResponsePool.get(ownerKey);
|
||||
this.remoteWorksResponsePool.remove(ownerKey);
|
||||
}
|
||||
logger.info("clover out time {} {}", runners.size(), System.currentTimeMillis());
|
||||
return runners;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,11 +20,7 @@ package org.apache.zeppelin.interpreter.remote.mock;
|
|||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
|
|
@ -60,6 +56,15 @@ public class MockInterpreterA extends Interpreter {
|
|||
return lastSt;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -23,10 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.AngularObjectWatcher;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
|
||||
|
|
@ -56,6 +53,15 @@ public class MockInterpreterAngular extends Interpreter {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
String[] stmt = st.split(" ");
|
||||
|
|
|
|||
|
|
@ -20,14 +20,8 @@ package org.apache.zeppelin.interpreter.remote.mock;
|
|||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
|
||||
|
|
@ -54,6 +48,15 @@ public class MockInterpreterB extends Interpreter {
|
|||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
MockInterpreterA intpA = getInterpreterA();
|
||||
|
|
|
|||
|
|
@ -48,6 +48,15 @@ public class MockInterpreterEnv extends Interpreter {
|
|||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
String[] cmd = st.split(" ");
|
||||
|
|
|
|||
|
|
@ -57,6 +57,15 @@ public class MockInterpreterOutputStream extends Interpreter {
|
|||
return lastSt;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
String[] ret = st.split(":");
|
||||
|
|
|
|||
|
|
@ -24,10 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import com.google.gson.Gson;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.AngularObjectWatcher;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.resource.Resource;
|
||||
|
|
@ -59,6 +56,15 @@ public class MockInterpreterResourcePool extends Interpreter {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
String[] stmt = st.split(" ");
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import java.util.Properties;
|
|||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -43,6 +44,15 @@ public class MockInterpreter1 extends Interpreter{
|
|||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: "+st);
|
||||
|
|
|
|||
|
|
@ -63,30 +63,25 @@ public class RemoteWorksManager {
|
|||
return notebook;
|
||||
}
|
||||
|
||||
public List<InterpreterContextRunner> getRunner(String noteId) {
|
||||
List<InterpreterContextRunner> runners = new LinkedList<>();
|
||||
try {
|
||||
Note note = getNotebook().getNote(noteId);
|
||||
if (note != null) {
|
||||
for (Paragraph paragraph : note.getParagraphs()) {
|
||||
runners.add(paragraph.getInterpreterContextRunner());
|
||||
}
|
||||
}
|
||||
} catch (NullPointerException e) {
|
||||
LOG.warn(e.getMessage());
|
||||
}
|
||||
|
||||
return runners;
|
||||
public List<InterpreterContextRunner> getRemoteContextRunner(String noteId) {
|
||||
return getRemoteContextRunner(noteId, null);
|
||||
}
|
||||
|
||||
public InterpreterContextRunner getRunner(String noteId, String paragraphId) {
|
||||
InterpreterContextRunner runner = null;
|
||||
public List<InterpreterContextRunner> getRemoteContextRunner(
|
||||
String noteId, String paragraphId) {
|
||||
List<InterpreterContextRunner> runner = new LinkedList<>();
|
||||
try {
|
||||
Note note = getNotebook().getNote(noteId);
|
||||
if (note != null) {
|
||||
Paragraph paragraph = note.getParagraph(paragraphId);
|
||||
if (paragraph != null) {
|
||||
runner = paragraph.getInterpreterContextRunner();
|
||||
if (paragraphId != null) {
|
||||
Paragraph paragraph = note.getParagraph(paragraphId);
|
||||
if (paragraph != null) {
|
||||
runner.add(paragraph.getInterpreterContextRunner());
|
||||
}
|
||||
} else {
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
runner.add(p.getInterpreterContextRunner());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (NullPointerException e) {
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import java.util.Properties;
|
|||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -53,6 +54,15 @@ Map<String, Object> vars = new HashMap<>();
|
|||
return open;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
InterpreterResult result;
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import java.util.Properties;
|
|||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -44,6 +45,15 @@ public class MockInterpreter11 extends Interpreter{
|
|||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl11: "+st);
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import java.util.Properties;
|
|||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -52,6 +53,14 @@ public class MockInterpreter2 extends Interpreter{
|
|||
return open;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemoteZeppelinServerController(RemoteWorksController zServer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteWorksController getRemoteZeppelinServerController() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue