ZEPPELIN-3040. Allow to specify portRange for interpreter process thrift service

This commit is contained in:
Jeff Zhang 2017-11-08 12:08:55 +08:00
parent d925e052ad
commit 7e885bd388
9 changed files with 73 additions and 50 deletions

View file

@ -20,10 +20,10 @@ bin=$(dirname "${BASH_SOURCE-$0}")
bin=$(cd "${bin}">/dev/null; pwd)
function usage() {
echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>"
echo "usage) $0 -p <port> -r <intp_port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>"
}
while getopts "hc:p:d:l:v:u:g:" o; do
while getopts "hc:p:r:d:l:v:u:g:" o; do
case ${o} in
h)
usage
@ -36,7 +36,10 @@ while getopts "hc:p:d:l:v:u:g:" o; do
CALLBACK_HOST=${OPTARG} # This will be used callback host
;;
p)
PORT=${OPTARG} # This will be used callback port
PORT=${OPTARG} # This will be used for callback port
;;
r)
INTP_PORT=${OPTARG} # This will be used for interpreter process port
;;
l)
LOCAL_INTERPRETER_REPO=${OPTARG}
@ -206,12 +209,12 @@ fi
if [[ -n "${SPARK_SUBMIT}" ]]; then
if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]]; then
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}`
else
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}`
fi
else
INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} `
INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}`
fi
if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then

View file

@ -39,7 +39,7 @@ public class ZeppelinDevServer extends
private DevInterpreter interpreter = null;
private InterpreterOutput out;
public ZeppelinDevServer(int port) throws TException, IOException {
super(null, port);
super(null, port, "");
}
@Override

View file

@ -488,6 +488,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE);
}
public String getInterpreterPortRange() {
return getString(ConfVars.ZEPPELIN_INTERPRETER_PORTRANGE);
}
public boolean isWindowsPath(String path){
return path.matches("^[A-Za-z]:\\\\.*");
}
@ -705,6 +709,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""),
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"),
ZEPPELIN_INTERPRETER_PORTRANGE("zeppelin.interpreter.portRange", ":"),
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS("zeppelin.interpreter.lifecyclemanager.class",
"org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager"),

View file

@ -132,19 +132,19 @@ public class RemoteInterpreterServer
private boolean isTest;
public RemoteInterpreterServer(String callbackHost, int port) throws IOException,
TTransportException {
this(callbackHost, port, false);
public RemoteInterpreterServer(String callbackHost, int callbackPort, String portRange)
throws IOException, TTransportException {
this(callbackHost, callbackPort, portRange, false);
}
public RemoteInterpreterServer(String callbackHost, int port, boolean isTest)
throws TTransportException, IOException {
public RemoteInterpreterServer(String callbackHost, int callbackPort, String portRange,
boolean isTest) throws TTransportException, IOException {
if (null != callbackHost) {
this.callbackHost = callbackHost;
this.callbackPort = port;
this.callbackPort = callbackPort;
} else {
// DevInterpreter
this.port = port;
this.port = callbackPort;
}
this.isTest = isTest;
@ -152,14 +152,16 @@ public class RemoteInterpreterServer
TServerSocket serverTransport;
if (null == callbackHost) {
// Dev Interpreter
serverTransport = new TServerSocket(port);
serverTransport = new TServerSocket(callbackPort);
} else {
this.port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
serverTransport = RemoteInterpreterUtils.createTServerSocket(portRange);
this.port = serverTransport.getServerSocket().getLocalPort();
this.host = RemoteInterpreterUtils.findAvailableHostAddress();
serverTransport = new TServerSocket(this.port);
logger.info("Launching ThriftServer at " + this.host + ":" + this.port);
}
server = new TThreadPoolServer(
new TThreadPoolServer.Args(serverTransport).processor(processor));
logger.info("Starting remote interpreter server on port {}", port);
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
remoteWorksController = new ZeppelinRemoteWorksController(this, remoteWorksResponsePool);
}
@ -254,12 +256,16 @@ public class RemoteInterpreterServer
throws TTransportException, InterruptedException, IOException {
String callbackHost = null;
int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
String portRange = ":";
if (args.length > 0) {
callbackHost = args[0];
port = Integer.parseInt(args[1]);
if (args.length > 2) {
portRange = args[2];
}
}
RemoteInterpreterServer remoteInterpreterServer =
new RemoteInterpreterServer(callbackHost, port);
new RemoteInterpreterServer(callbackHost, port, portRange);
remoteInterpreterServer.start();
remoteInterpreterServer.join();
System.exit(0);

View file

@ -32,8 +32,10 @@ import java.util.Collections;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -48,7 +50,12 @@ public class RemoteInterpreterUtils {
public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException {
return findRandomAvailablePortOnAllLocalInterfaces(":");
int port;
try (ServerSocket socket = new ServerSocket(0);) {
port = socket.getLocalPort();
socket.close();
}
return port;
}
/**
@ -58,21 +65,22 @@ public class RemoteInterpreterUtils {
* @return
* @throws IOException
*/
public static int findRandomAvailablePortOnAllLocalInterfaces(String portRange)
public static TServerSocket createTServerSocket(String portRange)
throws IOException {
TServerSocket tSocket = null;
// ':' is the default value which means no constraints on the portRange
if (portRange == null || portRange.equals(":")) {
int port;
try (ServerSocket socket = new ServerSocket(0);) {
port = socket.getLocalPort();
socket.close();
try {
tSocket = new TServerSocket(0);
return tSocket;
} catch (TTransportException e) {
throw new IOException("Fail to create TServerSocket", e);
}
return port;
}
// valid user registered port https://en.wikipedia.org/wiki/Registered_port
int start = 1024;
int end = 49151;
int end = 65535;
String[] ports = portRange.split(":", -1);
if (!ports[0].isEmpty()) {
start = Integer.parseInt(ports[0]);
@ -82,8 +90,8 @@ public class RemoteInterpreterUtils {
}
for (int i = start; i <= end; ++i) {
try {
ServerSocket socket = new ServerSocket(i);
return socket.getLocalPort();
tSocket = new TServerSocket(i);
return tSocket;
} catch (Exception e) {
// ignore this
}

View file

@ -43,7 +43,7 @@ public class RemoteInterpreterServerTest {
@Test
public void testStartStop() throws InterruptedException, IOException, TException {
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true);
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
assertEquals(false, server.isRunning());
server.start();
@ -91,7 +91,7 @@ public class RemoteInterpreterServerTest {
@Test
public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true);
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
assertEquals(false, server.isRunning());
server.start();

View file

@ -26,17 +26,17 @@ import static org.junit.Assert.assertTrue;
public class RemoteInterpreterUtilsTest {
@Test
public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
public void testCreateTServerSocket() throws IOException {
assertTrue(RemoteInterpreterUtils.createTServerSocket(":").getServerSocket().getLocalPort() > 0);
String portRange = ":30000";
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) <= 30000);
assertTrue(RemoteInterpreterUtils.createTServerSocket(portRange).getServerSocket().getLocalPort() <= 30000);
portRange = "30000:";
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) >= 30000);
assertTrue(RemoteInterpreterUtils.createTServerSocket(portRange).getServerSocket().getLocalPort() >= 30000);
portRange = "30000:40000";
int port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
int port = RemoteInterpreterUtils.createTServerSocket(portRange).getServerSocket().getLocalPort();
assertTrue(port >= 30000 && port <= 40000);
}

View file

@ -64,7 +64,7 @@ public class ShellScriptLauncher extends InterpreterLauncher {
+ context.getInterpreterGroupId();
return new RemoteInterpreterManagedProcess(
runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
zConf.getCallbackPortRange(),
zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(),
zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
buildEnvFromProperties(), connectTimeout, groupName);
}

View file

@ -31,8 +31,6 @@ import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
@ -54,11 +52,12 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
RemoteInterpreterManagedProcess.class);
private final String interpreterRunner;
private final String portRange;
private final String callbackPortRange;
private final String interpreterPortRange;
private DefaultExecutor executor;
private ExecuteWatchdog watchdog;
private AtomicBoolean running = new AtomicBoolean(false);
TServer callbackServer;
private TServer callbackServer;
private String host = null;
private int port = -1;
private final String interpreterDir;
@ -69,7 +68,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
public RemoteInterpreterManagedProcess(
String intpRunner,
String portRange,
String callbackPortRange,
String interpreterPortRange,
String intpDir,
String localRepoDir,
Map<String, String> env,
@ -77,7 +77,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
String interpreterGroupName) {
super(connectTimeout);
this.interpreterRunner = intpRunner;
this.portRange = portRange;
this.callbackPortRange = callbackPortRange;
this.interpreterPortRange = interpreterPortRange;
this.env = env;
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
@ -86,7 +87,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
@Override
public String getHost() {
return "localhost";
return host;
}
@Override
@ -99,11 +100,11 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
// start server process
final String callbackHost;
final int callbackPort;
TServerSocket tSocket = null;
try {
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
logger.info("Choose port {} for RemoteInterpreterProcess", port);
tSocket = RemoteInterpreterUtils.createTServerSocket(callbackPortRange);
callbackPort = tSocket.getServerSocket().getLocalPort();
callbackHost = RemoteInterpreterUtils.findAvailableHostAddress();
callbackPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
} catch (IOException e1) {
throw new RuntimeException(e1);
}
@ -111,12 +112,12 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
logger.info("Thrift server for callback will start. Port: {}", callbackPort);
try {
callbackServer = new TThreadPoolServer(
new TThreadPoolServer.Args(new TServerSocket(callbackPort)).processor(
new TThreadPoolServer.Args(tSocket).processor(
new RemoteInterpreterCallbackService.Processor<>(
new RemoteInterpreterCallbackService.Iface() {
@Override
public void callback(CallbackInfo callbackInfo) throws TException {
logger.info("Registered: {}", callbackInfo);
logger.info("RemoteInterpreterServer Registered: {}", callbackInfo);
host = callbackInfo.getHost();
port = callbackInfo.getPort();
running.set(true);
@ -147,8 +148,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
Thread.sleep(500);
}
logger.debug("callbackServer is serving now");
} catch (TTransportException e) {
logger.error("callback server error.", e);
} catch (InterruptedException e) {
logger.warn("", e);
}
@ -160,6 +159,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
cmdLine.addArgument(callbackHost, false);
cmdLine.addArgument("-p", false);
cmdLine.addArgument(Integer.toString(callbackPort), false);
cmdLine.addArgument("-r", false);
cmdLine.addArgument(interpreterPortRange, false);
if (isUserImpersonate && !userName.equals("anonymous")) {
cmdLine.addArgument("-u", false);
cmdLine.addArgument(userName, false);