mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Managed interpreter process and Running interpreter process
This commit is contained in:
parent
b47ca744b9
commit
98f3872c6a
13 changed files with 352 additions and 135 deletions
|
|
@ -22,11 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.helium.Application;
|
||||
import org.apache.zeppelin.helium.ApplicationException;
|
||||
import org.apache.zeppelin.helium.HeliumPackage;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
|
|||
|
|
@ -23,9 +23,7 @@ import org.apache.thrift.TException;
|
|||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
|
|||
|
|
@ -55,8 +55,12 @@ public class RemoteInterpreter extends Interpreter {
|
|||
private Map<String, String> env;
|
||||
private int connectTimeout;
|
||||
private int maxPoolSize;
|
||||
private static String schedulerName;
|
||||
private String host;
|
||||
private int port;
|
||||
|
||||
/**
|
||||
* Remote interpreter and manage interpreter process
|
||||
*/
|
||||
public RemoteInterpreter(Properties property,
|
||||
String noteId,
|
||||
String className,
|
||||
|
|
@ -81,6 +85,32 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.applicationEventListener = appListener;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Connect to existing process
|
||||
*/
|
||||
public RemoteInterpreter(Properties property,
|
||||
String noteId,
|
||||
String className,
|
||||
String host,
|
||||
int port,
|
||||
int connectTimeout,
|
||||
int maxPoolSize,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
ApplicationEventListener appListener) {
|
||||
super(property);
|
||||
this.noteId = noteId;
|
||||
this.className = className;
|
||||
initialized = false;
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.connectTimeout = connectTimeout;
|
||||
this.maxPoolSize = maxPoolSize;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
this.applicationEventListener = appListener;
|
||||
}
|
||||
|
||||
|
||||
// VisibleForTesting
|
||||
public RemoteInterpreter(Properties property,
|
||||
String noteId,
|
||||
|
|
@ -110,6 +140,10 @@ public class RemoteInterpreter extends Interpreter {
|
|||
return className;
|
||||
}
|
||||
|
||||
private boolean connectToExistingProcess() {
|
||||
return host != null && port > 0;
|
||||
}
|
||||
|
||||
public RemoteInterpreterProcess getInterpreterProcess() {
|
||||
InterpreterGroup intpGroup = getInterpreterGroup();
|
||||
if (intpGroup == null) {
|
||||
|
|
@ -118,10 +152,20 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
synchronized (intpGroup) {
|
||||
if (intpGroup.getRemoteInterpreterProcess() == null) {
|
||||
// create new remote process
|
||||
RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess(
|
||||
interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
|
||||
remoteInterpreterProcessListener, applicationEventListener);
|
||||
RemoteInterpreterProcess remoteProcess;
|
||||
if (connectToExistingProcess()) {
|
||||
remoteProcess = new RemoteInterpreterRunningProcess(
|
||||
connectTimeout,
|
||||
remoteInterpreterProcessListener,
|
||||
applicationEventListener,
|
||||
host,
|
||||
port);
|
||||
} else {
|
||||
// create new remote process
|
||||
remoteProcess = new RemoteInterpreterManagedProcess(
|
||||
interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
|
||||
remoteInterpreterProcessListener, applicationEventListener);
|
||||
}
|
||||
|
||||
intpGroup.setRemoteInterpreterProcess(remoteProcess);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,6 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
|
|||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
|
|
@ -37,7 +36,6 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import org.apache.commons.exec.*;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This class manages start / stop of remote interpreter process
|
||||
*/
|
||||
public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess implements ExecuteResultHandler {
|
||||
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterManagedProcess.class);
|
||||
private final String interpreterRunner;
|
||||
|
||||
private DefaultExecutor executor;
|
||||
private ExecuteWatchdog watchdog;
|
||||
boolean running = false;
|
||||
private int port = -1;
|
||||
private final String interpreterDir;
|
||||
private final String localRepoDir;
|
||||
|
||||
private Map<String, String> env;
|
||||
|
||||
public RemoteInterpreterManagedProcess(
|
||||
String intpRunner,
|
||||
String intpDir,
|
||||
String localRepoDir,
|
||||
Map<String, String> env,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener listener,
|
||||
ApplicationEventListener appListener) {
|
||||
super(new RemoteInterpreterEventPoller(listener, appListener),
|
||||
connectTimeout);
|
||||
this.interpreterRunner = intpRunner;
|
||||
this.env = env;
|
||||
this.interpreterDir = intpDir;
|
||||
this.localRepoDir = localRepoDir;
|
||||
}
|
||||
|
||||
RemoteInterpreterManagedProcess(String intpRunner,
|
||||
String intpDir,
|
||||
String localRepoDir,
|
||||
Map<String, String> env,
|
||||
RemoteInterpreterEventPoller remoteInterpreterEventPoller,
|
||||
int connectTimeout) {
|
||||
super(remoteInterpreterEventPoller,
|
||||
connectTimeout);
|
||||
this.interpreterRunner = intpRunner;
|
||||
this.env = env;
|
||||
this.interpreterDir = intpDir;
|
||||
this.localRepoDir = localRepoDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHost() {
|
||||
return "localhost";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
// start server process
|
||||
try {
|
||||
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
} catch (IOException e1) {
|
||||
throw new InterpreterException(e1);
|
||||
}
|
||||
|
||||
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
|
||||
cmdLine.addArgument("-d", false);
|
||||
cmdLine.addArgument(interpreterDir, false);
|
||||
cmdLine.addArgument("-p", false);
|
||||
cmdLine.addArgument(Integer.toString(port), false);
|
||||
cmdLine.addArgument("-l", false);
|
||||
cmdLine.addArgument(localRepoDir, false);
|
||||
|
||||
executor = new DefaultExecutor();
|
||||
|
||||
watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
|
||||
executor.setWatchdog(watchdog);
|
||||
|
||||
try {
|
||||
Map procEnv = EnvironmentUtils.getProcEnvironment();
|
||||
procEnv.putAll(env);
|
||||
|
||||
logger.info("Run interpreter process {}", cmdLine);
|
||||
executor.execute(cmdLine, procEnv, this);
|
||||
running = true;
|
||||
} catch (IOException e) {
|
||||
running = false;
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
|
||||
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
|
||||
break;
|
||||
} else {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
|
||||
"Thread.sleep", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
if (isRunning()) {
|
||||
logger.info("kill interpreter process");
|
||||
watchdog.destroyProcess();
|
||||
}
|
||||
|
||||
executor = null;
|
||||
watchdog = null;
|
||||
running = false;
|
||||
logger.info("Remote process terminated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessComplete(int exitValue) {
|
||||
logger.info("Interpreter process exited {}", exitValue);
|
||||
running = false;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
logger.info("Interpreter process failed {}", e);
|
||||
running = false;
|
||||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
}
|
||||
|
|
@ -14,134 +14,68 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.commons.exec.*;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.commons.pool2.impl.GenericObjectPool;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
*
|
||||
* Abstract class for interpreter process
|
||||
*/
|
||||
public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
||||
public abstract class RemoteInterpreterProcess {
|
||||
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
|
||||
|
||||
private final AtomicInteger referenceCount;
|
||||
private DefaultExecutor executor;
|
||||
private ExecuteWatchdog watchdog;
|
||||
boolean running = false;
|
||||
private int port = -1;
|
||||
private final String interpreterRunner;
|
||||
private final String interpreterDir;
|
||||
private final String localRepoDir;
|
||||
|
||||
private GenericObjectPool<Client> clientPool;
|
||||
private Map<String, String> env;
|
||||
private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
|
||||
private final InterpreterContextRunnerPool interpreterContextRunnerPool;
|
||||
private int connectTimeout;
|
||||
|
||||
public RemoteInterpreterProcess(
|
||||
String intpRunner,
|
||||
String intpDir,
|
||||
String localRepoDir,
|
||||
Map<String, String> env,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener listener,
|
||||
ApplicationEventListener appListener) {
|
||||
this(intpRunner,
|
||||
intpDir,
|
||||
localRepoDir,
|
||||
env,
|
||||
new RemoteInterpreterEventPoller(listener, appListener),
|
||||
this(new RemoteInterpreterEventPoller(listener, appListener),
|
||||
connectTimeout);
|
||||
}
|
||||
|
||||
RemoteInterpreterProcess(String intpRunner,
|
||||
String intpDir,
|
||||
String localRepoDir,
|
||||
Map<String, String> env,
|
||||
RemoteInterpreterEventPoller remoteInterpreterEventPoller,
|
||||
int connectTimeout) {
|
||||
this.interpreterRunner = intpRunner;
|
||||
this.interpreterDir = intpDir;
|
||||
this.localRepoDir = localRepoDir;
|
||||
this.env = env;
|
||||
RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller,
|
||||
int connectTimeout) {
|
||||
this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
|
||||
referenceCount = new AtomicInteger(0);
|
||||
this.remoteInterpreterEventPoller = remoteInterpreterEventPoller;
|
||||
this.connectTimeout = connectTimeout;
|
||||
}
|
||||
|
||||
public abstract String getHost();
|
||||
public abstract int getPort();
|
||||
public abstract void start();
|
||||
public abstract void stop();
|
||||
public abstract boolean isRunning();
|
||||
|
||||
public int getPort() {
|
||||
return port;
|
||||
public int getConnectTimeout() {
|
||||
return connectTimeout;
|
||||
}
|
||||
|
||||
public int reference(InterpreterGroup interpreterGroup) {
|
||||
synchronized (referenceCount) {
|
||||
if (executor == null) {
|
||||
// start server process
|
||||
try {
|
||||
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
} catch (IOException e1) {
|
||||
throw new InterpreterException(e1);
|
||||
}
|
||||
if (!isRunning()) {
|
||||
start();
|
||||
}
|
||||
|
||||
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
|
||||
cmdLine.addArgument("-d", false);
|
||||
cmdLine.addArgument(interpreterDir, false);
|
||||
cmdLine.addArgument("-p", false);
|
||||
cmdLine.addArgument(Integer.toString(port), false);
|
||||
cmdLine.addArgument("-l", false);
|
||||
cmdLine.addArgument(localRepoDir, false);
|
||||
|
||||
executor = new DefaultExecutor();
|
||||
|
||||
watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
|
||||
executor.setWatchdog(watchdog);
|
||||
|
||||
running = true;
|
||||
try {
|
||||
Map procEnv = EnvironmentUtils.getProcEnvironment();
|
||||
procEnv.putAll(env);
|
||||
|
||||
logger.info("Run interpreter process {}", cmdLine);
|
||||
executor.execute(cmdLine, procEnv, this);
|
||||
} catch (IOException e) {
|
||||
running = false;
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - startTime < connectTimeout) {
|
||||
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
|
||||
break;
|
||||
} else {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
|
||||
"Thread.sleep", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
clientPool = new GenericObjectPool<Client>(new ClientFactory("localhost", port));
|
||||
if (clientPool == null) {
|
||||
clientPool = new GenericObjectPool<Client>(new ClientFactory(getHost(), getPort()));
|
||||
clientPool.setTestOnReturn(true);
|
||||
|
||||
remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup);
|
||||
remoteInterpreterEventPoller.setInterpreterProcess(this);
|
||||
|
|
@ -224,16 +158,6 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (isRunning()) {
|
||||
logger.info("kill interpreter process");
|
||||
watchdog.destroyProcess();
|
||||
}
|
||||
|
||||
executor = null;
|
||||
watchdog = null;
|
||||
running = false;
|
||||
logger.info("Remote process terminated");
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
|
@ -245,23 +169,6 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessComplete(int exitValue) {
|
||||
logger.info("Interpreter process exited {}", exitValue);
|
||||
running = false;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
logger.info("Interpreter process failed {}", e);
|
||||
running = false;
|
||||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
|
||||
public int getNumActiveClient() {
|
||||
if (clientPool == null) {
|
||||
return 0;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This class connects to existing process
|
||||
*/
|
||||
public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
|
||||
private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
|
||||
private final String host;
|
||||
private final int port;
|
||||
|
||||
public RemoteInterpreterRunningProcess(
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener listener,
|
||||
ApplicationEventListener appListener,
|
||||
String host,
|
||||
int port
|
||||
) {
|
||||
super(connectTimeout, listener, appListener);
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
// assume process is externally managed. nothing to do
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
// assume process is externally managed. nothing to do
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
|
||||
}
|
||||
}
|
||||
|
|
@ -19,6 +19,7 @@ package org.apache.zeppelin.resource;
|
|||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.slf4j.Logger;
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ package org.apache.zeppelin.scheduler;
|
|||
import org.apache.thrift.TException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
|
|
@ -49,8 +50,8 @@ public class RemoteScheduler implements Scheduler {
|
|||
private RemoteInterpreterProcess interpreterProcess;
|
||||
|
||||
public RemoteScheduler(String name, ExecutorService executor, String noteId,
|
||||
RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
|
||||
int maxConcurrency) {
|
||||
RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
|
||||
int maxConcurrency) {
|
||||
this.name = name;
|
||||
this.executor = executor;
|
||||
this.listener = listener;
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ public class RemoteInterpreterProcessTest {
|
|||
@Test
|
||||
public void testStartStop() {
|
||||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
|
||||
RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
|
||||
"../bin/interpreter.sh", "nonexists", "fakeRepo", new HashMap<String, String>(),
|
||||
10 * 1000, null, null);
|
||||
assertFalse(rip.isRunning());
|
||||
|
|
@ -49,7 +49,7 @@ public class RemoteInterpreterProcessTest {
|
|||
@Test
|
||||
public void testClientFactory() throws Exception {
|
||||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
|
||||
RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
|
||||
"../bin/interpreter.sh", "nonexists", "fakeRepo", new HashMap<String, String>(),
|
||||
mock(RemoteInterpreterEventPoller.class), 10 * 1000);
|
||||
rip.reference(intpGroup);
|
||||
|
|
|
|||
|
|
@ -30,7 +30,6 @@ import java.util.Properties;
|
|||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
|
|
|
|||
|
|
@ -506,11 +506,18 @@ public class InterpreterFactory {
|
|||
Interpreter intp;
|
||||
|
||||
if (option.isRemote()) {
|
||||
intp = createRemoteRepl(info.getPath(),
|
||||
noteId,
|
||||
info.getClassName(),
|
||||
properties,
|
||||
interpreterGroup.id);
|
||||
if (option.isConnectExistingProcess()) {
|
||||
intp = connectToRemoteRepl(
|
||||
noteId,
|
||||
info.getClassName(),
|
||||
option.getHost(), option.getPort(), properties);
|
||||
} else {
|
||||
intp = createRemoteRepl(info.getPath(),
|
||||
noteId,
|
||||
info.getClassName(),
|
||||
properties,
|
||||
interpreterGroup.id);
|
||||
}
|
||||
} else {
|
||||
intp = createRepl(info.getPath(),
|
||||
info.getClassName(),
|
||||
|
|
@ -813,6 +820,26 @@ public class InterpreterFactory {
|
|||
}
|
||||
}
|
||||
|
||||
private Interpreter connectToRemoteRepl(String noteId,
|
||||
String className,
|
||||
String host,
|
||||
int port,
|
||||
Properties property) {
|
||||
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(
|
||||
new RemoteInterpreter(
|
||||
property,
|
||||
noteId,
|
||||
className,
|
||||
host,
|
||||
port,
|
||||
connectTimeout,
|
||||
maxPoolSize,
|
||||
remoteInterpreterProcessListener,
|
||||
appEventListener));
|
||||
return intp;
|
||||
}
|
||||
|
||||
private Interpreter createRemoteRepl(String interpreterPath, String noteId, String className,
|
||||
Properties property, String interpreterId) {
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ package org.apache.zeppelin.interpreter;
|
|||
*/
|
||||
public class InterpreterOption {
|
||||
boolean remote;
|
||||
String host = null;
|
||||
int port = -1;
|
||||
boolean perNoteSession;
|
||||
|
||||
public InterpreterOption() {
|
||||
|
|
@ -47,4 +49,17 @@ public class InterpreterOption {
|
|||
public void setPerNoteSession(boolean perNoteSession) {
|
||||
this.perNoteSession = perNoteSession;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public boolean isConnectExistingProcess() {
|
||||
return (host != null && port != -1);
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue