ZEPPELIN-4081. when the python process is killed,the task state is still running

This commit is contained in:
Jeff Zhang 2019-03-21 11:18:02 +08:00
parent 4219d55234
commit 907faacf60
10 changed files with 141 additions and 21 deletions

View file

@ -260,6 +260,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
(Apache 2.0) Nimbus JOSE+JWT (https://bitbucket.org/connect2id/nimbus-jose-jwt/wiki/Home)
(Apache 2.0) jarchivelib (https://github.com/thrau/jarchivelib)
(Apache 2.0) Google Cloud Client Library for Java (https://github.com/GoogleCloudPlatform/google-cloud-java)
(Apache 2.0) concurrentunit (https://github.com/jhalterman/concurrentunit)
========================================================================
BSD 3-Clause licenses

View file

@ -87,6 +87,14 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.jodah</groupId>
<artifactId>concurrentunit</artifactId>
<version>0.4.4</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View file

@ -53,6 +53,7 @@ public class IPythonClient {
private final ManagedChannel channel;
private final IPythonGrpc.IPythonBlockingStub blockingStub;
private final IPythonGrpc.IPythonStub asyncStub;
private volatile boolean maybeIPythonFailed = false;
private SecureRandom random = new SecureRandom();
@ -83,6 +84,7 @@ public class IPythonClient {
final ExecuteResponse.Builder finalResponseBuilder = ExecuteResponse.newBuilder()
.setStatus(ExecuteStatus.SUCCESS);
final AtomicBoolean completedFlag = new AtomicBoolean(false);
maybeIPythonFailed = false;
LOGGER.debug("stream_execute code:\n" + request.getCode());
asyncStub.execute(request, new StreamObserver<ExecuteResponse>() {
int index = 0;
@ -137,7 +139,7 @@ public class IPythonClient {
}
LOGGER.error("Fail to call IPython grpc", throwable);
finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
maybeIPythonFailed = true;
completedFlag.set(true);
synchronized (completedFlag) {
completedFlag.notify();
@ -204,6 +206,9 @@ public class IPythonClient {
asyncStub.stop(request, null);
}
public boolean isMaybeIPythonFailed() {
return maybeIPythonFailed;
}
public static void main(String[] args) {
IPythonClient client = new IPythonClient("localhost", 50053);

View file

@ -82,7 +82,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
private boolean useBuiltinPy4j = true;
private boolean usePy4JAuth = true;
private String secret;
private volatile boolean pythonProcessFailed = false;
private volatile boolean pythonProcessRunning = false;
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
@ -294,7 +294,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
// wait until IPython kernel is started or timeout
long startTime = System.currentTimeMillis();
while (!pythonProcessFailed) {
while (!pythonProcessRunning) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
@ -305,6 +305,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
StatusResponse response = ipythonClient.status(StatusRequest.newBuilder().build());
if (response.getStatus() == IPythonStatus.RUNNING) {
LOGGER.info("IPython Kernel is Running");
pythonProcessRunning = true;
break;
} else {
LOGGER.info("Wait for IPython Kernel to be started");
@ -319,7 +320,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
+ " seconds");
}
}
if (pythonProcessFailed) {
if (!pythonProcessRunning) {
throw new IOException("Fail to launch IPython Kernel as the python process is failed");
}
}
@ -355,23 +356,44 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
}
public ExecuteWatchdog getWatchDog() {
return watchDog;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st,
InterpreterContext context) throws InterpreterException {
zeppelinContext.setGui(context.getGui());
zeppelinContext.setNoteGui(context.getNoteGui());
zeppelinContext.setInterpreterContext(context);
interpreterOutput.setInterpreterOutput(context.out);
ExecuteResponse response =
ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
interpreterOutput);
try {
ExecuteResponse response =
ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
interpreterOutput);
interpreterOutput.getInterpreterOutput().flush();
} catch (IOException e) {
throw new RuntimeException("Fail to write output", e);
// It is not known which method is called first (ipythonClient.stream_execute
// or onProcessFailed) when ipython kernel process is exited. Because they are in
// 2 different threads. So here we would check ipythonClient's status and sleep 1 second
// if ipython kernel is maybe terminated.
if (pythonProcessRunning && !ipythonClient.isMaybeIPythonFailed()) {
return new InterpreterResult(
InterpreterResult.Code.valueOf(response.getStatus().name()));
} else {
if (ipythonClient.isMaybeIPythonFailed()) {
Thread.sleep(1000);
}
if (pythonProcessRunning) {
return new InterpreterResult(
InterpreterResult.Code.valueOf(response.getStatus().name()));
} else {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"IPython kernel is abnormally exited, please check your code and log.");
}
}
} catch (Exception e) {
throw new InterpreterException("Fail to interpret python code", e);
}
InterpreterResult result = new InterpreterResult(
InterpreterResult.Code.valueOf(response.getStatus().name()));
return result;
}
@Override
@ -416,12 +438,13 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
@Override
public void onProcessComplete(int exitValue) {
LOGGER.warn("Python Process is completed with exitValue: " + exitValue);
pythonProcessRunning = false;
}
@Override
public void onProcessFailed(ExecuteException e) {
LOGGER.warn("Exception happens in Python Process", e);
pythonProcessFailed = true;
pythonProcessRunning = false;
}
static class ProcessLogOutputStream extends LogOutputStream {

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.python;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import com.google.gson.Gson;
import org.apache.commons.exec.CommandLine;
@ -160,7 +161,10 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
pythonScriptRunning.set(true);
}
@VisibleForTesting
public DefaultExecutor getPythonExecutor() {
return this.executor;
}
private void createPythonScript() throws IOException {
// set java.io.tmpdir to /tmp on MacOS, because docker can not share the /var folder which will
@ -348,7 +352,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
}
synchronized (statementFinishedNotifier) {
while (statementOutput == null) {
while (statementOutput == null && pythonScriptRunning.get()) {
try {
statementFinishedNotifier.wait(1000);
} catch (InterruptedException e) {
@ -374,7 +378,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
synchronized (pythonScriptInitialized) {
long startTime = System.currentTimeMillis();
while (!pythonScriptInitialized.get()
while (!pythonScriptInitialized.get() && pythonScriptRunning.get()
&& System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) {
try {
LOGGER.info("Wait for PythonScript initialized");
@ -417,7 +421,12 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
} catch (IOException e) {
throw new InterpreterException(e);
}
return new InterpreterResult(Code.SUCCESS);
if (pythonScriptRunning.get()) {
return new InterpreterResult(Code.SUCCESS);
} else {
return new InterpreterResult(Code.ERROR,
"Python process is abnormally exited, please check your code and log.");
}
}
}
@ -590,6 +599,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
LOGGER.info("python process terminated. exit code " + exitValue);
pythonScriptRunning.set(false);
pythonScriptInitialized.set(false);
synchronized (statementFinishedNotifier) {
statementFinishedNotifier.notify();
}
}
@Override
@ -597,6 +609,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
LOGGER.error("python process failed", e);
pythonScriptRunning.set(false);
pythonScriptInitialized.set(false);
synchronized (statementFinishedNotifier) {
statementFinishedNotifier.notify();
}
}
// Called by Python Process, used for debugging purpose

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.python;
import net.jodah.concurrentunit.ConcurrentTestCase;
import org.apache.zeppelin.display.ui.CheckBox;
import org.apache.zeppelin.display.ui.Password;
import org.apache.zeppelin.display.ui.Select;
@ -41,7 +42,7 @@ import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
public abstract class BasePythonInterpreterTest {
public abstract class BasePythonInterpreterTest extends ConcurrentTestCase {
protected InterpreterGroup intpGroup;
protected Interpreter interpreter;

View file

@ -17,6 +17,8 @@
package org.apache.zeppelin.python;
import net.jodah.concurrentunit.Waiter;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@ -30,6 +32,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
@ -235,4 +238,30 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}
@Test
public void testIPythonProcessKilled() throws InterruptedException, TimeoutException {
final Waiter waiter = new Waiter();
Thread thread = new Thread() {
@Override
public void run() {
try {
InterpreterResult result = interpreter.interpret("import time\ntime.sleep(1000)",
getInterpreterContext());
waiter.assertEquals(InterpreterResult.Code.ERROR, result.code());
waiter.assertEquals(
"IPython kernel is abnormally exited, please check your code and log.",
result.message().get(0).getData());
} catch (InterpreterException e) {
waiter.fail("Should not throw exception\n" + ExceptionUtils.getStackTrace(e));
}
waiter.resume();
}
};
thread.start();
Thread.sleep(3000);
IPythonInterpreter iPythonInterpreter = (IPythonInterpreter)
((LazyOpenInterpreter) interpreter).getInnerInterpreter();
iPythonInterpreter.getWatchDog().destroyProcess();
waiter.await(3000);
}
}

View file

@ -17,6 +17,8 @@
package org.apache.zeppelin.python;
import net.jodah.concurrentunit.Waiter;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@ -28,6 +30,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -38,7 +41,7 @@ import static org.junit.Assert.assertTrue;
public class PythonInterpreterTest extends BasePythonInterpreterTest {
@Override
public void setUp() throws InterpreterException {
@ -50,6 +53,7 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
interpreter = new LazyOpenInterpreter(new PythonInterpreter(properties));
intpGroup.put("note", new LinkedList<Interpreter>());
intpGroup.get("note").add(interpreter);
interpreter.setInterpreterGroup(intpGroup);
@ -105,4 +109,31 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
t.join(2000);
assertFalse(t.isAlive());
}
@Test
public void testPythonProcessKilled() throws InterruptedException, TimeoutException {
final Waiter waiter = new Waiter();
Thread thread = new Thread() {
@Override
public void run() {
try {
InterpreterResult result = interpreter.interpret("import time\ntime.sleep(1000)",
getInterpreterContext());
waiter.assertEquals(InterpreterResult.Code.ERROR, result.code());
waiter.assertEquals(
"Python process is abnormally exited, please check your code and log.",
result.message().get(0).getData());
} catch (InterpreterException e) {
waiter.fail("Should not throw exception\n" + ExceptionUtils.getStackTrace(e));
}
waiter.resume();
}
};
thread.start();
Thread.sleep(3000);
PythonInterpreter pythonInterpreter = (PythonInterpreter)
((LazyOpenInterpreter) interpreter).getInnerInterpreter();
pythonInterpreter.getPythonExecutor().getWatchdog().destroyProcess();
waiter.await(3000);
}
}

View file

@ -379,6 +379,12 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.jodah</groupId>
<artifactId>concurrentunit</artifactId>
<version>0.4.4</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View file

@ -83,7 +83,8 @@ public class IPySparkInterpreter extends IPythonInterpreter {
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st,
InterpreterContext context) throws InterpreterException {
InterpreterContext.set(context);
String jobGroupId = Utils.buildJobGroupId(context);
String jobDesc = Utils.buildJobDesc(context);