mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-3538. Fail to bootstrap PySpark in yarn cluster mode
This commit is contained in:
parent
b7d98b3f11
commit
3a1d8a737b
5 changed files with 28 additions and 8 deletions
|
|
@ -68,7 +68,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
*/
|
||||
public class PythonInterpreter extends Interpreter implements ExecuteResultHandler {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(PythonInterpreter.class);
|
||||
private static final int MAX_TIMEOUT_SEC = 10;
|
||||
private static final int MAX_TIMEOUT_SEC = 30;
|
||||
|
||||
private GatewayServer gatewayServer;
|
||||
private DefaultExecutor executor;
|
||||
|
|
@ -292,10 +292,16 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
public class PythonInterpretRequest {
|
||||
public String statements;
|
||||
public boolean isForCompletion;
|
||||
public boolean isCallHooks;
|
||||
|
||||
public PythonInterpretRequest(String statements, boolean isForCompletion) {
|
||||
this(statements, isForCompletion, true);
|
||||
}
|
||||
|
||||
public PythonInterpretRequest(String statements, boolean isForCompletion, boolean isCallHooks) {
|
||||
this.statements = statements;
|
||||
this.isForCompletion = isForCompletion;
|
||||
this.isCallHooks = isCallHooks;
|
||||
}
|
||||
|
||||
public String statements() {
|
||||
|
|
@ -305,6 +311,10 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
public boolean isForCompletion() {
|
||||
return isForCompletion;
|
||||
}
|
||||
|
||||
public boolean isCallHooks() {
|
||||
return isCallHooks;
|
||||
}
|
||||
}
|
||||
|
||||
// called by Python Process
|
||||
|
|
@ -602,7 +612,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
String bootstrapCode =
|
||||
IOUtils.toString(getClass().getClassLoader().getResourceAsStream(resourceName));
|
||||
try {
|
||||
InterpreterResult result = interpret(bootstrapCode, InterpreterContext.get());
|
||||
// Add hook explicitly, otherwise python will fail to execute the statement
|
||||
InterpreterResult result = interpret(bootstrapCode + "\n" + "__zeppelin__._displayhook()",
|
||||
InterpreterContext.get());
|
||||
if (result.code() != Code.SUCCESS) {
|
||||
throw new IOException("Fail to run bootstrap script: " + resourceName);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -111,12 +111,18 @@ while True :
|
|||
|
||||
# Get post-execute hooks
|
||||
try:
|
||||
global_hook = intp.getHook('post_exec_dev')
|
||||
if req.isCallHooks():
|
||||
global_hook = intp.getHook('post_exec_dev')
|
||||
else:
|
||||
global_hook = None
|
||||
except:
|
||||
global_hook = None
|
||||
|
||||
try:
|
||||
user_hook = __zeppelin__.getHook('post_exec')
|
||||
if req.isCallHooks():
|
||||
user_hook = __zeppelin__.getHook('post_exec')
|
||||
else:
|
||||
user_hook = None
|
||||
except:
|
||||
user_hook = None
|
||||
|
||||
|
|
@ -133,7 +139,6 @@ while True :
|
|||
to_run_hooks = []
|
||||
if (nhooks > 0):
|
||||
to_run_hooks = code.body[-nhooks:]
|
||||
|
||||
to_run_exec, to_run_single = (code.body[:-(nhooks + 1)],
|
||||
[code.body[-(nhooks + 1)]])
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -130,11 +130,11 @@ public class PySparkInterpreter extends PythonInterpreter {
|
|||
try {
|
||||
URLClassLoader newCl = new URLClassLoader(urls, oldCl);
|
||||
Thread.currentThread().setContextClassLoader(newCl);
|
||||
// create Python Process and JVM gateway
|
||||
super.open();
|
||||
// must create spark interpreter after ClassLoader is set, otherwise the additional jars
|
||||
// can not be loaded by spark repl.
|
||||
this.sparkInterpreter = getSparkInterpreter();
|
||||
// create Python Process and JVM gateway
|
||||
super.open();
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(oldCl);
|
||||
}
|
||||
|
|
@ -175,7 +175,7 @@ public class PySparkInterpreter extends PythonInterpreter {
|
|||
String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
|
||||
callPython(new PythonInterpretRequest(
|
||||
String.format("if 'sc' in locals():\n\tsc.setJobGroup('%s', '%s')", jobGroup, jobDesc),
|
||||
false));
|
||||
false, false));
|
||||
}
|
||||
|
||||
// Run python shell
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ public class MiniZeppelin {
|
|||
FileUtils.copyFile(new File(zeppelinHome, "conf/log4j_yarn_cluster.properties"), new File(confDir, "log4j_yarn_cluster.properties"));
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "60000");
|
||||
conf = new ZeppelinConfiguration();
|
||||
interpreterSettingManager = new InterpreterSettingManager(conf,
|
||||
mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
|
||||
|
|
|
|||
|
|
@ -44,3 +44,5 @@ log4j.logger.org.hibernate.type=ALL
|
|||
|
||||
log4j.logger.org.apache.hadoop=WARN
|
||||
log4j.logger.org.apache.zeppelin.plugin=DEBUG
|
||||
log4j.logger.org.apache.zeppelin.spark=DEBUG
|
||||
log4j.logger.org.apache.zeppelin.python=DEBUG
|
||||
Loading…
Reference in a new issue