mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
[ZEPPELIN-4066]. Introduce ProcessLauncher to encapsulate process launch
This commit is contained in:
parent
3655c12b87
commit
97049bd55e
19 changed files with 788 additions and 388 deletions
|
|
@ -17,14 +17,9 @@
|
|||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteResultHandler;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.LogOutputStream;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
|
@ -48,6 +43,7 @@ import org.apache.zeppelin.python.proto.IPythonStatus;
|
|||
import org.apache.zeppelin.python.proto.StatusRequest;
|
||||
import org.apache.zeppelin.python.proto.StatusResponse;
|
||||
import org.apache.zeppelin.python.proto.StopRequest;
|
||||
import org.apache.zeppelin.util.ProcessLauncher;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import py4j.GatewayServer;
|
||||
|
|
@ -66,23 +62,22 @@ import java.util.Properties;
|
|||
/**
|
||||
* IPython Interpreter for Zeppelin
|
||||
*/
|
||||
public class IPythonInterpreter extends Interpreter implements ExecuteResultHandler {
|
||||
public class IPythonInterpreter extends Interpreter {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreter.class);
|
||||
|
||||
private ExecuteWatchdog watchDog;
|
||||
private IPythonProcessLauncher iPythonProcessLauncher;
|
||||
private IPythonClient ipythonClient;
|
||||
private GatewayServer gatewayServer;
|
||||
|
||||
protected BaseZeppelinContext zeppelinContext;
|
||||
private String pythonExecutable;
|
||||
private long ipythonLaunchTimeout;
|
||||
private int ipythonLaunchTimeout;
|
||||
private String additionalPythonPath;
|
||||
private String additionalPythonInitFile;
|
||||
private boolean useBuiltinPy4j = true;
|
||||
private boolean usePy4JAuth = true;
|
||||
private String secret;
|
||||
private volatile boolean pythonProcessRunning = false;
|
||||
|
||||
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
|
||||
|
||||
|
|
@ -135,7 +130,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
throw new InterpreterException("IPython prerequisite is not meet: " +
|
||||
checkPrerequisiteResult);
|
||||
}
|
||||
ipythonLaunchTimeout = Long.parseLong(
|
||||
ipythonLaunchTimeout = Integer.parseInt(
|
||||
getProperty("zeppelin.ipython.launch.timeout", "30000"));
|
||||
this.zeppelinContext = buildZeppelinContext();
|
||||
int ipythonPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
|
|
@ -149,7 +144,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
launchIPythonKernel(ipythonPort);
|
||||
setupJVMGateway(jvmGatewayPort);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Fail to open IPythonInterpreter", e);
|
||||
throw new InterpreterException("Fail to open IPythonInterpreter", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -253,7 +248,6 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private void launchIPythonKernel(int ipythonPort)
|
||||
throws IOException {
|
||||
LOGGER.info("Launching IPython Kernel at port: " + ipythonPort);
|
||||
|
|
@ -269,11 +263,6 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
CommandLine cmd = CommandLine.parse(pythonExecutable);
|
||||
cmd.addArgument(pythonWorkDir.getAbsolutePath() + "/ipython_server.py");
|
||||
cmd.addArgument(ipythonPort + "");
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
ProcessLogOutputStream processOutput = new ProcessLogOutputStream(LOGGER);
|
||||
executor.setStreamHandler(new PumpStreamHandler(processOutput));
|
||||
watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
|
||||
executor.setWatchdog(watchDog);
|
||||
|
||||
if (useBuiltinPy4j) {
|
||||
//TODO(zjffdu) don't do hard code on py4j here
|
||||
|
|
@ -290,38 +279,17 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
}
|
||||
|
||||
Map<String, String> envs = setupIPythonEnv();
|
||||
executor.execute(cmd, envs, this);
|
||||
iPythonProcessLauncher = new IPythonProcessLauncher(cmd, envs);
|
||||
iPythonProcessLauncher.launch();
|
||||
iPythonProcessLauncher.waitForReady(ipythonLaunchTimeout);
|
||||
|
||||
// wait until IPython kernel is started or timeout
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (!pythonProcessRunning) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error("Interrupted by something", e);
|
||||
}
|
||||
|
||||
try {
|
||||
StatusResponse response = ipythonClient.status(StatusRequest.newBuilder().build());
|
||||
if (response.getStatus() == IPythonStatus.RUNNING) {
|
||||
LOGGER.info("IPython Kernel is Running");
|
||||
pythonProcessRunning = true;
|
||||
break;
|
||||
} else {
|
||||
LOGGER.info("Wait for IPython Kernel to be started");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// ignore the exception, because is may happen when grpc server has not started yet.
|
||||
LOGGER.info("Wait for IPython Kernel to be started");
|
||||
}
|
||||
|
||||
if ((System.currentTimeMillis() - startTime) > ipythonLaunchTimeout) {
|
||||
throw new IOException("Fail to launch IPython Kernel in " + ipythonLaunchTimeout / 1000
|
||||
+ " seconds");
|
||||
}
|
||||
if (iPythonProcessLauncher.isLaunchTimeout()) {
|
||||
throw new IOException("Fail to launch IPython Kernel in " + ipythonLaunchTimeout / 1000
|
||||
+ " seconds.\n" + iPythonProcessLauncher.getErrorMessage());
|
||||
}
|
||||
if (!pythonProcessRunning) {
|
||||
throw new IOException("Fail to launch IPython Kernel as the python process is failed");
|
||||
if (!iPythonProcessLauncher.isRunning()) {
|
||||
throw new IOException("Fail to launch IPython Kernel as the python process is failed.\n"
|
||||
+ iPythonProcessLauncher.getErrorMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -341,23 +309,31 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
return envs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws InterpreterException {
|
||||
if (watchDog != null) {
|
||||
LOGGER.info("Kill IPython Process");
|
||||
ipythonClient.stop(StopRequest.newBuilder().build());
|
||||
try {
|
||||
ipythonClient.shutdown();
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.warn("Fail to shutdown IPythonClient");
|
||||
}
|
||||
watchDog.destroyProcess();
|
||||
gatewayServer.shutdown();
|
||||
}
|
||||
@VisibleForTesting
|
||||
public IPythonProcessLauncher getIPythonProcessLauncher() {
|
||||
return iPythonProcessLauncher;
|
||||
}
|
||||
|
||||
public ExecuteWatchdog getWatchDog() {
|
||||
return watchDog;
|
||||
@Override
|
||||
public void close() throws InterpreterException {
|
||||
if (iPythonProcessLauncher != null) {
|
||||
LOGGER.info("Kill IPython Process");
|
||||
if (iPythonProcessLauncher.isRunning()) {
|
||||
ipythonClient.stop(StopRequest.newBuilder().build());
|
||||
try {
|
||||
ipythonClient.shutdown();
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.warn("Exception happens when shutting down ipythonClient", e);
|
||||
}
|
||||
}
|
||||
iPythonProcessLauncher.stop();
|
||||
iPythonProcessLauncher = null;
|
||||
}
|
||||
if (gatewayServer != null) {
|
||||
LOGGER.info("Shutdown Py4j GatewayServer");
|
||||
gatewayServer.shutdown();
|
||||
gatewayServer = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -376,14 +352,14 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
// or onProcessFailed) when ipython kernel process is exited. Because they are in
|
||||
// 2 different threads. So here we would check ipythonClient's status and sleep 1 second
|
||||
// if ipython kernel is maybe terminated.
|
||||
if (pythonProcessRunning && !ipythonClient.isMaybeIPythonFailed()) {
|
||||
if (iPythonProcessLauncher.isRunning() && !ipythonClient.isMaybeIPythonFailed()) {
|
||||
return new InterpreterResult(
|
||||
InterpreterResult.Code.valueOf(response.getStatus().name()));
|
||||
} else {
|
||||
if (ipythonClient.isMaybeIPythonFailed()) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
if (pythonProcessRunning) {
|
||||
if (iPythonProcessLauncher.isRunning()) {
|
||||
return new InterpreterResult(
|
||||
InterpreterResult.Code.valueOf(response.getStatus().name()));
|
||||
} else {
|
||||
|
|
@ -435,29 +411,43 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
return zeppelinContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessComplete(int exitValue) {
|
||||
LOGGER.warn("Python Process is completed with exitValue: " + exitValue);
|
||||
pythonProcessRunning = false;
|
||||
}
|
||||
class IPythonProcessLauncher extends ProcessLauncher {
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
LOGGER.warn("Exception happens in Python Process", e);
|
||||
pythonProcessRunning = false;
|
||||
}
|
||||
|
||||
static class ProcessLogOutputStream extends LogOutputStream {
|
||||
|
||||
private Logger logger;
|
||||
|
||||
ProcessLogOutputStream(Logger logger) {
|
||||
this.logger = logger;
|
||||
IPythonProcessLauncher(CommandLine commandLine,
|
||||
Map<String, String> envs) {
|
||||
super(commandLine, envs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processLine(String s, int i) {
|
||||
this.logger.debug("Process Output: " + s);
|
||||
public void waitForReady(int timeout) {
|
||||
// wait until IPython kernel is started or timeout
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (state == State.LAUNCHED) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error("Interrupted by something", e);
|
||||
}
|
||||
|
||||
try {
|
||||
StatusResponse response = ipythonClient.status(StatusRequest.newBuilder().build());
|
||||
if (response.getStatus() == IPythonStatus.RUNNING) {
|
||||
LOGGER.info("IPython Kernel is Running");
|
||||
onProcessRunning();
|
||||
break;
|
||||
} else {
|
||||
LOGGER.info("Wait for IPython Kernel to be started");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// ignore the exception, because is may happen when grpc server has not started yet.
|
||||
LOGGER.info("Wait for IPython Kernel to be started");
|
||||
}
|
||||
|
||||
if ((System.currentTimeMillis() - startTime) > timeout) {
|
||||
onTimeout();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,11 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.common.io.Files;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteResultHandler;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
|
@ -37,11 +33,11 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
|
|||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
|
||||
import org.apache.zeppelin.interpreter.InvalidHookException;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
import org.apache.zeppelin.util.ProcessLauncher;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import py4j.GatewayServer;
|
||||
|
|
@ -53,30 +49,28 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* Interpreter for Python, it is the first implementation of interpreter for Python, so with less
|
||||
* features compared to IPythonInterpreter, but requires less prerequisites than
|
||||
* IPythonInterpreter, only python installation is required.
|
||||
*/
|
||||
public class PythonInterpreter extends Interpreter implements ExecuteResultHandler {
|
||||
public class PythonInterpreter extends Interpreter {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(PythonInterpreter.class);
|
||||
private static final int MAX_TIMEOUT_SEC = 30;
|
||||
|
||||
private GatewayServer gatewayServer;
|
||||
private DefaultExecutor executor;
|
||||
private PythonProcessLauncher pythonProcessLauncher;
|
||||
private File pythonWorkDir;
|
||||
protected boolean useBuiltinPy4j = true;
|
||||
|
||||
// used to forward output from python process to InterpreterOutput
|
||||
private InterpreterOutputStream outputStream;
|
||||
private AtomicBoolean pythonScriptRunning = new AtomicBoolean(false);
|
||||
private AtomicBoolean pythonScriptInitialized = new AtomicBoolean(false);
|
||||
private long pythonPid = -1;
|
||||
private IPythonInterpreter iPythonInterpreter;
|
||||
private BaseZeppelinContext zeppelinContext;
|
||||
private String condaPythonExec; // set by PythonCondaInterpreter
|
||||
// set by PythonCondaInterpreter
|
||||
private String condaPythonExec;
|
||||
private boolean usePy4jAuth = false;
|
||||
|
||||
public PythonInterpreter(Properties property) {
|
||||
|
|
@ -146,24 +140,32 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
cmd.addArgument(serverAddress, false);
|
||||
cmd.addArgument(Integer.toString(port), false);
|
||||
|
||||
executor = new DefaultExecutor();
|
||||
outputStream = new InterpreterOutputStream(LOGGER);
|
||||
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
|
||||
executor.setStreamHandler(streamHandler);
|
||||
executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
|
||||
Map<String, String> env = setupPythonEnv();
|
||||
if (usePy4jAuth) {
|
||||
env.put("PY4J_GATEWAY_SECRET", secret);
|
||||
}
|
||||
LOGGER.info("Launching Python Process Command: " + cmd.getExecutable() +
|
||||
" " + StringUtils.join(cmd.getArguments(), " "));
|
||||
executor.execute(cmd, env, this);
|
||||
pythonScriptRunning.set(true);
|
||||
|
||||
pythonProcessLauncher = new PythonProcessLauncher(cmd, env);
|
||||
pythonProcessLauncher.launch();
|
||||
pythonProcessLauncher.waitForReady(MAX_TIMEOUT_SEC * 1000);
|
||||
|
||||
if (!pythonProcessLauncher.isRunning()) {
|
||||
if (pythonProcessLauncher.isLaunchTimeout()) {
|
||||
throw new IOException("Launch python process is time out.\n" +
|
||||
pythonProcessLauncher.getErrorMessage());
|
||||
} else {
|
||||
throw new IOException("Fail to launch python process.\n" +
|
||||
pythonProcessLauncher.getErrorMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public DefaultExecutor getPythonExecutor() {
|
||||
return this.executor;
|
||||
public PythonProcessLauncher getPythonProcessLauncher() {
|
||||
return pythonProcessLauncher;
|
||||
}
|
||||
|
||||
private void createPythonScript() throws IOException {
|
||||
|
|
@ -242,11 +244,15 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
iPythonInterpreter.close();
|
||||
return;
|
||||
}
|
||||
|
||||
pythonScriptRunning.set(false);
|
||||
pythonScriptInitialized.set(false);
|
||||
executor.getWatchdog().destroyProcess();
|
||||
gatewayServer.shutdown();
|
||||
if (pythonProcessLauncher != null) {
|
||||
if (pythonProcessLauncher.isRunning()) {
|
||||
LOGGER.info("Kill python process");
|
||||
pythonProcessLauncher.stop();
|
||||
}
|
||||
}
|
||||
if (gatewayServer != null) {
|
||||
gatewayServer.shutdown();
|
||||
}
|
||||
|
||||
// reset these 2 monitors otherwise when you restart PythonInterpreter it would fails to execute
|
||||
// python code as these 2 objects are in incorrect state.
|
||||
|
|
@ -325,10 +331,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
// called by Python Process
|
||||
public void onPythonScriptInitialized(long pid) {
|
||||
pythonPid = pid;
|
||||
synchronized (pythonScriptInitialized) {
|
||||
synchronized (pythonProcessLauncher) {
|
||||
LOGGER.debug("onPythonScriptInitialized is called");
|
||||
pythonScriptInitialized.set(true);
|
||||
pythonScriptInitialized.notifyAll();
|
||||
pythonProcessLauncher.initialized();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -352,7 +357,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
}
|
||||
|
||||
synchronized (statementFinishedNotifier) {
|
||||
while (statementOutput == null && pythonScriptRunning.get()) {
|
||||
while (statementOutput == null && pythonProcessLauncher.isRunning()) {
|
||||
try {
|
||||
statementFinishedNotifier.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
|
|
@ -369,41 +374,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
return iPythonInterpreter.interpret(st, context);
|
||||
}
|
||||
|
||||
if (!pythonScriptRunning.get()) {
|
||||
return new InterpreterResult(Code.ERROR, "python process not running "
|
||||
+ outputStream.toString());
|
||||
}
|
||||
|
||||
outputStream.setInterpreterOutput(context.out);
|
||||
|
||||
synchronized (pythonScriptInitialized) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (!pythonScriptInitialized.get() && pythonScriptRunning.get()
|
||||
&& System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) {
|
||||
try {
|
||||
LOGGER.info("Wait for PythonScript initialized");
|
||||
pythonScriptInitialized.wait(100);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
List<InterpreterResultMessage> errorMessage;
|
||||
try {
|
||||
context.out.flush();
|
||||
errorMessage = context.out.toInterpreterResultMessage();
|
||||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
if (!pythonScriptInitialized.get()) {
|
||||
// timeout. didn't get initialized message
|
||||
errorMessage.add(new InterpreterResultMessage(
|
||||
InterpreterResult.Type.TEXT, "Failed to initialize Python"));
|
||||
return new InterpreterResult(Code.ERROR, errorMessage);
|
||||
}
|
||||
|
||||
BaseZeppelinContext z = getZeppelinContext();
|
||||
z.setInterpreterContext(context);
|
||||
z.setGui(context.getGui());
|
||||
|
|
@ -421,7 +392,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
if (pythonScriptRunning.get()) {
|
||||
if (pythonProcessLauncher.isRunning()) {
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
} else {
|
||||
return new InterpreterResult(Code.ERROR,
|
||||
|
|
@ -492,7 +463,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
synchronized (statementFinishedNotifier) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (statementOutput == null
|
||||
&& pythonScriptRunning.get()) {
|
||||
&& pythonProcessLauncher.isRunning()) {
|
||||
try {
|
||||
if (System.currentTimeMillis() - startTime > MAX_TIMEOUT_SEC * 1000) {
|
||||
LOGGER.error("Python completion didn't have response for {}sec.", MAX_TIMEOUT_SEC);
|
||||
|
|
@ -594,28 +565,57 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessComplete(int exitValue) {
|
||||
LOGGER.info("python process terminated. exit code " + exitValue);
|
||||
pythonScriptRunning.set(false);
|
||||
pythonScriptInitialized.set(false);
|
||||
synchronized (statementFinishedNotifier) {
|
||||
statementFinishedNotifier.notify();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
LOGGER.error("python process failed", e);
|
||||
pythonScriptRunning.set(false);
|
||||
pythonScriptInitialized.set(false);
|
||||
synchronized (statementFinishedNotifier) {
|
||||
statementFinishedNotifier.notify();
|
||||
}
|
||||
}
|
||||
|
||||
// Called by Python Process, used for debugging purpose
|
||||
public void logPythonOutput(String message) {
|
||||
LOGGER.debug("Python Process Output: " + message);
|
||||
}
|
||||
|
||||
class PythonProcessLauncher extends ProcessLauncher {
|
||||
|
||||
PythonProcessLauncher(CommandLine commandLine, Map<String, String> envs) {
|
||||
super(commandLine, envs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void waitForReady(int timeout) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
synchronized (this) {
|
||||
while (state == State.LAUNCHED) {
|
||||
LOGGER.info("Waiting for python process initialized");
|
||||
try {
|
||||
wait(100);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if ((System.currentTimeMillis() - startTime) > timeout) {
|
||||
onTimeout();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void initialized() {
|
||||
synchronized (this) {
|
||||
this.state = State.RUNNING;
|
||||
notify();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
super.onProcessFailed(e);
|
||||
synchronized (statementFinishedNotifier) {
|
||||
statementFinishedNotifier.notify();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessComplete(int exitValue) {
|
||||
super.onProcessComplete(exitValue);
|
||||
synchronized (statementFinishedNotifier) {
|
||||
statementFinishedNotifier.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ import java.util.concurrent.TimeoutException;
|
|||
|
||||
import static junit.framework.TestCase.assertTrue;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
||||
public class IPythonInterpreterTest extends BasePythonInterpreterTest {
|
||||
|
|
@ -305,7 +306,24 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
|
|||
Thread.sleep(3000);
|
||||
IPythonInterpreter iPythonInterpreter = (IPythonInterpreter)
|
||||
((LazyOpenInterpreter) interpreter).getInnerInterpreter();
|
||||
iPythonInterpreter.getWatchDog().destroyProcess();
|
||||
iPythonInterpreter.getIPythonProcessLauncher().stop();
|
||||
waiter.await(3000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIPythonFailToLaunch() throws InterpreterException {
|
||||
tearDown();
|
||||
|
||||
Properties properties = initIntpProperties();
|
||||
properties.setProperty("zeppelin.python", "invalid_python");
|
||||
|
||||
try {
|
||||
startInterpreter(properties);
|
||||
fail("Should not be able to start IPythonInterpreter");
|
||||
} catch (InterpreterException e) {
|
||||
String exceptionMsg = ExceptionUtils.getStackTrace(e);
|
||||
assertTrue(exceptionMsg, exceptionMsg.contains("No such file or directory"));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
||||
public class PythonInterpreterTest extends BasePythonInterpreterTest {
|
||||
|
|
@ -97,7 +98,7 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
//@Test
|
||||
public void testCancelIntp() throws InterruptedException, InterpreterException {
|
||||
assertEquals(InterpreterResult.Code.SUCCESS,
|
||||
interpreter.interpret("a = 1\n", getInterpreterContext()).code());
|
||||
|
|
@ -133,7 +134,35 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
|
|||
Thread.sleep(3000);
|
||||
PythonInterpreter pythonInterpreter = (PythonInterpreter)
|
||||
((LazyOpenInterpreter) interpreter).getInnerInterpreter();
|
||||
pythonInterpreter.getPythonExecutor().getWatchdog().destroyProcess();
|
||||
pythonInterpreter.getPythonProcessLauncher().stop();
|
||||
waiter.await(3000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailtoLaunchPythonProcess() throws InterpreterException {
|
||||
tearDown();
|
||||
|
||||
intpGroup = new InterpreterGroup();
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("zeppelin.python", "invalid_python");
|
||||
properties.setProperty("zeppelin.python.useIPython", "false");
|
||||
properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
|
||||
|
||||
interpreter = new LazyOpenInterpreter(new PythonInterpreter(properties));
|
||||
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
intpGroup.get("note").add(interpreter);
|
||||
interpreter.setInterpreterGroup(intpGroup);
|
||||
|
||||
InterpreterContext.set(getInterpreterContext());
|
||||
|
||||
try {
|
||||
interpreter.interpret("1+1", getInterpreterContext());
|
||||
fail("Should fail to open PythonInterpreter");
|
||||
} catch (InterpreterException e) {
|
||||
String stacktrace = ExceptionUtils.getStackTrace(e);
|
||||
assertTrue(stacktrace, stacktrace.contains("No such file or directory"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,8 @@ package org.apache.zeppelin.spark;
|
|||
|
||||
|
||||
import com.google.common.io.Files;
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
|
|
@ -40,6 +42,7 @@ import java.util.Properties;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.reset;
|
||||
|
|
@ -47,7 +50,6 @@ import static org.mockito.Mockito.verify;
|
|||
|
||||
public class IPySparkInterpreterTest extends IPythonInterpreterTest {
|
||||
|
||||
private InterpreterGroup intpGroup;
|
||||
private RemoteInterpreterEventClient mockIntpEventClient = mock(RemoteInterpreterEventClient.class);
|
||||
|
||||
@Override
|
||||
|
|
@ -67,7 +69,6 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
|
|||
return p;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void startInterpreter(Properties properties) throws InterpreterException {
|
||||
InterpreterContext context = getInterpreterContext();
|
||||
|
|
@ -101,7 +102,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
|
|||
intpGroup = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
//@Test
|
||||
public void testIPySpark() throws InterruptedException, InterpreterException, IOException {
|
||||
testPySpark(interpreter, mockIntpEventClient);
|
||||
}
|
||||
|
|
@ -239,6 +240,22 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
|
|||
assertTrue(interpreterResultMessages.get(0).getData().contains("(0, 100)"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testIPythonFailToLaunch() throws InterpreterException {
|
||||
tearDown();
|
||||
|
||||
Properties properties = initIntpProperties();
|
||||
properties.setProperty("spark.pyspark.python", "invalid_python");
|
||||
try {
|
||||
startInterpreter(properties);
|
||||
fail("Should not be able to start IPythonInterpreter");
|
||||
} catch (InterpreterException e) {
|
||||
String exceptionMsg = ExceptionUtils.getStackTrace(e);
|
||||
TestCase.assertTrue(exceptionMsg, exceptionMsg.contains("No such file or directory"));
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isSpark2(String sparkVersion) {
|
||||
return sparkVersion.startsWith("'2.") || sparkVersion.startsWith("u'2.");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package org.apache.zeppelin.spark;
|
|||
|
||||
|
||||
import com.google.common.io.Files;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
|
|
@ -26,6 +27,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
|
|||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
|
||||
import org.apache.zeppelin.python.PythonInterpreter;
|
||||
import org.apache.zeppelin.python.PythonInterpreterTest;
|
||||
import org.junit.Test;
|
||||
|
||||
|
|
@ -33,6 +35,8 @@ import java.io.IOException;
|
|||
import java.util.LinkedList;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
|
||||
|
|
@ -94,6 +98,48 @@ public class PySparkInterpreterTest extends PythonInterpreterTest {
|
|||
IPySparkInterpreterTest.testPySpark(interpreter, mockRemoteEventClient);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test
|
||||
public void testFailtoLaunchPythonProcess() throws InterpreterException {
|
||||
tearDown();
|
||||
|
||||
intpGroup = new InterpreterGroup();
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("zeppelin.spark.useNew", "true");
|
||||
properties.setProperty("spark.app.name", "Zeppelin Test");
|
||||
properties.setProperty("spark.pyspark.python", "invalid_python");
|
||||
properties.setProperty("zeppelin.python.useIPython", "false");
|
||||
properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
|
||||
properties.setProperty("zeppelin.spark.test", "true");
|
||||
properties.setProperty("zeppelin.spark.maxResult", "3");
|
||||
|
||||
interpreter = new LazyOpenInterpreter(new PySparkInterpreter(properties));
|
||||
interpreter.setInterpreterGroup(intpGroup);
|
||||
Interpreter sparkInterpreter =
|
||||
new LazyOpenInterpreter(new SparkInterpreter(properties));
|
||||
sparkInterpreter.setInterpreterGroup(intpGroup);
|
||||
LazyOpenInterpreter iPySparkInterpreter =
|
||||
new LazyOpenInterpreter(new IPySparkInterpreter(properties));
|
||||
iPySparkInterpreter.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
intpGroup.get("note").add(interpreter);
|
||||
intpGroup.get("note").add(sparkInterpreter);
|
||||
intpGroup.get("note").add(iPySparkInterpreter);
|
||||
|
||||
|
||||
InterpreterContext.set(getInterpreterContext());
|
||||
|
||||
try {
|
||||
interpreter.interpret("1+1", getInterpreterContext());
|
||||
fail("Should fail to open PySparkInterpreter");
|
||||
} catch (InterpreterException e) {
|
||||
String stacktrace = ExceptionUtils.getStackTrace(e);
|
||||
assertTrue(stacktrace, stacktrace.contains("No such file or directory"));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InterpreterContext getInterpreterContext() {
|
||||
InterpreterContext context = super.getInterpreterContext();
|
||||
|
|
|
|||
|
|
@ -64,6 +64,8 @@
|
|||
<exclude>org.slf4j:slf4j-log4j12</exclude>
|
||||
<!-- Leave commons-logging unshaded so downstream users can configure logging. -->
|
||||
<exclude>commons-logging:commons-logging</exclude>
|
||||
<!-- Leave commons-exec unshaded so downstream users can use ProcessLauncher. -->
|
||||
<exclude>org.apache.commons:commons-exec</exclude>
|
||||
<!-- Leave log4j unshaded so downstream users can configure logging. -->
|
||||
<exclude>log4j:log4j</exclude>
|
||||
<exclude>com.esotericsoftware:kryo</exclude>
|
||||
|
|
@ -111,6 +113,8 @@
|
|||
<exclude>org/slf4j/**/*</exclude>
|
||||
<exclude>org/apache/commons/logging/*</exclude>
|
||||
<exclude>org/apache/commons/logging/**/*</exclude>
|
||||
<exclude>org/apache/commons/exec/*</exclude>
|
||||
<exclude>org/apache/commons/exec/**/*</exclude>
|
||||
<exclude>org/apache/log4j/*</exclude>
|
||||
<exclude>org/apache/log4j/**/*</exclude>
|
||||
<exclude>org/sonatype/*</exclude>
|
||||
|
|
|
|||
|
|
@ -70,12 +70,13 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
|
||||
|
||||
private String sparkVersion;
|
||||
private String sparkHome;
|
||||
private AuthenticationInfo anonymous = new AuthenticationInfo("anonymous");
|
||||
|
||||
public ZeppelinSparkClusterTest(String sparkVersion) throws Exception {
|
||||
this.sparkVersion = sparkVersion;
|
||||
LOGGER.info("Testing SparkVersion: " + sparkVersion);
|
||||
String sparkHome = DownloadUtils.downloadSpark(sparkVersion);
|
||||
this.sparkHome = DownloadUtils.downloadSpark(sparkVersion);
|
||||
if (!verifiedSparkVersions.contains(sparkVersion)) {
|
||||
verifiedSparkVersions.add(sparkVersion);
|
||||
setupSparkInterpreter(sparkHome);
|
||||
|
|
@ -801,4 +802,41 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailtoLaunchSpark() throws IOException {
|
||||
Note note = null;
|
||||
try {
|
||||
TestUtils.getInstance(Notebook.class).getInterpreterSettingManager().close();
|
||||
note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
|
||||
Paragraph p = note.addNewParagraph(anonymous);
|
||||
p.setText("%spark.conf SPARK_HOME invalid_spark_home");
|
||||
note.run(p.getId(), true);
|
||||
assertEquals(Status.FINISHED, p.getStatus());
|
||||
|
||||
Paragraph p1 = note.addNewParagraph(anonymous);
|
||||
p1.setText("%spark\nsc.version");
|
||||
note.run(p1.getId(), true);
|
||||
assertEquals(Status.ERROR, p1.getStatus());
|
||||
assertTrue("Actual error message: " + p1.getReturn().message().get(0).getData(),
|
||||
p1.getReturn().message().get(0).getData().contains("No such file or directory"));
|
||||
|
||||
// run it again, and get the same error
|
||||
note.run(p1.getId(), true);
|
||||
assertEquals(Status.ERROR, p1.getStatus());
|
||||
assertTrue("Actual error message: " + p1.getReturn().message().get(0).getData(),
|
||||
p1.getReturn().message().get(0).getData().contains("No such file or directory"));
|
||||
} finally {
|
||||
if (null != note) {
|
||||
TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous);
|
||||
}
|
||||
// reset SPARK_HOME, otherwise it will cause the following test fail
|
||||
InterpreterSetting sparkIntpSetting = TestUtils.getInstance(Notebook.class).getInterpreterSettingManager()
|
||||
.getInterpreterSettingByName("spark");
|
||||
Map<String, InterpreterProperty> sparkProperties =
|
||||
(Map<String, InterpreterProperty>) sparkIntpSetting.getProperties();
|
||||
sparkProperties.put("SPARK_HOME", new InterpreterProperty("SPARK_HOME", sparkHome));
|
||||
sparkIntpSetting.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.remote;
|
|||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
|
|
@ -631,13 +632,15 @@ public class RemoteInterpreterServer extends Thread
|
|||
int lastMessageIndex = resultMessages.size() - 1;
|
||||
if (resultMessages.get(lastMessageIndex).getType() == InterpreterResult.Type.TABLE) {
|
||||
context.getResourcePool().put(
|
||||
context.getNoteId(),
|
||||
context.getParagraphId(),
|
||||
WellKnownResourceName.ZeppelinTableResult.toString(),
|
||||
resultMessages.get(lastMessageIndex));
|
||||
context.getNoteId(),
|
||||
context.getParagraphId(),
|
||||
WellKnownResourceName.ZeppelinTableResult.toString(),
|
||||
resultMessages.get(lastMessageIndex));
|
||||
}
|
||||
}
|
||||
return new InterpreterResult(result.code(), resultMessages);
|
||||
} catch (Throwable e) {
|
||||
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(currentThreadContextClassloader);
|
||||
InterpreterContext.remove();
|
||||
|
|
|
|||
|
|
@ -0,0 +1,155 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.util;
|
||||
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteResultHandler;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.LogOutputStream;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Abstract class for launching java process.
|
||||
*/
|
||||
public abstract class ProcessLauncher implements ExecuteResultHandler {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ProcessLauncher.class);
|
||||
|
||||
public enum State {
|
||||
NEW,
|
||||
LAUNCHED,
|
||||
RUNNING,
|
||||
TERMINATED
|
||||
}
|
||||
|
||||
private CommandLine commandLine;
|
||||
private Map<String, String> envs;
|
||||
private ExecuteWatchdog watchdog;
|
||||
private ProcessLogOutputStream processOutput;
|
||||
protected String errorMessage = null;
|
||||
protected State state = State.NEW;
|
||||
private boolean launchTimeout = false;
|
||||
|
||||
public ProcessLauncher(CommandLine commandLine,
|
||||
Map<String, String> envs) {
|
||||
this.commandLine = commandLine;
|
||||
this.envs = envs;
|
||||
}
|
||||
|
||||
public void launch() {
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
this.processOutput = new ProcessLogOutputStream();
|
||||
executor.setStreamHandler(new PumpStreamHandler(processOutput));
|
||||
this.watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
|
||||
executor.setWatchdog(watchdog);
|
||||
try {
|
||||
executor.execute(commandLine, envs, this);
|
||||
state = State.LAUNCHED;
|
||||
LOGGER.info("Process is launched: {}", commandLine);
|
||||
} catch (IOException e) {
|
||||
this.processOutput.stopCatchLaunchOutput();
|
||||
LOGGER.error("Fail to launch process: " + commandLine, e);
|
||||
state = State.TERMINATED;
|
||||
errorMessage = e.getMessage();
|
||||
}
|
||||
}
|
||||
|
||||
public abstract void waitForReady(int timeout);
|
||||
|
||||
public void onTimeout() {
|
||||
LOGGER.warn("Process launch is time out.");
|
||||
launchTimeout = true;
|
||||
stop();
|
||||
}
|
||||
|
||||
public void onProcessRunning() {
|
||||
LOGGER.info("Process is running");
|
||||
state = State.RUNNING;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessComplete(int exitValue) {
|
||||
LOGGER.warn("Process is exited with exit value " + exitValue);
|
||||
state = State.TERMINATED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
LOGGER.warn("Process is failed due to " + e);
|
||||
errorMessage = ExceptionUtils.getStackTrace(e);
|
||||
state = State.TERMINATED;
|
||||
}
|
||||
|
||||
public String getErrorMessage() {
|
||||
if (!StringUtils.isBlank(processOutput.getProcessExecutionOutput())) {
|
||||
return processOutput.getProcessExecutionOutput();
|
||||
} else {
|
||||
return this.errorMessage;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isLaunchTimeout() {
|
||||
return launchTimeout;
|
||||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
return this.state == State.RUNNING;
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
if (watchdog != null) {
|
||||
watchdog.destroyProcess();
|
||||
watchdog = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void stopCatchLaunchOutput() {
|
||||
processOutput.stopCatchLaunchOutput();
|
||||
}
|
||||
|
||||
class ProcessLogOutputStream extends LogOutputStream {
|
||||
|
||||
private boolean catchLaunchOutput = true;
|
||||
private StringBuilder launchOutput = new StringBuilder();
|
||||
|
||||
public void stopCatchLaunchOutput() {
|
||||
this.catchLaunchOutput = false;
|
||||
}
|
||||
|
||||
public String getProcessExecutionOutput() {
|
||||
return launchOutput.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processLine(String s, int i) {
|
||||
LOGGER.debug("Process Output: " + s);
|
||||
if (catchLaunchOutput) {
|
||||
launchOutput.append(s + "\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -379,4 +379,9 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
started.notify();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getErrorMessage() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -161,6 +161,10 @@ public class RemoteInterpreter extends Interpreter {
|
|||
synchronized (this) {
|
||||
if (!isCreated) {
|
||||
this.interpreterProcess = getOrCreateInterpreterProcess();
|
||||
if (!interpreterProcess.isRunning()) {
|
||||
throw new IOException("Interpreter process is not running:\n" +
|
||||
interpreterProcess.getErrorMessage());
|
||||
}
|
||||
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
|
||||
@Override
|
||||
public Void call(Client client) throws Exception {
|
||||
|
|
@ -213,6 +217,10 @@ public class RemoteInterpreter extends Interpreter {
|
|||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
if (!interpreterProcess.isRunning()) {
|
||||
throw new InterpreterException("Interpreter process is not running:\n" +
|
||||
interpreterProcess.getErrorMessage());
|
||||
}
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
return interpreterProcess.callRemoteFunction(
|
||||
new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
|
||||
|
|
@ -296,6 +304,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
FormType type = interpreterProcess.callRemoteFunction(
|
||||
new RemoteInterpreterProcess.RemoteFunction<FormType>() {
|
||||
|
|
|
|||
|
|
@ -19,38 +19,28 @@ package org.apache.zeppelin.interpreter.remote;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteResultHandler;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.LogOutputStream;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.util.ProcessLauncher;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* This class manages start / stop of remote interpreter process
|
||||
*/
|
||||
public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
||||
implements ExecuteResultHandler {
|
||||
private static final Logger logger = LoggerFactory.getLogger(
|
||||
public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(
|
||||
RemoteInterpreterManagedProcess.class);
|
||||
|
||||
private final String interpreterRunner;
|
||||
private final int zeppelinServerRPCPort;
|
||||
private final String zeppelinServerRPCHost;
|
||||
private final String interpreterPortRange;
|
||||
private DefaultExecutor executor;
|
||||
private ExecuteWatchdog watchdog;
|
||||
private AtomicBoolean running = new AtomicBoolean(false);
|
||||
private InterpreterProcessLauncher interpreterProcessLauncher;
|
||||
private String host = null;
|
||||
private int port = -1;
|
||||
private final String interpreterDir;
|
||||
|
|
@ -119,49 +109,26 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
cmdLine.addArgument("-g", false);
|
||||
cmdLine.addArgument(interpreterSettingName, false);
|
||||
|
||||
executor = new DefaultExecutor();
|
||||
|
||||
ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
|
||||
ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
|
||||
processOutput.setOutputStream(cmdOut);
|
||||
|
||||
executor.setStreamHandler(new PumpStreamHandler(processOutput));
|
||||
watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
|
||||
executor.setWatchdog(watchdog);
|
||||
|
||||
try {
|
||||
Map procEnv = EnvironmentUtils.getProcEnvironment();
|
||||
procEnv.putAll(env);
|
||||
|
||||
logger.info("Run interpreter process {}", cmdLine);
|
||||
executor.execute(cmdLine, procEnv, this);
|
||||
} catch (IOException e) {
|
||||
running.set(false);
|
||||
throw new RuntimeException(e);
|
||||
Map procEnv = EnvironmentUtils.getProcEnvironment();
|
||||
procEnv.putAll(env);
|
||||
interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine, procEnv);
|
||||
interpreterProcessLauncher.launch();
|
||||
interpreterProcessLauncher.waitForReady(getConnectTimeout());
|
||||
if (interpreterProcessLauncher.isLaunchTimeout()) {
|
||||
throw new IOException(String.format("Interpreter Process creation is time out in %d seconds",
|
||||
getConnectTimeout()/1000) + "\n" + "You can increase timeout threshold via " +
|
||||
"setting zeppelin.interpreter.connect.timeout of this interpreter.\n" +
|
||||
interpreterProcessLauncher.getErrorMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
synchronized (running) {
|
||||
if (!running.get()) {
|
||||
running.wait(getConnectTimeout());
|
||||
}
|
||||
}
|
||||
if (!running.get()) {
|
||||
throw new IOException(new String(
|
||||
String.format("Interpreter Process creation is time out in %d seconds",
|
||||
getConnectTimeout()/1000) + "\n" + "You can increase timeout threshold via " +
|
||||
"setting zeppelin.interpreter.connect.timeout of this interpreter.\n" +
|
||||
cmdOut.toString()));
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("Remote interpreter is not accessible");
|
||||
if (!interpreterProcessLauncher.isRunning()) {
|
||||
throw new IOException("Fail to launch interpreter process:\n" +
|
||||
interpreterProcessLauncher.getErrorMessage());
|
||||
}
|
||||
processOutput.setOutputStream(null);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
if (isRunning()) {
|
||||
logger.info("Kill interpreter process");
|
||||
LOGGER.info("Kill interpreter process");
|
||||
try {
|
||||
callRemoteFunction(new RemoteFunction<Void>() {
|
||||
@Override
|
||||
|
|
@ -171,38 +138,20 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
logger.warn("ignore the exception when shutting down");
|
||||
LOGGER.warn("ignore the exception when shutting down", e);
|
||||
}
|
||||
watchdog.destroyProcess();
|
||||
this.interpreterProcessLauncher.stop();
|
||||
}
|
||||
|
||||
executor = null;
|
||||
watchdog = null;
|
||||
running.set(false);
|
||||
logger.info("Remote process terminated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessComplete(int exitValue) {
|
||||
logger.info("Interpreter process exited {}", exitValue);
|
||||
running.set(false);
|
||||
|
||||
interpreterProcessLauncher = null;
|
||||
LOGGER.info("Remote process terminated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processStarted(int port, String host) {
|
||||
this.port = port;
|
||||
this.host = host;
|
||||
synchronized (running) {
|
||||
running.set(true);
|
||||
running.notify();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
logger.info("Interpreter process failed {}", e);
|
||||
running.set(false);
|
||||
interpreterProcessLauncher.onProcessRunning();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
@ -235,52 +184,64 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
return running.get();
|
||||
return interpreterProcessLauncher != null && interpreterProcessLauncher.isRunning();
|
||||
}
|
||||
|
||||
private static class ProcessLogOutputStream extends LogOutputStream {
|
||||
@Override
|
||||
public String getErrorMessage() {
|
||||
return this.interpreterProcessLauncher != null ? this.interpreterProcessLauncher.getErrorMessage() : "";
|
||||
}
|
||||
|
||||
private Logger logger;
|
||||
OutputStream out;
|
||||
private class InterpreterProcessLauncher extends ProcessLauncher {
|
||||
|
||||
public ProcessLogOutputStream(Logger logger) {
|
||||
this.logger = logger;
|
||||
public InterpreterProcessLauncher(CommandLine commandLine,
|
||||
Map<String, String> envs) {
|
||||
super(commandLine, envs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processLine(String s, int i) {
|
||||
this.logger.debug(s);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte [] b) throws IOException {
|
||||
super.write(b);
|
||||
|
||||
if (out != null) {
|
||||
synchronized (this) {
|
||||
if (out != null) {
|
||||
out.write(b);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte [] b, int offset, int len) throws IOException {
|
||||
super.write(b, offset, len);
|
||||
|
||||
if (out != null) {
|
||||
synchronized (this) {
|
||||
if (out != null) {
|
||||
out.write(b, offset, len);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void setOutputStream(OutputStream out) {
|
||||
public void waitForReady(int timeout) {
|
||||
synchronized (this) {
|
||||
this.out = out;
|
||||
if (state != State.RUNNING) {
|
||||
try {
|
||||
wait(timeout);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error("Remote interpreter is not accessible", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.stopCatchLaunchOutput();
|
||||
if (state == State.LAUNCHED) {
|
||||
onTimeout();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessRunning() {
|
||||
super.onProcessRunning();
|
||||
synchronized(this) {
|
||||
notify();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessComplete(int exitValue) {
|
||||
LOGGER.warn("Process is exited with exit value " + exitValue);
|
||||
// For yarn-cluster mode, client process will exit with exit value 0
|
||||
// after submitting spark app. So don't move to TERMINATED state when exitValue is 0.
|
||||
if (exitValue != 0) {
|
||||
state = State.TERMINATED;
|
||||
synchronized (this) {
|
||||
notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
super.onProcessFailed(e);
|
||||
synchronized (this) {
|
||||
notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -142,4 +142,6 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
|
|||
* called by RemoteInterpreterEventServer to notify that RemoteInterpreter Process is started
|
||||
*/
|
||||
public abstract void processStarted(int port, String host);
|
||||
|
||||
public abstract String getErrorMessage();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -93,4 +93,9 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
|
|||
public void processStarted(int port, String host) {
|
||||
// assume process is externally managed. nothing to do
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getErrorMessage() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ import java.util.regex.Matcher;
|
|||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.apache.zeppelin.common.JsonSerializable;
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
|
|
@ -385,105 +386,109 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
|
|||
|
||||
@Override
|
||||
protected InterpreterResult jobRun() throws Throwable {
|
||||
this.runtimeInfos.clear();
|
||||
this.interpreter = getBindedInterpreter();
|
||||
if (this.interpreter == null) {
|
||||
LOGGER.error("Can not find interpreter name " + intpText);
|
||||
throw new RuntimeException("Can not find interpreter for " + intpText);
|
||||
}
|
||||
LOGGER.info("Run paragraph [paragraph_id: {}, interpreter: {}, note_id: {}, user: {}]",
|
||||
getId(), this.interpreter.getClassName(), note.getId(), subject.getUser());
|
||||
InterpreterSetting interpreterSetting = ((ManagedInterpreterGroup)
|
||||
interpreter.getInterpreterGroup()).getInterpreterSetting();
|
||||
if (interpreterSetting != null) {
|
||||
interpreterSetting.waitForReady();
|
||||
}
|
||||
if (this.user != null) {
|
||||
if (subject != null && !interpreterSetting.isUserAuthorized(subject.getUsersAndRoles())) {
|
||||
String msg = String.format("%s has no permission for %s", subject.getUser(), intpText);
|
||||
LOGGER.error(msg);
|
||||
return new InterpreterResult(Code.ERROR, msg);
|
||||
try {
|
||||
this.runtimeInfos.clear();
|
||||
this.interpreter = getBindedInterpreter();
|
||||
if (this.interpreter == null) {
|
||||
LOGGER.error("Can not find interpreter name " + intpText);
|
||||
throw new RuntimeException("Can not find interpreter for " + intpText);
|
||||
}
|
||||
LOGGER.info("Run paragraph [paragraph_id: {}, interpreter: {}, note_id: {}, user: {}]",
|
||||
getId(), this.interpreter.getClassName(), note.getId(), subject.getUser());
|
||||
InterpreterSetting interpreterSetting = ((ManagedInterpreterGroup)
|
||||
interpreter.getInterpreterGroup()).getInterpreterSetting();
|
||||
if (interpreterSetting != null) {
|
||||
interpreterSetting.waitForReady();
|
||||
}
|
||||
if (this.user != null) {
|
||||
if (subject != null && !interpreterSetting.isUserAuthorized(subject.getUsersAndRoles())) {
|
||||
String msg = String.format("%s has no permission for %s", subject.getUser(), intpText);
|
||||
LOGGER.error(msg);
|
||||
return new InterpreterResult(Code.ERROR, msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (Paragraph p : userParagraphMap.values()) {
|
||||
p.setText(getText());
|
||||
}
|
||||
for (Paragraph p : userParagraphMap.values()) {
|
||||
p.setText(getText());
|
||||
}
|
||||
|
||||
// inject form
|
||||
String script = this.scriptText;
|
||||
if (interpreter.getFormType() == FormType.NATIVE) {
|
||||
settings.clear();
|
||||
} else if (interpreter.getFormType() == FormType.SIMPLE) {
|
||||
// inputs will be built from script body
|
||||
LinkedHashMap<String, Input> inputs = Input.extractSimpleQueryForm(script, false);
|
||||
LinkedHashMap<String, Input> noteInputs = Input.extractSimpleQueryForm(script, true);
|
||||
final AngularObjectRegistry angularRegistry =
|
||||
interpreter.getInterpreterGroup().getAngularObjectRegistry();
|
||||
String scriptBody = extractVariablesFromAngularRegistry(script, inputs, angularRegistry);
|
||||
// inject form
|
||||
String script = this.scriptText;
|
||||
if (interpreter.getFormType() == FormType.NATIVE) {
|
||||
settings.clear();
|
||||
} else if (interpreter.getFormType() == FormType.SIMPLE) {
|
||||
// inputs will be built from script body
|
||||
LinkedHashMap<String, Input> inputs = Input.extractSimpleQueryForm(script, false);
|
||||
LinkedHashMap<String, Input> noteInputs = Input.extractSimpleQueryForm(script, true);
|
||||
final AngularObjectRegistry angularRegistry =
|
||||
interpreter.getInterpreterGroup().getAngularObjectRegistry();
|
||||
String scriptBody = extractVariablesFromAngularRegistry(script, inputs, angularRegistry);
|
||||
|
||||
settings.setForms(inputs);
|
||||
if (!noteInputs.isEmpty()) {
|
||||
if (!note.getNoteForms().isEmpty()) {
|
||||
Map<String, Input> currentNoteForms = note.getNoteForms();
|
||||
for (String s : noteInputs.keySet()) {
|
||||
if (!currentNoteForms.containsKey(s)) {
|
||||
currentNoteForms.put(s, noteInputs.get(s));
|
||||
settings.setForms(inputs);
|
||||
if (!noteInputs.isEmpty()) {
|
||||
if (!note.getNoteForms().isEmpty()) {
|
||||
Map<String, Input> currentNoteForms = note.getNoteForms();
|
||||
for (String s : noteInputs.keySet()) {
|
||||
if (!currentNoteForms.containsKey(s)) {
|
||||
currentNoteForms.put(s, noteInputs.get(s));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
note.setNoteForms(noteInputs);
|
||||
}
|
||||
}
|
||||
script = Input.getSimpleQuery(note.getNoteParams(), scriptBody, true);
|
||||
script = Input.getSimpleQuery(settings.getParams(), script, false);
|
||||
}
|
||||
LOGGER.debug("RUN : " + script);
|
||||
try {
|
||||
InterpreterContext context = getInterpreterContext();
|
||||
InterpreterContext.set(context);
|
||||
InterpreterResult ret = interpreter.interpret(script, context);
|
||||
|
||||
if (interpreter.getFormType() == FormType.NATIVE) {
|
||||
note.setNoteParams(context.getNoteGui().getParams());
|
||||
note.setNoteForms(context.getNoteGui().getForms());
|
||||
}
|
||||
|
||||
if (Code.KEEP_PREVIOUS_RESULT == ret.code()) {
|
||||
return getReturn();
|
||||
}
|
||||
|
||||
context.out.flush();
|
||||
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
|
||||
resultMessages.addAll(ret.message());
|
||||
InterpreterResult res = new InterpreterResult(ret.code(), resultMessages);
|
||||
Paragraph p = getUserParagraph(getUser());
|
||||
if (null != p) {
|
||||
p.setResult(res);
|
||||
p.settings.setParams(settings.getParams());
|
||||
}
|
||||
|
||||
// After the paragraph is executed,
|
||||
// need to apply the paragraph to the configuration in the
|
||||
// `interpreter-setting.json` config
|
||||
if (this.configSettingNeedUpdate) {
|
||||
this.configSettingNeedUpdate = false;
|
||||
InterpreterSettingManager intpSettingManager
|
||||
= this.note.getInterpreterSettingManager();
|
||||
if (null != intpSettingManager) {
|
||||
InterpreterGroup intpGroup = interpreter.getInterpreterGroup();
|
||||
if (null != intpGroup && intpGroup instanceof ManagedInterpreterGroup) {
|
||||
String name = ((ManagedInterpreterGroup) intpGroup).getInterpreterSetting().getName();
|
||||
Map<String, Object> config
|
||||
= intpSettingManager.getConfigSetting(name);
|
||||
applyConfigSetting(config);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
note.setNoteForms(noteInputs);
|
||||
}
|
||||
}
|
||||
script = Input.getSimpleQuery(note.getNoteParams(), scriptBody, true);
|
||||
script = Input.getSimpleQuery(settings.getParams(), script, false);
|
||||
}
|
||||
LOGGER.debug("RUN : " + script);
|
||||
try {
|
||||
InterpreterContext context = getInterpreterContext();
|
||||
InterpreterContext.set(context);
|
||||
InterpreterResult ret = interpreter.interpret(script, context);
|
||||
|
||||
if (interpreter.getFormType() == FormType.NATIVE) {
|
||||
note.setNoteParams(context.getNoteGui().getParams());
|
||||
note.setNoteForms(context.getNoteGui().getForms());
|
||||
return res;
|
||||
} finally {
|
||||
InterpreterContext.remove();
|
||||
}
|
||||
|
||||
if (Code.KEEP_PREVIOUS_RESULT == ret.code()) {
|
||||
return getReturn();
|
||||
}
|
||||
|
||||
context.out.flush();
|
||||
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
|
||||
resultMessages.addAll(ret.message());
|
||||
InterpreterResult res = new InterpreterResult(ret.code(), resultMessages);
|
||||
Paragraph p = getUserParagraph(getUser());
|
||||
if (null != p) {
|
||||
p.setResult(res);
|
||||
p.settings.setParams(settings.getParams());
|
||||
}
|
||||
|
||||
// After the paragraph is executed,
|
||||
// need to apply the paragraph to the configuration in the
|
||||
// `interpreter-setting.json` config
|
||||
if (this.configSettingNeedUpdate) {
|
||||
this.configSettingNeedUpdate = false;
|
||||
InterpreterSettingManager intpSettingManager
|
||||
= this.note.getInterpreterSettingManager();
|
||||
if (null != intpSettingManager) {
|
||||
InterpreterGroup intpGroup = interpreter.getInterpreterGroup();
|
||||
if (null != intpGroup && intpGroup instanceof ManagedInterpreterGroup) {
|
||||
String name = ((ManagedInterpreterGroup) intpGroup).getInterpreterSetting().getName();
|
||||
Map<String, Object> config
|
||||
= intpSettingManager.getConfigSetting(name);
|
||||
applyConfigSetting(config);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
} finally {
|
||||
InterpreterContext.remove();
|
||||
} catch (Exception e) {
|
||||
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,28 +17,38 @@
|
|||
|
||||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.display.Input;
|
||||
import org.apache.zeppelin.display.ui.OptionInput;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.remote.mock.GetAngularObjectSizeInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.mock.GetEnvPropertyInterpreter;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.After;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class RemoteInterpreterTest extends AbstractInterpreterTest {
|
||||
|
||||
|
|
@ -50,6 +60,11 @@ public class RemoteInterpreterTest extends AbstractInterpreterTest {
|
|||
interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSharedMode() throws InterpreterException, IOException {
|
||||
interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
|
||||
|
|
@ -389,4 +404,64 @@ public class RemoteInterpreterTest extends AbstractInterpreterTest {
|
|||
assertArrayEquals(expected.values().toArray(), gui.getForms().values().toArray());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailToLaunchInterpreterProcess_InvalidRunner() {
|
||||
try {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName(), "invalid_runner");
|
||||
final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
|
||||
final InterpreterContext context1 = createDummyInterpreterContext();
|
||||
// run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the
|
||||
// time overhead of launching the process.
|
||||
try {
|
||||
interpreter1.interpret("1", context1);
|
||||
fail("Should not be able to launch interpreter process");
|
||||
} catch (InterpreterException e) {
|
||||
assertTrue(ExceptionUtils.getStackTrace(e).contains("No such file or directory"));
|
||||
}
|
||||
} finally {
|
||||
System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailToLaunchInterpreterProcess_ErrorInRunner() {
|
||||
try {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName(),
|
||||
zeppelinHome.getAbsolutePath() + "/zeppelin-zengine/src/test/resources/bin/interpreter_invalid.sh");
|
||||
final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
|
||||
final InterpreterContext context1 = createDummyInterpreterContext();
|
||||
// run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the
|
||||
// time overhead of launching the process.
|
||||
try {
|
||||
interpreter1.interpret("1", context1);
|
||||
fail("Should not be able to launch interpreter process");
|
||||
} catch (InterpreterException e) {
|
||||
assertTrue(ExceptionUtils.getStackTrace(e).contains("invalid_command: command not found"));
|
||||
}
|
||||
} finally {
|
||||
System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailToLaunchInterpreterProcess_Timeout() {
|
||||
try {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName(),
|
||||
zeppelinHome.getAbsolutePath() + "/zeppelin-zengine/src/test/resources/bin/interpreter_timeout.sh");
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
|
||||
final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
|
||||
final InterpreterContext context1 = createDummyInterpreterContext();
|
||||
// run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the
|
||||
// time overhead of launching the process.
|
||||
try {
|
||||
interpreter1.interpret("1", context1);
|
||||
fail("Should not be able to launch interpreter process");
|
||||
} catch (InterpreterException e) {
|
||||
assertTrue(ExceptionUtils.getStackTrace(e).contains("Interpreter Process creation is time out"));
|
||||
}
|
||||
} finally {
|
||||
System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName());
|
||||
System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
19
zeppelin-zengine/src/test/resources/bin/interpreter_invalid.sh
Executable file
19
zeppelin-zengine/src/test/resources/bin/interpreter_invalid.sh
Executable file
|
|
@ -0,0 +1,19 @@
|
|||
#!/bin/bash
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
invalid_command
|
||||
19
zeppelin-zengine/src/test/resources/bin/interpreter_timeout.sh
Executable file
19
zeppelin-zengine/src/test/resources/bin/interpreter_timeout.sh
Executable file
|
|
@ -0,0 +1,19 @@
|
|||
#!/bin/bash
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
sleep 100
|
||||
Loading…
Reference in a new issue