Shell interpreter improvements

This commit is contained in:
karuppayya 2016-01-22 10:52:16 +05:30
commit 431cc797c7
7 changed files with 69 additions and 14 deletions

View file

@ -19,18 +19,22 @@ package org.apache.zeppelin.shell;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.Executor;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
@ -41,6 +45,7 @@ import org.slf4j.LoggerFactory;
*/
public class ShellInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(ShellInterpreter.class);
private static final String EXECUTOR_KEY = "executor";
int commandTimeOut = 600000;
static {
@ -61,30 +66,66 @@ public class ShellInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
logger.debug("Run shell command '" + cmd + "'");
long start = System.currentTimeMillis();
CommandLine cmdLine = CommandLine.parse("bash");
cmdLine.addArgument("-c", false);
cmdLine.addArgument(cmd, false);
DefaultExecutor executor = new DefaultExecutor();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
executor.setStreamHandler(new PumpStreamHandler(outputStream));
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
executor.setStreamHandler(new PumpStreamHandler(outputStream, errorStream));
executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
Job runningJob = getRunningJob(contextInterpreter.getParagraphId());
Map<String, Object> info = runningJob.info();
info.put(EXECUTOR_KEY, executor);
try {
int exitValue = executor.execute(cmdLine);
int exitVal = executor.execute(cmdLine);
logger.info("Paragraph " + contextInterpreter.getParagraphId()
+ "return with exit value: " + exitVal);
return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString());
} catch (ExecuteException e) {
int exitValue = e.getExitValue();
logger.error("Can not run " + cmd, e);
return new InterpreterResult(Code.ERROR, e.getMessage());
Code code = Code.ERROR;
String msg = errorStream.toString();
if (exitValue == 143) {
code = Code.INCOMPLETE;
msg = msg + "Paragraph received a SIGTERM.\n";
logger.info("The paragraph " + contextInterpreter.getParagraphId()
+ " stopped executing: " + msg);
}
msg += "Exitvalue: " + exitValue;
return new InterpreterResult(code, msg);
} catch (IOException e) {
logger.error("Can not run " + cmd, e);
return new InterpreterResult(Code.ERROR, e.getMessage());
}
}
@Override
public void cancel(InterpreterContext context) {}
private Job getRunningJob(String paragraphId) {
Job foundJob = null;
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
for (Job job : jobsRunning) {
if (job.getId().equals(paragraphId)) {
foundJob = job;
}
}
return foundJob;
}
@Override
public void cancel(InterpreterContext context) {
Job runningJob = getRunningJob(context.getParagraphId());
if (runningJob != null) {
Map<String, Object> info = runningJob.info();
Object object = info.get(EXECUTOR_KEY);
if (object != null) {
Executor executor = (Executor) object;
ExecuteWatchdog watchdog = executor.getWatchdog();
watchdog.destroyProcess();
}
}
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
@ -97,8 +138,8 @@ public class ShellInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
ShellInterpreter.class.getName() + this.hashCode());
return SchedulerFactory.singleton().createOrGetParallelScheduler(
ShellInterpreter.class.getName() + this.hashCode(), 10);
}
@Override

View file

@ -57,14 +57,15 @@ public class RemoteInterpreter extends Interpreter {
FormType formType;
boolean initialized;
private Map<String, String> env;
private int connectTimeout;
private int maxPoolSize;
public RemoteInterpreter(Properties property,
String className,
String interpreterRunner,
String interpreterPath,
int connectTimeout,
int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
super(property);
this.className = className;
@ -73,6 +74,7 @@ public class RemoteInterpreter extends Interpreter {
this.interpreterPath = interpreterPath;
env = new HashMap<String, String>();
this.connectTimeout = connectTimeout;
this.maxPoolSize = maxPoolSize;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
}
@ -124,7 +126,7 @@ public class RemoteInterpreter extends Interpreter {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
int rc = interpreterProcess.reference(getInterpreterGroup());
interpreterProcess.setMaxPoolSize(this.maxPoolSize);
synchronized (interpreterProcess) {
// when first process created
if (rc == 1) {
@ -330,7 +332,7 @@ public class RemoteInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
int maxConcurrency = 10;
int maxConcurrency = maxPoolSize;
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
if (interpreterProcess == null) {
return null;

View file

@ -261,6 +261,12 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
}
}
public void setMaxPoolSize(int size) {
if (clientPool != null) {
//Size + 2 for progress poller , cancel operation
clientPool.setMaxTotal(size + 2);
}
}
/**
* Called when angular object is updated in client side to propagate
* change to the remote process

View file

@ -285,7 +285,10 @@ public class RemoteInterpreterServer
@Override
public Map<String, Object> info() {
return null;
if (infos == null) {
infos = new HashMap<>();
}
return infos;
}
@Override

View file

@ -86,6 +86,7 @@ public abstract class Job {
private transient Throwable exception;
private transient JobListener listener;
private long progressUpdateIntervalMs;
protected Map<String, Object> infos;
public Job(String jobName, JobListener listener, long progressUpdateIntervalMs) {
this.jobName = jobName;

View file

@ -416,6 +416,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.jdbc.JDBCInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
// use specified notebook (id) as homescreen

View file

@ -664,9 +664,10 @@ public class InterpreterFactory {
Properties property) {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
property, className, conf.getInterpreterRemoteRunnerPath(),
interpreterPath, connectTimeout, remoteInterpreterProcessListener));
interpreterPath, connectTimeout, maxPoolSize, remoteInterpreterProcessListener));
return intp;
}