This commit is contained in:
Karup 2016-01-08 21:16:17 +05:30
parent 51812cb80c
commit 7d938bd86e
3 changed files with 70 additions and 13 deletions

View file

@ -19,7 +19,9 @@ package org.apache.zeppelin.shell;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.exec.CommandLine;
@ -29,8 +31,10 @@ import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
@ -41,16 +45,33 @@ import org.slf4j.LoggerFactory;
*/
public class ShellInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(ShellInterpreter.class);
int commandTimeOut = 600000;
private static final String WATCHDOG_KEY = "watchdog";
int commandTimeOut;
static {
Interpreter.register("sh", ShellInterpreter.class.getName());
Interpreter.register(
"sh",
"sh",
ShellInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("shell.commandtimeout", "60000", "Timeout period for shell command").build());
}
public ShellInterpreter(Properties property) {
super(property);
commandTimeOut = parseIntProperty("shell.commandtimeout", 60000);
}
private int parseIntProperty(String prop, int defaultvalue) {
String propValue = getProperty(prop);
try {
defaultvalue = Integer.parseInt(propValue);
} catch (Exception e) {
logger.info("Unable to parse property" + prop + ". Using defaults:" + defaultvalue);
}
return defaultvalue;
}
@Override
public void open() {}
@ -60,31 +81,63 @@ public class ShellInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
logger.debug("Run shell command '" + cmd + "'");
long start = System.currentTimeMillis();
logger.info("Run shell command '" + cmd + "'");
CommandLine cmdLine = CommandLine.parse("bash");
cmdLine.addArgument("-c", false);
cmdLine.addArgument(cmd, false);
DefaultExecutor executor = new DefaultExecutor();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
executor.setStreamHandler(new PumpStreamHandler(outputStream));
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
executor.setStreamHandler(new PumpStreamHandler(outputStream, errorStream));
executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
ExecuteWatchdog executeWatchdog = new ExecuteWatchdog(commandTimeOut);
Job runningJob = getRunningJob(contextInterpreter.getParagraphId());
Map<String, Object> info = runningJob.info();
info.put(WATCHDOG_KEY, executeWatchdog);
executor.setWatchdog(executeWatchdog);
try {
int exitValue = executor.execute(cmdLine);
executor.execute(cmdLine);
return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString());
} catch (ExecuteException e) {
int exitValue = e.getExitValue();
logger.error("Can not run " + cmd, e);
return new InterpreterResult(Code.ERROR, e.getMessage());
Code code = Code.ERROR;
String msg = errorStream.toString();
if (exitValue == 143) {
code = Code.INCOMPLETE;
msg = "Paragraph received a SIGTERM";
}
return new InterpreterResult(code, msg);
} catch (IOException e) {
logger.error("Can not run " + cmd, e);
return new InterpreterResult(Code.ERROR, e.getMessage());
}
}
@Override
public void cancel(InterpreterContext context) {}
private Job getRunningJob(String paragraphId) {
Job foundJob = null;
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
for (Job job : jobsRunning) {
if (job.getId().equals(paragraphId)) {
foundJob = job;
}
}
return foundJob;
}
@Override
public void cancel(InterpreterContext context) {
Job runningJob = getRunningJob(context.getParagraphId());
if (runningJob != null) {
Map<String, Object> info = runningJob.info();
Object object = info.get(WATCHDOG_KEY);
if (object != null) {
ExecuteWatchdog watchdog = (ExecuteWatchdog) object;
watchdog.destroyProcess();
}
}
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
@ -97,8 +150,8 @@ public class ShellInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
ShellInterpreter.class.getName() + this.hashCode());
return SchedulerFactory.singleton().createOrGetParallelScheduler(
ShellInterpreter.class.getName() + this.hashCode(), 10);
}
@Override

View file

@ -290,7 +290,10 @@ public class RemoteInterpreterServer
@Override
public Map<String, Object> info() {
return null;
if (infos == null) {
infos = new HashMap<>();
}
return infos;
}
@Override

View file

@ -84,6 +84,7 @@ public abstract class Job {
private transient Throwable exception;
private transient JobListener listener;
private long progressUpdateIntervalMs;
protected Map<String, Object> infos;
public Job(String jobName, JobListener listener, long progressUpdateIntervalMs) {
this.jobName = jobName;