Managed interpreter process and Running interpreter process

This commit is contained in:
Lee moon soo 2016-04-13 02:13:49 +02:00
parent b47ca744b9
commit 98f3872c6a
13 changed files with 352 additions and 135 deletions

View file

@ -22,11 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.helium.Application;
import org.apache.zeppelin.helium.ApplicationException;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;

View file

@ -23,9 +23,7 @@ import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View file

@ -55,8 +55,12 @@ public class RemoteInterpreter extends Interpreter {
private Map<String, String> env;
private int connectTimeout;
private int maxPoolSize;
private static String schedulerName;
private String host;
private int port;
/**
 * Creates a remote interpreter that also manages (starts and stops) its interpreter process.
 */
public RemoteInterpreter(Properties property,
String noteId,
String className,
@ -81,6 +85,32 @@ public class RemoteInterpreter extends Interpreter {
this.applicationEventListener = appListener;
}
/**
 * Creates a remote interpreter bound to an interpreter process identified by
 * {@code host} and {@code port}, instead of the runner/path settings used to
 * launch a new one.
 *
 * @param property interpreter properties, handed to the superclass
 * @param noteId id of the note this interpreter belongs to
 * @param className class name of the interpreter
 * @param host host of the existing interpreter process
 * @param port port of the existing interpreter process
 * @param connectTimeout connect timeout value kept for the process
 * @param maxPoolSize maximum pool size kept for this interpreter
 * @param remoteInterpreterProcessListener listener kept for the interpreter process
 * @param appListener application event listener kept for the interpreter process
 */
public RemoteInterpreter(Properties property,
                         String noteId,
                         String className,
                         String host,
                         int port,
                         int connectTimeout,
                         int maxPoolSize,
                         RemoteInterpreterProcessListener remoteInterpreterProcessListener,
                         ApplicationEventListener appListener) {
  super(property);

  // the instance starts out uninitialized
  initialized = false;

  // which interpreter this is
  this.className = className;
  this.noteId = noteId;

  // where the existing process lives and how to reach it
  this.port = port;
  this.host = host;
  this.maxPoolSize = maxPoolSize;
  this.connectTimeout = connectTimeout;

  // callbacks
  this.applicationEventListener = appListener;
  this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
}
// VisibleForTesting
public RemoteInterpreter(Properties property,
String noteId,
@ -110,6 +140,10 @@ public class RemoteInterpreter extends Interpreter {
return className;
}
/**
 * Tells whether this interpreter was configured with the address of an existing process.
 *
 * @return {@code true} when a host is set and the port is positive
 */
private boolean connectToExistingProcess() {
  if (host == null) {
    return false;
  }
  return port > 0;
}
public RemoteInterpreterProcess getInterpreterProcess() {
InterpreterGroup intpGroup = getInterpreterGroup();
if (intpGroup == null) {
@ -118,10 +152,20 @@ public class RemoteInterpreter extends Interpreter {
synchronized (intpGroup) {
if (intpGroup.getRemoteInterpreterProcess() == null) {
// create new remote process
RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess(
interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
remoteInterpreterProcessListener, applicationEventListener);
RemoteInterpreterProcess remoteProcess;
if (connectToExistingProcess()) {
remoteProcess = new RemoteInterpreterRunningProcess(
connectTimeout,
remoteInterpreterProcessListener,
applicationEventListener,
host,
port);
} else {
// create new remote process
remoteProcess = new RemoteInterpreterManagedProcess(
interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
remoteInterpreterProcessListener, applicationEventListener);
}
intpGroup.setRemoteInterpreterProcess(remoteProcess);
}

View file

@ -25,7 +25,6 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
@ -37,7 +36,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

View file

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.remote;
import org.apache.commons.exec.*;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
/**
 * Remote interpreter process whose lifecycle is owned by this class:
 * {@link #start()} launches the interpreter runner command on a free local port
 * and {@link #stop()} kills it again.
 *
 * <p>The process is executed asynchronously through commons-exec; this object is
 * registered as the {@link ExecuteResultHandler}, so {@link #onProcessComplete(int)}
 * and {@link #onProcessFailed(ExecuteException)} clear the running flag when the
 * process ends.
 */
public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
    implements ExecuteResultHandler {
  private static final Logger logger = LoggerFactory.getLogger(
      RemoteInterpreterManagedProcess.class);

  private final String interpreterRunner;
  private final String interpreterDir;
  private final String localRepoDir;
  private Map<String, String> env;

  private DefaultExecutor executor;
  private ExecuteWatchdog watchdog;
  // volatile: updated by the commons-exec result callbacks, read through isRunning()
  volatile boolean running = false;
  private int port = -1;

  /**
   * @param intpRunner command used to launch the interpreter process
   * @param intpDir value passed to the runner with {@code -d}
   * @param localRepoDir value passed to the runner with {@code -l}
   * @param env extra environment variables added to the process environment
   * @param connectTimeout how long (ms) {@link #start()} waits for the port to open
   * @param listener listener handed to the event poller
   * @param appListener application event listener handed to the event poller
   */
  public RemoteInterpreterManagedProcess(
      String intpRunner,
      String intpDir,
      String localRepoDir,
      Map<String, String> env,
      int connectTimeout,
      RemoteInterpreterProcessListener listener,
      ApplicationEventListener appListener) {
    this(intpRunner,
        intpDir,
        localRepoDir,
        env,
        new RemoteInterpreterEventPoller(listener, appListener),
        connectTimeout);
  }

  /**
   * Same as the public constructor, but takes the event poller directly.
   */
  RemoteInterpreterManagedProcess(String intpRunner,
                                  String intpDir,
                                  String localRepoDir,
                                  Map<String, String> env,
                                  RemoteInterpreterEventPoller remoteInterpreterEventPoller,
                                  int connectTimeout) {
    super(remoteInterpreterEventPoller, connectTimeout);
    this.interpreterRunner = intpRunner;
    this.env = env;
    this.interpreterDir = intpDir;
    this.localRepoDir = localRepoDir;
  }

  /** @return always {@code "localhost"}; the managed process is launched locally */
  @Override
  public String getHost() {
    return "localhost";
  }

  /** @return the port picked by the last {@link #start()}, or -1 before the first start */
  @Override
  public int getPort() {
    return port;
  }

  /**
   * Picks a free local port, launches the interpreter runner and waits (polling every
   * 500 ms, up to the connect timeout) until the port accepts connections.
   *
   * @throws InterpreterException if no port can be found or the process cannot be launched
   */
  @Override
  public void start() {
    // start server process
    try {
      port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
    } catch (IOException e1) {
      throw new InterpreterException(e1);
    }

    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
    cmdLine.addArgument("-d", false);
    cmdLine.addArgument(interpreterDir, false);
    cmdLine.addArgument("-p", false);
    cmdLine.addArgument(Integer.toString(port), false);
    cmdLine.addArgument("-l", false);
    cmdLine.addArgument(localRepoDir, false);

    executor = new DefaultExecutor();
    watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
    executor.setWatchdog(watchdog);

    // Set the flag BEFORE launching: execute() is asynchronous, so a process that dies
    // immediately may trigger onProcessFailed()/onProcessComplete() before execute()
    // returns. Setting the flag afterwards would overwrite that "not running" result.
    running = true;
    try {
      Map<String, String> procEnv = EnvironmentUtils.getProcEnvironment();
      if (env != null) {
        procEnv.putAll(env);
      }

      logger.info("Run interpreter process {}", cmdLine);
      executor.execute(cmdLine, procEnv, this);
    } catch (IOException e) {
      running = false;
      throw new InterpreterException(e);
    }

    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
      if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
        break;
      }
      try {
        Thread.sleep(500);
      } catch (InterruptedException e) {
        logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
            "Thread.sleep", e);
        // Keep the interrupt visible to the caller and stop waiting; looping on would
        // only make every following sleep() throw immediately.
        Thread.currentThread().interrupt();
        break;
      }
    }
  }

  /**
   * Kills the interpreter process if it is still running and drops the executor state.
   */
  @Override
  public void stop() {
    ExecuteWatchdog currentWatchdog = watchdog;
    if (isRunning() && currentWatchdog != null) {
      logger.info("kill interpreter process");
      currentWatchdog.destroyProcess();
    }

    executor = null;
    watchdog = null;
    running = false;
    logger.info("Remote process terminated");
  }

  /** Callback from commons-exec: the process ended with the given exit value. */
  @Override
  public void onProcessComplete(int exitValue) {
    logger.info("Interpreter process exited {}", exitValue);
    running = false;
  }

  /** Callback from commons-exec: the process could not run or ended with a failure. */
  @Override
  public void onProcessFailed(ExecuteException e) {
    logger.info("Interpreter process failed {}", e);
    running = false;
  }

  /** @return whether the launched process is currently considered alive */
  @Override
  public boolean isRunning() {
    return running;
  }
}

View file

@ -14,134 +14,68 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.remote;
import com.google.gson.Gson;
import org.apache.commons.exec.*;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.TException;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* Abstract class for interpreter process
*/
public class RemoteInterpreterProcess implements ExecuteResultHandler {
public abstract class RemoteInterpreterProcess {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
private final AtomicInteger referenceCount;
private DefaultExecutor executor;
private ExecuteWatchdog watchdog;
boolean running = false;
private int port = -1;
private final String interpreterRunner;
private final String interpreterDir;
private final String localRepoDir;
private GenericObjectPool<Client> clientPool;
private Map<String, String> env;
private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
private final InterpreterContextRunnerPool interpreterContextRunnerPool;
private int connectTimeout;
public RemoteInterpreterProcess(
String intpRunner,
String intpDir,
String localRepoDir,
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener listener,
ApplicationEventListener appListener) {
this(intpRunner,
intpDir,
localRepoDir,
env,
new RemoteInterpreterEventPoller(listener, appListener),
this(new RemoteInterpreterEventPoller(listener, appListener),
connectTimeout);
}
RemoteInterpreterProcess(String intpRunner,
String intpDir,
String localRepoDir,
Map<String, String> env,
RemoteInterpreterEventPoller remoteInterpreterEventPoller,
int connectTimeout) {
this.interpreterRunner = intpRunner;
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
this.env = env;
RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller,
int connectTimeout) {
this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
referenceCount = new AtomicInteger(0);
this.remoteInterpreterEventPoller = remoteInterpreterEventPoller;
this.connectTimeout = connectTimeout;
}
public abstract String getHost();
public abstract int getPort();
public abstract void start();
public abstract void stop();
public abstract boolean isRunning();
public int getPort() {
return port;
public int getConnectTimeout() {
return connectTimeout;
}
public int reference(InterpreterGroup interpreterGroup) {
synchronized (referenceCount) {
if (executor == null) {
// start server process
try {
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
} catch (IOException e1) {
throw new InterpreterException(e1);
}
if (!isRunning()) {
start();
}
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
cmdLine.addArgument("-d", false);
cmdLine.addArgument(interpreterDir, false);
cmdLine.addArgument("-p", false);
cmdLine.addArgument(Integer.toString(port), false);
cmdLine.addArgument("-l", false);
cmdLine.addArgument(localRepoDir, false);
executor = new DefaultExecutor();
watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchdog);
running = true;
try {
Map procEnv = EnvironmentUtils.getProcEnvironment();
procEnv.putAll(env);
logger.info("Run interpreter process {}", cmdLine);
executor.execute(cmdLine, procEnv, this);
} catch (IOException e) {
running = false;
throw new InterpreterException(e);
}
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < connectTimeout) {
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
break;
} else {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
"Thread.sleep", e);
}
}
}
clientPool = new GenericObjectPool<Client>(new ClientFactory("localhost", port));
if (clientPool == null) {
clientPool = new GenericObjectPool<Client>(new ClientFactory(getHost(), getPort()));
clientPool.setTestOnReturn(true);
remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup);
remoteInterpreterEventPoller.setInterpreterProcess(this);
@ -224,16 +158,6 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
break;
}
}
if (isRunning()) {
logger.info("kill interpreter process");
watchdog.destroyProcess();
}
executor = null;
watchdog = null;
running = false;
logger.info("Remote process terminated");
}
return r;
}
@ -245,23 +169,6 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
}
}
@Override
public void onProcessComplete(int exitValue) {
logger.info("Interpreter process exited {}", exitValue);
running = false;
}
@Override
public void onProcessFailed(ExecuteException e) {
logger.info("Interpreter process failed {}", e);
running = false;
}
public boolean isRunning() {
return running;
}
public int getNumActiveClient() {
if (clientPool == null) {
return 0;

View file

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.remote;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Remote interpreter process that is started and stopped outside of this class.
 * It only records the host and port of the existing process; {@link #start()} and
 * {@link #stop()} do nothing, and {@link #isRunning()} probes the endpoint.
 */
public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
  // one shared logger per class, like the other process implementation
  private static final Logger logger =
      LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);

  private final String host;
  private final int port;

  /**
   * @param connectTimeout connect timeout passed to the base class
   * @param listener listener passed to the base class
   * @param appListener application event listener passed to the base class
   * @param host host of the existing interpreter process
   * @param port port of the existing interpreter process
   */
  public RemoteInterpreterRunningProcess(
      int connectTimeout,
      RemoteInterpreterProcessListener listener,
      ApplicationEventListener appListener,
      String host,
      int port
  ) {
    super(connectTimeout, listener, appListener);
    this.host = host;
    this.port = port;
  }

  /** @return the host given at construction */
  @Override
  public String getHost() {
    return host;
  }

  /** @return the port given at construction */
  @Override
  public int getPort() {
    return port;
  }

  @Override
  public void start() {
    // assume process is externally managed. nothing to do
  }

  @Override
  public void stop() {
    // assume process is externally managed. nothing to do
  }

  /**
   * @return whether the configured host/port is reachable right now; every call
   *     performs a fresh endpoint check
   */
  @Override
  public boolean isRunning() {
    return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
  }
}

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.resource;
import com.google.gson.Gson;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.slf4j.Logger;

View file

@ -20,6 +20,7 @@ package org.apache.zeppelin.scheduler;
import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.scheduler.Job.Status;
@ -49,8 +50,8 @@ public class RemoteScheduler implements Scheduler {
private RemoteInterpreterProcess interpreterProcess;
public RemoteScheduler(String name, ExecutorService executor, String noteId,
RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
int maxConcurrency) {
RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
int maxConcurrency) {
this.name = name;
this.executor = executor;
this.listener = listener;

View file

@ -32,7 +32,7 @@ public class RemoteInterpreterProcessTest {
@Test
public void testStartStop() {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
"../bin/interpreter.sh", "nonexists", "fakeRepo", new HashMap<String, String>(),
10 * 1000, null, null);
assertFalse(rip.isRunning());
@ -49,7 +49,7 @@ public class RemoteInterpreterProcessTest {
@Test
public void testClientFactory() throws Exception {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
"../bin/interpreter.sh", "nonexists", "fakeRepo", new HashMap<String, String>(),
mock(RemoteInterpreterEventPoller.class), 10 * 1000);
rip.reference(intpGroup);

View file

@ -30,7 +30,6 @@ import java.util.Properties;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.display.GUI;

View file

@ -506,11 +506,18 @@ public class InterpreterFactory {
Interpreter intp;
if (option.isRemote()) {
intp = createRemoteRepl(info.getPath(),
noteId,
info.getClassName(),
properties,
interpreterGroup.id);
if (option.isConnectExistingProcess()) {
intp = connectToRemoteRepl(
noteId,
info.getClassName(),
option.getHost(), option.getPort(), properties);
} else {
intp = createRemoteRepl(info.getPath(),
noteId,
info.getClassName(),
properties,
interpreterGroup.id);
}
} else {
intp = createRepl(info.getPath(),
info.getClassName(),
@ -813,6 +820,26 @@ public class InterpreterFactory {
}
}
/**
 * Builds an interpreter that talks to an interpreter process at {@code host:port}.
 * The {@link RemoteInterpreter} is wrapped in a {@link LazyOpenInterpreter}.
 *
 * @param noteId id of the note the interpreter is created for
 * @param className interpreter class name
 * @param host host of the existing interpreter process
 * @param port port of the existing interpreter process
 * @param property interpreter properties
 * @return the wrapped remote interpreter
 */
private Interpreter connectToRemoteRepl(String noteId,
                                        String className,
                                        String host,
                                        int port,
                                        Properties property) {
  final int timeoutForConnect = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
  final int poolSizeLimit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);

  RemoteInterpreter remote = new RemoteInterpreter(
      property,
      noteId,
      className,
      host,
      port,
      timeoutForConnect,
      poolSizeLimit,
      remoteInterpreterProcessListener,
      appEventListener);

  return new LazyOpenInterpreter(remote);
}
private Interpreter createRemoteRepl(String interpreterPath, String noteId, String className,
Properties property, String interpreterId) {

View file

@ -22,6 +22,8 @@ package org.apache.zeppelin.interpreter;
*/
public class InterpreterOption {
boolean remote;
String host = null;
int port = -1;
boolean perNoteSession;
public InterpreterOption() {
@ -47,4 +49,17 @@ public class InterpreterOption {
public void setPerNoteSession(boolean perNoteSession) {
this.perNoteSession = perNoteSession;
}
}
/**
 * Tells whether this option points at an already running interpreter process.
 *
 * <p>The port must be strictly positive, which is the same rule
 * {@code RemoteInterpreter.connectToExistingProcess()} applies. Checking only
 * {@code port != -1} would send port 0 or other negative values down the connect
 * path while the interpreter itself refuses to treat them as an existing process.
 *
 * @return {@code true} when a host is set and the port is positive
 */
public boolean isConnectExistingProcess() {
  return host != null && port > 0;
}
/**
 * @return host of the existing interpreter process, or {@code null} (the default)
 *     when none is configured
 */
public String getHost() {
  return this.host;
}
/**
 * @return port of the existing interpreter process, or -1 (the default) when none
 *     is configured
 */
public int getPort() {
  return this.port;
}
}