mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
address comments
This commit is contained in:
parent
e545cc3c44
commit
17dc2f1177
6 changed files with 36 additions and 225 deletions
|
|
@ -122,7 +122,7 @@ JAVA_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
|
|||
export JAVA_OPTS
|
||||
|
||||
JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
|
||||
if [[ -z "${SPARK_YARN_CLUSTER}" ]]; then
|
||||
if [[ -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then
|
||||
JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
|
||||
else
|
||||
JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties"
|
||||
|
|
|
|||
|
|
@ -640,216 +640,10 @@ public class InterpreterSetting {
|
|||
}
|
||||
return interpreters;
|
||||
}
|
||||
<<<<<<< 0c64d9ca676e48a749db9879fa3cebc06eb78b54
|
||||
|
||||
RemoteInterpreterProcess createInterpreterProcess() {
|
||||
RemoteInterpreterProcess remoteInterpreterProcess = null;
|
||||
int connectTimeout =
|
||||
conf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + id;
|
||||
if (option.isExistingProcess()) {
|
||||
// TODO(zjffdu) remove the existing process approach seems no one is using this.
|
||||
// use the existing process
|
||||
remoteInterpreterProcess = new RemoteInterpreterRunningProcess(
|
||||
connectTimeout,
|
||||
remoteInterpreterProcessListener,
|
||||
appEventListener,
|
||||
option.getHost(),
|
||||
option.getPort());
|
||||
} else {
|
||||
// create new remote process
|
||||
remoteInterpreterProcess = new RemoteInterpreterManagedProcess(
|
||||
interpreterRunner != null ? interpreterRunner.getPath() :
|
||||
conf.getInterpreterRemoteRunnerPath(), conf.getCallbackPortRange(),
|
||||
interpreterDir, localRepoPath,
|
||||
getEnvFromInterpreterProperty(), connectTimeout,
|
||||
remoteInterpreterProcessListener, appEventListener, group);
|
||||
}
|
||||
return remoteInterpreterProcess;
|
||||
}
|
||||
|
||||
private boolean isSparkConf(String key, String value) {
|
||||
return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
|
||||
}
|
||||
|
||||
private Map<String, String> getEnvFromInterpreterProperty() {
|
||||
Map<String, String> env = new HashMap<String, String>();
|
||||
Properties javaProperties = getJavaProperties();
|
||||
Properties sparkProperties = new Properties();
|
||||
String sparkMaster = getSparkMaster();
|
||||
for (String key : javaProperties.stringPropertyNames()) {
|
||||
if (RemoteInterpreterUtils.isEnvString(key)) {
|
||||
env.put(key, javaProperties.getProperty(key));
|
||||
}
|
||||
if (isSparkConf(key, javaProperties.getProperty(key))) {
|
||||
sparkProperties.setProperty(key, toShellFormat(javaProperties.getProperty(key)));
|
||||
}
|
||||
}
|
||||
|
||||
setupPropertiesForPySpark(sparkProperties);
|
||||
setupPropertiesForSparkR(sparkProperties, System.getenv("SPARK_HOME"));
|
||||
if (isYarnMode() && getDeployMode().equals("cluster")) {
|
||||
env.put("SPARK_YARN_CLUSTER", "true");
|
||||
}
|
||||
|
||||
StringBuilder sparkConfBuilder = new StringBuilder();
|
||||
if (sparkMaster != null) {
|
||||
sparkConfBuilder.append(" --master " + sparkMaster);
|
||||
}
|
||||
if (isYarnMode() && getDeployMode().equals("cluster")) {
|
||||
sparkConfBuilder.append(" --files " + conf.getConfDir() + "/log4j_yarn_cluster.properties");
|
||||
}
|
||||
for (String name : sparkProperties.stringPropertyNames()) {
|
||||
sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
|
||||
}
|
||||
|
||||
env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
|
||||
LOGGER.debug("getEnvFromInterpreterProperty: " + env);
|
||||
return env;
|
||||
}
|
||||
|
||||
private void setupPropertiesForPySpark(Properties sparkProperties) {
|
||||
if (isYarnMode()) {
|
||||
sparkProperties.setProperty("spark.yarn.isPython", "true");
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeSparkProperty(Properties sparkProperties, String propertyName,
|
||||
String propertyValue) {
|
||||
if (sparkProperties.containsKey(propertyName)) {
|
||||
String oldPropertyValue = sparkProperties.getProperty(propertyName);
|
||||
sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue);
|
||||
} else {
|
||||
sparkProperties.setProperty(propertyName, propertyValue);
|
||||
}
|
||||
}
|
||||
|
||||
private void setupPropertiesForSparkR(Properties sparkProperties,
|
||||
String sparkHome) {
|
||||
File sparkRBasePath = null;
|
||||
if (sparkHome == null) {
|
||||
if (!getSparkMaster().startsWith("local")) {
|
||||
throw new RuntimeException("SPARK_HOME is not specified for non-local mode");
|
||||
}
|
||||
String zeppelinHome = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
|
||||
sparkRBasePath = new File(zeppelinHome,
|
||||
"interpreter" + File.separator + "spark" + File.separator + "R");
|
||||
} else {
|
||||
sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
|
||||
}
|
||||
|
||||
File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
|
||||
if (sparkRPath.exists() && sparkRPath.isFile()) {
|
||||
mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath());
|
||||
} else {
|
||||
LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
|
||||
}
|
||||
}
|
||||
|
||||
private String getSparkMaster() {
|
||||
String master = getJavaProperties().getProperty("master");
|
||||
if (master == null) {
|
||||
master = getJavaProperties().getProperty("spark.master", "local[*]");
|
||||
}
|
||||
return master;
|
||||
}
|
||||
|
||||
private String getDeployMode() {
|
||||
String master = getSparkMaster();
|
||||
if (master.equals("yarn-client")) {
|
||||
return "client";
|
||||
} else if (master.equals("yarn-cluster")) {
|
||||
return "cluster";
|
||||
} else if (master.startsWith("local")) {
|
||||
return "client";
|
||||
} else {
|
||||
String deployMode = getJavaProperties().getProperty("spark.submit.deployMode");
|
||||
if (deployMode == null) {
|
||||
throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " +
|
||||
"is not specified");
|
||||
}
|
||||
if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
|
||||
throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
|
||||
}
|
||||
return deployMode;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isYarnMode() {
|
||||
return getSparkMaster().startsWith("yarn");
|
||||
}
|
||||
|
||||
private String toShellFormat(String value) {
|
||||
if (value.contains("\'") && value.contains("\"")) {
|
||||
throw new RuntimeException("Spark property value could not contain both \" and '");
|
||||
} else if (value.contains("\'")) {
|
||||
return "\"" + value + "\"";
|
||||
} else {
|
||||
return "\'" + value + "\'";
|
||||
=======
|
||||
|
||||
// Create Interpreter in ZeppelinServer for non-remote mode
|
||||
private Interpreter createLocalInterpreter(String className) {
|
||||
LOGGER.info("Create Local Interpreter {} from {}", className, interpreterDir);
|
||||
|
||||
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
|
||||
URLClassLoader ccl = cleanCl.get(interpreterDir);
|
||||
if (ccl == null) {
|
||||
// classloader fallback
|
||||
ccl = URLClassLoader.newInstance(new URL[]{}, oldcl);
|
||||
}
|
||||
|
||||
boolean separateCL = true;
|
||||
try { // check if server's classloader has driver already.
|
||||
Class cls = this.getClass().forName(className);
|
||||
if (cls != null) {
|
||||
separateCL = false;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("exception checking server classloader driver", e);
|
||||
}
|
||||
|
||||
URLClassLoader cl;
|
||||
|
||||
if (separateCL == true) {
|
||||
cl = URLClassLoader.newInstance(new URL[]{}, ccl);
|
||||
} else {
|
||||
cl = ccl;
|
||||
}
|
||||
Thread.currentThread().setContextClassLoader(cl);
|
||||
|
||||
Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className);
|
||||
Constructor<Interpreter> constructor =
|
||||
replClass.getConstructor(new Class[]{Properties.class});
|
||||
Interpreter repl = constructor.newInstance(getJavaProperties());
|
||||
repl.setClassloaderUrls(ccl.getURLs());
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl));
|
||||
return intp;
|
||||
} catch (SecurityException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (InstantiationException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(oldcl);
|
||||
}
|
||||
}
|
||||
|
||||
synchronized RemoteInterpreterProcess createInterpreterProcess() throws IOException {
|
||||
if (launcher == null) {
|
||||
createLauncher();
|
||||
>>>>>>> ZEPPELIN-2685. Improvement on Interpreter class
|
||||
}
|
||||
InterpreterLaunchContext launchContext = new
|
||||
InterpreterLaunchContext(getJavaProperties(), option, interpreterRunner, id, name);
|
||||
|
|
|
|||
|
|
@ -54,8 +54,6 @@ public class ShellScriptLauncher extends InterpreterLauncher {
|
|||
int connectTimeout =
|
||||
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
if (option.isExistingProcess()) {
|
||||
// TODO(zjffdu) remove the existing process approach seems no one is using this.
|
||||
// use the existing process
|
||||
return new RemoteInterpreterRunningProcess(
|
||||
connectTimeout,
|
||||
option.getHost(),
|
||||
|
|
|
|||
|
|
@ -54,9 +54,9 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
|
|||
}
|
||||
|
||||
setupPropertiesForPySpark(sparkProperties);
|
||||
setupPropertiesForSparkR(sparkProperties, properties.getProperty("SPARK_HOME"));
|
||||
setupPropertiesForSparkR(sparkProperties);
|
||||
if (isYarnMode() && getDeployMode().equals("cluster")) {
|
||||
env.put("SPARK_YARN_CLUSTER", "true");
|
||||
env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
|
||||
}
|
||||
|
||||
StringBuilder sparkConfBuilder = new StringBuilder();
|
||||
|
|
@ -71,16 +71,39 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
|
|||
}
|
||||
|
||||
env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
|
||||
// use the HADOOP_CONF_DIR defined in zeppelin-env.sh if it is not
|
||||
// specified in interpreter setting
|
||||
if (!env.containsKey("HADOOP_CONF_DIR") && System.getenv("HADOOP_CONF_DIR") != null) {
|
||||
env.put("HADOOP_CONF_DIR", System.getenv("HADOOP_CONF_DIR"));
|
||||
|
||||
// set these env in the order of
|
||||
// 1. interpreter-setting
|
||||
// 2. zeppelin-env.sh
|
||||
// It is encouraged to set env in interpreter setting, but just for backward compatability,
|
||||
// we also fallback to zeppelin-env.sh if it is not specified in interpreter setting.
|
||||
for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"}) {
|
||||
String envValue = getEnv(envName);
|
||||
if (envValue != null) {
|
||||
env.put(envName, envValue);
|
||||
}
|
||||
}
|
||||
LOGGER.debug("buildEnvFromProperties: " + env);
|
||||
return env;
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* get environmental variable in the following order
|
||||
*
|
||||
* 1. interpreter setting
|
||||
* 2. zeppelin-env.sh
|
||||
*
|
||||
*/
|
||||
private String getEnv(String envName) {
|
||||
String env = properties.getProperty(envName);
|
||||
if (env == null) {
|
||||
env = System.getenv(envName);
|
||||
}
|
||||
return env;
|
||||
}
|
||||
|
||||
private boolean isSparkConf(String key, String value) {
|
||||
return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
|
||||
}
|
||||
|
|
@ -101,8 +124,8 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
|
|||
}
|
||||
}
|
||||
|
||||
private void setupPropertiesForSparkR(Properties sparkProperties,
|
||||
String sparkHome) {
|
||||
private void setupPropertiesForSparkR(Properties sparkProperties) {
|
||||
String sparkHome = getEnv("SPARK_HOME");
|
||||
File sparkRBasePath = null;
|
||||
if (sparkHome == null) {
|
||||
if (!getSparkMaster(properties).startsWith("local")) {
|
||||
|
|
@ -129,8 +152,7 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
|
|||
* Order to look for spark master
|
||||
* 1. master in interpreter setting
|
||||
* 2. spark.master interpreter setting
|
||||
* 3. SPARK_HOME in zeppelin-env.sh
|
||||
* 4. use local[*]
|
||||
* 3. use local[*]
|
||||
* @param properties
|
||||
* @return
|
||||
*/
|
||||
|
|
@ -138,9 +160,6 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
|
|||
String master = properties.getProperty("master");
|
||||
if (master == null) {
|
||||
master = properties.getProperty("spark.master");
|
||||
if (master == null) {
|
||||
master = System.getenv("SPARK_HOME");
|
||||
}
|
||||
if (master == null) {
|
||||
master = "local[*]";
|
||||
}
|
||||
|
|
|
|||
|
|
@ -895,7 +895,7 @@ public class Notebook implements NoteEventListener {
|
|||
try {
|
||||
notebook.getInterpreterSettingManager().restart(setting.getId());
|
||||
} catch (InterpreterException e) {
|
||||
logger.warn("Fail to resetart interpreter: " + setting.getId(), e);
|
||||
logger.error("Fail to restart interpreter: " + setting.getId(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -127,7 +127,7 @@ public class SparkInterpreterLauncherTest {
|
|||
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
|
||||
assertEquals(3, interpreterProcess.getEnv().size());
|
||||
assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
|
||||
assertEquals("true", interpreterProcess.getEnv().get("SPARK_YARN_CLUSTER"));
|
||||
assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
|
||||
assertEquals(" --master yarn-cluster --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
|
||||
}
|
||||
|
||||
|
|
@ -154,7 +154,7 @@ public class SparkInterpreterLauncherTest {
|
|||
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
|
||||
assertEquals(3, interpreterProcess.getEnv().size());
|
||||
assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
|
||||
assertEquals("true", interpreterProcess.getEnv().get("SPARK_YARN_CLUSTER"));
|
||||
assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
|
||||
assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue