Merge branch 'master' into ZEPPELIN-277

This commit is contained in:
Malay Majithia 2017-10-10 18:06:48 +05:30
commit b09730e039
19 changed files with 1882 additions and 120 deletions

View file

@ -23,7 +23,7 @@ function usage() {
echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>"
}
while getopts "hp:d:l:v:u:g:" o; do
while getopts "hc:p:d:l:v:u:g:" o; do
case ${o} in
h)
usage
@ -32,8 +32,11 @@ while getopts "hp:d:l:v:u:g:" o; do
d)
INTERPRETER_DIR=${OPTARG}
;;
c)
CALLBACK_HOST=${OPTARG} # This will be used callback host
;;
p)
PORT=${OPTARG}
PORT=${OPTARG} # This will be used callback port
;;
l)
LOCAL_INTERPRETER_REPO=${OPTARG}
@ -202,12 +205,12 @@ fi
if [[ -n "${SPARK_SUBMIT}" ]]; then
if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]]; then
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${PORT}`
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
else
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${PORT}`
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
fi
else
INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} `
INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} `
fi
if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then

View file

@ -430,4 +430,10 @@
<description>The HTTP X-XSS-Protection response header is a feature of Internet Explorer, Chrome and Safari that stops pages from loading when they detect reflected cross-site scripting (XSS) attacks. When value is set to 1 and a cross-site scripting attack is detected, the browser will sanitize the page (remove the unsafe parts).</description>
</property>
-->
<!--
<property>
<name>zeppelin.interpreter.callback.portRange</name>
<value>10000:10010</value>
</property>
-->
</configuration>

View file

@ -196,6 +196,13 @@ Staring from 0.6.1 SparkSession is available as variable `spark` when you are us
<a name="dependencyloading"> </a>
### How to pass property to SparkConf
There're 2 kinds of properties that would be passed to SparkConf
* Standard spark property (prefix with `spark.`). e.g. `spark.executor.memory` will be passed to `SparkConf`
* Non-standard spark property (prefix with `zeppelin.spark.`). e.g. `zeppelin.spark.property_1`, `property_1` will be passed to `SparkConf`
## Dependency Management
There are two ways to load external libraries in Spark interpreter. First is using interpreter setting menu and second is loading Spark properties.

View file

@ -38,8 +38,8 @@ public class ZeppelinDevServer extends
private DevInterpreter interpreter = null;
private InterpreterOutput out;
public ZeppelinDevServer(int port) throws TException {
super(port);
public ZeppelinDevServer(int port) throws TException, IOException {
super(null, port);
}
@Override

View file

@ -381,9 +381,16 @@ public class SparkInterpreter extends Interpreter {
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String val = toString(intpProperty.get(key));
if (key.startsWith("spark.") && !val.trim().isEmpty()) {
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
conf.set(key, val);
if (!val.trim().isEmpty()) {
if (key.startsWith("spark.")) {
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
conf.set(key, val);
}
if (key.startsWith("zeppelin.spark.")) {
String sparkPropertyKey = key.substring("zeppelin.spark.".length());
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
conf.set(sparkPropertyKey, val);
}
}
}
@ -509,9 +516,17 @@ public class SparkInterpreter extends Interpreter {
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String val = toString(intpProperty.get(key));
if (key.startsWith("spark.") && !val.trim().isEmpty()) {
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
conf.set(key, val);
if (!val.trim().isEmpty()) {
if (key.startsWith("spark.")) {
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
conf.set(key, val);
}
if (key.startsWith("zeppelin.spark.")) {
String sparkPropertyKey = key.substring("zeppelin.spark.".length());
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
conf.set(sparkPropertyKey, val);
}
}
}
setupConfForPySpark(conf);
@ -1096,23 +1111,33 @@ public class SparkInterpreter extends Interpreter {
if (buf.length() < cursor) {
cursor = buf.length();
}
String completionText = getCompletionTargetString(buf, cursor);
if (completionText == null) {
completionText = "";
cursor = completionText.length();
}
ScalaCompleter c = (ScalaCompleter) Utils.invokeMethod(completer, "completer");
Candidates ret = c.complete(completionText, cursor);
List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
List<InterpreterCompletion> completions = new LinkedList<>();
for (String candidate : candidates) {
completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
if (Utils.isScala2_10() || !Utils.isCompilerAboveScala2_11_7()) {
String singleToken = getCompletionTargetString(buf, cursor);
Candidates ret = c.complete(singleToken, singleToken.length());
List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
List<InterpreterCompletion> completions = new LinkedList<>();
for (String candidate : candidates) {
completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
}
return completions;
} else {
Candidates ret = c.complete(buf, cursor);
List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
List<InterpreterCompletion> completions = new LinkedList<>();
for (String candidate : candidates) {
completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
}
return completions;
}
return completions;
}
private String getCompletionTargetString(String text, int cursor) {

View file

@ -24,18 +24,22 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Utility and helper functions for the Spark Interpreter
*/
class Utils {
public static Logger logger = LoggerFactory.getLogger(Utils.class);
private static final String SCALA_COMPILER_VERSION = evaluateScalaCompilerVersion();
static Object invokeMethod(Object o, String name) {
return invokeMethod(o, name, new Class[]{}, new Object[]{});
}
static Object invokeMethod(Object o, String name, Class[] argTypes, Object[] params) {
static Object invokeMethod(Object o, String name, Class<?>[] argTypes, Object[] params) {
try {
return o.getClass().getMethod(name, argTypes).invoke(o, params);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
@ -44,7 +48,7 @@ class Utils {
return null;
}
static Object invokeStaticMethod(Class c, String name, Class[] argTypes, Object[] params) {
static Object invokeStaticMethod(Class<?> c, String name, Class<?>[] argTypes, Object[] params) {
try {
return c.getMethod(name, argTypes).invoke(null, params);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
@ -53,17 +57,17 @@ class Utils {
return null;
}
static Object invokeStaticMethod(Class c, String name) {
static Object invokeStaticMethod(Class<?> c, String name) {
return invokeStaticMethod(c, name, new Class[]{}, new Object[]{});
}
static Class findClass(String name) {
static Class<?> findClass(String name) {
return findClass(name, false);
}
static Class findClass(String name, boolean silence) {
static Class<?> findClass(String name, boolean silence) {
try {
return Utils.class.forName(name);
return Class.forName(name);
} catch (ClassNotFoundException e) {
if (!silence) {
logger.error(e.getMessage(), e);
@ -72,7 +76,7 @@ class Utils {
}
}
static Object instantiateClass(String name, Class[] argTypes, Object[] params) {
static Object instantiateClass(String name, Class<?>[] argTypes, Object[] params) {
try {
Constructor<?> constructor = Utils.class.getClassLoader()
.loadClass(name).getConstructor(argTypes);
@ -87,7 +91,7 @@ class Utils {
// function works after intp is initialized
static boolean isScala2_10() {
try {
Utils.class.forName("org.apache.spark.repl.SparkIMain");
Class.forName("org.apache.spark.repl.SparkIMain");
return true;
} catch (ClassNotFoundException e) {
return false;
@ -99,10 +103,45 @@ class Utils {
static boolean isScala2_11() {
return !isScala2_10();
}
static boolean isCompilerAboveScala2_11_7() {
if (isScala2_10() || SCALA_COMPILER_VERSION == null) {
return false;
}
Pattern p = Pattern.compile("([0-9]+)[.]([0-9]+)[.]([0-9]+)");
Matcher m = p.matcher(SCALA_COMPILER_VERSION);
if (m.matches()) {
int major = Integer.parseInt(m.group(1));
int minor = Integer.parseInt(m.group(2));
int bugfix = Integer.parseInt(m.group(3));
return (major > 2 || (major == 2 && minor > 11) || (major == 2 && minor == 11 && bugfix > 7));
}
return false;
}
private static String evaluateScalaCompilerVersion() {
String version = null;
try {
Properties p = new Properties();
Class<?> completionClass = findClass("scala.tools.nsc.interpreter.JLineCompletion");
if (completionClass != null) {
try (java.io.InputStream in = completionClass.getClass()
.getResourceAsStream("/compiler.properties")) {
p.load(in);
version = p.getProperty("version.number");
} catch (java.io.IOException e) {
logger.error("Failed to evaluate Scala compiler version", e);
}
}
} catch (RuntimeException e) {
logger.error("Failed to evaluate Scala compiler version", e);
}
return version;
}
static boolean isSpark2() {
try {
Utils.class.forName("org.apache.spark.sql.SparkSession");
Class.forName("org.apache.spark.sql.SparkSession");
return true;
} catch (ClassNotFoundException e) {
return false;

View file

@ -78,7 +78,7 @@ public class SparkInterpreterTest {
p.setProperty("zeppelin.spark.maxResult", "1000");
p.setProperty("zeppelin.spark.importImplicit", "true");
p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
p.setProperty("zeppelin.spark.property_1", "value_1");
return p;
}
@ -151,6 +151,13 @@ public class SparkInterpreterTest {
*/
}
@Test
public void testNonStandardSparkProperties() throws IOException {
// throw NoSuchElementException if no such property is found
InterpreterResult result = repl.interpret("sc.getConf.get(\"property_1\")", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}
@Test
public void testNextLineInvocation() {
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code());
@ -305,6 +312,22 @@ public class SparkInterpreterTest {
assertTrue(completions.size() > 0);
}
@Test
public void testMultilineCompletion() {
String buf = "val x = 1\nsc.";
List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
assertTrue(completions.size() > 0);
}
@Test
public void testMultilineCompletionNewVar() {
Assume.assumeFalse("this feature does not work with scala 2.10", Utils.isScala2_10());
Assume.assumeTrue("This feature does not work with scala < 2.11.8", Utils.isCompilerAboveScala2_11_7());
String buf = "val x = sc\nx.";
List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
assertTrue(completions.size() > 0);
}
@Test
public void testParagraphUrls() {
String paraId = "test_para_job_url";

View file

@ -17,29 +17,55 @@
package org.apache.zeppelin.interpreter.remote;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.*;
import org.apache.zeppelin.helium.*;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.helium.Application;
import org.apache.zeppelin.helium.ApplicationContext;
import org.apache.zeppelin.helium.ApplicationException;
import org.apache.zeppelin.helium.ApplicationLoader;
import org.apache.zeppelin.helium.HeliumAppAngularObjectRegistry;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.Constants;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterHookListener;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.*;
import org.apache.zeppelin.resource.*;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
import org.apache.zeppelin.resource.DistributedResourcePool;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.ResourceSet;
import org.apache.zeppelin.resource.WellKnownResourceName;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
@ -49,8 +75,22 @@ import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Entry point for Interpreter process.
@ -70,6 +110,9 @@ public class RemoteInterpreterServer
Gson gson = new Gson();
RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
private String callbackHost;
private int callbackPort;
private String host;
private int port;
private TThreadPoolServer server;
@ -87,11 +130,34 @@ public class RemoteInterpreterServer
// Hold information for manual progress update
private ConcurrentMap<String, Integer> progressMap = new ConcurrentHashMap<>();
public RemoteInterpreterServer(int port) throws TTransportException {
this.port = port;
private boolean isTest;
public RemoteInterpreterServer(String callbackHost, int port) throws IOException,
TTransportException {
this(callbackHost, port, false);
}
public RemoteInterpreterServer(String callbackHost, int port, boolean isTest)
throws TTransportException, IOException {
if (null != callbackHost) {
this.callbackHost = callbackHost;
this.callbackPort = port;
} else {
// DevInterpreter
this.port = port;
}
this.isTest = isTest;
processor = new RemoteInterpreterService.Processor<>(this);
TServerSocket serverTransport = new TServerSocket(port);
TServerSocket serverTransport;
if (null == callbackHost) {
// Dev Interpreter
serverTransport = new TServerSocket(port);
} else {
this.port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
this.host = RemoteInterpreterUtils.findAvailableHostAddress();
serverTransport = new TServerSocket(this.port);
}
server = new TThreadPoolServer(
new TThreadPoolServer.Args(serverTransport).processor(processor));
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
@ -100,6 +166,36 @@ public class RemoteInterpreterServer
@Override
public void run() {
if (null != callbackHost && !isTest) {
new Thread(new Runnable() {
boolean interrupted = false;
@Override
public void run() {
while (!interrupted && !server.isServing()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
interrupted = true;
}
}
if (!interrupted) {
CallbackInfo callbackInfo = new CallbackInfo(host, port);
try {
RemoteInterpreterUtils
.registerInterpreter(callbackHost, callbackPort, callbackInfo);
} catch (TException e) {
logger.error("Error while registering interpreter: {}", callbackInfo, e);
try {
shutdown();
} catch (TException e1) {
logger.warn("Exception occurs while shutting down", e1);
}
}
}
}
}).start();
}
logger.info("Starting remote interpreter server on port {}", port);
server.serve();
}
@ -151,13 +247,15 @@ public class RemoteInterpreterServer
public static void main(String[] args)
throws TTransportException, InterruptedException {
throws TTransportException, InterruptedException, IOException {
String callbackHost = null;
int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
callbackHost = args[0];
port = Integer.parseInt(args[1]);
}
RemoteInterpreterServer remoteInterpreterServer = new RemoteInterpreterServer(port);
RemoteInterpreterServer remoteInterpreterServer =
new RemoteInterpreterServer(callbackHost, port);
remoteInterpreterServer.start();
remoteInterpreterServer.join();
System.exit(0);

View file

@ -17,27 +17,96 @@
package org.apache.zeppelin.interpreter.remote;
import java.io.IOException;
import java.net.ConnectException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Collections;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;
/**
*
*/
public class RemoteInterpreterUtils {
static Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterUtils.class);
public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException {
int port;
try (ServerSocket socket = new ServerSocket(0);) {
port = socket.getLocalPort();
socket.close();
return findRandomAvailablePortOnAllLocalInterfaces(":");
}
/**
* start:end
*
* @param portRange
* @return
* @throws IOException
*/
public static int findRandomAvailablePortOnAllLocalInterfaces(String portRange)
throws IOException {
// ':' is the default value which means no constraints on the portRange
if (portRange == null || portRange.equals(":")) {
int port;
try (ServerSocket socket = new ServerSocket(0);) {
port = socket.getLocalPort();
socket.close();
}
return port;
}
return port;
// valid user registered port https://en.wikipedia.org/wiki/Registered_port
int start = 1024;
int end = 49151;
String[] ports = portRange.split(":", -1);
if (!ports[0].isEmpty()) {
start = Integer.parseInt(ports[0]);
}
if (!ports[1].isEmpty()) {
end = Integer.parseInt(ports[1]);
}
for (int i = start; i <= end; ++i) {
try {
ServerSocket socket = new ServerSocket(i);
return socket.getLocalPort();
} catch (Exception e) {
// ignore this
}
}
throw new IOException("No available port in the portRange: " + portRange);
}
public static String findAvailableHostAddress() throws UnknownHostException, SocketException {
InetAddress address = InetAddress.getLocalHost();
if (address.isLoopbackAddress()) {
for (NetworkInterface networkInterface : Collections
.list(NetworkInterface.getNetworkInterfaces())) {
if (!networkInterface.isLoopback()) {
for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) {
InetAddress a = interfaceAddress.getAddress();
if (a instanceof Inet4Address) {
return a.getHostAddress();
}
}
}
}
}
return address.getHostAddress();
}
public static boolean checkIfRemoteEndpointAccessible(String host, int port) {
@ -80,4 +149,17 @@ public class RemoteInterpreterUtils {
return key.matches("^[A-Z_0-9]*");
}
public static void registerInterpreter(String callbackHost, int callbackPort,
final CallbackInfo callbackInfo) throws TException {
LOGGER.info("callbackHost: {}, callbackPort: {}, callbackInfo: {}", callbackHost, callbackPort,
callbackInfo);
try (TTransport transport = new TSocket(callbackHost, callbackPort)) {
transport.open();
TProtocol protocol = new TBinaryProtocol(transport);
RemoteInterpreterCallbackService.Client client = new RemoteInterpreterCallbackService.Client(
protocol);
client.callback(callbackInfo);
}
}
}

View file

@ -0,0 +1,518 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
package org.apache.zeppelin.interpreter.thrift;
import org.apache.thrift.scheme.IScheme;
import org.apache.thrift.scheme.SchemeFactory;
import org.apache.thrift.scheme.StandardScheme;
import org.apache.thrift.scheme.TupleScheme;
import org.apache.thrift.protocol.TTupleProtocol;
import org.apache.thrift.protocol.TProtocolException;
import org.apache.thrift.EncodingUtils;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.server.AbstractNonblockingServer.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.EnumMap;
import java.util.Set;
import java.util.HashSet;
import java.util.EnumSet;
import java.util.Collections;
import java.util.BitSet;
import java.nio.ByteBuffer;
import java.util.Arrays;
import javax.annotation.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-6-17")
public class CallbackInfo implements org.apache.thrift.TBase<CallbackInfo, CallbackInfo._Fields>, java.io.Serializable, Cloneable, Comparable<CallbackInfo> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CallbackInfo");
private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)1);
private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)2);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
schemes.put(StandardScheme.class, new CallbackInfoStandardSchemeFactory());
schemes.put(TupleScheme.class, new CallbackInfoTupleSchemeFactory());
}
public String host; // required
public int port; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
HOST((short)1, "host"),
PORT((short)2, "port");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
static {
for (_Fields field : EnumSet.allOf(_Fields.class)) {
byName.put(field.getFieldName(), field);
}
}
/**
* Find the _Fields constant that matches fieldId, or null if its not found.
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
case 1: // HOST
return HOST;
case 2: // PORT
return PORT;
default:
return null;
}
}
/**
* Find the _Fields constant that matches fieldId, throwing an exception
* if it is not found.
*/
public static _Fields findByThriftIdOrThrow(int fieldId) {
_Fields fields = findByThriftId(fieldId);
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
return fields;
}
/**
* Find the _Fields constant that matches name, or null if its not found.
*/
public static _Fields findByName(String name) {
return byName.get(name);
}
private final short _thriftId;
private final String _fieldName;
_Fields(short thriftId, String fieldName) {
_thriftId = thriftId;
_fieldName = fieldName;
}
public short getThriftFieldId() {
return _thriftId;
}
public String getFieldName() {
return _fieldName;
}
}
// isset id assignments
private static final int __PORT_ISSET_ID = 0;
private byte __isset_bitfield = 0;
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CallbackInfo.class, metaDataMap);
}
public CallbackInfo() {
}
public CallbackInfo(
String host,
int port)
{
this();
this.host = host;
this.port = port;
setPortIsSet(true);
}
/**
* Performs a deep copy on <i>other</i>.
*/
public CallbackInfo(CallbackInfo other) {
__isset_bitfield = other.__isset_bitfield;
if (other.isSetHost()) {
this.host = other.host;
}
this.port = other.port;
}
public CallbackInfo deepCopy() {
return new CallbackInfo(this);
}
@Override
public void clear() {
this.host = null;
setPortIsSet(false);
this.port = 0;
}
public String getHost() {
return this.host;
}
public CallbackInfo setHost(String host) {
this.host = host;
return this;
}
public void unsetHost() {
this.host = null;
}
/** Returns true if field host is set (has been assigned a value) and false otherwise */
public boolean isSetHost() {
return this.host != null;
}
public void setHostIsSet(boolean value) {
if (!value) {
this.host = null;
}
}
public int getPort() {
return this.port;
}
public CallbackInfo setPort(int port) {
this.port = port;
setPortIsSet(true);
return this;
}
public void unsetPort() {
__isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __PORT_ISSET_ID);
}
/** Returns true if field port is set (has been assigned a value) and false otherwise */
public boolean isSetPort() {
return EncodingUtils.testBit(__isset_bitfield, __PORT_ISSET_ID);
}
public void setPortIsSet(boolean value) {
__isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PORT_ISSET_ID, value);
}
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case HOST:
if (value == null) {
unsetHost();
} else {
setHost((String)value);
}
break;
case PORT:
if (value == null) {
unsetPort();
} else {
setPort((Integer)value);
}
break;
}
}
public Object getFieldValue(_Fields field) {
switch (field) {
case HOST:
return getHost();
case PORT:
return Integer.valueOf(getPort());
}
throw new IllegalStateException();
}
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
public boolean isSet(_Fields field) {
if (field == null) {
throw new IllegalArgumentException();
}
switch (field) {
case HOST:
return isSetHost();
case PORT:
return isSetPort();
}
throw new IllegalStateException();
}
@Override
public boolean equals(Object that) {
if (that == null)
return false;
if (that instanceof CallbackInfo)
return this.equals((CallbackInfo)that);
return false;
}
public boolean equals(CallbackInfo that) {
if (that == null)
return false;
boolean this_present_host = true && this.isSetHost();
boolean that_present_host = true && that.isSetHost();
if (this_present_host || that_present_host) {
if (!(this_present_host && that_present_host))
return false;
if (!this.host.equals(that.host))
return false;
}
boolean this_present_port = true;
boolean that_present_port = true;
if (this_present_port || that_present_port) {
if (!(this_present_port && that_present_port))
return false;
if (this.port != that.port)
return false;
}
return true;
}
@Override
public int hashCode() {
List<Object> list = new ArrayList<Object>();
boolean present_host = true && (isSetHost());
list.add(present_host);
if (present_host)
list.add(host);
boolean present_port = true;
list.add(present_port);
if (present_port)
list.add(port);
return list.hashCode();
}
@Override
public int compareTo(CallbackInfo other) {
if (!getClass().equals(other.getClass())) {
return getClass().getName().compareTo(other.getClass().getName());
}
int lastComparison = 0;
lastComparison = Boolean.valueOf(isSetHost()).compareTo(other.isSetHost());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetHost()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, other.host);
if (lastComparison != 0) {
return lastComparison;
}
}
lastComparison = Boolean.valueOf(isSetPort()).compareTo(other.isSetPort());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetPort()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, other.port);
if (lastComparison != 0) {
return lastComparison;
}
}
return 0;
}
public _Fields fieldForId(int fieldId) {
return _Fields.findByThriftId(fieldId);
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("CallbackInfo(");
boolean first = true;
sb.append("host:");
if (this.host == null) {
sb.append("null");
} else {
sb.append(this.host);
}
first = false;
if (!first) sb.append(", ");
sb.append("port:");
sb.append(this.port);
first = false;
sb.append(")");
return sb.toString();
}
public void validate() throws org.apache.thrift.TException {
// check for required fields
// check for sub-struct validity
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
try {
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
// it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
__isset_bitfield = 0;
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private static class CallbackInfoStandardSchemeFactory implements SchemeFactory {
public CallbackInfoStandardScheme getScheme() {
return new CallbackInfoStandardScheme();
}
}
private static class CallbackInfoStandardScheme extends StandardScheme<CallbackInfo> {
public void read(org.apache.thrift.protocol.TProtocol iprot, CallbackInfo struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
case 1: // HOST
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.host = iprot.readString();
struct.setHostIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 2: // PORT
if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
struct.port = iprot.readI32();
struct.setPortIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd();
// check for required fields of primitive type, which can't be checked in the validate method
struct.validate();
}
public void write(org.apache.thrift.protocol.TProtocol oprot, CallbackInfo struct) throws org.apache.thrift.TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
if (struct.host != null) {
oprot.writeFieldBegin(HOST_FIELD_DESC);
oprot.writeString(struct.host);
oprot.writeFieldEnd();
}
oprot.writeFieldBegin(PORT_FIELD_DESC);
oprot.writeI32(struct.port);
oprot.writeFieldEnd();
oprot.writeFieldStop();
oprot.writeStructEnd();
}
}
private static class CallbackInfoTupleSchemeFactory implements SchemeFactory {
public CallbackInfoTupleScheme getScheme() {
return new CallbackInfoTupleScheme();
}
}
private static class CallbackInfoTupleScheme extends TupleScheme<CallbackInfo> {
@Override
public void write(org.apache.thrift.protocol.TProtocol prot, CallbackInfo struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
BitSet optionals = new BitSet();
if (struct.isSetHost()) {
optionals.set(0);
}
if (struct.isSetPort()) {
optionals.set(1);
}
oprot.writeBitSet(optionals, 2);
if (struct.isSetHost()) {
oprot.writeString(struct.host);
}
if (struct.isSetPort()) {
oprot.writeI32(struct.port);
}
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, CallbackInfo struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
BitSet incoming = iprot.readBitSet(2);
if (incoming.get(0)) {
struct.host = iprot.readString();
struct.setHostIsSet(true);
}
if (incoming.get(1)) {
struct.port = iprot.readI32();
struct.setPortIsSet(true);
}
}
}
}

View file

@ -0,0 +1,879 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
package org.apache.zeppelin.interpreter.thrift;
import org.apache.thrift.scheme.IScheme;
import org.apache.thrift.scheme.SchemeFactory;
import org.apache.thrift.scheme.StandardScheme;
import org.apache.thrift.scheme.TupleScheme;
import org.apache.thrift.protocol.TTupleProtocol;
import org.apache.thrift.protocol.TProtocolException;
import org.apache.thrift.EncodingUtils;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.server.AbstractNonblockingServer.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.EnumMap;
import java.util.Set;
import java.util.HashSet;
import java.util.EnumSet;
import java.util.Collections;
import java.util.BitSet;
import java.nio.ByteBuffer;
import java.util.Arrays;
import javax.annotation.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-6-17")
public class RemoteInterpreterCallbackService {
public interface Iface {
public void callback(CallbackInfo callbackInfo) throws org.apache.thrift.TException;
}
public interface AsyncIface {
public void callback(CallbackInfo callbackInfo, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
}
public static class Client extends org.apache.thrift.TServiceClient implements Iface {
public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
public Factory() {}
public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
return new Client(prot);
}
public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
return new Client(iprot, oprot);
}
}
public Client(org.apache.thrift.protocol.TProtocol prot)
{
super(prot, prot);
}
public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
super(iprot, oprot);
}
public void callback(CallbackInfo callbackInfo) throws org.apache.thrift.TException
{
send_callback(callbackInfo);
recv_callback();
}
public void send_callback(CallbackInfo callbackInfo) throws org.apache.thrift.TException
{
callback_args args = new callback_args();
args.setCallbackInfo(callbackInfo);
sendBase("callback", args);
}
public void recv_callback() throws org.apache.thrift.TException
{
callback_result result = new callback_result();
receiveBase(result, "callback");
return;
}
}
public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
private org.apache.thrift.async.TAsyncClientManager clientManager;
private org.apache.thrift.protocol.TProtocolFactory protocolFactory;
public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
this.clientManager = clientManager;
this.protocolFactory = protocolFactory;
}
public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
return new AsyncClient(protocolFactory, clientManager, transport);
}
}
public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
super(protocolFactory, clientManager, transport);
}
public void callback(CallbackInfo callbackInfo, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
checkReady();
callback_call method_call = new callback_call(callbackInfo, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
public static class callback_call extends org.apache.thrift.async.TAsyncMethodCall {
private CallbackInfo callbackInfo;
public callback_call(CallbackInfo callbackInfo, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.callbackInfo = callbackInfo;
}
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("callback", org.apache.thrift.protocol.TMessageType.CALL, 0));
callback_args args = new callback_args();
args.setCallbackInfo(callbackInfo);
args.write(prot);
prot.writeMessageEnd();
}
public void getResult() throws org.apache.thrift.TException {
if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
throw new IllegalStateException("Method call not finished!");
}
org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
(new Client(prot)).recv_callback();
}
}
}
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
public Processor(I iface) {
super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
}
protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
super(iface, getProcessMap(processMap));
}
private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
processMap.put("callback", new callback());
return processMap;
}
public static class callback<I extends Iface> extends org.apache.thrift.ProcessFunction<I, callback_args> {
public callback() {
super("callback");
}
public callback_args getEmptyArgsInstance() {
return new callback_args();
}
protected boolean isOneway() {
return false;
}
public callback_result getResult(I iface, callback_args args) throws org.apache.thrift.TException {
callback_result result = new callback_result();
iface.callback(args.callbackInfo);
return result;
}
}
}
public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class.getName());
public AsyncProcessor(I iface) {
super(iface, getProcessMap(new HashMap<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>>()));
}
protected AsyncProcessor(I iface, Map<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
super(iface, getProcessMap(processMap));
}
private static <I extends AsyncIface> Map<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase,?>> getProcessMap(Map<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
processMap.put("callback", new callback());
return processMap;
}
public static class callback<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, callback_args, Void> {
public callback() {
super("callback");
}
public callback_args getEmptyArgsInstance() {
return new callback_args();
}
public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
final org.apache.thrift.AsyncProcessFunction fcall = this;
return new AsyncMethodCallback<Void>() {
public void onComplete(Void o) {
callback_result result = new callback_result();
try {
fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
return;
} catch (Exception e) {
LOGGER.error("Exception writing to internal frame buffer", e);
}
fb.close();
}
public void onError(Exception e) {
byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
org.apache.thrift.TBase msg;
callback_result result = new callback_result();
{
msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
}
try {
fcall.sendResponse(fb,msg,msgType,seqid);
return;
} catch (Exception ex) {
LOGGER.error("Exception writing to internal frame buffer", ex);
}
fb.close();
}
};
}
protected boolean isOneway() {
return false;
}
public void start(I iface, callback_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
iface.callback(args.callbackInfo,resultHandler);
}
}
}
public static class callback_args implements org.apache.thrift.TBase<callback_args, callback_args._Fields>, java.io.Serializable, Cloneable, Comparable<callback_args> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("callback_args");
private static final org.apache.thrift.protocol.TField CALLBACK_INFO_FIELD_DESC = new org.apache.thrift.protocol.TField("callbackInfo", org.apache.thrift.protocol.TType.STRUCT, (short)1);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
schemes.put(StandardScheme.class, new callback_argsStandardSchemeFactory());
schemes.put(TupleScheme.class, new callback_argsTupleSchemeFactory());
}
public CallbackInfo callbackInfo; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
CALLBACK_INFO((short)1, "callbackInfo");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
static {
for (_Fields field : EnumSet.allOf(_Fields.class)) {
byName.put(field.getFieldName(), field);
}
}
/**
* Find the _Fields constant that matches fieldId, or null if its not found.
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
case 1: // CALLBACK_INFO
return CALLBACK_INFO;
default:
return null;
}
}
/**
* Find the _Fields constant that matches fieldId, throwing an exception
* if it is not found.
*/
public static _Fields findByThriftIdOrThrow(int fieldId) {
_Fields fields = findByThriftId(fieldId);
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
return fields;
}
/**
* Find the _Fields constant that matches name, or null if its not found.
*/
public static _Fields findByName(String name) {
return byName.get(name);
}
private final short _thriftId;
private final String _fieldName;
_Fields(short thriftId, String fieldName) {
_thriftId = thriftId;
_fieldName = fieldName;
}
public short getThriftFieldId() {
return _thriftId;
}
public String getFieldName() {
return _fieldName;
}
}
// isset id assignments
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.CALLBACK_INFO, new org.apache.thrift.meta_data.FieldMetaData("callbackInfo", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, CallbackInfo.class)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(callback_args.class, metaDataMap);
}
public callback_args() {
}
public callback_args(
CallbackInfo callbackInfo)
{
this();
this.callbackInfo = callbackInfo;
}
/**
* Performs a deep copy on <i>other</i>.
*/
public callback_args(callback_args other) {
if (other.isSetCallbackInfo()) {
this.callbackInfo = new CallbackInfo(other.callbackInfo);
}
}
public callback_args deepCopy() {
return new callback_args(this);
}
@Override
public void clear() {
this.callbackInfo = null;
}
public CallbackInfo getCallbackInfo() {
return this.callbackInfo;
}
public callback_args setCallbackInfo(CallbackInfo callbackInfo) {
this.callbackInfo = callbackInfo;
return this;
}
public void unsetCallbackInfo() {
this.callbackInfo = null;
}
/** Returns true if field callbackInfo is set (has been assigned a value) and false otherwise */
public boolean isSetCallbackInfo() {
return this.callbackInfo != null;
}
public void setCallbackInfoIsSet(boolean value) {
if (!value) {
this.callbackInfo = null;
}
}
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case CALLBACK_INFO:
if (value == null) {
unsetCallbackInfo();
} else {
setCallbackInfo((CallbackInfo)value);
}
break;
}
}
public Object getFieldValue(_Fields field) {
switch (field) {
case CALLBACK_INFO:
return getCallbackInfo();
}
throw new IllegalStateException();
}
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
public boolean isSet(_Fields field) {
if (field == null) {
throw new IllegalArgumentException();
}
switch (field) {
case CALLBACK_INFO:
return isSetCallbackInfo();
}
throw new IllegalStateException();
}
@Override
public boolean equals(Object that) {
if (that == null)
return false;
if (that instanceof callback_args)
return this.equals((callback_args)that);
return false;
}
public boolean equals(callback_args that) {
if (that == null)
return false;
boolean this_present_callbackInfo = true && this.isSetCallbackInfo();
boolean that_present_callbackInfo = true && that.isSetCallbackInfo();
if (this_present_callbackInfo || that_present_callbackInfo) {
if (!(this_present_callbackInfo && that_present_callbackInfo))
return false;
if (!this.callbackInfo.equals(that.callbackInfo))
return false;
}
return true;
}
@Override
public int hashCode() {
List<Object> list = new ArrayList<Object>();
boolean present_callbackInfo = true && (isSetCallbackInfo());
list.add(present_callbackInfo);
if (present_callbackInfo)
list.add(callbackInfo);
return list.hashCode();
}
@Override
public int compareTo(callback_args other) {
if (!getClass().equals(other.getClass())) {
return getClass().getName().compareTo(other.getClass().getName());
}
int lastComparison = 0;
lastComparison = Boolean.valueOf(isSetCallbackInfo()).compareTo(other.isSetCallbackInfo());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetCallbackInfo()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.callbackInfo, other.callbackInfo);
if (lastComparison != 0) {
return lastComparison;
}
}
return 0;
}
public _Fields fieldForId(int fieldId) {
return _Fields.findByThriftId(fieldId);
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("callback_args(");
boolean first = true;
sb.append("callbackInfo:");
if (this.callbackInfo == null) {
sb.append("null");
} else {
sb.append(this.callbackInfo);
}
first = false;
sb.append(")");
return sb.toString();
}
public void validate() throws org.apache.thrift.TException {
// check for required fields
// check for sub-struct validity
if (callbackInfo != null) {
callbackInfo.validate();
}
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
try {
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private static class callback_argsStandardSchemeFactory implements SchemeFactory {
public callback_argsStandardScheme getScheme() {
return new callback_argsStandardScheme();
}
}
private static class callback_argsStandardScheme extends StandardScheme<callback_args> {
public void read(org.apache.thrift.protocol.TProtocol iprot, callback_args struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
case 1: // CALLBACK_INFO
if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
struct.callbackInfo = new CallbackInfo();
struct.callbackInfo.read(iprot);
struct.setCallbackInfoIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd();
// check for required fields of primitive type, which can't be checked in the validate method
struct.validate();
}
public void write(org.apache.thrift.protocol.TProtocol oprot, callback_args struct) throws org.apache.thrift.TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
if (struct.callbackInfo != null) {
oprot.writeFieldBegin(CALLBACK_INFO_FIELD_DESC);
struct.callbackInfo.write(oprot);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
}
private static class callback_argsTupleSchemeFactory implements SchemeFactory {
public callback_argsTupleScheme getScheme() {
return new callback_argsTupleScheme();
}
}
private static class callback_argsTupleScheme extends TupleScheme<callback_args> {
@Override
public void write(org.apache.thrift.protocol.TProtocol prot, callback_args struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
BitSet optionals = new BitSet();
if (struct.isSetCallbackInfo()) {
optionals.set(0);
}
oprot.writeBitSet(optionals, 1);
if (struct.isSetCallbackInfo()) {
struct.callbackInfo.write(oprot);
}
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, callback_args struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
BitSet incoming = iprot.readBitSet(1);
if (incoming.get(0)) {
struct.callbackInfo = new CallbackInfo();
struct.callbackInfo.read(iprot);
struct.setCallbackInfoIsSet(true);
}
}
}
}
public static class callback_result implements org.apache.thrift.TBase<callback_result, callback_result._Fields>, java.io.Serializable, Cloneable, Comparable<callback_result> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("callback_result");
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
schemes.put(StandardScheme.class, new callback_resultStandardSchemeFactory());
schemes.put(TupleScheme.class, new callback_resultTupleSchemeFactory());
}
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
;
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
static {
for (_Fields field : EnumSet.allOf(_Fields.class)) {
byName.put(field.getFieldName(), field);
}
}
/**
* Find the _Fields constant that matches fieldId, or null if its not found.
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
default:
return null;
}
}
/**
* Find the _Fields constant that matches fieldId, throwing an exception
* if it is not found.
*/
public static _Fields findByThriftIdOrThrow(int fieldId) {
_Fields fields = findByThriftId(fieldId);
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
return fields;
}
/**
* Find the _Fields constant that matches name, or null if its not found.
*/
public static _Fields findByName(String name) {
return byName.get(name);
}
private final short _thriftId;
private final String _fieldName;
_Fields(short thriftId, String fieldName) {
_thriftId = thriftId;
_fieldName = fieldName;
}
public short getThriftFieldId() {
return _thriftId;
}
public String getFieldName() {
return _fieldName;
}
}
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(callback_result.class, metaDataMap);
}
public callback_result() {
}
/**
* Performs a deep copy on <i>other</i>.
*/
public callback_result(callback_result other) {
}
public callback_result deepCopy() {
return new callback_result(this);
}
@Override
public void clear() {
}
public void setFieldValue(_Fields field, Object value) {
switch (field) {
}
}
public Object getFieldValue(_Fields field) {
switch (field) {
}
throw new IllegalStateException();
}
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
public boolean isSet(_Fields field) {
if (field == null) {
throw new IllegalArgumentException();
}
switch (field) {
}
throw new IllegalStateException();
}
@Override
public boolean equals(Object that) {
if (that == null)
return false;
if (that instanceof callback_result)
return this.equals((callback_result)that);
return false;
}
public boolean equals(callback_result that) {
if (that == null)
return false;
return true;
}
@Override
public int hashCode() {
List<Object> list = new ArrayList<Object>();
return list.hashCode();
}
@Override
public int compareTo(callback_result other) {
if (!getClass().equals(other.getClass())) {
return getClass().getName().compareTo(other.getClass().getName());
}
int lastComparison = 0;
return 0;
}
public _Fields fieldForId(int fieldId) {
return _Fields.findByThriftId(fieldId);
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("callback_result(");
boolean first = true;
sb.append(")");
return sb.toString();
}
public void validate() throws org.apache.thrift.TException {
// check for required fields
// check for sub-struct validity
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
try {
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private static class callback_resultStandardSchemeFactory implements SchemeFactory {
public callback_resultStandardScheme getScheme() {
return new callback_resultStandardScheme();
}
}
private static class callback_resultStandardScheme extends StandardScheme<callback_result> {
public void read(org.apache.thrift.protocol.TProtocol iprot, callback_result struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd();
// check for required fields of primitive type, which can't be checked in the validate method
struct.validate();
}
public void write(org.apache.thrift.protocol.TProtocol oprot, callback_result struct) throws org.apache.thrift.TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
oprot.writeFieldStop();
oprot.writeStructEnd();
}
}
private static class callback_resultTupleSchemeFactory implements SchemeFactory {
public callback_resultTupleScheme getScheme() {
return new callback_resultTupleScheme();
}
}
private static class callback_resultTupleScheme extends TupleScheme<callback_result> {
@Override
public void write(org.apache.thrift.protocol.TProtocol prot, callback_result struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, callback_result struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
}
}
}
}

View file

@ -88,6 +88,11 @@ struct InterpreterCompletion {
3: string meta
}
struct CallbackInfo {
1: string host,
2: i32 port
}
service RemoteInterpreterService {
void createInterpreter(1: string intpGroupId, 2: string sessionKey, 3: string className, 4: map<string, string> properties, 5: string userName);
@ -131,3 +136,7 @@ service RemoteInterpreterService {
void onReceivedZeppelinResource(1: string object);
}
service RemoteInterpreterCallbackService {
void callback(1: CallbackInfo callbackInfo);
}

View file

@ -42,7 +42,7 @@ public class RemoteInterpreterServerTest {
@Test
public void testStartStop() throws InterruptedException, IOException, TException {
RemoteInterpreterServer server = new RemoteInterpreterServer(
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
assertEquals(false, server.isRunning());
@ -90,8 +90,8 @@ public class RemoteInterpreterServerTest {
@Test
public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
RemoteInterpreterServer server = new RemoteInterpreterServer(
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true);
assertEquals(false, server.isRunning());
server.start();

View file

@ -28,6 +28,17 @@ public class RemoteInterpreterUtilsTest {
@Test
public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
String portRange = ":30000";
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) <= 30000);
portRange = "30000:";
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) >= 30000);
portRange = "30000:40000";
int port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
assertTrue(port >= 30000 && port <= 40000);
}
}

View file

@ -232,7 +232,7 @@ public class GetUserList {
return userlist;
}
userquery = "select ? from ?";
userquery = String.format("SELECT %s FROM %s", username, tablename);
} catch (IllegalAccessException e) {
LOG.error("Error while accessing dataSource for JDBC Realm", e);
@ -242,8 +242,6 @@ public class GetUserList {
try {
con = dataSource.getConnection();
ps = con.prepareStatement(userquery);
ps.setString(1, username);
ps.setString(2, tablename);
rs = ps.executeQuery();
while (rs.next()) {
userlist.add(rs.getString(1).trim());

View file

@ -476,6 +476,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
}
}
public String getCallbackPortRange() {
return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE);
}
public boolean isWindowsPath(String path){
return path.matches("^[A-Za-z]:\\\\.*");
}
@ -684,7 +688,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"),
ZEPPELIN_HDFS_KEYTAB("zeppelin.hdfs.keytab", ""),
ZEPPELIN_HDFS_PRINCIPAL("zeppelin.hdfs.principal", "");
ZEPPELIN_HDFS_PRINCIPAL("zeppelin.hdfs.principal", ""),
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":");
private String varName;
@SuppressWarnings("rawtypes")

View file

@ -705,7 +705,8 @@ public class InterpreterSetting {
// create new remote process
remoteInterpreterProcess = new RemoteInterpreterManagedProcess(
interpreterRunner != null ? interpreterRunner.getPath() :
conf.getInterpreterRemoteRunnerPath(), interpreterDir, localRepoPath,
conf.getInterpreterRemoteRunnerPath(), conf.getCallbackPortRange(),
interpreterDir, localRepoPath,
getEnvFromInterpreterProperty(getJavaProperties()), connectTimeout,
remoteInterpreterProcessListener, appEventListener, group);
}

View file

@ -17,10 +17,23 @@
package org.apache.zeppelin.interpreter.remote;
import org.apache.commons.exec.*;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.LogOutputStream;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,6 +42,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class manages start / stop of remote interpreter process
@ -37,11 +51,14 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
implements ExecuteResultHandler {
private static final Logger logger = LoggerFactory.getLogger(
RemoteInterpreterManagedProcess.class);
private final String interpreterRunner;
private final String interpreterRunner;
private final String portRange;
private DefaultExecutor executor;
private ExecuteWatchdog watchdog;
boolean running = false;
private AtomicBoolean running = new AtomicBoolean(false);
TServer callbackServer;
private String host = null;
private int port = -1;
private final String interpreterDir;
private final String localRepoDir;
@ -51,6 +68,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
public RemoteInterpreterManagedProcess(
String intpRunner,
String portRange,
String intpDir,
String localRepoDir,
Map<String, String> env,
@ -61,6 +79,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
super(new RemoteInterpreterEventPoller(listener, appListener),
connectTimeout);
this.interpreterRunner = intpRunner;
this.portRange = portRange;
this.env = env;
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
@ -77,6 +96,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
super(remoteInterpreterEventPoller,
connectTimeout);
this.interpreterRunner = intpRunner;
this.portRange = ":";
this.env = env;
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
@ -96,18 +116,69 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
@Override
public void start(String userName, Boolean isUserImpersonate) {
// start server process
final String callbackHost;
final int callbackPort;
try {
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
logger.info("Choose port {} for RemoteInterpreterProcess", port);
callbackHost = RemoteInterpreterUtils.findAvailableHostAddress();
callbackPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
} catch (IOException e1) {
throw new InterpreterException(e1);
}
logger.info("Thrift server for callback will start. Port: {}", callbackPort);
try {
callbackServer = new TThreadPoolServer(
new TThreadPoolServer.Args(new TServerSocket(callbackPort)).processor(
new RemoteInterpreterCallbackService.Processor<>(
new RemoteInterpreterCallbackService.Iface() {
@Override
public void callback(CallbackInfo callbackInfo) throws TException {
logger.info("Registered: {}", callbackInfo);
host = callbackInfo.getHost();
port = callbackInfo.getPort();
running.set(true);
synchronized (running) {
running.notify();
}
}
})));
// Start thrift server to receive callbackInfo from RemoteInterpreterServer;
new Thread(new Runnable() {
@Override
public void run() {
callbackServer.serve();
}
}).start();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
if (callbackServer.isServing()) {
callbackServer.stop();
}
}
}));
while (!callbackServer.isServing()) {
logger.debug("callbackServer is not serving");
Thread.sleep(500);
}
logger.debug("callbackServer is serving now");
} catch (TTransportException e) {
logger.error("callback server error.", e);
} catch (InterruptedException e) {
logger.warn("", e);
}
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
cmdLine.addArgument("-d", false);
cmdLine.addArgument(interpreterDir, false);
cmdLine.addArgument("-c", false);
cmdLine.addArgument(callbackHost, false);
cmdLine.addArgument("-p", false);
cmdLine.addArgument(Integer.toString(port), false);
cmdLine.addArgument(Integer.toString(callbackPort), false);
if (isUserImpersonate && !userName.equals("anonymous")) {
cmdLine.addArgument("-u", false);
cmdLine.addArgument(userName, false);
@ -133,45 +204,31 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
logger.info("Run interpreter process {}", cmdLine);
executor.execute(cmdLine, procEnv, this);
running = true;
} catch (IOException e) {
running = false;
running.set(false);
throw new InterpreterException(e);
}
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
if (!running) {
try {
cmdOut.flush();
} catch (IOException e) {
// nothing to do
}
throw new InterpreterException(new String(cmdOut.toByteArray()));
}
try {
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
break;
} else {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
"Thread.sleep", e);
}
}
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Remote interpreter not yet accessible at localhost:" + port);
try {
synchronized (running) {
if (!running.get()) {
running.wait(getConnectTimeout() * 2);
}
}
if (!running.get()) {
callbackServer.stop();
throw new InterpreterException("Cannot run interpreter");
}
} catch (InterruptedException e) {
logger.error("Remote interpreter is not accessible");
}
processOutput.setOutputStream(null);
}
public void stop() {
if (callbackServer.isServing()) {
callbackServer.stop();
}
if (isRunning()) {
logger.info("kill interpreter process");
try {
@ -190,25 +247,25 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
executor = null;
watchdog = null;
running = false;
running.set(false);
logger.info("Remote process terminated");
}
@Override
public void onProcessComplete(int exitValue) {
logger.info("Interpreter process exited {}", exitValue);
running = false;
running.set(false);
}
@Override
public void onProcessFailed(ExecuteException e) {
logger.info("Interpreter process failed {}", e);
running = false;
running.set(false);
}
public boolean isRunning() {
return running;
return running.get();
}
private static class ProcessLogOutputStream extends LogOutputStream {

View file

@ -357,16 +357,16 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
config.put("cron", "* * * * * ?");
note.setConfig(config);
notebook.refreshCron(note.getId());
Thread.sleep(1 * 1000);
Thread.sleep(2 * 1000);
// remove cron scheduler.
config.put("cron", null);
note.setConfig(config);
notebook.refreshCron(note.getId());
Thread.sleep(1000);
Thread.sleep(2 * 1000);
dateFinished = p.getDateFinished();
assertNotNull(dateFinished);
Thread.sleep(1 * 1000);
Thread.sleep(2 * 1000);
assertEquals(dateFinished, p.getDateFinished());
notebook.removeNote(note.getId(), anonymous);
}