mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Increase thread pool size
This commit is contained in:
parent
8cd6fd4d0e
commit
351888d108
4 changed files with 15 additions and 5 deletions
|
|
@ -56,14 +56,15 @@ public class RemoteInterpreter extends Interpreter {
|
|||
FormType formType;
|
||||
boolean initialized;
|
||||
private Map<String, String> env;
|
||||
|
||||
private int connectTimeout;
|
||||
private int maxPoolSize;
|
||||
|
||||
public RemoteInterpreter(Properties property,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
int connectTimeout) {
|
||||
int connectTimeout,
|
||||
int maxPoolSize) {
|
||||
super(property);
|
||||
|
||||
this.className = className;
|
||||
|
|
@ -72,6 +73,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.interpreterPath = interpreterPath;
|
||||
env = new HashMap<String, String>();
|
||||
this.connectTimeout = connectTimeout;
|
||||
this.maxPoolSize = maxPoolSize;
|
||||
}
|
||||
|
||||
public RemoteInterpreter(Properties property,
|
||||
|
|
@ -119,7 +121,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
|
||||
int rc = interpreterProcess.reference(getInterpreterGroup());
|
||||
|
||||
interpreterProcess.setMaxPoolSize(this.maxPoolSize);
|
||||
synchronized (interpreterProcess) {
|
||||
// when first process created
|
||||
if (rc == 1) {
|
||||
|
|
@ -325,7 +327,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
int maxConcurrency = 10;
|
||||
int maxConcurrency = maxPoolSize;
|
||||
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
|
||||
if (interpreterProcess == null) {
|
||||
return null;
|
||||
|
|
|
|||
|
|
@ -254,6 +254,12 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
}
|
||||
}
|
||||
|
||||
public void setMaxPoolSize(int size) {
|
||||
if (clientPool != null) {
|
||||
//Size + 2 for progress poller , cancel opeartion
|
||||
clientPool.setMaxTotal(size + 2);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Called when angular object is updated in client side to propagate
|
||||
* change to the remote process
|
||||
|
|
|
|||
|
|
@ -415,6 +415,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
+ "org.apache.zeppelin.scalding.ScaldingInterpreter"),
|
||||
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
|
||||
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
|
||||
ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
|
||||
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
|
||||
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
|
||||
// use specified notebook (id) as homescreen
|
||||
|
|
|
|||
|
|
@ -667,9 +667,10 @@ public class InterpreterFactory {
|
|||
Properties property) {
|
||||
|
||||
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
|
||||
property, className, conf.getInterpreterRemoteRunnerPath(),
|
||||
interpreterPath, connectTimeout));
|
||||
interpreterPath, connectTimeout, maxPoolSize));
|
||||
return intp;
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue