address comments

This commit is contained in:
Jeff Zhang 2017-09-22 15:00:00 +08:00
parent e545cc3c44
commit 17dc2f1177
6 changed files with 36 additions and 225 deletions

View file

@ -122,7 +122,7 @@ JAVA_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
export JAVA_OPTS
JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
if [[ -z "${SPARK_YARN_CLUSTER}" ]]; then
if [[ -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then
JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
else
JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties"

View file

@ -640,216 +640,10 @@ public class InterpreterSetting {
}
return interpreters;
}
<<<<<<< 0c64d9ca676e48a749db9879fa3cebc06eb78b54
/**
 * Builds the remote interpreter process for this interpreter setting.
 *
 * If the option is flagged as an existing process, a
 * {@code RemoteInterpreterRunningProcess} pointing at the option's host and port is
 * returned. Otherwise a {@code RemoteInterpreterManagedProcess} is created, using the
 * path of {@code interpreterRunner} when one is set and the configured remote runner
 * path when it is not.
 *
 * @return the newly constructed remote interpreter process
 */
RemoteInterpreterProcess createInterpreterProcess() {
  int connectTimeout =
      conf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
  String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + id;

  if (option.isExistingProcess()) {
    // TODO(zjffdu) remove the existing-process approach; it seems no one is using it.
    // Attach to the process that is already running.
    return new RemoteInterpreterRunningProcess(
        connectTimeout,
        remoteInterpreterProcessListener,
        appEventListener,
        option.getHost(),
        option.getPort());
  }

  // No existing process: create a new managed remote process.
  return new RemoteInterpreterManagedProcess(
      interpreterRunner != null
          ? interpreterRunner.getPath()
          : conf.getInterpreterRemoteRunnerPath(),
      conf.getCallbackPortRange(),
      interpreterDir,
      localRepoPath,
      getEnvFromInterpreterProperty(),
      connectTimeout,
      remoteInterpreterProcessListener,
      appEventListener,
      group);
}
/**
 * Tells whether a property should be treated as a Spark configuration entry.
 *
 * @param key   property name
 * @param value property value
 * @return true only when the key is non-empty and starts with "spark." and the value
 *         is non-empty
 */
private boolean isSparkConf(String key, String value) {
  if (StringUtils.isEmpty(key) || !key.startsWith("spark.")) {
    return false;
  }
  return !StringUtils.isEmpty(value);
}
/**
 * Derives the environment variables for the interpreter process from the interpreter
 * properties.
 *
 * Properties whose name is recognised by {@code RemoteInterpreterUtils.isEnvString} are
 * copied into the environment as-is. Properties accepted by {@code isSparkConf} are
 * collected (shell-quoted) and rendered, together with the master and the yarn-cluster
 * log4j file, into the {@code ZEPPELIN_SPARK_CONF} entry.
 *
 * @return the environment map to hand to the interpreter process
 */
private Map<String, String> getEnvFromInterpreterProperty() {
  Map<String, String> env = new HashMap<String, String>();
  Properties javaProperties = getJavaProperties();
  Properties sparkProperties = new Properties();
  String sparkMaster = getSparkMaster();

  for (String key : javaProperties.stringPropertyNames()) {
    String value = javaProperties.getProperty(key);
    if (RemoteInterpreterUtils.isEnvString(key)) {
      env.put(key, value);
    }
    if (isSparkConf(key, value)) {
      sparkProperties.setProperty(key, toShellFormat(value));
    }
  }

  setupPropertiesForPySpark(sparkProperties);
  setupPropertiesForSparkR(sparkProperties, System.getenv("SPARK_HOME"));

  if (isYarnMode() && getDeployMode().equals("cluster")) {
    env.put("SPARK_YARN_CLUSTER", "true");
  }

  // Assemble the extra spark-submit style arguments.
  StringBuilder sparkConfBuilder = new StringBuilder();
  if (sparkMaster != null) {
    sparkConfBuilder.append(" --master ").append(sparkMaster);
  }
  if (isYarnMode() && getDeployMode().equals("cluster")) {
    sparkConfBuilder.append(" --files ")
        .append(conf.getConfDir())
        .append("/log4j_yarn_cluster.properties");
  }
  for (String name : sparkProperties.stringPropertyNames()) {
    sparkConfBuilder.append(" --conf ")
        .append(name)
        .append("=")
        .append(sparkProperties.getProperty(name));
  }
  env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());

  LOGGER.debug("getEnvFromInterpreterProperty: " + env);
  return env;
}
/**
 * Sets {@code spark.yarn.isPython=true} on the given properties when running in yarn
 * mode; does nothing otherwise.
 *
 * @param sparkProperties spark properties to update in place
 */
private void setupPropertiesForPySpark(Properties sparkProperties) {
  if (!isYarnMode()) {
    return;
  }
  sparkProperties.setProperty("spark.yarn.isPython", "true");
}
/**
 * Adds a value to a spark property, appending it comma-separated to the current value
 * when the property is already present.
 *
 * @param sparkProperties spark properties to update in place
 * @param propertyName    name of the property to set or extend
 * @param propertyValue   value to set, or to append after a comma
 */
private void mergeSparkProperty(Properties sparkProperties, String propertyName,
                                String propertyValue) {
  String mergedValue = propertyValue;
  if (sparkProperties.containsKey(propertyName)) {
    mergedValue = sparkProperties.getProperty(propertyName) + "," + propertyValue;
  }
  sparkProperties.setProperty(propertyName, mergedValue);
}
/**
 * Registers {@code sparkr.zip} in {@code spark.yarn.dist.archives} when the archive can
 * be found on disk.
 *
 * With a spark home the archive is looked up under {@code <sparkHome>/R/lib}; without
 * one it is looked up under {@code <ZEPPELIN_HOME>/interpreter/spark/R}, which is only
 * allowed when the master starts with "local".
 *
 * @param sparkProperties spark properties to update in place
 * @param sparkHome       spark home directory, may be null
 * @throws RuntimeException if sparkHome is null and the master is not a local one
 */
private void setupPropertiesForSparkR(Properties sparkProperties,
                                      String sparkHome) {
  File sparkRBasePath;
  if (sparkHome != null) {
    sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
  } else {
    if (!getSparkMaster().startsWith("local")) {
      throw new RuntimeException("SPARK_HOME is not specified for non-local mode");
    }
    String zeppelinHome = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
    sparkRBasePath = new File(zeppelinHome,
        "interpreter" + File.separator + "spark" + File.separator + "R");
  }

  File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
  if (!(sparkRPath.exists() && sparkRPath.isFile())) {
    LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
    return;
  }
  mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath());
}
/**
 * Resolves the spark master from the interpreter properties.
 *
 * Lookup order: the {@code master} property, then {@code spark.master}, then the
 * default {@code local[*]}.
 *
 * @return the spark master, never null
 */
private String getSparkMaster() {
  String explicitMaster = getJavaProperties().getProperty("master");
  if (explicitMaster != null) {
    return explicitMaster;
  }
  return getJavaProperties().getProperty("spark.master", "local[*]");
}
/**
 * Determines the spark deploy mode.
 *
 * {@code yarn-client} and any master starting with "local" map to "client";
 * {@code yarn-cluster} maps to "cluster". For every other master the
 * {@code spark.submit.deployMode} property is required and must be either "client" or
 * "cluster".
 *
 * @return "client" or "cluster"
 * @throws RuntimeException if spark.submit.deployMode is needed but missing or invalid
 */
private String getDeployMode() {
  String master = getSparkMaster();
  if (master.equals("yarn-client") || master.startsWith("local")) {
    return "client";
  }
  if (master.equals("yarn-cluster")) {
    return "cluster";
  }

  // The master itself does not encode the deploy mode, so it must be given explicitly.
  String deployMode = getJavaProperties().getProperty("spark.submit.deployMode");
  if (deployMode == null) {
    throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " +
        "is not specified");
  }
  boolean knownMode = deployMode.equals("client") || deployMode.equals("cluster");
  if (!knownMode) {
    throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
  }
  return deployMode;
}
/**
 * @return true when the resolved spark master starts with "yarn"
 */
private boolean isYarnMode() {
  String master = getSparkMaster();
  return master.startsWith("yarn");
}
private String toShellFormat(String value) {
if (value.contains("\'") && value.contains("\"")) {
throw new RuntimeException("Spark property value could not contain both \" and '");
} else if (value.contains("\'")) {
return "\"" + value + "\"";
} else {
return "\'" + value + "\'";
=======
/**
 * Creates an interpreter inside ZeppelinServer itself (non-remote mode).
 *
 * The interpreter class is loaded through a URLClassLoader and instantiated via its
 * {@code (Properties)} constructor, then wrapped in a ClassloaderInterpreter and a
 * LazyOpenInterpreter. The thread's context classloader is swapped for the duration of
 * the call and always restored afterwards.
 *
 * @param className fully qualified name of the interpreter class
 * @return the wrapped interpreter instance
 * @throws RuntimeException wrapping any reflection failure while loading or
 *         instantiating the class
 */
private Interpreter createLocalInterpreter(String className) {
  LOGGER.info("Create Local Interpreter {} from {}", className, interpreterDir);

  ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
  try {
    URLClassLoader baseLoader = cleanCl.get(interpreterDir);
    if (baseLoader == null) {
      // classloader fallback
      baseLoader = URLClassLoader.newInstance(new URL[]{}, originalLoader);
    }

    // A separate classloader is used unless the class can already be resolved from here.
    boolean needsSeparateLoader;
    try {
      needsSeparateLoader = (Class.forName(className) == null);
    } catch (Exception e) {
      LOGGER.error("exception checking server classloader driver", e);
      needsSeparateLoader = true;
    }

    URLClassLoader interpreterLoader = needsSeparateLoader
        ? URLClassLoader.newInstance(new URL[]{}, baseLoader)
        : baseLoader;
    Thread.currentThread().setContextClassLoader(interpreterLoader);

    Class<Interpreter> interpreterClass =
        (Class<Interpreter>) interpreterLoader.loadClass(className);
    Constructor<Interpreter> propertiesConstructor =
        interpreterClass.getConstructor(new Class[]{Properties.class});
    Interpreter interpreter = propertiesConstructor.newInstance(getJavaProperties());
    interpreter.setClassloaderUrls(baseLoader.getURLs());

    return new LazyOpenInterpreter(
        new ClassloaderInterpreter(interpreter, interpreterLoader));
  } catch (SecurityException e) {
    throw new RuntimeException(e);
  } catch (NoSuchMethodException e) {
    throw new RuntimeException(e);
  } catch (IllegalArgumentException e) {
    throw new RuntimeException(e);
  } catch (InstantiationException e) {
    throw new RuntimeException(e);
  } catch (IllegalAccessException e) {
    throw new RuntimeException(e);
  } catch (InvocationTargetException e) {
    throw new RuntimeException(e);
  } catch (ClassNotFoundException e) {
    throw new RuntimeException(e);
  } finally {
    // Always hand the thread back with its original context classloader.
    Thread.currentThread().setContextClassLoader(originalLoader);
  }
}
synchronized RemoteInterpreterProcess createInterpreterProcess() throws IOException {
if (launcher == null) {
createLauncher();
>>>>>>> ZEPPELIN-2685. Improvement on Interpreter class
}
InterpreterLaunchContext launchContext = new
InterpreterLaunchContext(getJavaProperties(), option, interpreterRunner, id, name);

View file

@ -54,8 +54,6 @@ public class ShellScriptLauncher extends InterpreterLauncher {
int connectTimeout =
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
if (option.isExistingProcess()) {
// TODO(zjffdu) remove the existing process approach seems no one is using this.
// use the existing process
return new RemoteInterpreterRunningProcess(
connectTimeout,
option.getHost(),

View file

@ -54,9 +54,9 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
}
setupPropertiesForPySpark(sparkProperties);
setupPropertiesForSparkR(sparkProperties, properties.getProperty("SPARK_HOME"));
setupPropertiesForSparkR(sparkProperties);
if (isYarnMode() && getDeployMode().equals("cluster")) {
env.put("SPARK_YARN_CLUSTER", "true");
env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
}
StringBuilder sparkConfBuilder = new StringBuilder();
@ -71,16 +71,39 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
}
env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
// use the HADOOP_CONF_DIR defined in zeppelin-env.sh if it is not
// specified in interpreter setting
if (!env.containsKey("HADOOP_CONF_DIR") && System.getenv("HADOOP_CONF_DIR") != null) {
env.put("HADOOP_CONF_DIR", System.getenv("HADOOP_CONF_DIR"));
// set these env in the order of
// 1. interpreter-setting
// 2. zeppelin-env.sh
// It is encouraged to set env in interpreter setting, but just for backward compatibility,
// we also fall back to zeppelin-env.sh if it is not specified in interpreter setting.
for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"}) {
String envValue = getEnv(envName);
if (envValue != null) {
env.put(envName, envValue);
}
}
LOGGER.debug("buildEnvFromProperties: " + env);
return env;
}
/**
 * Looks up an environment variable, trying the sources in this order:
 *
 * 1. interpreter setting (the launcher's properties)
 * 2. the process environment (e.g. as exported by zeppelin-env.sh)
 *
 * @param envName name of the environment variable
 * @return the value from the first source that defines it, or null if neither does
 */
private String getEnv(String envName) {
  String fromSetting = properties.getProperty(envName);
  return fromSetting != null ? fromSetting : System.getenv(envName);
}
/**
 * Tells whether a property should be treated as a Spark configuration entry.
 *
 * @param key   property name
 * @param value property value
 * @return true only when the key is non-empty and starts with "spark." and the value
 *         is non-empty
 */
private boolean isSparkConf(String key, String value) {
  if (StringUtils.isEmpty(key) || !key.startsWith("spark.")) {
    return false;
  }
  return !StringUtils.isEmpty(value);
}
@ -101,8 +124,8 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
}
}
private void setupPropertiesForSparkR(Properties sparkProperties,
String sparkHome) {
private void setupPropertiesForSparkR(Properties sparkProperties) {
String sparkHome = getEnv("SPARK_HOME");
File sparkRBasePath = null;
if (sparkHome == null) {
if (!getSparkMaster(properties).startsWith("local")) {
@ -129,8 +152,7 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
* Order to look for spark master
* 1. master in interpreter setting
* 2. spark.master interpreter setting
* 3. SPARK_HOME in zeppelin-env.sh
* 4. use local[*]
* 3. use local[*]
* @param properties
* @return
*/
@ -138,9 +160,6 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
String master = properties.getProperty("master");
if (master == null) {
master = properties.getProperty("spark.master");
if (master == null) {
master = System.getenv("SPARK_HOME");
}
if (master == null) {
master = "local[*]";
}

View file

@ -895,7 +895,7 @@ public class Notebook implements NoteEventListener {
try {
notebook.getInterpreterSettingManager().restart(setting.getId());
} catch (InterpreterException e) {
logger.warn("Fail to resetart interpreter: " + setting.getId(), e);
logger.error("Fail to restart interpreter: " + setting.getId(), e);
}
}
}

View file

@ -127,7 +127,7 @@ public class SparkInterpreterLauncherTest {
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertEquals(3, interpreterProcess.getEnv().size());
assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
assertEquals("true", interpreterProcess.getEnv().get("SPARK_YARN_CLUSTER"));
assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
assertEquals(" --master yarn-cluster --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
@ -154,7 +154,7 @@ public class SparkInterpreterLauncherTest {
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertEquals(3, interpreterProcess.getEnv().size());
assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
assertEquals("true", interpreterProcess.getEnv().get("SPARK_YARN_CLUSTER"));
assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
}