ZEPPELIN-3538. Fail to bootstrap PySpark in yarn cluster mode

This commit is contained in:
Jeff Zhang 2018-06-13 10:14:14 +08:00
parent b7d98b3f11
commit 3a1d8a737b
5 changed files with 28 additions and 8 deletions

View file

@ -68,7 +68,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class PythonInterpreter extends Interpreter implements ExecuteResultHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(PythonInterpreter.class);
private static final int MAX_TIMEOUT_SEC = 10;
private static final int MAX_TIMEOUT_SEC = 30;
private GatewayServer gatewayServer;
private DefaultExecutor executor;
@ -292,10 +292,16 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
public class PythonInterpretRequest {
public String statements;
public boolean isForCompletion;
public boolean isCallHooks;
public PythonInterpretRequest(String statements, boolean isForCompletion) {
this(statements, isForCompletion, true);
}
public PythonInterpretRequest(String statements, boolean isForCompletion, boolean isCallHooks) {
this.statements = statements;
this.isForCompletion = isForCompletion;
this.isCallHooks = isCallHooks;
}
public String statements() {
@ -305,6 +311,10 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
public boolean isForCompletion() {
return isForCompletion;
}
public boolean isCallHooks() {
return isCallHooks;
}
}
// called by Python Process
@ -602,7 +612,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
String bootstrapCode =
IOUtils.toString(getClass().getClassLoader().getResourceAsStream(resourceName));
try {
InterpreterResult result = interpret(bootstrapCode, InterpreterContext.get());
// Add the display hook explicitly; otherwise Python will fail to execute the statement
InterpreterResult result = interpret(bootstrapCode + "\n" + "__zeppelin__._displayhook()",
InterpreterContext.get());
if (result.code() != Code.SUCCESS) {
throw new IOException("Fail to run bootstrap script: " + resourceName);
}

View file

@ -111,12 +111,18 @@ while True :
# Get post-execute hooks
try:
global_hook = intp.getHook('post_exec_dev')
if req.isCallHooks():
global_hook = intp.getHook('post_exec_dev')
else:
global_hook = None
except:
global_hook = None
try:
user_hook = __zeppelin__.getHook('post_exec')
if req.isCallHooks():
user_hook = __zeppelin__.getHook('post_exec')
else:
user_hook = None
except:
user_hook = None
@ -133,7 +139,6 @@ while True :
to_run_hooks = []
if (nhooks > 0):
to_run_hooks = code.body[-nhooks:]
to_run_exec, to_run_single = (code.body[:-(nhooks + 1)],
[code.body[-(nhooks + 1)]])
try:

View file

@ -130,11 +130,11 @@ public class PySparkInterpreter extends PythonInterpreter {
try {
URLClassLoader newCl = new URLClassLoader(urls, oldCl);
Thread.currentThread().setContextClassLoader(newCl);
// create Python Process and JVM gateway
super.open();
// Must create the Spark interpreter after the ClassLoader is set; otherwise the additional jars
// cannot be loaded by the Spark REPL.
this.sparkInterpreter = getSparkInterpreter();
// create Python Process and JVM gateway
super.open();
} finally {
Thread.currentThread().setContextClassLoader(oldCl);
}
@ -175,7 +175,7 @@ public class PySparkInterpreter extends PythonInterpreter {
String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
callPython(new PythonInterpretRequest(
String.format("if 'sc' in locals():\n\tsc.setJobGroup('%s', '%s')", jobGroup, jobDesc),
false));
false, false));
}
// Run python shell

View file

@ -38,6 +38,7 @@ public class MiniZeppelin {
FileUtils.copyFile(new File(zeppelinHome, "conf/log4j_yarn_cluster.properties"), new File(confDir, "log4j_yarn_cluster.properties"));
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "60000");
conf = new ZeppelinConfiguration();
interpreterSettingManager = new InterpreterSettingManager(conf,
mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));

View file

@ -44,3 +44,5 @@ log4j.logger.org.hibernate.type=ALL
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.zeppelin.plugin=DEBUG
log4j.logger.org.apache.zeppelin.spark=DEBUG
log4j.logger.org.apache.zeppelin.python=DEBUG