mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge remote-tracking branch 'upstream/master'
This commit is contained in:
commit
86153a85ad
46 changed files with 663 additions and 165 deletions
|
|
@ -94,7 +94,9 @@ install:
|
|||
|
||||
before_script:
|
||||
- travis_retry ./testing/downloadSpark.sh $SPARK_VER $HADOOP_VER
|
||||
- ./testing/setupLivy.sh
|
||||
- if [[ -n $LIVY_VER ]]; then ./testing/downloadLivy.sh $LIVY_VER; fi
|
||||
- if [[ -n $LIVY_VER ]]; then export LIVY_HOME=`pwd`/livy-server-$LIVY_VER; fi
|
||||
- if [[ -n $LIVY_VER ]]; then export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER; fi
|
||||
- echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
|
||||
- tail conf/zeppelin-env.sh
|
||||
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ fi
|
|||
addJarInDir "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
|
||||
|
||||
addJarInDir "${ZEPPELIN_HOME}/lib"
|
||||
addJarInDir "${ZEPPELIN_HOME}/lib/interpreter"
|
||||
|
||||
CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
|
||||
$ZEPPELIN_RUNNER $JAVA_OPTS -cp $CLASSPATH $ZEPPELIN_INSTALL_INTERPRETER_MAIN ${@}
|
||||
|
|
|
|||
|
|
@ -214,7 +214,7 @@ eval $INTERPRETER_RUN_COMMAND &
|
|||
|
||||
pid=$!
|
||||
if [[ -z "${pid}" ]]; then
|
||||
return 1;
|
||||
exit 1;
|
||||
else
|
||||
echo ${pid} > ${ZEPPELIN_PID}
|
||||
fi
|
||||
|
|
|
|||
|
|
@ -216,6 +216,11 @@
|
|||
<description>Interpreter process connect timeout in msec.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.interpreter.output.limit</name>
|
||||
<value>102400</value>
|
||||
<description>Output message from interpreter exceeding the limit will be truncated</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.ssl</name>
|
||||
|
|
|
|||
|
|
@ -24,19 +24,25 @@
|
|||
# then pr[#PR] branch will be created.
|
||||
#
|
||||
|
||||
import sys, os, subprocess
|
||||
import json, urllib
|
||||
from __future__ import print_function
|
||||
import sys, os, subprocess, json, codecs
|
||||
|
||||
if sys.version_info[0] == 2:
|
||||
from urllib import urlopen
|
||||
else:
|
||||
from urllib.request import urlopen
|
||||
|
||||
if len(sys.argv) == 1:
|
||||
print "usage) " + sys.argv[0] + " [#PR]"
|
||||
print " eg) " + sys.argv[0] + " 122"
|
||||
print("usage) " + sys.argv[0] + " [#PR]")
|
||||
print(" eg) " + sys.argv[0] + " 122")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
pr=sys.argv[1]
|
||||
githubApi="https://api.github.com/repos/apache/zeppelin"
|
||||
|
||||
prInfo = json.load(urllib.urlopen(githubApi + "/pulls/" + pr))
|
||||
reader = codecs.getreader("utf-8")
|
||||
prInfo = json.load(reader(urlopen(githubApi + "/pulls/" + pr)))
|
||||
if "message" in prInfo and prInfo["message"] == "Not Found":
|
||||
sys.stderr.write("PullRequest #" + pr + " not found\n")
|
||||
sys.exit(1)
|
||||
|
|
@ -44,6 +50,7 @@ if "message" in prInfo and prInfo["message"] == "Not Found":
|
|||
prUser=prInfo['user']['login']
|
||||
prRepoUrl=prInfo['head']['repo']['clone_url']
|
||||
prBranch=prInfo['head']['label'].replace(":", "/")
|
||||
print(prBranch)
|
||||
|
||||
# create local branch
|
||||
exitCode = os.system("git checkout -b pr" + pr)
|
||||
|
|
@ -63,21 +70,21 @@ if exitCode != 0:
|
|||
sys.exit(1)
|
||||
|
||||
|
||||
currentBranch = subprocess.check_output(["git rev-parse --abbrev-ref HEAD"], shell=True).rstrip()
|
||||
currentBranch = subprocess.check_output("git rev-parse --abbrev-ref HEAD", shell=True).rstrip().decode("utf-8")
|
||||
|
||||
print "Merge branch " + prBranch + " into " + currentBranch
|
||||
print("Merge branch " + prBranch + " into " + currentBranch)
|
||||
|
||||
rev = subprocess.check_output(["git rev-parse " + prBranch], shell=True).rstrip()
|
||||
prAuthor = subprocess.check_output(["git --no-pager show -s --format='%an <%ae>' " + rev], shell=True).rstrip()
|
||||
prAuthorDate = subprocess.check_output(["git --no-pager show -s --format='%ad' " + rev], shell=True).rstrip()
|
||||
rev = subprocess.check_output("git rev-parse " + prBranch, shell=True).rstrip().decode("utf-8")
|
||||
prAuthor = subprocess.check_output("git --no-pager show -s --format=\"%an <%ae>\" " + rev, shell=True).rstrip().decode("utf-8")
|
||||
prAuthorDate = subprocess.check_output("git --no-pager show -s --format=\"%ad\" " + rev, shell=True).rstrip().decode("utf-8")
|
||||
|
||||
prTitle = prInfo['title']
|
||||
prBody = prInfo['body']
|
||||
|
||||
commitList = subprocess.check_output(["git log --pretty=format:'%h' " + currentBranch + ".." + prBranch], shell=True).rstrip()
|
||||
commitList = subprocess.check_output("git log --pretty=format:\"%h\" " + currentBranch + ".." + prBranch, shell=True).rstrip().decode("utf-8")
|
||||
authorList = []
|
||||
for commitHash in commitList.split("\n"):
|
||||
a = subprocess.check_output(["git show -s --pretty=format:'%an <%ae>' "+commitHash], shell=True).rstrip()
|
||||
a = subprocess.check_output("git show -s --pretty=format:\"%an <%ae>\" "+commitHash, shell=True).rstrip().decode("utf-8")
|
||||
if a not in authorList:
|
||||
authorList.append(a)
|
||||
|
||||
|
|
@ -85,20 +92,20 @@ commitMsg = prTitle + "\n"
|
|||
if prBody :
|
||||
commitMsg += prBody + "\n\n"
|
||||
for author in authorList:
|
||||
commitMsg += "Author: " + author+"\n"
|
||||
commitMsg += "Author: " + author +"\n"
|
||||
commitMsg += "\n"
|
||||
commitMsg += "Closes #" + pr + " from " + prBranch + " and squashes the following commits:\n\n"
|
||||
commitMsg += subprocess.check_output(["git log --pretty=format:'%h [%an] %s' " + currentBranch + ".." + prBranch], shell=True).rstrip()
|
||||
commitMsg += subprocess.check_output("git log --pretty=format:\"%h [%an] %s\" " + currentBranch + ".." + prBranch, shell=True).rstrip().decode("utf-8")
|
||||
|
||||
exitCode = os.system("git merge --no-commit --squash " + prBranch)
|
||||
if exitCode != 0:
|
||||
sys.stderr.write("Can not merge\n")
|
||||
sys.exit(1)
|
||||
|
||||
exitCode = os.system('git commit -a --author "' + prAuthor + '" --date "' + prAuthorDate + '" -m"' + commitMsg.encode('utf-8') + '"')
|
||||
exitCode = os.system('git commit -a --author "' + prAuthor + '" --date "' + prAuthorDate + '" -m"' + commitMsg + '"')
|
||||
if exitCode != 0:
|
||||
sys.stderr.write("Commit failed\n")
|
||||
sys.exit(1)
|
||||
|
||||
os.system("git remote remove " + prUser)
|
||||
print "Branch " + prBranch + " is merged into " + currentBranch
|
||||
print("Branch " + prBranch + " is merged into " + currentBranch)
|
||||
|
|
|
|||
|
|
@ -248,6 +248,12 @@ If both are defined, then the **environment variables** will take priority.
|
|||
<td>interpreter</td>
|
||||
<td>Interpreter directory</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>ZEPPELIN_INTERPRETER_OUTPUT_LIMIT</td>
|
||||
<td>zeppelin.interpreter.output.limit</td>
|
||||
<td>102400</td>
|
||||
<td>Output message from interpreter exceeding the limit will be truncated</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE</td>
|
||||
<td>zeppelin.websocket.max.text.message.size</td>
|
||||
|
|
|
|||
|
|
@ -56,6 +56,8 @@ Also you can separate option's display name and value, using `${formName=default
|
|||
|
||||
<img src="../assets/themes/zeppelin/img/screenshots/form_select_displayname.png" />
|
||||
|
||||
Hit enter after selecting option to run the paragraph with new value.
|
||||
|
||||
### Checkbox form
|
||||
|
||||
For multi-selection, you can create a checkbox form using `${checkbox:formName=defaultValue1|defaultValue2...,option1|option2...}`. The variable will be substituted by a comma-separated string based on the selected items. For example:
|
||||
|
|
|
|||
|
|
@ -98,20 +98,12 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
if (sessionInfo.appId == null) {
|
||||
// livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
|
||||
// explicitly by ourselves.
|
||||
sessionInfo.appId = extractStatementResult(
|
||||
interpret("sc.applicationId", null, false, false).message()
|
||||
.get(0).getData());
|
||||
sessionInfo.appId = extractAppId();
|
||||
}
|
||||
|
||||
if (sessionInfo.appInfo == null ||
|
||||
StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
|
||||
interpret(
|
||||
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
|
||||
null, false, false);
|
||||
sessionInfo.webUIAddress = extractStatementResult(
|
||||
interpret(
|
||||
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", null, false, false)
|
||||
.message().get(0).getData());
|
||||
sessionInfo.webUIAddress = extractWebUIAddress();
|
||||
} else {
|
||||
sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
|
||||
}
|
||||
|
|
@ -130,6 +122,10 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
protected abstract String extractAppId() throws LivyException;
|
||||
|
||||
protected abstract String extractWebUIAddress() throws LivyException;
|
||||
|
||||
public SessionInfo getSessionInfo() {
|
||||
return sessionInfo;
|
||||
}
|
||||
|
|
@ -148,25 +144,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
|
||||
* from following:
|
||||
* res0: String = application_1473129941656_0048
|
||||
*
|
||||
* @param result
|
||||
* @return
|
||||
*/
|
||||
private String extractStatementResult(String result) {
|
||||
int pos = -1;
|
||||
if ((pos = result.indexOf("=")) >= 0) {
|
||||
return result.substring(pos + 1).trim();
|
||||
} else {
|
||||
throw new RuntimeException("No result can be extracted from '" + result + "', " +
|
||||
"something must be wrong");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
if (livyVersion.isCancelSupported()) {
|
||||
|
|
@ -206,19 +184,13 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
conf.put(entry.getKey().toString().substring(5), entry.getValue().toString());
|
||||
}
|
||||
|
||||
CreateSessionRequest request = new CreateSessionRequest(kind, user, conf);
|
||||
CreateSessionRequest request = new CreateSessionRequest(kind,
|
||||
user == null || user.equals("anonymous") ? null : user, conf);
|
||||
SessionInfo sessionInfo = SessionInfo.fromJson(
|
||||
callRestAPI("/sessions", "POST", request.toJson()));
|
||||
long start = System.currentTimeMillis();
|
||||
// pull the session status until it is idle or timeout
|
||||
while (!sessionInfo.isReady()) {
|
||||
LOGGER.info("Session {} is in state {}, appId {}", sessionInfo.id, sessionInfo.state,
|
||||
sessionInfo.appId);
|
||||
if (sessionInfo.isFinished()) {
|
||||
String msg = "Session " + sessionInfo.id + " is finished, appId: " + sessionInfo.appId
|
||||
+ ", log: " + sessionInfo.log;
|
||||
throw new LivyException(msg);
|
||||
}
|
||||
if ((System.currentTimeMillis() - start) / 1000 > sessionCreationTimeout) {
|
||||
String msg = "The creation of session " + sessionInfo.id + " is timeout within "
|
||||
+ sessionCreationTimeout + " seconds, appId: " + sessionInfo.appId
|
||||
|
|
@ -227,6 +199,13 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
}
|
||||
Thread.sleep(pullStatusInterval);
|
||||
sessionInfo = getSessionInfo(sessionInfo.id);
|
||||
LOGGER.info("Session {} is in state {}, appId {}", sessionInfo.id, sessionInfo.state,
|
||||
sessionInfo.appId);
|
||||
if (sessionInfo.isFinished()) {
|
||||
String msg = "Session " + sessionInfo.id + " is finished, appId: " + sessionInfo.appId
|
||||
+ ", log: " + sessionInfo.log;
|
||||
throw new LivyException(msg);
|
||||
}
|
||||
}
|
||||
return sessionInfo;
|
||||
} catch (Exception e) {
|
||||
|
|
@ -438,7 +417,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
|| response.getStatusCode().value() == 201) {
|
||||
return response.getBody();
|
||||
} else if (response.getStatusCode().value() == 404) {
|
||||
if (response.getBody().matches("Session '\\d+' not found.")) {
|
||||
if (response.getBody().matches("\"Session '\\d+' not found.\"")) {
|
||||
throw new SessionNotFoundException(response.getBody());
|
||||
} else {
|
||||
throw new APINotFoundException("No rest api found for " + targetURL +
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ import java.util.Properties;
|
|||
/**
|
||||
* Livy PySpark interpreter for Zeppelin.
|
||||
*/
|
||||
public class LivyPySpark3Interpreter extends BaseLivyInterprereter {
|
||||
public class LivyPySpark3Interpreter extends LivyPySparkBaseInterpreter {
|
||||
|
||||
public LivyPySpark3Interpreter(Properties property) {
|
||||
super(property);
|
||||
|
|
@ -43,4 +43,5 @@ public class LivyPySpark3Interpreter extends BaseLivyInterprereter {
|
|||
public String getSessionKind() {
|
||||
return "pyspark3";
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Base class for PySpark Interpreter
|
||||
*/
|
||||
public abstract class LivyPySparkBaseInterpreter extends BaseLivyInterprereter {
|
||||
|
||||
public LivyPySparkBaseInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractAppId() throws LivyException {
|
||||
return extractStatementResult(
|
||||
interpret("sc.applicationId", null, false, false).message()
|
||||
.get(0).getData());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractWebUIAddress() throws LivyException {
|
||||
return extractStatementResult(
|
||||
interpret(
|
||||
"sc._jsc.sc().ui().get().appUIAddress()", null, false, false)
|
||||
.message().get(0).getData());
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
|
||||
* from following:
|
||||
* u'application_1473129941656_0048'
|
||||
*
|
||||
* @param result
|
||||
* @return
|
||||
*/
|
||||
private String extractStatementResult(String result) {
|
||||
int pos = -1;
|
||||
if ((pos = result.indexOf("'")) >= 0) {
|
||||
return result.substring(pos + 1, result.length() - 1).trim();
|
||||
} else {
|
||||
throw new RuntimeException("No result can be extracted from '" + result + "', " +
|
||||
"something must be wrong");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -33,7 +33,7 @@ import java.util.Properties;
|
|||
/**
|
||||
* Livy PySpark interpreter for Zeppelin.
|
||||
*/
|
||||
public class LivyPySparkInterpreter extends BaseLivyInterprereter {
|
||||
public class LivyPySparkInterpreter extends LivyPySparkBaseInterpreter {
|
||||
|
||||
public LivyPySparkInterpreter(Properties property) {
|
||||
super(property);
|
||||
|
|
@ -43,4 +43,6 @@ public class LivyPySparkInterpreter extends BaseLivyInterprereter {
|
|||
public String getSessionKind() {
|
||||
return "pyspark";
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,4 +44,39 @@ public class LivySparkInterpreter extends BaseLivyInterprereter {
|
|||
return "spark";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractAppId() throws LivyException {
|
||||
return extractStatementResult(
|
||||
interpret("sc.applicationId", null, false, false).message()
|
||||
.get(0).getData());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractWebUIAddress() throws LivyException {
|
||||
interpret(
|
||||
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
|
||||
null, false, false);
|
||||
return extractStatementResult(
|
||||
interpret(
|
||||
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", null, false, false)
|
||||
.message().get(0).getData());
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
|
||||
* from following:
|
||||
* res0: String = application_1473129941656_0048
|
||||
*
|
||||
* @param result
|
||||
* @return
|
||||
*/
|
||||
private String extractStatementResult(String result) {
|
||||
int pos = -1;
|
||||
if ((pos = result.indexOf("=")) >= 0) {
|
||||
return result.substring(pos + 1).trim();
|
||||
} else {
|
||||
throw new RuntimeException("No result can be extracted from '" + result + "', " +
|
||||
"something must be wrong");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,4 +43,16 @@ public class LivySparkRInterpreter extends BaseLivyInterprereter {
|
|||
public String getSessionKind() {
|
||||
return "sparkr";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractAppId() throws LivyException {
|
||||
//TODO(zjffdu) depends on SparkR
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractWebUIAddress() throws LivyException {
|
||||
//TODO(zjffdu) depends on SparkR
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import org.apache.zeppelin.interpreter.*;
|
|||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
|
||||
|
|
@ -123,28 +125,12 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
// assumption is correct for now. Ideally livy should return table type. We may do it in
|
||||
// the future release of livy.
|
||||
if (message.getType() == InterpreterResult.Type.TEXT) {
|
||||
StringBuilder resMsg = new StringBuilder();
|
||||
String[] rows = message.getData().split("\n");
|
||||
String[] headers = rows[1].split("\\|");
|
||||
for (int head = 1; head < headers.length; head++) {
|
||||
resMsg.append(headers[head].trim()).append("\t");
|
||||
List<String> rows = parseSQLOutput(message.getData());
|
||||
result2.add(InterpreterResult.Type.TABLE, StringUtils.join(rows, "\n"));
|
||||
if (rows.size() >= (maxResult + 1)) {
|
||||
result2.add(InterpreterResult.Type.HTML,
|
||||
"<font color=red>Results are limited by " + maxResult + ".</font>");
|
||||
}
|
||||
resMsg.append("\n");
|
||||
if (rows[3].indexOf("+") == 0) {
|
||||
|
||||
} else {
|
||||
for (int cols = 3; cols < rows.length - 1; cols++) {
|
||||
String[] col = rows[cols].split("\\|");
|
||||
for (int data = 1; data < col.length; data++) {
|
||||
resMsg.append(col[data].trim()).append("\t");
|
||||
}
|
||||
resMsg.append("\n");
|
||||
}
|
||||
}
|
||||
if (rows[rows.length - 1].indexOf("only") == 0) {
|
||||
resMsg.append("<font color=red>" + rows[rows.length - 1] + ".</font>");
|
||||
}
|
||||
result2.add(InterpreterResult.Type.TABLE, resMsg.toString());
|
||||
} else {
|
||||
result2.add(message.getType(), message.getData());
|
||||
}
|
||||
|
|
@ -160,6 +146,54 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
}
|
||||
}
|
||||
|
||||
protected List<String> parseSQLOutput(String output) {
|
||||
List<String> rows = new ArrayList<>();
|
||||
String[] lines = output.split("\n");
|
||||
// at least 4 lines, even for empty sql output
|
||||
// +---+---+
|
||||
// | a| b|
|
||||
// +---+---+
|
||||
// +---+---+
|
||||
|
||||
// use the first line to determinte the position of feach cell
|
||||
String[] tokens = StringUtils.split(lines[0], "\\+");
|
||||
// pairs keeps the start/end position of each cell. We parse it from the first row
|
||||
// which use '+' as separator
|
||||
List<Pair> pairs = new ArrayList<>();
|
||||
int start = 0;
|
||||
int end = 0;
|
||||
for (String token : tokens) {
|
||||
start = end + 1;
|
||||
end = start + token.length();
|
||||
pairs.add(new Pair(start, end));
|
||||
}
|
||||
|
||||
for (String line : lines) {
|
||||
// skip line like "+---+---+" and "only showing top 1 row"
|
||||
if (!line.matches("(\\+\\-+)+\\+") || line.contains("only showing")) {
|
||||
List<String> cells = new ArrayList<>();
|
||||
for (Pair pair : pairs) {
|
||||
// strip the blank space around the cell
|
||||
cells.add(line.substring(pair.start, pair.end).trim());
|
||||
}
|
||||
rows.add(StringUtils.join(cells, "\t"));
|
||||
}
|
||||
}
|
||||
return rows;
|
||||
}
|
||||
|
||||
/**
|
||||
* Represent the start and end index of each cell
|
||||
*/
|
||||
private static class Pair {
|
||||
private int start;
|
||||
private int end;
|
||||
public Pair(int start, int end) {
|
||||
this.start = start;
|
||||
this.end = end;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean concurrentSQL() {
|
||||
return Boolean.parseBoolean(getProperty("zeppelin.livy.concurrentSQL"));
|
||||
}
|
||||
|
|
@ -185,4 +219,16 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
public void close() {
|
||||
this.sparkInterpreter.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractAppId() throws LivyException {
|
||||
// it wont' be called because it would delegate to LivySparkInterpreter
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractWebUIAddress() throws LivyException {
|
||||
// it wont' be called because it would delegate to LivySparkInterpreter
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -157,7 +157,7 @@ public class LivyInterpreterIT {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
public void testSparkInterpreterDataFrame() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
|
|
@ -196,14 +196,12 @@ public class LivyInterpreterIT {
|
|||
result = sqlInterpreter.interpret("select * from df where col_1='hello'", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
|
||||
// TODO(zjffdu), \t at the end of each line is not necessary,
|
||||
// it is a bug of LivySparkSQLInterpreter
|
||||
assertEquals("col_1\tcol_2\t\nhello\t20\t\n", result.message().get(0).getData());
|
||||
assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData());
|
||||
// double quotes
|
||||
result = sqlInterpreter.interpret("select * from df where col_1=\"hello\"", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
|
||||
assertEquals("col_1\tcol_2\t\nhello\t20\t\n", result.message().get(0).getData());
|
||||
assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData());
|
||||
// double quotes inside attribute value
|
||||
// TODO(zjffdu). This test case would fail on spark-1.5, would uncomment it when upgrading to
|
||||
// livy-0.3 and spark-1.6
|
||||
|
|
@ -353,7 +351,7 @@ public class LivyInterpreterIT {
|
|||
// TODO(zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release.
|
||||
}
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
public void testLivyTutorialNote() throws IOException {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Unit test for LivySQLInterpreter
|
||||
*/
|
||||
public class LivySQLInterpreterTest {
|
||||
|
||||
private LivySparkSQLInterpreter sqlInterpreter;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("zeppelin.livy.session.create_timeout", "120");
|
||||
properties.setProperty("zeppelin.livy.spark.sql.maxResult", "3");
|
||||
sqlInterpreter = new LivySparkSQLInterpreter(properties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseSQLOutput() {
|
||||
// Empty sql output
|
||||
// +---+---+
|
||||
// | a| b|
|
||||
// +---+---+
|
||||
// +---+---+
|
||||
List<String> rows = sqlInterpreter.parseSQLOutput("+---+---+\n" +
|
||||
"| a| b|\n" +
|
||||
"+---+---+\n" +
|
||||
"+---+---+");
|
||||
assertEquals(1, rows.size());
|
||||
assertEquals("a\tb", rows.get(0));
|
||||
|
||||
|
||||
// sql output with 2 rows
|
||||
// +---+---+
|
||||
// | a| b|
|
||||
// +---+---+
|
||||
// | 1| 1a|
|
||||
// | 2| 2b|
|
||||
// +---+---+
|
||||
rows = sqlInterpreter.parseSQLOutput("+---+---+\n" +
|
||||
"| a| b|\n" +
|
||||
"+---+---+\n" +
|
||||
"| 1| 1a|\n" +
|
||||
"| 2| 2b|\n" +
|
||||
"+---+---+");
|
||||
assertEquals(3, rows.size());
|
||||
assertEquals("a\tb", rows.get(0));
|
||||
assertEquals("1\t1a", rows.get(1));
|
||||
assertEquals("2\t2b", rows.get(2));
|
||||
|
||||
|
||||
// sql output with 3 rows and showing "only showing top 3 rows"
|
||||
// +---+---+
|
||||
// | a| b|
|
||||
// +---+---+
|
||||
// | 1| 1a|
|
||||
// | 2| 2b|
|
||||
// | 3| 3c|
|
||||
// +---+---+
|
||||
rows = sqlInterpreter.parseSQLOutput("+---+---+\n" +
|
||||
"| a| b|\n" +
|
||||
"+---+---+\n" +
|
||||
"| 1| 1a|\n" +
|
||||
"| 2| 2b|\n" +
|
||||
"| 3| 3c|\n" +
|
||||
"+---+---+");
|
||||
assertEquals(4, rows.size());
|
||||
assertEquals("a\tb", rows.get(0));
|
||||
assertEquals("1\t1a", rows.get(1));
|
||||
assertEquals("2\t2b", rows.get(2));
|
||||
assertEquals("3\t3c", rows.get(3));
|
||||
}
|
||||
}
|
||||
|
|
@ -20,9 +20,7 @@ package org.apache.zeppelin.shell;
|
|||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
|
|
@ -86,7 +84,8 @@ public class ShellInterpreter extends Interpreter {
|
|||
|
||||
try {
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
executor.setStreamHandler(new PumpStreamHandler(outStream, outStream));
|
||||
executor.setStreamHandler(new PumpStreamHandler(
|
||||
contextInterpreter.out, contextInterpreter.out));
|
||||
executor.setWatchdog(new ExecuteWatchdog(Long.valueOf(getProperty(TIMEOUT_PROPERTY))));
|
||||
executors.put(contextInterpreter.getParagraphId(), executor);
|
||||
int exitVal = executor.execute(cmdLine);
|
||||
|
|
@ -100,7 +99,7 @@ public class ShellInterpreter extends Interpreter {
|
|||
String message = outStream.toString();
|
||||
if (exitValue == 143) {
|
||||
code = Code.INCOMPLETE;
|
||||
message += "Paragraph received a SIGTERM.\n";
|
||||
message += "Paragraph received a SIGTERM\n";
|
||||
LOGGER.info("The paragraph " + contextInterpreter.getParagraphId()
|
||||
+ " stopped executing: " + message);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,12 +32,17 @@ import org.junit.Test;
|
|||
public class ShellInterpreterTest {
|
||||
|
||||
private ShellInterpreter shell;
|
||||
private InterpreterContext context;
|
||||
private InterpreterResult result;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Properties p = new Properties();
|
||||
p.setProperty("shell.command.timeout.millisecs", "60000");
|
||||
p.setProperty("shell.command.timeout.millisecs", "2000");
|
||||
shell = new ShellInterpreter(p);
|
||||
|
||||
context = new InterpreterContext("", "1", null, "", "", null, null, null, null, null, null, null);
|
||||
shell.open();
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
@ -46,9 +51,6 @@ public class ShellInterpreterTest {
|
|||
|
||||
@Test
|
||||
public void test() {
|
||||
shell.open();
|
||||
InterpreterContext context = new InterpreterContext("", "1", null, "", "", null, null, null, null, null, null, null);
|
||||
InterpreterResult result = new InterpreterResult(Code.ERROR);
|
||||
if (System.getProperty("os.name").startsWith("Windows")) {
|
||||
result = shell.interpret("dir", context);
|
||||
} else {
|
||||
|
|
@ -63,16 +65,24 @@ public class ShellInterpreterTest {
|
|||
|
||||
@Test
|
||||
public void testInvalidCommand(){
|
||||
shell.open();
|
||||
InterpreterContext context = new InterpreterContext("","1",null,"","",null,null,null,null,null,null,null);
|
||||
InterpreterResult result = new InterpreterResult(Code.ERROR);
|
||||
if (System.getProperty("os.name").startsWith("Windows")) {
|
||||
result = shell.interpret("invalid_command\ndir",context);
|
||||
result = shell.interpret("invalid_command\ndir", context);
|
||||
} else {
|
||||
result = shell.interpret("invalid_command\nls",context);
|
||||
result = shell.interpret("invalid_command\nls", context);
|
||||
}
|
||||
assertEquals(InterpreterResult.Code.SUCCESS,result.code());
|
||||
assertTrue(result.message().get(0).getData().contains("invalid_command"));
|
||||
assertEquals(Code.SUCCESS, result.code());
|
||||
assertTrue(shell.executors.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShellTimeout() {
|
||||
if (System.getProperty("os.name").startsWith("Windows")) {
|
||||
result = shell.interpret("timeout 4", context);
|
||||
} else {
|
||||
result = shell.interpret("sleep 4", context);
|
||||
}
|
||||
|
||||
assertEquals(Code.INCOMPLETE, result.code());
|
||||
assertTrue(result.message().get(0).getData().contains("Paragraph received a SIGTERM"));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -293,7 +293,9 @@ public class ZeppelinContext {
|
|||
}
|
||||
|
||||
if (rows.length > maxResult) {
|
||||
msg.append("\n<font color=red>Results are limited by " + maxResult + ".</font>");
|
||||
msg.append("<!--TABLE_COMMENT-->");
|
||||
msg.append("\n");
|
||||
msg.append("<font color=red>Results are limited by " + maxResult + ".</font>");
|
||||
}
|
||||
sc.clearJobGroup();
|
||||
return msg.toString();
|
||||
|
|
|
|||
|
|
@ -321,7 +321,9 @@ while True :
|
|||
# so that the last statement's evaluation will be printed to stdout
|
||||
sc.setJobGroup(jobGroup, "Zeppelin")
|
||||
code = compile('\n'.join(final_code), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
|
||||
to_run_hooks = code.body[-nhooks:]
|
||||
to_run_hooks = []
|
||||
if (nhooks > 0):
|
||||
to_run_hooks = code.body[-nhooks:]
|
||||
to_run_exec, to_run_single = (code.body[:-(nhooks + 1)],
|
||||
[code.body[-(nhooks + 1)]])
|
||||
|
||||
|
|
|
|||
|
|
@ -213,7 +213,7 @@ public class PySparkInterpreterMatplotlibTest {
|
|||
// again but in a different color.
|
||||
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
|
||||
ret2 = pyspark.interpret("plt.show()", context);
|
||||
assertNotSame(ret1.message().get(1).getData(), ret2.message().get(1).getData());
|
||||
assertNotSame(ret1.message().get(0).getData(), ret2.message().get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -226,7 +226,7 @@ public class PySparkInterpreterMatplotlibTest {
|
|||
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
|
||||
ret = pyspark.interpret("plt.show()", context);
|
||||
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
assertEquals(ret.message().toString(), Type.ANGULAR, ret.message().get(1).getType());
|
||||
assertEquals(ret.message().toString(), Type.ANGULAR, ret.message().get(0).getType());
|
||||
|
||||
// Check if the figure data is in the Angular Object Registry
|
||||
AngularObjectRegistry registry = context.getAngularObjectRegistry();
|
||||
|
|
|
|||
|
|
@ -1,28 +0,0 @@
|
|||
#!/bin/bash
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
|
||||
set -xe
|
||||
|
||||
if [[ -n $LIVY_VER ]]; then
|
||||
./testing/downloadLivy.sh
|
||||
export LIVY_HOME=`pwd`/livy-server-$LIVY_VER
|
||||
export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER
|
||||
fi
|
||||
|
||||
set +xe
|
||||
|
|
@ -99,7 +99,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
|
|||
File destFile = new File(destPath, srcFile.getName());
|
||||
if (!destFile.exists() || !FileUtils.contentEquals(srcFile, destFile)) {
|
||||
FileUtils.copyFile(srcFile, destFile);
|
||||
logger.info("copy {} to {}", srcFile.getAbsolutePath(), destPath);
|
||||
logger.debug("copy {} to {}", srcFile.getAbsolutePath(), destPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -117,7 +117,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
|
|||
|
||||
if (!destFile.exists() || !FileUtils.contentEquals(srcFile, destFile)) {
|
||||
FileUtils.copyFile(srcFile, destFile);
|
||||
logger.info("copy {} to {}", srcFile.getAbsolutePath(), destPath);
|
||||
logger.debug("copy {} to {}", srcFile.getAbsolutePath(), destPath);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -145,7 +145,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
|
|||
List<File> files = new LinkedList<>();
|
||||
for (ArtifactResult artifactResult : listOfArtifact) {
|
||||
files.add(artifactResult.getArtifact().getFile());
|
||||
logger.info("load {}", artifactResult.getArtifact().getFile().getAbsolutePath());
|
||||
logger.debug("load {}", artifactResult.getArtifact().getFile().getAbsolutePath());
|
||||
}
|
||||
|
||||
return files;
|
||||
|
|
|
|||
|
|
@ -30,4 +30,6 @@ public class Constants {
|
|||
|
||||
public static final int ZEPPELIN_INTERPRETER_DEFAUlT_PORT = 29914;
|
||||
|
||||
public static final int ZEPPELIN_INTERPRETER_OUTPUT_LIMIT = 1024 * 100;
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,6 +45,13 @@ public class InterpreterOutput extends OutputStream {
|
|||
private final InterpreterOutputListener flushListener;
|
||||
private final InterpreterOutputChangeListener changeListener;
|
||||
|
||||
private int size = 0;
|
||||
|
||||
// change static var to set interpreter output limit
|
||||
// limit will be applied to all InterpreterOutput object.
|
||||
// so we can expect the consistent behavior
|
||||
public static int limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
|
||||
|
||||
public InterpreterOutput(InterpreterOutputListener flushListener) {
|
||||
this.flushListener = flushListener;
|
||||
changeListener = null;
|
||||
|
|
@ -52,7 +59,8 @@ public class InterpreterOutput extends OutputStream {
|
|||
}
|
||||
|
||||
public InterpreterOutput(InterpreterOutputListener flushListener,
|
||||
InterpreterOutputChangeListener listener) throws IOException {
|
||||
InterpreterOutputChangeListener listener)
|
||||
throws IOException {
|
||||
this.flushListener = flushListener;
|
||||
this.changeListener = listener;
|
||||
clear();
|
||||
|
|
@ -74,6 +82,7 @@ public class InterpreterOutput extends OutputStream {
|
|||
out.setResourceSearchPaths(resourceSearchPaths);
|
||||
|
||||
buffer.reset();
|
||||
size = 0;
|
||||
|
||||
if (currentOut != null) {
|
||||
currentOut.flush();
|
||||
|
|
@ -125,6 +134,8 @@ public class InterpreterOutput extends OutputStream {
|
|||
}
|
||||
|
||||
public void clear() {
|
||||
size = 0;
|
||||
truncated = false;
|
||||
buffer.reset();
|
||||
|
||||
synchronized (resultMessageOutputs) {
|
||||
|
|
@ -157,11 +168,31 @@ public class InterpreterOutput extends OutputStream {
|
|||
boolean startOfTheNewLine = true;
|
||||
boolean firstCharIsPercentSign = false;
|
||||
|
||||
boolean truncated = false;
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
InterpreterResultMessageOutput out;
|
||||
if (truncated) {
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (resultMessageOutputs) {
|
||||
currentOut = getCurrentOutput();
|
||||
|
||||
if (++size > limit) {
|
||||
if (b == NEW_LINE_CHAR && currentOut != null) {
|
||||
InterpreterResult.Type type = currentOut.getType();
|
||||
if (type == InterpreterResult.Type.TEXT || type == InterpreterResult.Type.TABLE) {
|
||||
|
||||
setType(InterpreterResult.Type.TEXT);
|
||||
getCurrentOutput().write("Output exceeds " + limit + ". Truncated.\n");
|
||||
truncated = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (startOfTheNewLine) {
|
||||
if (b == '%') {
|
||||
startOfTheNewLine = false;
|
||||
|
|
@ -175,7 +206,6 @@ public class InterpreterOutput extends OutputStream {
|
|||
}
|
||||
|
||||
if (b == NEW_LINE_CHAR) {
|
||||
currentOut = getCurrentOutput();
|
||||
if (currentOut != null && currentOut.getType() == InterpreterResult.Type.TABLE) {
|
||||
if (previousChar == NEW_LINE_CHAR) {
|
||||
startOfTheNewLine = true;
|
||||
|
|
|
|||
|
|
@ -63,6 +63,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
private int port;
|
||||
private String userName;
|
||||
private Boolean isUserImpersonate;
|
||||
private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
|
||||
|
||||
/**
|
||||
* Remote interpreter and manage interpreter process
|
||||
|
|
@ -70,7 +71,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
public RemoteInterpreter(Properties property, String sessionKey, String className,
|
||||
String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
|
||||
int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
|
||||
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
|
||||
int outputLimit) {
|
||||
super(property);
|
||||
this.sessionKey = sessionKey;
|
||||
this.className = className;
|
||||
|
|
@ -85,6 +87,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.applicationEventListener = appListener;
|
||||
this.userName = userName;
|
||||
this.isUserImpersonate = isUserImpersonate;
|
||||
this.outputLimit = outputLimit;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -94,7 +97,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
|
||||
int port, String localRepoPath, int connectTimeout, int maxPoolSize,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
|
||||
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
|
||||
int outputLimit) {
|
||||
super(property);
|
||||
this.sessionKey = sessionKey;
|
||||
this.className = className;
|
||||
|
|
@ -108,6 +112,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.applicationEventListener = appListener;
|
||||
this.userName = userName;
|
||||
this.isUserImpersonate = isUserImpersonate;
|
||||
this.outputLimit = outputLimit;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -217,6 +222,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
if (localRepoPath != null) {
|
||||
property.put("zeppelin.interpreter.localRepo", localRepoPath);
|
||||
}
|
||||
|
||||
property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
|
||||
client.createInterpreter(groupId, sessionKey,
|
||||
getClassName(), (Map) property, userName);
|
||||
// Push angular object loaded from JSON file to remote interpreter
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import org.apache.zeppelin.interpreter.InterpreterException;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Map;
|
||||
|
|
@ -109,7 +110,12 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
cmdLine.addArgument(localRepoDir, false);
|
||||
|
||||
executor = new DefaultExecutor();
|
||||
executor.setStreamHandler(new PumpStreamHandler(new ProcessLogOutputStream(logger)));
|
||||
|
||||
ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
|
||||
ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
|
||||
processOutput.setOutputStream(cmdOut);
|
||||
|
||||
executor.setStreamHandler(new PumpStreamHandler(processOutput));
|
||||
watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
|
||||
executor.setWatchdog(watchdog);
|
||||
|
||||
|
|
@ -128,6 +134,15 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
|
||||
if (!running) {
|
||||
try {
|
||||
cmdOut.flush();
|
||||
} catch (IOException e) {
|
||||
// nothing to do
|
||||
}
|
||||
throw new InterpreterException(new String(cmdOut.toByteArray()));
|
||||
}
|
||||
|
||||
try {
|
||||
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
|
||||
break;
|
||||
|
|
@ -145,6 +160,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
}
|
||||
}
|
||||
}
|
||||
processOutput.setOutputStream(null);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
|
|
@ -179,6 +195,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
private static class ProcessLogOutputStream extends LogOutputStream {
|
||||
|
||||
private Logger logger;
|
||||
OutputStream out;
|
||||
|
||||
public ProcessLogOutputStream(Logger logger) {
|
||||
this.logger = logger;
|
||||
|
|
@ -188,5 +205,37 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
protected void processLine(String s, int i) {
|
||||
this.logger.debug(s);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte [] b) throws IOException {
|
||||
super.write(b);
|
||||
|
||||
if (out != null) {
|
||||
synchronized (this) {
|
||||
if (out != null) {
|
||||
out.write(b);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte [] b, int offset, int len) throws IOException {
|
||||
super.write(b, offset, len);
|
||||
|
||||
if (out != null) {
|
||||
synchronized (this) {
|
||||
if (out != null) {
|
||||
out.write(b, offset, len);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void setOutputStream(OutputStream out) {
|
||||
synchronized (this) {
|
||||
this.out = out;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -162,6 +162,11 @@ public class RemoteInterpreterServer
|
|||
interpreterGroup.setResourcePool(resourcePool);
|
||||
|
||||
String localRepoPath = properties.get("zeppelin.interpreter.localRepo");
|
||||
if (properties.containsKey("zeppelin.interpreter.output.limit")) {
|
||||
InterpreterOutput.limit = Integer.parseInt(
|
||||
properties.get("zeppelin.interpreter.output.limit"));
|
||||
}
|
||||
|
||||
depLoader = new DependencyResolver(localRepoPath);
|
||||
appLoader = new ApplicationLoader(resourcePool, depLoader);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -202,7 +202,11 @@ public abstract class Job {
|
|||
}
|
||||
|
||||
Throwable cause = ExceptionUtils.getRootCause(e);
|
||||
return ExceptionUtils.getFullStackTrace(cause);
|
||||
if (cause != null) {
|
||||
return ExceptionUtils.getFullStackTrace(cause);
|
||||
} else {
|
||||
return ExceptionUtils.getFullStackTrace(e);
|
||||
}
|
||||
}
|
||||
|
||||
public Throwable getException() {
|
||||
|
|
|
|||
|
|
@ -346,6 +346,10 @@ public class RemoteScheduler implements Scheduler {
|
|||
lastStatus = Status.ERROR;
|
||||
}
|
||||
}
|
||||
if (job.getException() != null) {
|
||||
lastStatus = Status.ERROR;
|
||||
}
|
||||
|
||||
job.setStatus(lastStatus);
|
||||
|
||||
if (listener != null) {
|
||||
|
|
|
|||
|
|
@ -162,6 +162,33 @@ public class InterpreterOutputTest implements InterpreterOutputListener {
|
|||
assertEquals("val1\tval2\n", new String(out.getOutputAt(1).toByteArray()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTruncate() throws IOException {
|
||||
// output is truncated after the new line
|
||||
InterpreterOutput.limit = 3;
|
||||
out = new InterpreterOutput(this);
|
||||
|
||||
// truncate text
|
||||
out.write("%text hello\nworld\n");
|
||||
assertEquals("hello", new String(out.getOutputAt(0).toByteArray()));
|
||||
assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("Truncated"));
|
||||
|
||||
// truncate table
|
||||
out = new InterpreterOutput(this);
|
||||
out.write("%table key\tvalue\nhello\t100\nworld\t200\n");
|
||||
assertEquals("key\tvalue", new String(out.getOutputAt(0).toByteArray()));
|
||||
assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("Truncated"));
|
||||
|
||||
// does not truncate html
|
||||
out = new InterpreterOutput(this);
|
||||
out.write("%html hello\nworld\n");
|
||||
out.flush();
|
||||
assertEquals("hello\nworld\n", new String(out.getOutputAt(0).toByteArray()));
|
||||
|
||||
// restore default
|
||||
InterpreterOutput.limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onUpdateAll(InterpreterOutput out) {
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import java.util.Properties;
|
|||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.zeppelin.interpreter.Constants;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
import org.junit.Test;
|
||||
|
|
@ -109,4 +110,21 @@ public class RemoteInterpreterProcessTest {
|
|||
assertEquals(1, rip.reference(intpGroup, "anonymous", false));
|
||||
assertEquals(true, rip.isRunning());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testPropagateError() throws TException, InterruptedException {
|
||||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
|
||||
"echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(),
|
||||
10 * 1000, null, null);
|
||||
assertFalse(rip.isRunning());
|
||||
assertEquals(0, rip.referenceCount());
|
||||
try {
|
||||
assertEquals(1, rip.reference(intpGroup, "anonymous", false));
|
||||
} catch (InterpreterException e) {
|
||||
e.getMessage().contains("hello_world");
|
||||
}
|
||||
assertEquals(0, rip.referenceCount());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -831,8 +831,8 @@ public class RemoteInterpreterTest {
|
|||
|
||||
|
||||
assertEquals("env value 1", intp.interpret("getEnv MY_ENV1", context).message().get(0).getData());
|
||||
assertEquals(0, intp.interpret("getProperty MY_ENV1", context).message().size());
|
||||
assertEquals(0, intp.interpret("getEnv my.property.1", context).message().size());
|
||||
assertEquals(Code.ERROR, intp.interpret("getProperty MY_ENV1", context).code());
|
||||
assertEquals(Code.ERROR, intp.interpret("getEnv my.property.1", context).code());
|
||||
assertEquals("property value 1", intp.interpret("getProperty my.property.1", context).message().get(0).getData());
|
||||
|
||||
intp.close();
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ import org.apache.zeppelin.helium.Helium;
|
|||
import org.apache.zeppelin.helium.HeliumApplicationFactory;
|
||||
import org.apache.zeppelin.helium.HeliumVisualizationFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.notebook.Notebook;
|
||||
import org.apache.zeppelin.notebook.NotebookAuthorization;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
|
||||
|
|
@ -98,6 +99,8 @@ public class ZeppelinServer extends Application {
|
|||
this.depResolver = new DependencyResolver(
|
||||
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
|
||||
|
||||
InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
|
||||
|
||||
HeliumApplicationFactory heliumApplicationFactory = new HeliumApplicationFactory();
|
||||
HeliumVisualizationFactory heliumVisualizationFactory;
|
||||
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
|
|||
}.getType());
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
assertEquals(p.getStatus(), Job.Status.READY);
|
||||
assertEquals(p.getStatus(), Job.Status.FINISHED);
|
||||
|
||||
// run non-blank paragraph
|
||||
p.setText("test");
|
||||
|
|
|
|||
|
|
@ -424,8 +424,11 @@ function InterpreterCtrl($rootScope, $scope, $http, baseUrlSrv, ngToast, $timeou
|
|||
.success(function(data, status, headers, config) {
|
||||
var index = _.findIndex($scope.interpreterSettings, {'id': settingId});
|
||||
$scope.interpreterSettings[index] = data.body;
|
||||
ngToast.info('Interpreter stopped. Will be lazily started on next run.');
|
||||
}).error(function(data, status, headers, config) {
|
||||
console.log('Error %o %o', status, data.message);
|
||||
var errorMsg = (data !== null) ? data.message : 'Could not connect to server.';
|
||||
console.log('Error %o %o', status, errorMsg);
|
||||
ngToast.danger(errorMsg);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ limitations under the License.
|
|||
|
||||
<select class="form-control input-sm"
|
||||
ng-if="paragraph.settings.forms[formulaire.name].options && paragraph.settings.forms[formulaire.name].type != 'checkbox'"
|
||||
ng-enter="runParagraph(getEditorValue())"
|
||||
ng-model="paragraph.settings.params[formulaire.name]"
|
||||
ng-class="{'disable': paragraph.status == 'RUNNING' || paragraph.status == 'PENDING' }"
|
||||
name="{{formulaire.name}}"
|
||||
|
|
|
|||
|
|
@ -37,12 +37,13 @@ export default class TableData {
|
|||
|
||||
for (var i = 0; i < textRows.length; i++) {
|
||||
var textRow = textRows[i];
|
||||
|
||||
if (commentRow) {
|
||||
comment += textRow;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (textRow === '') {
|
||||
if (textRow === '' || textRow === '<!--TABLE_COMMENT-->') {
|
||||
if (rows.length > 0) {
|
||||
commentRow = true;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -85,6 +85,10 @@ function NavCtrl($scope, $rootScope, $http, $routeParams, $location,
|
|||
return ($routeParams.noteId === noteId);
|
||||
}
|
||||
|
||||
function listConfigurations() {
|
||||
websocketMsgSrv.listConfigurations();
|
||||
}
|
||||
|
||||
function loadNotes() {
|
||||
websocketMsgSrv.getNoteList();
|
||||
}
|
||||
|
|
@ -135,6 +139,7 @@ function NavCtrl($scope, $rootScope, $http, $routeParams, $location,
|
|||
});
|
||||
|
||||
$scope.$on('loginSuccess', function(event, param) {
|
||||
listConfigurations();
|
||||
loadNotes();
|
||||
});
|
||||
|
||||
|
|
@ -153,4 +158,3 @@ function NavCtrl($scope, $rootScope, $http, $routeParams, $location,
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -572,6 +572,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
|
||||
+ "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
|
||||
+ "scalding,jdbc,hbase,bigquery,beam,pig,scio"),
|
||||
ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
|
||||
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
|
||||
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
|
||||
// use specified notebook (id) as homescreen
|
||||
|
|
|
|||
|
|
@ -1150,7 +1150,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
LazyOpenInterpreter intp = new LazyOpenInterpreter(
|
||||
new RemoteInterpreter(property, interpreterSessionKey, className, host, port, localRepoPath,
|
||||
connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener,
|
||||
userName, isUserImpersonate));
|
||||
userName, isUserImpersonate, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT)));
|
||||
return intp;
|
||||
}
|
||||
|
||||
|
|
@ -1175,7 +1175,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
RemoteInterpreter remoteInterpreter =
|
||||
new RemoteInterpreter(property, interpreterSessionKey, className,
|
||||
interpreterRunnerPath, interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
|
||||
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate);
|
||||
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate,
|
||||
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT));
|
||||
remoteInterpreter.addEnv(env);
|
||||
|
||||
return new LazyOpenInterpreter(remoteInterpreter);
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
|
@ -144,6 +145,33 @@ public class InterpreterSetting {
|
|||
return key;
|
||||
}
|
||||
|
||||
private boolean isEqualInterpreterKeyProcessKey(String refKey, String processKey) {
|
||||
InterpreterOption option = getOption();
|
||||
int validCount = 0;
|
||||
if (getOption().isProcess()
|
||||
&& !(option.perUserIsolated() == true && option.perNoteIsolated() == true)) {
|
||||
|
||||
List<String> processList = Arrays.asList(processKey.split(":"));
|
||||
List<String> refList = Arrays.asList(refKey.split(":"));
|
||||
|
||||
if (refList.size() <= 1 || processList.size() <= 1) {
|
||||
return refKey.equals(processKey);
|
||||
}
|
||||
|
||||
if (processList.get(0).equals("") || processList.get(0).equals(refList.get(0))) {
|
||||
validCount = validCount + 1;
|
||||
}
|
||||
|
||||
if (processList.get(1).equals("") || processList.get(1).equals(refList.get(1))) {
|
||||
validCount = validCount + 1;
|
||||
}
|
||||
|
||||
return (validCount >= 2);
|
||||
} else {
|
||||
return refKey.equals(processKey);
|
||||
}
|
||||
}
|
||||
|
||||
private String getInterpreterSessionKey(String user, String noteId) {
|
||||
InterpreterOption option = getOption();
|
||||
String key;
|
||||
|
|
@ -194,18 +222,19 @@ public class InterpreterSetting {
|
|||
}
|
||||
|
||||
void closeAndRemoveInterpreterGroupByNoteId(String noteId) {
|
||||
String key = getInterpreterProcessKey("", noteId);
|
||||
|
||||
InterpreterGroup groupToRemove = null;
|
||||
String processKey = getInterpreterProcessKey("", noteId);
|
||||
List<InterpreterGroup> closeToGroupList = new LinkedList<>();
|
||||
InterpreterGroup groupKey;
|
||||
for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
|
||||
if (intpKey.contains(key)) {
|
||||
if (isEqualInterpreterKeyProcessKey(intpKey, processKey)) {
|
||||
interpreterGroupWriteLock.lock();
|
||||
groupToRemove = interpreterGroupRef.remove(intpKey);
|
||||
groupKey = interpreterGroupRef.remove(intpKey);
|
||||
interpreterGroupWriteLock.unlock();
|
||||
closeToGroupList.add(groupKey);
|
||||
}
|
||||
}
|
||||
|
||||
if (groupToRemove != null) {
|
||||
for (InterpreterGroup groupToRemove : closeToGroupList) {
|
||||
groupToRemove.close();
|
||||
}
|
||||
}
|
||||
|
|
@ -216,17 +245,19 @@ public class InterpreterSetting {
|
|||
}
|
||||
String processKey = getInterpreterProcessKey(user, "");
|
||||
String sessionKey = getInterpreterSessionKey(user, "");
|
||||
InterpreterGroup groupToRemove = null;
|
||||
List<InterpreterGroup> groupToRemove = new LinkedList<>();
|
||||
InterpreterGroup groupItem;
|
||||
for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
|
||||
if (intpKey.contains(processKey)) {
|
||||
if (isEqualInterpreterKeyProcessKey(intpKey, processKey)) {
|
||||
interpreterGroupWriteLock.lock();
|
||||
groupToRemove = interpreterGroupRef.remove(intpKey);
|
||||
groupItem = interpreterGroupRef.remove(intpKey);
|
||||
interpreterGroupWriteLock.unlock();
|
||||
groupToRemove.add(groupItem);
|
||||
}
|
||||
}
|
||||
|
||||
if (groupToRemove != null) {
|
||||
groupToRemove.close(sessionKey);
|
||||
for (InterpreterGroup groupToClose : groupToRemove) {
|
||||
groupToClose.close(sessionKey);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -243,7 +274,7 @@ public class InterpreterSetting {
|
|||
List<InterpreterGroup> groupToRemove = new LinkedList<>();
|
||||
InterpreterGroup groupItem;
|
||||
for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
|
||||
if (intpKey.contains(key)) {
|
||||
if (isEqualInterpreterKeyProcessKey(intpKey, key)) {
|
||||
interpreterGroupWriteLock.lock();
|
||||
groupItem = interpreterGroupRef.remove(intpKey);
|
||||
interpreterGroupWriteLock.unlock();
|
||||
|
|
|
|||
|
|
@ -222,7 +222,7 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
|
||||
public void initializeJobListenerForParagraph(Paragraph paragraph) {
|
||||
final Note paragraphNote = paragraph.getNote();
|
||||
if (paragraphNote.getId().equals(this.getId())) {
|
||||
if (!paragraphNote.getId().equals(this.getId())) {
|
||||
throw new IllegalArgumentException(
|
||||
format("The paragraph %s from note %s " + "does not belong to note %s", paragraph.getId(),
|
||||
paragraphNote.getId(), this.getId()));
|
||||
|
|
@ -555,13 +555,14 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
*/
|
||||
public void run(String paragraphId) {
|
||||
Paragraph p = getParagraph(paragraphId);
|
||||
|
||||
p.setListener(jobListenerFactory.getParagraphJobListener(this));
|
||||
|
||||
if (p.isBlankParagraph()) {
|
||||
logger.info("skip to run blank paragraph. {}", p.getId());
|
||||
p.setStatus(Job.Status.FINISHED);
|
||||
return;
|
||||
}
|
||||
|
||||
p.setListener(jobListenerFactory.getParagraphJobListener(this));
|
||||
String requiredReplName = p.getRequiredReplName();
|
||||
Interpreter intp = factory.getInterpreter(p.getUser(), getId(), requiredReplName);
|
||||
|
||||
|
|
|
|||
|
|
@ -35,13 +35,20 @@ public class MockInterpreter11 extends Interpreter{
|
|||
public MockInterpreter11(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
boolean open;
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
open = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
open = false;
|
||||
}
|
||||
|
||||
public boolean isOpen() {
|
||||
return open;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
||||
|
|
@ -28,13 +29,17 @@ import org.apache.zeppelin.interpreter.Interpreter;
|
|||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter11;
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class NoteInterpreterLoaderTest {
|
||||
|
|
@ -117,6 +122,11 @@ public class NoteInterpreterLoaderTest {
|
|||
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
|
||||
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
|
||||
|
||||
// invalid close
|
||||
factory.closeNote("user", "note");
|
||||
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "shared_process").get("noteA"));
|
||||
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "shared_process").get("noteB"));
|
||||
|
||||
// when
|
||||
factory.closeNote("user", "noteA");
|
||||
factory.closeNote("user", "noteB");
|
||||
|
|
@ -160,6 +170,51 @@ public class NoteInterpreterLoaderTest {
|
|||
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoteInterpreterCloseForAll() throws IOException {
|
||||
factory.setInterpreters("user", "FitstNote", factory.getDefaultInterpreterSettingList());
|
||||
factory.getInterpreterSettings("FitstNote").get(0).getOption().setPerNote(InterpreterOption.SCOPED);
|
||||
|
||||
factory.setInterpreters("user", "yourFirstNote", factory.getDefaultInterpreterSettingList());
|
||||
factory.getInterpreterSettings("yourFirstNote").get(0).getOption().setPerNote(InterpreterOption.ISOLATED);
|
||||
|
||||
// interpreters are not created before accessing it
|
||||
assertNull(factory.getInterpreterSettings("FitstNote").get(0).getInterpreterGroup("user", "FitstNote").get("FitstNote"));
|
||||
assertNull(factory.getInterpreterSettings("yourFirstNote").get(0).getInterpreterGroup("user", "yourFirstNote").get("yourFirstNote"));
|
||||
|
||||
Interpreter firstNoteIntp = factory.getInterpreter("user", "FitstNote", "group1.mock1");
|
||||
Interpreter yourFirstNoteIntp = factory.getInterpreter("user", "yourFirstNote", "group1.mock1");
|
||||
|
||||
firstNoteIntp.open();
|
||||
yourFirstNoteIntp.open();
|
||||
|
||||
assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen());
|
||||
assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
|
||||
|
||||
factory.closeNote("user", "FitstNote");
|
||||
|
||||
assertFalse(((LazyOpenInterpreter)firstNoteIntp).isOpen());
|
||||
assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
|
||||
|
||||
//reopen
|
||||
firstNoteIntp.open();
|
||||
|
||||
assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen());
|
||||
assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
|
||||
|
||||
// invalid check
|
||||
factory.closeNote("invalid", "Note");
|
||||
|
||||
assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen());
|
||||
assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
|
||||
|
||||
// invalid contains value check
|
||||
factory.closeNote("u", "Note");
|
||||
|
||||
assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen());
|
||||
assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
|
||||
}
|
||||
|
||||
|
||||
private void delete(File file){
|
||||
if(file.isFile()) file.delete();
|
||||
|
|
|
|||
|
|
@ -282,7 +282,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
note.run(p1.getId());
|
||||
|
||||
Thread.sleep(2 * 1000);
|
||||
assertEquals(p1.getStatus(), Status.READY);
|
||||
assertEquals(p1.getStatus(), Status.FINISHED);
|
||||
assertNull(p1.getDateStarted());
|
||||
notebook.removeNote(note.getId(), anonymous);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue