This commit is contained in:
karuppayya 2016-01-11 13:38:40 +05:30
parent 540bfa8b05
commit 30078ac751

View file

@ -28,10 +28,10 @@ import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.Executor;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Job;
@ -45,33 +45,20 @@ import org.slf4j.LoggerFactory;
*/
public class ShellInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(ShellInterpreter.class);
private static final String WATCHDOG_KEY = "watchdog";
int commandTimeOut;
private static final String EXECUTOR_KEY = "executor";
int commandTimeOut = 60000;
static {
Interpreter.register(
"sh",
"sh",
ShellInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("shell.commandtimeout", "60000", "Timeout period for shell command").build());
ShellInterpreter.class.getName());
}
public ShellInterpreter(Properties property) {
super(property);
commandTimeOut = parseIntProperty("shell.commandtimeout", 60000);
}
private int parseIntProperty(String prop, int defaultvalue) {
String propValue = getProperty(prop);
try {
defaultvalue = Integer.parseInt(propValue);
} catch (Exception e) {
logger.info("Unable to parse property" + prop + ". Using defaults:" + defaultvalue);
}
return defaultvalue;
}
@Override
public void open() {}
@ -89,13 +76,11 @@ public class ShellInterpreter extends Interpreter {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
executor.setStreamHandler(new PumpStreamHandler(outputStream, errorStream));
executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
ExecuteWatchdog executeWatchdog = new ExecuteWatchdog(commandTimeOut);
Job runningJob = getRunningJob(contextInterpreter.getParagraphId());
Map<String, Object> info = runningJob.info();
info.put(WATCHDOG_KEY, executeWatchdog);
executor.setWatchdog(executeWatchdog);
info.put(EXECUTOR_KEY, executor);
try {
executor.execute(cmdLine);
return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString());
@ -106,7 +91,7 @@ public class ShellInterpreter extends Interpreter {
String msg = errorStream.toString();
if (exitValue == 143) {
code = Code.INCOMPLETE;
msg = "Paragraph received a SIGTERM";
msg = msg + "Paragraph received a SIGTERM";
}
return new InterpreterResult(code, msg);
} catch (IOException e) {
@ -131,9 +116,10 @@ public class ShellInterpreter extends Interpreter {
Job runningJob = getRunningJob(context.getParagraphId());
if (runningJob != null) {
Map<String, Object> info = runningJob.info();
Object object = info.get(WATCHDOG_KEY);
Object object = info.get(EXECUTOR_KEY);
if (object != null) {
ExecuteWatchdog watchdog = (ExecuteWatchdog) object;
Executor executor = (Executor) object;
ExecuteWatchdog watchdog = executor.getWatchdog();
watchdog.destroyProcess();
}
}