mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-4081. when the python process is killed,the task state is still running
This commit is contained in:
parent
4219d55234
commit
907faacf60
10 changed files with 141 additions and 21 deletions
1
LICENSE
1
LICENSE
|
|
@ -260,6 +260,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
|
|||
(Apache 2.0) Nimbus JOSE+JWT (https://bitbucket.org/connect2id/nimbus-jose-jwt/wiki/Home)
|
||||
(Apache 2.0) jarchivelib (https://github.com/thrau/jarchivelib)
|
||||
(Apache 2.0) Google Cloud Client Library for Java (https://github.com/GoogleCloudPlatform/google-cloud-java)
|
||||
(Apache 2.0) concurrentunit (https://github.com/jhalterman/concurrentunit)
|
||||
|
||||
========================================================================
|
||||
BSD 3-Clause licenses
|
||||
|
|
|
|||
|
|
@ -87,6 +87,14 @@
|
|||
<artifactId>mockito-all</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>net.jodah</groupId>
|
||||
<artifactId>concurrentunit</artifactId>
|
||||
<version>0.4.4</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
|||
|
|
@ -53,6 +53,7 @@ public class IPythonClient {
|
|||
private final ManagedChannel channel;
|
||||
private final IPythonGrpc.IPythonBlockingStub blockingStub;
|
||||
private final IPythonGrpc.IPythonStub asyncStub;
|
||||
private volatile boolean maybeIPythonFailed = false;
|
||||
|
||||
private SecureRandom random = new SecureRandom();
|
||||
|
||||
|
|
@ -83,6 +84,7 @@ public class IPythonClient {
|
|||
final ExecuteResponse.Builder finalResponseBuilder = ExecuteResponse.newBuilder()
|
||||
.setStatus(ExecuteStatus.SUCCESS);
|
||||
final AtomicBoolean completedFlag = new AtomicBoolean(false);
|
||||
maybeIPythonFailed = false;
|
||||
LOGGER.debug("stream_execute code:\n" + request.getCode());
|
||||
asyncStub.execute(request, new StreamObserver<ExecuteResponse>() {
|
||||
int index = 0;
|
||||
|
|
@ -137,7 +139,7 @@ public class IPythonClient {
|
|||
}
|
||||
LOGGER.error("Fail to call IPython grpc", throwable);
|
||||
finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
|
||||
|
||||
maybeIPythonFailed = true;
|
||||
completedFlag.set(true);
|
||||
synchronized (completedFlag) {
|
||||
completedFlag.notify();
|
||||
|
|
@ -204,6 +206,9 @@ public class IPythonClient {
|
|||
asyncStub.stop(request, null);
|
||||
}
|
||||
|
||||
public boolean isMaybeIPythonFailed() {
|
||||
return maybeIPythonFailed;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
IPythonClient client = new IPythonClient("localhost", 50053);
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
private boolean useBuiltinPy4j = true;
|
||||
private boolean usePy4JAuth = true;
|
||||
private String secret;
|
||||
private volatile boolean pythonProcessFailed = false;
|
||||
private volatile boolean pythonProcessRunning = false;
|
||||
|
||||
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
|
||||
|
||||
|
|
@ -294,7 +294,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
|
||||
// wait until IPython kernel is started or timeout
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (!pythonProcessFailed) {
|
||||
while (!pythonProcessRunning) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
|
|
@ -305,6 +305,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
StatusResponse response = ipythonClient.status(StatusRequest.newBuilder().build());
|
||||
if (response.getStatus() == IPythonStatus.RUNNING) {
|
||||
LOGGER.info("IPython Kernel is Running");
|
||||
pythonProcessRunning = true;
|
||||
break;
|
||||
} else {
|
||||
LOGGER.info("Wait for IPython Kernel to be started");
|
||||
|
|
@ -319,7 +320,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
+ " seconds");
|
||||
}
|
||||
}
|
||||
if (pythonProcessFailed) {
|
||||
if (!pythonProcessRunning) {
|
||||
throw new IOException("Fail to launch IPython Kernel as the python process is failed");
|
||||
}
|
||||
}
|
||||
|
|
@ -355,23 +356,44 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
}
|
||||
}
|
||||
|
||||
public ExecuteWatchdog getWatchDog() {
|
||||
return watchDog;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
public InterpreterResult interpret(String st,
|
||||
InterpreterContext context) throws InterpreterException {
|
||||
zeppelinContext.setGui(context.getGui());
|
||||
zeppelinContext.setNoteGui(context.getNoteGui());
|
||||
zeppelinContext.setInterpreterContext(context);
|
||||
interpreterOutput.setInterpreterOutput(context.out);
|
||||
ExecuteResponse response =
|
||||
ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
|
||||
interpreterOutput);
|
||||
try {
|
||||
ExecuteResponse response =
|
||||
ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
|
||||
interpreterOutput);
|
||||
interpreterOutput.getInterpreterOutput().flush();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Fail to write output", e);
|
||||
// It is not known which method is called first (ipythonClient.stream_execute
|
||||
// or onProcessFailed) when ipython kernel process is exited. Because they are in
|
||||
// 2 different threads. So here we would check ipythonClient's status and sleep 1 second
|
||||
// if ipython kernel is maybe terminated.
|
||||
if (pythonProcessRunning && !ipythonClient.isMaybeIPythonFailed()) {
|
||||
return new InterpreterResult(
|
||||
InterpreterResult.Code.valueOf(response.getStatus().name()));
|
||||
} else {
|
||||
if (ipythonClient.isMaybeIPythonFailed()) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
if (pythonProcessRunning) {
|
||||
return new InterpreterResult(
|
||||
InterpreterResult.Code.valueOf(response.getStatus().name()));
|
||||
} else {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
"IPython kernel is abnormally exited, please check your code and log.");
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new InterpreterException("Fail to interpret python code", e);
|
||||
}
|
||||
InterpreterResult result = new InterpreterResult(
|
||||
InterpreterResult.Code.valueOf(response.getStatus().name()));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -416,12 +438,13 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
@Override
|
||||
public void onProcessComplete(int exitValue) {
|
||||
LOGGER.warn("Python Process is completed with exitValue: " + exitValue);
|
||||
pythonProcessRunning = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
LOGGER.warn("Exception happens in Python Process", e);
|
||||
pythonProcessFailed = true;
|
||||
pythonProcessRunning = false;
|
||||
}
|
||||
|
||||
static class ProcessLogOutputStream extends LogOutputStream {
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.io.Files;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
|
|
@ -160,7 +161,10 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
pythonScriptRunning.set(true);
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
public DefaultExecutor getPythonExecutor() {
|
||||
return this.executor;
|
||||
}
|
||||
|
||||
private void createPythonScript() throws IOException {
|
||||
// set java.io.tmpdir to /tmp on MacOS, because docker can not share the /var folder which will
|
||||
|
|
@ -348,7 +352,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
}
|
||||
|
||||
synchronized (statementFinishedNotifier) {
|
||||
while (statementOutput == null) {
|
||||
while (statementOutput == null && pythonScriptRunning.get()) {
|
||||
try {
|
||||
statementFinishedNotifier.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
|
|
@ -374,7 +378,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
|
||||
synchronized (pythonScriptInitialized) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (!pythonScriptInitialized.get()
|
||||
while (!pythonScriptInitialized.get() && pythonScriptRunning.get()
|
||||
&& System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) {
|
||||
try {
|
||||
LOGGER.info("Wait for PythonScript initialized");
|
||||
|
|
@ -417,7 +421,12 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
if (pythonScriptRunning.get()) {
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
} else {
|
||||
return new InterpreterResult(Code.ERROR,
|
||||
"Python process is abnormally exited, please check your code and log.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -590,6 +599,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
LOGGER.info("python process terminated. exit code " + exitValue);
|
||||
pythonScriptRunning.set(false);
|
||||
pythonScriptInitialized.set(false);
|
||||
synchronized (statementFinishedNotifier) {
|
||||
statementFinishedNotifier.notify();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -597,6 +609,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
LOGGER.error("python process failed", e);
|
||||
pythonScriptRunning.set(false);
|
||||
pythonScriptInitialized.set(false);
|
||||
synchronized (statementFinishedNotifier) {
|
||||
statementFinishedNotifier.notify();
|
||||
}
|
||||
}
|
||||
|
||||
// Called by Python Process, used for debugging purpose
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import net.jodah.concurrentunit.ConcurrentTestCase;
|
||||
import org.apache.zeppelin.display.ui.CheckBox;
|
||||
import org.apache.zeppelin.display.ui.Password;
|
||||
import org.apache.zeppelin.display.ui.Select;
|
||||
|
|
@ -41,7 +42,7 @@ import static junit.framework.TestCase.assertTrue;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public abstract class BasePythonInterpreterTest {
|
||||
public abstract class BasePythonInterpreterTest extends ConcurrentTestCase {
|
||||
|
||||
protected InterpreterGroup intpGroup;
|
||||
protected Interpreter interpreter;
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import net.jodah.concurrentunit.Waiter;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
|
|
@ -30,6 +32,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static junit.framework.TestCase.assertTrue;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
|
@ -235,4 +238,30 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
|
|||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIPythonProcessKilled() throws InterruptedException, TimeoutException {
|
||||
final Waiter waiter = new Waiter();
|
||||
Thread thread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
InterpreterResult result = interpreter.interpret("import time\ntime.sleep(1000)",
|
||||
getInterpreterContext());
|
||||
waiter.assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
waiter.assertEquals(
|
||||
"IPython kernel is abnormally exited, please check your code and log.",
|
||||
result.message().get(0).getData());
|
||||
} catch (InterpreterException e) {
|
||||
waiter.fail("Should not throw exception\n" + ExceptionUtils.getStackTrace(e));
|
||||
}
|
||||
waiter.resume();
|
||||
}
|
||||
};
|
||||
thread.start();
|
||||
Thread.sleep(3000);
|
||||
IPythonInterpreter iPythonInterpreter = (IPythonInterpreter)
|
||||
((LazyOpenInterpreter) interpreter).getInnerInterpreter();
|
||||
iPythonInterpreter.getWatchDog().destroyProcess();
|
||||
waiter.await(3000);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import net.jodah.concurrentunit.Waiter;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
|
|
@ -28,6 +30,7 @@ import org.junit.Test;
|
|||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
|
@ -38,7 +41,7 @@ import static org.junit.Assert.assertTrue;
|
|||
|
||||
|
||||
public class PythonInterpreterTest extends BasePythonInterpreterTest {
|
||||
|
||||
|
||||
@Override
|
||||
public void setUp() throws InterpreterException {
|
||||
|
||||
|
|
@ -50,6 +53,7 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
|
|||
properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
|
||||
|
||||
interpreter = new LazyOpenInterpreter(new PythonInterpreter(properties));
|
||||
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
intpGroup.get("note").add(interpreter);
|
||||
interpreter.setInterpreterGroup(intpGroup);
|
||||
|
|
@ -105,4 +109,31 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
|
|||
t.join(2000);
|
||||
assertFalse(t.isAlive());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPythonProcessKilled() throws InterruptedException, TimeoutException {
|
||||
final Waiter waiter = new Waiter();
|
||||
Thread thread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
InterpreterResult result = interpreter.interpret("import time\ntime.sleep(1000)",
|
||||
getInterpreterContext());
|
||||
waiter.assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
waiter.assertEquals(
|
||||
"Python process is abnormally exited, please check your code and log.",
|
||||
result.message().get(0).getData());
|
||||
} catch (InterpreterException e) {
|
||||
waiter.fail("Should not throw exception\n" + ExceptionUtils.getStackTrace(e));
|
||||
}
|
||||
waiter.resume();
|
||||
}
|
||||
};
|
||||
thread.start();
|
||||
Thread.sleep(3000);
|
||||
PythonInterpreter pythonInterpreter = (PythonInterpreter)
|
||||
((LazyOpenInterpreter) interpreter).getInnerInterpreter();
|
||||
pythonInterpreter.getPythonExecutor().getWatchdog().destroyProcess();
|
||||
waiter.await(3000);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -379,6 +379,12 @@
|
|||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>net.jodah</groupId>
|
||||
<artifactId>concurrentunit</artifactId>
|
||||
<version>0.4.4</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
|||
|
|
@ -83,7 +83,8 @@ public class IPySparkInterpreter extends IPythonInterpreter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
public InterpreterResult interpret(String st,
|
||||
InterpreterContext context) throws InterpreterException {
|
||||
InterpreterContext.set(context);
|
||||
String jobGroupId = Utils.buildJobGroupId(context);
|
||||
String jobDesc = Utils.buildJobDesc(context);
|
||||
|
|
|
|||
Loading…
Reference in a new issue