mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-3040. Allow to specify portRange for interpreter process thrift service
This commit is contained in:
parent
d925e052ad
commit
7e885bd388
9 changed files with 73 additions and 50 deletions
|
|
@ -20,10 +20,10 @@ bin=$(dirname "${BASH_SOURCE-$0}")
|
|||
bin=$(cd "${bin}">/dev/null; pwd)
|
||||
|
||||
function usage() {
|
||||
echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>"
|
||||
echo "usage) $0 -p <port> -r <intp_port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>"
|
||||
}
|
||||
|
||||
while getopts "hc:p:d:l:v:u:g:" o; do
|
||||
while getopts "hc:p:r:d:l:v:u:g:" o; do
|
||||
case ${o} in
|
||||
h)
|
||||
usage
|
||||
|
|
@ -36,7 +36,10 @@ while getopts "hc:p:d:l:v:u:g:" o; do
|
|||
CALLBACK_HOST=${OPTARG} # This will be used callback host
|
||||
;;
|
||||
p)
|
||||
PORT=${OPTARG} # This will be used callback port
|
||||
PORT=${OPTARG} # This will be used for callback port
|
||||
;;
|
||||
r)
|
||||
INTP_PORT=${OPTARG} # This will be used for interpreter process port
|
||||
;;
|
||||
l)
|
||||
LOCAL_INTERPRETER_REPO=${OPTARG}
|
||||
|
|
@ -206,12 +209,12 @@ fi
|
|||
|
||||
if [[ -n "${SPARK_SUBMIT}" ]]; then
|
||||
if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]]; then
|
||||
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
|
||||
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}`
|
||||
else
|
||||
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
|
||||
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}`
|
||||
fi
|
||||
else
|
||||
INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} `
|
||||
INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}`
|
||||
fi
|
||||
|
||||
if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ public class ZeppelinDevServer extends
|
|||
private DevInterpreter interpreter = null;
|
||||
private InterpreterOutput out;
|
||||
public ZeppelinDevServer(int port) throws TException, IOException {
|
||||
super(null, port);
|
||||
super(null, port, "");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -488,6 +488,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE);
|
||||
}
|
||||
|
||||
public String getInterpreterPortRange() {
|
||||
return getString(ConfVars.ZEPPELIN_INTERPRETER_PORTRANGE);
|
||||
}
|
||||
|
||||
public boolean isWindowsPath(String path){
|
||||
return path.matches("^[A-Za-z]:\\\\.*");
|
||||
}
|
||||
|
|
@ -705,6 +709,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""),
|
||||
|
||||
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"),
|
||||
ZEPPELIN_INTERPRETER_PORTRANGE("zeppelin.interpreter.portRange", ":"),
|
||||
|
||||
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS("zeppelin.interpreter.lifecyclemanager.class",
|
||||
"org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager"),
|
||||
|
|
|
|||
|
|
@ -132,19 +132,19 @@ public class RemoteInterpreterServer
|
|||
|
||||
private boolean isTest;
|
||||
|
||||
public RemoteInterpreterServer(String callbackHost, int port) throws IOException,
|
||||
TTransportException {
|
||||
this(callbackHost, port, false);
|
||||
public RemoteInterpreterServer(String callbackHost, int callbackPort, String portRange)
|
||||
throws IOException, TTransportException {
|
||||
this(callbackHost, callbackPort, portRange, false);
|
||||
}
|
||||
|
||||
public RemoteInterpreterServer(String callbackHost, int port, boolean isTest)
|
||||
throws TTransportException, IOException {
|
||||
public RemoteInterpreterServer(String callbackHost, int callbackPort, String portRange,
|
||||
boolean isTest) throws TTransportException, IOException {
|
||||
if (null != callbackHost) {
|
||||
this.callbackHost = callbackHost;
|
||||
this.callbackPort = port;
|
||||
this.callbackPort = callbackPort;
|
||||
} else {
|
||||
// DevInterpreter
|
||||
this.port = port;
|
||||
this.port = callbackPort;
|
||||
}
|
||||
this.isTest = isTest;
|
||||
|
||||
|
|
@ -152,14 +152,16 @@ public class RemoteInterpreterServer
|
|||
TServerSocket serverTransport;
|
||||
if (null == callbackHost) {
|
||||
// Dev Interpreter
|
||||
serverTransport = new TServerSocket(port);
|
||||
serverTransport = new TServerSocket(callbackPort);
|
||||
} else {
|
||||
this.port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
serverTransport = RemoteInterpreterUtils.createTServerSocket(portRange);
|
||||
this.port = serverTransport.getServerSocket().getLocalPort();
|
||||
this.host = RemoteInterpreterUtils.findAvailableHostAddress();
|
||||
serverTransport = new TServerSocket(this.port);
|
||||
logger.info("Launching ThriftServer at " + this.host + ":" + this.port);
|
||||
}
|
||||
server = new TThreadPoolServer(
|
||||
new TThreadPoolServer.Args(serverTransport).processor(processor));
|
||||
logger.info("Starting remote interpreter server on port {}", port);
|
||||
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
|
||||
remoteWorksController = new ZeppelinRemoteWorksController(this, remoteWorksResponsePool);
|
||||
}
|
||||
|
|
@ -254,12 +256,16 @@ public class RemoteInterpreterServer
|
|||
throws TTransportException, InterruptedException, IOException {
|
||||
String callbackHost = null;
|
||||
int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
|
||||
String portRange = ":";
|
||||
if (args.length > 0) {
|
||||
callbackHost = args[0];
|
||||
port = Integer.parseInt(args[1]);
|
||||
if (args.length > 2) {
|
||||
portRange = args[2];
|
||||
}
|
||||
}
|
||||
RemoteInterpreterServer remoteInterpreterServer =
|
||||
new RemoteInterpreterServer(callbackHost, port);
|
||||
new RemoteInterpreterServer(callbackHost, port, portRange);
|
||||
remoteInterpreterServer.start();
|
||||
remoteInterpreterServer.join();
|
||||
System.exit(0);
|
||||
|
|
|
|||
|
|
@ -32,8 +32,10 @@ import java.util.Collections;
|
|||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.transport.TServerSocket;
|
||||
import org.apache.thrift.transport.TSocket;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -48,7 +50,12 @@ public class RemoteInterpreterUtils {
|
|||
|
||||
|
||||
public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException {
|
||||
return findRandomAvailablePortOnAllLocalInterfaces(":");
|
||||
int port;
|
||||
try (ServerSocket socket = new ServerSocket(0);) {
|
||||
port = socket.getLocalPort();
|
||||
socket.close();
|
||||
}
|
||||
return port;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -58,21 +65,22 @@ public class RemoteInterpreterUtils {
|
|||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public static int findRandomAvailablePortOnAllLocalInterfaces(String portRange)
|
||||
public static TServerSocket createTServerSocket(String portRange)
|
||||
throws IOException {
|
||||
|
||||
TServerSocket tSocket = null;
|
||||
// ':' is the default value which means no constraints on the portRange
|
||||
if (portRange == null || portRange.equals(":")) {
|
||||
int port;
|
||||
try (ServerSocket socket = new ServerSocket(0);) {
|
||||
port = socket.getLocalPort();
|
||||
socket.close();
|
||||
try {
|
||||
tSocket = new TServerSocket(0);
|
||||
return tSocket;
|
||||
} catch (TTransportException e) {
|
||||
throw new IOException("Fail to create TServerSocket", e);
|
||||
}
|
||||
return port;
|
||||
}
|
||||
// valid user registered port https://en.wikipedia.org/wiki/Registered_port
|
||||
int start = 1024;
|
||||
int end = 49151;
|
||||
int end = 65535;
|
||||
String[] ports = portRange.split(":", -1);
|
||||
if (!ports[0].isEmpty()) {
|
||||
start = Integer.parseInt(ports[0]);
|
||||
|
|
@ -82,8 +90,8 @@ public class RemoteInterpreterUtils {
|
|||
}
|
||||
for (int i = start; i <= end; ++i) {
|
||||
try {
|
||||
ServerSocket socket = new ServerSocket(i);
|
||||
return socket.getLocalPort();
|
||||
tSocket = new TServerSocket(i);
|
||||
return tSocket;
|
||||
} catch (Exception e) {
|
||||
// ignore this
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ public class RemoteInterpreterServerTest {
|
|||
@Test
|
||||
public void testStartStop() throws InterruptedException, IOException, TException {
|
||||
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
|
||||
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true);
|
||||
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
|
||||
assertEquals(false, server.isRunning());
|
||||
|
||||
server.start();
|
||||
|
|
@ -91,7 +91,7 @@ public class RemoteInterpreterServerTest {
|
|||
@Test
|
||||
public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
|
||||
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
|
||||
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true);
|
||||
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
|
||||
assertEquals(false, server.isRunning());
|
||||
|
||||
server.start();
|
||||
|
|
|
|||
|
|
@ -26,17 +26,17 @@ import static org.junit.Assert.assertTrue;
|
|||
public class RemoteInterpreterUtilsTest {
|
||||
|
||||
@Test
|
||||
public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
|
||||
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
|
||||
public void testCreateTServerSocket() throws IOException {
|
||||
assertTrue(RemoteInterpreterUtils.createTServerSocket(":").getServerSocket().getLocalPort() > 0);
|
||||
|
||||
String portRange = ":30000";
|
||||
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) <= 30000);
|
||||
assertTrue(RemoteInterpreterUtils.createTServerSocket(portRange).getServerSocket().getLocalPort() <= 30000);
|
||||
|
||||
portRange = "30000:";
|
||||
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) >= 30000);
|
||||
assertTrue(RemoteInterpreterUtils.createTServerSocket(portRange).getServerSocket().getLocalPort() >= 30000);
|
||||
|
||||
portRange = "30000:40000";
|
||||
int port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
|
||||
int port = RemoteInterpreterUtils.createTServerSocket(portRange).getServerSocket().getLocalPort();
|
||||
assertTrue(port >= 30000 && port <= 40000);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ public class ShellScriptLauncher extends InterpreterLauncher {
|
|||
+ context.getInterpreterGroupId();
|
||||
return new RemoteInterpreterManagedProcess(
|
||||
runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
|
||||
zConf.getCallbackPortRange(),
|
||||
zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(),
|
||||
zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
|
||||
buildEnvFromProperties(), connectTimeout, groupName);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,8 +31,6 @@ import org.apache.thrift.server.TServer;
|
|||
import org.apache.thrift.server.TThreadPoolServer;
|
||||
import org.apache.thrift.transport.TServerSocket;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
|
|
@ -54,11 +52,12 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
RemoteInterpreterManagedProcess.class);
|
||||
|
||||
private final String interpreterRunner;
|
||||
private final String portRange;
|
||||
private final String callbackPortRange;
|
||||
private final String interpreterPortRange;
|
||||
private DefaultExecutor executor;
|
||||
private ExecuteWatchdog watchdog;
|
||||
private AtomicBoolean running = new AtomicBoolean(false);
|
||||
TServer callbackServer;
|
||||
private TServer callbackServer;
|
||||
private String host = null;
|
||||
private int port = -1;
|
||||
private final String interpreterDir;
|
||||
|
|
@ -69,7 +68,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
|
||||
public RemoteInterpreterManagedProcess(
|
||||
String intpRunner,
|
||||
String portRange,
|
||||
String callbackPortRange,
|
||||
String interpreterPortRange,
|
||||
String intpDir,
|
||||
String localRepoDir,
|
||||
Map<String, String> env,
|
||||
|
|
@ -77,7 +77,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
String interpreterGroupName) {
|
||||
super(connectTimeout);
|
||||
this.interpreterRunner = intpRunner;
|
||||
this.portRange = portRange;
|
||||
this.callbackPortRange = callbackPortRange;
|
||||
this.interpreterPortRange = interpreterPortRange;
|
||||
this.env = env;
|
||||
this.interpreterDir = intpDir;
|
||||
this.localRepoDir = localRepoDir;
|
||||
|
|
@ -86,7 +87,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
|
||||
@Override
|
||||
public String getHost() {
|
||||
return "localhost";
|
||||
return host;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -99,11 +100,11 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
// start server process
|
||||
final String callbackHost;
|
||||
final int callbackPort;
|
||||
TServerSocket tSocket = null;
|
||||
try {
|
||||
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
|
||||
logger.info("Choose port {} for RemoteInterpreterProcess", port);
|
||||
tSocket = RemoteInterpreterUtils.createTServerSocket(callbackPortRange);
|
||||
callbackPort = tSocket.getServerSocket().getLocalPort();
|
||||
callbackHost = RemoteInterpreterUtils.findAvailableHostAddress();
|
||||
callbackPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
} catch (IOException e1) {
|
||||
throw new RuntimeException(e1);
|
||||
}
|
||||
|
|
@ -111,12 +112,12 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
logger.info("Thrift server for callback will start. Port: {}", callbackPort);
|
||||
try {
|
||||
callbackServer = new TThreadPoolServer(
|
||||
new TThreadPoolServer.Args(new TServerSocket(callbackPort)).processor(
|
||||
new TThreadPoolServer.Args(tSocket).processor(
|
||||
new RemoteInterpreterCallbackService.Processor<>(
|
||||
new RemoteInterpreterCallbackService.Iface() {
|
||||
@Override
|
||||
public void callback(CallbackInfo callbackInfo) throws TException {
|
||||
logger.info("Registered: {}", callbackInfo);
|
||||
logger.info("RemoteInterpreterServer Registered: {}", callbackInfo);
|
||||
host = callbackInfo.getHost();
|
||||
port = callbackInfo.getPort();
|
||||
running.set(true);
|
||||
|
|
@ -147,8 +148,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
Thread.sleep(500);
|
||||
}
|
||||
logger.debug("callbackServer is serving now");
|
||||
} catch (TTransportException e) {
|
||||
logger.error("callback server error.", e);
|
||||
} catch (InterruptedException e) {
|
||||
logger.warn("", e);
|
||||
}
|
||||
|
|
@ -160,6 +159,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
cmdLine.addArgument(callbackHost, false);
|
||||
cmdLine.addArgument("-p", false);
|
||||
cmdLine.addArgument(Integer.toString(callbackPort), false);
|
||||
cmdLine.addArgument("-r", false);
|
||||
cmdLine.addArgument(interpreterPortRange, false);
|
||||
if (isUserImpersonate && !userName.equals("anonymous")) {
|
||||
cmdLine.addArgument("-u", false);
|
||||
cmdLine.addArgument(userName, false);
|
||||
|
|
|
|||
Loading…
Reference in a new issue