mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-2898. Support Yarn-Cluster for Spark Interpreter
This commit is contained in:
parent
3fb67f9cc5
commit
9da7c4b8c2
33 changed files with 1012 additions and 199 deletions
|
|
@ -66,7 +66,8 @@ matrix:
|
|||
# Several tests were excluded from this configuration due to the following issues:
|
||||
# HeliumApplicationFactoryTest - https://issues.apache.org/jira/browse/ZEPPELIN-2470
|
||||
# After issues are fixed these tests need to be included back by removing them from the "-Dtests.to.exclude" property
|
||||
- jdk: "oraclejdk8"
|
||||
- sudo: required
|
||||
jdk: "oraclejdk8"
|
||||
dist: precise
|
||||
env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.2 -Pweb-ci -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/ZeppelinSparkClusterTest.java,**/org.apache.zeppelin.spark.*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
|
||||
|
||||
|
|
@ -143,6 +144,7 @@ before_script:
|
|||
- if [[ -n $LIVY_VER ]]; then ./testing/downloadLivy.sh $LIVY_VER; fi
|
||||
- if [[ -n $LIVY_VER ]]; then export LIVY_HOME=`pwd`/livy-$LIVY_VER-bin; fi
|
||||
- if [[ -n $LIVY_VER ]]; then export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER; fi
|
||||
- export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER
|
||||
- echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
|
||||
- echo "export ZEPPELIN_HELIUM_REGISTRY=helium" >> conf/zeppelin-env.sh
|
||||
- tail conf/zeppelin-env.sh
|
||||
|
|
|
|||
|
|
@ -122,7 +122,11 @@ JAVA_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
|
|||
export JAVA_OPTS
|
||||
|
||||
JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
|
||||
JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
|
||||
if [[ -z "${SPARK_YARN_CLUSTER}" ]]; then
|
||||
JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
|
||||
else
|
||||
JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties"
|
||||
fi
|
||||
export JAVA_INTP_OPTS
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -143,7 +143,12 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
|
|||
export PYTHONPATH="${PYTHONPATH}:${PYSPARKPATH}"
|
||||
fi
|
||||
unset PYSPARKPATH
|
||||
export SPARK_CLASSPATH+=":${ZEPPELIN_INTP_CLASSPATH}"
|
||||
fi
|
||||
|
||||
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
|
||||
ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
|
||||
else
|
||||
# autodetect HADOOP_CONF_HOME by heuristic
|
||||
if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
|
||||
if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
|
||||
|
|
@ -152,13 +157,8 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
|
|||
export HADOOP_CONF_DIR="/etc/hadoop/conf"
|
||||
fi
|
||||
fi
|
||||
|
||||
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
|
||||
ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
|
||||
fi
|
||||
|
||||
export SPARK_CLASSPATH+=":${ZEPPELIN_INTP_CLASSPATH}"
|
||||
fi
|
||||
|
||||
elif [[ "${INTERPRETER_ID}" == "hbase" ]]; then
|
||||
if [[ -n "${HBASE_CONF_DIR}" ]]; then
|
||||
ZEPPELIN_INTP_CLASSPATH+=":${HBASE_CONF_DIR}"
|
||||
|
|
|
|||
23
conf/log4j_yarn_cluster.properties
Normal file
23
conf/log4j_yarn_cluster.properties
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
log4j.rootLogger = INFO, stdout
|
||||
|
||||
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
|
||||
log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
|
||||
|
||||
|
|
@ -424,7 +424,7 @@ It creates separated SparkContext per each notebook in `isolated` mode.
|
|||
## IPython support
|
||||
|
||||
By default, zeppelin would use IPython in `pyspark` when IPython is available, Otherwise it would fall back to the original PySpark implementation.
|
||||
If you don't want to use IPython, then you can set `zeppelin.spark.useIPython` as `false` in interpreter setting. For the IPython features, you can refer doc
|
||||
If you don't want to use IPython, then you can set `zeppelin.pyspark.useIPython` as `false` in interpreter setting. For the IPython features, you can refer doc
|
||||
[Python Interpreter](python.html)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -78,6 +78,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
private long ipythonLaunchTimeout;
|
||||
private String additionalPythonPath;
|
||||
private String additionalPythonInitFile;
|
||||
private boolean useBuiltinPy4j = true;
|
||||
|
||||
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
|
||||
|
||||
|
|
@ -92,6 +93,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
* @param additionalPythonPath
|
||||
*/
|
||||
public void setAdditionalPythonPath(String additionalPythonPath) {
|
||||
LOGGER.info("setAdditionalPythonPath: " + additionalPythonPath);
|
||||
this.additionalPythonPath = additionalPythonPath;
|
||||
}
|
||||
|
||||
|
|
@ -105,6 +107,10 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
this.additionalPythonInitFile = additionalPythonInitFile;
|
||||
}
|
||||
|
||||
public void setAddBulitinPy4j(boolean add) {
|
||||
this.useBuiltinPy4j = add;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
try {
|
||||
|
|
@ -113,6 +119,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
return;
|
||||
}
|
||||
pythonExecutable = getProperty().getProperty("zeppelin.python", "python");
|
||||
LOGGER.info("Python Exec: " + pythonExecutable);
|
||||
ipythonLaunchTimeout = Long.parseLong(
|
||||
getProperty().getProperty("zeppelin.ipython.launch.timeout", "30000"));
|
||||
this.zeppelinContext = new PythonZeppelinContext(
|
||||
|
|
@ -218,29 +225,34 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
|
||||
executor.setWatchdog(watchDog);
|
||||
|
||||
String py4jLibPath = null;
|
||||
if (System.getenv("ZEPPELIN_HOME") != null) {
|
||||
py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator
|
||||
+ PythonInterpreter.ZEPPELIN_PY4JPATH;
|
||||
} else {
|
||||
Path workingPath = Paths.get("..").toAbsolutePath();
|
||||
py4jLibPath = workingPath + File.separator + PythonInterpreter.ZEPPELIN_PY4JPATH;
|
||||
}
|
||||
if (additionalPythonPath != null) {
|
||||
// put the py4j at the end, because additionalPythonPath may already contain py4j.
|
||||
// e.g. PySparkInterpreter
|
||||
additionalPythonPath = additionalPythonPath + ":" + py4jLibPath;
|
||||
} else {
|
||||
additionalPythonPath = py4jLibPath;
|
||||
if (useBuiltinPy4j) {
|
||||
String py4jLibPath = null;
|
||||
if (System.getenv("ZEPPELIN_HOME") != null) {
|
||||
py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator
|
||||
+ PythonInterpreter.ZEPPELIN_PY4JPATH;
|
||||
} else {
|
||||
Path workingPath = Paths.get("..").toAbsolutePath();
|
||||
py4jLibPath = workingPath + File.separator + PythonInterpreter.ZEPPELIN_PY4JPATH;
|
||||
}
|
||||
if (additionalPythonPath != null) {
|
||||
// put the py4j at the end, because additionalPythonPath may already contain py4j.
|
||||
// e.g. PySparkInterpreter
|
||||
additionalPythonPath = additionalPythonPath + ":" + py4jLibPath;
|
||||
} else {
|
||||
additionalPythonPath = py4jLibPath;
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, String> envs = EnvironmentUtils.getProcEnvironment();
|
||||
if (envs.containsKey("PYTHONPATH")) {
|
||||
envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH"));
|
||||
if (additionalPythonPath != null) {
|
||||
envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH"));
|
||||
}
|
||||
} else {
|
||||
envs.put("PYTHONPATH", additionalPythonPath);
|
||||
}
|
||||
|
||||
LOGGER.debug("PYTHONPATH: " + envs.get("PYTHONPATH"));
|
||||
LOGGER.info("PYTHONPATH: " + envs.get("PYTHONPATH"));
|
||||
executor.execute(cmd, envs, this);
|
||||
|
||||
// wait until IPython kernel is started or timeout
|
||||
|
|
|
|||
|
|
@ -44,12 +44,15 @@ public class IPySparkInterpreter extends IPythonInterpreter {
|
|||
|
||||
@Override
|
||||
public void open() {
|
||||
getProperty().setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property));
|
||||
property.setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property));
|
||||
sparkInterpreter = getSparkInterpreter();
|
||||
SparkConf conf = sparkInterpreter.getSparkContext().getConf();
|
||||
String additionalPythonPath = conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
|
||||
":../interpreter/lib/python";
|
||||
setAdditionalPythonPath(additionalPythonPath);
|
||||
// only set PYTHONPATH in local or yarn-client mode.
|
||||
// yarn-cluster will setup PYTHONPATH automatically.
|
||||
if (!conf.get("spark.submit.deployMode").equals("cluster")) {
|
||||
setAdditionalPythonPath(PythonUtils.sparkPythonPath());
|
||||
setAddBulitinPy4j(false);
|
||||
}
|
||||
setAdditionalPythonInitFile("python/zeppelin_ipyspark.py");
|
||||
super.open();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -115,7 +115,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
public void open() {
|
||||
// try IPySparkInterpreter first
|
||||
iPySparkInterpreter = getIPySparkInterpreter();
|
||||
if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true") &&
|
||||
if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true") &&
|
||||
iPySparkInterpreter.checkIPythonPrerequisite()) {
|
||||
try {
|
||||
iPySparkInterpreter.open();
|
||||
|
|
@ -133,7 +133,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
}
|
||||
iPySparkInterpreter = null;
|
||||
|
||||
if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true")) {
|
||||
if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true")) {
|
||||
// don't print it when it is in testing, just for easy output check in test.
|
||||
try {
|
||||
InterpreterContext.get().out.write(("IPython is not available, " +
|
||||
|
|
@ -202,13 +202,18 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
}
|
||||
}
|
||||
|
||||
private Map setupPySparkEnv() throws IOException{
|
||||
private Map setupPySparkEnv() throws IOException {
|
||||
Map env = EnvironmentUtils.getProcEnvironment();
|
||||
|
||||
if (!env.containsKey("PYTHONPATH")) {
|
||||
SparkConf conf = getSparkConf();
|
||||
env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
|
||||
":../interpreter/lib/python");
|
||||
// only set PYTHONPATH in local or yarn-client mode.
|
||||
// yarn-cluster will setup PYTHONPATH automatically.
|
||||
SparkConf conf = getSparkConf();
|
||||
if (!conf.get("spark.submit.deployMode", "client").equals("cluster")) {
|
||||
if (!env.containsKey("PYTHONPATH")) {
|
||||
env.put("PYTHONPATH", PythonUtils.sparkPythonPath());
|
||||
} else {
|
||||
env.put("PYTHONPATH", PythonUtils.sparkPythonPath());
|
||||
}
|
||||
}
|
||||
|
||||
// get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT
|
||||
|
|
@ -223,7 +228,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
}
|
||||
}
|
||||
|
||||
LOGGER.debug("PYTHONPATH: " + env.get("PYTHONPATH"));
|
||||
LOGGER.info("PYTHONPATH: " + env.get("PYTHONPATH"));
|
||||
return env;
|
||||
}
|
||||
|
||||
|
|
@ -251,6 +256,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
gatewayServer.start();
|
||||
|
||||
String pythonExec = getPythonExec(property);
|
||||
LOGGER.info("pythonExec: " + pythonExec);
|
||||
CommandLine cmd = CommandLine.parse(pythonExec);
|
||||
cmd.addArgument(scriptPath, false);
|
||||
cmd.addArgument(Integer.toString(port), false);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.zeppelin.spark;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Util class for PySpark
|
||||
*/
|
||||
public class PythonUtils {
|
||||
|
||||
/**
|
||||
* Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from ZEPPELIN_HOME
|
||||
* when it is embedded mode.
|
||||
*
|
||||
* This method will called in zeppelin server process and spark driver process when it is
|
||||
* local or yarn-client mode.
|
||||
*/
|
||||
public static String sparkPythonPath() {
|
||||
List<String> pythonPath = new ArrayList<String>();
|
||||
String sparkHome = System.getenv("SPARK_HOME");
|
||||
String zeppelinHome = System.getenv("ZEPPELIN_HOME");
|
||||
if (zeppelinHome == null) {
|
||||
zeppelinHome = new File("..").getAbsolutePath();
|
||||
}
|
||||
if (sparkHome != null) {
|
||||
// non-embedded mode when SPARK_HOME is specified.
|
||||
File pyspark = new File(sparkHome, "python/lib/pyspark.zip");
|
||||
if (!pyspark.exists()) {
|
||||
throw new RuntimeException("No pyspark.zip found under " + sparkHome + "/python/lib");
|
||||
}
|
||||
pythonPath.add(pyspark.getAbsolutePath());
|
||||
File[] py4j = new File(sparkHome + "/python/lib").listFiles(new FilenameFilter() {
|
||||
@Override
|
||||
public boolean accept(File dir, String name) {
|
||||
return name.startsWith("py4j");
|
||||
}
|
||||
});
|
||||
if (py4j.length == 0) {
|
||||
throw new RuntimeException("No py4j files found under " + sparkHome + "/python/lib");
|
||||
} else if (py4j.length > 1) {
|
||||
throw new RuntimeException("Multiple py4j files found under " + sparkHome + "/python/lib");
|
||||
} else {
|
||||
pythonPath.add(py4j[0].getAbsolutePath());
|
||||
}
|
||||
} else {
|
||||
// embedded mode
|
||||
File pyspark = new File(zeppelinHome, "interpreter/spark/pyspark/pyspark.zip");
|
||||
if (!pyspark.exists()) {
|
||||
throw new RuntimeException("No pyspark.zip found: " + pyspark.getAbsolutePath());
|
||||
}
|
||||
pythonPath.add(pyspark.getAbsolutePath());
|
||||
File[] py4j = new File(zeppelinHome, "interpreter/spark/pyspark").listFiles(
|
||||
new FilenameFilter() {
|
||||
@Override
|
||||
public boolean accept(File dir, String name) {
|
||||
return name.startsWith("py4j");
|
||||
}
|
||||
});
|
||||
if (py4j.length == 0) {
|
||||
throw new RuntimeException("No py4j files found under " + zeppelinHome +
|
||||
"/interpreter/spark/pyspark");
|
||||
} else if (py4j.length > 1) {
|
||||
throw new RuntimeException("Multiple py4j files found under " + sparkHome +
|
||||
"/interpreter/spark/pyspark");
|
||||
} else {
|
||||
pythonPath.add(py4j[0].getAbsolutePath());
|
||||
}
|
||||
}
|
||||
|
||||
// add ${ZEPPELIN_HOME}/interpreter/lib/python for all the cases
|
||||
pythonPath.add(zeppelinHome + "/interpreter/lib/python");
|
||||
return StringUtils.join(pythonPath, ":");
|
||||
}
|
||||
}
|
||||
|
|
@ -351,7 +351,11 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
public boolean isYarnMode() {
|
||||
return getProperty("master").startsWith("yarn");
|
||||
String master = getProperty("master");
|
||||
if (master == null) {
|
||||
master = getProperty().getProperty("spark.master", "local[*]");
|
||||
}
|
||||
return master.startsWith("yarn");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -371,11 +375,6 @@ public class SparkInterpreter extends Interpreter {
|
|||
conf.set("spark.executor.uri", execUri);
|
||||
}
|
||||
conf.set("spark.scheduler.mode", "FAIR");
|
||||
conf.setMaster(getProperty("master"));
|
||||
if (isYarnMode()) {
|
||||
conf.set("master", "yarn");
|
||||
conf.set("spark.submit.deployMode", "client");
|
||||
}
|
||||
|
||||
Properties intpProperty = getProperty();
|
||||
for (Object k : intpProperty.keySet()) {
|
||||
|
|
@ -394,8 +393,6 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
setupConfForPySpark(conf);
|
||||
setupConfForSparkR(conf);
|
||||
Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
|
||||
Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
|
||||
Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
|
||||
|
|
@ -529,96 +526,10 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
}
|
||||
setupConfForPySpark(conf);
|
||||
setupConfForSparkR(conf);
|
||||
SparkContext sparkContext = new SparkContext(conf);
|
||||
return sparkContext;
|
||||
}
|
||||
|
||||
private void setupConfForPySpark(SparkConf conf) {
|
||||
Object pysparkBaseProperty =
|
||||
new DefaultInterpreterProperty("SPARK_HOME", null, null).getValue();
|
||||
String pysparkBasePath = pysparkBaseProperty != null ? pysparkBaseProperty.toString() : null;
|
||||
File pysparkPath;
|
||||
if (null == pysparkBasePath) {
|
||||
pysparkBasePath =
|
||||
new DefaultInterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../")
|
||||
.getValue().toString();
|
||||
pysparkPath = new File(pysparkBasePath,
|
||||
"interpreter" + File.separator + "spark" + File.separator + "pyspark");
|
||||
} else {
|
||||
pysparkPath = new File(pysparkBasePath,
|
||||
"python" + File.separator + "lib");
|
||||
}
|
||||
|
||||
//Only one of py4j-0.9-src.zip and py4j-0.8.2.1-src.zip should exist
|
||||
//TODO(zjffdu), this is not maintainable when new version is added.
|
||||
String[] pythonLibs = new String[]{"pyspark.zip", "py4j-0.9-src.zip", "py4j-0.8.2.1-src.zip",
|
||||
"py4j-0.10.1-src.zip", "py4j-0.10.3-src.zip", "py4j-0.10.4-src.zip"};
|
||||
ArrayList<String> pythonLibUris = new ArrayList<>();
|
||||
for (String lib : pythonLibs) {
|
||||
File libFile = new File(pysparkPath, lib);
|
||||
if (libFile.exists()) {
|
||||
pythonLibUris.add(libFile.toURI().toString());
|
||||
}
|
||||
}
|
||||
pythonLibUris.trimToSize();
|
||||
|
||||
// Distribute two libraries(pyspark.zip and py4j-*.zip) to workers
|
||||
// when spark version is less than or equal to 1.4.1
|
||||
if (pythonLibUris.size() == 2) {
|
||||
try {
|
||||
String confValue = conf.get("spark.yarn.dist.files");
|
||||
conf.set("spark.yarn.dist.files", confValue + "," + Joiner.on(",").join(pythonLibUris));
|
||||
} catch (NoSuchElementException e) {
|
||||
conf.set("spark.yarn.dist.files", Joiner.on(",").join(pythonLibUris));
|
||||
}
|
||||
if (!useSparkSubmit()) {
|
||||
conf.set("spark.files", conf.get("spark.yarn.dist.files"));
|
||||
}
|
||||
conf.set("spark.submit.pyArchives", Joiner.on(":").join(pythonLibs));
|
||||
conf.set("spark.submit.pyFiles", Joiner.on(",").join(pythonLibUris));
|
||||
}
|
||||
|
||||
// Distributes needed libraries to workers
|
||||
// when spark version is greater than or equal to 1.5.0
|
||||
if (isYarnMode()) {
|
||||
conf.set("spark.yarn.isPython", "true");
|
||||
}
|
||||
}
|
||||
|
||||
private void setupConfForSparkR(SparkConf conf) {
|
||||
Object sparkRBaseProperty =
|
||||
new DefaultInterpreterProperty("SPARK_HOME", null, null).getValue();
|
||||
String sparkRBasePath = sparkRBaseProperty != null ? sparkRBaseProperty.toString() : null;
|
||||
File sparkRPath;
|
||||
if (null == sparkRBasePath) {
|
||||
sparkRBasePath =
|
||||
new DefaultInterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../")
|
||||
.getValue().toString();
|
||||
sparkRPath = new File(sparkRBasePath,
|
||||
"interpreter" + File.separator + "spark" + File.separator + "R");
|
||||
} else {
|
||||
sparkRPath = new File(sparkRBasePath, "R" + File.separator + "lib");
|
||||
}
|
||||
|
||||
sparkRPath = new File(sparkRPath, "sparkr.zip");
|
||||
if (sparkRPath.exists() && sparkRPath.isFile()) {
|
||||
String archives = null;
|
||||
if (conf.contains("spark.yarn.dist.archives")) {
|
||||
archives = conf.get("spark.yarn.dist.archives");
|
||||
}
|
||||
if (archives != null) {
|
||||
archives = archives + "," + sparkRPath + "#sparkr";
|
||||
} else {
|
||||
archives = sparkRPath + "#sparkr";
|
||||
}
|
||||
conf.set("spark.yarn.dist.archives", archives);
|
||||
} else {
|
||||
logger.warn("sparkr.zip is not found, sparkr may not work.");
|
||||
}
|
||||
}
|
||||
|
||||
static final String toString(Object o) {
|
||||
return (o instanceof String) ? (String) o : "";
|
||||
}
|
||||
|
|
|
|||
|
|
@ -150,9 +150,9 @@
|
|||
"description": "Python command to run pyspark with",
|
||||
"type": "string"
|
||||
},
|
||||
"zeppelin.spark.useIPython": {
|
||||
"zeppelin.pyspark.useIPython": {
|
||||
"envName": null,
|
||||
"propertyName": "zeppelin.spark.useIPython",
|
||||
"propertyName": "zeppelin.pyspark.useIPython",
|
||||
"defaultValue": true,
|
||||
"description": "whether use IPython when it is available",
|
||||
"type": "checkbox"
|
||||
|
|
|
|||
|
|
@ -59,6 +59,7 @@ public class IPySparkInterpreterTest {
|
|||
Properties p = new Properties();
|
||||
p.setProperty("spark.master", "local[4]");
|
||||
p.setProperty("master", "local[4]");
|
||||
p.setProperty("spark.submit.deployMode", "client");
|
||||
p.setProperty("spark.app.name", "Zeppelin Test");
|
||||
p.setProperty("zeppelin.spark.useHiveContext", "true");
|
||||
p.setProperty("zeppelin.spark.maxResult", "1000");
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ public class PySparkInterpreterMatplotlibTest {
|
|||
p.setProperty("zeppelin.spark.importImplicit", "true");
|
||||
p.setProperty("zeppelin.pyspark.python", "python");
|
||||
p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
|
||||
p.setProperty("zeppelin.spark.useIPython", "false");
|
||||
p.setProperty("zeppelin.pyspark.useIPython", "false");
|
||||
return p;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ public class PySparkInterpreterTest {
|
|||
p.setProperty("zeppelin.spark.importImplicit", "true");
|
||||
p.setProperty("zeppelin.pyspark.python", "python");
|
||||
p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
|
||||
p.setProperty("zeppelin.spark.useIPython", "false");
|
||||
p.setProperty("zeppelin.pyspark.useIPython", "false");
|
||||
return p;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
|
@ -34,7 +36,7 @@ import org.apache.zeppelin.resource.ResourcePool;
|
|||
public class InterpreterContext {
|
||||
private static final ThreadLocal<InterpreterContext> threadIC = new ThreadLocal<>();
|
||||
|
||||
public final InterpreterOutput out;
|
||||
public InterpreterOutput out;
|
||||
|
||||
public static InterpreterContext get() {
|
||||
return threadIC.get();
|
||||
|
|
@ -48,21 +50,46 @@ public class InterpreterContext {
|
|||
threadIC.remove();
|
||||
}
|
||||
|
||||
private final String noteId;
|
||||
private final String replName;
|
||||
private final String paragraphTitle;
|
||||
private final String paragraphId;
|
||||
private final String paragraphText;
|
||||
private String noteId;
|
||||
private String replName;
|
||||
private String paragraphTitle;
|
||||
private String paragraphId;
|
||||
private String paragraphText;
|
||||
private AuthenticationInfo authenticationInfo;
|
||||
private final Map<String, Object> config;
|
||||
private GUI gui;
|
||||
private Map<String, Object> config = new HashMap<>();
|
||||
private GUI gui = new GUI();
|
||||
private AngularObjectRegistry angularObjectRegistry;
|
||||
private ResourcePool resourcePool;
|
||||
private List<InterpreterContextRunner> runners;
|
||||
private List<InterpreterContextRunner> runners = new ArrayList<>();
|
||||
private String className;
|
||||
private RemoteEventClientWrapper client;
|
||||
private RemoteWorksController remoteWorksController;
|
||||
private final Map<String, Integer> progressMap;
|
||||
private Map<String, Integer> progressMap;
|
||||
|
||||
/**
|
||||
* Builder class for InterpreterContext
|
||||
*/
|
||||
public static class Builder {
|
||||
private InterpreterContext context = new InterpreterContext();
|
||||
|
||||
public Builder setNoteId(String noteId) {
|
||||
context.noteId = noteId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setParagraphId(String paragraphId) {
|
||||
context.paragraphId = paragraphId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public InterpreterContext getContext() {
|
||||
return context;
|
||||
}
|
||||
}
|
||||
|
||||
private InterpreterContext() {
|
||||
|
||||
}
|
||||
|
||||
// visible for testing
|
||||
public InterpreterContext(String noteId,
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ public class RemoteInterpreterServerTest {
|
|||
@Test
|
||||
public void testStartStop() throws InterruptedException, IOException, TException {
|
||||
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
|
||||
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
|
||||
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true);
|
||||
assertEquals(false, server.isRunning());
|
||||
|
||||
server.start();
|
||||
|
|
|
|||
|
|
@ -93,6 +93,18 @@
|
|||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-json</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-server</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ public class AuthenticationIT extends AbstractZeppelinIT {
|
|||
}
|
||||
|
||||
try {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
|
||||
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir()));
|
||||
File file = new File(shiroPath);
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ public class InterpreterModeActionsIT extends AbstractZeppelinIT {
|
|||
return;
|
||||
}
|
||||
try {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
|
||||
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir()));
|
||||
interpreterOptionPath = conf.getRelativeDir(String.format("%s/interpreter.json", conf.getConfDir()));
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ public class PersonalizeActionsIT extends AbstractZeppelinIT {
|
|||
return;
|
||||
}
|
||||
try {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
|
||||
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir()));
|
||||
File file = new File(shiroPath);
|
||||
|
|
|
|||
|
|
@ -126,8 +126,8 @@ public abstract class AbstractTestRestApi {
|
|||
|
||||
private static void start(boolean withAuth) throws Exception {
|
||||
if (!wasRunning) {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), "../zeppelin-web/dist");
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), new File("../zeppelin-web/dist").getAbsolutePath());
|
||||
|
||||
// some test profile does not build zeppelin-web.
|
||||
// to prevent zeppelin starting up fail, create zeppelin-web/dist directory
|
||||
|
|
@ -211,7 +211,7 @@ public abstract class AbstractTestRestApi {
|
|||
// set spark home for pyspark
|
||||
sparkProperties.put("spark.home",
|
||||
new InterpreterProperty("spark.home", getSparkHome(), InterpreterPropertyType.TEXTAREA.getValue()));
|
||||
sparkProperties.put("zeppelin.spark.useIPython", new InterpreterProperty("zeppelin.spark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
|
||||
sparkProperties.put("zeppelin.pyspark.useIPython", new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
|
||||
|
||||
sparkIntpSetting.setProperties(sparkProperties);
|
||||
pySpark = true;
|
||||
|
|
@ -234,7 +234,7 @@ public abstract class AbstractTestRestApi {
|
|||
new InterpreterProperty("spark.home", sparkHome, InterpreterPropertyType.TEXTAREA.getValue()));
|
||||
sparkProperties.put("zeppelin.spark.useHiveContext",
|
||||
new InterpreterProperty("zeppelin.spark.useHiveContext", false, InterpreterPropertyType.CHECKBOX.getValue()));
|
||||
sparkProperties.put("zeppelin.spark.useIPython", new InterpreterProperty("zeppelin.spark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
|
||||
sparkProperties.put("zeppelin.pyspark.useIPython", new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
|
||||
|
||||
pySpark = true;
|
||||
sparkR = true;
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@
|
|||
|
||||
<properties>
|
||||
<!--library versions-->
|
||||
<hadoop.version>2.6.0</hadoop.version>
|
||||
<hadoop.version>2.7.3</hadoop.version>
|
||||
<commons.lang3.version>3.4</commons.lang3.version>
|
||||
<commons.vfs2.version>2.0</commons.vfs2.version>
|
||||
<aws.sdk.s3.version>1.10.62</aws.sdk.s3.version>
|
||||
|
|
@ -214,12 +214,6 @@
|
|||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency> <!-- because there are two of them above -->
|
||||
<groupId>xml-apis</groupId>
|
||||
<artifactId>xml-apis</artifactId>
|
||||
<version>${xml.apis.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jgit</groupId>
|
||||
<artifactId>org.eclipse.jgit</artifactId>
|
||||
|
|
@ -298,22 +292,22 @@
|
|||
<version>${commons.lang3.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>mongo-java-driver</artifactId>
|
||||
<version>3.4.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-compress</artifactId>
|
||||
<version>1.5</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>mongo-java-driver</artifactId>
|
||||
<version>3.4.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
<version>${hadoop.version}</version>
|
||||
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
|
|
@ -323,11 +317,14 @@
|
|||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-json</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-client</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-server</artifactId>
|
||||
</exclusion>
|
||||
|
||||
<exclusion>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
|
|
@ -348,10 +345,6 @@
|
|||
<groupId>commons-httpclient</groupId>
|
||||
<artifactId>commons-httpclient</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.eclipse.jgit</groupId>
|
||||
<artifactId>org.eclipse.jgit</artifactId>
|
||||
|
|
@ -378,6 +371,265 @@
|
|||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<version>${hadoop.version}</version>
|
||||
<classifier>tests</classifier>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-json</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-client</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-server</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.jackrabbit</groupId>
|
||||
<artifactId>jackrabbit-webdav</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>commons-httpclient</groupId>
|
||||
<artifactId>commons-httpclient</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.eclipse.jgit</groupId>
|
||||
<artifactId>org.eclipse.jgit</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.jcraft</groupId>
|
||||
<artifactId>jsch</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-compress</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>xml-apis</groupId>
|
||||
<artifactId>xml-apis</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>xerces</groupId>
|
||||
<artifactId>xercesImpl</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-mapper-asl</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-core-asl</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
<version>${hadoop.version}</version>
|
||||
<classifier>tests</classifier>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<!--<exclusion>-->
|
||||
<!--<groupId>com.sun.jersey</groupId>-->
|
||||
<!--<artifactId>jersey-core</artifactId>-->
|
||||
<!--</exclusion>-->
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-json</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-client</artifactId>
|
||||
</exclusion>
|
||||
<!--<exclusion>-->
|
||||
<!--<groupId>com.sun.jersey</groupId>-->
|
||||
<!--<artifactId>jersey-server</artifactId>-->
|
||||
<!--</exclusion>-->
|
||||
<exclusion>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.jackrabbit</groupId>
|
||||
<artifactId>jackrabbit-webdav</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>commons-httpclient</groupId>
|
||||
<artifactId>commons-httpclient</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.eclipse.jgit</groupId>
|
||||
<artifactId>org.eclipse.jgit</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.jcraft</groupId>
|
||||
<artifactId>jsch</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-compress</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>xml-apis</groupId>
|
||||
<artifactId>xml-apis</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>xerces</groupId>
|
||||
<artifactId>xercesImpl</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-tests</artifactId>
|
||||
<version>${hadoop.version}</version>
|
||||
<classifier>tests</classifier>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-core</artifactId>
|
||||
</exclusion>
|
||||
<!--<exclusion>-->
|
||||
<!--<groupId>com.sun.jersey</groupId>-->
|
||||
<!--<artifactId>jersey-json</artifactId>-->
|
||||
<!--</exclusion>-->
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-client</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-server</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.jackrabbit</groupId>
|
||||
<artifactId>jackrabbit-webdav</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>commons-httpclient</groupId>
|
||||
<artifactId>commons-httpclient</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.eclipse.jgit</groupId>
|
||||
<artifactId>org.eclipse.jgit</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.jcraft</groupId>
|
||||
<artifactId>jsch</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-compress</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>xml-apis</groupId>
|
||||
<artifactId>xml-apis</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>xerces</groupId>
|
||||
<artifactId>xercesImpl</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-core-asl</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-jaxrs</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-xc</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-mapper-asl</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-spark_2.10</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.google.protobuf</groupId>
|
||||
<artifactId>protobuf-java</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.google.errorprone</groupId>
|
||||
<artifactId>error_prone_annotations</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>io.grpc</groupId>
|
||||
<artifactId>grpc-context</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
@ -392,6 +644,12 @@
|
|||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<forkMode>always</forkMode>
|
||||
<systemProperties>
|
||||
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
|
||||
</systemProperties>
|
||||
<environmentVariables>
|
||||
<!--<ZEPPELIN_HOME>..</ZEPPELIN_HOME>-->
|
||||
</environmentVariables>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
|
|
|
|||
|
|
@ -493,7 +493,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
}
|
||||
|
||||
public String getConfDir() {
|
||||
return getString(ConfVars.ZEPPELIN_CONF_DIR);
|
||||
return getRelativeDir(ConfVars.ZEPPELIN_CONF_DIR);
|
||||
}
|
||||
|
||||
public List<String> getAllowedOrigins()
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.gson.JsonArray;
|
||||
|
|
@ -26,6 +27,7 @@ import com.google.gson.JsonObject;
|
|||
import com.google.gson.annotations.SerializedName;
|
||||
import com.google.gson.internal.StringMap;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.dep.Dependency;
|
||||
import org.apache.zeppelin.dep.DependencyResolver;
|
||||
|
|
@ -44,6 +46,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FilenameFilter;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URL;
|
||||
|
|
@ -55,6 +58,7 @@ import java.util.HashSet;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
|
@ -452,7 +456,9 @@ public class InterpreterSetting {
|
|||
Properties jProperties = new Properties();
|
||||
Map<String, InterpreterProperty> iProperties = (Map<String, InterpreterProperty>) properties;
|
||||
for (Map.Entry<String, InterpreterProperty> entry : iProperties.entrySet()) {
|
||||
jProperties.setProperty(entry.getKey(), entry.getValue().getValue().toString());
|
||||
if (entry.getValue().getValue() != null) {
|
||||
jProperties.setProperty(entry.getKey(), entry.getValue().getValue().toString());
|
||||
}
|
||||
}
|
||||
|
||||
if (!jProperties.containsKey("zeppelin.interpreter.output.limit")) {
|
||||
|
|
@ -707,22 +713,133 @@ public class InterpreterSetting {
|
|||
interpreterRunner != null ? interpreterRunner.getPath() :
|
||||
conf.getInterpreterRemoteRunnerPath(), conf.getCallbackPortRange(),
|
||||
interpreterDir, localRepoPath,
|
||||
getEnvFromInterpreterProperty(getJavaProperties()), connectTimeout,
|
||||
getEnvFromInterpreterProperty(), connectTimeout,
|
||||
remoteInterpreterProcessListener, appEventListener, group);
|
||||
}
|
||||
return remoteInterpreterProcess;
|
||||
}
|
||||
|
||||
private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
|
||||
Map<String, String> env = new HashMap<>();
|
||||
for (Object key : property.keySet()) {
|
||||
if (RemoteInterpreterUtils.isEnvString((String) key)) {
|
||||
env.put((String) key, property.getProperty((String) key));
|
||||
private boolean isSparkConf(String key, String value) {
|
||||
return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
|
||||
}
|
||||
|
||||
private Map<String, String> getEnvFromInterpreterProperty() {
|
||||
Map<String, String> env = new HashMap<String, String>();
|
||||
Properties javaProperties = getJavaProperties();
|
||||
Properties sparkProperties = new Properties();
|
||||
String sparkMaster = getSparkMaster();
|
||||
for (String key : javaProperties.stringPropertyNames()) {
|
||||
if (RemoteInterpreterUtils.isEnvString(key)) {
|
||||
env.put(key, javaProperties.getProperty(key));
|
||||
}
|
||||
if (isSparkConf(key, javaProperties.getProperty(key))) {
|
||||
sparkProperties.setProperty(key, toShellFormat(javaProperties.getProperty(key)));
|
||||
}
|
||||
}
|
||||
|
||||
setupPropertiesForPySpark(sparkProperties);
|
||||
setupPropertiesForSparkR(sparkProperties, javaProperties.getProperty("SPARK_HOME"));
|
||||
if (isYarnMode() && getDeployMode().equals("cluster")) {
|
||||
env.put("SPARK_YARN_CLUSTER", "true");
|
||||
}
|
||||
|
||||
StringBuilder sparkConfBuilder = new StringBuilder();
|
||||
if (sparkMaster != null) {
|
||||
sparkConfBuilder.append(" --master " + sparkMaster);
|
||||
}
|
||||
if (isYarnMode() && getDeployMode().equals("cluster")) {
|
||||
sparkConfBuilder.append(" --files " + conf.getConfDir() + "/log4j_yarn_cluster.properties");
|
||||
}
|
||||
for (String name : sparkProperties.stringPropertyNames()) {
|
||||
sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
|
||||
}
|
||||
|
||||
env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
|
||||
LOGGER.debug("getEnvFromInterpreterProperty: " + env);
|
||||
return env;
|
||||
}
|
||||
|
||||
private void setupPropertiesForPySpark(Properties sparkProperties) {
|
||||
if (isYarnMode()) {
|
||||
sparkProperties.setProperty("spark.yarn.isPython", "true");
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeSparkProperty(Properties sparkProperties, String propertyName,
|
||||
String propertyValue) {
|
||||
if (sparkProperties.containsKey(propertyName)) {
|
||||
String oldPropertyValue = sparkProperties.getProperty(propertyName);
|
||||
sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue);
|
||||
} else {
|
||||
sparkProperties.setProperty(propertyName, propertyValue);
|
||||
}
|
||||
}
|
||||
|
||||
private void setupPropertiesForSparkR(Properties sparkProperties,
|
||||
String sparkHome) {
|
||||
File sparkRBasePath = null;
|
||||
if (sparkHome == null) {
|
||||
if (!getSparkMaster().startsWith("local")) {
|
||||
throw new RuntimeException("SPARK_HOME is not specified for non-local mode");
|
||||
}
|
||||
String zeppelinHome = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
|
||||
sparkRBasePath = new File(zeppelinHome,
|
||||
"interpreter" + File.separator + "spark" + File.separator + "R");
|
||||
} else {
|
||||
sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
|
||||
}
|
||||
|
||||
File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
|
||||
if (sparkRPath.exists() && sparkRPath.isFile()) {
|
||||
mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath());
|
||||
} else {
|
||||
LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
|
||||
}
|
||||
}
|
||||
|
||||
private String getSparkMaster() {
|
||||
String master = getJavaProperties().getProperty("master");
|
||||
if (master == null) {
|
||||
master = getJavaProperties().getProperty("spark.master", "local[*]");
|
||||
}
|
||||
return master;
|
||||
}
|
||||
|
||||
private String getDeployMode() {
|
||||
String master = getSparkMaster();
|
||||
if (master.equals("yarn-client")) {
|
||||
return "client";
|
||||
} else if (master.equals("yarn-cluster")) {
|
||||
return "cluster";
|
||||
} else if (master.startsWith("local")) {
|
||||
return "client";
|
||||
} else {
|
||||
String deployMode = getJavaProperties().getProperty("spark.submit.deployMode");
|
||||
if (deployMode == null) {
|
||||
throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " +
|
||||
"is not specified");
|
||||
}
|
||||
if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
|
||||
throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
|
||||
}
|
||||
return deployMode;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isYarnMode() {
|
||||
return getSparkMaster().startsWith("yarn");
|
||||
}
|
||||
|
||||
private String toShellFormat(String value) {
|
||||
if (value.contains("\'") && value.contains("\"")) {
|
||||
throw new RuntimeException("Spark property value could not contain both \" and '");
|
||||
} else if (value.contains("\'")) {
|
||||
return "\"" + value + "\"";
|
||||
} else {
|
||||
return "\'" + value + "\'";
|
||||
}
|
||||
}
|
||||
|
||||
private List<Interpreter> getOrCreateSession(String user, String noteId) {
|
||||
ManagedInterpreterGroup interpreterGroup = getOrCreateInterpreterGroup(user, noteId);
|
||||
Preconditions.checkNotNull(interpreterGroup, "No InterpreterGroup existed for user {}, " +
|
||||
|
|
|
|||
|
|
@ -142,7 +142,7 @@ public class InterpreterSettingManager {
|
|||
this.interpreterDirPath = Paths.get(conf.getInterpreterDir());
|
||||
LOGGER.debug("InterpreterRootPath: {}", interpreterDirPath);
|
||||
this.interpreterSettingPath = Paths.get(conf.getInterpreterSettingPath());
|
||||
LOGGER.debug("InterpreterBindingPath: {}", interpreterSettingPath);
|
||||
LOGGER.debug("InterpreterSettingPath: {}", interpreterSettingPath);
|
||||
this.dependencyResolver = new DependencyResolver(
|
||||
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
|
||||
this.interpreterRepositories = dependencyResolver.getRepos();
|
||||
|
|
@ -283,7 +283,7 @@ public class InterpreterSettingManager {
|
|||
private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
|
||||
String interpreterJson) throws IOException {
|
||||
URL[] urls = recursiveBuildLibList(new File(interpreterDir));
|
||||
ClassLoader tempClassLoader = new URLClassLoader(urls, cl);
|
||||
ClassLoader tempClassLoader = new URLClassLoader(urls, null);
|
||||
|
||||
URL url = tempClassLoader.getResource(interpreterJson);
|
||||
if (url == null) {
|
||||
|
|
@ -392,6 +392,17 @@ public class InterpreterSettingManager {
|
|||
return settings;
|
||||
}
|
||||
|
||||
public InterpreterSetting getInterpreterSettingByName(String name) {
|
||||
synchronized (interpreterSettings) {
|
||||
for (InterpreterSetting setting : interpreterSettings.values()) {
|
||||
if (setting.getName().equals(name)) {
|
||||
return setting;
|
||||
}
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("No such interpreter setting: " + name);
|
||||
}
|
||||
|
||||
public ManagedInterpreterGroup getInterpreterGroupById(String groupId) {
|
||||
for (InterpreterSetting setting : interpreterSettings.values()) {
|
||||
ManagedInterpreterGroup interpreterGroup = setting.getInterpreterGroup(groupId);
|
||||
|
|
|
|||
|
|
@ -254,7 +254,11 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
try {
|
||||
clearUnreadEvents(interpreterProcess.getClient());
|
||||
} catch (Exception e1) {
|
||||
logger.error("Can't get RemoteInterpreterEvent", e1);
|
||||
if (shutdown) {
|
||||
logger.error("Can not get RemoteInterpreterEvent because it is shutdown.");
|
||||
} else {
|
||||
logger.error("Can't get RemoteInterpreterEvent", e1);
|
||||
}
|
||||
}
|
||||
if (appendFuture != null) {
|
||||
appendFuture.cancel(true);
|
||||
|
|
|
|||
|
|
@ -226,6 +226,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
}
|
||||
|
||||
public void stop() {
|
||||
// shutdown EventPoller first.
|
||||
this.remoteInterpreterEventPoller.shutdown();
|
||||
if (callbackServer.isServing()) {
|
||||
callbackServer.stop();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ public abstract class RemoteInterpreterProcess {
|
|||
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
|
||||
|
||||
private GenericObjectPool<Client> clientPool;
|
||||
private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
|
||||
protected final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
|
||||
private final InterpreterContextRunnerPool interpreterContextRunnerPool;
|
||||
private int connectTimeout;
|
||||
|
||||
|
|
|
|||
|
|
@ -25,14 +25,10 @@ import static org.mockito.Mockito.mock;
|
|||
*/
|
||||
public abstract class AbstractInterpreterTest {
|
||||
protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractInterpreterTest.class);
|
||||
private static final String INTERPRETER_SCRIPT =
|
||||
System.getProperty("os.name").startsWith("Windows") ?
|
||||
"../bin/interpreter.cmd" :
|
||||
"../bin/interpreter.sh";
|
||||
|
||||
protected InterpreterSettingManager interpreterSettingManager;
|
||||
protected InterpreterFactory interpreterFactory;
|
||||
protected File testRootDir;
|
||||
protected File zeppelinHome;
|
||||
protected File interpreterDir;
|
||||
protected File confDir;
|
||||
protected File notebookDir;
|
||||
|
|
@ -41,12 +37,11 @@ public abstract class AbstractInterpreterTest {
|
|||
@Before
|
||||
public void setUp() throws Exception {
|
||||
// copy the resources files to a temp folder
|
||||
testRootDir = new File(System.getProperty("java.io.tmpdir") + "/Zeppelin_Test_" + System.currentTimeMillis());
|
||||
testRootDir.mkdirs();
|
||||
LOGGER.info("Create tmp directory: {} as root folder of ZEPPELIN_INTERPRETER_DIR & ZEPPELIN_CONF_DIR", testRootDir.getAbsolutePath());
|
||||
interpreterDir = new File(testRootDir, "interpreter");
|
||||
confDir = new File(testRootDir, "conf");
|
||||
notebookDir = new File(testRootDir, "notebook");
|
||||
zeppelinHome = new File("..");
|
||||
LOGGER.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
|
||||
interpreterDir = new File(zeppelinHome, "interpreter_" + getClass().getSimpleName());
|
||||
confDir = new File(zeppelinHome, "conf_" + getClass().getSimpleName());
|
||||
notebookDir = new File(zeppelinHome, "notebook_" + getClass().getSimpleName());
|
||||
|
||||
interpreterDir.mkdirs();
|
||||
confDir.mkdirs();
|
||||
|
|
@ -55,10 +50,10 @@ public abstract class AbstractInterpreterTest {
|
|||
FileUtils.copyDirectory(new File("src/test/resources/interpreter"), interpreterDir);
|
||||
FileUtils.copyDirectory(new File("src/test/resources/conf"), confDir);
|
||||
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), zeppelinHome.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DIR.getVarName(), interpreterDir.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName(), INTERPRETER_SCRIPT);
|
||||
|
||||
conf = new ZeppelinConfiguration();
|
||||
interpreterSettingManager = new InterpreterSettingManager(conf,
|
||||
|
|
@ -69,6 +64,8 @@ public abstract class AbstractInterpreterTest {
|
|||
@After
|
||||
public void tearDown() throws Exception {
|
||||
interpreterSettingManager.close();
|
||||
FileUtils.deleteDirectory(testRootDir);
|
||||
FileUtils.deleteDirectory(interpreterDir);
|
||||
FileUtils.deleteDirectory(confDir);
|
||||
FileUtils.deleteDirectory(notebookDir);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,114 @@
|
|||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* Util class for creating a Mini Hadoop cluster in local machine to test scenarios that needs
|
||||
* hadoop cluster.
|
||||
*/
|
||||
public class MiniHadoopCluster {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(MiniHadoopCluster.class);
|
||||
|
||||
private Configuration hadoopConf;
|
||||
private MiniDFSCluster dfsCluster;
|
||||
private MiniYARNCluster yarnCluster;
|
||||
private String configPath = new File("target/tests/hadoop_conf").getAbsolutePath();
|
||||
|
||||
@BeforeClass
|
||||
public void start() throws IOException {
|
||||
LOGGER.info("Starting MiniHadoopCluster ...");
|
||||
this.hadoopConf = new Configuration();
|
||||
new File(configPath).mkdirs();
|
||||
// start MiniDFSCluster
|
||||
this.dfsCluster = new MiniDFSCluster.Builder(hadoopConf)
|
||||
.numDataNodes(2)
|
||||
.format(true)
|
||||
.waitSafeMode(true)
|
||||
.build();
|
||||
this.dfsCluster.waitActive();
|
||||
saveConfig(hadoopConf, configPath + "/core-site.xml");
|
||||
|
||||
// start MiniYarnCluster
|
||||
YarnConfiguration baseConfig = new YarnConfiguration(hadoopConf);
|
||||
this.yarnCluster = new MiniYARNCluster(getClass().getName(), 2,
|
||||
1, 1);
|
||||
yarnCluster.init(baseConfig);
|
||||
|
||||
// Install a shutdown hook for stop the service and kill all running applications.
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
yarnCluster.stop();
|
||||
}
|
||||
});
|
||||
|
||||
yarnCluster.start();
|
||||
|
||||
// Workaround for YARN-2642.
|
||||
Configuration yarnConfig = yarnCluster.getConfig();
|
||||
long start = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - start < 30 * 1000) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
if (!yarnConfig.get(YarnConfiguration.RM_ADDRESS).split(":")[1].equals("0")) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (yarnConfig.get(YarnConfiguration.RM_ADDRESS).split(":")[1].equals("0")) {
|
||||
throw new IOException("RM not up yes");
|
||||
}
|
||||
|
||||
LOGGER.info("RM address in configuration is " + yarnConfig.get(YarnConfiguration.RM_ADDRESS));
|
||||
saveConfig(yarnConfig,configPath + "/yarn-site.xml");
|
||||
}
|
||||
|
||||
protected void saveConfig(Configuration conf, String dest) throws IOException {
|
||||
Configuration redacted = new Configuration(conf);
|
||||
// This setting references a test class that is not available when using a real Spark
|
||||
// installation, so remove it from client configs.
|
||||
redacted.unset("net.topology.node.switch.mapping.impl");
|
||||
|
||||
FileOutputStream out = new FileOutputStream(dest);
|
||||
try {
|
||||
redacted.writeXml(out);
|
||||
} finally {
|
||||
out.close();
|
||||
}
|
||||
LOGGER.info("Save configuration to " + dest);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public void stop() {
|
||||
if (this.yarnCluster != null) {
|
||||
this.yarnCluster.stop();
|
||||
}
|
||||
if (this.dfsCluster != null) {
|
||||
this.dfsCluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public String getConfigPath() {
|
||||
return configPath;
|
||||
}
|
||||
|
||||
public MiniYARNCluster getYarnCluster() {
|
||||
return yarnCluster;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class MiniZeppelin {
|
||||
|
||||
protected static final Logger LOGGER = LoggerFactory.getLogger(MiniZeppelin.class);
|
||||
|
||||
protected InterpreterSettingManager interpreterSettingManager;
|
||||
protected InterpreterFactory interpreterFactory;
|
||||
protected File zeppelinHome;
|
||||
private File confDir;
|
||||
private File notebookDir;
|
||||
protected ZeppelinConfiguration conf;
|
||||
|
||||
public void start() throws IOException {
|
||||
zeppelinHome = new File("..");
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
|
||||
zeppelinHome.getAbsolutePath());
|
||||
confDir = new File(zeppelinHome, "conf_" + getClass().getSimpleName());
|
||||
notebookDir = new File(zeppelinHome, "notebook_" + getClass().getSimpleName());
|
||||
confDir.mkdirs();
|
||||
notebookDir.mkdirs();
|
||||
LOGGER.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
|
||||
FileUtils.copyFile(new File(zeppelinHome, "conf/log4j.properties"), new File(confDir, "log4j.properties"));
|
||||
FileUtils.copyFile(new File(zeppelinHome, "conf/log4j_yarn_cluster.properties"), new File(confDir, "log4j_yarn_cluster.properties"));
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
|
||||
conf = new ZeppelinConfiguration();
|
||||
interpreterSettingManager = new InterpreterSettingManager(conf,
|
||||
mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
|
||||
interpreterFactory = new InterpreterFactory(interpreterSettingManager);
|
||||
}
|
||||
|
||||
public void stop() throws IOException {
|
||||
interpreterSettingManager.close();
|
||||
FileUtils.deleteDirectory(confDir);
|
||||
FileUtils.deleteDirectory(notebookDir);
|
||||
}
|
||||
|
||||
public File getZeppelinHome() {
|
||||
return zeppelinHome;
|
||||
}
|
||||
|
||||
public File getZeppelinConfDir() {
|
||||
return confDir;
|
||||
}
|
||||
|
||||
public InterpreterFactory getInterpreterFactory() {
|
||||
return interpreterFactory;
|
||||
}
|
||||
|
||||
public InterpreterSettingManager getInterpreterSettingManager() {
|
||||
return interpreterSettingManager;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,147 @@
|
|||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class SparkInterpreterModeTest {
|
||||
|
||||
private static MiniHadoopCluster hadoopCluster;
|
||||
private static MiniZeppelin zeppelin;
|
||||
private static InterpreterFactory interpreterFactory;
|
||||
private static InterpreterSettingManager interpreterSettingManager;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws IOException {
|
||||
hadoopCluster = new MiniHadoopCluster();
|
||||
hadoopCluster.start();
|
||||
|
||||
zeppelin = new MiniZeppelin();
|
||||
zeppelin.start();
|
||||
interpreterFactory = zeppelin.getInterpreterFactory();
|
||||
interpreterSettingManager = zeppelin.getInterpreterSettingManager();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws IOException {
|
||||
if (zeppelin != null) {
|
||||
zeppelin.stop();
|
||||
}
|
||||
if (hadoopCluster != null) {
|
||||
hadoopCluster.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void testInterpreterBasics() throws IOException {
|
||||
// test SparkInterpreter
|
||||
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
|
||||
Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark");
|
||||
|
||||
InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").getContext();
|
||||
InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
|
||||
interpreterResult = sparkInterpreter.interpret("sc.range(1,10).sum()", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
|
||||
assertTrue(interpreterResult.msg.get(0).getData().contains("45"));
|
||||
|
||||
// test PySparkInterpreter
|
||||
Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark");
|
||||
interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
|
||||
|
||||
// test IPySparkInterpreter
|
||||
Interpreter ipySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.ipyspark");
|
||||
interpreterResult = ipySparkInterpreter.interpret("sqlContext.table('test').show()", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
|
||||
|
||||
// test SparkSQLInterpreter
|
||||
Interpreter sqlInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.sql");
|
||||
interpreterResult = sqlInterpreter.interpret("select count(1) from test", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
|
||||
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
|
||||
assertEquals("count(1)\n2\n", interpreterResult.message().get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocalMode() throws IOException, YarnException {
|
||||
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
|
||||
sparkInterpreterSetting.setProperty("master", "local[*]");
|
||||
sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
|
||||
sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
|
||||
sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
|
||||
sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
|
||||
|
||||
testInterpreterBasics();
|
||||
|
||||
// no yarn application launched
|
||||
GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
|
||||
GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
|
||||
assertEquals(0, response.getApplicationList().size());
|
||||
|
||||
interpreterSettingManager.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testYarnClientMode() throws IOException, YarnException, InterruptedException {
|
||||
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
|
||||
sparkInterpreterSetting.setProperty("master", "yarn-client");
|
||||
sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
|
||||
sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
|
||||
sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
|
||||
sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
|
||||
sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
|
||||
sparkInterpreterSetting.setProperty("PYSPARK_PYTHON", getPythonExec());
|
||||
sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
|
||||
|
||||
testInterpreterBasics();
|
||||
|
||||
// 1 yarn application launched
|
||||
GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
|
||||
GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
|
||||
assertEquals(1, response.getApplicationList().size());
|
||||
|
||||
interpreterSettingManager.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testYarnClusterMode() throws IOException, YarnException, InterruptedException {
|
||||
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
|
||||
sparkInterpreterSetting.setProperty("master", "yarn-cluster");
|
||||
sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
|
||||
sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
|
||||
sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
|
||||
sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
|
||||
sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
|
||||
sparkInterpreterSetting.setProperty("spark.pyspark.python", getPythonExec());
|
||||
sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
|
||||
|
||||
testInterpreterBasics();
|
||||
|
||||
// 1 yarn application launched
|
||||
GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
|
||||
GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
|
||||
assertEquals(1, response.getApplicationList().size());
|
||||
|
||||
interpreterSettingManager.close();
|
||||
}
|
||||
|
||||
private String getPythonExec() throws IOException, InterruptedException {
|
||||
Process process = Runtime.getRuntime().exec(new String[]{"which", "python"});
|
||||
if (process.waitFor() != 0) {
|
||||
throw new RuntimeException("Fail to run command: which python.");
|
||||
}
|
||||
return IOUtils.toString(process.getInputStream()).trim();
|
||||
}
|
||||
}
|
||||
|
|
@ -73,9 +73,7 @@ public class VFSNotebookRepoTest extends AbstractInterpreterTest implements JobL
|
|||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (!FileUtils.deleteQuietly(testRootDir)) {
|
||||
LOG.error("Failed to delete {} ", testRootDir.getName());
|
||||
}
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
Loading…
Reference in a new issue