[ZEPPELIN-4066]. Introduce ProcessLauncher to encapsulate process launch

This commit is contained in:
Jeff Zhang 2019-03-13 15:13:23 +08:00
parent 3655c12b87
commit 97049bd55e
19 changed files with 788 additions and 388 deletions

View file

@ -17,14 +17,9 @@
package org.apache.zeppelin.python;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ManagedChannelBuilder;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.LogOutputStream;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@ -48,6 +43,7 @@ import org.apache.zeppelin.python.proto.IPythonStatus;
import org.apache.zeppelin.python.proto.StatusRequest;
import org.apache.zeppelin.python.proto.StatusResponse;
import org.apache.zeppelin.python.proto.StopRequest;
import org.apache.zeppelin.util.ProcessLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;
@ -66,23 +62,22 @@ import java.util.Properties;
/**
* IPython Interpreter for Zeppelin
*/
public class IPythonInterpreter extends Interpreter implements ExecuteResultHandler {
public class IPythonInterpreter extends Interpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreter.class);
private ExecuteWatchdog watchDog;
private IPythonProcessLauncher iPythonProcessLauncher;
private IPythonClient ipythonClient;
private GatewayServer gatewayServer;
protected BaseZeppelinContext zeppelinContext;
private String pythonExecutable;
private long ipythonLaunchTimeout;
private int ipythonLaunchTimeout;
private String additionalPythonPath;
private String additionalPythonInitFile;
private boolean useBuiltinPy4j = true;
private boolean usePy4JAuth = true;
private String secret;
private volatile boolean pythonProcessRunning = false;
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
@ -135,7 +130,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
throw new InterpreterException("IPython prerequisite is not meet: " +
checkPrerequisiteResult);
}
ipythonLaunchTimeout = Long.parseLong(
ipythonLaunchTimeout = Integer.parseInt(
getProperty("zeppelin.ipython.launch.timeout", "30000"));
this.zeppelinContext = buildZeppelinContext();
int ipythonPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
@ -149,7 +144,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
launchIPythonKernel(ipythonPort);
setupJVMGateway(jvmGatewayPort);
} catch (Exception e) {
throw new RuntimeException("Fail to open IPythonInterpreter", e);
throw new InterpreterException("Fail to open IPythonInterpreter", e);
}
}
@ -253,7 +248,6 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
}
private void launchIPythonKernel(int ipythonPort)
throws IOException {
LOGGER.info("Launching IPython Kernel at port: " + ipythonPort);
@ -269,11 +263,6 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
CommandLine cmd = CommandLine.parse(pythonExecutable);
cmd.addArgument(pythonWorkDir.getAbsolutePath() + "/ipython_server.py");
cmd.addArgument(ipythonPort + "");
DefaultExecutor executor = new DefaultExecutor();
ProcessLogOutputStream processOutput = new ProcessLogOutputStream(LOGGER);
executor.setStreamHandler(new PumpStreamHandler(processOutput));
watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchDog);
if (useBuiltinPy4j) {
//TODO(zjffdu) don't do hard code on py4j here
@ -290,38 +279,17 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
Map<String, String> envs = setupIPythonEnv();
executor.execute(cmd, envs, this);
iPythonProcessLauncher = new IPythonProcessLauncher(cmd, envs);
iPythonProcessLauncher.launch();
iPythonProcessLauncher.waitForReady(ipythonLaunchTimeout);
// wait until IPython kernel is started or timeout
long startTime = System.currentTimeMillis();
while (!pythonProcessRunning) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("Interrupted by something", e);
}
try {
StatusResponse response = ipythonClient.status(StatusRequest.newBuilder().build());
if (response.getStatus() == IPythonStatus.RUNNING) {
LOGGER.info("IPython Kernel is Running");
pythonProcessRunning = true;
break;
} else {
LOGGER.info("Wait for IPython Kernel to be started");
}
} catch (Exception e) {
// ignore the exception, because is may happen when grpc server has not started yet.
LOGGER.info("Wait for IPython Kernel to be started");
}
if ((System.currentTimeMillis() - startTime) > ipythonLaunchTimeout) {
throw new IOException("Fail to launch IPython Kernel in " + ipythonLaunchTimeout / 1000
+ " seconds");
}
if (iPythonProcessLauncher.isLaunchTimeout()) {
throw new IOException("Fail to launch IPython Kernel in " + ipythonLaunchTimeout / 1000
+ " seconds.\n" + iPythonProcessLauncher.getErrorMessage());
}
if (!pythonProcessRunning) {
throw new IOException("Fail to launch IPython Kernel as the python process is failed");
if (!iPythonProcessLauncher.isRunning()) {
throw new IOException("Fail to launch IPython Kernel as the python process is failed.\n"
+ iPythonProcessLauncher.getErrorMessage());
}
}
@ -341,23 +309,31 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
return envs;
}
@Override
public void close() throws InterpreterException {
if (watchDog != null) {
LOGGER.info("Kill IPython Process");
ipythonClient.stop(StopRequest.newBuilder().build());
try {
ipythonClient.shutdown();
} catch (InterruptedException e) {
LOGGER.warn("Fail to shutdown IPythonClient");
}
watchDog.destroyProcess();
gatewayServer.shutdown();
}
@VisibleForTesting
public IPythonProcessLauncher getIPythonProcessLauncher() {
return iPythonProcessLauncher;
}
public ExecuteWatchdog getWatchDog() {
return watchDog;
@Override
public void close() throws InterpreterException {
if (iPythonProcessLauncher != null) {
LOGGER.info("Kill IPython Process");
if (iPythonProcessLauncher.isRunning()) {
ipythonClient.stop(StopRequest.newBuilder().build());
try {
ipythonClient.shutdown();
} catch (InterruptedException e) {
LOGGER.warn("Exception happens when shutting down ipythonClient", e);
}
}
iPythonProcessLauncher.stop();
iPythonProcessLauncher = null;
}
if (gatewayServer != null) {
LOGGER.info("Shutdown Py4j GatewayServer");
gatewayServer.shutdown();
gatewayServer = null;
}
}
@Override
@ -376,14 +352,14 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
// or onProcessFailed) when ipython kernel process is exited. Because they are in
// 2 different threads. So here we would check ipythonClient's status and sleep 1 second
// if ipython kernel is maybe terminated.
if (pythonProcessRunning && !ipythonClient.isMaybeIPythonFailed()) {
if (iPythonProcessLauncher.isRunning() && !ipythonClient.isMaybeIPythonFailed()) {
return new InterpreterResult(
InterpreterResult.Code.valueOf(response.getStatus().name()));
} else {
if (ipythonClient.isMaybeIPythonFailed()) {
Thread.sleep(1000);
}
if (pythonProcessRunning) {
if (iPythonProcessLauncher.isRunning()) {
return new InterpreterResult(
InterpreterResult.Code.valueOf(response.getStatus().name()));
} else {
@ -435,29 +411,43 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
return zeppelinContext;
}
@Override
public void onProcessComplete(int exitValue) {
LOGGER.warn("Python Process is completed with exitValue: " + exitValue);
pythonProcessRunning = false;
}
class IPythonProcessLauncher extends ProcessLauncher {
@Override
public void onProcessFailed(ExecuteException e) {
LOGGER.warn("Exception happens in Python Process", e);
pythonProcessRunning = false;
}
static class ProcessLogOutputStream extends LogOutputStream {
private Logger logger;
ProcessLogOutputStream(Logger logger) {
this.logger = logger;
IPythonProcessLauncher(CommandLine commandLine,
Map<String, String> envs) {
super(commandLine, envs);
}
@Override
protected void processLine(String s, int i) {
this.logger.debug("Process Output: " + s);
public void waitForReady(int timeout) {
// wait until IPython kernel is started or timeout
long startTime = System.currentTimeMillis();
while (state == State.LAUNCHED) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("Interrupted by something", e);
}
try {
StatusResponse response = ipythonClient.status(StatusRequest.newBuilder().build());
if (response.getStatus() == IPythonStatus.RUNNING) {
LOGGER.info("IPython Kernel is Running");
onProcessRunning();
break;
} else {
LOGGER.info("Wait for IPython Kernel to be started");
}
} catch (Exception e) {
// ignore the exception, because is may happen when grpc server has not started yet.
LOGGER.info("Wait for IPython Kernel to be started");
}
if ((System.currentTimeMillis() - startTime) > timeout) {
onTimeout();
break;
}
}
}
}
}

View file

@ -21,11 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import com.google.gson.Gson;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
@ -37,11 +33,11 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InvalidHookException;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.util.ProcessLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;
@ -53,30 +49,28 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Interpreter for Python, it is the first implementation of interpreter for Python, so with less
* features compared to IPythonInterpreter, but requires less prerequisites than
* IPythonInterpreter, only python installation is required.
*/
public class PythonInterpreter extends Interpreter implements ExecuteResultHandler {
public class PythonInterpreter extends Interpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(PythonInterpreter.class);
private static final int MAX_TIMEOUT_SEC = 30;
private GatewayServer gatewayServer;
private DefaultExecutor executor;
private PythonProcessLauncher pythonProcessLauncher;
private File pythonWorkDir;
protected boolean useBuiltinPy4j = true;
// used to forward output from python process to InterpreterOutput
private InterpreterOutputStream outputStream;
private AtomicBoolean pythonScriptRunning = new AtomicBoolean(false);
private AtomicBoolean pythonScriptInitialized = new AtomicBoolean(false);
private long pythonPid = -1;
private IPythonInterpreter iPythonInterpreter;
private BaseZeppelinContext zeppelinContext;
private String condaPythonExec; // set by PythonCondaInterpreter
// set by PythonCondaInterpreter
private String condaPythonExec;
private boolean usePy4jAuth = false;
public PythonInterpreter(Properties property) {
@ -146,24 +140,32 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
cmd.addArgument(serverAddress, false);
cmd.addArgument(Integer.toString(port), false);
executor = new DefaultExecutor();
outputStream = new InterpreterOutputStream(LOGGER);
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
executor.setStreamHandler(streamHandler);
executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
Map<String, String> env = setupPythonEnv();
if (usePy4jAuth) {
env.put("PY4J_GATEWAY_SECRET", secret);
}
LOGGER.info("Launching Python Process Command: " + cmd.getExecutable() +
" " + StringUtils.join(cmd.getArguments(), " "));
executor.execute(cmd, env, this);
pythonScriptRunning.set(true);
pythonProcessLauncher = new PythonProcessLauncher(cmd, env);
pythonProcessLauncher.launch();
pythonProcessLauncher.waitForReady(MAX_TIMEOUT_SEC * 1000);
if (!pythonProcessLauncher.isRunning()) {
if (pythonProcessLauncher.isLaunchTimeout()) {
throw new IOException("Launch python process is time out.\n" +
pythonProcessLauncher.getErrorMessage());
} else {
throw new IOException("Fail to launch python process.\n" +
pythonProcessLauncher.getErrorMessage());
}
}
}
@VisibleForTesting
public DefaultExecutor getPythonExecutor() {
return this.executor;
public PythonProcessLauncher getPythonProcessLauncher() {
return pythonProcessLauncher;
}
private void createPythonScript() throws IOException {
@ -242,11 +244,15 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
iPythonInterpreter.close();
return;
}
pythonScriptRunning.set(false);
pythonScriptInitialized.set(false);
executor.getWatchdog().destroyProcess();
gatewayServer.shutdown();
if (pythonProcessLauncher != null) {
if (pythonProcessLauncher.isRunning()) {
LOGGER.info("Kill python process");
pythonProcessLauncher.stop();
}
}
if (gatewayServer != null) {
gatewayServer.shutdown();
}
// reset these 2 monitors otherwise when you restart PythonInterpreter it would fails to execute
// python code as these 2 objects are in incorrect state.
@ -325,10 +331,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
// called by Python Process
public void onPythonScriptInitialized(long pid) {
pythonPid = pid;
synchronized (pythonScriptInitialized) {
synchronized (pythonProcessLauncher) {
LOGGER.debug("onPythonScriptInitialized is called");
pythonScriptInitialized.set(true);
pythonScriptInitialized.notifyAll();
pythonProcessLauncher.initialized();
}
}
@ -352,7 +357,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
}
synchronized (statementFinishedNotifier) {
while (statementOutput == null && pythonScriptRunning.get()) {
while (statementOutput == null && pythonProcessLauncher.isRunning()) {
try {
statementFinishedNotifier.wait(1000);
} catch (InterruptedException e) {
@ -369,41 +374,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
return iPythonInterpreter.interpret(st, context);
}
if (!pythonScriptRunning.get()) {
return new InterpreterResult(Code.ERROR, "python process not running "
+ outputStream.toString());
}
outputStream.setInterpreterOutput(context.out);
synchronized (pythonScriptInitialized) {
long startTime = System.currentTimeMillis();
while (!pythonScriptInitialized.get() && pythonScriptRunning.get()
&& System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) {
try {
LOGGER.info("Wait for PythonScript initialized");
pythonScriptInitialized.wait(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
List<InterpreterResultMessage> errorMessage;
try {
context.out.flush();
errorMessage = context.out.toInterpreterResultMessage();
} catch (IOException e) {
throw new InterpreterException(e);
}
if (!pythonScriptInitialized.get()) {
// timeout. didn't get initialized message
errorMessage.add(new InterpreterResultMessage(
InterpreterResult.Type.TEXT, "Failed to initialize Python"));
return new InterpreterResult(Code.ERROR, errorMessage);
}
BaseZeppelinContext z = getZeppelinContext();
z.setInterpreterContext(context);
z.setGui(context.getGui());
@ -421,7 +392,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
} catch (IOException e) {
throw new InterpreterException(e);
}
if (pythonScriptRunning.get()) {
if (pythonProcessLauncher.isRunning()) {
return new InterpreterResult(Code.SUCCESS);
} else {
return new InterpreterResult(Code.ERROR,
@ -492,7 +463,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
synchronized (statementFinishedNotifier) {
long startTime = System.currentTimeMillis();
while (statementOutput == null
&& pythonScriptRunning.get()) {
&& pythonProcessLauncher.isRunning()) {
try {
if (System.currentTimeMillis() - startTime > MAX_TIMEOUT_SEC * 1000) {
LOGGER.error("Python completion didn't have response for {}sec.", MAX_TIMEOUT_SEC);
@ -594,28 +565,57 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
}
}
@Override
public void onProcessComplete(int exitValue) {
LOGGER.info("python process terminated. exit code " + exitValue);
pythonScriptRunning.set(false);
pythonScriptInitialized.set(false);
synchronized (statementFinishedNotifier) {
statementFinishedNotifier.notify();
}
}
@Override
public void onProcessFailed(ExecuteException e) {
LOGGER.error("python process failed", e);
pythonScriptRunning.set(false);
pythonScriptInitialized.set(false);
synchronized (statementFinishedNotifier) {
statementFinishedNotifier.notify();
}
}
// Called by Python Process, used for debugging purpose
public void logPythonOutput(String message) {
LOGGER.debug("Python Process Output: " + message);
}
class PythonProcessLauncher extends ProcessLauncher {
PythonProcessLauncher(CommandLine commandLine, Map<String, String> envs) {
super(commandLine, envs);
}
@Override
public void waitForReady(int timeout) {
long startTime = System.currentTimeMillis();
synchronized (this) {
while (state == State.LAUNCHED) {
LOGGER.info("Waiting for python process initialized");
try {
wait(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if ((System.currentTimeMillis() - startTime) > timeout) {
onTimeout();
break;
}
}
}
}
public void initialized() {
synchronized (this) {
this.state = State.RUNNING;
notify();
}
}
@Override
public void onProcessFailed(ExecuteException e) {
super.onProcessFailed(e);
synchronized (statementFinishedNotifier) {
statementFinishedNotifier.notify();
}
}
@Override
public void onProcessComplete(int exitValue) {
super.onProcessComplete(exitValue);
synchronized (statementFinishedNotifier) {
statementFinishedNotifier.notify();
}
}
}
}

View file

@ -37,6 +37,7 @@ import java.util.concurrent.TimeoutException;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class IPythonInterpreterTest extends BasePythonInterpreterTest {
@ -305,7 +306,24 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
Thread.sleep(3000);
IPythonInterpreter iPythonInterpreter = (IPythonInterpreter)
((LazyOpenInterpreter) interpreter).getInnerInterpreter();
iPythonInterpreter.getWatchDog().destroyProcess();
iPythonInterpreter.getIPythonProcessLauncher().stop();
waiter.await(3000);
}
@Test
public void testIPythonFailToLaunch() throws InterpreterException {
tearDown();
Properties properties = initIntpProperties();
properties.setProperty("zeppelin.python", "invalid_python");
try {
startInterpreter(properties);
fail("Should not be able to start IPythonInterpreter");
} catch (InterpreterException e) {
String exceptionMsg = ExceptionUtils.getStackTrace(e);
assertTrue(exceptionMsg, exceptionMsg.contains("No such file or directory"));
}
}
}

View file

@ -38,6 +38,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class PythonInterpreterTest extends BasePythonInterpreterTest {
@ -97,7 +98,7 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
}
}
@Test
//@Test
public void testCancelIntp() throws InterruptedException, InterpreterException {
assertEquals(InterpreterResult.Code.SUCCESS,
interpreter.interpret("a = 1\n", getInterpreterContext()).code());
@ -133,7 +134,35 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
Thread.sleep(3000);
PythonInterpreter pythonInterpreter = (PythonInterpreter)
((LazyOpenInterpreter) interpreter).getInnerInterpreter();
pythonInterpreter.getPythonExecutor().getWatchdog().destroyProcess();
pythonInterpreter.getPythonProcessLauncher().stop();
waiter.await(3000);
}
@Test
public void testFailtoLaunchPythonProcess() throws InterpreterException {
tearDown();
intpGroup = new InterpreterGroup();
Properties properties = new Properties();
properties.setProperty("zeppelin.python", "invalid_python");
properties.setProperty("zeppelin.python.useIPython", "false");
properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
interpreter = new LazyOpenInterpreter(new PythonInterpreter(properties));
intpGroup.put("note", new LinkedList<Interpreter>());
intpGroup.get("note").add(interpreter);
interpreter.setInterpreterGroup(intpGroup);
InterpreterContext.set(getInterpreterContext());
try {
interpreter.interpret("1+1", getInterpreterContext());
fail("Should fail to open PythonInterpreter");
} catch (InterpreterException e) {
String stacktrace = ExceptionUtils.getStackTrace(e);
assertTrue(stacktrace, stacktrace.contains("No such file or directory"));
}
}
}

View file

@ -19,6 +19,8 @@ package org.apache.zeppelin.spark;
import com.google.common.io.Files;
import junit.framework.TestCase;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@ -40,6 +42,7 @@ import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
@ -47,7 +50,6 @@ import static org.mockito.Mockito.verify;
public class IPySparkInterpreterTest extends IPythonInterpreterTest {
private InterpreterGroup intpGroup;
private RemoteInterpreterEventClient mockIntpEventClient = mock(RemoteInterpreterEventClient.class);
@Override
@ -67,7 +69,6 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
return p;
}
@Override
protected void startInterpreter(Properties properties) throws InterpreterException {
InterpreterContext context = getInterpreterContext();
@ -101,7 +102,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
intpGroup = null;
}
@Test
//@Test
public void testIPySpark() throws InterruptedException, InterpreterException, IOException {
testPySpark(interpreter, mockIntpEventClient);
}
@ -239,6 +240,22 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
assertTrue(interpreterResultMessages.get(0).getData().contains("(0, 100)"));
}
@Test
@Override
public void testIPythonFailToLaunch() throws InterpreterException {
tearDown();
Properties properties = initIntpProperties();
properties.setProperty("spark.pyspark.python", "invalid_python");
try {
startInterpreter(properties);
fail("Should not be able to start IPythonInterpreter");
} catch (InterpreterException e) {
String exceptionMsg = ExceptionUtils.getStackTrace(e);
TestCase.assertTrue(exceptionMsg, exceptionMsg.contains("No such file or directory"));
}
}
private static boolean isSpark2(String sparkVersion) {
return sparkVersion.startsWith("'2.") || sparkVersion.startsWith("u'2.");
}

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.spark;
import com.google.common.io.Files;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@ -26,6 +27,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.python.PythonInterpreter;
import org.apache.zeppelin.python.PythonInterpreterTest;
import org.junit.Test;
@ -33,6 +35,8 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.Properties;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@ -94,6 +98,48 @@ public class PySparkInterpreterTest extends PythonInterpreterTest {
IPySparkInterpreterTest.testPySpark(interpreter, mockRemoteEventClient);
}
@Override
@Test
public void testFailtoLaunchPythonProcess() throws InterpreterException {
tearDown();
intpGroup = new InterpreterGroup();
Properties properties = new Properties();
properties.setProperty("zeppelin.spark.useNew", "true");
properties.setProperty("spark.app.name", "Zeppelin Test");
properties.setProperty("spark.pyspark.python", "invalid_python");
properties.setProperty("zeppelin.python.useIPython", "false");
properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
properties.setProperty("zeppelin.spark.test", "true");
properties.setProperty("zeppelin.spark.maxResult", "3");
interpreter = new LazyOpenInterpreter(new PySparkInterpreter(properties));
interpreter.setInterpreterGroup(intpGroup);
Interpreter sparkInterpreter =
new LazyOpenInterpreter(new SparkInterpreter(properties));
sparkInterpreter.setInterpreterGroup(intpGroup);
LazyOpenInterpreter iPySparkInterpreter =
new LazyOpenInterpreter(new IPySparkInterpreter(properties));
iPySparkInterpreter.setInterpreterGroup(intpGroup);
intpGroup.put("note", new LinkedList<Interpreter>());
intpGroup.get("note").add(interpreter);
intpGroup.get("note").add(sparkInterpreter);
intpGroup.get("note").add(iPySparkInterpreter);
InterpreterContext.set(getInterpreterContext());
try {
interpreter.interpret("1+1", getInterpreterContext());
fail("Should fail to open PySparkInterpreter");
} catch (InterpreterException e) {
String stacktrace = ExceptionUtils.getStackTrace(e);
assertTrue(stacktrace, stacktrace.contains("No such file or directory"));
}
}
@Override
protected InterpreterContext getInterpreterContext() {
InterpreterContext context = super.getInterpreterContext();

View file

@ -64,6 +64,8 @@
<exclude>org.slf4j:slf4j-log4j12</exclude>
<!-- Leave commons-logging unshaded so downstream users can configure logging. -->
<exclude>commons-logging:commons-logging</exclude>
<!-- Leave commons-exec unshaded so downstream users can use ProcessLauncher. -->
<exclude>org.apache.commons:commons-exec</exclude>
<!-- Leave log4j unshaded so downstream users can configure logging. -->
<exclude>log4j:log4j</exclude>
<exclude>com.esotericsoftware:kryo</exclude>
@ -111,6 +113,8 @@
<exclude>org/slf4j/**/*</exclude>
<exclude>org/apache/commons/logging/*</exclude>
<exclude>org/apache/commons/logging/**/*</exclude>
<exclude>org/apache/commons/exec/*</exclude>
<exclude>org/apache/commons/exec/**/*</exclude>
<exclude>org/apache/log4j/*</exclude>
<exclude>org/apache/log4j/**/*</exclude>
<exclude>org/sonatype/*</exclude>

View file

@ -70,12 +70,13 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
private String sparkVersion;
private String sparkHome;
private AuthenticationInfo anonymous = new AuthenticationInfo("anonymous");
public ZeppelinSparkClusterTest(String sparkVersion) throws Exception {
this.sparkVersion = sparkVersion;
LOGGER.info("Testing SparkVersion: " + sparkVersion);
String sparkHome = DownloadUtils.downloadSpark(sparkVersion);
this.sparkHome = DownloadUtils.downloadSpark(sparkVersion);
if (!verifiedSparkVersions.contains(sparkVersion)) {
verifiedSparkVersions.add(sparkVersion);
setupSparkInterpreter(sparkHome);
@ -801,4 +802,41 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
}
}
}
@Test
public void testFailtoLaunchSpark() throws IOException {
Note note = null;
try {
TestUtils.getInstance(Notebook.class).getInterpreterSettingManager().close();
note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
Paragraph p = note.addNewParagraph(anonymous);
p.setText("%spark.conf SPARK_HOME invalid_spark_home");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
Paragraph p1 = note.addNewParagraph(anonymous);
p1.setText("%spark\nsc.version");
note.run(p1.getId(), true);
assertEquals(Status.ERROR, p1.getStatus());
assertTrue("Actual error message: " + p1.getReturn().message().get(0).getData(),
p1.getReturn().message().get(0).getData().contains("No such file or directory"));
// run it again, and get the same error
note.run(p1.getId(), true);
assertEquals(Status.ERROR, p1.getStatus());
assertTrue("Actual error message: " + p1.getReturn().message().get(0).getData(),
p1.getReturn().message().get(0).getData().contains("No such file or directory"));
} finally {
if (null != note) {
TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous);
}
// reset SPARK_HOME, otherwise it will cause the following test fail
InterpreterSetting sparkIntpSetting = TestUtils.getInstance(Notebook.class).getInterpreterSettingManager()
.getInterpreterSettingByName("spark");
Map<String, InterpreterProperty> sparkProperties =
(Map<String, InterpreterProperty>) sparkIntpSetting.getProperties();
sparkProperties.put("SPARK_HOME", new InterpreterProperty("SPARK_HOME", sparkHome));
sparkIntpSetting.close();
}
}
}

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.remote;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@ -631,13 +632,15 @@ public class RemoteInterpreterServer extends Thread
int lastMessageIndex = resultMessages.size() - 1;
if (resultMessages.get(lastMessageIndex).getType() == InterpreterResult.Type.TABLE) {
context.getResourcePool().put(
context.getNoteId(),
context.getParagraphId(),
WellKnownResourceName.ZeppelinTableResult.toString(),
resultMessages.get(lastMessageIndex));
context.getNoteId(),
context.getParagraphId(),
WellKnownResourceName.ZeppelinTableResult.toString(),
resultMessages.get(lastMessageIndex));
}
}
return new InterpreterResult(result.code(), resultMessages);
} catch (Throwable e) {
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
} finally {
Thread.currentThread().setContextClassLoader(currentThreadContextClassloader);
InterpreterContext.remove();

View file

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.util;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.LogOutputStream;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
/**
* Abstract class for launching java process.
*/
public abstract class ProcessLauncher implements ExecuteResultHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ProcessLauncher.class);
public enum State {
NEW,
LAUNCHED,
RUNNING,
TERMINATED
}
private CommandLine commandLine;
private Map<String, String> envs;
private ExecuteWatchdog watchdog;
private ProcessLogOutputStream processOutput;
protected String errorMessage = null;
protected State state = State.NEW;
private boolean launchTimeout = false;
public ProcessLauncher(CommandLine commandLine,
Map<String, String> envs) {
this.commandLine = commandLine;
this.envs = envs;
}
public void launch() {
DefaultExecutor executor = new DefaultExecutor();
this.processOutput = new ProcessLogOutputStream();
executor.setStreamHandler(new PumpStreamHandler(processOutput));
this.watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchdog);
try {
executor.execute(commandLine, envs, this);
state = State.LAUNCHED;
LOGGER.info("Process is launched: {}", commandLine);
} catch (IOException e) {
this.processOutput.stopCatchLaunchOutput();
LOGGER.error("Fail to launch process: " + commandLine, e);
state = State.TERMINATED;
errorMessage = e.getMessage();
}
}
public abstract void waitForReady(int timeout);
public void onTimeout() {
LOGGER.warn("Process launch is time out.");
launchTimeout = true;
stop();
}
public void onProcessRunning() {
LOGGER.info("Process is running");
state = State.RUNNING;
}
@Override
public void onProcessComplete(int exitValue) {
LOGGER.warn("Process is exited with exit value " + exitValue);
state = State.TERMINATED;
}
@Override
public void onProcessFailed(ExecuteException e) {
LOGGER.warn("Process is failed due to " + e);
errorMessage = ExceptionUtils.getStackTrace(e);
state = State.TERMINATED;
}
public String getErrorMessage() {
if (!StringUtils.isBlank(processOutput.getProcessExecutionOutput())) {
return processOutput.getProcessExecutionOutput();
} else {
return this.errorMessage;
}
}
public boolean isLaunchTimeout() {
return launchTimeout;
}
public boolean isRunning() {
return this.state == State.RUNNING;
}
public void stop() {
if (watchdog != null) {
watchdog.destroyProcess();
watchdog = null;
}
}
public void stopCatchLaunchOutput() {
processOutput.stopCatchLaunchOutput();
}
class ProcessLogOutputStream extends LogOutputStream {
private boolean catchLaunchOutput = true;
private StringBuilder launchOutput = new StringBuilder();
public void stopCatchLaunchOutput() {
this.catchLaunchOutput = false;
}
public String getProcessExecutionOutput() {
return launchOutput.toString();
}
@Override
protected void processLine(String s, int i) {
LOGGER.debug("Process Output: " + s);
if (catchLaunchOutput) {
launchOutput.append(s + "\n");
}
}
}
}

View file

@ -379,4 +379,9 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
started.notify();
}
}
@Override
public String getErrorMessage() {
return null;
}
}

View file

@ -161,6 +161,10 @@ public class RemoteInterpreter extends Interpreter {
synchronized (this) {
if (!isCreated) {
this.interpreterProcess = getOrCreateInterpreterProcess();
if (!interpreterProcess.isRunning()) {
throw new IOException("Interpreter process is not running:\n" +
interpreterProcess.getErrorMessage());
}
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
@ -213,6 +217,10 @@ public class RemoteInterpreter extends Interpreter {
} catch (IOException e) {
throw new InterpreterException(e);
}
if (!interpreterProcess.isRunning()) {
throw new InterpreterException("Interpreter process is not running:\n" +
interpreterProcess.getErrorMessage());
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
@ -296,6 +304,7 @@ public class RemoteInterpreter extends Interpreter {
} catch (IOException e) {
throw new InterpreterException(e);
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
FormType type = interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<FormType>() {

View file

@ -19,38 +19,28 @@ package org.apache.zeppelin.interpreter.remote;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.LogOutputStream;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.util.ProcessLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class manages start / stop of remote interpreter process
*/
public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
implements ExecuteResultHandler {
private static final Logger logger = LoggerFactory.getLogger(
public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
private static final Logger LOGGER = LoggerFactory.getLogger(
RemoteInterpreterManagedProcess.class);
private final String interpreterRunner;
private final int zeppelinServerRPCPort;
private final String zeppelinServerRPCHost;
private final String interpreterPortRange;
private DefaultExecutor executor;
private ExecuteWatchdog watchdog;
private AtomicBoolean running = new AtomicBoolean(false);
private InterpreterProcessLauncher interpreterProcessLauncher;
private String host = null;
private int port = -1;
private final String interpreterDir;
@ -119,49 +109,26 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
cmdLine.addArgument("-g", false);
cmdLine.addArgument(interpreterSettingName, false);
executor = new DefaultExecutor();
ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
processOutput.setOutputStream(cmdOut);
executor.setStreamHandler(new PumpStreamHandler(processOutput));
watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchdog);
try {
Map procEnv = EnvironmentUtils.getProcEnvironment();
procEnv.putAll(env);
logger.info("Run interpreter process {}", cmdLine);
executor.execute(cmdLine, procEnv, this);
} catch (IOException e) {
running.set(false);
throw new RuntimeException(e);
Map procEnv = EnvironmentUtils.getProcEnvironment();
procEnv.putAll(env);
interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine, procEnv);
interpreterProcessLauncher.launch();
interpreterProcessLauncher.waitForReady(getConnectTimeout());
if (interpreterProcessLauncher.isLaunchTimeout()) {
throw new IOException(String.format("Interpreter Process creation is time out in %d seconds",
getConnectTimeout()/1000) + "\n" + "You can increase timeout threshold via " +
"setting zeppelin.interpreter.connect.timeout of this interpreter.\n" +
interpreterProcessLauncher.getErrorMessage());
}
try {
synchronized (running) {
if (!running.get()) {
running.wait(getConnectTimeout());
}
}
if (!running.get()) {
throw new IOException(new String(
String.format("Interpreter Process creation is time out in %d seconds",
getConnectTimeout()/1000) + "\n" + "You can increase timeout threshold via " +
"setting zeppelin.interpreter.connect.timeout of this interpreter.\n" +
cmdOut.toString()));
}
} catch (InterruptedException e) {
logger.error("Remote interpreter is not accessible");
if (!interpreterProcessLauncher.isRunning()) {
throw new IOException("Fail to launch interpreter process:\n" +
interpreterProcessLauncher.getErrorMessage());
}
processOutput.setOutputStream(null);
}
public void stop() {
if (isRunning()) {
logger.info("Kill interpreter process");
LOGGER.info("Kill interpreter process");
try {
callRemoteFunction(new RemoteFunction<Void>() {
@Override
@ -171,38 +138,20 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
}
});
} catch (Exception e) {
logger.warn("ignore the exception when shutting down");
LOGGER.warn("ignore the exception when shutting down", e);
}
watchdog.destroyProcess();
this.interpreterProcessLauncher.stop();
}
executor = null;
watchdog = null;
running.set(false);
logger.info("Remote process terminated");
}
@Override
public void onProcessComplete(int exitValue) {
logger.info("Interpreter process exited {}", exitValue);
running.set(false);
interpreterProcessLauncher = null;
LOGGER.info("Remote process terminated");
}
@Override
public void processStarted(int port, String host) {
this.port = port;
this.host = host;
synchronized (running) {
running.set(true);
running.notify();
}
}
@Override
public void onProcessFailed(ExecuteException e) {
logger.info("Interpreter process failed {}", e);
running.set(false);
interpreterProcessLauncher.onProcessRunning();
}
@VisibleForTesting
@ -235,52 +184,64 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
}
public boolean isRunning() {
return running.get();
return interpreterProcessLauncher != null && interpreterProcessLauncher.isRunning();
}
private static class ProcessLogOutputStream extends LogOutputStream {
@Override
public String getErrorMessage() {
return this.interpreterProcessLauncher != null ? this.interpreterProcessLauncher.getErrorMessage() : "";
}
private Logger logger;
OutputStream out;
private class InterpreterProcessLauncher extends ProcessLauncher {
public ProcessLogOutputStream(Logger logger) {
this.logger = logger;
public InterpreterProcessLauncher(CommandLine commandLine,
Map<String, String> envs) {
super(commandLine, envs);
}
@Override
protected void processLine(String s, int i) {
this.logger.debug(s);
}
@Override
public void write(byte [] b) throws IOException {
super.write(b);
if (out != null) {
synchronized (this) {
if (out != null) {
out.write(b);
}
}
}
}
@Override
public void write(byte [] b, int offset, int len) throws IOException {
super.write(b, offset, len);
if (out != null) {
synchronized (this) {
if (out != null) {
out.write(b, offset, len);
}
}
}
}
public void setOutputStream(OutputStream out) {
public void waitForReady(int timeout) {
synchronized (this) {
this.out = out;
if (state != State.RUNNING) {
try {
wait(timeout);
} catch (InterruptedException e) {
LOGGER.error("Remote interpreter is not accessible", e);
}
}
}
this.stopCatchLaunchOutput();
if (state == State.LAUNCHED) {
onTimeout();
}
}
@Override
public void onProcessRunning() {
super.onProcessRunning();
synchronized(this) {
notify();
}
}
@Override
public void onProcessComplete(int exitValue) {
LOGGER.warn("Process is exited with exit value " + exitValue);
// For yarn-cluster mode, client process will exit with exit value 0
// after submitting spark app. So don't move to TERMINATED state when exitValue is 0.
if (exitValue != 0) {
state = State.TERMINATED;
synchronized (this) {
notify();
}
}
}
@Override
public void onProcessFailed(ExecuteException e) {
super.onProcessFailed(e);
synchronized (this) {
notify();
}
}
}

View file

@ -142,4 +142,6 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
* called by RemoteInterpreterEventServer to notify that RemoteInterpreter Process is started
*/
public abstract void processStarted(int port, String host);
public abstract String getErrorMessage();
}

View file

@ -93,4 +93,9 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
public void processStarted(int port, String host) {
// assume process is externally managed. nothing to do
}
@Override
public String getErrorMessage() {
return null;
}
}

View file

@ -33,6 +33,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.zeppelin.common.JsonSerializable;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
@ -385,105 +386,109 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
@Override
protected InterpreterResult jobRun() throws Throwable {
this.runtimeInfos.clear();
this.interpreter = getBindedInterpreter();
if (this.interpreter == null) {
LOGGER.error("Can not find interpreter name " + intpText);
throw new RuntimeException("Can not find interpreter for " + intpText);
}
LOGGER.info("Run paragraph [paragraph_id: {}, interpreter: {}, note_id: {}, user: {}]",
getId(), this.interpreter.getClassName(), note.getId(), subject.getUser());
InterpreterSetting interpreterSetting = ((ManagedInterpreterGroup)
interpreter.getInterpreterGroup()).getInterpreterSetting();
if (interpreterSetting != null) {
interpreterSetting.waitForReady();
}
if (this.user != null) {
if (subject != null && !interpreterSetting.isUserAuthorized(subject.getUsersAndRoles())) {
String msg = String.format("%s has no permission for %s", subject.getUser(), intpText);
LOGGER.error(msg);
return new InterpreterResult(Code.ERROR, msg);
try {
this.runtimeInfos.clear();
this.interpreter = getBindedInterpreter();
if (this.interpreter == null) {
LOGGER.error("Can not find interpreter name " + intpText);
throw new RuntimeException("Can not find interpreter for " + intpText);
}
LOGGER.info("Run paragraph [paragraph_id: {}, interpreter: {}, note_id: {}, user: {}]",
getId(), this.interpreter.getClassName(), note.getId(), subject.getUser());
InterpreterSetting interpreterSetting = ((ManagedInterpreterGroup)
interpreter.getInterpreterGroup()).getInterpreterSetting();
if (interpreterSetting != null) {
interpreterSetting.waitForReady();
}
if (this.user != null) {
if (subject != null && !interpreterSetting.isUserAuthorized(subject.getUsersAndRoles())) {
String msg = String.format("%s has no permission for %s", subject.getUser(), intpText);
LOGGER.error(msg);
return new InterpreterResult(Code.ERROR, msg);
}
}
}
for (Paragraph p : userParagraphMap.values()) {
p.setText(getText());
}
for (Paragraph p : userParagraphMap.values()) {
p.setText(getText());
}
// inject form
String script = this.scriptText;
if (interpreter.getFormType() == FormType.NATIVE) {
settings.clear();
} else if (interpreter.getFormType() == FormType.SIMPLE) {
// inputs will be built from script body
LinkedHashMap<String, Input> inputs = Input.extractSimpleQueryForm(script, false);
LinkedHashMap<String, Input> noteInputs = Input.extractSimpleQueryForm(script, true);
final AngularObjectRegistry angularRegistry =
interpreter.getInterpreterGroup().getAngularObjectRegistry();
String scriptBody = extractVariablesFromAngularRegistry(script, inputs, angularRegistry);
// inject form
String script = this.scriptText;
if (interpreter.getFormType() == FormType.NATIVE) {
settings.clear();
} else if (interpreter.getFormType() == FormType.SIMPLE) {
// inputs will be built from script body
LinkedHashMap<String, Input> inputs = Input.extractSimpleQueryForm(script, false);
LinkedHashMap<String, Input> noteInputs = Input.extractSimpleQueryForm(script, true);
final AngularObjectRegistry angularRegistry =
interpreter.getInterpreterGroup().getAngularObjectRegistry();
String scriptBody = extractVariablesFromAngularRegistry(script, inputs, angularRegistry);
settings.setForms(inputs);
if (!noteInputs.isEmpty()) {
if (!note.getNoteForms().isEmpty()) {
Map<String, Input> currentNoteForms = note.getNoteForms();
for (String s : noteInputs.keySet()) {
if (!currentNoteForms.containsKey(s)) {
currentNoteForms.put(s, noteInputs.get(s));
settings.setForms(inputs);
if (!noteInputs.isEmpty()) {
if (!note.getNoteForms().isEmpty()) {
Map<String, Input> currentNoteForms = note.getNoteForms();
for (String s : noteInputs.keySet()) {
if (!currentNoteForms.containsKey(s)) {
currentNoteForms.put(s, noteInputs.get(s));
}
}
} else {
note.setNoteForms(noteInputs);
}
}
script = Input.getSimpleQuery(note.getNoteParams(), scriptBody, true);
script = Input.getSimpleQuery(settings.getParams(), script, false);
}
LOGGER.debug("RUN : " + script);
try {
InterpreterContext context = getInterpreterContext();
InterpreterContext.set(context);
InterpreterResult ret = interpreter.interpret(script, context);
if (interpreter.getFormType() == FormType.NATIVE) {
note.setNoteParams(context.getNoteGui().getParams());
note.setNoteForms(context.getNoteGui().getForms());
}
if (Code.KEEP_PREVIOUS_RESULT == ret.code()) {
return getReturn();
}
context.out.flush();
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
resultMessages.addAll(ret.message());
InterpreterResult res = new InterpreterResult(ret.code(), resultMessages);
Paragraph p = getUserParagraph(getUser());
if (null != p) {
p.setResult(res);
p.settings.setParams(settings.getParams());
}
// After the paragraph is executed,
// need to apply the paragraph to the configuration in the
// `interpreter-setting.json` config
if (this.configSettingNeedUpdate) {
this.configSettingNeedUpdate = false;
InterpreterSettingManager intpSettingManager
= this.note.getInterpreterSettingManager();
if (null != intpSettingManager) {
InterpreterGroup intpGroup = interpreter.getInterpreterGroup();
if (null != intpGroup && intpGroup instanceof ManagedInterpreterGroup) {
String name = ((ManagedInterpreterGroup) intpGroup).getInterpreterSetting().getName();
Map<String, Object> config
= intpSettingManager.getConfigSetting(name);
applyConfigSetting(config);
}
}
} else {
note.setNoteForms(noteInputs);
}
}
script = Input.getSimpleQuery(note.getNoteParams(), scriptBody, true);
script = Input.getSimpleQuery(settings.getParams(), script, false);
}
LOGGER.debug("RUN : " + script);
try {
InterpreterContext context = getInterpreterContext();
InterpreterContext.set(context);
InterpreterResult ret = interpreter.interpret(script, context);
if (interpreter.getFormType() == FormType.NATIVE) {
note.setNoteParams(context.getNoteGui().getParams());
note.setNoteForms(context.getNoteGui().getForms());
return res;
} finally {
InterpreterContext.remove();
}
if (Code.KEEP_PREVIOUS_RESULT == ret.code()) {
return getReturn();
}
context.out.flush();
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
resultMessages.addAll(ret.message());
InterpreterResult res = new InterpreterResult(ret.code(), resultMessages);
Paragraph p = getUserParagraph(getUser());
if (null != p) {
p.setResult(res);
p.settings.setParams(settings.getParams());
}
// After the paragraph is executed,
// need to apply the paragraph to the configuration in the
// `interpreter-setting.json` config
if (this.configSettingNeedUpdate) {
this.configSettingNeedUpdate = false;
InterpreterSettingManager intpSettingManager
= this.note.getInterpreterSettingManager();
if (null != intpSettingManager) {
InterpreterGroup intpGroup = interpreter.getInterpreterGroup();
if (null != intpGroup && intpGroup instanceof ManagedInterpreterGroup) {
String name = ((ManagedInterpreterGroup) intpGroup).getInterpreterSetting().getName();
Map<String, Object> config
= intpSettingManager.getConfigSetting(name);
applyConfigSetting(config);
}
}
}
return res;
} finally {
InterpreterContext.remove();
} catch (Exception e) {
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
}
}

View file

@ -17,28 +17,38 @@
package org.apache.zeppelin.interpreter.remote;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.display.ui.OptionInput;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.remote.mock.GetAngularObjectSizeInterpreter;
import org.apache.zeppelin.interpreter.remote.mock.GetEnvPropertyInterpreter;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.After;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.*;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class RemoteInterpreterTest extends AbstractInterpreterTest {
@ -50,6 +60,11 @@ public class RemoteInterpreterTest extends AbstractInterpreterTest {
interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test");
}
@Override
public void tearDown() throws Exception {
super.tearDown();
}
@Test
public void testSharedMode() throws InterpreterException, IOException {
interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
@ -389,4 +404,64 @@ public class RemoteInterpreterTest extends AbstractInterpreterTest {
assertArrayEquals(expected.values().toArray(), gui.getForms().values().toArray());
}
@Test
public void testFailToLaunchInterpreterProcess_InvalidRunner() {
try {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName(), "invalid_runner");
final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
final InterpreterContext context1 = createDummyInterpreterContext();
// run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the
// time overhead of launching the process.
try {
interpreter1.interpret("1", context1);
fail("Should not be able to launch interpreter process");
} catch (InterpreterException e) {
assertTrue(ExceptionUtils.getStackTrace(e).contains("No such file or directory"));
}
} finally {
System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName());
}
}
@Test
public void testFailToLaunchInterpreterProcess_ErrorInRunner() {
try {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName(),
zeppelinHome.getAbsolutePath() + "/zeppelin-zengine/src/test/resources/bin/interpreter_invalid.sh");
final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
final InterpreterContext context1 = createDummyInterpreterContext();
// run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the
// time overhead of launching the process.
try {
interpreter1.interpret("1", context1);
fail("Should not be able to launch interpreter process");
} catch (InterpreterException e) {
assertTrue(ExceptionUtils.getStackTrace(e).contains("invalid_command: command not found"));
}
} finally {
System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName());
}
}
@Test
public void testFailToLaunchInterpreterProcess_Timeout() {
try {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName(),
zeppelinHome.getAbsolutePath() + "/zeppelin-zengine/src/test/resources/bin/interpreter_timeout.sh");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
final InterpreterContext context1 = createDummyInterpreterContext();
// run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the
// time overhead of launching the process.
try {
interpreter1.interpret("1", context1);
fail("Should not be able to launch interpreter process");
} catch (InterpreterException e) {
assertTrue(ExceptionUtils.getStackTrace(e).contains("Interpreter Process creation is time out"));
}
} finally {
System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName());
System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName());
}
}
}

View file

@ -0,0 +1,19 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
invalid_command

View file

@ -0,0 +1,19 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
sleep 100