mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Add state COMPLETED & also add transition method for more detail logging
This commit is contained in:
parent
97049bd55e
commit
b847d6d861
2 changed files with 19 additions and 7 deletions
|
|
@ -43,7 +43,8 @@ public abstract class ProcessLauncher implements ExecuteResultHandler {
|
|||
NEW,
|
||||
LAUNCHED,
|
||||
RUNNING,
|
||||
TERMINATED
|
||||
TERMINATED,
|
||||
COMPLETED
|
||||
}
|
||||
|
||||
private CommandLine commandLine;
|
||||
|
|
@ -68,18 +69,23 @@ public abstract class ProcessLauncher implements ExecuteResultHandler {
|
|||
executor.setWatchdog(watchdog);
|
||||
try {
|
||||
executor.execute(commandLine, envs, this);
|
||||
state = State.LAUNCHED;
|
||||
transition(State.LAUNCHED);
|
||||
LOGGER.info("Process is launched: {}", commandLine);
|
||||
} catch (IOException e) {
|
||||
this.processOutput.stopCatchLaunchOutput();
|
||||
LOGGER.error("Fail to launch process: " + commandLine, e);
|
||||
state = State.TERMINATED;
|
||||
transition(State.TERMINATED);
|
||||
errorMessage = e.getMessage();
|
||||
}
|
||||
}
|
||||
|
||||
public abstract void waitForReady(int timeout);
|
||||
|
||||
public void transition(State state) {
|
||||
this.state = state;
|
||||
LOGGER.info("Process state is transitioned to " + state);
|
||||
}
|
||||
|
||||
public void onTimeout() {
|
||||
LOGGER.warn("Process launch is time out.");
|
||||
launchTimeout = true;
|
||||
|
|
@ -88,20 +94,24 @@ public abstract class ProcessLauncher implements ExecuteResultHandler {
|
|||
|
||||
public void onProcessRunning() {
|
||||
LOGGER.info("Process is running");
|
||||
state = State.RUNNING;
|
||||
transition(State.RUNNING);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessComplete(int exitValue) {
|
||||
LOGGER.warn("Process is exited with exit value " + exitValue);
|
||||
state = State.TERMINATED;
|
||||
if (exitValue == 0) {
|
||||
transition(State.COMPLETED);
|
||||
} else {
|
||||
transition(State.TERMINATED);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
LOGGER.warn("Process is failed due to " + e);
|
||||
errorMessage = ExceptionUtils.getStackTrace(e);
|
||||
state = State.TERMINATED;
|
||||
transition(State.TERMINATED);
|
||||
}
|
||||
|
||||
public String getErrorMessage() {
|
||||
|
|
|
|||
|
|
@ -230,10 +230,12 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
|
|||
// For yarn-cluster mode, client process will exit with exit value 0
|
||||
// after submitting spark app. So don't move to TERMINATED state when exitValue is 0.
|
||||
if (exitValue != 0) {
|
||||
state = State.TERMINATED;
|
||||
transition(State.TERMINATED);
|
||||
synchronized (this) {
|
||||
notify();
|
||||
}
|
||||
} else {
|
||||
transition(State.COMPLETED);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue