mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
fix
This commit is contained in:
parent
540bfa8b05
commit
30078ac751
1 changed files with 10 additions and 24 deletions
|
|
@ -28,10 +28,10 @@ import org.apache.commons.exec.CommandLine;
|
|||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.Executor;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
|
|
@ -45,33 +45,20 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public class ShellInterpreter extends Interpreter {
|
||||
Logger logger = LoggerFactory.getLogger(ShellInterpreter.class);
|
||||
private static final String WATCHDOG_KEY = "watchdog";
|
||||
int commandTimeOut;
|
||||
private static final String EXECUTOR_KEY = "executor";
|
||||
int commandTimeOut = 60000;
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
"sh",
|
||||
"sh",
|
||||
ShellInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add("shell.commandtimeout", "60000", "Timeout period for shell command").build());
|
||||
ShellInterpreter.class.getName());
|
||||
}
|
||||
|
||||
public ShellInterpreter(Properties property) {
|
||||
super(property);
|
||||
commandTimeOut = parseIntProperty("shell.commandtimeout", 60000);
|
||||
}
|
||||
|
||||
private int parseIntProperty(String prop, int defaultvalue) {
|
||||
String propValue = getProperty(prop);
|
||||
try {
|
||||
defaultvalue = Integer.parseInt(propValue);
|
||||
} catch (Exception e) {
|
||||
logger.info("Unable to parse property" + prop + ". Using defaults:" + defaultvalue);
|
||||
}
|
||||
return defaultvalue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {}
|
||||
|
||||
|
|
@ -89,13 +76,11 @@ public class ShellInterpreter extends Interpreter {
|
|||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
|
||||
executor.setStreamHandler(new PumpStreamHandler(outputStream, errorStream));
|
||||
executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
|
||||
|
||||
ExecuteWatchdog executeWatchdog = new ExecuteWatchdog(commandTimeOut);
|
||||
Job runningJob = getRunningJob(contextInterpreter.getParagraphId());
|
||||
Map<String, Object> info = runningJob.info();
|
||||
info.put(WATCHDOG_KEY, executeWatchdog);
|
||||
|
||||
executor.setWatchdog(executeWatchdog);
|
||||
info.put(EXECUTOR_KEY, executor);
|
||||
try {
|
||||
executor.execute(cmdLine);
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString());
|
||||
|
|
@ -106,7 +91,7 @@ public class ShellInterpreter extends Interpreter {
|
|||
String msg = errorStream.toString();
|
||||
if (exitValue == 143) {
|
||||
code = Code.INCOMPLETE;
|
||||
msg = "Paragraph received a SIGTERM";
|
||||
msg = msg + "Paragraph received a SIGTERM";
|
||||
}
|
||||
return new InterpreterResult(code, msg);
|
||||
} catch (IOException e) {
|
||||
|
|
@ -131,9 +116,10 @@ public class ShellInterpreter extends Interpreter {
|
|||
Job runningJob = getRunningJob(context.getParagraphId());
|
||||
if (runningJob != null) {
|
||||
Map<String, Object> info = runningJob.info();
|
||||
Object object = info.get(WATCHDOG_KEY);
|
||||
Object object = info.get(EXECUTOR_KEY);
|
||||
if (object != null) {
|
||||
ExecuteWatchdog watchdog = (ExecuteWatchdog) object;
|
||||
Executor executor = (Executor) object;
|
||||
ExecuteWatchdog watchdog = executor.getWatchdog();
|
||||
watchdog.destroyProcess();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue