mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Shell interpreter improvements
This commit is contained in:
commit
431cc797c7
7 changed files with 69 additions and 14 deletions
|
|
@ -19,18 +19,22 @@ package org.apache.zeppelin.shell;
|
|||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.Executor;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -41,6 +45,7 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public class ShellInterpreter extends Interpreter {
|
||||
Logger logger = LoggerFactory.getLogger(ShellInterpreter.class);
|
||||
private static final String EXECUTOR_KEY = "executor";
|
||||
int commandTimeOut = 600000;
|
||||
|
||||
static {
|
||||
|
|
@ -61,30 +66,66 @@ public class ShellInterpreter extends Interpreter {
|
|||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.debug("Run shell command '" + cmd + "'");
|
||||
long start = System.currentTimeMillis();
|
||||
CommandLine cmdLine = CommandLine.parse("bash");
|
||||
cmdLine.addArgument("-c", false);
|
||||
cmdLine.addArgument(cmd, false);
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
executor.setStreamHandler(new PumpStreamHandler(outputStream));
|
||||
|
||||
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
|
||||
executor.setStreamHandler(new PumpStreamHandler(outputStream, errorStream));
|
||||
executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
|
||||
|
||||
Job runningJob = getRunningJob(contextInterpreter.getParagraphId());
|
||||
Map<String, Object> info = runningJob.info();
|
||||
info.put(EXECUTOR_KEY, executor);
|
||||
try {
|
||||
int exitValue = executor.execute(cmdLine);
|
||||
int exitVal = executor.execute(cmdLine);
|
||||
logger.info("Paragraph " + contextInterpreter.getParagraphId()
|
||||
+ "return with exit value: " + exitVal);
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString());
|
||||
} catch (ExecuteException e) {
|
||||
int exitValue = e.getExitValue();
|
||||
logger.error("Can not run " + cmd, e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
Code code = Code.ERROR;
|
||||
String msg = errorStream.toString();
|
||||
if (exitValue == 143) {
|
||||
code = Code.INCOMPLETE;
|
||||
msg = msg + "Paragraph received a SIGTERM.\n";
|
||||
logger.info("The paragraph " + contextInterpreter.getParagraphId()
|
||||
+ " stopped executing: " + msg);
|
||||
}
|
||||
msg += "Exitvalue: " + exitValue;
|
||||
return new InterpreterResult(code, msg);
|
||||
} catch (IOException e) {
|
||||
logger.error("Can not run " + cmd, e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {}
|
||||
private Job getRunningJob(String paragraphId) {
|
||||
Job foundJob = null;
|
||||
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
|
||||
for (Job job : jobsRunning) {
|
||||
if (job.getId().equals(paragraphId)) {
|
||||
foundJob = job;
|
||||
}
|
||||
}
|
||||
return foundJob;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
Job runningJob = getRunningJob(context.getParagraphId());
|
||||
if (runningJob != null) {
|
||||
Map<String, Object> info = runningJob.info();
|
||||
Object object = info.get(EXECUTOR_KEY);
|
||||
if (object != null) {
|
||||
Executor executor = (Executor) object;
|
||||
ExecuteWatchdog watchdog = executor.getWatchdog();
|
||||
watchdog.destroyProcess();
|
||||
}
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
|
|
@ -97,8 +138,8 @@ public class ShellInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
ShellInterpreter.class.getName() + this.hashCode());
|
||||
return SchedulerFactory.singleton().createOrGetParallelScheduler(
|
||||
ShellInterpreter.class.getName() + this.hashCode(), 10);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -57,14 +57,15 @@ public class RemoteInterpreter extends Interpreter {
|
|||
FormType formType;
|
||||
boolean initialized;
|
||||
private Map<String, String> env;
|
||||
|
||||
private int connectTimeout;
|
||||
private int maxPoolSize;
|
||||
|
||||
public RemoteInterpreter(Properties property,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
int connectTimeout,
|
||||
int maxPoolSize,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
|
||||
super(property);
|
||||
this.className = className;
|
||||
|
|
@ -73,6 +74,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.interpreterPath = interpreterPath;
|
||||
env = new HashMap<String, String>();
|
||||
this.connectTimeout = connectTimeout;
|
||||
this.maxPoolSize = maxPoolSize;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
}
|
||||
|
||||
|
|
@ -124,7 +126,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
|
||||
int rc = interpreterProcess.reference(getInterpreterGroup());
|
||||
|
||||
interpreterProcess.setMaxPoolSize(this.maxPoolSize);
|
||||
synchronized (interpreterProcess) {
|
||||
// when first process created
|
||||
if (rc == 1) {
|
||||
|
|
@ -330,7 +332,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
int maxConcurrency = 10;
|
||||
int maxConcurrency = maxPoolSize;
|
||||
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
|
||||
if (interpreterProcess == null) {
|
||||
return null;
|
||||
|
|
|
|||
|
|
@ -261,6 +261,12 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
}
|
||||
}
|
||||
|
||||
public void setMaxPoolSize(int size) {
|
||||
if (clientPool != null) {
|
||||
//Size + 2 for progress poller , cancel operation
|
||||
clientPool.setMaxTotal(size + 2);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Called when angular object is updated in client side to propagate
|
||||
* change to the remote process
|
||||
|
|
|
|||
|
|
@ -285,7 +285,10 @@ public class RemoteInterpreterServer
|
|||
|
||||
@Override
|
||||
public Map<String, Object> info() {
|
||||
return null;
|
||||
if (infos == null) {
|
||||
infos = new HashMap<>();
|
||||
}
|
||||
return infos;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -86,6 +86,7 @@ public abstract class Job {
|
|||
private transient Throwable exception;
|
||||
private transient JobListener listener;
|
||||
private long progressUpdateIntervalMs;
|
||||
protected Map<String, Object> infos;
|
||||
|
||||
public Job(String jobName, JobListener listener, long progressUpdateIntervalMs) {
|
||||
this.jobName = jobName;
|
||||
|
|
|
|||
|
|
@ -416,6 +416,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
+ "org.apache.zeppelin.jdbc.JDBCInterpreter"),
|
||||
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
|
||||
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
|
||||
ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
|
||||
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
|
||||
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
|
||||
// use specified notebook (id) as homescreen
|
||||
|
|
|
|||
|
|
@ -664,9 +664,10 @@ public class InterpreterFactory {
|
|||
Properties property) {
|
||||
|
||||
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
|
||||
property, className, conf.getInterpreterRemoteRunnerPath(),
|
||||
interpreterPath, connectTimeout, remoteInterpreterProcessListener));
|
||||
interpreterPath, connectTimeout, maxPoolSize, remoteInterpreterProcessListener));
|
||||
return intp;
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue