mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge remote-tracking branch 'upstream/master' into ZEPPELIN-1363
# Conflicts: # zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
This commit is contained in:
commit
bf8194e6f4
42 changed files with 712 additions and 182 deletions
|
|
@ -411,6 +411,25 @@
|
|||
<description>Enable directory listings on server.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.interpreter.lifecyclemanager.class</name>
|
||||
<value>org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager</value>
|
||||
<description>LifecycleManager class for managing the lifecycle of interpreters, by default interpreter will
|
||||
be closed after timeout</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.interpreter.lifecyclemanager.timeout.checkinterval</name>
|
||||
<value>60000</value>
|
||||
<description>milliseconds of the interval to checking whether interpreter is time out</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.interpreter.lifecyclemanager.timeout.threshold</name>
|
||||
<value>3600000</value>
|
||||
<description>milliseconds of the interpreter timeout threshold, by default it is 1 hour</description>
|
||||
</property>
|
||||
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.server.jetty.name</name>
|
||||
|
|
|
|||
|
|
@ -47,7 +47,10 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public class ShellInterpreter extends KerberosInterpreter {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ShellInterpreter.class);
|
||||
|
||||
private static final String TIMEOUT_PROPERTY = "shell.command.timeout.millisecs";
|
||||
private String DEFAULT_TIMEOUT_PROPERTY = "60000";
|
||||
|
||||
private static final String DIRECTORY_USER_HOME = "shell.working.directory.user.home";
|
||||
private final boolean isWindows = System.getProperty("os.name").startsWith("Windows");
|
||||
private final String shell = isWindows ? "cmd /c" : "bash -c";
|
||||
|
|
@ -98,7 +101,9 @@ public class ShellInterpreter extends KerberosInterpreter {
|
|||
DefaultExecutor executor = new DefaultExecutor();
|
||||
executor.setStreamHandler(new PumpStreamHandler(
|
||||
contextInterpreter.out, contextInterpreter.out));
|
||||
executor.setWatchdog(new ExecuteWatchdog(Long.valueOf(getProperty(TIMEOUT_PROPERTY))));
|
||||
|
||||
executor.setWatchdog(new ExecuteWatchdog(
|
||||
Long.valueOf(getProperty(TIMEOUT_PROPERTY, DEFAULT_TIMEOUT_PROPERTY))));
|
||||
executors.put(contextInterpreter.getParagraphId(), executor);
|
||||
if (Boolean.valueOf(getProperty(DIRECTORY_USER_HOME))) {
|
||||
executor.setWorkingDirectory(new File(System.getProperty("user.home")));
|
||||
|
|
|
|||
|
|
@ -44,6 +44,6 @@ if [[ -n "$PYTHON" ]] ; then
|
|||
conda update -q conda
|
||||
conda info -a
|
||||
conda config --add channels conda-forge
|
||||
conda install -q matplotlib pandasql ipython jupyter_client ipykernel matplotlib bokeh
|
||||
conda install -q matplotlib pandasql ipython jupyter_client ipykernel matplotlib bokeh=0.12.6
|
||||
pip install -q grpcio ggplot
|
||||
fi
|
||||
|
|
|
|||
|
|
@ -534,6 +534,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
return getString(ConfVars.ZEPPELIN_SERVER_STRICT_TRANSPORT);
|
||||
}
|
||||
|
||||
public String getLifecycleManagerClass() {
|
||||
return getString(ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS);
|
||||
}
|
||||
|
||||
public Map<String, String> dumpConfigurations(ZeppelinConfiguration conf,
|
||||
ConfigurationKeyPredicate predicate) {
|
||||
|
|
@ -701,7 +704,14 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_SERVER_KERBEROS_KEYTAB("zeppelin.server.kerberos.keytab", ""),
|
||||
ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""),
|
||||
|
||||
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":");
|
||||
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"),
|
||||
|
||||
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS("zeppelin.interpreter.lifecyclemanager.class",
|
||||
"org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager"),
|
||||
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL(
|
||||
"zeppelin.interpreter.lifecyclemanager.timeout.checkinterval", 6000L),
|
||||
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD(
|
||||
"zeppelin.interpreter.lifecyclemanager.timeout.threshold", 3600000L);
|
||||
|
||||
private String varName;
|
||||
@SuppressWarnings("rawtypes")
|
||||
|
|
|
|||
|
|
@ -107,7 +107,13 @@ public class Input<T> implements Serializable {
|
|||
if (displayName != null ? !displayName.equals(input.displayName) : input.displayName != null) {
|
||||
return false;
|
||||
}
|
||||
if (defaultValue != null ?
|
||||
if (defaultValue instanceof Object[]) {
|
||||
if (defaultValue != null ?
|
||||
!Arrays.equals((Object[]) defaultValue, (Object[]) input.defaultValue)
|
||||
: input.defaultValue != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (defaultValue != null ?
|
||||
!defaultValue.equals(input.defaultValue) : input.defaultValue != null) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -142,4 +142,23 @@ public class InterpreterGroup {
|
|||
public boolean isEmpty() {
|
||||
return sessions.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof InterpreterGroup)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
InterpreterGroup that = (InterpreterGroup) o;
|
||||
|
||||
return id != null ? id.equals(that.id) : that.id == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return id != null ? id.hashCode() : 0;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -63,6 +63,10 @@ public abstract class Job {
|
|||
public boolean isPending() {
|
||||
return this == PENDING;
|
||||
}
|
||||
|
||||
public boolean isCompleted() {
|
||||
return this == FINISHED || this == ERROR || this == ABORT;
|
||||
}
|
||||
}
|
||||
|
||||
private String jobName;
|
||||
|
|
|
|||
|
|
@ -650,7 +650,7 @@ public class NotebookRestApi {
|
|||
checkIfUserCanRun(noteId, "Insufficient privileges you cannot run job for this note");
|
||||
|
||||
try {
|
||||
note.runAll(subject);
|
||||
note.runAll(subject, true);
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Exception from run", ex);
|
||||
return new JsonResponse<>(Status.PRECONDITION_FAILED,
|
||||
|
|
|
|||
|
|
@ -1380,13 +1380,13 @@ public class NotebookServer extends WebSocketServlet
|
|||
List<InterpreterSetting> settings =
|
||||
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getOrCreateInterpreterGroup(user, note.getId()) == null) {
|
||||
if (setting.getInterpreterGroup(user, note.getId()) == null) {
|
||||
continue;
|
||||
}
|
||||
if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, note.getId())
|
||||
if (interpreterGroupId.equals(setting.getInterpreterGroup(user, note.getId())
|
||||
.getId())) {
|
||||
AngularObjectRegistry angularObjectRegistry =
|
||||
setting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
|
||||
setting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
|
||||
|
||||
// first trying to get local registry
|
||||
ao = angularObjectRegistry.get(varName, noteId, paragraphId);
|
||||
|
|
@ -1423,13 +1423,13 @@ public class NotebookServer extends WebSocketServlet
|
|||
List<InterpreterSetting> settings =
|
||||
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getOrCreateInterpreterGroup(user, n.getId()) == null) {
|
||||
if (setting.getInterpreterGroup(user, n.getId()) == null) {
|
||||
continue;
|
||||
}
|
||||
if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, n.getId())
|
||||
if (interpreterGroupId.equals(setting.getInterpreterGroup(user, n.getId())
|
||||
.getId())) {
|
||||
AngularObjectRegistry angularObjectRegistry =
|
||||
setting.getOrCreateInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
|
||||
setting.getInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
|
||||
this.broadcastExcept(n.getId(),
|
||||
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
|
||||
.put("interpreterGroupId", interpreterGroupId).put("noteId", n.getId())
|
||||
|
|
@ -1682,7 +1682,10 @@ public class NotebookServer extends WebSocketServlet
|
|||
Paragraph p = setParagraphUsingMessage(note, fromMessage,
|
||||
paragraphId, text, title, params, config);
|
||||
|
||||
persistAndExecuteSingleParagraph(conn, note, p);
|
||||
if (!persistAndExecuteSingleParagraph(conn, note, p, true)) {
|
||||
// stop execution when one paragraph fails.
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1774,7 +1777,7 @@ public class NotebookServer extends WebSocketServlet
|
|||
Paragraph p = setParagraphUsingMessage(note, fromMessage, paragraphId,
|
||||
text, title, params, config);
|
||||
|
||||
persistAndExecuteSingleParagraph(conn, note, p);
|
||||
persistAndExecuteSingleParagraph(conn, note, p, false);
|
||||
}
|
||||
|
||||
private void addNewParagraphIfLastParagraphIsExecuted(Note note, Paragraph p) {
|
||||
|
|
@ -1806,15 +1809,16 @@ public class NotebookServer extends WebSocketServlet
|
|||
}
|
||||
}
|
||||
|
||||
private void persistAndExecuteSingleParagraph(NotebookSocket conn,
|
||||
Note note, Paragraph p) throws IOException {
|
||||
private boolean persistAndExecuteSingleParagraph(NotebookSocket conn,
|
||||
Note note, Paragraph p,
|
||||
boolean blocking) throws IOException {
|
||||
addNewParagraphIfLastParagraphIsExecuted(note, p);
|
||||
if (!persistNoteWithAuthInfo(conn, note, p)) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
note.run(p.getId());
|
||||
return note.run(p.getId(), blocking);
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Exception from run", ex);
|
||||
if (p != null) {
|
||||
|
|
@ -1822,6 +1826,7 @@ public class NotebookServer extends WebSocketServlet
|
|||
p.setStatus(Status.ERROR);
|
||||
broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", p));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -2301,14 +2306,17 @@ public class NotebookServer extends WebSocketServlet
|
|||
}
|
||||
|
||||
for (InterpreterSetting intpSetting : settings) {
|
||||
if (intpSetting.getInterpreterGroup(user, note.getId()) == null) {
|
||||
continue;
|
||||
}
|
||||
AngularObjectRegistry registry =
|
||||
intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
|
||||
intpSetting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
|
||||
List<AngularObject> objects = registry.getAllWithGlobal(note.getId());
|
||||
for (AngularObject object : objects) {
|
||||
conn.send(serializeMessage(
|
||||
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object)
|
||||
.put("interpreterGroupId",
|
||||
intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getId())
|
||||
intpSetting.getInterpreterGroup(user, note.getId()).getId())
|
||||
.put("noteId", note.getId()).put("paragraphId", object.getParagraphId())));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -90,6 +90,9 @@ public abstract class AbstractTestRestApi {
|
|||
"/api/version = anon\n" +
|
||||
"/** = authc";
|
||||
|
||||
protected static File zeppelinHome;
|
||||
protected static File confDir;
|
||||
|
||||
private String getUrl(String path) {
|
||||
String url;
|
||||
if (System.getProperty("url") != null) {
|
||||
|
|
@ -124,10 +127,17 @@ public abstract class AbstractTestRestApi {
|
|||
}
|
||||
};
|
||||
|
||||
private static void start(boolean withAuth) throws Exception {
|
||||
private static void start(boolean withAuth, String testClassName) throws Exception {
|
||||
if (!wasRunning) {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
|
||||
// copy the resources files to a temp folder
|
||||
zeppelinHome = new File("..");
|
||||
LOG.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
|
||||
confDir = new File(zeppelinHome, "conf_" + testClassName);
|
||||
confDir.mkdirs();
|
||||
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), zeppelinHome.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), new File("../zeppelin-web/dist").getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
|
||||
|
||||
// some test profile does not build zeppelin-web.
|
||||
// to prevent zeppelin starting up fail, create zeppelin-web/dist directory
|
||||
|
|
@ -142,7 +152,7 @@ public abstract class AbstractTestRestApi {
|
|||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED.getVarName(), "false");
|
||||
|
||||
// Create a shiro env test.
|
||||
shiroIni = new File("../conf/shiro.ini");
|
||||
shiroIni = new File(confDir, "shiro.ini");
|
||||
if (!shiroIni.exists()) {
|
||||
shiroIni.createNewFile();
|
||||
}
|
||||
|
|
@ -245,12 +255,12 @@ public abstract class AbstractTestRestApi {
|
|||
}
|
||||
}
|
||||
|
||||
protected static void startUpWithAuthenticationEnable() throws Exception {
|
||||
start(true);
|
||||
protected static void startUpWithAuthenticationEnable(String testClassName) throws Exception {
|
||||
start(true, testClassName);
|
||||
}
|
||||
|
||||
protected static void startUp() throws Exception {
|
||||
start(false);
|
||||
protected static void startUp(String testClassName) throws Exception {
|
||||
start(false, testClassName);
|
||||
}
|
||||
|
||||
private static String getHostname() {
|
||||
|
|
@ -339,6 +349,8 @@ public abstract class AbstractTestRestApi {
|
|||
System
|
||||
.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED.getVarName());
|
||||
}
|
||||
|
||||
FileUtils.deleteDirectory(confDir);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ public class ConfigurationsRestApiTest extends AbstractTestRestApi {
|
|||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
AbstractTestRestApi.startUp();
|
||||
AbstractTestRestApi.startUp(ConfigurationsRestApi.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ public class CredentialsRestApiTest extends AbstractTestRestApi {
|
|||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
AbstractTestRestApi.startUp();
|
||||
AbstractTestRestApi.startUp(CredentialsRestApiTest.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ public class HeliumRestApiTest extends AbstractTestRestApi {
|
|||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
AbstractTestRestApi.startUp();
|
||||
AbstractTestRestApi.startUp(HeliumRestApi.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
|
|||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
AbstractTestRestApi.startUp();
|
||||
AbstractTestRestApi.startUp(InterpreterRestApiTest.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ public class NotebookRepoRestApiTest extends AbstractTestRestApi {
|
|||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
AbstractTestRestApi.startUp();
|
||||
AbstractTestRestApi.startUp(NotebookRepoRestApiTest.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
|
|||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
AbstractTestRestApi.startUp();
|
||||
startUp(NotebookRestApiTest.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
@ -120,6 +120,68 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
|
|||
ZeppelinServer.notebook.removeNote(note1.getId(), anonymous);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunAllParagraph_AllSuccess() throws IOException {
|
||||
Note note1 = ZeppelinServer.notebook.createNote(anonymous);
|
||||
// 2 paragraphs
|
||||
// P1:
|
||||
// %python
|
||||
// import time
|
||||
// time.sleep(1)
|
||||
// user='abc'
|
||||
// P2:
|
||||
// %python
|
||||
// from __future__ import print_function
|
||||
// print(user)
|
||||
//
|
||||
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p1.setText("%python import time\ntime.sleep(1)\nuser='abc'");
|
||||
p2.setText("%python from __future__ import print_function\nprint(user)");
|
||||
|
||||
PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertThat(post, isAllowed());
|
||||
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
|
||||
}.getType());
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
|
||||
assertEquals(Job.Status.FINISHED, p1.getStatus());
|
||||
assertEquals(Job.Status.FINISHED, p2.getStatus());
|
||||
assertEquals("abc\n", p2.getResult().message().get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunAllParagraph_FirstFailed() throws IOException {
|
||||
Note note1 = ZeppelinServer.notebook.createNote(anonymous);
|
||||
// 2 paragraphs
|
||||
// P1:
|
||||
// %python
|
||||
// import time
|
||||
// time.sleep(1)
|
||||
// from __future__ import print_function
|
||||
// print(user)
|
||||
// P2:
|
||||
// %python
|
||||
// user='abc'
|
||||
//
|
||||
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p1.setText("%python import time\ntime.sleep(1)\nfrom __future__ import print_function\nprint(user2)");
|
||||
p2.setText("%python user2='abc'\nprint(user2)");
|
||||
|
||||
PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertThat(post, isAllowed());
|
||||
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
|
||||
}.getType());
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
|
||||
assertEquals(Job.Status.ERROR, p1.getStatus());
|
||||
// p2 will be skipped because p1 is failed.
|
||||
assertEquals(Job.Status.READY, p2.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloneNote() throws IOException {
|
||||
Note note1 = ZeppelinServer.notebook.createNote(anonymous);
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ public class NotebookSecurityRestApiTest extends AbstractTestRestApi {
|
|||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
AbstractTestRestApi.startUpWithAuthenticationEnable();
|
||||
AbstractTestRestApi.startUpWithAuthenticationEnable(NotebookSecurityRestApiTest.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ public class SecurityRestApiTest extends AbstractTestRestApi {
|
|||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
AbstractTestRestApi.startUpWithAuthenticationEnable();
|
||||
AbstractTestRestApi.startUpWithAuthenticationEnable(SecurityRestApiTest.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
|
|||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
AbstractTestRestApi.startUp();
|
||||
AbstractTestRestApi.startUp(ZeppelinRestApiTest.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
@ -441,12 +441,6 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
|
|||
String noteId = note.getId();
|
||||
|
||||
note.runAll();
|
||||
|
||||
// wait until paragraph gets started
|
||||
while (!paragraph.getStatus().isRunning()) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
// assume that status of the paragraph is running
|
||||
GetMethod get = httpGet("/notebook/job/" + noteId);
|
||||
assertThat("test get note job: ", get, isAllowed());
|
||||
|
|
@ -494,15 +488,6 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
|
|||
String noteId = note.getId();
|
||||
|
||||
note.runAll();
|
||||
// wait until job is finished or timeout.
|
||||
int timeout = 1;
|
||||
while (!paragraph.isTerminated()) {
|
||||
Thread.sleep(1000);
|
||||
if (timeout++ > 120) {
|
||||
LOG.info("testRunParagraphWithParams timeout job.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Call Run paragraph REST API
|
||||
PostMethod postParagraph = httpPost("/notebook/job/" + noteId + "/" + paragraph.getId(),
|
||||
|
|
@ -534,17 +519,8 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
|
|||
config.put("enabled", true);
|
||||
paragraph.setConfig(config);
|
||||
|
||||
note.runAll();
|
||||
// wait until job is finished or timeout.
|
||||
int timeout = 1;
|
||||
while (!paragraph.isTerminated()) {
|
||||
Thread.sleep(1000);
|
||||
if (timeout++ > 10) {
|
||||
LOG.info("testNoteJobs timeout job.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
note.runAll(AuthenticationInfo.ANONYMOUS, false);
|
||||
|
||||
String jsonRequest = "{\"cron\":\"* * * * * ?\" }";
|
||||
// right cron expression but not exist note.
|
||||
PostMethod postCron = httpPost("/notebook/cron/notexistnote", jsonRequest);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.rest;
|
||||
|
||||
public class ZeppelinServerTest extends AbstractTestRestApi {
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -49,7 +49,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
AbstractTestRestApi.startUp();
|
||||
AbstractTestRestApi.startUp(ZeppelinSparkClusterTest.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ public class DirAccessTest extends AbstractTestRestApi {
|
|||
public void testDirAccessForbidden() throws Exception {
|
||||
synchronized (this) {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_DEFAULT_DIR_ALLOWED.getVarName(), "false");
|
||||
AbstractTestRestApi.startUp();
|
||||
AbstractTestRestApi.startUp(DirAccessTest.class.getSimpleName());
|
||||
HttpClient httpClient = new HttpClient();
|
||||
GetMethod getMethod = new GetMethod(getUrlToTest() + "/app/");
|
||||
httpClient.executeMethod(getMethod);
|
||||
|
|
@ -43,7 +43,7 @@ public class DirAccessTest extends AbstractTestRestApi {
|
|||
public void testDirAccessOk() throws Exception {
|
||||
synchronized (this) {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_DEFAULT_DIR_ALLOWED.getVarName(), "true");
|
||||
AbstractTestRestApi.startUp();
|
||||
AbstractTestRestApi.startUp(DirAccessTest.class.getSimpleName());
|
||||
HttpClient httpClient = new HttpClient();
|
||||
GetMethod getMethod = new GetMethod(getUrlToTest() + "/app/");
|
||||
httpClient.executeMethod(getMethod);
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
|
|||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
AbstractTestRestApi.startUp();
|
||||
AbstractTestRestApi.startUp(NotebookServerTest.class.getSimpleName());
|
||||
gson = new Gson();
|
||||
notebook = ZeppelinServer.notebook;
|
||||
notebookServer = ZeppelinServer.notebookWsServer;
|
||||
|
|
|
|||
|
|
@ -76,10 +76,10 @@ public class InterpreterFactory {
|
|||
if (null != interpreter) {
|
||||
return interpreter;
|
||||
}
|
||||
throw new RuntimeException("No such interpreter: " + replName);
|
||||
}
|
||||
return null;
|
||||
|
||||
} else {
|
||||
throw new RuntimeException("Interpreter " + group + " is not binded to this note");
|
||||
} else if (replNameSplit.length == 1){
|
||||
// first assume replName is 'name' of interpreter. ('groupName' is ommitted)
|
||||
// search 'name' from first (default) interpreter group
|
||||
// TODO(jl): Handle with noteId to support defaultInterpreter per note.
|
||||
|
|
@ -90,19 +90,15 @@ public class InterpreterFactory {
|
|||
return interpreter;
|
||||
}
|
||||
|
||||
// next, assume replName is 'group' of interpreter ('name' is ommitted)
|
||||
// next, assume replName is 'group' of interpreter ('name' is omitted)
|
||||
// search interpreter group and return first interpreter.
|
||||
setting = getInterpreterSettingByGroup(settings, replName);
|
||||
|
||||
if (null != setting) {
|
||||
return setting.getDefaultInterpreter(user, noteId);
|
||||
}
|
||||
|
||||
// Support the legacy way to use it
|
||||
for (InterpreterSetting s : settings) {
|
||||
if (s.getGroup().equals(replName)) {
|
||||
return setting.getDefaultInterpreter(user, noteId);
|
||||
}
|
||||
} else {
|
||||
throw new RuntimeException("Either no interpreter named " + replName + " or it is not " +
|
||||
"binded to this note");
|
||||
}
|
||||
}
|
||||
//TODO(zjffdu) throw InterpreterException instead of return null
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ import org.apache.zeppelin.interpreter.launcher.InterpreterLaunchContext;
|
|||
import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
|
||||
import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
|
||||
import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
|
||||
import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
|
||||
|
|
@ -139,6 +140,7 @@ public class InterpreterSetting {
|
|||
private transient InterpreterLauncher launcher;
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private transient LifecycleManager lifecycleManager;
|
||||
|
||||
/**
|
||||
* Builder class for InterpreterSetting
|
||||
|
|
@ -233,6 +235,11 @@ public class InterpreterSetting {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setLifecycleManager(LifecycleManager lifecycleManager) {
|
||||
interpreterSetting.lifecycleManager = lifecycleManager;
|
||||
return this;
|
||||
}
|
||||
|
||||
public InterpreterSetting create() {
|
||||
// post processing
|
||||
interpreterSetting.postProcessing();
|
||||
|
|
@ -249,6 +256,9 @@ public class InterpreterSetting {
|
|||
|
||||
void postProcessing() {
|
||||
this.status = Status.READY;
|
||||
if (this.lifecycleManager == null) {
|
||||
this.lifecycleManager = new NullLifecycleManager(conf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -321,6 +331,14 @@ public class InterpreterSetting {
|
|||
this.interpreterSettingManager = interpreterSettingManager;
|
||||
}
|
||||
|
||||
public void setLifecycleManager(LifecycleManager lifecycleManager) {
|
||||
this.lifecycleManager = lifecycleManager;
|
||||
}
|
||||
|
||||
public LifecycleManager getLifecycleManager() {
|
||||
return lifecycleManager;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
|
@ -384,7 +402,7 @@ public class InterpreterSetting {
|
|||
this.interpreterGroups.remove(groupId);
|
||||
}
|
||||
|
||||
ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
|
||||
public ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
|
||||
String groupId = getInterpreterGroupId(user, noteId);
|
||||
try {
|
||||
interpreterGroupReadLock.lock();
|
||||
|
|
@ -628,7 +646,7 @@ public class InterpreterSetting {
|
|||
for (InterpreterInfo info : interpreterInfos) {
|
||||
Interpreter interpreter = null;
|
||||
interpreter = new RemoteInterpreter(getJavaProperties(), sessionId,
|
||||
info.getClassName(), user);
|
||||
info.getClassName(), user, lifecycleManager);
|
||||
if (info.isDefaultInterpreter()) {
|
||||
interpreters.add(0, interpreter);
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -51,6 +51,7 @@ import java.io.FileInputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Type;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
|
|
@ -116,7 +117,7 @@ public class InterpreterSettingManager {
|
|||
private RemoteInterpreterProcessListener remoteInterpreterProcessListener;
|
||||
private ApplicationEventListener appEventListener;
|
||||
private DependencyResolver dependencyResolver;
|
||||
|
||||
private LifecycleManager lifecycleManager;
|
||||
|
||||
public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
|
||||
AngularObjectRegistryListener angularObjectRegistryListener,
|
||||
|
|
@ -153,6 +154,14 @@ public class InterpreterSettingManager {
|
|||
this.angularObjectRegistryListener = angularObjectRegistryListener;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
this.appEventListener = appEventListener;
|
||||
try {
|
||||
this.lifecycleManager = (LifecycleManager)
|
||||
Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class)
|
||||
.newInstance(conf);
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Fail to create LifecycleManager", e);
|
||||
}
|
||||
|
||||
init();
|
||||
}
|
||||
|
||||
|
|
@ -177,6 +186,7 @@ public class InterpreterSettingManager {
|
|||
remoteInterpreterProcessListener);
|
||||
savedInterpreterSetting.setAppEventListener(appEventListener);
|
||||
savedInterpreterSetting.setDependencyResolver(dependencyResolver);
|
||||
savedInterpreterSetting.setLifecycleManager(lifecycleManager);
|
||||
savedInterpreterSetting.setProperties(InterpreterSetting.convertInterpreterProperties(
|
||||
savedInterpreterSetting.getProperties()
|
||||
));
|
||||
|
|
@ -372,6 +382,7 @@ public class InterpreterSettingManager {
|
|||
interpreterSetting.setAppEventListener(appEventListener);
|
||||
interpreterSetting.setDependencyResolver(dependencyResolver);
|
||||
interpreterSetting.setInterpreterSettingManager(this);
|
||||
interpreterSetting.setLifecycleManager(lifecycleManager);
|
||||
interpreterSetting.postProcessing();
|
||||
interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
|
||||
}
|
||||
|
|
@ -633,6 +644,7 @@ public class InterpreterSettingManager {
|
|||
setting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
|
||||
setting.setDependencyResolver(dependencyResolver);
|
||||
setting.setAngularObjectRegistryListener(angularObjectRegistryListener);
|
||||
setting.setLifecycleManager(lifecycleManager);
|
||||
setting.setInterpreterSettingManager(this);
|
||||
setting.postProcessing();
|
||||
interpreterSettings.put(setting.getId(), setting);
|
||||
|
|
@ -645,6 +657,7 @@ public class InterpreterSettingManager {
|
|||
interpreterSettingTemplates.put(interpreterSetting.getName(), interpreterSetting);
|
||||
interpreterSetting.setAppEventListener(appEventListener);
|
||||
interpreterSetting.setDependencyResolver(dependencyResolver);
|
||||
interpreterSetting.setLifecycleManager(lifecycleManager);
|
||||
interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener);
|
||||
interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
|
||||
interpreterSetting.setInterpreterSettingManager(this);
|
||||
|
|
@ -790,7 +803,7 @@ public class InterpreterSettingManager {
|
|||
}
|
||||
|
||||
public void restart(String id) throws InterpreterException {
|
||||
restart(id, "", "anonymous");
|
||||
interpreterSettings.get(id).close();
|
||||
}
|
||||
|
||||
public InterpreterSetting get(String id) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
|
||||
/**
|
||||
* Interface for managing the lifecycle of interpreters
|
||||
*/
|
||||
public interface LifecycleManager {
|
||||
|
||||
void onInterpreterGroupCreated(ManagedInterpreterGroup interpreterGroup);
|
||||
|
||||
void onInterpreterSessionCreated(ManagedInterpreterGroup interpreterGroup,
|
||||
String sessionId);
|
||||
|
||||
void onInterpreterUse(ManagedInterpreterGroup interpreterGroup,
|
||||
String sessionId);
|
||||
|
||||
}
|
||||
|
|
@ -47,6 +47,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
|
|||
ManagedInterpreterGroup(String id, InterpreterSetting interpreterSetting) {
|
||||
super(id);
|
||||
this.interpreterSetting = interpreterSetting;
|
||||
interpreterSetting.getLifecycleManager().onInterpreterGroupCreated(this);
|
||||
}
|
||||
|
||||
public InterpreterSetting getInterpreterSetting() {
|
||||
|
|
@ -81,14 +82,15 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
|
|||
* @param sessionId
|
||||
*/
|
||||
public synchronized void close(String sessionId) {
|
||||
LOGGER.info("Close Session: " + sessionId);
|
||||
LOGGER.info("Close Session: " + sessionId + " for interpreter setting: " +
|
||||
interpreterSetting.getName());
|
||||
close(sessions.remove(sessionId));
|
||||
//TODO(zjffdu) whether close InterpreterGroup if there's no session left in Zeppelin Server
|
||||
if (sessions.isEmpty() && interpreterSetting != null) {
|
||||
LOGGER.info("Remove this InterpreterGroup: {} as all the sessions are closed", id);
|
||||
interpreterSetting.removeInterpreterGroup(id);
|
||||
if (remoteInterpreterProcess != null) {
|
||||
LOGGER.info("Kill RemoteIntetrpreterProcess");
|
||||
LOGGER.info("Kill RemoteInterpreterProcess");
|
||||
remoteInterpreterProcess.stop();
|
||||
remoteInterpreterProcess = null;
|
||||
}
|
||||
|
|
@ -134,8 +136,10 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
|
|||
interpreter.setInterpreterGroup(this);
|
||||
}
|
||||
LOGGER.info("Create Session: {} in InterpreterGroup: {} for user: {}", sessionId, id, user);
|
||||
interpreterSetting.getLifecycleManager().onInterpreterSessionCreated(this, sessionId);
|
||||
sessions.put(sessionId, interpreters);
|
||||
return interpreters;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.zeppelin.interpreter.lifecycle;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.LifecycleManager;
|
||||
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
|
||||
|
||||
/**
|
||||
* Do nothing for the lifecycle of interpreter. User need to explicitly start/stop interpreter.
|
||||
*/
|
||||
public class NullLifecycleManager implements LifecycleManager {
|
||||
|
||||
public NullLifecycleManager(ZeppelinConfiguration zConf) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterGroupCreated(ManagedInterpreterGroup interpreterGroup) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterSessionCreated(ManagedInterpreterGroup interpreterGroup,
|
||||
String sessionId) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,75 @@
|
|||
package org.apache.zeppelin.interpreter.lifecycle;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.LifecycleManager;
|
||||
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
|
||||
/**
|
||||
* This lifecycle manager would close interpreter after it is timeout. By default, it is timeout
|
||||
* after no using in 1 hour.
|
||||
*
|
||||
* For now, this class only manage the lifecycle of interpreter group (will close interpreter
|
||||
* process after timeout). Managing the lifecycle of interpreter session could be done in future
|
||||
* if necessary.
|
||||
*/
|
||||
public class TimeoutLifecycleManager implements LifecycleManager {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutLifecycleManager.class);
|
||||
|
||||
// ManagerInterpreterGroup -> LastTimeUsing timestamp
|
||||
private Map<ManagedInterpreterGroup, Long> interpreterGroups = new ConcurrentHashMap<>();
|
||||
|
||||
private long checkInterval;
|
||||
private long timeoutThreshold;
|
||||
|
||||
private Timer checkTimer;
|
||||
|
||||
public TimeoutLifecycleManager(ZeppelinConfiguration zConf) {
|
||||
this.checkInterval = zConf.getLong(ZeppelinConfiguration.ConfVars
|
||||
.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL);
|
||||
this.timeoutThreshold = zConf.getLong(
|
||||
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD);
|
||||
this.checkTimer = new Timer(true);
|
||||
this.checkTimer.scheduleAtFixedRate(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
long now = System.currentTimeMillis();
|
||||
for (Map.Entry<ManagedInterpreterGroup, Long> entry : interpreterGroups.entrySet()) {
|
||||
ManagedInterpreterGroup interpreterGroup = entry.getKey();
|
||||
Long lastTimeUsing = entry.getValue();
|
||||
if ((now - lastTimeUsing) > timeoutThreshold ) {
|
||||
LOGGER.info("InterpreterGroup {} is timeout.", interpreterGroup.getId());
|
||||
interpreterGroup.close();
|
||||
interpreterGroups.remove(entry.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
}, checkInterval, checkInterval);
|
||||
LOGGER.info("TimeoutLifecycleManager is started with checkinterval: " + checkInterval
|
||||
+ ", timeoutThreshold: " + timeoutThreshold);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterGroupCreated(ManagedInterpreterGroup interpreterGroup) {
|
||||
interpreterGroups.put(interpreterGroup, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterSessionCreated(ManagedInterpreterGroup interpreterGroup,
|
||||
String sessionId) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) {
|
||||
interpreterGroups.put(interpreterGroup, System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
|
|
@ -30,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
|
|||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.LifecycleManager;
|
||||
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
|
||||
|
|
@ -66,17 +67,21 @@ public class RemoteInterpreter extends Interpreter {
|
|||
private volatile boolean isOpened = false;
|
||||
private volatile boolean isCreated = false;
|
||||
|
||||
private LifecycleManager lifecycleManager;
|
||||
|
||||
/**
|
||||
* Remote interpreter and manage interpreter process
|
||||
*/
|
||||
public RemoteInterpreter(Properties properties,
|
||||
String sessionId,
|
||||
String className,
|
||||
String userName) {
|
||||
String userName,
|
||||
LifecycleManager lifecycleManager) {
|
||||
super(properties);
|
||||
this.sessionId = sessionId;
|
||||
this.className = className;
|
||||
this.userName = userName;
|
||||
this.lifecycleManager = lifecycleManager;
|
||||
}
|
||||
|
||||
public boolean isOpened() {
|
||||
|
|
@ -149,6 +154,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
}
|
||||
});
|
||||
isOpened = true;
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -189,6 +195,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
}
|
||||
});
|
||||
isOpened = false;
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
} else {
|
||||
LOGGER.warn("close is called when RemoterInterpreter is not opened for " + className);
|
||||
}
|
||||
|
|
@ -218,6 +225,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
interpreterContextRunnerPool.clear(noteId);
|
||||
interpreterContextRunnerPool.addAll(noteId, runners);
|
||||
}
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
return interpreterProcess.callRemoteFunction(
|
||||
new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
|
||||
@Override
|
||||
|
|
@ -270,6 +278,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
|
||||
@Override
|
||||
public Void call(Client client) throws Exception {
|
||||
|
|
@ -297,6 +306,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
FormType type = interpreterProcess.callRemoteFunction(
|
||||
new RemoteInterpreterProcess.RemoteFunction<FormType>() {
|
||||
@Override
|
||||
|
|
@ -321,6 +331,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
return interpreterProcess.callRemoteFunction(
|
||||
new RemoteInterpreterProcess.RemoteFunction<Integer>() {
|
||||
@Override
|
||||
|
|
@ -345,6 +356,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
return interpreterProcess.callRemoteFunction(
|
||||
new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
|
||||
@Override
|
||||
|
|
@ -366,6 +378,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
|
||||
return interpreterProcess.callRemoteFunction(
|
||||
new RemoteInterpreterProcess.RemoteFunction<String>() {
|
||||
@Override
|
||||
|
|
@ -395,7 +408,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
private RemoteInterpreterContext convert(InterpreterContext ic) {
|
||||
return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
|
||||
ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
|
||||
gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getNoteGui()),
|
||||
gson.toJson(ic.getConfig()), ic.getGui().toJson(), gson.toJson(ic.getNoteGui()),
|
||||
gson.toJson(ic.getRunners()));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -611,32 +611,39 @@ public class Note implements ParagraphJobListener, JsonSerializable {
|
|||
}
|
||||
AuthenticationInfo authenticationInfo = new AuthenticationInfo();
|
||||
authenticationInfo.setUser(cronExecutingUser);
|
||||
runAll(authenticationInfo);
|
||||
runAll(authenticationInfo, true);
|
||||
}
|
||||
|
||||
public void runAll(AuthenticationInfo authenticationInfo) {
|
||||
public void runAll(AuthenticationInfo authenticationInfo, boolean blocking) {
|
||||
for (Paragraph p : getParagraphs()) {
|
||||
if (!p.isEnabled()) {
|
||||
continue;
|
||||
}
|
||||
p.setAuthenticationInfo(authenticationInfo);
|
||||
run(p.getId());
|
||||
if (!run(p.getId(), blocking)) {
|
||||
logger.warn("Skip running the remain notes because paragraph {} fails", p.getId());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean run(String paragraphId) {
|
||||
return run(paragraphId, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a single paragraph.
|
||||
*
|
||||
* @param paragraphId ID of paragraph
|
||||
*/
|
||||
public void run(String paragraphId) {
|
||||
public boolean run(String paragraphId, boolean blocking) {
|
||||
Paragraph p = getParagraph(paragraphId);
|
||||
p.setListener(jobListenerFactory.getParagraphJobListener(this));
|
||||
|
||||
if (p.isBlankParagraph()) {
|
||||
logger.info("skip to run blank paragraph. {}", p.getId());
|
||||
p.setStatus(Job.Status.FINISHED);
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
|
||||
p.clearRuntimeInfo(null);
|
||||
|
|
@ -657,6 +664,19 @@ public class Note implements ParagraphJobListener, JsonSerializable {
|
|||
p.setAuthenticationInfo(p.getAuthenticationInfo());
|
||||
intp.getScheduler().submit(p);
|
||||
}
|
||||
|
||||
if (blocking) {
|
||||
while (!p.getStatus().isCompleted()) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return p.getStatus() == Status.FINISHED;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -704,9 +724,11 @@ public class Note implements ParagraphJobListener, JsonSerializable {
|
|||
}
|
||||
|
||||
for (InterpreterSetting setting : settings) {
|
||||
InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
|
||||
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
|
||||
angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
|
||||
InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
|
||||
if (intpGroup != null) {
|
||||
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
|
||||
angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -719,7 +741,10 @@ public class Note implements ParagraphJobListener, JsonSerializable {
|
|||
}
|
||||
|
||||
for (InterpreterSetting setting : settings) {
|
||||
InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
|
||||
if (setting.getInterpreterGroup(user, id) == null) {
|
||||
continue;
|
||||
}
|
||||
InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
|
||||
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
|
||||
|
||||
if (registry instanceof RemoteAngularObjectRegistry) {
|
||||
|
|
|
|||
|
|
@ -334,39 +334,41 @@ public class Notebook implements NoteEventListener {
|
|||
|
||||
// remove from all interpreter instance's angular object registry
|
||||
for (InterpreterSetting settings : interpreterSettingManager.get()) {
|
||||
AngularObjectRegistry registry =
|
||||
settings.getOrCreateInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
|
||||
if (registry instanceof RemoteAngularObjectRegistry) {
|
||||
// remove paragraph scope object
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, p.getId());
|
||||
InterpreterGroup interpreterGroup = settings.getInterpreterGroup(subject.getUser(), id);
|
||||
if (interpreterGroup != null) {
|
||||
AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
|
||||
if (registry instanceof RemoteAngularObjectRegistry) {
|
||||
// remove paragraph scope object
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, p.getId());
|
||||
|
||||
// remove app scope object
|
||||
List<ApplicationState> appStates = p.getAllApplicationStates();
|
||||
if (appStates != null) {
|
||||
for (ApplicationState app : appStates) {
|
||||
((RemoteAngularObjectRegistry) registry)
|
||||
.removeAllAndNotifyRemoteProcess(id, app.getId());
|
||||
// remove app scope object
|
||||
List<ApplicationState> appStates = p.getAllApplicationStates();
|
||||
if (appStates != null) {
|
||||
for (ApplicationState app : appStates) {
|
||||
((RemoteAngularObjectRegistry) registry)
|
||||
.removeAllAndNotifyRemoteProcess(id, app.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// remove note scope object
|
||||
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null);
|
||||
} else {
|
||||
// remove paragraph scope object
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
registry.removeAll(id, p.getId());
|
||||
// remove note scope object
|
||||
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null);
|
||||
} else {
|
||||
// remove paragraph scope object
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
registry.removeAll(id, p.getId());
|
||||
|
||||
// remove app scope object
|
||||
List<ApplicationState> appStates = p.getAllApplicationStates();
|
||||
if (appStates != null) {
|
||||
for (ApplicationState app : appStates) {
|
||||
registry.removeAll(id, app.getId());
|
||||
// remove app scope object
|
||||
List<ApplicationState> appStates = p.getAllApplicationStates();
|
||||
if (appStates != null) {
|
||||
for (ApplicationState app : appStates) {
|
||||
registry.removeAll(id, app.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
// remove note scope object
|
||||
registry.removeAll(id, null);
|
||||
}
|
||||
// remove note scope object
|
||||
registry.removeAll(id, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -517,9 +519,8 @@ public class Notebook implements NoteEventListener {
|
|||
SnapshotAngularObject snapshot = angularObjectSnapshot.get(name);
|
||||
List<InterpreterSetting> settings = interpreterSettingManager.get();
|
||||
for (InterpreterSetting setting : settings) {
|
||||
InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(subject.getUser(),
|
||||
note.getId());
|
||||
if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
|
||||
InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId());
|
||||
if (intpGroup != null && intpGroup.getId().equals(snapshot.getIntpGroupId())) {
|
||||
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
|
||||
String noteId = snapshot.getAngularObject().getNoteId();
|
||||
String paragraphId = snapshot.getAngularObject().getParagraphId();
|
||||
|
|
|
|||
|
|
@ -662,7 +662,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
note.run(getParagraphId());
|
||||
note.run(getParagraphId(), false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -321,14 +321,14 @@ public class RemoteScheduler implements Scheduler {
|
|||
if (job.isAborted()) {
|
||||
job.setStatus(Status.ABORT);
|
||||
} else if (job.getException() != null) {
|
||||
logger.debug("Job ABORT, " + job.getId());
|
||||
logger.debug("Job ABORT, " + job.getId() + ", " + job.getErrorMessage());
|
||||
job.setStatus(Status.ERROR);
|
||||
} else if (jobResult != null && jobResult instanceof InterpreterResult
|
||||
&& ((InterpreterResult) jobResult).code() == Code.ERROR) {
|
||||
logger.debug("Job Error, " + job.getId());
|
||||
logger.debug("Job Error, " + job.getId() + ", " + job.getErrorMessage());
|
||||
job.setStatus(Status.ERROR);
|
||||
} else {
|
||||
logger.debug("Job Finished, " + job.getId());
|
||||
logger.debug("Job Finished, " + job.getId() + ", Result: " + job.getReturn());
|
||||
job.setStatus(Status.FINISHED);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ public abstract class AbstractInterpreterTest {
|
|||
protected File interpreterDir;
|
||||
protected File confDir;
|
||||
protected File notebookDir;
|
||||
protected ZeppelinConfiguration conf;
|
||||
protected ZeppelinConfiguration conf = new ZeppelinConfiguration();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
|
@ -54,9 +54,9 @@ public abstract class AbstractInterpreterTest {
|
|||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DIR.getVarName(), interpreterDir.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "test,mock1,mock2,mock_resource_pool");
|
||||
|
||||
conf = new ZeppelinConfiguration();
|
||||
conf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "test,mock1,mock2,mock_resource_pool");
|
||||
interpreterSettingManager = new InterpreterSettingManager(conf,
|
||||
mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
|
||||
interpreterFactory = new InterpreterFactory(interpreterSettingManager);
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.junit.Test;
|
||||
|
||||
|
|
@ -25,6 +26,7 @@ import java.io.IOException;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class InterpreterFactoryTest extends AbstractInterpreterTest {
|
||||
|
||||
|
|
@ -36,31 +38,41 @@ public class InterpreterFactoryTest extends AbstractInterpreterTest {
|
|||
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "") instanceof RemoteInterpreter);
|
||||
RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "");
|
||||
// EchoInterpreter is the default interpreter (see zeppelin-interpreter/src/test/resources/conf/interpreter.json)
|
||||
// EchoInterpreter is the default interpreter because mock1 is the default interpreter group
|
||||
assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName());
|
||||
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test") instanceof RemoteInterpreter);
|
||||
remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test");
|
||||
assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName());
|
||||
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "echo") instanceof RemoteInterpreter);
|
||||
remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "echo");
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test2") instanceof RemoteInterpreter);
|
||||
remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test2");
|
||||
assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName());
|
||||
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "double_echo") instanceof RemoteInterpreter);
|
||||
remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "double_echo");
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test2.double_echo") instanceof RemoteInterpreter);
|
||||
remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test2.double_echo");
|
||||
assertEquals(DoubleEchoInterpreter.class.getName(), remoteInterpreter.getClassName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnknownRepl1() throws IOException {
|
||||
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
|
||||
assertNull(interpreterFactory.getInterpreter("user1", "note1", "test.unknown_repl"));
|
||||
try {
|
||||
interpreterFactory.getInterpreter("user1", "note1", "test.unknown_repl");
|
||||
fail("should fail due to no such interpreter");
|
||||
} catch (RuntimeException e) {
|
||||
assertEquals("No such interpreter: test.unknown_repl", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnknownRepl2() throws IOException {
|
||||
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
|
||||
assertNull(interpreterFactory.getInterpreter("user1", "note1", "unknown_repl"));
|
||||
try {
|
||||
interpreterFactory.getInterpreter("user1", "note1", "unknown_repl");
|
||||
fail("should fail due to no such interpreter");
|
||||
} catch (RuntimeException e) {
|
||||
assertEquals("Either no interpreter named unknown_repl or it is not binded to this note", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,9 +50,9 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
|
|||
InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test");
|
||||
assertEquals("test", interpreterSetting.getName());
|
||||
assertEquals("test", interpreterSetting.getGroup());
|
||||
assertEquals(2, interpreterSetting.getInterpreterInfos().size());
|
||||
assertEquals(3, interpreterSetting.getInterpreterInfos().size());
|
||||
// 3 other builtin properties:
|
||||
// * zeppelin.interpeter.output.limit
|
||||
// * zeppelin.interpreter.output.limit
|
||||
// * zeppelin.interpreter.localRepo
|
||||
// * zeppelin.interpreter.max.poolsize
|
||||
assertEquals(6, interpreterSetting.getJavaProperties().size());
|
||||
|
|
@ -67,7 +67,6 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
|
|||
assertNotNull(interpreterSetting.getAppEventListener());
|
||||
assertNotNull(interpreterSetting.getDependencyResolver());
|
||||
assertNotNull(interpreterSetting.getInterpreterSettingManager());
|
||||
assertEquals("linux_runner", interpreterSetting.getInterpreterRunner().getPath());
|
||||
|
||||
List<RemoteRepository> repositories = interpreterSettingManager.getRepositories();
|
||||
assertEquals(2, repositories.size());
|
||||
|
|
@ -80,14 +79,13 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
|
|||
interpreterSetting = interpreterSettingManager2.getByName("test");
|
||||
assertEquals("test", interpreterSetting.getName());
|
||||
assertEquals("test", interpreterSetting.getGroup());
|
||||
assertEquals(2, interpreterSetting.getInterpreterInfos().size());
|
||||
assertEquals(3, interpreterSetting.getInterpreterInfos().size());
|
||||
assertEquals(6, interpreterSetting.getJavaProperties().size());
|
||||
assertEquals("value_1", interpreterSetting.getJavaProperties().getProperty("property_1"));
|
||||
assertEquals("new_value_2", interpreterSetting.getJavaProperties().getProperty("property_2"));
|
||||
assertEquals("value_3", interpreterSetting.getJavaProperties().getProperty("property_3"));
|
||||
assertEquals("shared", interpreterSetting.getOption().perNote);
|
||||
assertEquals("shared", interpreterSetting.getOption().perUser);
|
||||
assertEquals("linux_runner", interpreterSetting.getInterpreterRunner().getPath());
|
||||
assertEquals(0, interpreterSetting.getDependencies().size());
|
||||
|
||||
repositories = interpreterSettingManager2.getRepositories();
|
||||
|
|
|
|||
|
|
@ -0,0 +1,126 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.lifecycle;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TimeoutLifecycleManagerTest extends AbstractInterpreterTest {
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS.getVarName(),
|
||||
TimeoutLifecycleManager.class.getName());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL.getVarName(), "1000");
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD.getVarName(), "10000");
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeout_1() throws InterpreterException, InterruptedException, IOException {
|
||||
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.echo") instanceof RemoteInterpreter);
|
||||
RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.echo");
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
|
||||
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
|
||||
null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
remoteInterpreter.interpret("hello world", context);
|
||||
assertTrue(remoteInterpreter.isOpened());
|
||||
InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test");
|
||||
assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
|
||||
|
||||
Thread.sleep(15 * 1000);
|
||||
// interpreterGroup is timeout, so is removed.
|
||||
assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
|
||||
assertFalse(remoteInterpreter.isOpened());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeout_2() throws InterpreterException, InterruptedException, IOException {
|
||||
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.sleep") instanceof RemoteInterpreter);
|
||||
final RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.sleep");
|
||||
|
||||
// simulate how zeppelin submit paragraph
|
||||
remoteInterpreter.getScheduler().submit(new Job("test-job", null) {
|
||||
@Override
|
||||
public Object getReturn() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int progress() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> info() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object jobRun() throws Throwable {
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
|
||||
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
|
||||
null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
return remoteInterpreter.interpret("100000", context);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean jobAbort() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResult(Object results) {
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
while(!remoteInterpreter.isOpened()) {
|
||||
Thread.sleep(1000);
|
||||
LOGGER.info("Wait for interpreter to be started");
|
||||
}
|
||||
|
||||
InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test");
|
||||
assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
|
||||
|
||||
Thread.sleep(15 * 1000);
|
||||
// interpreterGroup is not timeout because getStatus is called periodically.
|
||||
assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
|
||||
assertTrue(remoteInterpreter.isOpened());
|
||||
}
|
||||
}
|
||||
|
|
@ -20,6 +20,8 @@ package org.apache.zeppelin.interpreter.remote;
|
|||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.display.Input;
|
||||
import org.apache.zeppelin.display.ui.OptionInput;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.remote.mock.GetAngularObjectSizeInterpreter;
|
||||
|
|
@ -32,11 +34,11 @@ import org.junit.Test;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class RemoteInterpreterTest {
|
||||
|
||||
|
|
@ -413,4 +415,27 @@ public class RemoteInterpreterTest {
|
|||
assertEquals("null", interpreter1.interpret("getProperty property_2", context1).message().get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertDynamicForms() throws InterpreterException {
|
||||
GUI gui = new GUI();
|
||||
OptionInput.ParamOption[] paramOptions = {
|
||||
new OptionInput.ParamOption("value1", "param1"),
|
||||
new OptionInput.ParamOption("value2", "param2")
|
||||
};
|
||||
List<Object> defaultValues = new ArrayList();
|
||||
defaultValues.add("default1");
|
||||
defaultValues.add("default2");
|
||||
gui.checkbox("checkbox_id", defaultValues, paramOptions);
|
||||
gui.select("select_id", "default", paramOptions);
|
||||
gui.textbox("textbox_id");
|
||||
Map<String, Input> expected = new LinkedHashMap<>(gui.getForms());
|
||||
Interpreter interpreter = interpreterSetting.getDefaultInterpreter("user1", "note1");
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl", null,
|
||||
null, AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), gui,
|
||||
null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
|
||||
interpreter.interpret("text", context);
|
||||
assertArrayEquals(expected.values().toArray(), gui.getForms().values().toArray());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -114,7 +114,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
Map config = p1.getConfig();
|
||||
config.put("enabled", true);
|
||||
p1.setConfig(config);
|
||||
p1.setText("hello world");
|
||||
p1.setText("%mock1 hello world");
|
||||
p1.setAuthenticationInfo(anonymous);
|
||||
note.run(p1.getId());
|
||||
while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield();
|
||||
|
|
@ -268,7 +268,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
Map config = p1.getConfig();
|
||||
config.put("enabled", true);
|
||||
p1.setConfig(config);
|
||||
p1.setText("hello world");
|
||||
p1.setText("%mock1 hello world");
|
||||
p1.setAuthenticationInfo(anonymous);
|
||||
note.run(p1.getId());
|
||||
|
||||
|
|
@ -305,27 +305,22 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
Map config1 = p1.getConfig();
|
||||
config1.put("enabled", true);
|
||||
p1.setConfig(config1);
|
||||
p1.setText("p1");
|
||||
p1.setText("%mock1 p1");
|
||||
|
||||
// p2
|
||||
Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
Map config2 = p2.getConfig();
|
||||
config2.put("enabled", false);
|
||||
p2.setConfig(config2);
|
||||
p2.setText("p2");
|
||||
p2.setText("%mock1 p2");
|
||||
|
||||
// p3
|
||||
Paragraph p3 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p3.setText("p3");
|
||||
p3.setText("%mock1 p3");
|
||||
|
||||
// when
|
||||
note.runAll();
|
||||
|
||||
// wait for finish
|
||||
while(p3.isTerminated() == false || p3.getResult() == null) {
|
||||
Thread.yield();
|
||||
}
|
||||
|
||||
assertEquals("repl1: p1", p1.getResult().message().get(0).getData());
|
||||
assertNull(p2.getResult());
|
||||
assertEquals("repl1: p3", p3.getResult().message().get(0).getData());
|
||||
|
|
@ -415,7 +410,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
Map config = new HashMap<>();
|
||||
p.setConfig(config);
|
||||
p.setText("sleep 1000");
|
||||
p.setText("%mock1 sleep 1000");
|
||||
|
||||
Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p2.setConfig(config);
|
||||
|
|
@ -466,9 +461,6 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
p.setText(simpleText);
|
||||
|
||||
note.runAll();
|
||||
while (p.isTerminated() == false || p.getResult() == null) {
|
||||
Thread.yield();
|
||||
}
|
||||
|
||||
String exportedNoteJson = notebook.exportNote(note.getId());
|
||||
|
||||
|
|
@ -503,7 +495,6 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
final Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p.setText("hello world");
|
||||
note.runAll();
|
||||
while(p.isTerminated()==false || p.getResult()==null) Thread.yield();
|
||||
|
||||
p.setStatus(Status.RUNNING);
|
||||
Note cloneNote = notebook.cloneNote(note.getId(), "clone note", anonymous);
|
||||
|
|
@ -549,9 +540,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
final Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p.setText("hello world");
|
||||
note.runAll();
|
||||
while (p.isTerminated() == false || p.getResult() == null) {
|
||||
Thread.yield();
|
||||
}
|
||||
|
||||
// Force paragraph to have String type object
|
||||
p.setResult("Exception");
|
||||
|
||||
|
|
@ -572,15 +561,13 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), note.getId(), interpreterSettingManager.getInterpreterSettingIds());
|
||||
|
||||
Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p1.setText("hello");
|
||||
p1.setText("%mock1 hello");
|
||||
Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p2.setText("%mock2 world");
|
||||
for (InterpreterGroup intpGroup : interpreterSettingManager.getAllInterpreterGroup()) {
|
||||
intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId()));
|
||||
}
|
||||
note.runAll();
|
||||
while (p1.isTerminated() == false || p1.getResult() == null) Thread.yield();
|
||||
while (p2.isTerminated() == false || p2.getResult() == null) Thread.yield();
|
||||
|
||||
assertEquals(2, interpreterSettingManager.getAllResources().size());
|
||||
|
||||
|
|
@ -796,14 +783,14 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
|
||||
// create three paragraphs
|
||||
Paragraph p1 = note.addNewParagraph(anonymous);
|
||||
p1.setText("sleep 1000");
|
||||
p1.setText("%mock1 sleep 1000");
|
||||
Paragraph p2 = note.addNewParagraph(anonymous);
|
||||
p2.setText("sleep 1000");
|
||||
p2.setText("%mock1 sleep 1000");
|
||||
Paragraph p3 = note.addNewParagraph(anonymous);
|
||||
p3.setText("sleep 1000");
|
||||
p3.setText("%mock1 sleep 1000");
|
||||
|
||||
|
||||
note.runAll();
|
||||
note.runAll(AuthenticationInfo.ANONYMOUS, false);
|
||||
|
||||
// wait until first paragraph finishes and second paragraph starts
|
||||
while (p1.getStatus() != Status.FINISHED || p2.getStatus() != Status.RUNNING) Thread.yield();
|
||||
|
|
@ -813,9 +800,9 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
assertEquals(Status.PENDING, p3.getStatus());
|
||||
|
||||
// restart interpreter
|
||||
interpreterSettingManager.restart(interpreterSettingManager.getInterpreterSettings(note.getId()).get(0).getId());
|
||||
interpreterSettingManager.restart(interpreterSettingManager.getInterpreterSettingByName("mock1").getId());
|
||||
|
||||
// make sure three differnt status aborted well.
|
||||
// make sure three different status aborted well.
|
||||
assertEquals(Status.FINISHED, p1.getStatus());
|
||||
assertEquals(Status.ABORT, p2.getStatus());
|
||||
assertEquals(Status.ABORT, p3.getStatus());
|
||||
|
|
@ -828,7 +815,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
// create a notes
|
||||
Note note1 = notebook.createNote(anonymous);
|
||||
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p1.setText("getId");
|
||||
p1.setText("%mock1 getId");
|
||||
p1.setAuthenticationInfo(anonymous);
|
||||
|
||||
// restart interpreter with per user session enabled
|
||||
|
|
@ -845,7 +832,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
notebook.removeNote(note1.getId(), anonymous);
|
||||
note1 = notebook.createNote(anonymous);
|
||||
p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p1.setText("getId");
|
||||
p1.setText("%mock1 getId");
|
||||
p1.setAuthenticationInfo(anonymous);
|
||||
|
||||
note1.run(p1.getId());
|
||||
|
|
@ -864,9 +851,9 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
Note note2 = notebook.createNote(anonymous);
|
||||
Paragraph p2 = note2.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
|
||||
p1.setText("getId");
|
||||
p1.setText("%mock1 getId");
|
||||
p1.setAuthenticationInfo(anonymous);
|
||||
p2.setText("getId");
|
||||
p2.setText("%mock1 getId");
|
||||
p2.setAuthenticationInfo(anonymous);
|
||||
|
||||
// run per note session disabled
|
||||
|
|
@ -908,9 +895,9 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
Note note2 = notebook.createNote(anonymous);
|
||||
Paragraph p2 = note2.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
|
||||
p1.setText("getId");
|
||||
p1.setText("%mock1 getId");
|
||||
p1.setAuthenticationInfo(anonymous);
|
||||
p2.setText("getId");
|
||||
p2.setText("%mock1 getId");
|
||||
p2.setAuthenticationInfo(anonymous);
|
||||
|
||||
// shared mode.
|
||||
|
|
@ -925,8 +912,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
// restart interpreter with scoped mode enabled
|
||||
for (InterpreterSetting setting : notebook.getInterpreterSettingManager().getInterpreterSettings(note1.getId())) {
|
||||
setting.getOption().setPerNote(InterpreterOption.SCOPED);
|
||||
notebook.getInterpreterSettingManager().restart(setting.getId(), note1.getId(), anonymous.getUser());
|
||||
notebook.getInterpreterSettingManager().restart(setting.getId(), note2.getId(), anonymous.getUser());
|
||||
notebook.getInterpreterSettingManager().restart(setting.getId());
|
||||
}
|
||||
|
||||
// run per note session enabled
|
||||
|
|
@ -941,8 +927,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
// restart interpreter with isolated mode enabled
|
||||
for (InterpreterSetting setting : notebook.getInterpreterSettingManager().getInterpreterSettings(note1.getId())) {
|
||||
setting.getOption().setPerNote(InterpreterOption.ISOLATED);
|
||||
notebook.getInterpreterSettingManager().restart(setting.getId(), note1.getId(), anonymous.getUser());
|
||||
notebook.getInterpreterSettingManager().restart(setting.getId(), note2.getId(), anonymous.getUser());
|
||||
setting.getInterpreterSettingManager().restart(setting.getId());
|
||||
}
|
||||
|
||||
// run per note process enabled
|
||||
|
|
@ -964,7 +949,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
|
|||
Note note1 = notebook.createNote(anonymous);
|
||||
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p1.setAuthenticationInfo(anonymous);
|
||||
p1.setText("getId");
|
||||
p1.setText("%mock1 getId");
|
||||
|
||||
// restart interpreter with per note session enabled
|
||||
for (InterpreterSetting setting : interpreterSettingManager.getInterpreterSettings(note1.getId())) {
|
||||
|
|
|
|||
|
|
@ -42,8 +42,19 @@
|
|||
"description": "desc_2"
|
||||
}
|
||||
},
|
||||
"runner": {
|
||||
"linux": "linux_runner"
|
||||
"editor": {
|
||||
"language": "java",
|
||||
"editOnDblClick": false
|
||||
}
|
||||
},
|
||||
|
||||
{
|
||||
"group": "test",
|
||||
"name": "sleep",
|
||||
"defaultInterpreter": false,
|
||||
"className": "org.apache.zeppelin.interpreter.SleepInterpreter",
|
||||
"properties": {
|
||||
|
||||
},
|
||||
"editor": {
|
||||
"language": "java",
|
||||
|
|
|
|||
Loading…
Reference in a new issue