ZEPPELIN-3234. z.show() compatibility with previous release

This commit is contained in:
Jeff Zhang 2018-02-15 14:19:00 +08:00
parent 2be8f35065
commit 39637eea17
5 changed files with 50 additions and 7 deletions

View file

@ -30,6 +30,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@ -76,7 +77,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
private IPythonClient ipythonClient;
private GatewayServer gatewayServer;
private PythonZeppelinContext zeppelinContext;
protected BaseZeppelinContext zeppelinContext;
private String pythonExecutable;
private long ipythonLaunchTimeout;
private String additionalPythonPath;
@ -114,6 +115,12 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
this.useBuiltinPy4j = add;
}
public BaseZeppelinContext buildZeppelinContext() {
return new PythonZeppelinContext(
getInterpreterGroup().getInterpreterHookRegistry(),
Integer.parseInt(getProperty("zeppelin.python.maxResult", "1000")));
}
@Override
public void open() throws InterpreterException {
try {
@ -130,9 +137,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
ipythonLaunchTimeout = Long.parseLong(
getProperty("zeppelin.ipython.launch.timeout", "30000"));
this.zeppelinContext = new PythonZeppelinContext(
getInterpreterGroup().getInterpreterHookRegistry(),
Integer.parseInt(getProperty("zeppelin.python.maxResult", "1000")));
this.zeppelinContext = buildZeppelinContext();
int ipythonPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
int jvmGatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
LOGGER.info("Launching IPython Kernel at port: " + ipythonPort);
@ -312,6 +317,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
public InterpreterResult interpret(String st, InterpreterContext context) {
zeppelinContext.setGui(context.getGui());
zeppelinContext.setNoteGui(context.getNoteGui());
zeppelinContext.setInterpreterContext(context);
interpreterOutput.setInterpreterOutput(context.out);
ExecuteResponse response =
ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
@ -361,7 +367,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
return completions;
}
public PythonZeppelinContext getZeppelinContext() {
public BaseZeppelinContext getZeppelinContext() {
return zeppelinContext;
}

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@ -91,6 +92,11 @@ public class IPySparkInterpreter extends IPythonInterpreter {
return spark;
}
@Override
public BaseZeppelinContext buildZeppelinContext() {
return sparkInterpreter.getZeppelinContext();
}
@Override
public void cancel(InterpreterContext context) throws InterpreterException {
super.cancel(context);

View file

@ -51,3 +51,17 @@ if intp.isSpark2():
sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
else:
sqlContext = sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
class IPySparkZeppelinContext(PyZeppelinContext):
def __init__(self, z):
super(IPySparkZeppelinContext, self).__init__(z)
def show(self, obj):
from pyspark.sql import DataFrame
if isinstance(obj, DataFrame):
print(self.z.showData(obj._jdf))
else:
super(IPySparkZeppelinContext, self).show(obj)
z = __zeppelin__ = IPySparkZeppelinContext(intp.getZeppelinContext())

View file

@ -116,6 +116,15 @@ public class IPySparkInterpreterTest {
"| 1| a|\n" +
"| 2| b|\n" +
"+---+---+\n\n", interpreterResultMessages.get(0).getData());
context = getInterpreterContext();
result = iPySparkInterpreter.interpret("z.show(df)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(
"_1 _2\n" +
"1 a\n" +
"2 b\n", interpreterResultMessages.get(0).getData());
} else {
result = iPySparkInterpreter.interpret("df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@ -127,6 +136,15 @@ public class IPySparkInterpreterTest {
"| 1| a|\n" +
"| 2| b|\n" +
"+---+---+\n\n", interpreterResultMessages.get(0).getData());
context = getInterpreterContext();
result = iPySparkInterpreter.interpret("z.show(df)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(
"_1 _2\n" +
"1 a\n" +
"2 b\n", interpreterResultMessages.get(0).getData());
}
// cancel

View file

@ -46,7 +46,6 @@ log4j.logger.org.hibernate.type=ALL
log4j.logger.org.apache.zeppelin.interpreter=DEBUG
log4j.logger.org.apache.zeppelin.spark=DEBUG
log4j.logger.org.apache.zeppelin.python.IPythonInterpreter=DEBUG
log4j.logger.org.apache.zeppelin.python.IPythonClient=DEBUG
log4j.logger.org.apache.zeppelin.python=DEBUG
log4j.logger.org.apache.spark.repl.Main=INFO