mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
[ZEPPELIN-4692]. zeppelin pyspark doesn't print java output
This commit is contained in:
parent
8988723061
commit
e1a5ead7eb
5 changed files with 78 additions and 30 deletions
|
|
@ -296,15 +296,31 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
|
|||
"df = pd.DataFrame(np.random.randn(1000, 4), index=idx, columns=list('ABCD')).cumsum()\n" +
|
||||
"import hvplot.pandas\n" +
|
||||
"df.hvplot()", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(context.out.toInterpreterResultMessage().get(0).getData(),
|
||||
InterpreterResult.Code.SUCCESS, result.code());
|
||||
interpreterResultMessages = context.out.toInterpreterResultMessage();
|
||||
assertEquals(4, interpreterResultMessages.size());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
|
||||
// docs_json is the source data of plotting which bokeh would use to render the plotting.
|
||||
assertTrue(interpreterResultMessages.get(3).getData().contains("docs_json"));
|
||||
|
||||
if (isPython2) {
|
||||
// python 2 will have one extra output
|
||||
// %text /home/travis/miniconda/lib/python2.7/site-packages/param/parameterized.py:2812:
|
||||
// UserWarning: Config option `use_jedi` not recognized by `IPCompleter`.
|
||||
// return inst.__call__(*args,**params)
|
||||
assertEquals(5, interpreterResultMessages.size());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(4).getType());
|
||||
// docs_json is the source data of plotting which bokeh would use to render the plotting.
|
||||
assertTrue(interpreterResultMessages.get(4).getData().contains("docs_json"));
|
||||
} else {
|
||||
assertEquals(4, interpreterResultMessages.size());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
|
||||
// docs_json is the source data of plotting which bokeh would use to render the plotting.
|
||||
assertTrue(interpreterResultMessages.get(3).getData().contains("docs_json"));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -23,11 +23,13 @@ import org.apache.zeppelin.interpreter.ZeppelinContext;
|
|||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
import org.apache.zeppelin.python.IPythonInterpreter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
|
|
@ -91,25 +93,35 @@ public class IPySparkInterpreter extends IPythonInterpreter {
|
|||
@Override
|
||||
public InterpreterResult interpret(String st,
|
||||
InterpreterContext context) throws InterpreterException {
|
||||
Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
|
||||
InterpreterContext.set(context);
|
||||
String jobGroupId = Utils.buildJobGroupId(context);
|
||||
String jobDesc = Utils.buildJobDesc(context);
|
||||
String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
|
||||
InterpreterResult result = super.interpret(setJobGroupStmt, context);
|
||||
if (result.code().equals(InterpreterResult.Code.ERROR)) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
|
||||
// Redirect Java stdout/stderr to the interpreter output, because pyspark may call Java code.
|
||||
PrintStream originalStdout = System.out;
|
||||
PrintStream originalStderr = System.err;
|
||||
try {
|
||||
System.setOut(new PrintStream(context.out));
|
||||
System.setErr(new PrintStream(context.out));
|
||||
Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
|
||||
InterpreterContext.set(context);
|
||||
String jobGroupId = Utils.buildJobGroupId(context);
|
||||
String jobDesc = Utils.buildJobDesc(context);
|
||||
String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
|
||||
InterpreterResult result = super.interpret(setJobGroupStmt, context);
|
||||
if (result.code().equals(InterpreterResult.Code.ERROR)) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
|
||||
}
|
||||
String pool = "None";
|
||||
if (context.getLocalProperties().containsKey("pool")) {
|
||||
pool = "'" + context.getLocalProperties().get("pool") + "'";
|
||||
}
|
||||
String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
|
||||
result = super.interpret(setPoolStmt, context);
|
||||
if (result.code().equals(InterpreterResult.Code.ERROR)) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
|
||||
}
|
||||
return super.interpret(st, context);
|
||||
} finally {
|
||||
System.setOut(originalStdout);
|
||||
System.setErr(originalStderr);
|
||||
}
|
||||
String pool = "None";
|
||||
if (context.getLocalProperties().containsKey("pool")) {
|
||||
pool = "'" + context.getLocalProperties().get("pool") + "'";
|
||||
}
|
||||
String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
|
||||
result = super.interpret(setPoolStmt, context);
|
||||
if (result.code().equals(InterpreterResult.Code.ERROR)) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
|
||||
}
|
||||
return super.interpret(st, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import org.apache.zeppelin.interpreter.ZeppelinContext;
|
|||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
import org.apache.zeppelin.python.IPythonInterpreter;
|
||||
import org.apache.zeppelin.python.PythonInterpreter;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
|
|
@ -125,8 +127,18 @@ public class PySparkInterpreter extends PythonInterpreter {
|
|||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context)
|
||||
throws InterpreterException {
|
||||
Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
|
||||
return super.interpret(st, context);
|
||||
// Redirect Java stdout/stderr to the interpreter output, because pyspark may call Java code.
|
||||
PrintStream originalStdout = System.out;
|
||||
PrintStream originalStderr = System.err;
|
||||
try {
|
||||
System.setOut(new PrintStream(context.out));
|
||||
System.setErr(new PrintStream(context.out));
|
||||
Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
|
||||
return super.interpret(st, context);
|
||||
} finally {
|
||||
System.setOut(originalStdout);
|
||||
System.setErr(originalStderr);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -217,6 +217,14 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
|
|||
assertTrue(completions.size() > 0);
|
||||
completions.contains(new InterpreterCompletion("sc", "sc", ""));
|
||||
|
||||
// python call java via py4j
|
||||
context = createInterpreterContext(mockIntpEventClient);
|
||||
result = interpreter.interpret("sc._jvm.java.lang.System.out.println(\"hello world\")", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
interpreterResultMessages = context.out.toInterpreterResultMessage();
|
||||
assertEquals(1, interpreterResultMessages.size());
|
||||
assertEquals("hello world\n", interpreterResultMessages.get(0).getData());
|
||||
|
||||
// pyspark streaming TODO(zjffdu) disable pyspark streaming test temporarily
|
||||
context = createInterpreterContext(mockIntpEventClient);
|
||||
// result = interpreter.interpret(
|
||||
|
|
|
|||
|
|
@ -36,11 +36,11 @@ if [[ -n "$PYTHON" ]] ; then
|
|||
|
||||
if [[ "$PYTHON" == "2" ]] ; then
|
||||
pip install -q numpy==1.14.5 pandas==0.21.1 matplotlib==2.1.1 scipy==1.2.1 grpcio==1.19.0 bkzep==0.6.1 hvplot==0.5.2 \
|
||||
protobuf==3.7.0 pandasql==0.7.3 ipython==5.8.0 ipykernel==4.10.0 bokeh==1.3.4
|
||||
protobuf==3.7.0 pandasql==0.7.3 ipython==5.8.0 ipykernel==4.10.0 bokeh==1.3.4 panel==0.6.0 holoviews==1.12.3
|
||||
else
|
||||
pip install -q pycodestyle==2.5.0
|
||||
pip install -q numpy==1.17.3 pandas==0.25.0 scipy==1.3.1 grpcio==1.19.0 bkzep==0.6.1 hvplot==0.5.2 protobuf==3.10.0 \
|
||||
pandasql==0.7.3 ipython==7.8.0 matplotlib==3.0.3 ipykernel==5.1.2 jupyter_client==5.3.4 bokeh==1.3.4 pycodestyle==2.5.0 apache_beam==2.15.0
|
||||
pandasql==0.7.3 ipython==7.8.0 matplotlib==3.0.3 ipykernel==5.1.2 jupyter_client==5.3.4 bokeh==1.3.4 panel==0.6.0 holoviews==1.12.3 pycodestyle==2.5.0 apache_beam==2.15.0
|
||||
fi
|
||||
|
||||
if [[ -n "$TENSORFLOW" ]] ; then
|
||||
|
|
|
|||
Loading…
Reference in a new issue