Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Bruno Bonnin 2017-01-28 10:12:38 +01:00
commit 86153a85ad
46 changed files with 663 additions and 165 deletions

View file

@ -94,7 +94,9 @@ install:
before_script:
- travis_retry ./testing/downloadSpark.sh $SPARK_VER $HADOOP_VER
- ./testing/setupLivy.sh
- if [[ -n $LIVY_VER ]]; then ./testing/downloadLivy.sh $LIVY_VER; fi
- if [[ -n $LIVY_VER ]]; then export LIVY_HOME=`pwd`/livy-server-$LIVY_VER; fi
- if [[ -n $LIVY_VER ]]; then export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER; fi
- echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
- tail conf/zeppelin-env.sh

View file

@ -40,6 +40,7 @@ fi
addJarInDir "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
addJarInDir "${ZEPPELIN_HOME}/lib"
addJarInDir "${ZEPPELIN_HOME}/lib/interpreter"
CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
$ZEPPELIN_RUNNER $JAVA_OPTS -cp $CLASSPATH $ZEPPELIN_INSTALL_INTERPRETER_MAIN ${@}

View file

@ -214,7 +214,7 @@ eval $INTERPRETER_RUN_COMMAND &
pid=$!
if [[ -z "${pid}" ]]; then
return 1;
exit 1;
else
echo ${pid} > ${ZEPPELIN_PID}
fi

View file

@ -216,6 +216,11 @@
<description>Interpreter process connect timeout in msec.</description>
</property>
<property>
<name>zeppelin.interpreter.output.limit</name>
<value>102400</value>
<description>Output message from interpreter exceeding the limit will be truncated</description>
</property>
<property>
<name>zeppelin.ssl</name>

View file

@ -24,19 +24,25 @@
# then pr[#PR] branch will be created.
#
import sys, os, subprocess
import json, urllib
from __future__ import print_function
import sys, os, subprocess, json, codecs
if sys.version_info[0] == 2:
from urllib import urlopen
else:
from urllib.request import urlopen
if len(sys.argv) == 1:
print "usage) " + sys.argv[0] + " [#PR]"
print " eg) " + sys.argv[0] + " 122"
print("usage) " + sys.argv[0] + " [#PR]")
print(" eg) " + sys.argv[0] + " 122")
sys.exit(1)
pr=sys.argv[1]
githubApi="https://api.github.com/repos/apache/zeppelin"
prInfo = json.load(urllib.urlopen(githubApi + "/pulls/" + pr))
reader = codecs.getreader("utf-8")
prInfo = json.load(reader(urlopen(githubApi + "/pulls/" + pr)))
if "message" in prInfo and prInfo["message"] == "Not Found":
sys.stderr.write("PullRequest #" + pr + " not found\n")
sys.exit(1)
@ -44,6 +50,7 @@ if "message" in prInfo and prInfo["message"] == "Not Found":
prUser=prInfo['user']['login']
prRepoUrl=prInfo['head']['repo']['clone_url']
prBranch=prInfo['head']['label'].replace(":", "/")
print(prBranch)
# create local branch
exitCode = os.system("git checkout -b pr" + pr)
@ -63,21 +70,21 @@ if exitCode != 0:
sys.exit(1)
currentBranch = subprocess.check_output(["git rev-parse --abbrev-ref HEAD"], shell=True).rstrip()
currentBranch = subprocess.check_output("git rev-parse --abbrev-ref HEAD", shell=True).rstrip().decode("utf-8")
print "Merge branch " + prBranch + " into " + currentBranch
print("Merge branch " + prBranch + " into " + currentBranch)
rev = subprocess.check_output(["git rev-parse " + prBranch], shell=True).rstrip()
prAuthor = subprocess.check_output(["git --no-pager show -s --format='%an <%ae>' " + rev], shell=True).rstrip()
prAuthorDate = subprocess.check_output(["git --no-pager show -s --format='%ad' " + rev], shell=True).rstrip()
rev = subprocess.check_output("git rev-parse " + prBranch, shell=True).rstrip().decode("utf-8")
prAuthor = subprocess.check_output("git --no-pager show -s --format=\"%an <%ae>\" " + rev, shell=True).rstrip().decode("utf-8")
prAuthorDate = subprocess.check_output("git --no-pager show -s --format=\"%ad\" " + rev, shell=True).rstrip().decode("utf-8")
prTitle = prInfo['title']
prBody = prInfo['body']
commitList = subprocess.check_output(["git log --pretty=format:'%h' " + currentBranch + ".." + prBranch], shell=True).rstrip()
commitList = subprocess.check_output("git log --pretty=format:\"%h\" " + currentBranch + ".." + prBranch, shell=True).rstrip().decode("utf-8")
authorList = []
for commitHash in commitList.split("\n"):
a = subprocess.check_output(["git show -s --pretty=format:'%an <%ae>' "+commitHash], shell=True).rstrip()
a = subprocess.check_output("git show -s --pretty=format:\"%an <%ae>\" "+commitHash, shell=True).rstrip().decode("utf-8")
if a not in authorList:
authorList.append(a)
@ -85,20 +92,20 @@ commitMsg = prTitle + "\n"
if prBody :
commitMsg += prBody + "\n\n"
for author in authorList:
commitMsg += "Author: " + author+"\n"
commitMsg += "Author: " + author +"\n"
commitMsg += "\n"
commitMsg += "Closes #" + pr + " from " + prBranch + " and squashes the following commits:\n\n"
commitMsg += subprocess.check_output(["git log --pretty=format:'%h [%an] %s' " + currentBranch + ".." + prBranch], shell=True).rstrip()
commitMsg += subprocess.check_output("git log --pretty=format:\"%h [%an] %s\" " + currentBranch + ".." + prBranch, shell=True).rstrip().decode("utf-8")
exitCode = os.system("git merge --no-commit --squash " + prBranch)
if exitCode != 0:
sys.stderr.write("Can not merge\n")
sys.exit(1)
exitCode = os.system('git commit -a --author "' + prAuthor + '" --date "' + prAuthorDate + '" -m"' + commitMsg.encode('utf-8') + '"')
exitCode = os.system('git commit -a --author "' + prAuthor + '" --date "' + prAuthorDate + '" -m"' + commitMsg + '"')
if exitCode != 0:
sys.stderr.write("Commit failed\n")
sys.exit(1)
os.system("git remote remove " + prUser)
print "Branch " + prBranch + " is merged into " + currentBranch
print("Branch " + prBranch + " is merged into " + currentBranch)

View file

@ -248,6 +248,12 @@ If both are defined, then the **environment variables** will take priority.
<td>interpreter</td>
<td>Interpreter directory</td>
</tr>
<tr>
<td>ZEPPELIN_INTERPRETER_OUTPUT_LIMIT</td>
<td>zeppelin.interpreter.output.limit</td>
<td>102400</td>
<td>Output message from interpreter exceeding the limit will be truncated</td>
</tr>
<tr>
<td>ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE</td>
<td>zeppelin.websocket.max.text.message.size</td>

View file

@ -56,6 +56,8 @@ Also you can separate option's display name and value, using `${formName=default
<img src="../assets/themes/zeppelin/img/screenshots/form_select_displayname.png" />
Hit enter after selecting option to run the paragraph with new value.
### Checkbox form
For multi-selection, you can create a checkbox form using `${checkbox:formName=defaultValue1|defaultValue2...,option1|option2...}`. The variable will be substituted by a comma-separated string based on the selected items. For example:

View file

@ -98,20 +98,12 @@ public abstract class BaseLivyInterprereter extends Interpreter {
if (sessionInfo.appId == null) {
// livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
// explicitly by ourselves.
sessionInfo.appId = extractStatementResult(
interpret("sc.applicationId", null, false, false).message()
.get(0).getData());
sessionInfo.appId = extractAppId();
}
if (sessionInfo.appInfo == null ||
StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
interpret(
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
null, false, false);
sessionInfo.webUIAddress = extractStatementResult(
interpret(
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", null, false, false)
.message().get(0).getData());
sessionInfo.webUIAddress = extractWebUIAddress();
} else {
sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
}
@ -130,6 +122,10 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
}
protected abstract String extractAppId() throws LivyException;
protected abstract String extractWebUIAddress() throws LivyException;
public SessionInfo getSessionInfo() {
return sessionInfo;
}
@ -148,25 +144,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
InterpreterUtils.getMostRelevantMessage(e));
}
}
/**
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
* from following:
* res0: String = application_1473129941656_0048
*
* @param result
* @return
*/
private String extractStatementResult(String result) {
int pos = -1;
if ((pos = result.indexOf("=")) >= 0) {
return result.substring(pos + 1).trim();
} else {
throw new RuntimeException("No result can be extracted from '" + result + "', " +
"something must be wrong");
}
}
@Override
public void cancel(InterpreterContext context) {
if (livyVersion.isCancelSupported()) {
@ -206,19 +184,13 @@ public abstract class BaseLivyInterprereter extends Interpreter {
conf.put(entry.getKey().toString().substring(5), entry.getValue().toString());
}
CreateSessionRequest request = new CreateSessionRequest(kind, user, conf);
CreateSessionRequest request = new CreateSessionRequest(kind,
user == null || user.equals("anonymous") ? null : user, conf);
SessionInfo sessionInfo = SessionInfo.fromJson(
callRestAPI("/sessions", "POST", request.toJson()));
long start = System.currentTimeMillis();
// pull the session status until it is idle or timeout
while (!sessionInfo.isReady()) {
LOGGER.info("Session {} is in state {}, appId {}", sessionInfo.id, sessionInfo.state,
sessionInfo.appId);
if (sessionInfo.isFinished()) {
String msg = "Session " + sessionInfo.id + " is finished, appId: " + sessionInfo.appId
+ ", log: " + sessionInfo.log;
throw new LivyException(msg);
}
if ((System.currentTimeMillis() - start) / 1000 > sessionCreationTimeout) {
String msg = "The creation of session " + sessionInfo.id + " is timeout within "
+ sessionCreationTimeout + " seconds, appId: " + sessionInfo.appId
@ -227,6 +199,13 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
Thread.sleep(pullStatusInterval);
sessionInfo = getSessionInfo(sessionInfo.id);
LOGGER.info("Session {} is in state {}, appId {}", sessionInfo.id, sessionInfo.state,
sessionInfo.appId);
if (sessionInfo.isFinished()) {
String msg = "Session " + sessionInfo.id + " is finished, appId: " + sessionInfo.appId
+ ", log: " + sessionInfo.log;
throw new LivyException(msg);
}
}
return sessionInfo;
} catch (Exception e) {
@ -438,7 +417,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|| response.getStatusCode().value() == 201) {
return response.getBody();
} else if (response.getStatusCode().value() == 404) {
if (response.getBody().matches("Session '\\d+' not found.")) {
if (response.getBody().matches("\"Session '\\d+' not found.\"")) {
throw new SessionNotFoundException(response.getBody());
} else {
throw new APINotFoundException("No rest api found for " + targetURL +

View file

@ -33,7 +33,7 @@ import java.util.Properties;
/**
* Livy PySpark interpreter for Zeppelin.
*/
public class LivyPySpark3Interpreter extends BaseLivyInterprereter {
public class LivyPySpark3Interpreter extends LivyPySparkBaseInterpreter {
public LivyPySpark3Interpreter(Properties property) {
super(property);
@ -43,4 +43,5 @@ public class LivyPySpark3Interpreter extends BaseLivyInterprereter {
public String getSessionKind() {
return "pyspark3";
}
}

View file

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.livy;
import java.util.Properties;
/**
* Base class for PySpark Interpreter
*/
public abstract class LivyPySparkBaseInterpreter extends BaseLivyInterprereter {
public LivyPySparkBaseInterpreter(Properties property) {
super(property);
}
@Override
protected String extractAppId() throws LivyException {
return extractStatementResult(
interpret("sc.applicationId", null, false, false).message()
.get(0).getData());
}
@Override
protected String extractWebUIAddress() throws LivyException {
return extractStatementResult(
interpret(
"sc._jsc.sc().ui().get().appUIAddress()", null, false, false)
.message().get(0).getData());
}
/**
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
* from following:
* u'application_1473129941656_0048'
*
* @param result
* @return
*/
private String extractStatementResult(String result) {
int pos = -1;
if ((pos = result.indexOf("'")) >= 0) {
return result.substring(pos + 1, result.length() - 1).trim();
} else {
throw new RuntimeException("No result can be extracted from '" + result + "', " +
"something must be wrong");
}
}
}

View file

@ -33,7 +33,7 @@ import java.util.Properties;
/**
* Livy PySpark interpreter for Zeppelin.
*/
public class LivyPySparkInterpreter extends BaseLivyInterprereter {
public class LivyPySparkInterpreter extends LivyPySparkBaseInterpreter {
public LivyPySparkInterpreter(Properties property) {
super(property);
@ -43,4 +43,6 @@ public class LivyPySparkInterpreter extends BaseLivyInterprereter {
public String getSessionKind() {
return "pyspark";
}
}

View file

@ -44,4 +44,39 @@ public class LivySparkInterpreter extends BaseLivyInterprereter {
return "spark";
}
@Override
protected String extractAppId() throws LivyException {
return extractStatementResult(
interpret("sc.applicationId", null, false, false).message()
.get(0).getData());
}
@Override
protected String extractWebUIAddress() throws LivyException {
interpret(
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
null, false, false);
return extractStatementResult(
interpret(
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", null, false, false)
.message().get(0).getData());
}
/**
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
* from following:
* res0: String = application_1473129941656_0048
*
* @param result
* @return
*/
private String extractStatementResult(String result) {
int pos = -1;
if ((pos = result.indexOf("=")) >= 0) {
return result.substring(pos + 1).trim();
} else {
throw new RuntimeException("No result can be extracted from '" + result + "', " +
"something must be wrong");
}
}
}

View file

@ -43,4 +43,16 @@ public class LivySparkRInterpreter extends BaseLivyInterprereter {
public String getSessionKind() {
return "sparkr";
}
@Override
protected String extractAppId() throws LivyException {
//TODO(zjffdu) depends on SparkR
return null;
}
@Override
protected String extractWebUIAddress() throws LivyException {
//TODO(zjffdu) depends on SparkR
return null;
}
}

View file

@ -22,6 +22,8 @@ import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@ -123,28 +125,12 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
// assumption is correct for now. Ideally livy should return table type. We may do it in
// the future release of livy.
if (message.getType() == InterpreterResult.Type.TEXT) {
StringBuilder resMsg = new StringBuilder();
String[] rows = message.getData().split("\n");
String[] headers = rows[1].split("\\|");
for (int head = 1; head < headers.length; head++) {
resMsg.append(headers[head].trim()).append("\t");
List<String> rows = parseSQLOutput(message.getData());
result2.add(InterpreterResult.Type.TABLE, StringUtils.join(rows, "\n"));
if (rows.size() >= (maxResult + 1)) {
result2.add(InterpreterResult.Type.HTML,
"<font color=red>Results are limited by " + maxResult + ".</font>");
}
resMsg.append("\n");
if (rows[3].indexOf("+") == 0) {
} else {
for (int cols = 3; cols < rows.length - 1; cols++) {
String[] col = rows[cols].split("\\|");
for (int data = 1; data < col.length; data++) {
resMsg.append(col[data].trim()).append("\t");
}
resMsg.append("\n");
}
}
if (rows[rows.length - 1].indexOf("only") == 0) {
resMsg.append("<font color=red>" + rows[rows.length - 1] + ".</font>");
}
result2.add(InterpreterResult.Type.TABLE, resMsg.toString());
} else {
result2.add(message.getType(), message.getData());
}
@ -160,6 +146,54 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
}
}
protected List<String> parseSQLOutput(String output) {
List<String> rows = new ArrayList<>();
String[] lines = output.split("\n");
// at least 4 lines, even for empty sql output
// +---+---+
// | a| b|
// +---+---+
// +---+---+
// use the first line to determinte the position of feach cell
String[] tokens = StringUtils.split(lines[0], "\\+");
// pairs keeps the start/end position of each cell. We parse it from the first row
// which use '+' as separator
List<Pair> pairs = new ArrayList<>();
int start = 0;
int end = 0;
for (String token : tokens) {
start = end + 1;
end = start + token.length();
pairs.add(new Pair(start, end));
}
for (String line : lines) {
// skip line like "+---+---+" and "only showing top 1 row"
if (!line.matches("(\\+\\-+)+\\+") || line.contains("only showing")) {
List<String> cells = new ArrayList<>();
for (Pair pair : pairs) {
// strip the blank space around the cell
cells.add(line.substring(pair.start, pair.end).trim());
}
rows.add(StringUtils.join(cells, "\t"));
}
}
return rows;
}
/**
* Represent the start and end index of each cell
*/
private static class Pair {
private int start;
private int end;
public Pair(int start, int end) {
this.start = start;
this.end = end;
}
}
public boolean concurrentSQL() {
return Boolean.parseBoolean(getProperty("zeppelin.livy.concurrentSQL"));
}
@ -185,4 +219,16 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
public void close() {
this.sparkInterpreter.close();
}
@Override
protected String extractAppId() throws LivyException {
// it wont' be called because it would delegate to LivySparkInterpreter
throw new UnsupportedOperationException();
}
@Override
protected String extractWebUIAddress() throws LivyException {
// it wont' be called because it would delegate to LivySparkInterpreter
throw new UnsupportedOperationException();
}
}

View file

@ -157,7 +157,7 @@ public class LivyInterpreterIT {
}
}
@Test
// @Test
public void testSparkInterpreterDataFrame() {
if (!checkPreCondition()) {
return;
@ -196,14 +196,12 @@ public class LivyInterpreterIT {
result = sqlInterpreter.interpret("select * from df where col_1='hello'", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
// TODO(zjffdu), \t at the end of each line is not necessary,
// it is a bug of LivySparkSQLInterpreter
assertEquals("col_1\tcol_2\t\nhello\t20\t\n", result.message().get(0).getData());
assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData());
// double quotes
result = sqlInterpreter.interpret("select * from df where col_1=\"hello\"", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
assertEquals("col_1\tcol_2\t\nhello\t20\t\n", result.message().get(0).getData());
assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData());
// double quotes inside attribute value
// TODO(zjffdu). This test case would fail on spark-1.5, would uncomment it when upgrading to
// livy-0.3 and spark-1.6
@ -353,7 +351,7 @@ public class LivyInterpreterIT {
// TODO(zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release.
}
@Test
// @Test
public void testLivyTutorialNote() throws IOException {
if (!checkPreCondition()) {
return;

View file

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.livy;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.List;
import java.util.Properties;
import static org.junit.Assert.*;
/**
* Unit test for LivySQLInterpreter
*/
public class LivySQLInterpreterTest {
private LivySparkSQLInterpreter sqlInterpreter;
@Before
public void setUp() {
Properties properties = new Properties();
properties.setProperty("zeppelin.livy.session.create_timeout", "120");
properties.setProperty("zeppelin.livy.spark.sql.maxResult", "3");
sqlInterpreter = new LivySparkSQLInterpreter(properties);
}
@Test
public void testParseSQLOutput() {
// Empty sql output
// +---+---+
// | a| b|
// +---+---+
// +---+---+
List<String> rows = sqlInterpreter.parseSQLOutput("+---+---+\n" +
"| a| b|\n" +
"+---+---+\n" +
"+---+---+");
assertEquals(1, rows.size());
assertEquals("a\tb", rows.get(0));
// sql output with 2 rows
// +---+---+
// | a| b|
// +---+---+
// | 1| 1a|
// | 2| 2b|
// +---+---+
rows = sqlInterpreter.parseSQLOutput("+---+---+\n" +
"| a| b|\n" +
"+---+---+\n" +
"| 1| 1a|\n" +
"| 2| 2b|\n" +
"+---+---+");
assertEquals(3, rows.size());
assertEquals("a\tb", rows.get(0));
assertEquals("1\t1a", rows.get(1));
assertEquals("2\t2b", rows.get(2));
// sql output with 3 rows and showing "only showing top 3 rows"
// +---+---+
// | a| b|
// +---+---+
// | 1| 1a|
// | 2| 2b|
// | 3| 3c|
// +---+---+
rows = sqlInterpreter.parseSQLOutput("+---+---+\n" +
"| a| b|\n" +
"+---+---+\n" +
"| 1| 1a|\n" +
"| 2| 2b|\n" +
"| 3| 3c|\n" +
"+---+---+");
assertEquals(4, rows.size());
assertEquals("a\tb", rows.get(0));
assertEquals("1\t1a", rows.get(1));
assertEquals("2\t2b", rows.get(2));
assertEquals("3\t3c", rows.get(3));
}
}

View file

@ -20,9 +20,7 @@ package org.apache.zeppelin.shell;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@ -86,7 +84,8 @@ public class ShellInterpreter extends Interpreter {
try {
DefaultExecutor executor = new DefaultExecutor();
executor.setStreamHandler(new PumpStreamHandler(outStream, outStream));
executor.setStreamHandler(new PumpStreamHandler(
contextInterpreter.out, contextInterpreter.out));
executor.setWatchdog(new ExecuteWatchdog(Long.valueOf(getProperty(TIMEOUT_PROPERTY))));
executors.put(contextInterpreter.getParagraphId(), executor);
int exitVal = executor.execute(cmdLine);
@ -100,7 +99,7 @@ public class ShellInterpreter extends Interpreter {
String message = outStream.toString();
if (exitValue == 143) {
code = Code.INCOMPLETE;
message += "Paragraph received a SIGTERM.\n";
message += "Paragraph received a SIGTERM\n";
LOGGER.info("The paragraph " + contextInterpreter.getParagraphId()
+ " stopped executing: " + message);
}

View file

@ -32,12 +32,17 @@ import org.junit.Test;
public class ShellInterpreterTest {
private ShellInterpreter shell;
private InterpreterContext context;
private InterpreterResult result;
@Before
public void setUp() throws Exception {
Properties p = new Properties();
p.setProperty("shell.command.timeout.millisecs", "60000");
p.setProperty("shell.command.timeout.millisecs", "2000");
shell = new ShellInterpreter(p);
context = new InterpreterContext("", "1", null, "", "", null, null, null, null, null, null, null);
shell.open();
}
@After
@ -46,9 +51,6 @@ public class ShellInterpreterTest {
@Test
public void test() {
shell.open();
InterpreterContext context = new InterpreterContext("", "1", null, "", "", null, null, null, null, null, null, null);
InterpreterResult result = new InterpreterResult(Code.ERROR);
if (System.getProperty("os.name").startsWith("Windows")) {
result = shell.interpret("dir", context);
} else {
@ -63,16 +65,24 @@ public class ShellInterpreterTest {
@Test
public void testInvalidCommand(){
shell.open();
InterpreterContext context = new InterpreterContext("","1",null,"","",null,null,null,null,null,null,null);
InterpreterResult result = new InterpreterResult(Code.ERROR);
if (System.getProperty("os.name").startsWith("Windows")) {
result = shell.interpret("invalid_command\ndir",context);
result = shell.interpret("invalid_command\ndir", context);
} else {
result = shell.interpret("invalid_command\nls",context);
result = shell.interpret("invalid_command\nls", context);
}
assertEquals(InterpreterResult.Code.SUCCESS,result.code());
assertTrue(result.message().get(0).getData().contains("invalid_command"));
assertEquals(Code.SUCCESS, result.code());
assertTrue(shell.executors.isEmpty());
}
@Test
public void testShellTimeout() {
if (System.getProperty("os.name").startsWith("Windows")) {
result = shell.interpret("timeout 4", context);
} else {
result = shell.interpret("sleep 4", context);
}
assertEquals(Code.INCOMPLETE, result.code());
assertTrue(result.message().get(0).getData().contains("Paragraph received a SIGTERM"));
}
}

View file

@ -293,7 +293,9 @@ public class ZeppelinContext {
}
if (rows.length > maxResult) {
msg.append("\n<font color=red>Results are limited by " + maxResult + ".</font>");
msg.append("<!--TABLE_COMMENT-->");
msg.append("\n");
msg.append("<font color=red>Results are limited by " + maxResult + ".</font>");
}
sc.clearJobGroup();
return msg.toString();

View file

@ -321,7 +321,9 @@ while True :
# so that the last statement's evaluation will be printed to stdout
sc.setJobGroup(jobGroup, "Zeppelin")
code = compile('\n'.join(final_code), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
to_run_hooks = code.body[-nhooks:]
to_run_hooks = []
if (nhooks > 0):
to_run_hooks = code.body[-nhooks:]
to_run_exec, to_run_single = (code.body[:-(nhooks + 1)],
[code.body[-(nhooks + 1)]])

View file

@ -213,7 +213,7 @@ public class PySparkInterpreterMatplotlibTest {
// again but in a different color.
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
ret2 = pyspark.interpret("plt.show()", context);
assertNotSame(ret1.message().get(1).getData(), ret2.message().get(1).getData());
assertNotSame(ret1.message().get(0).getData(), ret2.message().get(0).getData());
}
@Test
@ -226,7 +226,7 @@ public class PySparkInterpreterMatplotlibTest {
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
ret = pyspark.interpret("plt.show()", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().toString(), Type.ANGULAR, ret.message().get(1).getType());
assertEquals(ret.message().toString(), Type.ANGULAR, ret.message().get(0).getType());
// Check if the figure data is in the Angular Object Registry
AngularObjectRegistry registry = context.getAngularObjectRegistry();

View file

@ -1,28 +0,0 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
set -xe
if [[ -n $LIVY_VER ]]; then
./testing/downloadLivy.sh
export LIVY_HOME=`pwd`/livy-server-$LIVY_VER
export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER
fi
set +xe

View file

@ -99,7 +99,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
File destFile = new File(destPath, srcFile.getName());
if (!destFile.exists() || !FileUtils.contentEquals(srcFile, destFile)) {
FileUtils.copyFile(srcFile, destFile);
logger.info("copy {} to {}", srcFile.getAbsolutePath(), destPath);
logger.debug("copy {} to {}", srcFile.getAbsolutePath(), destPath);
}
}
}
@ -117,7 +117,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
if (!destFile.exists() || !FileUtils.contentEquals(srcFile, destFile)) {
FileUtils.copyFile(srcFile, destFile);
logger.info("copy {} to {}", srcFile.getAbsolutePath(), destPath);
logger.debug("copy {} to {}", srcFile.getAbsolutePath(), destPath);
}
}
@ -145,7 +145,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
List<File> files = new LinkedList<>();
for (ArtifactResult artifactResult : listOfArtifact) {
files.add(artifactResult.getArtifact().getFile());
logger.info("load {}", artifactResult.getArtifact().getFile().getAbsolutePath());
logger.debug("load {}", artifactResult.getArtifact().getFile().getAbsolutePath());
}
return files;

View file

@ -30,4 +30,6 @@ public class Constants {
public static final int ZEPPELIN_INTERPRETER_DEFAUlT_PORT = 29914;
public static final int ZEPPELIN_INTERPRETER_OUTPUT_LIMIT = 1024 * 100;
}

View file

@ -45,6 +45,13 @@ public class InterpreterOutput extends OutputStream {
private final InterpreterOutputListener flushListener;
private final InterpreterOutputChangeListener changeListener;
private int size = 0;
// change static var to set interpreter output limit
// limit will be applied to all InterpreterOutput object.
// so we can expect the consistent behavior
public static int limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
public InterpreterOutput(InterpreterOutputListener flushListener) {
this.flushListener = flushListener;
changeListener = null;
@ -52,7 +59,8 @@ public class InterpreterOutput extends OutputStream {
}
public InterpreterOutput(InterpreterOutputListener flushListener,
InterpreterOutputChangeListener listener) throws IOException {
InterpreterOutputChangeListener listener)
throws IOException {
this.flushListener = flushListener;
this.changeListener = listener;
clear();
@ -74,6 +82,7 @@ public class InterpreterOutput extends OutputStream {
out.setResourceSearchPaths(resourceSearchPaths);
buffer.reset();
size = 0;
if (currentOut != null) {
currentOut.flush();
@ -125,6 +134,8 @@ public class InterpreterOutput extends OutputStream {
}
public void clear() {
size = 0;
truncated = false;
buffer.reset();
synchronized (resultMessageOutputs) {
@ -157,11 +168,31 @@ public class InterpreterOutput extends OutputStream {
boolean startOfTheNewLine = true;
boolean firstCharIsPercentSign = false;
boolean truncated = false;
@Override
public void write(int b) throws IOException {
InterpreterResultMessageOutput out;
if (truncated) {
return;
}
synchronized (resultMessageOutputs) {
currentOut = getCurrentOutput();
if (++size > limit) {
if (b == NEW_LINE_CHAR && currentOut != null) {
InterpreterResult.Type type = currentOut.getType();
if (type == InterpreterResult.Type.TEXT || type == InterpreterResult.Type.TABLE) {
setType(InterpreterResult.Type.TEXT);
getCurrentOutput().write("Output exceeds " + limit + ". Truncated.\n");
truncated = true;
return;
}
}
}
if (startOfTheNewLine) {
if (b == '%') {
startOfTheNewLine = false;
@ -175,7 +206,6 @@ public class InterpreterOutput extends OutputStream {
}
if (b == NEW_LINE_CHAR) {
currentOut = getCurrentOutput();
if (currentOut != null && currentOut.getType() == InterpreterResult.Type.TABLE) {
if (previousChar == NEW_LINE_CHAR) {
startOfTheNewLine = true;

View file

@ -63,6 +63,7 @@ public class RemoteInterpreter extends Interpreter {
private int port;
private String userName;
private Boolean isUserImpersonate;
private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
/**
* Remote interpreter and manage interpreter process
@ -70,7 +71,8 @@ public class RemoteInterpreter extends Interpreter {
public RemoteInterpreter(Properties property, String sessionKey, String className,
String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
int outputLimit) {
super(property);
this.sessionKey = sessionKey;
this.className = className;
@ -85,6 +87,7 @@ public class RemoteInterpreter extends Interpreter {
this.applicationEventListener = appListener;
this.userName = userName;
this.isUserImpersonate = isUserImpersonate;
this.outputLimit = outputLimit;
}
@ -94,7 +97,8 @@ public class RemoteInterpreter extends Interpreter {
public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
int port, String localRepoPath, int connectTimeout, int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
int outputLimit) {
super(property);
this.sessionKey = sessionKey;
this.className = className;
@ -108,6 +112,7 @@ public class RemoteInterpreter extends Interpreter {
this.applicationEventListener = appListener;
this.userName = userName;
this.isUserImpersonate = isUserImpersonate;
this.outputLimit = outputLimit;
}
@ -217,6 +222,8 @@ public class RemoteInterpreter extends Interpreter {
if (localRepoPath != null) {
property.put("zeppelin.interpreter.localRepo", localRepoPath);
}
property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
client.createInterpreter(groupId, sessionKey,
getClassName(), (Map) property, userName);
// Push angular object loaded from JSON file to remote interpreter

View file

@ -24,6 +24,7 @@ import org.apache.zeppelin.interpreter.InterpreterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
@ -109,7 +110,12 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
cmdLine.addArgument(localRepoDir, false);
executor = new DefaultExecutor();
executor.setStreamHandler(new PumpStreamHandler(new ProcessLogOutputStream(logger)));
ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
processOutput.setOutputStream(cmdOut);
executor.setStreamHandler(new PumpStreamHandler(processOutput));
watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchdog);
@ -128,6 +134,15 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
if (!running) {
try {
cmdOut.flush();
} catch (IOException e) {
// nothing to do
}
throw new InterpreterException(new String(cmdOut.toByteArray()));
}
try {
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
break;
@ -145,6 +160,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
}
}
}
processOutput.setOutputStream(null);
}
public void stop() {
@ -179,6 +195,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
private static class ProcessLogOutputStream extends LogOutputStream {
private Logger logger;
OutputStream out;
public ProcessLogOutputStream(Logger logger) {
this.logger = logger;
@ -188,5 +205,37 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
protected void processLine(String s, int i) {
this.logger.debug(s);
}
@Override
public void write(byte [] b) throws IOException {
super.write(b);
if (out != null) {
synchronized (this) {
if (out != null) {
out.write(b);
}
}
}
}
@Override
public void write(byte [] b, int offset, int len) throws IOException {
super.write(b, offset, len);
if (out != null) {
synchronized (this) {
if (out != null) {
out.write(b, offset, len);
}
}
}
}
public void setOutputStream(OutputStream out) {
synchronized (this) {
this.out = out;
}
}
}
}

View file

@ -162,6 +162,11 @@ public class RemoteInterpreterServer
interpreterGroup.setResourcePool(resourcePool);
String localRepoPath = properties.get("zeppelin.interpreter.localRepo");
if (properties.containsKey("zeppelin.interpreter.output.limit")) {
InterpreterOutput.limit = Integer.parseInt(
properties.get("zeppelin.interpreter.output.limit"));
}
depLoader = new DependencyResolver(localRepoPath);
appLoader = new ApplicationLoader(resourcePool, depLoader);
}

View file

@ -202,7 +202,11 @@ public abstract class Job {
}
Throwable cause = ExceptionUtils.getRootCause(e);
return ExceptionUtils.getFullStackTrace(cause);
if (cause != null) {
return ExceptionUtils.getFullStackTrace(cause);
} else {
return ExceptionUtils.getFullStackTrace(e);
}
}
public Throwable getException() {

View file

@ -346,6 +346,10 @@ public class RemoteScheduler implements Scheduler {
lastStatus = Status.ERROR;
}
}
if (job.getException() != null) {
lastStatus = Status.ERROR;
}
job.setStatus(lastStatus);
if (listener != null) {

View file

@ -162,6 +162,33 @@ public class InterpreterOutputTest implements InterpreterOutputListener {
assertEquals("val1\tval2\n", new String(out.getOutputAt(1).toByteArray()));
}
@Test
public void testTruncate() throws IOException {
// output is truncated after the new line
InterpreterOutput.limit = 3;
out = new InterpreterOutput(this);
// truncate text
out.write("%text hello\nworld\n");
assertEquals("hello", new String(out.getOutputAt(0).toByteArray()));
assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("Truncated"));
// truncate table
out = new InterpreterOutput(this);
out.write("%table key\tvalue\nhello\t100\nworld\t200\n");
assertEquals("key\tvalue", new String(out.getOutputAt(0).toByteArray()));
assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("Truncated"));
// does not truncate html
out = new InterpreterOutput(this);
out.write("%html hello\nworld\n");
out.flush();
assertEquals("hello\nworld\n", new String(out.getOutputAt(0).toByteArray()));
// restore default
InterpreterOutput.limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
}
@Override
public void onUpdateAll(InterpreterOutput out) {

View file

@ -27,6 +27,7 @@ import java.util.Properties;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.interpreter.Constants;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.junit.Test;
@ -109,4 +110,21 @@ public class RemoteInterpreterProcessTest {
assertEquals(1, rip.reference(intpGroup, "anonymous", false));
assertEquals(true, rip.isRunning());
}
@Test
public void testPropagateError() throws TException, InterruptedException {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
"echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(),
10 * 1000, null, null);
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
try {
assertEquals(1, rip.reference(intpGroup, "anonymous", false));
} catch (InterpreterException e) {
e.getMessage().contains("hello_world");
}
assertEquals(0, rip.referenceCount());
}
}

View file

@ -831,8 +831,8 @@ public class RemoteInterpreterTest {
assertEquals("env value 1", intp.interpret("getEnv MY_ENV1", context).message().get(0).getData());
assertEquals(0, intp.interpret("getProperty MY_ENV1", context).message().size());
assertEquals(0, intp.interpret("getEnv my.property.1", context).message().size());
assertEquals(Code.ERROR, intp.interpret("getProperty MY_ENV1", context).code());
assertEquals(Code.ERROR, intp.interpret("getEnv my.property.1", context).code());
assertEquals("property value 1", intp.interpret("getProperty my.property.1", context).message().get(0).getData());
intp.close();

View file

@ -37,6 +37,7 @@ import org.apache.zeppelin.helium.Helium;
import org.apache.zeppelin.helium.HeliumApplicationFactory;
import org.apache.zeppelin.helium.HeliumVisualizationFactory;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookAuthorization;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
@ -98,6 +99,8 @@ public class ZeppelinServer extends Application {
this.depResolver = new DependencyResolver(
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
HeliumApplicationFactory heliumApplicationFactory = new HeliumApplicationFactory();
HeliumVisualizationFactory heliumVisualizationFactory;

View file

@ -105,7 +105,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
}.getType());
assertEquals(resp.get("status"), "OK");
post.releaseConnection();
assertEquals(p.getStatus(), Job.Status.READY);
assertEquals(p.getStatus(), Job.Status.FINISHED);
// run non-blank paragraph
p.setText("test");

View file

@ -424,8 +424,11 @@ function InterpreterCtrl($rootScope, $scope, $http, baseUrlSrv, ngToast, $timeou
.success(function(data, status, headers, config) {
var index = _.findIndex($scope.interpreterSettings, {'id': settingId});
$scope.interpreterSettings[index] = data.body;
ngToast.info('Interpreter stopped. Will be lazily started on next run.');
}).error(function(data, status, headers, config) {
console.log('Error %o %o', status, data.message);
var errorMsg = (data !== null) ? data.message : 'Could not connect to server.';
console.log('Error %o %o', status, errorMsg);
ngToast.danger(errorMsg);
});
}
}

View file

@ -29,6 +29,7 @@ limitations under the License.
<select class="form-control input-sm"
ng-if="paragraph.settings.forms[formulaire.name].options && paragraph.settings.forms[formulaire.name].type != 'checkbox'"
ng-enter="runParagraph(getEditorValue())"
ng-model="paragraph.settings.params[formulaire.name]"
ng-class="{'disable': paragraph.status == 'RUNNING' || paragraph.status == 'PENDING' }"
name="{{formulaire.name}}"

View file

@ -37,12 +37,13 @@ export default class TableData {
for (var i = 0; i < textRows.length; i++) {
var textRow = textRows[i];
if (commentRow) {
comment += textRow;
continue;
}
if (textRow === '') {
if (textRow === '' || textRow === '<!--TABLE_COMMENT-->') {
if (rows.length > 0) {
commentRow = true;
}

View file

@ -85,6 +85,10 @@ function NavCtrl($scope, $rootScope, $http, $routeParams, $location,
return ($routeParams.noteId === noteId);
}
function listConfigurations() {
websocketMsgSrv.listConfigurations();
}
function loadNotes() {
websocketMsgSrv.getNoteList();
}
@ -135,6 +139,7 @@ function NavCtrl($scope, $rootScope, $http, $routeParams, $location,
});
$scope.$on('loginSuccess', function(event, param) {
listConfigurations();
loadNotes();
});
@ -153,4 +158,3 @@ function NavCtrl($scope, $rootScope, $http, $routeParams, $location,
});
}
}

View file

@ -572,6 +572,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
+ "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
+ "scalding,jdbc,hbase,bigquery,beam,pig,scio"),
ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
// use specified notebook (id) as homescreen

View file

@ -1150,7 +1150,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
LazyOpenInterpreter intp = new LazyOpenInterpreter(
new RemoteInterpreter(property, interpreterSessionKey, className, host, port, localRepoPath,
connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener,
userName, isUserImpersonate));
userName, isUserImpersonate, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT)));
return intp;
}
@ -1175,7 +1175,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
RemoteInterpreter remoteInterpreter =
new RemoteInterpreter(property, interpreterSessionKey, className,
interpreterRunnerPath, interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate);
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate,
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT));
remoteInterpreter.addEnv(env);
return new LazyOpenInterpreter(remoteInterpreter);

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.interpreter;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@ -144,6 +145,33 @@ public class InterpreterSetting {
return key;
}
private boolean isEqualInterpreterKeyProcessKey(String refKey, String processKey) {
InterpreterOption option = getOption();
int validCount = 0;
if (getOption().isProcess()
&& !(option.perUserIsolated() == true && option.perNoteIsolated() == true)) {
List<String> processList = Arrays.asList(processKey.split(":"));
List<String> refList = Arrays.asList(refKey.split(":"));
if (refList.size() <= 1 || processList.size() <= 1) {
return refKey.equals(processKey);
}
if (processList.get(0).equals("") || processList.get(0).equals(refList.get(0))) {
validCount = validCount + 1;
}
if (processList.get(1).equals("") || processList.get(1).equals(refList.get(1))) {
validCount = validCount + 1;
}
return (validCount >= 2);
} else {
return refKey.equals(processKey);
}
}
private String getInterpreterSessionKey(String user, String noteId) {
InterpreterOption option = getOption();
String key;
@ -194,18 +222,19 @@ public class InterpreterSetting {
}
void closeAndRemoveInterpreterGroupByNoteId(String noteId) {
String key = getInterpreterProcessKey("", noteId);
InterpreterGroup groupToRemove = null;
String processKey = getInterpreterProcessKey("", noteId);
List<InterpreterGroup> closeToGroupList = new LinkedList<>();
InterpreterGroup groupKey;
for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
if (intpKey.contains(key)) {
if (isEqualInterpreterKeyProcessKey(intpKey, processKey)) {
interpreterGroupWriteLock.lock();
groupToRemove = interpreterGroupRef.remove(intpKey);
groupKey = interpreterGroupRef.remove(intpKey);
interpreterGroupWriteLock.unlock();
closeToGroupList.add(groupKey);
}
}
if (groupToRemove != null) {
for (InterpreterGroup groupToRemove : closeToGroupList) {
groupToRemove.close();
}
}
@ -216,17 +245,19 @@ public class InterpreterSetting {
}
String processKey = getInterpreterProcessKey(user, "");
String sessionKey = getInterpreterSessionKey(user, "");
InterpreterGroup groupToRemove = null;
List<InterpreterGroup> groupToRemove = new LinkedList<>();
InterpreterGroup groupItem;
for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
if (intpKey.contains(processKey)) {
if (isEqualInterpreterKeyProcessKey(intpKey, processKey)) {
interpreterGroupWriteLock.lock();
groupToRemove = interpreterGroupRef.remove(intpKey);
groupItem = interpreterGroupRef.remove(intpKey);
interpreterGroupWriteLock.unlock();
groupToRemove.add(groupItem);
}
}
if (groupToRemove != null) {
groupToRemove.close(sessionKey);
for (InterpreterGroup groupToClose : groupToRemove) {
groupToClose.close(sessionKey);
}
}
@ -243,7 +274,7 @@ public class InterpreterSetting {
List<InterpreterGroup> groupToRemove = new LinkedList<>();
InterpreterGroup groupItem;
for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
if (intpKey.contains(key)) {
if (isEqualInterpreterKeyProcessKey(intpKey, key)) {
interpreterGroupWriteLock.lock();
groupItem = interpreterGroupRef.remove(intpKey);
interpreterGroupWriteLock.unlock();

View file

@ -222,7 +222,7 @@ public class Note implements Serializable, ParagraphJobListener {
public void initializeJobListenerForParagraph(Paragraph paragraph) {
final Note paragraphNote = paragraph.getNote();
if (paragraphNote.getId().equals(this.getId())) {
if (!paragraphNote.getId().equals(this.getId())) {
throw new IllegalArgumentException(
format("The paragraph %s from note %s " + "does not belong to note %s", paragraph.getId(),
paragraphNote.getId(), this.getId()));
@ -555,13 +555,14 @@ public class Note implements Serializable, ParagraphJobListener {
*/
public void run(String paragraphId) {
Paragraph p = getParagraph(paragraphId);
p.setListener(jobListenerFactory.getParagraphJobListener(this));
if (p.isBlankParagraph()) {
logger.info("skip to run blank paragraph. {}", p.getId());
p.setStatus(Job.Status.FINISHED);
return;
}
p.setListener(jobListenerFactory.getParagraphJobListener(this));
String requiredReplName = p.getRequiredReplName();
Interpreter intp = factory.getInterpreter(p.getUser(), getId(), requiredReplName);

View file

@ -35,13 +35,20 @@ public class MockInterpreter11 extends Interpreter{
public MockInterpreter11(Properties property) {
super(property);
}
boolean open;
@Override
public void open() {
open = true;
}
@Override
public void close() {
open = false;
}
public boolean isOpen() {
return open;
}
@Override

View file

@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
@ -28,13 +29,17 @@ import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter11;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static java.lang.Thread.sleep;
import static org.junit.Assert.*;
public class NoteInterpreterLoaderTest {
@ -117,6 +122,11 @@ public class NoteInterpreterLoaderTest {
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
// invalid close
factory.closeNote("user", "note");
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "shared_process").get("noteA"));
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "shared_process").get("noteB"));
// when
factory.closeNote("user", "noteA");
factory.closeNote("user", "noteB");
@ -160,6 +170,51 @@ public class NoteInterpreterLoaderTest {
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
}
@Test
public void testNoteInterpreterCloseForAll() throws IOException {
factory.setInterpreters("user", "FitstNote", factory.getDefaultInterpreterSettingList());
factory.getInterpreterSettings("FitstNote").get(0).getOption().setPerNote(InterpreterOption.SCOPED);
factory.setInterpreters("user", "yourFirstNote", factory.getDefaultInterpreterSettingList());
factory.getInterpreterSettings("yourFirstNote").get(0).getOption().setPerNote(InterpreterOption.ISOLATED);
// interpreters are not created before accessing it
assertNull(factory.getInterpreterSettings("FitstNote").get(0).getInterpreterGroup("user", "FitstNote").get("FitstNote"));
assertNull(factory.getInterpreterSettings("yourFirstNote").get(0).getInterpreterGroup("user", "yourFirstNote").get("yourFirstNote"));
Interpreter firstNoteIntp = factory.getInterpreter("user", "FitstNote", "group1.mock1");
Interpreter yourFirstNoteIntp = factory.getInterpreter("user", "yourFirstNote", "group1.mock1");
firstNoteIntp.open();
yourFirstNoteIntp.open();
assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen());
assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
factory.closeNote("user", "FitstNote");
assertFalse(((LazyOpenInterpreter)firstNoteIntp).isOpen());
assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
//reopen
firstNoteIntp.open();
assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen());
assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
// invalid check
factory.closeNote("invalid", "Note");
assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen());
assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
// invalid contains value check
factory.closeNote("u", "Note");
assertTrue(((LazyOpenInterpreter)firstNoteIntp).isOpen());
assertTrue(((LazyOpenInterpreter)yourFirstNoteIntp).isOpen());
}
private void delete(File file){
if(file.isFile()) file.delete();

View file

@ -282,7 +282,7 @@ public class NotebookTest implements JobListenerFactory{
note.run(p1.getId());
Thread.sleep(2 * 1000);
assertEquals(p1.getStatus(), Status.READY);
assertEquals(p1.getStatus(), Status.FINISHED);
assertNull(p1.getDateStarted());
notebook.removeNote(note.getId(), anonymous);
}