mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge branch 'master' into ZEPPELIN-277
This commit is contained in:
commit
b09730e039
19 changed files with 1882 additions and 120 deletions
|
|
@ -23,7 +23,7 @@ function usage() {
|
|||
echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>"
|
||||
}
|
||||
|
||||
while getopts "hp:d:l:v:u:g:" o; do
|
||||
while getopts "hc:p:d:l:v:u:g:" o; do
|
||||
case ${o} in
|
||||
h)
|
||||
usage
|
||||
|
|
@ -32,8 +32,11 @@ while getopts "hp:d:l:v:u:g:" o; do
|
|||
d)
|
||||
INTERPRETER_DIR=${OPTARG}
|
||||
;;
|
||||
c)
|
||||
CALLBACK_HOST=${OPTARG} # This will be used callback host
|
||||
;;
|
||||
p)
|
||||
PORT=${OPTARG}
|
||||
PORT=${OPTARG} # This will be used callback port
|
||||
;;
|
||||
l)
|
||||
LOCAL_INTERPRETER_REPO=${OPTARG}
|
||||
|
|
@ -202,12 +205,12 @@ fi
|
|||
|
||||
if [[ -n "${SPARK_SUBMIT}" ]]; then
|
||||
if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]]; then
|
||||
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${PORT}`
|
||||
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
|
||||
else
|
||||
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${PORT}`
|
||||
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
|
||||
fi
|
||||
else
|
||||
INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} `
|
||||
INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} `
|
||||
fi
|
||||
|
||||
if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then
|
||||
|
|
|
|||
|
|
@ -430,4 +430,10 @@
|
|||
<description>The HTTP X-XSS-Protection response header is a feature of Internet Explorer, Chrome and Safari that stops pages from loading when they detect reflected cross-site scripting (XSS) attacks. When value is set to 1 and a cross-site scripting attack is detected, the browser will sanitize the page (remove the unsafe parts).</description>
|
||||
</property>
|
||||
-->
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.interpreter.callback.portRange</name>
|
||||
<value>10000:10010</value>
|
||||
</property>
|
||||
-->
|
||||
</configuration>
|
||||
|
|
|
|||
|
|
@ -196,6 +196,13 @@ Staring from 0.6.1 SparkSession is available as variable `spark` when you are us
|
|||
|
||||
<a name="dependencyloading"> </a>
|
||||
|
||||
### How to pass property to SparkConf
|
||||
|
||||
There're 2 kinds of properties that would be passed to SparkConf
|
||||
|
||||
* Standard spark property (prefix with `spark.`). e.g. `spark.executor.memory` will be passed to `SparkConf`
|
||||
* Non-standard spark property (prefix with `zeppelin.spark.`). e.g. `zeppelin.spark.property_1`, `property_1` will be passed to `SparkConf`
|
||||
|
||||
## Dependency Management
|
||||
There are two ways to load external libraries in Spark interpreter. First is using interpreter setting menu and second is loading Spark properties.
|
||||
|
||||
|
|
|
|||
|
|
@ -38,8 +38,8 @@ public class ZeppelinDevServer extends
|
|||
|
||||
private DevInterpreter interpreter = null;
|
||||
private InterpreterOutput out;
|
||||
public ZeppelinDevServer(int port) throws TException {
|
||||
super(port);
|
||||
public ZeppelinDevServer(int port) throws TException, IOException {
|
||||
super(null, port);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -381,9 +381,16 @@ public class SparkInterpreter extends Interpreter {
|
|||
for (Object k : intpProperty.keySet()) {
|
||||
String key = (String) k;
|
||||
String val = toString(intpProperty.get(key));
|
||||
if (key.startsWith("spark.") && !val.trim().isEmpty()) {
|
||||
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
|
||||
conf.set(key, val);
|
||||
if (!val.trim().isEmpty()) {
|
||||
if (key.startsWith("spark.")) {
|
||||
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
|
||||
conf.set(key, val);
|
||||
}
|
||||
if (key.startsWith("zeppelin.spark.")) {
|
||||
String sparkPropertyKey = key.substring("zeppelin.spark.".length());
|
||||
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
|
||||
conf.set(sparkPropertyKey, val);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -509,9 +516,17 @@ public class SparkInterpreter extends Interpreter {
|
|||
for (Object k : intpProperty.keySet()) {
|
||||
String key = (String) k;
|
||||
String val = toString(intpProperty.get(key));
|
||||
if (key.startsWith("spark.") && !val.trim().isEmpty()) {
|
||||
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
|
||||
conf.set(key, val);
|
||||
if (!val.trim().isEmpty()) {
|
||||
if (key.startsWith("spark.")) {
|
||||
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
|
||||
conf.set(key, val);
|
||||
}
|
||||
|
||||
if (key.startsWith("zeppelin.spark.")) {
|
||||
String sparkPropertyKey = key.substring("zeppelin.spark.".length());
|
||||
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
|
||||
conf.set(sparkPropertyKey, val);
|
||||
}
|
||||
}
|
||||
}
|
||||
setupConfForPySpark(conf);
|
||||
|
|
@ -1096,23 +1111,33 @@ public class SparkInterpreter extends Interpreter {
|
|||
if (buf.length() < cursor) {
|
||||
cursor = buf.length();
|
||||
}
|
||||
String completionText = getCompletionTargetString(buf, cursor);
|
||||
if (completionText == null) {
|
||||
completionText = "";
|
||||
cursor = completionText.length();
|
||||
}
|
||||
|
||||
ScalaCompleter c = (ScalaCompleter) Utils.invokeMethod(completer, "completer");
|
||||
Candidates ret = c.complete(completionText, cursor);
|
||||
|
||||
List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
|
||||
List<InterpreterCompletion> completions = new LinkedList<>();
|
||||
|
||||
for (String candidate : candidates) {
|
||||
completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
|
||||
|
||||
if (Utils.isScala2_10() || !Utils.isCompilerAboveScala2_11_7()) {
|
||||
String singleToken = getCompletionTargetString(buf, cursor);
|
||||
Candidates ret = c.complete(singleToken, singleToken.length());
|
||||
|
||||
List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
|
||||
List<InterpreterCompletion> completions = new LinkedList<>();
|
||||
|
||||
for (String candidate : candidates) {
|
||||
completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
|
||||
}
|
||||
|
||||
return completions;
|
||||
} else {
|
||||
Candidates ret = c.complete(buf, cursor);
|
||||
|
||||
List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
|
||||
List<InterpreterCompletion> completions = new LinkedList<>();
|
||||
|
||||
for (String candidate : candidates) {
|
||||
completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
|
||||
}
|
||||
|
||||
return completions;
|
||||
}
|
||||
|
||||
return completions;
|
||||
}
|
||||
|
||||
private String getCompletionTargetString(String text, int cursor) {
|
||||
|
|
|
|||
|
|
@ -24,18 +24,22 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.Properties;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Utility and helper functions for the Spark Interpreter
|
||||
*/
|
||||
class Utils {
|
||||
public static Logger logger = LoggerFactory.getLogger(Utils.class);
|
||||
private static final String SCALA_COMPILER_VERSION = evaluateScalaCompilerVersion();
|
||||
|
||||
static Object invokeMethod(Object o, String name) {
|
||||
return invokeMethod(o, name, new Class[]{}, new Object[]{});
|
||||
}
|
||||
|
||||
static Object invokeMethod(Object o, String name, Class[] argTypes, Object[] params) {
|
||||
static Object invokeMethod(Object o, String name, Class<?>[] argTypes, Object[] params) {
|
||||
try {
|
||||
return o.getClass().getMethod(name, argTypes).invoke(o, params);
|
||||
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
|
||||
|
|
@ -44,7 +48,7 @@ class Utils {
|
|||
return null;
|
||||
}
|
||||
|
||||
static Object invokeStaticMethod(Class c, String name, Class[] argTypes, Object[] params) {
|
||||
static Object invokeStaticMethod(Class<?> c, String name, Class<?>[] argTypes, Object[] params) {
|
||||
try {
|
||||
return c.getMethod(name, argTypes).invoke(null, params);
|
||||
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
|
||||
|
|
@ -53,17 +57,17 @@ class Utils {
|
|||
return null;
|
||||
}
|
||||
|
||||
static Object invokeStaticMethod(Class c, String name) {
|
||||
static Object invokeStaticMethod(Class<?> c, String name) {
|
||||
return invokeStaticMethod(c, name, new Class[]{}, new Object[]{});
|
||||
}
|
||||
|
||||
static Class findClass(String name) {
|
||||
static Class<?> findClass(String name) {
|
||||
return findClass(name, false);
|
||||
}
|
||||
|
||||
static Class findClass(String name, boolean silence) {
|
||||
static Class<?> findClass(String name, boolean silence) {
|
||||
try {
|
||||
return Utils.class.forName(name);
|
||||
return Class.forName(name);
|
||||
} catch (ClassNotFoundException e) {
|
||||
if (!silence) {
|
||||
logger.error(e.getMessage(), e);
|
||||
|
|
@ -72,7 +76,7 @@ class Utils {
|
|||
}
|
||||
}
|
||||
|
||||
static Object instantiateClass(String name, Class[] argTypes, Object[] params) {
|
||||
static Object instantiateClass(String name, Class<?>[] argTypes, Object[] params) {
|
||||
try {
|
||||
Constructor<?> constructor = Utils.class.getClassLoader()
|
||||
.loadClass(name).getConstructor(argTypes);
|
||||
|
|
@ -87,7 +91,7 @@ class Utils {
|
|||
// function works after intp is initialized
|
||||
static boolean isScala2_10() {
|
||||
try {
|
||||
Utils.class.forName("org.apache.spark.repl.SparkIMain");
|
||||
Class.forName("org.apache.spark.repl.SparkIMain");
|
||||
return true;
|
||||
} catch (ClassNotFoundException e) {
|
||||
return false;
|
||||
|
|
@ -99,10 +103,45 @@ class Utils {
|
|||
static boolean isScala2_11() {
|
||||
return !isScala2_10();
|
||||
}
|
||||
|
||||
static boolean isCompilerAboveScala2_11_7() {
|
||||
if (isScala2_10() || SCALA_COMPILER_VERSION == null) {
|
||||
return false;
|
||||
}
|
||||
Pattern p = Pattern.compile("([0-9]+)[.]([0-9]+)[.]([0-9]+)");
|
||||
Matcher m = p.matcher(SCALA_COMPILER_VERSION);
|
||||
if (m.matches()) {
|
||||
int major = Integer.parseInt(m.group(1));
|
||||
int minor = Integer.parseInt(m.group(2));
|
||||
int bugfix = Integer.parseInt(m.group(3));
|
||||
return (major > 2 || (major == 2 && minor > 11) || (major == 2 && minor == 11 && bugfix > 7));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static String evaluateScalaCompilerVersion() {
|
||||
String version = null;
|
||||
try {
|
||||
Properties p = new Properties();
|
||||
Class<?> completionClass = findClass("scala.tools.nsc.interpreter.JLineCompletion");
|
||||
if (completionClass != null) {
|
||||
try (java.io.InputStream in = completionClass.getClass()
|
||||
.getResourceAsStream("/compiler.properties")) {
|
||||
p.load(in);
|
||||
version = p.getProperty("version.number");
|
||||
} catch (java.io.IOException e) {
|
||||
logger.error("Failed to evaluate Scala compiler version", e);
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("Failed to evaluate Scala compiler version", e);
|
||||
}
|
||||
return version;
|
||||
}
|
||||
|
||||
static boolean isSpark2() {
|
||||
try {
|
||||
Utils.class.forName("org.apache.spark.sql.SparkSession");
|
||||
Class.forName("org.apache.spark.sql.SparkSession");
|
||||
return true;
|
||||
} catch (ClassNotFoundException e) {
|
||||
return false;
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ public class SparkInterpreterTest {
|
|||
p.setProperty("zeppelin.spark.maxResult", "1000");
|
||||
p.setProperty("zeppelin.spark.importImplicit", "true");
|
||||
p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
|
||||
|
||||
p.setProperty("zeppelin.spark.property_1", "value_1");
|
||||
return p;
|
||||
}
|
||||
|
||||
|
|
@ -151,6 +151,13 @@ public class SparkInterpreterTest {
|
|||
*/
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonStandardSparkProperties() throws IOException {
|
||||
// throw NoSuchElementException if no such property is found
|
||||
InterpreterResult result = repl.interpret("sc.getConf.get(\"property_1\")", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNextLineInvocation() {
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code());
|
||||
|
|
@ -305,6 +312,22 @@ public class SparkInterpreterTest {
|
|||
assertTrue(completions.size() > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultilineCompletion() {
|
||||
String buf = "val x = 1\nsc.";
|
||||
List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
|
||||
assertTrue(completions.size() > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultilineCompletionNewVar() {
|
||||
Assume.assumeFalse("this feature does not work with scala 2.10", Utils.isScala2_10());
|
||||
Assume.assumeTrue("This feature does not work with scala < 2.11.8", Utils.isCompilerAboveScala2_11_7());
|
||||
String buf = "val x = sc\nx.";
|
||||
List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
|
||||
assertTrue(completions.size() > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParagraphUrls() {
|
||||
String paraId = "test_para_job_url";
|
||||
|
|
|
|||
|
|
@ -17,29 +17,55 @@
|
|||
|
||||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.server.TThreadPoolServer;
|
||||
import org.apache.thrift.transport.TServerSocket;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.zeppelin.dep.DependencyResolver;
|
||||
import org.apache.zeppelin.display.*;
|
||||
import org.apache.zeppelin.helium.*;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.helium.Application;
|
||||
import org.apache.zeppelin.helium.ApplicationContext;
|
||||
import org.apache.zeppelin.helium.ApplicationException;
|
||||
import org.apache.zeppelin.helium.ApplicationLoader;
|
||||
import org.apache.zeppelin.helium.HeliumAppAngularObjectRegistry;
|
||||
import org.apache.zeppelin.helium.HeliumPackage;
|
||||
import org.apache.zeppelin.interpreter.Constants;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.*;
|
||||
import org.apache.zeppelin.resource.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
|
||||
import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
|
||||
import org.apache.zeppelin.resource.DistributedResourcePool;
|
||||
import org.apache.zeppelin.resource.Resource;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.resource.ResourceSet;
|
||||
import org.apache.zeppelin.resource.WellKnownResourceName;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.apache.zeppelin.scheduler.JobListener;
|
||||
|
|
@ -49,8 +75,22 @@ import org.apache.zeppelin.user.AuthenticationInfo;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
/**
|
||||
* Entry point for Interpreter process.
|
||||
|
|
@ -70,6 +110,9 @@ public class RemoteInterpreterServer
|
|||
Gson gson = new Gson();
|
||||
|
||||
RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
|
||||
private String callbackHost;
|
||||
private int callbackPort;
|
||||
private String host;
|
||||
private int port;
|
||||
private TThreadPoolServer server;
|
||||
|
||||
|
|
@ -87,11 +130,34 @@ public class RemoteInterpreterServer
|
|||
// Hold information for manual progress update
|
||||
private ConcurrentMap<String, Integer> progressMap = new ConcurrentHashMap<>();
|
||||
|
||||
public RemoteInterpreterServer(int port) throws TTransportException {
|
||||
this.port = port;
|
||||
private boolean isTest;
|
||||
|
||||
public RemoteInterpreterServer(String callbackHost, int port) throws IOException,
|
||||
TTransportException {
|
||||
this(callbackHost, port, false);
|
||||
}
|
||||
|
||||
public RemoteInterpreterServer(String callbackHost, int port, boolean isTest)
|
||||
throws TTransportException, IOException {
|
||||
if (null != callbackHost) {
|
||||
this.callbackHost = callbackHost;
|
||||
this.callbackPort = port;
|
||||
} else {
|
||||
// DevInterpreter
|
||||
this.port = port;
|
||||
}
|
||||
this.isTest = isTest;
|
||||
|
||||
processor = new RemoteInterpreterService.Processor<>(this);
|
||||
TServerSocket serverTransport = new TServerSocket(port);
|
||||
TServerSocket serverTransport;
|
||||
if (null == callbackHost) {
|
||||
// Dev Interpreter
|
||||
serverTransport = new TServerSocket(port);
|
||||
} else {
|
||||
this.port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
this.host = RemoteInterpreterUtils.findAvailableHostAddress();
|
||||
serverTransport = new TServerSocket(this.port);
|
||||
}
|
||||
server = new TThreadPoolServer(
|
||||
new TThreadPoolServer.Args(serverTransport).processor(processor));
|
||||
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
|
||||
|
|
@ -100,6 +166,36 @@ public class RemoteInterpreterServer
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
if (null != callbackHost && !isTest) {
|
||||
new Thread(new Runnable() {
|
||||
boolean interrupted = false;
|
||||
@Override
|
||||
public void run() {
|
||||
while (!interrupted && !server.isServing()) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!interrupted) {
|
||||
CallbackInfo callbackInfo = new CallbackInfo(host, port);
|
||||
try {
|
||||
RemoteInterpreterUtils
|
||||
.registerInterpreter(callbackHost, callbackPort, callbackInfo);
|
||||
} catch (TException e) {
|
||||
logger.error("Error while registering interpreter: {}", callbackInfo, e);
|
||||
try {
|
||||
shutdown();
|
||||
} catch (TException e1) {
|
||||
logger.warn("Exception occurs while shutting down", e1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
logger.info("Starting remote interpreter server on port {}", port);
|
||||
server.serve();
|
||||
}
|
||||
|
|
@ -151,13 +247,15 @@ public class RemoteInterpreterServer
|
|||
|
||||
|
||||
public static void main(String[] args)
|
||||
throws TTransportException, InterruptedException {
|
||||
|
||||
throws TTransportException, InterruptedException, IOException {
|
||||
String callbackHost = null;
|
||||
int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
|
||||
if (args.length > 0) {
|
||||
port = Integer.parseInt(args[0]);
|
||||
callbackHost = args[0];
|
||||
port = Integer.parseInt(args[1]);
|
||||
}
|
||||
RemoteInterpreterServer remoteInterpreterServer = new RemoteInterpreterServer(port);
|
||||
RemoteInterpreterServer remoteInterpreterServer =
|
||||
new RemoteInterpreterServer(callbackHost, port);
|
||||
remoteInterpreterServer.start();
|
||||
remoteInterpreterServer.join();
|
||||
System.exit(0);
|
||||
|
|
|
|||
|
|
@ -17,27 +17,96 @@
|
|||
|
||||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.Inet4Address;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.InterfaceAddress;
|
||||
import java.net.NetworkInterface;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Collections;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.transport.TSocket;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class RemoteInterpreterUtils {
|
||||
static Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterUtils.class);
|
||||
|
||||
|
||||
public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException {
|
||||
int port;
|
||||
try (ServerSocket socket = new ServerSocket(0);) {
|
||||
port = socket.getLocalPort();
|
||||
socket.close();
|
||||
return findRandomAvailablePortOnAllLocalInterfaces(":");
|
||||
}
|
||||
|
||||
/**
|
||||
* start:end
|
||||
*
|
||||
* @param portRange
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public static int findRandomAvailablePortOnAllLocalInterfaces(String portRange)
|
||||
throws IOException {
|
||||
|
||||
// ':' is the default value which means no constraints on the portRange
|
||||
if (portRange == null || portRange.equals(":")) {
|
||||
int port;
|
||||
try (ServerSocket socket = new ServerSocket(0);) {
|
||||
port = socket.getLocalPort();
|
||||
socket.close();
|
||||
}
|
||||
return port;
|
||||
}
|
||||
return port;
|
||||
// valid user registered port https://en.wikipedia.org/wiki/Registered_port
|
||||
int start = 1024;
|
||||
int end = 49151;
|
||||
String[] ports = portRange.split(":", -1);
|
||||
if (!ports[0].isEmpty()) {
|
||||
start = Integer.parseInt(ports[0]);
|
||||
}
|
||||
if (!ports[1].isEmpty()) {
|
||||
end = Integer.parseInt(ports[1]);
|
||||
}
|
||||
for (int i = start; i <= end; ++i) {
|
||||
try {
|
||||
ServerSocket socket = new ServerSocket(i);
|
||||
return socket.getLocalPort();
|
||||
} catch (Exception e) {
|
||||
// ignore this
|
||||
}
|
||||
}
|
||||
throw new IOException("No available port in the portRange: " + portRange);
|
||||
}
|
||||
|
||||
public static String findAvailableHostAddress() throws UnknownHostException, SocketException {
|
||||
InetAddress address = InetAddress.getLocalHost();
|
||||
if (address.isLoopbackAddress()) {
|
||||
for (NetworkInterface networkInterface : Collections
|
||||
.list(NetworkInterface.getNetworkInterfaces())) {
|
||||
if (!networkInterface.isLoopback()) {
|
||||
for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) {
|
||||
InetAddress a = interfaceAddress.getAddress();
|
||||
if (a instanceof Inet4Address) {
|
||||
return a.getHostAddress();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return address.getHostAddress();
|
||||
}
|
||||
|
||||
public static boolean checkIfRemoteEndpointAccessible(String host, int port) {
|
||||
|
|
@ -80,4 +149,17 @@ public class RemoteInterpreterUtils {
|
|||
|
||||
return key.matches("^[A-Z_0-9]*");
|
||||
}
|
||||
|
||||
public static void registerInterpreter(String callbackHost, int callbackPort,
|
||||
final CallbackInfo callbackInfo) throws TException {
|
||||
LOGGER.info("callbackHost: {}, callbackPort: {}, callbackInfo: {}", callbackHost, callbackPort,
|
||||
callbackInfo);
|
||||
try (TTransport transport = new TSocket(callbackHost, callbackPort)) {
|
||||
transport.open();
|
||||
TProtocol protocol = new TBinaryProtocol(transport);
|
||||
RemoteInterpreterCallbackService.Client client = new RemoteInterpreterCallbackService.Client(
|
||||
protocol);
|
||||
client.callback(callbackInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,518 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* Autogenerated by Thrift Compiler (0.9.2)
|
||||
*
|
||||
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||
* @generated
|
||||
*/
|
||||
package org.apache.zeppelin.interpreter.thrift;
|
||||
|
||||
import org.apache.thrift.scheme.IScheme;
|
||||
import org.apache.thrift.scheme.SchemeFactory;
|
||||
import org.apache.thrift.scheme.StandardScheme;
|
||||
|
||||
import org.apache.thrift.scheme.TupleScheme;
|
||||
import org.apache.thrift.protocol.TTupleProtocol;
|
||||
import org.apache.thrift.protocol.TProtocolException;
|
||||
import org.apache.thrift.EncodingUtils;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.async.AsyncMethodCallback;
|
||||
import org.apache.thrift.server.AbstractNonblockingServer.*;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.EnumMap;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Collections;
|
||||
import java.util.BitSet;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import javax.annotation.Generated;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-6-17")
|
||||
public class CallbackInfo implements org.apache.thrift.TBase<CallbackInfo, CallbackInfo._Fields>, java.io.Serializable, Cloneable, Comparable<CallbackInfo> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CallbackInfo");
|
||||
|
||||
private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)1);
|
||||
private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)2);
|
||||
|
||||
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
|
||||
static {
|
||||
schemes.put(StandardScheme.class, new CallbackInfoStandardSchemeFactory());
|
||||
schemes.put(TupleScheme.class, new CallbackInfoTupleSchemeFactory());
|
||||
}
|
||||
|
||||
public String host; // required
|
||||
public int port; // required
|
||||
|
||||
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
|
||||
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
|
||||
HOST((short)1, "host"),
|
||||
PORT((short)2, "port");
|
||||
|
||||
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
|
||||
|
||||
static {
|
||||
for (_Fields field : EnumSet.allOf(_Fields.class)) {
|
||||
byName.put(field.getFieldName(), field);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches fieldId, or null if its not found.
|
||||
*/
|
||||
public static _Fields findByThriftId(int fieldId) {
|
||||
switch(fieldId) {
|
||||
case 1: // HOST
|
||||
return HOST;
|
||||
case 2: // PORT
|
||||
return PORT;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches fieldId, throwing an exception
|
||||
* if it is not found.
|
||||
*/
|
||||
public static _Fields findByThriftIdOrThrow(int fieldId) {
|
||||
_Fields fields = findByThriftId(fieldId);
|
||||
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
|
||||
return fields;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches name, or null if its not found.
|
||||
*/
|
||||
public static _Fields findByName(String name) {
|
||||
return byName.get(name);
|
||||
}
|
||||
|
||||
private final short _thriftId;
|
||||
private final String _fieldName;
|
||||
|
||||
_Fields(short thriftId, String fieldName) {
|
||||
_thriftId = thriftId;
|
||||
_fieldName = fieldName;
|
||||
}
|
||||
|
||||
public short getThriftFieldId() {
|
||||
return _thriftId;
|
||||
}
|
||||
|
||||
public String getFieldName() {
|
||||
return _fieldName;
|
||||
}
|
||||
}
|
||||
|
||||
// isset id assignments
|
||||
private static final int __PORT_ISSET_ID = 0;
|
||||
private byte __isset_bitfield = 0;
|
||||
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
|
||||
static {
|
||||
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
|
||||
tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.DEFAULT,
|
||||
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
|
||||
tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.DEFAULT,
|
||||
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
|
||||
metaDataMap = Collections.unmodifiableMap(tmpMap);
|
||||
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CallbackInfo.class, metaDataMap);
|
||||
}
|
||||
|
||||
public CallbackInfo() {
|
||||
}
|
||||
|
||||
public CallbackInfo(
|
||||
String host,
|
||||
int port)
|
||||
{
|
||||
this();
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
setPortIsSet(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs a deep copy on <i>other</i>.
|
||||
*/
|
||||
public CallbackInfo(CallbackInfo other) {
|
||||
__isset_bitfield = other.__isset_bitfield;
|
||||
if (other.isSetHost()) {
|
||||
this.host = other.host;
|
||||
}
|
||||
this.port = other.port;
|
||||
}
|
||||
|
||||
public CallbackInfo deepCopy() {
|
||||
return new CallbackInfo(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
this.host = null;
|
||||
setPortIsSet(false);
|
||||
this.port = 0;
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return this.host;
|
||||
}
|
||||
|
||||
public CallbackInfo setHost(String host) {
|
||||
this.host = host;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void unsetHost() {
|
||||
this.host = null;
|
||||
}
|
||||
|
||||
/** Returns true if field host is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSetHost() {
|
||||
return this.host != null;
|
||||
}
|
||||
|
||||
public void setHostIsSet(boolean value) {
|
||||
if (!value) {
|
||||
this.host = null;
|
||||
}
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
return this.port;
|
||||
}
|
||||
|
||||
public CallbackInfo setPort(int port) {
|
||||
this.port = port;
|
||||
setPortIsSet(true);
|
||||
return this;
|
||||
}
|
||||
|
||||
public void unsetPort() {
|
||||
__isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __PORT_ISSET_ID);
|
||||
}
|
||||
|
||||
/** Returns true if field port is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSetPort() {
|
||||
return EncodingUtils.testBit(__isset_bitfield, __PORT_ISSET_ID);
|
||||
}
|
||||
|
||||
public void setPortIsSet(boolean value) {
|
||||
__isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PORT_ISSET_ID, value);
|
||||
}
|
||||
|
||||
public void setFieldValue(_Fields field, Object value) {
|
||||
switch (field) {
|
||||
case HOST:
|
||||
if (value == null) {
|
||||
unsetHost();
|
||||
} else {
|
||||
setHost((String)value);
|
||||
}
|
||||
break;
|
||||
|
||||
case PORT:
|
||||
if (value == null) {
|
||||
unsetPort();
|
||||
} else {
|
||||
setPort((Integer)value);
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public Object getFieldValue(_Fields field) {
|
||||
switch (field) {
|
||||
case HOST:
|
||||
return getHost();
|
||||
|
||||
case PORT:
|
||||
return Integer.valueOf(getPort());
|
||||
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSet(_Fields field) {
|
||||
if (field == null) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
||||
switch (field) {
|
||||
case HOST:
|
||||
return isSetHost();
|
||||
case PORT:
|
||||
return isSetPort();
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object that) {
|
||||
if (that == null)
|
||||
return false;
|
||||
if (that instanceof CallbackInfo)
|
||||
return this.equals((CallbackInfo)that);
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean equals(CallbackInfo that) {
|
||||
if (that == null)
|
||||
return false;
|
||||
|
||||
boolean this_present_host = true && this.isSetHost();
|
||||
boolean that_present_host = true && that.isSetHost();
|
||||
if (this_present_host || that_present_host) {
|
||||
if (!(this_present_host && that_present_host))
|
||||
return false;
|
||||
if (!this.host.equals(that.host))
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean this_present_port = true;
|
||||
boolean that_present_port = true;
|
||||
if (this_present_port || that_present_port) {
|
||||
if (!(this_present_port && that_present_port))
|
||||
return false;
|
||||
if (this.port != that.port)
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
List<Object> list = new ArrayList<Object>();
|
||||
|
||||
boolean present_host = true && (isSetHost());
|
||||
list.add(present_host);
|
||||
if (present_host)
|
||||
list.add(host);
|
||||
|
||||
boolean present_port = true;
|
||||
list.add(present_port);
|
||||
if (present_port)
|
||||
list.add(port);
|
||||
|
||||
return list.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(CallbackInfo other) {
|
||||
if (!getClass().equals(other.getClass())) {
|
||||
return getClass().getName().compareTo(other.getClass().getName());
|
||||
}
|
||||
|
||||
int lastComparison = 0;
|
||||
|
||||
lastComparison = Boolean.valueOf(isSetHost()).compareTo(other.isSetHost());
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
if (isSetHost()) {
|
||||
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, other.host);
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
}
|
||||
lastComparison = Boolean.valueOf(isSetPort()).compareTo(other.isSetPort());
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
if (isSetPort()) {
|
||||
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, other.port);
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public _Fields fieldForId(int fieldId) {
|
||||
return _Fields.findByThriftId(fieldId);
|
||||
}
|
||||
|
||||
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
|
||||
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
|
||||
}
|
||||
|
||||
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
|
||||
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder("CallbackInfo(");
|
||||
boolean first = true;
|
||||
|
||||
sb.append("host:");
|
||||
if (this.host == null) {
|
||||
sb.append("null");
|
||||
} else {
|
||||
sb.append(this.host);
|
||||
}
|
||||
first = false;
|
||||
if (!first) sb.append(", ");
|
||||
sb.append("port:");
|
||||
sb.append(this.port);
|
||||
first = false;
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public void validate() throws org.apache.thrift.TException {
|
||||
// check for required fields
|
||||
// check for sub-struct validity
|
||||
}
|
||||
|
||||
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
|
||||
try {
|
||||
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
|
||||
} catch (org.apache.thrift.TException te) {
|
||||
throw new java.io.IOException(te);
|
||||
}
|
||||
}
|
||||
|
||||
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
|
||||
try {
|
||||
// it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
|
||||
__isset_bitfield = 0;
|
||||
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
|
||||
} catch (org.apache.thrift.TException te) {
|
||||
throw new java.io.IOException(te);
|
||||
}
|
||||
}
|
||||
|
||||
private static class CallbackInfoStandardSchemeFactory implements SchemeFactory {
|
||||
public CallbackInfoStandardScheme getScheme() {
|
||||
return new CallbackInfoStandardScheme();
|
||||
}
|
||||
}
|
||||
|
||||
private static class CallbackInfoStandardScheme extends StandardScheme<CallbackInfo> {
|
||||
|
||||
public void read(org.apache.thrift.protocol.TProtocol iprot, CallbackInfo struct) throws org.apache.thrift.TException {
|
||||
org.apache.thrift.protocol.TField schemeField;
|
||||
iprot.readStructBegin();
|
||||
while (true)
|
||||
{
|
||||
schemeField = iprot.readFieldBegin();
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
|
||||
break;
|
||||
}
|
||||
switch (schemeField.id) {
|
||||
case 1: // HOST
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
|
||||
struct.host = iprot.readString();
|
||||
struct.setHostIsSet(true);
|
||||
} else {
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
break;
|
||||
case 2: // PORT
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
|
||||
struct.port = iprot.readI32();
|
||||
struct.setPortIsSet(true);
|
||||
} else {
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
iprot.readFieldEnd();
|
||||
}
|
||||
iprot.readStructEnd();
|
||||
|
||||
// check for required fields of primitive type, which can't be checked in the validate method
|
||||
struct.validate();
|
||||
}
|
||||
|
||||
public void write(org.apache.thrift.protocol.TProtocol oprot, CallbackInfo struct) throws org.apache.thrift.TException {
|
||||
struct.validate();
|
||||
|
||||
oprot.writeStructBegin(STRUCT_DESC);
|
||||
if (struct.host != null) {
|
||||
oprot.writeFieldBegin(HOST_FIELD_DESC);
|
||||
oprot.writeString(struct.host);
|
||||
oprot.writeFieldEnd();
|
||||
}
|
||||
oprot.writeFieldBegin(PORT_FIELD_DESC);
|
||||
oprot.writeI32(struct.port);
|
||||
oprot.writeFieldEnd();
|
||||
oprot.writeFieldStop();
|
||||
oprot.writeStructEnd();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class CallbackInfoTupleSchemeFactory implements SchemeFactory {
|
||||
public CallbackInfoTupleScheme getScheme() {
|
||||
return new CallbackInfoTupleScheme();
|
||||
}
|
||||
}
|
||||
|
||||
private static class CallbackInfoTupleScheme extends TupleScheme<CallbackInfo> {
|
||||
|
||||
@Override
|
||||
public void write(org.apache.thrift.protocol.TProtocol prot, CallbackInfo struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol oprot = (TTupleProtocol) prot;
|
||||
BitSet optionals = new BitSet();
|
||||
if (struct.isSetHost()) {
|
||||
optionals.set(0);
|
||||
}
|
||||
if (struct.isSetPort()) {
|
||||
optionals.set(1);
|
||||
}
|
||||
oprot.writeBitSet(optionals, 2);
|
||||
if (struct.isSetHost()) {
|
||||
oprot.writeString(struct.host);
|
||||
}
|
||||
if (struct.isSetPort()) {
|
||||
oprot.writeI32(struct.port);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void read(org.apache.thrift.protocol.TProtocol prot, CallbackInfo struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol iprot = (TTupleProtocol) prot;
|
||||
BitSet incoming = iprot.readBitSet(2);
|
||||
if (incoming.get(0)) {
|
||||
struct.host = iprot.readString();
|
||||
struct.setHostIsSet(true);
|
||||
}
|
||||
if (incoming.get(1)) {
|
||||
struct.port = iprot.readI32();
|
||||
struct.setPortIsSet(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,879 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* Autogenerated by Thrift Compiler (0.9.2)
|
||||
*
|
||||
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||
* @generated
|
||||
*/
|
||||
package org.apache.zeppelin.interpreter.thrift;
|
||||
|
||||
import org.apache.thrift.scheme.IScheme;
|
||||
import org.apache.thrift.scheme.SchemeFactory;
|
||||
import org.apache.thrift.scheme.StandardScheme;
|
||||
|
||||
import org.apache.thrift.scheme.TupleScheme;
|
||||
import org.apache.thrift.protocol.TTupleProtocol;
|
||||
import org.apache.thrift.protocol.TProtocolException;
|
||||
import org.apache.thrift.EncodingUtils;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.async.AsyncMethodCallback;
|
||||
import org.apache.thrift.server.AbstractNonblockingServer.*;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.EnumMap;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Collections;
|
||||
import java.util.BitSet;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import javax.annotation.Generated;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-6-17")
|
||||
public class RemoteInterpreterCallbackService {
|
||||
|
||||
public interface Iface {
|
||||
|
||||
public void callback(CallbackInfo callbackInfo) throws org.apache.thrift.TException;
|
||||
|
||||
}
|
||||
|
||||
public interface AsyncIface {
|
||||
|
||||
public void callback(CallbackInfo callbackInfo, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
|
||||
|
||||
}
|
||||
|
||||
public static class Client extends org.apache.thrift.TServiceClient implements Iface {
|
||||
public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
|
||||
public Factory() {}
|
||||
public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
|
||||
return new Client(prot);
|
||||
}
|
||||
public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
|
||||
return new Client(iprot, oprot);
|
||||
}
|
||||
}
|
||||
|
||||
public Client(org.apache.thrift.protocol.TProtocol prot)
|
||||
{
|
||||
super(prot, prot);
|
||||
}
|
||||
|
||||
public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
|
||||
super(iprot, oprot);
|
||||
}
|
||||
|
||||
public void callback(CallbackInfo callbackInfo) throws org.apache.thrift.TException
|
||||
{
|
||||
send_callback(callbackInfo);
|
||||
recv_callback();
|
||||
}
|
||||
|
||||
public void send_callback(CallbackInfo callbackInfo) throws org.apache.thrift.TException
|
||||
{
|
||||
callback_args args = new callback_args();
|
||||
args.setCallbackInfo(callbackInfo);
|
||||
sendBase("callback", args);
|
||||
}
|
||||
|
||||
public void recv_callback() throws org.apache.thrift.TException
|
||||
{
|
||||
callback_result result = new callback_result();
|
||||
receiveBase(result, "callback");
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
|
||||
public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
|
||||
private org.apache.thrift.async.TAsyncClientManager clientManager;
|
||||
private org.apache.thrift.protocol.TProtocolFactory protocolFactory;
|
||||
public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
|
||||
this.clientManager = clientManager;
|
||||
this.protocolFactory = protocolFactory;
|
||||
}
|
||||
public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
|
||||
return new AsyncClient(protocolFactory, clientManager, transport);
|
||||
}
|
||||
}
|
||||
|
||||
public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
|
||||
super(protocolFactory, clientManager, transport);
|
||||
}
|
||||
|
||||
public void callback(CallbackInfo callbackInfo, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
|
||||
checkReady();
|
||||
callback_call method_call = new callback_call(callbackInfo, resultHandler, this, ___protocolFactory, ___transport);
|
||||
this.___currentMethod = method_call;
|
||||
___manager.call(method_call);
|
||||
}
|
||||
|
||||
public static class callback_call extends org.apache.thrift.async.TAsyncMethodCall {
|
||||
private CallbackInfo callbackInfo;
|
||||
public callback_call(CallbackInfo callbackInfo, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
|
||||
super(client, protocolFactory, transport, resultHandler, false);
|
||||
this.callbackInfo = callbackInfo;
|
||||
}
|
||||
|
||||
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
|
||||
prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("callback", org.apache.thrift.protocol.TMessageType.CALL, 0));
|
||||
callback_args args = new callback_args();
|
||||
args.setCallbackInfo(callbackInfo);
|
||||
args.write(prot);
|
||||
prot.writeMessageEnd();
|
||||
}
|
||||
|
||||
public void getResult() throws org.apache.thrift.TException {
|
||||
if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
|
||||
throw new IllegalStateException("Method call not finished!");
|
||||
}
|
||||
org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
|
||||
org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
|
||||
(new Client(prot)).recv_callback();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
|
||||
public Processor(I iface) {
|
||||
super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
|
||||
}
|
||||
|
||||
protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
|
||||
super(iface, getProcessMap(processMap));
|
||||
}
|
||||
|
||||
private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
|
||||
processMap.put("callback", new callback());
|
||||
return processMap;
|
||||
}
|
||||
|
||||
public static class callback<I extends Iface> extends org.apache.thrift.ProcessFunction<I, callback_args> {
|
||||
public callback() {
|
||||
super("callback");
|
||||
}
|
||||
|
||||
public callback_args getEmptyArgsInstance() {
|
||||
return new callback_args();
|
||||
}
|
||||
|
||||
protected boolean isOneway() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public callback_result getResult(I iface, callback_args args) throws org.apache.thrift.TException {
|
||||
callback_result result = new callback_result();
|
||||
iface.callback(args.callbackInfo);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class.getName());
|
||||
public AsyncProcessor(I iface) {
|
||||
super(iface, getProcessMap(new HashMap<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>>()));
|
||||
}
|
||||
|
||||
protected AsyncProcessor(I iface, Map<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
|
||||
super(iface, getProcessMap(processMap));
|
||||
}
|
||||
|
||||
private static <I extends AsyncIface> Map<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase,?>> getProcessMap(Map<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
|
||||
processMap.put("callback", new callback());
|
||||
return processMap;
|
||||
}
|
||||
|
||||
public static class callback<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, callback_args, Void> {
|
||||
public callback() {
|
||||
super("callback");
|
||||
}
|
||||
|
||||
public callback_args getEmptyArgsInstance() {
|
||||
return new callback_args();
|
||||
}
|
||||
|
||||
public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
|
||||
final org.apache.thrift.AsyncProcessFunction fcall = this;
|
||||
return new AsyncMethodCallback<Void>() {
|
||||
public void onComplete(Void o) {
|
||||
callback_result result = new callback_result();
|
||||
try {
|
||||
fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception writing to internal frame buffer", e);
|
||||
}
|
||||
fb.close();
|
||||
}
|
||||
public void onError(Exception e) {
|
||||
byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
|
||||
org.apache.thrift.TBase msg;
|
||||
callback_result result = new callback_result();
|
||||
{
|
||||
msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
|
||||
msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
|
||||
}
|
||||
try {
|
||||
fcall.sendResponse(fb,msg,msgType,seqid);
|
||||
return;
|
||||
} catch (Exception ex) {
|
||||
LOGGER.error("Exception writing to internal frame buffer", ex);
|
||||
}
|
||||
fb.close();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected boolean isOneway() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public void start(I iface, callback_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
|
||||
iface.callback(args.callbackInfo,resultHandler);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class callback_args implements org.apache.thrift.TBase<callback_args, callback_args._Fields>, java.io.Serializable, Cloneable, Comparable<callback_args> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("callback_args");
|
||||
|
||||
private static final org.apache.thrift.protocol.TField CALLBACK_INFO_FIELD_DESC = new org.apache.thrift.protocol.TField("callbackInfo", org.apache.thrift.protocol.TType.STRUCT, (short)1);
|
||||
|
||||
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
|
||||
static {
|
||||
schemes.put(StandardScheme.class, new callback_argsStandardSchemeFactory());
|
||||
schemes.put(TupleScheme.class, new callback_argsTupleSchemeFactory());
|
||||
}
|
||||
|
||||
public CallbackInfo callbackInfo; // required
|
||||
|
||||
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
|
||||
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
|
||||
CALLBACK_INFO((short)1, "callbackInfo");
|
||||
|
||||
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
|
||||
|
||||
static {
|
||||
for (_Fields field : EnumSet.allOf(_Fields.class)) {
|
||||
byName.put(field.getFieldName(), field);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches fieldId, or null if its not found.
|
||||
*/
|
||||
public static _Fields findByThriftId(int fieldId) {
|
||||
switch(fieldId) {
|
||||
case 1: // CALLBACK_INFO
|
||||
return CALLBACK_INFO;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches fieldId, throwing an exception
|
||||
* if it is not found.
|
||||
*/
|
||||
public static _Fields findByThriftIdOrThrow(int fieldId) {
|
||||
_Fields fields = findByThriftId(fieldId);
|
||||
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
|
||||
return fields;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches name, or null if its not found.
|
||||
*/
|
||||
public static _Fields findByName(String name) {
|
||||
return byName.get(name);
|
||||
}
|
||||
|
||||
private final short _thriftId;
|
||||
private final String _fieldName;
|
||||
|
||||
_Fields(short thriftId, String fieldName) {
|
||||
_thriftId = thriftId;
|
||||
_fieldName = fieldName;
|
||||
}
|
||||
|
||||
public short getThriftFieldId() {
|
||||
return _thriftId;
|
||||
}
|
||||
|
||||
public String getFieldName() {
|
||||
return _fieldName;
|
||||
}
|
||||
}
|
||||
|
||||
// isset id assignments
|
||||
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
|
||||
static {
|
||||
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
|
||||
tmpMap.put(_Fields.CALLBACK_INFO, new org.apache.thrift.meta_data.FieldMetaData("callbackInfo", org.apache.thrift.TFieldRequirementType.DEFAULT,
|
||||
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, CallbackInfo.class)));
|
||||
metaDataMap = Collections.unmodifiableMap(tmpMap);
|
||||
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(callback_args.class, metaDataMap);
|
||||
}
|
||||
|
||||
public callback_args() {
|
||||
}
|
||||
|
||||
public callback_args(
|
||||
CallbackInfo callbackInfo)
|
||||
{
|
||||
this();
|
||||
this.callbackInfo = callbackInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs a deep copy on <i>other</i>.
|
||||
*/
|
||||
public callback_args(callback_args other) {
|
||||
if (other.isSetCallbackInfo()) {
|
||||
this.callbackInfo = new CallbackInfo(other.callbackInfo);
|
||||
}
|
||||
}
|
||||
|
||||
public callback_args deepCopy() {
|
||||
return new callback_args(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
this.callbackInfo = null;
|
||||
}
|
||||
|
||||
public CallbackInfo getCallbackInfo() {
|
||||
return this.callbackInfo;
|
||||
}
|
||||
|
||||
public callback_args setCallbackInfo(CallbackInfo callbackInfo) {
|
||||
this.callbackInfo = callbackInfo;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void unsetCallbackInfo() {
|
||||
this.callbackInfo = null;
|
||||
}
|
||||
|
||||
/** Returns true if field callbackInfo is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSetCallbackInfo() {
|
||||
return this.callbackInfo != null;
|
||||
}
|
||||
|
||||
public void setCallbackInfoIsSet(boolean value) {
|
||||
if (!value) {
|
||||
this.callbackInfo = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void setFieldValue(_Fields field, Object value) {
|
||||
switch (field) {
|
||||
case CALLBACK_INFO:
|
||||
if (value == null) {
|
||||
unsetCallbackInfo();
|
||||
} else {
|
||||
setCallbackInfo((CallbackInfo)value);
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public Object getFieldValue(_Fields field) {
|
||||
switch (field) {
|
||||
case CALLBACK_INFO:
|
||||
return getCallbackInfo();
|
||||
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSet(_Fields field) {
|
||||
if (field == null) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
||||
switch (field) {
|
||||
case CALLBACK_INFO:
|
||||
return isSetCallbackInfo();
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object that) {
|
||||
if (that == null)
|
||||
return false;
|
||||
if (that instanceof callback_args)
|
||||
return this.equals((callback_args)that);
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean equals(callback_args that) {
|
||||
if (that == null)
|
||||
return false;
|
||||
|
||||
boolean this_present_callbackInfo = true && this.isSetCallbackInfo();
|
||||
boolean that_present_callbackInfo = true && that.isSetCallbackInfo();
|
||||
if (this_present_callbackInfo || that_present_callbackInfo) {
|
||||
if (!(this_present_callbackInfo && that_present_callbackInfo))
|
||||
return false;
|
||||
if (!this.callbackInfo.equals(that.callbackInfo))
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
List<Object> list = new ArrayList<Object>();
|
||||
|
||||
boolean present_callbackInfo = true && (isSetCallbackInfo());
|
||||
list.add(present_callbackInfo);
|
||||
if (present_callbackInfo)
|
||||
list.add(callbackInfo);
|
||||
|
||||
return list.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(callback_args other) {
|
||||
if (!getClass().equals(other.getClass())) {
|
||||
return getClass().getName().compareTo(other.getClass().getName());
|
||||
}
|
||||
|
||||
int lastComparison = 0;
|
||||
|
||||
lastComparison = Boolean.valueOf(isSetCallbackInfo()).compareTo(other.isSetCallbackInfo());
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
if (isSetCallbackInfo()) {
|
||||
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.callbackInfo, other.callbackInfo);
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public _Fields fieldForId(int fieldId) {
|
||||
return _Fields.findByThriftId(fieldId);
|
||||
}
|
||||
|
||||
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
|
||||
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
|
||||
}
|
||||
|
||||
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
|
||||
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder("callback_args(");
|
||||
boolean first = true;
|
||||
|
||||
sb.append("callbackInfo:");
|
||||
if (this.callbackInfo == null) {
|
||||
sb.append("null");
|
||||
} else {
|
||||
sb.append(this.callbackInfo);
|
||||
}
|
||||
first = false;
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public void validate() throws org.apache.thrift.TException {
|
||||
// check for required fields
|
||||
// check for sub-struct validity
|
||||
if (callbackInfo != null) {
|
||||
callbackInfo.validate();
|
||||
}
|
||||
}
|
||||
|
||||
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
|
||||
try {
|
||||
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
|
||||
} catch (org.apache.thrift.TException te) {
|
||||
throw new java.io.IOException(te);
|
||||
}
|
||||
}
|
||||
|
||||
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
|
||||
try {
|
||||
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
|
||||
} catch (org.apache.thrift.TException te) {
|
||||
throw new java.io.IOException(te);
|
||||
}
|
||||
}
|
||||
|
||||
private static class callback_argsStandardSchemeFactory implements SchemeFactory {
|
||||
public callback_argsStandardScheme getScheme() {
|
||||
return new callback_argsStandardScheme();
|
||||
}
|
||||
}
|
||||
|
||||
private static class callback_argsStandardScheme extends StandardScheme<callback_args> {
|
||||
|
||||
public void read(org.apache.thrift.protocol.TProtocol iprot, callback_args struct) throws org.apache.thrift.TException {
|
||||
org.apache.thrift.protocol.TField schemeField;
|
||||
iprot.readStructBegin();
|
||||
while (true)
|
||||
{
|
||||
schemeField = iprot.readFieldBegin();
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
|
||||
break;
|
||||
}
|
||||
switch (schemeField.id) {
|
||||
case 1: // CALLBACK_INFO
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
|
||||
struct.callbackInfo = new CallbackInfo();
|
||||
struct.callbackInfo.read(iprot);
|
||||
struct.setCallbackInfoIsSet(true);
|
||||
} else {
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
iprot.readFieldEnd();
|
||||
}
|
||||
iprot.readStructEnd();
|
||||
|
||||
// check for required fields of primitive type, which can't be checked in the validate method
|
||||
struct.validate();
|
||||
}
|
||||
|
||||
public void write(org.apache.thrift.protocol.TProtocol oprot, callback_args struct) throws org.apache.thrift.TException {
|
||||
struct.validate();
|
||||
|
||||
oprot.writeStructBegin(STRUCT_DESC);
|
||||
if (struct.callbackInfo != null) {
|
||||
oprot.writeFieldBegin(CALLBACK_INFO_FIELD_DESC);
|
||||
struct.callbackInfo.write(oprot);
|
||||
oprot.writeFieldEnd();
|
||||
}
|
||||
oprot.writeFieldStop();
|
||||
oprot.writeStructEnd();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class callback_argsTupleSchemeFactory implements SchemeFactory {
|
||||
public callback_argsTupleScheme getScheme() {
|
||||
return new callback_argsTupleScheme();
|
||||
}
|
||||
}
|
||||
|
||||
private static class callback_argsTupleScheme extends TupleScheme<callback_args> {
|
||||
|
||||
@Override
|
||||
public void write(org.apache.thrift.protocol.TProtocol prot, callback_args struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol oprot = (TTupleProtocol) prot;
|
||||
BitSet optionals = new BitSet();
|
||||
if (struct.isSetCallbackInfo()) {
|
||||
optionals.set(0);
|
||||
}
|
||||
oprot.writeBitSet(optionals, 1);
|
||||
if (struct.isSetCallbackInfo()) {
|
||||
struct.callbackInfo.write(oprot);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void read(org.apache.thrift.protocol.TProtocol prot, callback_args struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol iprot = (TTupleProtocol) prot;
|
||||
BitSet incoming = iprot.readBitSet(1);
|
||||
if (incoming.get(0)) {
|
||||
struct.callbackInfo = new CallbackInfo();
|
||||
struct.callbackInfo.read(iprot);
|
||||
struct.setCallbackInfoIsSet(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class callback_result implements org.apache.thrift.TBase<callback_result, callback_result._Fields>, java.io.Serializable, Cloneable, Comparable<callback_result> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("callback_result");
|
||||
|
||||
|
||||
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
|
||||
static {
|
||||
schemes.put(StandardScheme.class, new callback_resultStandardSchemeFactory());
|
||||
schemes.put(TupleScheme.class, new callback_resultTupleSchemeFactory());
|
||||
}
|
||||
|
||||
|
||||
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
|
||||
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
|
||||
;
|
||||
|
||||
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
|
||||
|
||||
static {
|
||||
for (_Fields field : EnumSet.allOf(_Fields.class)) {
|
||||
byName.put(field.getFieldName(), field);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches fieldId, or null if its not found.
|
||||
*/
|
||||
public static _Fields findByThriftId(int fieldId) {
|
||||
switch(fieldId) {
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches fieldId, throwing an exception
|
||||
* if it is not found.
|
||||
*/
|
||||
public static _Fields findByThriftIdOrThrow(int fieldId) {
|
||||
_Fields fields = findByThriftId(fieldId);
|
||||
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
|
||||
return fields;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches name, or null if its not found.
|
||||
*/
|
||||
public static _Fields findByName(String name) {
|
||||
return byName.get(name);
|
||||
}
|
||||
|
||||
private final short _thriftId;
|
||||
private final String _fieldName;
|
||||
|
||||
_Fields(short thriftId, String fieldName) {
|
||||
_thriftId = thriftId;
|
||||
_fieldName = fieldName;
|
||||
}
|
||||
|
||||
public short getThriftFieldId() {
|
||||
return _thriftId;
|
||||
}
|
||||
|
||||
public String getFieldName() {
|
||||
return _fieldName;
|
||||
}
|
||||
}
|
||||
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
|
||||
static {
|
||||
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
|
||||
metaDataMap = Collections.unmodifiableMap(tmpMap);
|
||||
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(callback_result.class, metaDataMap);
|
||||
}
|
||||
|
||||
public callback_result() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs a deep copy on <i>other</i>.
|
||||
*/
|
||||
public callback_result(callback_result other) {
|
||||
}
|
||||
|
||||
public callback_result deepCopy() {
|
||||
return new callback_result(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
}
|
||||
|
||||
public void setFieldValue(_Fields field, Object value) {
|
||||
switch (field) {
|
||||
}
|
||||
}
|
||||
|
||||
public Object getFieldValue(_Fields field) {
|
||||
switch (field) {
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSet(_Fields field) {
|
||||
if (field == null) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
||||
switch (field) {
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object that) {
|
||||
if (that == null)
|
||||
return false;
|
||||
if (that instanceof callback_result)
|
||||
return this.equals((callback_result)that);
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean equals(callback_result that) {
|
||||
if (that == null)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
List<Object> list = new ArrayList<Object>();
|
||||
|
||||
return list.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(callback_result other) {
|
||||
if (!getClass().equals(other.getClass())) {
|
||||
return getClass().getName().compareTo(other.getClass().getName());
|
||||
}
|
||||
|
||||
int lastComparison = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
public _Fields fieldForId(int fieldId) {
|
||||
return _Fields.findByThriftId(fieldId);
|
||||
}
|
||||
|
||||
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
|
||||
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
|
||||
}
|
||||
|
||||
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
|
||||
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder("callback_result(");
|
||||
boolean first = true;
|
||||
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public void validate() throws org.apache.thrift.TException {
|
||||
// check for required fields
|
||||
// check for sub-struct validity
|
||||
}
|
||||
|
||||
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
|
||||
try {
|
||||
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
|
||||
} catch (org.apache.thrift.TException te) {
|
||||
throw new java.io.IOException(te);
|
||||
}
|
||||
}
|
||||
|
||||
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
|
||||
try {
|
||||
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
|
||||
} catch (org.apache.thrift.TException te) {
|
||||
throw new java.io.IOException(te);
|
||||
}
|
||||
}
|
||||
|
||||
private static class callback_resultStandardSchemeFactory implements SchemeFactory {
|
||||
public callback_resultStandardScheme getScheme() {
|
||||
return new callback_resultStandardScheme();
|
||||
}
|
||||
}
|
||||
|
||||
private static class callback_resultStandardScheme extends StandardScheme<callback_result> {
|
||||
|
||||
public void read(org.apache.thrift.protocol.TProtocol iprot, callback_result struct) throws org.apache.thrift.TException {
|
||||
org.apache.thrift.protocol.TField schemeField;
|
||||
iprot.readStructBegin();
|
||||
while (true)
|
||||
{
|
||||
schemeField = iprot.readFieldBegin();
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
|
||||
break;
|
||||
}
|
||||
switch (schemeField.id) {
|
||||
default:
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
iprot.readFieldEnd();
|
||||
}
|
||||
iprot.readStructEnd();
|
||||
|
||||
// check for required fields of primitive type, which can't be checked in the validate method
|
||||
struct.validate();
|
||||
}
|
||||
|
||||
public void write(org.apache.thrift.protocol.TProtocol oprot, callback_result struct) throws org.apache.thrift.TException {
|
||||
struct.validate();
|
||||
|
||||
oprot.writeStructBegin(STRUCT_DESC);
|
||||
oprot.writeFieldStop();
|
||||
oprot.writeStructEnd();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class callback_resultTupleSchemeFactory implements SchemeFactory {
|
||||
public callback_resultTupleScheme getScheme() {
|
||||
return new callback_resultTupleScheme();
|
||||
}
|
||||
}
|
||||
|
||||
private static class callback_resultTupleScheme extends TupleScheme<callback_result> {
|
||||
|
||||
@Override
|
||||
public void write(org.apache.thrift.protocol.TProtocol prot, callback_result struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol oprot = (TTupleProtocol) prot;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void read(org.apache.thrift.protocol.TProtocol prot, callback_result struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol iprot = (TTupleProtocol) prot;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -88,6 +88,11 @@ struct InterpreterCompletion {
|
|||
3: string meta
|
||||
}
|
||||
|
||||
struct CallbackInfo {
|
||||
1: string host,
|
||||
2: i32 port
|
||||
}
|
||||
|
||||
service RemoteInterpreterService {
|
||||
|
||||
void createInterpreter(1: string intpGroupId, 2: string sessionKey, 3: string className, 4: map<string, string> properties, 5: string userName);
|
||||
|
|
@ -131,3 +136,7 @@ service RemoteInterpreterService {
|
|||
|
||||
void onReceivedZeppelinResource(1: string object);
|
||||
}
|
||||
|
||||
service RemoteInterpreterCallbackService {
|
||||
void callback(1: CallbackInfo callbackInfo);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ public class RemoteInterpreterServerTest {
|
|||
|
||||
@Test
|
||||
public void testStartStop() throws InterruptedException, IOException, TException {
|
||||
RemoteInterpreterServer server = new RemoteInterpreterServer(
|
||||
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
|
||||
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
|
||||
assertEquals(false, server.isRunning());
|
||||
|
||||
|
|
@ -90,8 +90,8 @@ public class RemoteInterpreterServerTest {
|
|||
|
||||
@Test
|
||||
public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
|
||||
RemoteInterpreterServer server = new RemoteInterpreterServer(
|
||||
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
|
||||
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
|
||||
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true);
|
||||
assertEquals(false, server.isRunning());
|
||||
|
||||
server.start();
|
||||
|
|
|
|||
|
|
@ -28,6 +28,17 @@ public class RemoteInterpreterUtilsTest {
|
|||
@Test
|
||||
public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
|
||||
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
|
||||
|
||||
String portRange = ":30000";
|
||||
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) <= 30000);
|
||||
|
||||
portRange = "30000:";
|
||||
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) >= 30000);
|
||||
|
||||
portRange = "30000:40000";
|
||||
int port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
|
||||
assertTrue(port >= 30000 && port <= 40000);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -232,7 +232,7 @@ public class GetUserList {
|
|||
return userlist;
|
||||
}
|
||||
|
||||
userquery = "select ? from ?";
|
||||
userquery = String.format("SELECT %s FROM %s", username, tablename);
|
||||
|
||||
} catch (IllegalAccessException e) {
|
||||
LOG.error("Error while accessing dataSource for JDBC Realm", e);
|
||||
|
|
@ -242,8 +242,6 @@ public class GetUserList {
|
|||
try {
|
||||
con = dataSource.getConnection();
|
||||
ps = con.prepareStatement(userquery);
|
||||
ps.setString(1, username);
|
||||
ps.setString(2, tablename);
|
||||
rs = ps.executeQuery();
|
||||
while (rs.next()) {
|
||||
userlist.add(rs.getString(1).trim());
|
||||
|
|
|
|||
|
|
@ -476,6 +476,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
}
|
||||
}
|
||||
|
||||
public String getCallbackPortRange() {
|
||||
return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE);
|
||||
}
|
||||
|
||||
public boolean isWindowsPath(String path){
|
||||
return path.matches("^[A-Za-z]:\\\\.*");
|
||||
}
|
||||
|
|
@ -684,7 +688,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"),
|
||||
|
||||
ZEPPELIN_HDFS_KEYTAB("zeppelin.hdfs.keytab", ""),
|
||||
ZEPPELIN_HDFS_PRINCIPAL("zeppelin.hdfs.principal", "");
|
||||
ZEPPELIN_HDFS_PRINCIPAL("zeppelin.hdfs.principal", ""),
|
||||
|
||||
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":");
|
||||
|
||||
private String varName;
|
||||
@SuppressWarnings("rawtypes")
|
||||
|
|
|
|||
|
|
@ -705,7 +705,8 @@ public class InterpreterSetting {
|
|||
// create new remote process
|
||||
remoteInterpreterProcess = new RemoteInterpreterManagedProcess(
|
||||
interpreterRunner != null ? interpreterRunner.getPath() :
|
||||
conf.getInterpreterRemoteRunnerPath(), interpreterDir, localRepoPath,
|
||||
conf.getInterpreterRemoteRunnerPath(), conf.getCallbackPortRange(),
|
||||
interpreterDir, localRepoPath,
|
||||
getEnvFromInterpreterProperty(getJavaProperties()), connectTimeout,
|
||||
remoteInterpreterProcessListener, appEventListener, group);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,10 +17,23 @@
|
|||
|
||||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import org.apache.commons.exec.*;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteResultHandler;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.LogOutputStream;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.server.TServer;
|
||||
import org.apache.thrift.server.TThreadPoolServer;
|
||||
import org.apache.thrift.transport.TServerSocket;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -29,6 +42,7 @@ import java.io.ByteArrayOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* This class manages start / stop of remote interpreter process
|
||||
|
|
@ -37,11 +51,14 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
implements ExecuteResultHandler {
|
||||
private static final Logger logger = LoggerFactory.getLogger(
|
||||
RemoteInterpreterManagedProcess.class);
|
||||
private final String interpreterRunner;
|
||||
|
||||
private final String interpreterRunner;
|
||||
private final String portRange;
|
||||
private DefaultExecutor executor;
|
||||
private ExecuteWatchdog watchdog;
|
||||
boolean running = false;
|
||||
private AtomicBoolean running = new AtomicBoolean(false);
|
||||
TServer callbackServer;
|
||||
private String host = null;
|
||||
private int port = -1;
|
||||
private final String interpreterDir;
|
||||
private final String localRepoDir;
|
||||
|
|
@ -51,6 +68,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
|
||||
public RemoteInterpreterManagedProcess(
|
||||
String intpRunner,
|
||||
String portRange,
|
||||
String intpDir,
|
||||
String localRepoDir,
|
||||
Map<String, String> env,
|
||||
|
|
@ -61,6 +79,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
super(new RemoteInterpreterEventPoller(listener, appListener),
|
||||
connectTimeout);
|
||||
this.interpreterRunner = intpRunner;
|
||||
this.portRange = portRange;
|
||||
this.env = env;
|
||||
this.interpreterDir = intpDir;
|
||||
this.localRepoDir = localRepoDir;
|
||||
|
|
@ -77,6 +96,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
super(remoteInterpreterEventPoller,
|
||||
connectTimeout);
|
||||
this.interpreterRunner = intpRunner;
|
||||
this.portRange = ":";
|
||||
this.env = env;
|
||||
this.interpreterDir = intpDir;
|
||||
this.localRepoDir = localRepoDir;
|
||||
|
|
@ -96,18 +116,69 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
@Override
|
||||
public void start(String userName, Boolean isUserImpersonate) {
|
||||
// start server process
|
||||
final String callbackHost;
|
||||
final int callbackPort;
|
||||
try {
|
||||
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
|
||||
logger.info("Choose port {} for RemoteInterpreterProcess", port);
|
||||
callbackHost = RemoteInterpreterUtils.findAvailableHostAddress();
|
||||
callbackPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
} catch (IOException e1) {
|
||||
throw new InterpreterException(e1);
|
||||
}
|
||||
|
||||
logger.info("Thrift server for callback will start. Port: {}", callbackPort);
|
||||
try {
|
||||
callbackServer = new TThreadPoolServer(
|
||||
new TThreadPoolServer.Args(new TServerSocket(callbackPort)).processor(
|
||||
new RemoteInterpreterCallbackService.Processor<>(
|
||||
new RemoteInterpreterCallbackService.Iface() {
|
||||
@Override
|
||||
public void callback(CallbackInfo callbackInfo) throws TException {
|
||||
logger.info("Registered: {}", callbackInfo);
|
||||
host = callbackInfo.getHost();
|
||||
port = callbackInfo.getPort();
|
||||
running.set(true);
|
||||
synchronized (running) {
|
||||
running.notify();
|
||||
}
|
||||
}
|
||||
})));
|
||||
// Start thrift server to receive callbackInfo from RemoteInterpreterServer;
|
||||
new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
callbackServer.serve();
|
||||
}
|
||||
}).start();
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (callbackServer.isServing()) {
|
||||
callbackServer.stop();
|
||||
}
|
||||
}
|
||||
}));
|
||||
|
||||
while (!callbackServer.isServing()) {
|
||||
logger.debug("callbackServer is not serving");
|
||||
Thread.sleep(500);
|
||||
}
|
||||
logger.debug("callbackServer is serving now");
|
||||
} catch (TTransportException e) {
|
||||
logger.error("callback server error.", e);
|
||||
} catch (InterruptedException e) {
|
||||
logger.warn("", e);
|
||||
}
|
||||
|
||||
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
|
||||
cmdLine.addArgument("-d", false);
|
||||
cmdLine.addArgument(interpreterDir, false);
|
||||
cmdLine.addArgument("-c", false);
|
||||
cmdLine.addArgument(callbackHost, false);
|
||||
cmdLine.addArgument("-p", false);
|
||||
cmdLine.addArgument(Integer.toString(port), false);
|
||||
cmdLine.addArgument(Integer.toString(callbackPort), false);
|
||||
if (isUserImpersonate && !userName.equals("anonymous")) {
|
||||
cmdLine.addArgument("-u", false);
|
||||
cmdLine.addArgument(userName, false);
|
||||
|
|
@ -133,45 +204,31 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
|
||||
logger.info("Run interpreter process {}", cmdLine);
|
||||
executor.execute(cmdLine, procEnv, this);
|
||||
running = true;
|
||||
} catch (IOException e) {
|
||||
running = false;
|
||||
running.set(false);
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
|
||||
if (!running) {
|
||||
try {
|
||||
cmdOut.flush();
|
||||
} catch (IOException e) {
|
||||
// nothing to do
|
||||
}
|
||||
throw new InterpreterException(new String(cmdOut.toByteArray()));
|
||||
}
|
||||
|
||||
try {
|
||||
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
|
||||
break;
|
||||
} else {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
|
||||
"Thread.sleep", e);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Remote interpreter not yet accessible at localhost:" + port);
|
||||
try {
|
||||
synchronized (running) {
|
||||
if (!running.get()) {
|
||||
running.wait(getConnectTimeout() * 2);
|
||||
}
|
||||
}
|
||||
if (!running.get()) {
|
||||
callbackServer.stop();
|
||||
throw new InterpreterException("Cannot run interpreter");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("Remote interpreter is not accessible");
|
||||
}
|
||||
processOutput.setOutputStream(null);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
if (callbackServer.isServing()) {
|
||||
callbackServer.stop();
|
||||
}
|
||||
if (isRunning()) {
|
||||
logger.info("kill interpreter process");
|
||||
try {
|
||||
|
|
@ -190,25 +247,25 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
|
||||
executor = null;
|
||||
watchdog = null;
|
||||
running = false;
|
||||
running.set(false);
|
||||
logger.info("Remote process terminated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessComplete(int exitValue) {
|
||||
logger.info("Interpreter process exited {}", exitValue);
|
||||
running = false;
|
||||
running.set(false);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
logger.info("Interpreter process failed {}", e);
|
||||
running = false;
|
||||
running.set(false);
|
||||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
return running.get();
|
||||
}
|
||||
|
||||
private static class ProcessLogOutputStream extends LogOutputStream {
|
||||
|
|
|
|||
|
|
@ -357,16 +357,16 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
config.put("cron", "* * * * * ?");
|
||||
note.setConfig(config);
|
||||
notebook.refreshCron(note.getId());
|
||||
Thread.sleep(1 * 1000);
|
||||
Thread.sleep(2 * 1000);
|
||||
|
||||
// remove cron scheduler.
|
||||
config.put("cron", null);
|
||||
note.setConfig(config);
|
||||
notebook.refreshCron(note.getId());
|
||||
Thread.sleep(1000);
|
||||
Thread.sleep(2 * 1000);
|
||||
dateFinished = p.getDateFinished();
|
||||
assertNotNull(dateFinished);
|
||||
Thread.sleep(1 * 1000);
|
||||
Thread.sleep(2 * 1000);
|
||||
assertEquals(dateFinished, p.getDateFinished());
|
||||
notebook.removeNote(note.getId(), anonymous);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue