Merge remote-tracking branch 'origin/master' into ZEPPELIN-2813

This commit is contained in:
tinkoff-dwh 2017-10-31 15:45:54 +03:00
commit 2fd89a82f2
51 changed files with 784 additions and 239 deletions

View file

@ -411,6 +411,25 @@
<description>Enable directory listings on server.</description>
</property>
<property>
<name>zeppelin.interpreter.lifecyclemanager.class</name>
<value>org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager</value>
<description>LifecycleManager class for managing the lifecycle of interpreters, by default interpreter will
be closed after timeout</description>
</property>
<property>
<name>zeppelin.interpreter.lifecyclemanager.timeout.checkinterval</name>
<value>60000</value>
<description>milliseconds of the interval to checking whether interpreter is time out</description>
</property>
<property>
<name>zeppelin.interpreter.lifecyclemanager.timeout.threshold</name>
<value>3600000</value>
<description>milliseconds of the interpreter timeout threshold, by default it is 1 hour</description>
</property>
<!--
<property>
<name>zeppelin.server.jetty.name</name>

View file

@ -145,6 +145,11 @@ You can also set other Spark properties which are not listed in the table. For a
<td>true</td>
<td>Do not change - developer only setting, not for production use</td>
</tr>
<tr>
<td>zeppelin.spark.uiWebUrl</td>
<td></td>
<td>Overrides Spark UI default URL. Value should be a full URL (ex: http://{hostName}/{uniquePath}</td>
</tr>
</table>
Without any configuration, Spark interpreter works out of box in local mode. But if you want to connect to your Spark cluster, you'll need to follow below two simple steps.
@ -184,7 +189,7 @@ For example,
* **yarn-cluster** in Yarn cluster mode
* **mesos://host:5050** in Mesos cluster
That's it. Zeppelin will work with any version of Spark and any deployment type without rebuilding Zeppelin in this way.
That's it. Zeppelin will work with any version of Spark and any deployment type without rebuilding Zeppelin in this way.
For the further information about Spark & Zeppelin version compatibility, please refer to "Available Interpreters" section in [Zeppelin download page](https://zeppelin.apache.org/download.html).
> Note that without exporting `SPARK_HOME`, it's running in local mode with included version of Spark. The included version may vary depending on the build profile.
@ -216,7 +221,7 @@ There are two ways to load external libraries in Spark interpreter. First is usi
Please see [Dependency Management](../usage/interpreter/dependency_management.html) for the details.
### 2. Loading Spark Properties
Once `SPARK_HOME` is set in `conf/zeppelin-env.sh`, Zeppelin uses `spark-submit` as spark interpreter runner. `spark-submit` supports two ways to load configurations.
Once `SPARK_HOME` is set in `conf/zeppelin-env.sh`, Zeppelin uses `spark-submit` as spark interpreter runner. `spark-submit` supports two ways to load configurations.
The first is command line options such as --master and Zeppelin can pass these options to `spark-submit` by exporting `SPARK_SUBMIT_OPTIONS` in `conf/zeppelin-env.sh`. Second is reading configuration options from `SPARK_HOME/conf/spark-defaults.conf`. Spark properties that user can set to distribute libraries are:
<table class="table-configuration">
@ -249,7 +254,7 @@ Here are few examples:
```bash
export SPARK_SUBMIT_OPTIONS="--packages com.databricks:spark-csv_2.10:1.2.0 --jars /path/mylib1.jar,/path/mylib2.jar --files /path/mylib1.py,/path/mylib2.zip,/path/mylib3.egg"
```
* `SPARK_HOME/conf/spark-defaults.conf`
```
@ -414,17 +419,17 @@ To learn more about dynamic form, checkout [Dynamic Form](../usage/dynamic_form/
## Matplotlib Integration (pyspark)
Both the `python` and `pyspark` interpreters have built-in support for inline visualization using `matplotlib`,
a popular plotting library for python. More details can be found in the [python interpreter documentation](../interpreter/python.html),
since matplotlib support is identical. More advanced interactive plotting can be done with pyspark through
Both the `python` and `pyspark` interpreters have built-in support for inline visualization using `matplotlib`,
a popular plotting library for python. More details can be found in the [python interpreter documentation](../interpreter/python.html),
since matplotlib support is identical. More advanced interactive plotting can be done with pyspark through
utilizing Zeppelin's built-in [Angular Display System](../usage/display_system/angular_backend.html), as shown below:
<img class="img-responsive" src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/matplotlibAngularExample.gif" />
## Interpreter setting option
You can choose one of `shared`, `scoped` and `isolated` options wheh you configure Spark interpreter.
Spark interpreter creates separated Scala compiler per each notebook but share a single SparkContext in `scoped` mode (experimental).
You can choose one of `shared`, `scoped` and `isolated` options wheh you configure Spark interpreter.
Spark interpreter creates separated Scala compiler per each notebook but share a single SparkContext in `scoped` mode (experimental).
It creates separated SparkContext per each notebook in `isolated` mode.
## IPython support

View file

@ -34,12 +34,12 @@ import java.util.Properties;
public class KnitR extends Interpreter implements WrappedInterpreter {
KnitRInterpreter intp;
public KnitR(Properties property, Boolean startSpark) {
super(property);
intp = new KnitRInterpreter(property, startSpark);
public KnitR(Properties properties, Boolean startSpark) {
super(properties);
intp = new KnitRInterpreter(properties, startSpark);
}
public KnitR(Properties property) {
this(property, true);
public KnitR(Properties properties) {
this(properties, true);
}
public KnitR() {
@ -47,38 +47,39 @@ public class KnitR extends Interpreter implements WrappedInterpreter {
}
@Override
public void open() {
public void open() throws InterpreterException {
intp.open();
}
@Override
public void close() {
public void close() throws InterpreterException {
intp.close();
}
@Override
public InterpreterResult interpret(String s, InterpreterContext interpreterContext) {
public InterpreterResult interpret(String s, InterpreterContext interpreterContext)
throws InterpreterException {
return intp.interpret(s, interpreterContext);
}
@Override
public void cancel(InterpreterContext interpreterContext) {
public void cancel(InterpreterContext interpreterContext) throws InterpreterException {
intp.cancel(interpreterContext);
}
@Override
public FormType getFormType() {
public FormType getFormType() throws InterpreterException {
return intp.getFormType();
}
@Override
public int getProgress(InterpreterContext interpreterContext) {
public int getProgress(InterpreterContext interpreterContext) throws InterpreterException {
return intp.getProgress(interpreterContext);
}
@Override
public List<InterpreterCompletion> completion(String s, int i,
InterpreterContext interpreterContext) {
InterpreterContext interpreterContext) throws InterpreterException {
List completion = intp.completion(s, i, interpreterContext);
return completion;
}
@ -94,14 +95,14 @@ public class KnitR extends Interpreter implements WrappedInterpreter {
}
@Override
public void setProperty(Properties property) {
super.setProperty(property);
intp.setProperty(property);
public void setProperties(Properties properties) {
super.setProperties(properties);
intp.setProperties(properties);
}
@Override
public Properties getProperty() {
return intp.getProperty();
public Properties getProperties() {
return intp.getProperties();
}
@Override

View file

@ -34,12 +34,12 @@ import java.util.Properties;
public class RRepl extends Interpreter implements WrappedInterpreter {
RReplInterpreter intp;
public RRepl(Properties property, Boolean startSpark) {
super(property);
intp = new RReplInterpreter(property, startSpark);
public RRepl(Properties properties, Boolean startSpark) {
super(properties);
intp = new RReplInterpreter(properties, startSpark);
}
public RRepl(Properties property) {
this(property, true);
public RRepl(Properties properties) {
this(properties, true);
}
public RRepl() {
@ -47,38 +47,39 @@ public class RRepl extends Interpreter implements WrappedInterpreter {
}
@Override
public void open() {
public void open() throws InterpreterException {
intp.open();
}
@Override
public void close() {
public void close() throws InterpreterException {
intp.close();
}
@Override
public InterpreterResult interpret(String s, InterpreterContext interpreterContext) {
public InterpreterResult interpret(String s, InterpreterContext interpreterContext)
throws InterpreterException {
return intp.interpret(s, interpreterContext);
}
@Override
public void cancel(InterpreterContext interpreterContext) {
public void cancel(InterpreterContext interpreterContext) throws InterpreterException {
intp.cancel(interpreterContext);
}
@Override
public FormType getFormType() {
public FormType getFormType() throws InterpreterException {
return intp.getFormType();
}
@Override
public int getProgress(InterpreterContext interpreterContext) {
public int getProgress(InterpreterContext interpreterContext) throws InterpreterException {
return intp.getProgress(interpreterContext);
}
@Override
public List<InterpreterCompletion> completion(String s, int i,
InterpreterContext interpreterContext) {
InterpreterContext interpreterContext) throws InterpreterException {
List completion = intp.completion(s, i, interpreterContext);
return completion;
}
@ -94,14 +95,14 @@ public class RRepl extends Interpreter implements WrappedInterpreter {
}
@Override
public void setProperty(Properties property) {
super.setProperty(property);
intp.setProperty(property);
public void setProperties(Properties properties) {
super.setProperties(properties);
intp.setProperties(properties);
}
@Override
public Properties getProperty() {
return intp.getProperty();
public Properties getProperties() {
return intp.getProperties();
}
@Override

View file

@ -27,9 +27,9 @@ import org.apache.zeppelin.interpreter.InterpreterResult
import org.apache.zeppelin.rinterpreter.rscala.RException
class KnitRInterpreter(property: Properties, startSpark : Boolean = true) extends RInterpreter(property, startSpark) {
def this(property : Properties) = {
this(property, true)
class KnitRInterpreter(properties: Properties, startSpark : Boolean = true) extends RInterpreter(properties, startSpark) {
def this(properties : Properties) = {
this(properties, true)
}
override def open: Unit = {

View file

@ -41,7 +41,7 @@ abstract class RInterpreter(properties : Properties, startSpark : Boolean = true
def getrContext: RContext = rContext
protected lazy val rContext : RContext = synchronized{ RContext(property, this.getInterpreterGroup().getId()) }
protected lazy val rContext : RContext = synchronized{ RContext(properties, this.getInterpreterGroup().getId()) }
def open: Unit = rContext.synchronized {
logger.trace("RInterpreter opening")

View file

@ -26,12 +26,12 @@ import org.apache.zeppelin.interpreter.InterpreterContext
import org.apache.zeppelin.interpreter.InterpreterResult
import org.apache.zeppelin.rinterpreter.rscala.RException
class RReplInterpreter(property: Properties, startSpark : Boolean = true) extends RInterpreter(property, startSpark) {
class RReplInterpreter(properties: Properties, startSpark : Boolean = true) extends RInterpreter(properties, startSpark) {
// protected val rContext : RContext = RContext(property)
// protected val rContext : RContext = RContext(properties)
def this(property : Properties) = {
this(property, true)
def this(properties : Properties) = {
this(properties, true)
}
private var firstCell : Boolean = true
def interpret(st: String, context: InterpreterContext): InterpreterResult = {

View file

@ -85,7 +85,7 @@ class RInterpreterTest extends FlatSpec {
it should "have persistent properties" in {
val props = new Properties()
props.setProperty("hello", "world")
rint.setProperty(props)
rint.setProperties(props)
assertResult("world") {
rint.getProperty("hello")
}

View file

@ -47,7 +47,10 @@ import org.slf4j.LoggerFactory;
*/
public class ShellInterpreter extends KerberosInterpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(ShellInterpreter.class);
private static final String TIMEOUT_PROPERTY = "shell.command.timeout.millisecs";
private String DEFAULT_TIMEOUT_PROPERTY = "60000";
private static final String DIRECTORY_USER_HOME = "shell.working.directory.user.home";
private final boolean isWindows = System.getProperty("os.name").startsWith("Windows");
private final String shell = isWindows ? "cmd /c" : "bash -c";
@ -98,7 +101,9 @@ public class ShellInterpreter extends KerberosInterpreter {
DefaultExecutor executor = new DefaultExecutor();
executor.setStreamHandler(new PumpStreamHandler(
contextInterpreter.out, contextInterpreter.out));
executor.setWatchdog(new ExecuteWatchdog(Long.valueOf(getProperty(TIMEOUT_PROPERTY))));
executor.setWatchdog(new ExecuteWatchdog(
Long.valueOf(getProperty(TIMEOUT_PROPERTY, DEFAULT_TIMEOUT_PROPERTY))));
executors.put(contextInterpreter.getParagraphId(), executor);
if (Boolean.valueOf(getProperty(DIRECTORY_USER_HOME))) {
executor.setWorkingDirectory(new File(System.getProperty("user.home")));

View file

@ -26,11 +26,9 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@ -51,7 +49,6 @@ import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.SparkUI;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.DefaultInterpreterProperty;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@ -72,7 +69,6 @@ import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import scala.Console;
import scala.Enumeration.Value;
import scala.None;
@ -206,7 +202,7 @@ public class SparkInterpreter extends Interpreter {
private String getJobUrl(int jobId) {
String jobUrl = null;
if (sparkUrl != null) {
jobUrl = sparkUrl + "/jobs/job?id=" + jobId;
jobUrl = sparkUrl + "/jobs/job/?id=" + jobId;
}
return jobUrl;
}
@ -936,6 +932,11 @@ public class SparkInterpreter extends Interpreter {
return sparkUrl;
}
String sparkUrlProp = getProperty("zeppelin.spark.uiWebUrl", "");
if (!StringUtils.isBlank(sparkUrlProp)) {
return sparkUrlProp;
}
if (sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0)) {
Option<String> uiWebUrlOption = (Option<String>) Utils.invokeMethod(sc, "uiWebUrl");
if (uiWebUrlOption.isDefined()) {

View file

@ -67,6 +67,13 @@
"defaultValue": true,
"description": "Do not change - developer only setting, not for production use",
"type": "checkbox"
},
"zeppelin.spark.uiWebUrl": {
"envName": null,
"propertyName": "zeppelin.spark.uiWebUrl",
"defaultValue": "",
"description": "Override Spark UI default URL",
"type": "string"
}
},
"editor": {

View file

@ -347,7 +347,7 @@ public class SparkInterpreterTest {
}
String sparkUIUrl = repl.getSparkUIUrl();
assertNotNull(jobUrl);
assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job?id="));
assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job/?id="));
}
}

View file

@ -534,6 +534,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getString(ConfVars.ZEPPELIN_SERVER_STRICT_TRANSPORT);
}
public String getLifecycleManagerClass() {
return getString(ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS);
}
public Map<String, String> dumpConfigurations(ZeppelinConfiguration conf,
ConfigurationKeyPredicate predicate) {
@ -701,7 +704,14 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_SERVER_KERBEROS_KEYTAB("zeppelin.server.kerberos.keytab", ""),
ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""),
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":");
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"),
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS("zeppelin.interpreter.lifecyclemanager.class",
"org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager"),
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL(
"zeppelin.interpreter.lifecyclemanager.timeout.checkinterval", 6000L),
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD(
"zeppelin.interpreter.lifecyclemanager.timeout.threshold", 3600000L);
private String varName;
@SuppressWarnings("rawtypes")

View file

@ -107,7 +107,13 @@ public class Input<T> implements Serializable {
if (displayName != null ? !displayName.equals(input.displayName) : input.displayName != null) {
return false;
}
if (defaultValue != null ?
if (defaultValue instanceof Object[]) {
if (defaultValue != null ?
!Arrays.equals((Object[]) defaultValue, (Object[]) input.defaultValue)
: input.defaultValue != null) {
return false;
}
} else if (defaultValue != null ?
!defaultValue.equals(input.defaultValue) : input.defaultValue != null) {
return false;
}

View file

@ -142,4 +142,23 @@ public class InterpreterGroup {
public boolean isEmpty() {
return sessions.isEmpty();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof InterpreterGroup)) {
return false;
}
InterpreterGroup that = (InterpreterGroup) o;
return id != null ? id.equals(that.id) : that.id == null;
}
@Override
public int hashCode() {
return id != null ? id.hashCode() : 0;
}
}

View file

@ -63,6 +63,10 @@ public abstract class Job {
public boolean isPending() {
return this == PENDING;
}
public boolean isCompleted() {
return this == FINISHED || this == ERROR || this == ABORT;
}
}
private String jobName;

View file

@ -650,7 +650,7 @@ public class NotebookRestApi {
checkIfUserCanRun(noteId, "Insufficient privileges you cannot run job for this note");
try {
note.runAll(subject);
note.runAll(subject, true);
} catch (Exception ex) {
LOG.error("Exception from run", ex);
return new JsonResponse<>(Status.PRECONDITION_FAILED,

View file

@ -1379,13 +1379,13 @@ public class NotebookServer extends WebSocketServlet
List<InterpreterSetting> settings =
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
for (InterpreterSetting setting : settings) {
if (setting.getOrCreateInterpreterGroup(user, note.getId()) == null) {
if (setting.getInterpreterGroup(user, note.getId()) == null) {
continue;
}
if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, note.getId())
if (interpreterGroupId.equals(setting.getInterpreterGroup(user, note.getId())
.getId())) {
AngularObjectRegistry angularObjectRegistry =
setting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
setting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
// first trying to get local registry
ao = angularObjectRegistry.get(varName, noteId, paragraphId);
@ -1422,13 +1422,13 @@ public class NotebookServer extends WebSocketServlet
List<InterpreterSetting> settings =
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
for (InterpreterSetting setting : settings) {
if (setting.getOrCreateInterpreterGroup(user, n.getId()) == null) {
if (setting.getInterpreterGroup(user, n.getId()) == null) {
continue;
}
if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, n.getId())
if (interpreterGroupId.equals(setting.getInterpreterGroup(user, n.getId())
.getId())) {
AngularObjectRegistry angularObjectRegistry =
setting.getOrCreateInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
setting.getInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
this.broadcastExcept(n.getId(),
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId).put("noteId", n.getId())
@ -1681,7 +1681,10 @@ public class NotebookServer extends WebSocketServlet
Paragraph p = setParagraphUsingMessage(note, fromMessage,
paragraphId, text, title, params, config);
persistAndExecuteSingleParagraph(conn, note, p);
if (!persistAndExecuteSingleParagraph(conn, note, p, true)) {
// stop execution when one paragraph fails.
break;
}
}
}
@ -1773,7 +1776,7 @@ public class NotebookServer extends WebSocketServlet
Paragraph p = setParagraphUsingMessage(note, fromMessage, paragraphId,
text, title, params, config);
persistAndExecuteSingleParagraph(conn, note, p);
persistAndExecuteSingleParagraph(conn, note, p, false);
}
private void addNewParagraphIfLastParagraphIsExecuted(Note note, Paragraph p) {
@ -1805,15 +1808,16 @@ public class NotebookServer extends WebSocketServlet
}
}
private void persistAndExecuteSingleParagraph(NotebookSocket conn,
Note note, Paragraph p) throws IOException {
private boolean persistAndExecuteSingleParagraph(NotebookSocket conn,
Note note, Paragraph p,
boolean blocking) throws IOException {
addNewParagraphIfLastParagraphIsExecuted(note, p);
if (!persistNoteWithAuthInfo(conn, note, p)) {
return;
return false;
}
try {
note.run(p.getId());
return note.run(p.getId(), blocking);
} catch (Exception ex) {
LOG.error("Exception from run", ex);
if (p != null) {
@ -1821,6 +1825,7 @@ public class NotebookServer extends WebSocketServlet
p.setStatus(Status.ERROR);
broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", p));
}
return false;
}
}
@ -2319,14 +2324,17 @@ public class NotebookServer extends WebSocketServlet
}
for (InterpreterSetting intpSetting : settings) {
if (intpSetting.getInterpreterGroup(user, note.getId()) == null) {
continue;
}
AngularObjectRegistry registry =
intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
intpSetting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
List<AngularObject> objects = registry.getAllWithGlobal(note.getId());
for (AngularObject object : objects) {
conn.send(serializeMessage(
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object)
.put("interpreterGroupId",
intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getId())
intpSetting.getInterpreterGroup(user, note.getId()).getId())
.put("noteId", note.getId()).put("paragraphId", object.getParagraphId())));
}
}

View file

@ -90,6 +90,9 @@ public abstract class AbstractTestRestApi {
"/api/version = anon\n" +
"/** = authc";
protected static File zeppelinHome;
protected static File confDir;
private String getUrl(String path) {
String url;
if (System.getProperty("url") != null) {
@ -124,10 +127,17 @@ public abstract class AbstractTestRestApi {
}
};
private static void start(boolean withAuth) throws Exception {
private static void start(boolean withAuth, String testClassName) throws Exception {
if (!wasRunning) {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
// copy the resources files to a temp folder
zeppelinHome = new File("..");
LOG.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
confDir = new File(zeppelinHome, "conf_" + testClassName);
confDir.mkdirs();
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), zeppelinHome.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), new File("../zeppelin-web/dist").getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
// some test profile does not build zeppelin-web.
// to prevent zeppelin starting up fail, create zeppelin-web/dist directory
@ -142,7 +152,7 @@ public abstract class AbstractTestRestApi {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED.getVarName(), "false");
// Create a shiro env test.
shiroIni = new File("../conf/shiro.ini");
shiroIni = new File(confDir, "shiro.ini");
if (!shiroIni.exists()) {
shiroIni.createNewFile();
}
@ -245,12 +255,12 @@ public abstract class AbstractTestRestApi {
}
}
protected static void startUpWithAuthenticationEnable() throws Exception {
start(true);
protected static void startUpWithAuthenticationEnable(String testClassName) throws Exception {
start(true, testClassName);
}
protected static void startUp() throws Exception {
start(false);
protected static void startUp(String testClassName) throws Exception {
start(false, testClassName);
}
private static String getHostname() {
@ -339,6 +349,8 @@ public abstract class AbstractTestRestApi {
System
.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED.getVarName());
}
FileUtils.deleteDirectory(confDir);
}
}

View file

@ -36,7 +36,7 @@ public class ConfigurationsRestApiTest extends AbstractTestRestApi {
@BeforeClass
public static void init() throws Exception {
AbstractTestRestApi.startUp();
AbstractTestRestApi.startUp(ConfigurationsRestApi.class.getSimpleName());
}
@AfterClass

View file

@ -43,7 +43,7 @@ public class CredentialsRestApiTest extends AbstractTestRestApi {
@BeforeClass
public static void init() throws Exception {
AbstractTestRestApi.startUp();
AbstractTestRestApi.startUp(CredentialsRestApiTest.class.getSimpleName());
}
@AfterClass

View file

@ -39,7 +39,7 @@ public class HeliumRestApiTest extends AbstractTestRestApi {
@BeforeClass
public static void init() throws Exception {
AbstractTestRestApi.startUp();
AbstractTestRestApi.startUp(HeliumRestApi.class.getSimpleName());
}
@AfterClass

View file

@ -59,7 +59,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
@BeforeClass
public static void init() throws Exception {
AbstractTestRestApi.startUp();
AbstractTestRestApi.startUp(InterpreterRestApiTest.class.getSimpleName());
}
@AfterClass

View file

@ -50,7 +50,7 @@ public class NotebookRepoRestApiTest extends AbstractTestRestApi {
@BeforeClass
public static void init() throws Exception {
AbstractTestRestApi.startUp();
AbstractTestRestApi.startUp(NotebookRepoRestApiTest.class.getSimpleName());
}
@AfterClass

View file

@ -55,7 +55,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
@BeforeClass
public static void init() throws Exception {
AbstractTestRestApi.startUp();
startUp(NotebookRestApiTest.class.getSimpleName());
}
@AfterClass
@ -120,6 +120,68 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
ZeppelinServer.notebook.removeNote(note1.getId(), anonymous);
}
@Test
public void testRunAllParagraph_AllSuccess() throws IOException {
Note note1 = ZeppelinServer.notebook.createNote(anonymous);
// 2 paragraphs
// P1:
// %python
// import time
// time.sleep(1)
// user='abc'
// P2:
// %python
// from __future__ import print_function
// print(user)
//
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p1.setText("%python import time\ntime.sleep(1)\nuser='abc'");
p2.setText("%python from __future__ import print_function\nprint(user)");
PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
assertThat(post, isAllowed());
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
}.getType());
assertEquals(resp.get("status"), "OK");
post.releaseConnection();
assertEquals(Job.Status.FINISHED, p1.getStatus());
assertEquals(Job.Status.FINISHED, p2.getStatus());
assertEquals("abc\n", p2.getResult().message().get(0).getData());
}
@Test
public void testRunAllParagraph_FirstFailed() throws IOException {
Note note1 = ZeppelinServer.notebook.createNote(anonymous);
// 2 paragraphs
// P1:
// %python
// import time
// time.sleep(1)
// from __future__ import print_function
// print(user)
// P2:
// %python
// user='abc'
//
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p1.setText("%python import time\ntime.sleep(1)\nfrom __future__ import print_function\nprint(user2)");
p2.setText("%python user2='abc'\nprint(user2)");
PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
assertThat(post, isAllowed());
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
}.getType());
assertEquals(resp.get("status"), "OK");
post.releaseConnection();
assertEquals(Job.Status.ERROR, p1.getStatus());
// p2 will be skipped because p1 is failed.
assertEquals(Job.Status.READY, p2.getStatus());
}
@Test
public void testCloneNote() throws IOException {
Note note1 = ZeppelinServer.notebook.createNote(anonymous);

View file

@ -46,7 +46,7 @@ public class NotebookSecurityRestApiTest extends AbstractTestRestApi {
@BeforeClass
public static void init() throws Exception {
AbstractTestRestApi.startUpWithAuthenticationEnable();
AbstractTestRestApi.startUpWithAuthenticationEnable(NotebookSecurityRestApiTest.class.getSimpleName());
}
@AfterClass

View file

@ -40,7 +40,7 @@ public class SecurityRestApiTest extends AbstractTestRestApi {
@BeforeClass
public static void init() throws Exception {
AbstractTestRestApi.startUpWithAuthenticationEnable();
AbstractTestRestApi.startUpWithAuthenticationEnable(SecurityRestApiTest.class.getSimpleName());
}
@AfterClass

View file

@ -56,7 +56,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
@BeforeClass
public static void init() throws Exception {
AbstractTestRestApi.startUp();
AbstractTestRestApi.startUp(ZeppelinRestApiTest.class.getSimpleName());
}
@AfterClass
@ -441,12 +441,6 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
String noteId = note.getId();
note.runAll();
// wait until paragraph gets started
while (!paragraph.getStatus().isRunning()) {
Thread.sleep(100);
}
// assume that status of the paragraph is running
GetMethod get = httpGet("/notebook/job/" + noteId);
assertThat("test get note job: ", get, isAllowed());
@ -494,15 +488,6 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
String noteId = note.getId();
note.runAll();
// wait until job is finished or timeout.
int timeout = 1;
while (!paragraph.isTerminated()) {
Thread.sleep(1000);
if (timeout++ > 120) {
LOG.info("testRunParagraphWithParams timeout job.");
break;
}
}
// Call Run paragraph REST API
PostMethod postParagraph = httpPost("/notebook/job/" + noteId + "/" + paragraph.getId(),
@ -534,17 +519,8 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
config.put("enabled", true);
paragraph.setConfig(config);
note.runAll();
// wait until job is finished or timeout.
int timeout = 1;
while (!paragraph.isTerminated()) {
Thread.sleep(1000);
if (timeout++ > 10) {
LOG.info("testNoteJobs timeout job.");
break;
}
}
note.runAll(AuthenticationInfo.ANONYMOUS, false);
String jsonRequest = "{\"cron\":\"* * * * * ?\" }";
// right cron expression but not exist note.
PostMethod postCron = httpPost("/notebook/cron/notexistnote", jsonRequest);

View file

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.rest;
public class ZeppelinServerTest extends AbstractTestRestApi {
}

View file

@ -49,7 +49,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
@BeforeClass
public static void init() throws Exception {
AbstractTestRestApi.startUp();
AbstractTestRestApi.startUp(ZeppelinSparkClusterTest.class.getSimpleName());
}
@AfterClass

View file

@ -30,7 +30,7 @@ public class DirAccessTest extends AbstractTestRestApi {
public void testDirAccessForbidden() throws Exception {
synchronized (this) {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_DEFAULT_DIR_ALLOWED.getVarName(), "false");
AbstractTestRestApi.startUp();
AbstractTestRestApi.startUp(DirAccessTest.class.getSimpleName());
HttpClient httpClient = new HttpClient();
GetMethod getMethod = new GetMethod(getUrlToTest() + "/app/");
httpClient.executeMethod(getMethod);
@ -43,7 +43,7 @@ public class DirAccessTest extends AbstractTestRestApi {
public void testDirAccessOk() throws Exception {
synchronized (this) {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_DEFAULT_DIR_ALLOWED.getVarName(), "true");
AbstractTestRestApi.startUp();
AbstractTestRestApi.startUp(DirAccessTest.class.getSimpleName());
HttpClient httpClient = new HttpClient();
GetMethod getMethod = new GetMethod(getUrlToTest() + "/app/");
httpClient.executeMethod(getMethod);

View file

@ -63,7 +63,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
@BeforeClass
public static void init() throws Exception {
AbstractTestRestApi.startUp();
AbstractTestRestApi.startUp(NotebookServerTest.class.getSimpleName());
gson = new Gson();
notebook = ZeppelinServer.notebook;
notebookServer = ZeppelinServer.notebookWsServer;

View file

@ -76,10 +76,10 @@ public class InterpreterFactory {
if (null != interpreter) {
return interpreter;
}
throw new RuntimeException("No such interpreter: " + replName);
}
return null;
} else {
throw new RuntimeException("Interpreter " + group + " is not binded to this note");
} else if (replNameSplit.length == 1){
// first assume replName is 'name' of interpreter. ('groupName' is ommitted)
// search 'name' from first (default) interpreter group
// TODO(jl): Handle with noteId to support defaultInterpreter per note.
@ -90,19 +90,15 @@ public class InterpreterFactory {
return interpreter;
}
// next, assume replName is 'group' of interpreter ('name' is ommitted)
// next, assume replName is 'group' of interpreter ('name' is omitted)
// search interpreter group and return first interpreter.
setting = getInterpreterSettingByGroup(settings, replName);
if (null != setting) {
return setting.getDefaultInterpreter(user, noteId);
}
// Support the legacy way to use it
for (InterpreterSetting s : settings) {
if (s.getGroup().equals(replName)) {
return setting.getDefaultInterpreter(user, noteId);
}
} else {
throw new RuntimeException("Either no interpreter named " + replName + " or it is not " +
"binded to this note");
}
}
//TODO(zjffdu) throw InterpreterException instead of return null

View file

@ -37,6 +37,7 @@ import org.apache.zeppelin.interpreter.launcher.InterpreterLaunchContext;
import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
@ -139,6 +140,7 @@ public class InterpreterSetting {
private transient InterpreterLauncher launcher;
///////////////////////////////////////////////////////////////////////////////////////////
private transient LifecycleManager lifecycleManager;
/**
* Builder class for InterpreterSetting
@ -233,6 +235,11 @@ public class InterpreterSetting {
return this;
}
public Builder setLifecycleManager(LifecycleManager lifecycleManager) {
interpreterSetting.lifecycleManager = lifecycleManager;
return this;
}
public InterpreterSetting create() {
// post processing
interpreterSetting.postProcessing();
@ -249,6 +256,9 @@ public class InterpreterSetting {
void postProcessing() {
this.status = Status.READY;
if (this.lifecycleManager == null) {
this.lifecycleManager = new NullLifecycleManager(conf);
}
}
/**
@ -321,6 +331,14 @@ public class InterpreterSetting {
this.interpreterSettingManager = interpreterSettingManager;
}
public void setLifecycleManager(LifecycleManager lifecycleManager) {
this.lifecycleManager = lifecycleManager;
}
public LifecycleManager getLifecycleManager() {
return lifecycleManager;
}
public String getId() {
return id;
}
@ -384,7 +402,7 @@ public class InterpreterSetting {
this.interpreterGroups.remove(groupId);
}
ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
public ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
String groupId = getInterpreterGroupId(user, noteId);
try {
interpreterGroupReadLock.lock();
@ -628,7 +646,7 @@ public class InterpreterSetting {
for (InterpreterInfo info : interpreterInfos) {
Interpreter interpreter = null;
interpreter = new RemoteInterpreter(getJavaProperties(), sessionId,
info.getClassName(), user);
info.getClassName(), user, lifecycleManager);
if (info.isDefaultInterpreter()) {
interpreters.add(0, interpreter);
} else {
@ -645,7 +663,7 @@ public class InterpreterSetting {
createLauncher();
}
InterpreterLaunchContext launchContext = new
InterpreterLaunchContext(getJavaProperties(), option, interpreterRunner, id, name);
InterpreterLaunchContext(getJavaProperties(), option, interpreterRunner, id, group);
RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext);
process.setRemoteInterpreterEventPoller(
new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener));

View file

@ -51,6 +51,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.net.URL;
@ -116,7 +117,7 @@ public class InterpreterSettingManager {
private RemoteInterpreterProcessListener remoteInterpreterProcessListener;
private ApplicationEventListener appEventListener;
private DependencyResolver dependencyResolver;
private LifecycleManager lifecycleManager;
public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
AngularObjectRegistryListener angularObjectRegistryListener,
@ -153,6 +154,14 @@ public class InterpreterSettingManager {
this.angularObjectRegistryListener = angularObjectRegistryListener;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.appEventListener = appEventListener;
try {
this.lifecycleManager = (LifecycleManager)
Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class)
.newInstance(conf);
} catch (Exception e) {
throw new IOException("Fail to create LifecycleManager", e);
}
init();
}
@ -177,6 +186,7 @@ public class InterpreterSettingManager {
remoteInterpreterProcessListener);
savedInterpreterSetting.setAppEventListener(appEventListener);
savedInterpreterSetting.setDependencyResolver(dependencyResolver);
savedInterpreterSetting.setLifecycleManager(lifecycleManager);
savedInterpreterSetting.setProperties(InterpreterSetting.convertInterpreterProperties(
savedInterpreterSetting.getProperties()
));
@ -372,6 +382,7 @@ public class InterpreterSettingManager {
interpreterSetting.setAppEventListener(appEventListener);
interpreterSetting.setDependencyResolver(dependencyResolver);
interpreterSetting.setInterpreterSettingManager(this);
interpreterSetting.setLifecycleManager(lifecycleManager);
interpreterSetting.postProcessing();
interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
}
@ -633,6 +644,7 @@ public class InterpreterSettingManager {
setting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
setting.setDependencyResolver(dependencyResolver);
setting.setAngularObjectRegistryListener(angularObjectRegistryListener);
setting.setLifecycleManager(lifecycleManager);
setting.setInterpreterSettingManager(this);
setting.postProcessing();
interpreterSettings.put(setting.getId(), setting);
@ -645,6 +657,7 @@ public class InterpreterSettingManager {
interpreterSettingTemplates.put(interpreterSetting.getName(), interpreterSetting);
interpreterSetting.setAppEventListener(appEventListener);
interpreterSetting.setDependencyResolver(dependencyResolver);
interpreterSetting.setLifecycleManager(lifecycleManager);
interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener);
interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
interpreterSetting.setInterpreterSettingManager(this);
@ -790,7 +803,7 @@ public class InterpreterSettingManager {
}
public void restart(String id) throws InterpreterException {
restart(id, "", "anonymous");
interpreterSettings.get(id).close();
}
public InterpreterSetting get(String id) {

View file

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
/**
* Interface for managing the lifecycle of interpreters
*/
public interface LifecycleManager {
void onInterpreterGroupCreated(ManagedInterpreterGroup interpreterGroup);
void onInterpreterSessionCreated(ManagedInterpreterGroup interpreterGroup,
String sessionId);
void onInterpreterUse(ManagedInterpreterGroup interpreterGroup,
String sessionId);
}

View file

@ -47,6 +47,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
ManagedInterpreterGroup(String id, InterpreterSetting interpreterSetting) {
super(id);
this.interpreterSetting = interpreterSetting;
interpreterSetting.getLifecycleManager().onInterpreterGroupCreated(this);
}
public InterpreterSetting getInterpreterSetting() {
@ -81,14 +82,15 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
* @param sessionId
*/
public synchronized void close(String sessionId) {
LOGGER.info("Close Session: " + sessionId);
LOGGER.info("Close Session: " + sessionId + " for interpreter setting: " +
interpreterSetting.getName());
close(sessions.remove(sessionId));
//TODO(zjffdu) whether close InterpreterGroup if there's no session left in Zeppelin Server
if (sessions.isEmpty() && interpreterSetting != null) {
LOGGER.info("Remove this InterpreterGroup: {} as all the sessions are closed", id);
interpreterSetting.removeInterpreterGroup(id);
if (remoteInterpreterProcess != null) {
LOGGER.info("Kill RemoteIntetrpreterProcess");
LOGGER.info("Kill RemoteInterpreterProcess");
remoteInterpreterProcess.stop();
remoteInterpreterProcess = null;
}
@ -134,8 +136,10 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
interpreter.setInterpreterGroup(this);
}
LOGGER.info("Create Session: {} in InterpreterGroup: {} for user: {}", sessionId, id, user);
interpreterSetting.getLifecycleManager().onInterpreterSessionCreated(this, sessionId);
sessions.put(sessionId, interpreters);
return interpreters;
}
}
}

View file

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.lifecycle;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.LifecycleManager;
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
/**
* Do nothing for the lifecycle of interpreter. User need to explicitly start/stop interpreter.
*/
public class NullLifecycleManager implements LifecycleManager {
public NullLifecycleManager(ZeppelinConfiguration zConf) {
}
@Override
public void onInterpreterGroupCreated(ManagedInterpreterGroup interpreterGroup) {
}
@Override
public void onInterpreterSessionCreated(ManagedInterpreterGroup interpreterGroup,
String sessionId) {
}
@Override
public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) {
}
}

View file

@ -0,0 +1,75 @@
package org.apache.zeppelin.interpreter.lifecycle;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.LifecycleManager;
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
/**
* This lifecycle manager would close interpreter after it is timeout. By default, it is timeout
* after no using in 1 hour.
*
* For now, this class only manage the lifecycle of interpreter group (will close interpreter
* process after timeout). Managing the lifecycle of interpreter session could be done in future
* if necessary.
*/
public class TimeoutLifecycleManager implements LifecycleManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutLifecycleManager.class);
// ManagerInterpreterGroup -> LastTimeUsing timestamp
private Map<ManagedInterpreterGroup, Long> interpreterGroups = new ConcurrentHashMap<>();
private long checkInterval;
private long timeoutThreshold;
private Timer checkTimer;
public TimeoutLifecycleManager(ZeppelinConfiguration zConf) {
this.checkInterval = zConf.getLong(ZeppelinConfiguration.ConfVars
.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL);
this.timeoutThreshold = zConf.getLong(
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD);
this.checkTimer = new Timer(true);
this.checkTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
long now = System.currentTimeMillis();
for (Map.Entry<ManagedInterpreterGroup, Long> entry : interpreterGroups.entrySet()) {
ManagedInterpreterGroup interpreterGroup = entry.getKey();
Long lastTimeUsing = entry.getValue();
if ((now - lastTimeUsing) > timeoutThreshold ) {
LOGGER.info("InterpreterGroup {} is timeout.", interpreterGroup.getId());
interpreterGroup.close();
interpreterGroups.remove(entry.getKey());
}
}
}
}, checkInterval, checkInterval);
LOGGER.info("TimeoutLifecycleManager is started with checkinterval: " + checkInterval
+ ", timeoutThreshold: " + timeoutThreshold);
}
@Override
public void onInterpreterGroupCreated(ManagedInterpreterGroup interpreterGroup) {
interpreterGroups.put(interpreterGroup, System.currentTimeMillis());
}
@Override
public void onInterpreterSessionCreated(ManagedInterpreterGroup interpreterGroup,
String sessionId) {
}
@Override
public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) {
interpreterGroups.put(interpreterGroup, System.currentTimeMillis());
}
}

View file

@ -30,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LifecycleManager;
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
@ -66,17 +67,21 @@ public class RemoteInterpreter extends Interpreter {
private volatile boolean isOpened = false;
private volatile boolean isCreated = false;
private LifecycleManager lifecycleManager;
/**
* Remote interpreter and manage interpreter process
*/
public RemoteInterpreter(Properties properties,
String sessionId,
String className,
String userName) {
String userName,
LifecycleManager lifecycleManager) {
super(properties);
this.sessionId = sessionId;
this.className = className;
this.userName = userName;
this.lifecycleManager = lifecycleManager;
}
public boolean isOpened() {
@ -149,6 +154,7 @@ public class RemoteInterpreter extends Interpreter {
}
});
isOpened = true;
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
}
}
}
@ -189,6 +195,7 @@ public class RemoteInterpreter extends Interpreter {
}
});
isOpened = false;
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
} else {
LOGGER.warn("close is called when RemoterInterpreter is not opened for " + className);
}
@ -218,6 +225,7 @@ public class RemoteInterpreter extends Interpreter {
interpreterContextRunnerPool.clear(noteId);
interpreterContextRunnerPool.addAll(noteId, runners);
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
@Override
@ -266,6 +274,7 @@ public class RemoteInterpreter extends Interpreter {
} catch (IOException e) {
throw new InterpreterException(e);
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
@ -293,6 +302,7 @@ public class RemoteInterpreter extends Interpreter {
} catch (IOException e) {
throw new InterpreterException(e);
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
FormType type = interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<FormType>() {
@Override
@ -317,6 +327,7 @@ public class RemoteInterpreter extends Interpreter {
} catch (IOException e) {
throw new InterpreterException(e);
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<Integer>() {
@Override
@ -341,6 +352,7 @@ public class RemoteInterpreter extends Interpreter {
} catch (IOException e) {
throw new InterpreterException(e);
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
@Override
@ -362,6 +374,7 @@ public class RemoteInterpreter extends Interpreter {
} catch (IOException e) {
throw new RuntimeException(e);
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<String>() {
@Override
@ -391,7 +404,7 @@ public class RemoteInterpreter extends Interpreter {
private RemoteInterpreterContext convert(InterpreterContext ic) {
return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners()));
gson.toJson(ic.getConfig()), ic.getGui().toJson(), gson.toJson(ic.getRunners()));
}
private InterpreterResult convert(RemoteInterpreterResult result) {

View file

@ -592,32 +592,39 @@ public class Note implements ParagraphJobListener, JsonSerializable {
}
AuthenticationInfo authenticationInfo = new AuthenticationInfo();
authenticationInfo.setUser(cronExecutingUser);
runAll(authenticationInfo);
runAll(authenticationInfo, true);
}
public void runAll(AuthenticationInfo authenticationInfo) {
public void runAll(AuthenticationInfo authenticationInfo, boolean blocking) {
for (Paragraph p : getParagraphs()) {
if (!p.isEnabled()) {
continue;
}
p.setAuthenticationInfo(authenticationInfo);
run(p.getId());
if (!run(p.getId(), blocking)) {
logger.warn("Skip running the remain notes because paragraph {} fails", p.getId());
break;
}
}
}
public boolean run(String paragraphId) {
return run(paragraphId, false);
}
/**
* Run a single paragraph.
*
* @param paragraphId ID of paragraph
*/
public void run(String paragraphId) {
public boolean run(String paragraphId, boolean blocking) {
Paragraph p = getParagraph(paragraphId);
p.setListener(jobListenerFactory.getParagraphJobListener(this));
if (p.isBlankParagraph()) {
logger.info("skip to run blank paragraph. {}", p.getId());
p.setStatus(Job.Status.FINISHED);
return;
return true;
}
p.clearRuntimeInfo(null);
@ -638,6 +645,19 @@ public class Note implements ParagraphJobListener, JsonSerializable {
p.setAuthenticationInfo(p.getAuthenticationInfo());
intp.getScheduler().submit(p);
}
if (blocking) {
while (!p.getStatus().isCompleted()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return p.getStatus() == Status.FINISHED;
} else {
return true;
}
}
/**
@ -685,9 +705,11 @@ public class Note implements ParagraphJobListener, JsonSerializable {
}
for (InterpreterSetting setting : settings) {
InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
if (intpGroup != null) {
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
}
}
}
@ -700,7 +722,10 @@ public class Note implements ParagraphJobListener, JsonSerializable {
}
for (InterpreterSetting setting : settings) {
InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
if (setting.getInterpreterGroup(user, id) == null) {
continue;
}
InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {

View file

@ -334,39 +334,41 @@ public class Notebook implements NoteEventListener {
// remove from all interpreter instance's angular object registry
for (InterpreterSetting settings : interpreterSettingManager.get()) {
AngularObjectRegistry registry =
settings.getOrCreateInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {
// remove paragraph scope object
for (Paragraph p : note.getParagraphs()) {
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, p.getId());
InterpreterGroup interpreterGroup = settings.getInterpreterGroup(subject.getUser(), id);
if (interpreterGroup != null) {
AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {
// remove paragraph scope object
for (Paragraph p : note.getParagraphs()) {
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, p.getId());
// remove app scope object
List<ApplicationState> appStates = p.getAllApplicationStates();
if (appStates != null) {
for (ApplicationState app : appStates) {
((RemoteAngularObjectRegistry) registry)
.removeAllAndNotifyRemoteProcess(id, app.getId());
// remove app scope object
List<ApplicationState> appStates = p.getAllApplicationStates();
if (appStates != null) {
for (ApplicationState app : appStates) {
((RemoteAngularObjectRegistry) registry)
.removeAllAndNotifyRemoteProcess(id, app.getId());
}
}
}
}
// remove note scope object
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null);
} else {
// remove paragraph scope object
for (Paragraph p : note.getParagraphs()) {
registry.removeAll(id, p.getId());
// remove note scope object
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null);
} else {
// remove paragraph scope object
for (Paragraph p : note.getParagraphs()) {
registry.removeAll(id, p.getId());
// remove app scope object
List<ApplicationState> appStates = p.getAllApplicationStates();
if (appStates != null) {
for (ApplicationState app : appStates) {
registry.removeAll(id, app.getId());
// remove app scope object
List<ApplicationState> appStates = p.getAllApplicationStates();
if (appStates != null) {
for (ApplicationState app : appStates) {
registry.removeAll(id, app.getId());
}
}
}
// remove note scope object
registry.removeAll(id, null);
}
// remove note scope object
registry.removeAll(id, null);
}
}
@ -517,9 +519,8 @@ public class Notebook implements NoteEventListener {
SnapshotAngularObject snapshot = angularObjectSnapshot.get(name);
List<InterpreterSetting> settings = interpreterSettingManager.get();
for (InterpreterSetting setting : settings) {
InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(subject.getUser(),
note.getId());
if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId());
if (intpGroup != null && intpGroup.getId().equals(snapshot.getIntpGroupId())) {
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
String noteId = snapshot.getAngularObject().getNoteId();
String paragraphId = snapshot.getAngularObject().getParagraphId();

View file

@ -646,7 +646,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
@Override
public void run() {
note.run(getParagraphId());
note.run(getParagraphId(), false);
}
}

View file

@ -321,14 +321,14 @@ public class RemoteScheduler implements Scheduler {
if (job.isAborted()) {
job.setStatus(Status.ABORT);
} else if (job.getException() != null) {
logger.debug("Job ABORT, " + job.getId());
logger.debug("Job ABORT, " + job.getId() + ", " + job.getErrorMessage());
job.setStatus(Status.ERROR);
} else if (jobResult != null && jobResult instanceof InterpreterResult
&& ((InterpreterResult) jobResult).code() == Code.ERROR) {
logger.debug("Job Error, " + job.getId());
logger.debug("Job Error, " + job.getId() + ", " + job.getErrorMessage());
job.setStatus(Status.ERROR);
} else {
logger.debug("Job Finished, " + job.getId());
logger.debug("Job Finished, " + job.getId() + ", Result: " + job.getReturn());
job.setStatus(Status.FINISHED);
}

View file

@ -32,7 +32,7 @@ public abstract class AbstractInterpreterTest {
protected File interpreterDir;
protected File confDir;
protected File notebookDir;
protected ZeppelinConfiguration conf;
protected ZeppelinConfiguration conf = new ZeppelinConfiguration();
@Before
public void setUp() throws Exception {
@ -54,9 +54,9 @@ public abstract class AbstractInterpreterTest {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DIR.getVarName(), interpreterDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "test,mock1,mock2,mock_resource_pool");
conf = new ZeppelinConfiguration();
conf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "test,mock1,mock2,mock_resource_pool");
interpreterSettingManager = new InterpreterSettingManager(conf,
mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
interpreterFactory = new InterpreterFactory(interpreterSettingManager);

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.interpreter;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.junit.Test;
@ -25,6 +26,7 @@ import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class InterpreterFactoryTest extends AbstractInterpreterTest {
@ -36,31 +38,41 @@ public class InterpreterFactoryTest extends AbstractInterpreterTest {
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "") instanceof RemoteInterpreter);
RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "");
// EchoInterpreter is the default interpreter (see zeppelin-interpreter/src/test/resources/conf/interpreter.json)
// EchoInterpreter is the default interpreter because mock1 is the default interpreter group
assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName());
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test") instanceof RemoteInterpreter);
remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test");
assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName());
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "echo") instanceof RemoteInterpreter);
remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "echo");
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test2") instanceof RemoteInterpreter);
remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test2");
assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName());
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "double_echo") instanceof RemoteInterpreter);
remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "double_echo");
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test2.double_echo") instanceof RemoteInterpreter);
remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test2.double_echo");
assertEquals(DoubleEchoInterpreter.class.getName(), remoteInterpreter.getClassName());
}
@Test
public void testUnknownRepl1() throws IOException {
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
assertNull(interpreterFactory.getInterpreter("user1", "note1", "test.unknown_repl"));
try {
interpreterFactory.getInterpreter("user1", "note1", "test.unknown_repl");
fail("should fail due to no such interpreter");
} catch (RuntimeException e) {
assertEquals("No such interpreter: test.unknown_repl", e.getMessage());
}
}
@Test
public void testUnknownRepl2() throws IOException {
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
assertNull(interpreterFactory.getInterpreter("user1", "note1", "unknown_repl"));
try {
interpreterFactory.getInterpreter("user1", "note1", "unknown_repl");
fail("should fail due to no such interpreter");
} catch (RuntimeException e) {
assertEquals("Either no interpreter named unknown_repl or it is not binded to this note", e.getMessage());
}
}
}

View file

@ -50,9 +50,9 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test");
assertEquals("test", interpreterSetting.getName());
assertEquals("test", interpreterSetting.getGroup());
assertEquals(2, interpreterSetting.getInterpreterInfos().size());
assertEquals(3, interpreterSetting.getInterpreterInfos().size());
// 3 other builtin properties:
// * zeppelin.interpeter.output.limit
// * zeppelin.interpreter.output.limit
// * zeppelin.interpreter.localRepo
// * zeppelin.interpreter.max.poolsize
assertEquals(6, interpreterSetting.getJavaProperties().size());
@ -67,7 +67,6 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
assertNotNull(interpreterSetting.getAppEventListener());
assertNotNull(interpreterSetting.getDependencyResolver());
assertNotNull(interpreterSetting.getInterpreterSettingManager());
assertEquals("linux_runner", interpreterSetting.getInterpreterRunner().getPath());
List<RemoteRepository> repositories = interpreterSettingManager.getRepositories();
assertEquals(2, repositories.size());
@ -80,14 +79,13 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
interpreterSetting = interpreterSettingManager2.getByName("test");
assertEquals("test", interpreterSetting.getName());
assertEquals("test", interpreterSetting.getGroup());
assertEquals(2, interpreterSetting.getInterpreterInfos().size());
assertEquals(3, interpreterSetting.getInterpreterInfos().size());
assertEquals(6, interpreterSetting.getJavaProperties().size());
assertEquals("value_1", interpreterSetting.getJavaProperties().getProperty("property_1"));
assertEquals("new_value_2", interpreterSetting.getJavaProperties().getProperty("property_2"));
assertEquals("value_3", interpreterSetting.getJavaProperties().getProperty("property_3"));
assertEquals("shared", interpreterSetting.getOption().perNote);
assertEquals("shared", interpreterSetting.getOption().perUser);
assertEquals("linux_runner", interpreterSetting.getInterpreterRunner().getPath());
assertEquals(0, interpreterSetting.getDependencies().size());
repositories = interpreterSettingManager2.getRepositories();

View file

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.lifecycle;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TimeoutLifecycleManagerTest extends AbstractInterpreterTest {
@Override
public void setUp() throws Exception {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS.getVarName(),
TimeoutLifecycleManager.class.getName());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL.getVarName(), "1000");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD.getVarName(), "10000");
super.setUp();
}
@Test
public void testTimeout_1() throws InterpreterException, InterruptedException, IOException {
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.echo") instanceof RemoteInterpreter);
RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.echo");
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
null, null, new ArrayList<InterpreterContextRunner>(), null);
remoteInterpreter.interpret("hello world", context);
assertTrue(remoteInterpreter.isOpened());
InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test");
assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
Thread.sleep(15 * 1000);
// interpreterGroup is timeout, so is removed.
assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
assertFalse(remoteInterpreter.isOpened());
}
@Test
public void testTimeout_2() throws InterpreterException, InterruptedException, IOException {
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.sleep") instanceof RemoteInterpreter);
final RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.sleep");
// simulate how zeppelin submit paragraph
remoteInterpreter.getScheduler().submit(new Job("test-job", null) {
@Override
public Object getReturn() {
return null;
}
@Override
public int progress() {
return 0;
}
@Override
public Map<String, Object> info() {
return null;
}
@Override
protected Object jobRun() throws Throwable {
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
null, null, new ArrayList<InterpreterContextRunner>(), null);
return remoteInterpreter.interpret("100000", context);
}
@Override
protected boolean jobAbort() {
return false;
}
@Override
public void setResult(Object results) {
}
});
while(!remoteInterpreter.isOpened()) {
Thread.sleep(1000);
LOGGER.info("Wait for interpreter to be started");
}
InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test");
assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
Thread.sleep(15 * 1000);
// interpreterGroup is not timeout because getStatus is called periodically.
assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
assertTrue(remoteInterpreter.isOpened());
}
}

View file

@ -20,6 +20,8 @@ package org.apache.zeppelin.interpreter.remote;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.display.ui.OptionInput;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.remote.mock.GetAngularObjectSizeInterpreter;
@ -32,11 +34,11 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Properties;
import java.util.Map;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
public class RemoteInterpreterTest {
@ -413,4 +415,27 @@ public class RemoteInterpreterTest {
assertEquals("null", interpreter1.interpret("getProperty property_2", context1).message().get(0).getData());
}
@Test
public void testConvertDynamicForms() throws InterpreterException {
GUI gui = new GUI();
OptionInput.ParamOption[] paramOptions = {
new OptionInput.ParamOption("value1", "param1"),
new OptionInput.ParamOption("value2", "param2")
};
List<Object> defaultValues = new ArrayList();
defaultValues.add("default1");
defaultValues.add("default2");
gui.checkbox("checkbox_id", defaultValues, paramOptions);
gui.select("select_id", "default", paramOptions);
gui.textbox("textbox_id");
Map<String, Input> expected = new LinkedHashMap<>(gui.getForms());
Interpreter interpreter = interpreterSetting.getDefaultInterpreter("user1", "note1");
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl", null,
null, AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), gui,
null, null, new ArrayList<InterpreterContextRunner>(), null);
interpreter.interpret("text", context);
assertArrayEquals(expected.values().toArray(), gui.getForms().values().toArray());
}
}

View file

@ -114,7 +114,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
Map config = p1.getConfig();
config.put("enabled", true);
p1.setConfig(config);
p1.setText("hello world");
p1.setText("%mock1 hello world");
p1.setAuthenticationInfo(anonymous);
note.run(p1.getId());
while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield();
@ -268,7 +268,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
Map config = p1.getConfig();
config.put("enabled", true);
p1.setConfig(config);
p1.setText("hello world");
p1.setText("%mock1 hello world");
p1.setAuthenticationInfo(anonymous);
note.run(p1.getId());
@ -305,27 +305,22 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
Map config1 = p1.getConfig();
config1.put("enabled", true);
p1.setConfig(config1);
p1.setText("p1");
p1.setText("%mock1 p1");
// p2
Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
Map config2 = p2.getConfig();
config2.put("enabled", false);
p2.setConfig(config2);
p2.setText("p2");
p2.setText("%mock1 p2");
// p3
Paragraph p3 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p3.setText("p3");
p3.setText("%mock1 p3");
// when
note.runAll();
// wait for finish
while(p3.isTerminated() == false || p3.getResult() == null) {
Thread.yield();
}
assertEquals("repl1: p1", p1.getResult().message().get(0).getData());
assertNull(p2.getResult());
assertEquals("repl1: p3", p3.getResult().message().get(0).getData());
@ -415,7 +410,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
Map config = new HashMap<>();
p.setConfig(config);
p.setText("sleep 1000");
p.setText("%mock1 sleep 1000");
Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p2.setConfig(config);
@ -466,9 +461,6 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
p.setText(simpleText);
note.runAll();
while (p.isTerminated() == false || p.getResult() == null) {
Thread.yield();
}
String exportedNoteJson = notebook.exportNote(note.getId());
@ -503,7 +495,6 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
final Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p.setText("hello world");
note.runAll();
while(p.isTerminated()==false || p.getResult()==null) Thread.yield();
p.setStatus(Status.RUNNING);
Note cloneNote = notebook.cloneNote(note.getId(), "clone note", anonymous);
@ -549,9 +540,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
final Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p.setText("hello world");
note.runAll();
while (p.isTerminated() == false || p.getResult() == null) {
Thread.yield();
}
// Force paragraph to have String type object
p.setResult("Exception");
@ -572,15 +561,13 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), note.getId(), interpreterSettingManager.getInterpreterSettingIds());
Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p1.setText("hello");
p1.setText("%mock1 hello");
Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p2.setText("%mock2 world");
for (InterpreterGroup intpGroup : interpreterSettingManager.getAllInterpreterGroup()) {
intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId()));
}
note.runAll();
while (p1.isTerminated() == false || p1.getResult() == null) Thread.yield();
while (p2.isTerminated() == false || p2.getResult() == null) Thread.yield();
assertEquals(2, interpreterSettingManager.getAllResources().size());
@ -796,14 +783,14 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
// create three paragraphs
Paragraph p1 = note.addNewParagraph(anonymous);
p1.setText("sleep 1000");
p1.setText("%mock1 sleep 1000");
Paragraph p2 = note.addNewParagraph(anonymous);
p2.setText("sleep 1000");
p2.setText("%mock1 sleep 1000");
Paragraph p3 = note.addNewParagraph(anonymous);
p3.setText("sleep 1000");
p3.setText("%mock1 sleep 1000");
note.runAll();
note.runAll(AuthenticationInfo.ANONYMOUS, false);
// wait until first paragraph finishes and second paragraph starts
while (p1.getStatus() != Status.FINISHED || p2.getStatus() != Status.RUNNING) Thread.yield();
@ -813,9 +800,9 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
assertEquals(Status.PENDING, p3.getStatus());
// restart interpreter
interpreterSettingManager.restart(interpreterSettingManager.getInterpreterSettings(note.getId()).get(0).getId());
interpreterSettingManager.restart(interpreterSettingManager.getInterpreterSettingByName("mock1").getId());
// make sure three differnt status aborted well.
// make sure three different status aborted well.
assertEquals(Status.FINISHED, p1.getStatus());
assertEquals(Status.ABORT, p2.getStatus());
assertEquals(Status.ABORT, p3.getStatus());
@ -828,7 +815,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
// create a notes
Note note1 = notebook.createNote(anonymous);
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p1.setText("getId");
p1.setText("%mock1 getId");
p1.setAuthenticationInfo(anonymous);
// restart interpreter with per user session enabled
@ -845,7 +832,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
notebook.removeNote(note1.getId(), anonymous);
note1 = notebook.createNote(anonymous);
p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p1.setText("getId");
p1.setText("%mock1 getId");
p1.setAuthenticationInfo(anonymous);
note1.run(p1.getId());
@ -864,9 +851,9 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
Note note2 = notebook.createNote(anonymous);
Paragraph p2 = note2.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p1.setText("getId");
p1.setText("%mock1 getId");
p1.setAuthenticationInfo(anonymous);
p2.setText("getId");
p2.setText("%mock1 getId");
p2.setAuthenticationInfo(anonymous);
// run per note session disabled
@ -908,9 +895,9 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
Note note2 = notebook.createNote(anonymous);
Paragraph p2 = note2.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p1.setText("getId");
p1.setText("%mock1 getId");
p1.setAuthenticationInfo(anonymous);
p2.setText("getId");
p2.setText("%mock1 getId");
p2.setAuthenticationInfo(anonymous);
// shared mode.
@ -925,8 +912,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
// restart interpreter with scoped mode enabled
for (InterpreterSetting setting : notebook.getInterpreterSettingManager().getInterpreterSettings(note1.getId())) {
setting.getOption().setPerNote(InterpreterOption.SCOPED);
notebook.getInterpreterSettingManager().restart(setting.getId(), note1.getId(), anonymous.getUser());
notebook.getInterpreterSettingManager().restart(setting.getId(), note2.getId(), anonymous.getUser());
notebook.getInterpreterSettingManager().restart(setting.getId());
}
// run per note session enabled
@ -941,8 +927,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
// restart interpreter with isolated mode enabled
for (InterpreterSetting setting : notebook.getInterpreterSettingManager().getInterpreterSettings(note1.getId())) {
setting.getOption().setPerNote(InterpreterOption.ISOLATED);
notebook.getInterpreterSettingManager().restart(setting.getId(), note1.getId(), anonymous.getUser());
notebook.getInterpreterSettingManager().restart(setting.getId(), note2.getId(), anonymous.getUser());
setting.getInterpreterSettingManager().restart(setting.getId());
}
// run per note process enabled
@ -964,7 +949,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
Note note1 = notebook.createNote(anonymous);
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p1.setAuthenticationInfo(anonymous);
p1.setText("getId");
p1.setText("%mock1 getId");
// restart interpreter with per note session enabled
for (InterpreterSetting setting : interpreterSettingManager.getInterpreterSettings(note1.getId())) {

View file

@ -42,8 +42,19 @@
"description": "desc_2"
}
},
"runner": {
"linux": "linux_runner"
"editor": {
"language": "java",
"editOnDblClick": false
}
},
{
"group": "test",
"name": "sleep",
"defaultInterpreter": false,
"className": "org.apache.zeppelin.interpreter.SleepInterpreter",
"properties": {
},
"editor": {
"language": "java",