mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
fix
This commit is contained in:
parent
a731b815e1
commit
b0a97a16eb
3 changed files with 69 additions and 12 deletions
|
|
@ -19,7 +19,9 @@ package org.apache.zeppelin.shell;
|
|||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
|
|
@ -29,8 +31,10 @@ import org.apache.commons.exec.ExecuteWatchdog;
|
|||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -45,16 +49,33 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public class ShellInterpreter extends Interpreter {
|
||||
Logger logger = LoggerFactory.getLogger(ShellInterpreter.class);
|
||||
int commandTimeOut = 600000;
|
||||
private static final String WATCHDOG_KEY = "watchdog";
|
||||
int commandTimeOut;
|
||||
|
||||
static {
|
||||
Interpreter.register("sh", ShellInterpreter.class.getName());
|
||||
Interpreter.register(
|
||||
"sh",
|
||||
"sh",
|
||||
ShellInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add("shell.commandtimeout", "60000", "Timeout period for shell command").build());
|
||||
}
|
||||
|
||||
public ShellInterpreter(Properties property) {
|
||||
super(property);
|
||||
commandTimeOut = parseIntProperty("shell.commandtimeout", 60000);
|
||||
}
|
||||
|
||||
private int parseIntProperty(String prop, int defaultvalue) {
|
||||
String propValue = getProperty(prop);
|
||||
try {
|
||||
defaultvalue = Integer.parseInt(propValue);
|
||||
} catch (Exception e) {
|
||||
logger.info("Unable to parse property" + prop + ". Using defaults:" + defaultvalue);
|
||||
}
|
||||
return defaultvalue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {}
|
||||
|
||||
|
|
@ -65,30 +86,62 @@ public class ShellInterpreter extends Interpreter {
|
|||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.info("Run shell command '" + cmd + "'");
|
||||
long start = System.currentTimeMillis();
|
||||
CommandLine cmdLine = CommandLine.parse("bash");
|
||||
cmdLine.addArgument("-c", false);
|
||||
cmdLine.addArgument(cmd, false);
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
executor.setStreamHandler(new PumpStreamHandler(outputStream));
|
||||
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
|
||||
executor.setStreamHandler(new PumpStreamHandler(outputStream, errorStream));
|
||||
|
||||
executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
|
||||
ExecuteWatchdog executeWatchdog = new ExecuteWatchdog(commandTimeOut);
|
||||
Job runningJob = getRunningJob(contextInterpreter.getParagraphId());
|
||||
Map<String, Object> info = runningJob.info();
|
||||
info.put(WATCHDOG_KEY, executeWatchdog);
|
||||
|
||||
executor.setWatchdog(executeWatchdog);
|
||||
try {
|
||||
int exitValue = executor.execute(cmdLine);
|
||||
executor.execute(cmdLine);
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString());
|
||||
} catch (ExecuteException e) {
|
||||
int exitValue = e.getExitValue();
|
||||
logger.error("Can not run " + cmd, e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
Code code = Code.ERROR;
|
||||
String msg = errorStream.toString();
|
||||
if (exitValue == 143) {
|
||||
code = Code.INCOMPLETE;
|
||||
msg = "Paragraph received a SIGTERM";
|
||||
}
|
||||
return new InterpreterResult(code, msg);
|
||||
} catch (IOException e) {
|
||||
logger.error("Can not run " + cmd, e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {}
|
||||
private Job getRunningJob(String paragraphId) {
|
||||
Job foundJob = null;
|
||||
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
|
||||
for (Job job : jobsRunning) {
|
||||
if (job.getId().equals(paragraphId)) {
|
||||
foundJob = job;
|
||||
}
|
||||
}
|
||||
return foundJob;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
Job runningJob = getRunningJob(context.getParagraphId());
|
||||
if (runningJob != null) {
|
||||
Map<String, Object> info = runningJob.info();
|
||||
Object object = info.get(WATCHDOG_KEY);
|
||||
if (object != null) {
|
||||
ExecuteWatchdog watchdog = (ExecuteWatchdog) object;
|
||||
watchdog.destroyProcess();
|
||||
}
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
|
|
@ -101,8 +154,8 @@ public class ShellInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
ShellInterpreter.class.getName() + this.hashCode());
|
||||
return SchedulerFactory.singleton().createOrGetParallelScheduler(
|
||||
ShellInterpreter.class.getName() + this.hashCode(), 10);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -268,7 +268,10 @@ public class RemoteInterpreterServer
|
|||
|
||||
@Override
|
||||
public Map<String, Object> info() {
|
||||
return null;
|
||||
if (infos == null) {
|
||||
infos = new HashMap<>();
|
||||
}
|
||||
return infos;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -85,6 +85,7 @@ public abstract class Job {
|
|||
private transient Throwable exception;
|
||||
private transient JobListener listener;
|
||||
private long progressUpdateIntervalMs;
|
||||
protected Map<String, Object> infos;
|
||||
|
||||
public Job(String jobName, JobListener listener, long progressUpdateIntervalMs) {
|
||||
this.jobName = jobName;
|
||||
|
|
|
|||
Loading…
Reference in a new issue