[ZEPPELIN-4692]. zeppelin pyspark doesn't print java output

This commit is contained in:
Jeff Zhang 2020-03-24 11:09:09 +08:00
parent 8988723061
commit e1a5ead7eb
5 changed files with 78 additions and 30 deletions

View file

@ -296,15 +296,31 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
"df = pd.DataFrame(np.random.randn(1000, 4), index=idx, columns=list('ABCD')).cumsum()\n" +
"import hvplot.pandas\n" +
"df.hvplot()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(context.out.toInterpreterResultMessage().get(0).getData(),
InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
assertEquals(4, interpreterResultMessages.size());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
// docs_json is the source data of plotting which bokeh would use to render the plotting.
assertTrue(interpreterResultMessages.get(3).getData().contains("docs_json"));
if (isPython2) {
// python 2 will have one extra output
// %text /home/travis/miniconda/lib/python2.7/site-packages/param/parameterized.py:2812:
// UserWarning: Config option `use_jedi` not recognized by `IPCompleter`.
// return inst.__call__(*args,**params)
assertEquals(5, interpreterResultMessages.size());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(4).getType());
// docs_json is the source data of plotting which bokeh would use to render the plotting.
assertTrue(interpreterResultMessages.get(4).getData().contains("docs_json"));
} else {
assertEquals(4, interpreterResultMessages.size());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
// docs_json is the source data of plotting which bokeh would use to render the plotting.
assertTrue(interpreterResultMessages.get(3).getData().contains("docs_json"));
}
}

View file

@ -23,11 +23,13 @@ import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.python.IPythonInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Map;
import java.util.Properties;
@ -91,25 +93,35 @@ public class IPySparkInterpreter extends IPythonInterpreter {
@Override
public InterpreterResult interpret(String st,
InterpreterContext context) throws InterpreterException {
Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
InterpreterContext.set(context);
String jobGroupId = Utils.buildJobGroupId(context);
String jobDesc = Utils.buildJobDesc(context);
String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
InterpreterResult result = super.interpret(setJobGroupStmt, context);
if (result.code().equals(InterpreterResult.Code.ERROR)) {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
// Redirect Java stdout/stderr to the interpreter output, because pyspark may call Java code.
PrintStream originalStdout = System.out;
PrintStream originalStderr = System.err;
try {
System.setOut(new PrintStream(context.out));
System.setErr(new PrintStream(context.out));
Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
InterpreterContext.set(context);
String jobGroupId = Utils.buildJobGroupId(context);
String jobDesc = Utils.buildJobDesc(context);
String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
InterpreterResult result = super.interpret(setJobGroupStmt, context);
if (result.code().equals(InterpreterResult.Code.ERROR)) {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
}
String pool = "None";
if (context.getLocalProperties().containsKey("pool")) {
pool = "'" + context.getLocalProperties().get("pool") + "'";
}
String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
result = super.interpret(setPoolStmt, context);
if (result.code().equals(InterpreterResult.Code.ERROR)) {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
}
return super.interpret(st, context);
} finally {
System.setOut(originalStdout);
System.setErr(originalStderr);
}
String pool = "None";
if (context.getLocalProperties().containsKey("pool")) {
pool = "'" + context.getLocalProperties().get("pool") + "'";
}
String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
result = super.interpret(setPoolStmt, context);
if (result.code().equals(InterpreterResult.Code.ERROR)) {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
}
return super.interpret(st, context);
}
@Override

View file

@ -25,6 +25,7 @@ import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.python.IPythonInterpreter;
import org.apache.zeppelin.python.PythonInterpreter;
import org.slf4j.Logger;
@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
@ -125,8 +127,18 @@ public class PySparkInterpreter extends PythonInterpreter {
@Override
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
return super.interpret(st, context);
// Redirect Java stdout/stderr to the interpreter output, because pyspark may call Java code.
PrintStream originalStdout = System.out;
PrintStream originalStderr = System.err;
try {
System.setOut(new PrintStream(context.out));
System.setErr(new PrintStream(context.out));
Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
return super.interpret(st, context);
} finally {
System.setOut(originalStdout);
System.setErr(originalStderr);
}
}
@Override

View file

@ -217,6 +217,14 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
assertTrue(completions.size() > 0);
completions.contains(new InterpreterCompletion("sc", "sc", ""));
// python call java via py4j
context = createInterpreterContext(mockIntpEventClient);
result = interpreter.interpret("sc._jvm.java.lang.System.out.println(\"hello world\")", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
assertEquals(1, interpreterResultMessages.size());
assertEquals("hello world\n", interpreterResultMessages.get(0).getData());
// pyspark streaming TODO(zjffdu) disable pyspark streaming test temporarily
context = createInterpreterContext(mockIntpEventClient);
// result = interpreter.interpret(

View file

@ -36,11 +36,11 @@ if [[ -n "$PYTHON" ]] ; then
if [[ "$PYTHON" == "2" ]] ; then
pip install -q numpy==1.14.5 pandas==0.21.1 matplotlib==2.1.1 scipy==1.2.1 grpcio==1.19.0 bkzep==0.6.1 hvplot==0.5.2 \
protobuf==3.7.0 pandasql==0.7.3 ipython==5.8.0 ipykernel==4.10.0 bokeh==1.3.4
protobuf==3.7.0 pandasql==0.7.3 ipython==5.8.0 ipykernel==4.10.0 bokeh==1.3.4 panel==0.6.0 holoviews==1.12.3
else
pip install -q pycodestyle==2.5.0
pip install -q numpy==1.17.3 pandas==0.25.0 scipy==1.3.1 grpcio==1.19.0 bkzep==0.6.1 hvplot==0.5.2 protobuf==3.10.0 \
pandasql==0.7.3 ipython==7.8.0 matplotlib==3.0.3 ipykernel==5.1.2 jupyter_client==5.3.4 bokeh==1.3.4 pycodestyle==2.5.0 apache_beam==2.15.0
pandasql==0.7.3 ipython==7.8.0 matplotlib==3.0.3 ipykernel==5.1.2 jupyter_client==5.3.4 bokeh==1.3.4 panel==0.6.0 holoviews==1.12.3 pycodestyle==2.5.0 apache_beam==2.15.0
fi
if [[ -n "$TENSORFLOW" ]] ; then