mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-2197. Interpreter Lifecycle Manager
This commit is contained in:
parent
84cb4b5fb9
commit
00d0183577
14 changed files with 339 additions and 13 deletions
|
|
@ -411,6 +411,25 @@
|
|||
<description>Enable directory listings on server.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.interpreter.lifecyclemanager.class</name>
|
||||
<value>org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager</value>
|
||||
<description>LifecycleManager class for managing the lifecycle of interpreters, by default interpreter will
|
||||
be closed after timeout</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.interpreter.lifecyclemanager.timeout.checkinterval</name>
|
||||
<value>60000</value>
|
||||
<description>milliseconds of the interval to checking whether interpreter is time out</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.interpreter.lifecyclemanager.timeout.threshold</name>
|
||||
<value>3600000</value>
|
||||
<description>milliseconds of the interpreter timeout threshold, by default it is 1 hour</description>
|
||||
</property>
|
||||
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.server.jetty.name</name>
|
||||
|
|
|
|||
|
|
@ -534,6 +534,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
return getString(ConfVars.ZEPPELIN_SERVER_STRICT_TRANSPORT);
|
||||
}
|
||||
|
||||
public String getLifecycleManagerClass() {
|
||||
return getString(ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS);
|
||||
}
|
||||
|
||||
public Map<String, String> dumpConfigurations(ZeppelinConfiguration conf,
|
||||
ConfigurationKeyPredicate predicate) {
|
||||
|
|
@ -701,7 +704,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_SERVER_KERBEROS_KEYTAB("zeppelin.server.kerberos.keytab", ""),
|
||||
ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""),
|
||||
|
||||
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":");
|
||||
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"),
|
||||
|
||||
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS("zeppelin.interpreter.lifecyclemanager.class",
|
||||
"org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager");
|
||||
|
||||
private String varName;
|
||||
@SuppressWarnings("rawtypes")
|
||||
|
|
|
|||
|
|
@ -142,4 +142,23 @@ public class InterpreterGroup {
|
|||
public boolean isEmpty() {
|
||||
return sessions.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof InterpreterGroup)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
InterpreterGroup that = (InterpreterGroup) o;
|
||||
|
||||
return id != null ? id.equals(that.id) : that.id == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return id != null ? id.hashCode() : 0;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ import org.apache.zeppelin.interpreter.launcher.InterpreterLaunchContext;
|
|||
import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
|
||||
import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
|
||||
import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
|
||||
import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
|
||||
|
|
@ -139,6 +140,7 @@ public class InterpreterSetting {
|
|||
private transient InterpreterLauncher launcher;
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private transient LifecycleManager lifecycleManager;
|
||||
|
||||
/**
|
||||
* Builder class for InterpreterSetting
|
||||
|
|
@ -233,6 +235,11 @@ public class InterpreterSetting {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setLifecycleManager(LifecycleManager lifecycleManager) {
|
||||
interpreterSetting.lifecycleManager = lifecycleManager;
|
||||
return this;
|
||||
}
|
||||
|
||||
public InterpreterSetting create() {
|
||||
// post processing
|
||||
interpreterSetting.postProcessing();
|
||||
|
|
@ -249,6 +256,9 @@ public class InterpreterSetting {
|
|||
|
||||
void postProcessing() {
|
||||
this.status = Status.READY;
|
||||
if (this.lifecycleManager == null) {
|
||||
this.lifecycleManager = new NullLifecycleManager(conf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -321,6 +331,14 @@ public class InterpreterSetting {
|
|||
this.interpreterSettingManager = interpreterSettingManager;
|
||||
}
|
||||
|
||||
public void setLifecycleManager(LifecycleManager lifecycleManager) {
|
||||
this.lifecycleManager = lifecycleManager;
|
||||
}
|
||||
|
||||
public LifecycleManager getLifecycleManager() {
|
||||
return lifecycleManager;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
|
@ -628,7 +646,7 @@ public class InterpreterSetting {
|
|||
for (InterpreterInfo info : interpreterInfos) {
|
||||
Interpreter interpreter = null;
|
||||
interpreter = new RemoteInterpreter(getJavaProperties(), sessionId,
|
||||
info.getClassName(), user);
|
||||
info.getClassName(), user, lifecycleManager);
|
||||
if (info.isDefaultInterpreter()) {
|
||||
interpreters.add(0, interpreter);
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -51,6 +51,7 @@ import java.io.FileInputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Type;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
|
|
@ -116,7 +117,7 @@ public class InterpreterSettingManager {
|
|||
private RemoteInterpreterProcessListener remoteInterpreterProcessListener;
|
||||
private ApplicationEventListener appEventListener;
|
||||
private DependencyResolver dependencyResolver;
|
||||
|
||||
private LifecycleManager lifecycleManager;
|
||||
|
||||
public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
|
||||
AngularObjectRegistryListener angularObjectRegistryListener,
|
||||
|
|
@ -153,6 +154,14 @@ public class InterpreterSettingManager {
|
|||
this.angularObjectRegistryListener = angularObjectRegistryListener;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
this.appEventListener = appEventListener;
|
||||
try {
|
||||
this.lifecycleManager = (LifecycleManager)
|
||||
Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class)
|
||||
.newInstance(conf);
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Fail to create LifecyleManager", e);
|
||||
}
|
||||
|
||||
init();
|
||||
}
|
||||
|
||||
|
|
@ -177,6 +186,7 @@ public class InterpreterSettingManager {
|
|||
remoteInterpreterProcessListener);
|
||||
savedInterpreterSetting.setAppEventListener(appEventListener);
|
||||
savedInterpreterSetting.setDependencyResolver(dependencyResolver);
|
||||
savedInterpreterSetting.setLifecycleManager(lifecycleManager);
|
||||
savedInterpreterSetting.setProperties(InterpreterSetting.convertInterpreterProperties(
|
||||
savedInterpreterSetting.getProperties()
|
||||
));
|
||||
|
|
@ -372,6 +382,7 @@ public class InterpreterSettingManager {
|
|||
interpreterSetting.setAppEventListener(appEventListener);
|
||||
interpreterSetting.setDependencyResolver(dependencyResolver);
|
||||
interpreterSetting.setInterpreterSettingManager(this);
|
||||
interpreterSetting.setLifecycleManager(lifecycleManager);
|
||||
interpreterSetting.postProcessing();
|
||||
interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
|
||||
}
|
||||
|
|
@ -633,6 +644,7 @@ public class InterpreterSettingManager {
|
|||
setting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
|
||||
setting.setDependencyResolver(dependencyResolver);
|
||||
setting.setAngularObjectRegistryListener(angularObjectRegistryListener);
|
||||
setting.setLifecycleManager(lifecycleManager);
|
||||
setting.setInterpreterSettingManager(this);
|
||||
setting.postProcessing();
|
||||
interpreterSettings.put(setting.getId(), setting);
|
||||
|
|
@ -645,6 +657,7 @@ public class InterpreterSettingManager {
|
|||
interpreterSettingTemplates.put(interpreterSetting.getName(), interpreterSetting);
|
||||
interpreterSetting.setAppEventListener(appEventListener);
|
||||
interpreterSetting.setDependencyResolver(dependencyResolver);
|
||||
interpreterSetting.setLifecycleManager(lifecycleManager);
|
||||
interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener);
|
||||
interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
|
||||
interpreterSetting.setInterpreterSettingManager(this);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
|
||||
/**
|
||||
* Interface for managing the lifecycle of interpreters
|
||||
*/
|
||||
public interface LifecycleManager {
|
||||
|
||||
void onInterpreterGroupCreated(ManagedInterpreterGroup interpreterGroup);
|
||||
|
||||
void onInterpreterSessionCreated(ManagedInterpreterGroup interpreterGroup,
|
||||
String sessionId);
|
||||
|
||||
void onInterpreterUse(ManagedInterpreterGroup interpreterGroup,
|
||||
String sessionId);
|
||||
|
||||
}
|
||||
|
|
@ -47,6 +47,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
|
|||
ManagedInterpreterGroup(String id, InterpreterSetting interpreterSetting) {
|
||||
super(id);
|
||||
this.interpreterSetting = interpreterSetting;
|
||||
interpreterSetting.getLifecycleManager().onInterpreterGroupCreated(this);
|
||||
}
|
||||
|
||||
public InterpreterSetting getInterpreterSetting() {
|
||||
|
|
@ -88,7 +89,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
|
|||
LOGGER.info("Remove this InterpreterGroup: {} as all the sessions are closed", id);
|
||||
interpreterSetting.removeInterpreterGroup(id);
|
||||
if (remoteInterpreterProcess != null) {
|
||||
LOGGER.info("Kill RemoteIntetrpreterProcess");
|
||||
LOGGER.info("Kill RemoteInterpreterProcess");
|
||||
remoteInterpreterProcess.stop();
|
||||
remoteInterpreterProcess = null;
|
||||
}
|
||||
|
|
@ -134,8 +135,10 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
|
|||
interpreter.setInterpreterGroup(this);
|
||||
}
|
||||
LOGGER.info("Create Session: {} in InterpreterGroup: {} for user: {}", sessionId, id, user);
|
||||
interpreterSetting.getLifecycleManager().onInterpreterSessionCreated(this, sessionId);
|
||||
sessions.put(sessionId, interpreters);
|
||||
return interpreters;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.zeppelin.interpreter.lifecycle;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.LifecycleManager;
|
||||
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
|
||||
|
||||
/**
|
||||
* Do nothing for the lifecycle of interpreter. User need to explicitly start/stop interpreter.
|
||||
*/
|
||||
public class NullLifecycleManager implements LifecycleManager {
|
||||
|
||||
public NullLifecycleManager(ZeppelinConfiguration zConf) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterGroupCreated(ManagedInterpreterGroup interpreterGroup) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterSessionCreated(ManagedInterpreterGroup interpreterGroup,
|
||||
String sessionId) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,75 @@
|
|||
package org.apache.zeppelin.interpreter.lifecycle;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.LifecycleManager;
|
||||
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
|
||||
/**
|
||||
* This lifecycle manager would close interpreter after it is timeout. By default, it is timeout
|
||||
* after no using in 1 hour.
|
||||
*
|
||||
* For now, this class only manage the lifecycle of interpreter group (will close interpreter
|
||||
* process after timeout). Managing the lifecycle of interpreter session could be done in future
|
||||
* if necessary.
|
||||
*/
|
||||
public class TimeoutLifecycleManager implements LifecycleManager {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutLifecycleManager.class);
|
||||
|
||||
// ManagerInterpreter -> LastTimeUsing timestamp
|
||||
private Map<ManagedInterpreterGroup, Long> interpreterGroups = new ConcurrentHashMap<>();
|
||||
|
||||
private long checkInterval;
|
||||
private long timeoutThreshold;
|
||||
|
||||
private Timer checkTimer;
|
||||
|
||||
public TimeoutLifecycleManager(ZeppelinConfiguration zConf) {
|
||||
this.checkInterval = zConf.getLong("zeppelin.interpreter.lifecyclemanager." +
|
||||
"timeout.checkinterval", 60000);
|
||||
this.timeoutThreshold = zConf.getLong("zeppelin.interpreter.lifecyclemanager.timeout.threshold",
|
||||
1000 * 60 * 60);
|
||||
this.checkTimer = new Timer(true);
|
||||
this.checkTimer.scheduleAtFixedRate(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
long now = System.currentTimeMillis();
|
||||
for (Map.Entry<ManagedInterpreterGroup, Long> entry : interpreterGroups.entrySet()) {
|
||||
ManagedInterpreterGroup interpreterGroup = entry.getKey();
|
||||
Long lastTimeUsing = entry.getValue();
|
||||
if ((now - lastTimeUsing) > timeoutThreshold ) {
|
||||
LOGGER.info("Interpreter {} is timeout.", interpreterGroup.getId());
|
||||
interpreterGroup.close();
|
||||
interpreterGroups.remove(entry.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
}, checkInterval, checkInterval);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterGroupCreated(ManagedInterpreterGroup interpreterGroup) {
|
||||
interpreterGroups.put(interpreterGroup, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterSessionCreated(ManagedInterpreterGroup interpreterGroup,
|
||||
String sessionId) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) {
|
||||
interpreterGroups.put(interpreterGroup, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -30,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
|
|||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.LifecycleManager;
|
||||
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
|
||||
|
|
@ -66,17 +67,21 @@ public class RemoteInterpreter extends Interpreter {
|
|||
private volatile boolean isOpened = false;
|
||||
private volatile boolean isCreated = false;
|
||||
|
||||
private LifecycleManager lifecycleManager;
|
||||
|
||||
/**
|
||||
* Remote interpreter and manage interpreter process
|
||||
*/
|
||||
public RemoteInterpreter(Properties properties,
|
||||
String sessionId,
|
||||
String className,
|
||||
String userName) {
|
||||
String userName,
|
||||
LifecycleManager lifecycleManager) {
|
||||
super(properties);
|
||||
this.sessionId = sessionId;
|
||||
this.className = className;
|
||||
this.userName = userName;
|
||||
this.lifecycleManager = lifecycleManager;
|
||||
}
|
||||
|
||||
public boolean isOpened() {
|
||||
|
|
@ -149,6 +154,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
}
|
||||
});
|
||||
isOpened = true;
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -189,6 +195,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
}
|
||||
});
|
||||
isOpened = false;
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
} else {
|
||||
LOGGER.warn("close is called when RemoterInterpreter is not opened for " + className);
|
||||
}
|
||||
|
|
@ -218,6 +225,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
interpreterContextRunnerPool.clear(noteId);
|
||||
interpreterContextRunnerPool.addAll(noteId, runners);
|
||||
}
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
return interpreterProcess.callRemoteFunction(
|
||||
new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
|
||||
@Override
|
||||
|
|
@ -266,6 +274,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
|
||||
@Override
|
||||
public Void call(Client client) throws Exception {
|
||||
|
|
@ -293,6 +302,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
FormType type = interpreterProcess.callRemoteFunction(
|
||||
new RemoteInterpreterProcess.RemoteFunction<FormType>() {
|
||||
@Override
|
||||
|
|
@ -317,6 +327,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
return interpreterProcess.callRemoteFunction(
|
||||
new RemoteInterpreterProcess.RemoteFunction<Integer>() {
|
||||
@Override
|
||||
|
|
@ -341,6 +352,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
return interpreterProcess.callRemoteFunction(
|
||||
new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
|
||||
@Override
|
||||
|
|
@ -362,6 +374,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
return interpreterProcess.callRemoteFunction(
|
||||
new RemoteInterpreterProcess.RemoteFunction<String>() {
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ public abstract class AbstractInterpreterTest {
|
|||
protected File interpreterDir;
|
||||
protected File confDir;
|
||||
protected File notebookDir;
|
||||
protected ZeppelinConfiguration conf;
|
||||
protected ZeppelinConfiguration conf = new ZeppelinConfiguration();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
|
@ -55,7 +55,6 @@ public abstract class AbstractInterpreterTest {
|
|||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DIR.getVarName(), interpreterDir.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
|
||||
|
||||
conf = new ZeppelinConfiguration();
|
||||
conf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "test,mock1,mock2,mock_resource_pool");
|
||||
interpreterSettingManager = new InterpreterSettingManager(conf,
|
||||
mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
|
|||
InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test");
|
||||
assertEquals("test", interpreterSetting.getName());
|
||||
assertEquals("test", interpreterSetting.getGroup());
|
||||
assertEquals(2, interpreterSetting.getInterpreterInfos().size());
|
||||
assertEquals(3, interpreterSetting.getInterpreterInfos().size());
|
||||
// 3 other builtin properties:
|
||||
// * zeppelin.interpeter.output.limit
|
||||
// * zeppelin.interpreter.localRepo
|
||||
|
|
@ -67,7 +67,6 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
|
|||
assertNotNull(interpreterSetting.getAppEventListener());
|
||||
assertNotNull(interpreterSetting.getDependencyResolver());
|
||||
assertNotNull(interpreterSetting.getInterpreterSettingManager());
|
||||
assertEquals("linux_runner", interpreterSetting.getInterpreterRunner().getPath());
|
||||
|
||||
List<RemoteRepository> repositories = interpreterSettingManager.getRepositories();
|
||||
assertEquals(2, repositories.size());
|
||||
|
|
@ -80,14 +79,13 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
|
|||
interpreterSetting = interpreterSettingManager2.getByName("test");
|
||||
assertEquals("test", interpreterSetting.getName());
|
||||
assertEquals("test", interpreterSetting.getGroup());
|
||||
assertEquals(2, interpreterSetting.getInterpreterInfos().size());
|
||||
assertEquals(3, interpreterSetting.getInterpreterInfos().size());
|
||||
assertEquals(6, interpreterSetting.getJavaProperties().size());
|
||||
assertEquals("value_1", interpreterSetting.getJavaProperties().getProperty("property_1"));
|
||||
assertEquals("new_value_2", interpreterSetting.getJavaProperties().getProperty("property_2"));
|
||||
assertEquals("value_3", interpreterSetting.getJavaProperties().getProperty("property_3"));
|
||||
assertEquals("shared", interpreterSetting.getOption().perNote);
|
||||
assertEquals("shared", interpreterSetting.getOption().perUser);
|
||||
assertEquals("linux_runner", interpreterSetting.getInterpreterRunner().getPath());
|
||||
assertEquals(0, interpreterSetting.getDependencies().size());
|
||||
|
||||
repositories = interpreterSettingManager2.getRepositories();
|
||||
|
|
|
|||
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.lifecycle;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TimeoutLifecycleManagerTest extends AbstractInterpreterTest {
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS.getVarName(),
|
||||
TimeoutLifecycleManager.class.getName());
|
||||
conf.setProperty("zeppelin.interpreter.lifecyclemanager.timeout.checkinterval", 5000);
|
||||
conf.setProperty("zeppelin.interpreter.lifecyclemanager.timeout.threshold", 10000);
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeout_1() throws InterpreterException, InterruptedException, IOException {
|
||||
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.echo") instanceof RemoteInterpreter);
|
||||
RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.echo");
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
|
||||
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
|
||||
null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
remoteInterpreter.interpret("hello world", context);
|
||||
assertTrue(remoteInterpreter.isOpened());
|
||||
InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test");
|
||||
assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
|
||||
|
||||
Thread.sleep(15 * 1000);
|
||||
// interpreterGroup is timeout, so is removed.
|
||||
assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
|
||||
assertFalse(remoteInterpreter.isOpened());
|
||||
}
|
||||
}
|
||||
|
|
@ -42,8 +42,19 @@
|
|||
"description": "desc_2"
|
||||
}
|
||||
},
|
||||
"runner": {
|
||||
"linux": "linux_runner"
|
||||
"editor": {
|
||||
"language": "java",
|
||||
"editOnDblClick": false
|
||||
}
|
||||
},
|
||||
|
||||
{
|
||||
"group": "test",
|
||||
"name": "sleep",
|
||||
"defaultInterpreter": false,
|
||||
"className": "org.apache.zeppelin.interpreter.SleepInterpreter",
|
||||
"properties": {
|
||||
|
||||
},
|
||||
"editor": {
|
||||
"language": "java",
|
||||
|
|
|
|||
Loading…
Reference in a new issue