ZEPPELIN-2898. Support Yarn-Cluster for Spark Interpreter

This commit is contained in:
Jeff Zhang 2017-09-04 21:54:56 +08:00
parent 3fb67f9cc5
commit 9da7c4b8c2
33 changed files with 1012 additions and 199 deletions

View file

@ -66,7 +66,8 @@ matrix:
# Several tests were excluded from this configuration due to the following issues:
# HeliumApplicationFactoryTest - https://issues.apache.org/jira/browse/ZEPPELIN-2470
# After issues are fixed these tests need to be included back by removing them from the "-Dtests.to.exclude" property
- jdk: "oraclejdk8"
- sudo: required
jdk: "oraclejdk8"
dist: precise
env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.2 -Pweb-ci -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/ZeppelinSparkClusterTest.java,**/org.apache.zeppelin.spark.*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
@ -143,6 +144,7 @@ before_script:
- if [[ -n $LIVY_VER ]]; then ./testing/downloadLivy.sh $LIVY_VER; fi
- if [[ -n $LIVY_VER ]]; then export LIVY_HOME=`pwd`/livy-$LIVY_VER-bin; fi
- if [[ -n $LIVY_VER ]]; then export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER; fi
- export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER
- echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
- echo "export ZEPPELIN_HELIUM_REGISTRY=helium" >> conf/zeppelin-env.sh
- tail conf/zeppelin-env.sh

View file

@ -122,7 +122,11 @@ JAVA_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
export JAVA_OPTS
JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
if [[ -z "${SPARK_YARN_CLUSTER}" ]]; then
JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
else
JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties"
fi
export JAVA_INTP_OPTS

View file

@ -143,7 +143,12 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
export PYTHONPATH="${PYTHONPATH}:${PYSPARKPATH}"
fi
unset PYSPARKPATH
export SPARK_CLASSPATH+=":${ZEPPELIN_INTP_CLASSPATH}"
fi
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
else
# autodetect HADOOP_CONF_HOME by heuristic
if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
@ -152,13 +157,8 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
export HADOOP_CONF_DIR="/etc/hadoop/conf"
fi
fi
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
fi
export SPARK_CLASSPATH+=":${ZEPPELIN_INTP_CLASSPATH}"
fi
elif [[ "${INTERPRETER_ID}" == "hbase" ]]; then
if [[ -n "${HBASE_CONF_DIR}" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${HBASE_CONF_DIR}"

View file

@ -0,0 +1,23 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
log4j.rootLogger = INFO, stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n

View file

@ -424,7 +424,7 @@ It creates separated SparkContext per each notebook in `isolated` mode.
## IPython support
By default, zeppelin would use IPython in `pyspark` when IPython is available, Otherwise it would fall back to the original PySpark implementation.
If you don't want to use IPython, then you can set `zeppelin.spark.useIPython` as `false` in interpreter setting. For the IPython features, you can refer doc
If you don't want to use IPython, then you can set `zeppelin.pyspark.useIPython` as `false` in interpreter setting. For the IPython features, you can refer doc
[Python Interpreter](python.html)

View file

@ -78,6 +78,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
private long ipythonLaunchTimeout;
private String additionalPythonPath;
private String additionalPythonInitFile;
private boolean useBuiltinPy4j = true;
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
@ -92,6 +93,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
* @param additionalPythonPath
*/
public void setAdditionalPythonPath(String additionalPythonPath) {
LOGGER.info("setAdditionalPythonPath: " + additionalPythonPath);
this.additionalPythonPath = additionalPythonPath;
}
@ -105,6 +107,10 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
this.additionalPythonInitFile = additionalPythonInitFile;
}
public void setAddBulitinPy4j(boolean add) {
this.useBuiltinPy4j = add;
}
@Override
public void open() {
try {
@ -113,6 +119,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
return;
}
pythonExecutable = getProperty().getProperty("zeppelin.python", "python");
LOGGER.info("Python Exec: " + pythonExecutable);
ipythonLaunchTimeout = Long.parseLong(
getProperty().getProperty("zeppelin.ipython.launch.timeout", "30000"));
this.zeppelinContext = new PythonZeppelinContext(
@ -218,29 +225,34 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchDog);
String py4jLibPath = null;
if (System.getenv("ZEPPELIN_HOME") != null) {
py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator
+ PythonInterpreter.ZEPPELIN_PY4JPATH;
} else {
Path workingPath = Paths.get("..").toAbsolutePath();
py4jLibPath = workingPath + File.separator + PythonInterpreter.ZEPPELIN_PY4JPATH;
}
if (additionalPythonPath != null) {
// put the py4j at the end, because additionalPythonPath may already contain py4j.
// e.g. PySparkInterpreter
additionalPythonPath = additionalPythonPath + ":" + py4jLibPath;
} else {
additionalPythonPath = py4jLibPath;
if (useBuiltinPy4j) {
String py4jLibPath = null;
if (System.getenv("ZEPPELIN_HOME") != null) {
py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator
+ PythonInterpreter.ZEPPELIN_PY4JPATH;
} else {
Path workingPath = Paths.get("..").toAbsolutePath();
py4jLibPath = workingPath + File.separator + PythonInterpreter.ZEPPELIN_PY4JPATH;
}
if (additionalPythonPath != null) {
// put the py4j at the end, because additionalPythonPath may already contain py4j.
// e.g. PySparkInterpreter
additionalPythonPath = additionalPythonPath + ":" + py4jLibPath;
} else {
additionalPythonPath = py4jLibPath;
}
}
Map<String, String> envs = EnvironmentUtils.getProcEnvironment();
if (envs.containsKey("PYTHONPATH")) {
envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH"));
if (additionalPythonPath != null) {
envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH"));
}
} else {
envs.put("PYTHONPATH", additionalPythonPath);
}
LOGGER.debug("PYTHONPATH: " + envs.get("PYTHONPATH"));
LOGGER.info("PYTHONPATH: " + envs.get("PYTHONPATH"));
executor.execute(cmd, envs, this);
// wait until IPython kernel is started or timeout

View file

@ -44,12 +44,15 @@ public class IPySparkInterpreter extends IPythonInterpreter {
@Override
public void open() {
getProperty().setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property));
property.setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property));
sparkInterpreter = getSparkInterpreter();
SparkConf conf = sparkInterpreter.getSparkContext().getConf();
String additionalPythonPath = conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
":../interpreter/lib/python";
setAdditionalPythonPath(additionalPythonPath);
// only set PYTHONPATH in local or yarn-client mode.
// yarn-cluster will setup PYTHONPATH automatically.
if (!conf.get("spark.submit.deployMode").equals("cluster")) {
setAdditionalPythonPath(PythonUtils.sparkPythonPath());
setAddBulitinPy4j(false);
}
setAdditionalPythonInitFile("python/zeppelin_ipyspark.py");
super.open();
}

View file

@ -115,7 +115,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
public void open() {
// try IPySparkInterpreter first
iPySparkInterpreter = getIPySparkInterpreter();
if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true") &&
if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true") &&
iPySparkInterpreter.checkIPythonPrerequisite()) {
try {
iPySparkInterpreter.open();
@ -133,7 +133,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
iPySparkInterpreter = null;
if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true")) {
if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true")) {
// don't print it when it is in testing, just for easy output check in test.
try {
InterpreterContext.get().out.write(("IPython is not available, " +
@ -202,13 +202,18 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
private Map setupPySparkEnv() throws IOException{
private Map setupPySparkEnv() throws IOException {
Map env = EnvironmentUtils.getProcEnvironment();
if (!env.containsKey("PYTHONPATH")) {
SparkConf conf = getSparkConf();
env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
":../interpreter/lib/python");
// only set PYTHONPATH in local or yarn-client mode.
// yarn-cluster will setup PYTHONPATH automatically.
SparkConf conf = getSparkConf();
if (!conf.get("spark.submit.deployMode", "client").equals("cluster")) {
if (!env.containsKey("PYTHONPATH")) {
env.put("PYTHONPATH", PythonUtils.sparkPythonPath());
} else {
env.put("PYTHONPATH", PythonUtils.sparkPythonPath());
}
}
// get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT
@ -223,7 +228,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
LOGGER.debug("PYTHONPATH: " + env.get("PYTHONPATH"));
LOGGER.info("PYTHONPATH: " + env.get("PYTHONPATH"));
return env;
}
@ -251,6 +256,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
gatewayServer.start();
String pythonExec = getPythonExec(property);
LOGGER.info("pythonExec: " + pythonExec);
CommandLine cmd = CommandLine.parse(pythonExec);
cmd.addArgument(scriptPath, false);
cmd.addArgument(Integer.toString(port), false);

View file

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.List;
/**
* Util class for PySpark
*/
public class PythonUtils {
/**
* Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from ZEPPELIN_HOME
* when it is embedded mode.
*
* This method will called in zeppelin server process and spark driver process when it is
* local or yarn-client mode.
*/
public static String sparkPythonPath() {
List<String> pythonPath = new ArrayList<String>();
String sparkHome = System.getenv("SPARK_HOME");
String zeppelinHome = System.getenv("ZEPPELIN_HOME");
if (zeppelinHome == null) {
zeppelinHome = new File("..").getAbsolutePath();
}
if (sparkHome != null) {
// non-embedded mode when SPARK_HOME is specified.
File pyspark = new File(sparkHome, "python/lib/pyspark.zip");
if (!pyspark.exists()) {
throw new RuntimeException("No pyspark.zip found under " + sparkHome + "/python/lib");
}
pythonPath.add(pyspark.getAbsolutePath());
File[] py4j = new File(sparkHome + "/python/lib").listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith("py4j");
}
});
if (py4j.length == 0) {
throw new RuntimeException("No py4j files found under " + sparkHome + "/python/lib");
} else if (py4j.length > 1) {
throw new RuntimeException("Multiple py4j files found under " + sparkHome + "/python/lib");
} else {
pythonPath.add(py4j[0].getAbsolutePath());
}
} else {
// embedded mode
File pyspark = new File(zeppelinHome, "interpreter/spark/pyspark/pyspark.zip");
if (!pyspark.exists()) {
throw new RuntimeException("No pyspark.zip found: " + pyspark.getAbsolutePath());
}
pythonPath.add(pyspark.getAbsolutePath());
File[] py4j = new File(zeppelinHome, "interpreter/spark/pyspark").listFiles(
new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith("py4j");
}
});
if (py4j.length == 0) {
throw new RuntimeException("No py4j files found under " + zeppelinHome +
"/interpreter/spark/pyspark");
} else if (py4j.length > 1) {
throw new RuntimeException("Multiple py4j files found under " + sparkHome +
"/interpreter/spark/pyspark");
} else {
pythonPath.add(py4j[0].getAbsolutePath());
}
}
// add ${ZEPPELIN_HOME}/interpreter/lib/python for all the cases
pythonPath.add(zeppelinHome + "/interpreter/lib/python");
return StringUtils.join(pythonPath, ":");
}
}

View file

@ -351,7 +351,11 @@ public class SparkInterpreter extends Interpreter {
}
public boolean isYarnMode() {
return getProperty("master").startsWith("yarn");
String master = getProperty("master");
if (master == null) {
master = getProperty().getProperty("spark.master", "local[*]");
}
return master.startsWith("yarn");
}
/**
@ -371,11 +375,6 @@ public class SparkInterpreter extends Interpreter {
conf.set("spark.executor.uri", execUri);
}
conf.set("spark.scheduler.mode", "FAIR");
conf.setMaster(getProperty("master"));
if (isYarnMode()) {
conf.set("master", "yarn");
conf.set("spark.submit.deployMode", "client");
}
Properties intpProperty = getProperty();
for (Object k : intpProperty.keySet()) {
@ -394,8 +393,6 @@ public class SparkInterpreter extends Interpreter {
}
}
setupConfForPySpark(conf);
setupConfForSparkR(conf);
Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
@ -529,96 +526,10 @@ public class SparkInterpreter extends Interpreter {
}
}
}
setupConfForPySpark(conf);
setupConfForSparkR(conf);
SparkContext sparkContext = new SparkContext(conf);
return sparkContext;
}
private void setupConfForPySpark(SparkConf conf) {
Object pysparkBaseProperty =
new DefaultInterpreterProperty("SPARK_HOME", null, null).getValue();
String pysparkBasePath = pysparkBaseProperty != null ? pysparkBaseProperty.toString() : null;
File pysparkPath;
if (null == pysparkBasePath) {
pysparkBasePath =
new DefaultInterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../")
.getValue().toString();
pysparkPath = new File(pysparkBasePath,
"interpreter" + File.separator + "spark" + File.separator + "pyspark");
} else {
pysparkPath = new File(pysparkBasePath,
"python" + File.separator + "lib");
}
//Only one of py4j-0.9-src.zip and py4j-0.8.2.1-src.zip should exist
//TODO(zjffdu), this is not maintainable when new version is added.
String[] pythonLibs = new String[]{"pyspark.zip", "py4j-0.9-src.zip", "py4j-0.8.2.1-src.zip",
"py4j-0.10.1-src.zip", "py4j-0.10.3-src.zip", "py4j-0.10.4-src.zip"};
ArrayList<String> pythonLibUris = new ArrayList<>();
for (String lib : pythonLibs) {
File libFile = new File(pysparkPath, lib);
if (libFile.exists()) {
pythonLibUris.add(libFile.toURI().toString());
}
}
pythonLibUris.trimToSize();
// Distribute two libraries(pyspark.zip and py4j-*.zip) to workers
// when spark version is less than or equal to 1.4.1
if (pythonLibUris.size() == 2) {
try {
String confValue = conf.get("spark.yarn.dist.files");
conf.set("spark.yarn.dist.files", confValue + "," + Joiner.on(",").join(pythonLibUris));
} catch (NoSuchElementException e) {
conf.set("spark.yarn.dist.files", Joiner.on(",").join(pythonLibUris));
}
if (!useSparkSubmit()) {
conf.set("spark.files", conf.get("spark.yarn.dist.files"));
}
conf.set("spark.submit.pyArchives", Joiner.on(":").join(pythonLibs));
conf.set("spark.submit.pyFiles", Joiner.on(",").join(pythonLibUris));
}
// Distributes needed libraries to workers
// when spark version is greater than or equal to 1.5.0
if (isYarnMode()) {
conf.set("spark.yarn.isPython", "true");
}
}
private void setupConfForSparkR(SparkConf conf) {
Object sparkRBaseProperty =
new DefaultInterpreterProperty("SPARK_HOME", null, null).getValue();
String sparkRBasePath = sparkRBaseProperty != null ? sparkRBaseProperty.toString() : null;
File sparkRPath;
if (null == sparkRBasePath) {
sparkRBasePath =
new DefaultInterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../")
.getValue().toString();
sparkRPath = new File(sparkRBasePath,
"interpreter" + File.separator + "spark" + File.separator + "R");
} else {
sparkRPath = new File(sparkRBasePath, "R" + File.separator + "lib");
}
sparkRPath = new File(sparkRPath, "sparkr.zip");
if (sparkRPath.exists() && sparkRPath.isFile()) {
String archives = null;
if (conf.contains("spark.yarn.dist.archives")) {
archives = conf.get("spark.yarn.dist.archives");
}
if (archives != null) {
archives = archives + "," + sparkRPath + "#sparkr";
} else {
archives = sparkRPath + "#sparkr";
}
conf.set("spark.yarn.dist.archives", archives);
} else {
logger.warn("sparkr.zip is not found, sparkr may not work.");
}
}
static final String toString(Object o) {
return (o instanceof String) ? (String) o : "";
}

View file

@ -150,9 +150,9 @@
"description": "Python command to run pyspark with",
"type": "string"
},
"zeppelin.spark.useIPython": {
"zeppelin.pyspark.useIPython": {
"envName": null,
"propertyName": "zeppelin.spark.useIPython",
"propertyName": "zeppelin.pyspark.useIPython",
"defaultValue": true,
"description": "whether use IPython when it is available",
"type": "checkbox"

View file

@ -59,6 +59,7 @@ public class IPySparkInterpreterTest {
Properties p = new Properties();
p.setProperty("spark.master", "local[4]");
p.setProperty("master", "local[4]");
p.setProperty("spark.submit.deployMode", "client");
p.setProperty("spark.app.name", "Zeppelin Test");
p.setProperty("zeppelin.spark.useHiveContext", "true");
p.setProperty("zeppelin.spark.maxResult", "1000");

View file

@ -89,7 +89,7 @@ public class PySparkInterpreterMatplotlibTest {
p.setProperty("zeppelin.spark.importImplicit", "true");
p.setProperty("zeppelin.pyspark.python", "python");
p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
p.setProperty("zeppelin.spark.useIPython", "false");
p.setProperty("zeppelin.pyspark.useIPython", "false");
return p;
}

View file

@ -59,7 +59,7 @@ public class PySparkInterpreterTest {
p.setProperty("zeppelin.spark.importImplicit", "true");
p.setProperty("zeppelin.pyspark.python", "python");
p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
p.setProperty("zeppelin.spark.useIPython", "false");
p.setProperty("zeppelin.pyspark.useIPython", "false");
return p;
}

View file

@ -17,6 +17,8 @@
package org.apache.zeppelin.interpreter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -34,7 +36,7 @@ import org.apache.zeppelin.resource.ResourcePool;
public class InterpreterContext {
private static final ThreadLocal<InterpreterContext> threadIC = new ThreadLocal<>();
public final InterpreterOutput out;
public InterpreterOutput out;
public static InterpreterContext get() {
return threadIC.get();
@ -48,21 +50,46 @@ public class InterpreterContext {
threadIC.remove();
}
private final String noteId;
private final String replName;
private final String paragraphTitle;
private final String paragraphId;
private final String paragraphText;
private String noteId;
private String replName;
private String paragraphTitle;
private String paragraphId;
private String paragraphText;
private AuthenticationInfo authenticationInfo;
private final Map<String, Object> config;
private GUI gui;
private Map<String, Object> config = new HashMap<>();
private GUI gui = new GUI();
private AngularObjectRegistry angularObjectRegistry;
private ResourcePool resourcePool;
private List<InterpreterContextRunner> runners;
private List<InterpreterContextRunner> runners = new ArrayList<>();
private String className;
private RemoteEventClientWrapper client;
private RemoteWorksController remoteWorksController;
private final Map<String, Integer> progressMap;
private Map<String, Integer> progressMap;
/**
* Builder class for InterpreterContext
*/
public static class Builder {
private InterpreterContext context = new InterpreterContext();
public Builder setNoteId(String noteId) {
context.noteId = noteId;
return this;
}
public Builder setParagraphId(String paragraphId) {
context.paragraphId = paragraphId;
return this;
}
public InterpreterContext getContext() {
return context;
}
}
private InterpreterContext() {
}
// visible for testing
public InterpreterContext(String noteId,

View file

@ -43,7 +43,7 @@ public class RemoteInterpreterServerTest {
@Test
public void testStartStop() throws InterruptedException, IOException, TException {
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true);
assertEquals(false, server.isRunning());
server.start();

View file

@ -93,6 +93,18 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
</exclusions>
</dependency>

View file

@ -83,7 +83,7 @@ public class AuthenticationIT extends AbstractZeppelinIT {
}
try {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir()));
File file = new File(shiroPath);

View file

@ -82,7 +82,7 @@ public class InterpreterModeActionsIT extends AbstractZeppelinIT {
return;
}
try {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir()));
interpreterOptionPath = conf.getRelativeDir(String.format("%s/interpreter.json", conf.getConfDir()));

View file

@ -74,7 +74,7 @@ public class PersonalizeActionsIT extends AbstractZeppelinIT {
return;
}
try {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir()));
File file = new File(shiroPath);

View file

@ -126,8 +126,8 @@ public abstract class AbstractTestRestApi {
private static void start(boolean withAuth) throws Exception {
if (!wasRunning) {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), "../zeppelin-web/dist");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), new File("../zeppelin-web/dist").getAbsolutePath());
// some test profile does not build zeppelin-web.
// to prevent zeppelin starting up fail, create zeppelin-web/dist directory
@ -211,7 +211,7 @@ public abstract class AbstractTestRestApi {
// set spark home for pyspark
sparkProperties.put("spark.home",
new InterpreterProperty("spark.home", getSparkHome(), InterpreterPropertyType.TEXTAREA.getValue()));
sparkProperties.put("zeppelin.spark.useIPython", new InterpreterProperty("zeppelin.spark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
sparkProperties.put("zeppelin.pyspark.useIPython", new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
sparkIntpSetting.setProperties(sparkProperties);
pySpark = true;
@ -234,7 +234,7 @@ public abstract class AbstractTestRestApi {
new InterpreterProperty("spark.home", sparkHome, InterpreterPropertyType.TEXTAREA.getValue()));
sparkProperties.put("zeppelin.spark.useHiveContext",
new InterpreterProperty("zeppelin.spark.useHiveContext", false, InterpreterPropertyType.CHECKBOX.getValue()));
sparkProperties.put("zeppelin.spark.useIPython", new InterpreterProperty("zeppelin.spark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
sparkProperties.put("zeppelin.pyspark.useIPython", new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
pySpark = true;
sparkR = true;

View file

@ -36,7 +36,7 @@
<properties>
<!--library versions-->
<hadoop.version>2.6.0</hadoop.version>
<hadoop.version>2.7.3</hadoop.version>
<commons.lang3.version>3.4</commons.lang3.version>
<commons.vfs2.version>2.0</commons.vfs2.version>
<aws.sdk.s3.version>1.10.62</aws.sdk.s3.version>
@ -214,12 +214,6 @@
</exclusions>
</dependency>
<dependency> <!-- because there are two of them above -->
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
<version>${xml.apis.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jgit</groupId>
<artifactId>org.eclipse.jgit</artifactId>
@ -298,22 +292,22 @@
<version>${commons.lang3.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.5</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.sun.jersey</groupId>
@ -323,11 +317,14 @@
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
@ -348,10 +345,6 @@
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jgit</groupId>
<artifactId>org.eclipse.jgit</artifactId>
@ -378,6 +371,265 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>jackrabbit-webdav</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jgit</groupId>
<artifactId>org.eclipse.jgit</artifactId>
</exclusion>
<exclusion>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</exclusion>
<exclusion>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
<!--<exclusion>-->
<!--<groupId>com.sun.jersey</groupId>-->
<!--<artifactId>jersey-core</artifactId>-->
<!--</exclusion>-->
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
</exclusion>
<!--<exclusion>-->
<!--<groupId>com.sun.jersey</groupId>-->
<!--<artifactId>jersey-server</artifactId>-->
<!--</exclusion>-->
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>jackrabbit-webdav</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jgit</groupId>
<artifactId>org.eclipse.jgit</artifactId>
</exclusion>
<exclusion>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</exclusion>
<exclusion>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<version>${hadoop.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<!--<exclusion>-->
<!--<groupId>com.sun.jersey</groupId>-->
<!--<artifactId>jersey-json</artifactId>-->
<!--</exclusion>-->
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>jackrabbit-webdav</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jgit</groupId>
<artifactId>org.eclipse.jgit</artifactId>
</exclusion>
<exclusion>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</exclusion>
<exclusion>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-spark_2.10</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-context</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
@ -392,6 +644,12 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkMode>always</forkMode>
<systemProperties>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
</systemProperties>
<environmentVariables>
<!--<ZEPPELIN_HOME>..</ZEPPELIN_HOME>-->
</environmentVariables>
</configuration>
</plugin>
</plugins>

View file

@ -493,7 +493,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
}
public String getConfDir() {
return getString(ConfVars.ZEPPELIN_CONF_DIR);
return getRelativeDir(ConfVars.ZEPPELIN_CONF_DIR);
}
public List<String> getAllowedOrigins()

View file

@ -18,6 +18,7 @@
package org.apache.zeppelin.interpreter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.gson.JsonArray;
@ -26,6 +27,7 @@ import com.google.gson.JsonObject;
import com.google.gson.annotations.SerializedName;
import com.google.gson.internal.StringMap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.dep.Dependency;
import org.apache.zeppelin.dep.DependencyResolver;
@ -44,6 +46,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
@ -55,6 +58,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -452,7 +456,9 @@ public class InterpreterSetting {
Properties jProperties = new Properties();
Map<String, InterpreterProperty> iProperties = (Map<String, InterpreterProperty>) properties;
for (Map.Entry<String, InterpreterProperty> entry : iProperties.entrySet()) {
jProperties.setProperty(entry.getKey(), entry.getValue().getValue().toString());
if (entry.getValue().getValue() != null) {
jProperties.setProperty(entry.getKey(), entry.getValue().getValue().toString());
}
}
if (!jProperties.containsKey("zeppelin.interpreter.output.limit")) {
@ -707,22 +713,133 @@ public class InterpreterSetting {
interpreterRunner != null ? interpreterRunner.getPath() :
conf.getInterpreterRemoteRunnerPath(), conf.getCallbackPortRange(),
interpreterDir, localRepoPath,
getEnvFromInterpreterProperty(getJavaProperties()), connectTimeout,
getEnvFromInterpreterProperty(), connectTimeout,
remoteInterpreterProcessListener, appEventListener, group);
}
return remoteInterpreterProcess;
}
private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
Map<String, String> env = new HashMap<>();
for (Object key : property.keySet()) {
if (RemoteInterpreterUtils.isEnvString((String) key)) {
env.put((String) key, property.getProperty((String) key));
private boolean isSparkConf(String key, String value) {
return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
}
private Map<String, String> getEnvFromInterpreterProperty() {
Map<String, String> env = new HashMap<String, String>();
Properties javaProperties = getJavaProperties();
Properties sparkProperties = new Properties();
String sparkMaster = getSparkMaster();
for (String key : javaProperties.stringPropertyNames()) {
if (RemoteInterpreterUtils.isEnvString(key)) {
env.put(key, javaProperties.getProperty(key));
}
if (isSparkConf(key, javaProperties.getProperty(key))) {
sparkProperties.setProperty(key, toShellFormat(javaProperties.getProperty(key)));
}
}
setupPropertiesForPySpark(sparkProperties);
setupPropertiesForSparkR(sparkProperties, javaProperties.getProperty("SPARK_HOME"));
if (isYarnMode() && getDeployMode().equals("cluster")) {
env.put("SPARK_YARN_CLUSTER", "true");
}
StringBuilder sparkConfBuilder = new StringBuilder();
if (sparkMaster != null) {
sparkConfBuilder.append(" --master " + sparkMaster);
}
if (isYarnMode() && getDeployMode().equals("cluster")) {
sparkConfBuilder.append(" --files " + conf.getConfDir() + "/log4j_yarn_cluster.properties");
}
for (String name : sparkProperties.stringPropertyNames()) {
sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
}
env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
LOGGER.debug("getEnvFromInterpreterProperty: " + env);
return env;
}
private void setupPropertiesForPySpark(Properties sparkProperties) {
if (isYarnMode()) {
sparkProperties.setProperty("spark.yarn.isPython", "true");
}
}
private void mergeSparkProperty(Properties sparkProperties, String propertyName,
String propertyValue) {
if (sparkProperties.containsKey(propertyName)) {
String oldPropertyValue = sparkProperties.getProperty(propertyName);
sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue);
} else {
sparkProperties.setProperty(propertyName, propertyValue);
}
}
private void setupPropertiesForSparkR(Properties sparkProperties,
String sparkHome) {
File sparkRBasePath = null;
if (sparkHome == null) {
if (!getSparkMaster().startsWith("local")) {
throw new RuntimeException("SPARK_HOME is not specified for non-local mode");
}
String zeppelinHome = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
sparkRBasePath = new File(zeppelinHome,
"interpreter" + File.separator + "spark" + File.separator + "R");
} else {
sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
}
File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
if (sparkRPath.exists() && sparkRPath.isFile()) {
mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath());
} else {
LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
}
}
private String getSparkMaster() {
String master = getJavaProperties().getProperty("master");
if (master == null) {
master = getJavaProperties().getProperty("spark.master", "local[*]");
}
return master;
}
private String getDeployMode() {
String master = getSparkMaster();
if (master.equals("yarn-client")) {
return "client";
} else if (master.equals("yarn-cluster")) {
return "cluster";
} else if (master.startsWith("local")) {
return "client";
} else {
String deployMode = getJavaProperties().getProperty("spark.submit.deployMode");
if (deployMode == null) {
throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " +
"is not specified");
}
if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
}
return deployMode;
}
}
private boolean isYarnMode() {
return getSparkMaster().startsWith("yarn");
}
private String toShellFormat(String value) {
if (value.contains("\'") && value.contains("\"")) {
throw new RuntimeException("Spark property value could not contain both \" and '");
} else if (value.contains("\'")) {
return "\"" + value + "\"";
} else {
return "\'" + value + "\'";
}
}
private List<Interpreter> getOrCreateSession(String user, String noteId) {
ManagedInterpreterGroup interpreterGroup = getOrCreateInterpreterGroup(user, noteId);
Preconditions.checkNotNull(interpreterGroup, "No InterpreterGroup existed for user {}, " +

View file

@ -142,7 +142,7 @@ public class InterpreterSettingManager {
this.interpreterDirPath = Paths.get(conf.getInterpreterDir());
LOGGER.debug("InterpreterRootPath: {}", interpreterDirPath);
this.interpreterSettingPath = Paths.get(conf.getInterpreterSettingPath());
LOGGER.debug("InterpreterBindingPath: {}", interpreterSettingPath);
LOGGER.debug("InterpreterSettingPath: {}", interpreterSettingPath);
this.dependencyResolver = new DependencyResolver(
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
this.interpreterRepositories = dependencyResolver.getRepos();
@ -283,7 +283,7 @@ public class InterpreterSettingManager {
private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
String interpreterJson) throws IOException {
URL[] urls = recursiveBuildLibList(new File(interpreterDir));
ClassLoader tempClassLoader = new URLClassLoader(urls, cl);
ClassLoader tempClassLoader = new URLClassLoader(urls, null);
URL url = tempClassLoader.getResource(interpreterJson);
if (url == null) {
@ -392,6 +392,17 @@ public class InterpreterSettingManager {
return settings;
}
public InterpreterSetting getInterpreterSettingByName(String name) {
synchronized (interpreterSettings) {
for (InterpreterSetting setting : interpreterSettings.values()) {
if (setting.getName().equals(name)) {
return setting;
}
}
}
throw new RuntimeException("No such interpreter setting: " + name);
}
public ManagedInterpreterGroup getInterpreterGroupById(String groupId) {
for (InterpreterSetting setting : interpreterSettings.values()) {
ManagedInterpreterGroup interpreterGroup = setting.getInterpreterGroup(groupId);

View file

@ -254,7 +254,11 @@ public class RemoteInterpreterEventPoller extends Thread {
try {
clearUnreadEvents(interpreterProcess.getClient());
} catch (Exception e1) {
logger.error("Can't get RemoteInterpreterEvent", e1);
if (shutdown) {
logger.error("Can not get RemoteInterpreterEvent because it is shutdown.");
} else {
logger.error("Can't get RemoteInterpreterEvent", e1);
}
}
if (appendFuture != null) {
appendFuture.cancel(true);

View file

@ -226,6 +226,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
}
public void stop() {
// shutdown EventPoller first.
this.remoteInterpreterEventPoller.shutdown();
if (callbackServer.isServing()) {
callbackServer.stop();
}

View file

@ -32,7 +32,7 @@ public abstract class RemoteInterpreterProcess {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
private GenericObjectPool<Client> clientPool;
private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
protected final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
private final InterpreterContextRunnerPool interpreterContextRunnerPool;
private int connectTimeout;

View file

@ -25,14 +25,10 @@ import static org.mockito.Mockito.mock;
*/
public abstract class AbstractInterpreterTest {
protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractInterpreterTest.class);
private static final String INTERPRETER_SCRIPT =
System.getProperty("os.name").startsWith("Windows") ?
"../bin/interpreter.cmd" :
"../bin/interpreter.sh";
protected InterpreterSettingManager interpreterSettingManager;
protected InterpreterFactory interpreterFactory;
protected File testRootDir;
protected File zeppelinHome;
protected File interpreterDir;
protected File confDir;
protected File notebookDir;
@ -41,12 +37,11 @@ public abstract class AbstractInterpreterTest {
@Before
public void setUp() throws Exception {
// copy the resources files to a temp folder
testRootDir = new File(System.getProperty("java.io.tmpdir") + "/Zeppelin_Test_" + System.currentTimeMillis());
testRootDir.mkdirs();
LOGGER.info("Create tmp directory: {} as root folder of ZEPPELIN_INTERPRETER_DIR & ZEPPELIN_CONF_DIR", testRootDir.getAbsolutePath());
interpreterDir = new File(testRootDir, "interpreter");
confDir = new File(testRootDir, "conf");
notebookDir = new File(testRootDir, "notebook");
zeppelinHome = new File("..");
LOGGER.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
interpreterDir = new File(zeppelinHome, "interpreter_" + getClass().getSimpleName());
confDir = new File(zeppelinHome, "conf_" + getClass().getSimpleName());
notebookDir = new File(zeppelinHome, "notebook_" + getClass().getSimpleName());
interpreterDir.mkdirs();
confDir.mkdirs();
@ -55,10 +50,10 @@ public abstract class AbstractInterpreterTest {
FileUtils.copyDirectory(new File("src/test/resources/interpreter"), interpreterDir);
FileUtils.copyDirectory(new File("src/test/resources/conf"), confDir);
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), zeppelinHome.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DIR.getVarName(), interpreterDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName(), INTERPRETER_SCRIPT);
conf = new ZeppelinConfiguration();
interpreterSettingManager = new InterpreterSettingManager(conf,
@ -69,6 +64,8 @@ public abstract class AbstractInterpreterTest {
@After
public void tearDown() throws Exception {
interpreterSettingManager.close();
FileUtils.deleteDirectory(testRootDir);
FileUtils.deleteDirectory(interpreterDir);
FileUtils.deleteDirectory(confDir);
FileUtils.deleteDirectory(notebookDir);
}
}

View file

@ -0,0 +1,114 @@
package org.apache.zeppelin.interpreter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
/**
*
* Util class for creating a Mini Hadoop cluster in local machine to test scenarios that needs
* hadoop cluster.
*/
public class MiniHadoopCluster {
private static Logger LOGGER = LoggerFactory.getLogger(MiniHadoopCluster.class);
private Configuration hadoopConf;
private MiniDFSCluster dfsCluster;
private MiniYARNCluster yarnCluster;
private String configPath = new File("target/tests/hadoop_conf").getAbsolutePath();
@BeforeClass
public void start() throws IOException {
LOGGER.info("Starting MiniHadoopCluster ...");
this.hadoopConf = new Configuration();
new File(configPath).mkdirs();
// start MiniDFSCluster
this.dfsCluster = new MiniDFSCluster.Builder(hadoopConf)
.numDataNodes(2)
.format(true)
.waitSafeMode(true)
.build();
this.dfsCluster.waitActive();
saveConfig(hadoopConf, configPath + "/core-site.xml");
// start MiniYarnCluster
YarnConfiguration baseConfig = new YarnConfiguration(hadoopConf);
this.yarnCluster = new MiniYARNCluster(getClass().getName(), 2,
1, 1);
yarnCluster.init(baseConfig);
// Install a shutdown hook for stop the service and kill all running applications.
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
yarnCluster.stop();
}
});
yarnCluster.start();
// Workaround for YARN-2642.
Configuration yarnConfig = yarnCluster.getConfig();
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 30 * 1000) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new IOException(e);
}
if (!yarnConfig.get(YarnConfiguration.RM_ADDRESS).split(":")[1].equals("0")) {
break;
}
}
if (yarnConfig.get(YarnConfiguration.RM_ADDRESS).split(":")[1].equals("0")) {
throw new IOException("RM not up yes");
}
LOGGER.info("RM address in configuration is " + yarnConfig.get(YarnConfiguration.RM_ADDRESS));
saveConfig(yarnConfig,configPath + "/yarn-site.xml");
}
protected void saveConfig(Configuration conf, String dest) throws IOException {
Configuration redacted = new Configuration(conf);
// This setting references a test class that is not available when using a real Spark
// installation, so remove it from client configs.
redacted.unset("net.topology.node.switch.mapping.impl");
FileOutputStream out = new FileOutputStream(dest);
try {
redacted.writeXml(out);
} finally {
out.close();
}
LOGGER.info("Save configuration to " + dest);
}
@AfterClass
public void stop() {
if (this.yarnCluster != null) {
this.yarnCluster.stop();
}
if (this.dfsCluster != null) {
this.dfsCluster.shutdown();
}
}
public String getConfigPath() {
return configPath;
}
public MiniYARNCluster getYarnCluster() {
return yarnCluster;
}
}

View file

@ -0,0 +1,68 @@
package org.apache.zeppelin.interpreter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import static org.mockito.Mockito.mock;
public class MiniZeppelin {
protected static final Logger LOGGER = LoggerFactory.getLogger(MiniZeppelin.class);
protected InterpreterSettingManager interpreterSettingManager;
protected InterpreterFactory interpreterFactory;
protected File zeppelinHome;
private File confDir;
private File notebookDir;
protected ZeppelinConfiguration conf;
public void start() throws IOException {
zeppelinHome = new File("..");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
zeppelinHome.getAbsolutePath());
confDir = new File(zeppelinHome, "conf_" + getClass().getSimpleName());
notebookDir = new File(zeppelinHome, "notebook_" + getClass().getSimpleName());
confDir.mkdirs();
notebookDir.mkdirs();
LOGGER.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
FileUtils.copyFile(new File(zeppelinHome, "conf/log4j.properties"), new File(confDir, "log4j.properties"));
FileUtils.copyFile(new File(zeppelinHome, "conf/log4j_yarn_cluster.properties"), new File(confDir, "log4j_yarn_cluster.properties"));
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
conf = new ZeppelinConfiguration();
interpreterSettingManager = new InterpreterSettingManager(conf,
mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
interpreterFactory = new InterpreterFactory(interpreterSettingManager);
}
public void stop() throws IOException {
interpreterSettingManager.close();
FileUtils.deleteDirectory(confDir);
FileUtils.deleteDirectory(notebookDir);
}
public File getZeppelinHome() {
return zeppelinHome;
}
public File getZeppelinConfDir() {
return confDir;
}
public InterpreterFactory getInterpreterFactory() {
return interpreterFactory;
}
public InterpreterSettingManager getInterpreterSettingManager() {
return interpreterSettingManager;
}
}

View file

@ -0,0 +1,147 @@
package org.apache.zeppelin.interpreter;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class SparkInterpreterModeTest {
private static MiniHadoopCluster hadoopCluster;
private static MiniZeppelin zeppelin;
private static InterpreterFactory interpreterFactory;
private static InterpreterSettingManager interpreterSettingManager;
@BeforeClass
public static void setUp() throws IOException {
hadoopCluster = new MiniHadoopCluster();
hadoopCluster.start();
zeppelin = new MiniZeppelin();
zeppelin.start();
interpreterFactory = zeppelin.getInterpreterFactory();
interpreterSettingManager = zeppelin.getInterpreterSettingManager();
}
@AfterClass
public static void tearDown() throws IOException {
if (zeppelin != null) {
zeppelin.stop();
}
if (hadoopCluster != null) {
hadoopCluster.stop();
}
}
private void testInterpreterBasics() throws IOException {
// test SparkInterpreter
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark");
InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").getContext();
InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
interpreterResult = sparkInterpreter.interpret("sc.range(1,10).sum()", context);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
assertTrue(interpreterResult.msg.get(0).getData().contains("45"));
// test PySparkInterpreter
Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark");
interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
// test IPySparkInterpreter
Interpreter ipySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.ipyspark");
interpreterResult = ipySparkInterpreter.interpret("sqlContext.table('test').show()", context);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
// test SparkSQLInterpreter
Interpreter sqlInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.sql");
interpreterResult = sqlInterpreter.interpret("select count(1) from test", context);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
assertEquals("count(1)\n2\n", interpreterResult.message().get(0).getData());
}
@Test
public void testLocalMode() throws IOException, YarnException {
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
sparkInterpreterSetting.setProperty("master", "local[*]");
sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
testInterpreterBasics();
// no yarn application launched
GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
assertEquals(0, response.getApplicationList().size());
interpreterSettingManager.close();
}
@Test
public void testYarnClientMode() throws IOException, YarnException, InterruptedException {
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
sparkInterpreterSetting.setProperty("master", "yarn-client");
sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
sparkInterpreterSetting.setProperty("PYSPARK_PYTHON", getPythonExec());
sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
testInterpreterBasics();
// 1 yarn application launched
GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
assertEquals(1, response.getApplicationList().size());
interpreterSettingManager.close();
}
@Test
public void testYarnClusterMode() throws IOException, YarnException, InterruptedException {
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
sparkInterpreterSetting.setProperty("master", "yarn-cluster");
sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
sparkInterpreterSetting.setProperty("spark.pyspark.python", getPythonExec());
sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
testInterpreterBasics();
// 1 yarn application launched
GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
assertEquals(1, response.getApplicationList().size());
interpreterSettingManager.close();
}
private String getPythonExec() throws IOException, InterruptedException {
Process process = Runtime.getRuntime().exec(new String[]{"which", "python"});
if (process.waitFor() != 0) {
throw new RuntimeException("Fail to run command: which python.");
}
return IOUtils.toString(process.getInputStream()).trim();
}
}

View file

@ -73,9 +73,7 @@ public class VFSNotebookRepoTest extends AbstractInterpreterTest implements JobL
@After
public void tearDown() throws Exception {
if (!FileUtils.deleteQuietly(testRootDir)) {
LOG.error("Failed to delete {} ", testRootDir.getName());
}
super.tearDown();
}
@Test