ZEPPELIN-3364. Followup of ZEPPELIN-3362, improve ZeppelinContext & add more test

This commit is contained in:
Jeff Zhang 2018-03-25 22:13:44 +08:00
parent 5f88452d63
commit 891f1e111a
4 changed files with 92 additions and 17 deletions

View file

@ -17,6 +17,7 @@
import os, sys
import warnings
import base64
from io import BytesIO
@ -34,7 +35,7 @@ class PyZeppelinContext(object):
self.gateway = gateway
self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption
self.javaList = gateway.jvm.java.util.ArrayList
self.max_result = 1000
self.max_result = z.getMaxResult()
self._displayhook = lambda *args: None
self._setup_matplotlib()
@ -129,13 +130,13 @@ class PyZeppelinContext(object):
# `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
# and so a dependency on pandas
self.show_dataframe(p, **kwargs)
elif hasattr(p, '__call__'):
p() #error reporting
else:
print(str(p))
def show_dataframe(self, df, show_index=False, **kwargs):
"""Pretty prints DF using Table Display System
"""
limit = len(df) > self.max_result
exceed_limit = len(df) > self.max_result
header_buf = StringIO("")
if show_index:
idx_name = str(df.index.name) if df.index.name is not None else ""
@ -147,7 +148,7 @@ class PyZeppelinContext(object):
header_buf.write("\n")
body_buf = StringIO("")
rows = df.head(self.max_result).values if limit else df.values
rows = df.head(self.max_result).values if exceed_limit else df.values
index = df.index.values
for idx, row in zip(index, rows):
if show_index:
@ -158,13 +159,12 @@ class PyZeppelinContext(object):
body_buf.write("\t")
body_buf.write(str(cell))
body_buf.write("\n")
body_buf.seek(0); header_buf.seek(0)
#TODO(bzz): fix it, so it shows red notice, as in Spark
print("%table " + header_buf.read() + body_buf.read()) # +
# ("\n<font color=red>Results are limited by {}.</font>" \
# .format(self.max_result) if limit else "")
#)
body_buf.seek(0)
header_buf.seek(0)
print("%table " + header_buf.read() + body_buf.read())
body_buf.close(); header_buf.close()
if exceed_limit:
print("%html <font color=red>Results are limited by {}.</font>".format(self.max_result))
def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
**kwargs):
@ -176,7 +176,7 @@ class PyZeppelinContext(object):
img_str = b"data:image/png;base64,"
img_str += base64.b64encode(img.getvalue().strip())
img_tag = "<img src={img} style='width={width};height:{height}'>"
# Decoding is necessary for Python 3 compability
# Decoding is necessary for Python 3 compatibility
img_str = img_str.decode("ascii")
img_str = img_tag.format(img=img_str, width=width, height=height)
elif fmt == "svg":

View file

@ -71,7 +71,9 @@ public class IPythonInterpreterTest {
@Test
public void testIPython() throws IOException, InterruptedException, InterpreterException {
startInterpreter(new Properties());
Properties properties = new Properties();
properties.setProperty("zeppelin.python.maxResult", "3");
startInterpreter(properties);
testInterpreter(interpreter);
}
@ -454,9 +456,29 @@ public class IPythonInterpreterTest {
result = interpreter.interpret("import pandas as pd\ndf = pd.DataFrame({'id':[1,2,3], 'name':['a','b','c']})\nz.show(df)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
assertEquals(1, interpreterResultMessages.size());
assertEquals(InterpreterResult.Type.TABLE, interpreterResultMessages.get(0).getType());
assertEquals("id\tname\n1\ta\n2\tb\n3\tc\n", interpreterResultMessages.get(0).getData());
context = getInterpreterContext();
result = interpreter.interpret("import pandas as pd\ndf = pd.DataFrame({'id':[1,2,3,4], 'name':['a','b','c', 'd']})\nz.show(df)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
assertEquals(2, interpreterResultMessages.size());
assertEquals(InterpreterResult.Type.TABLE, interpreterResultMessages.get(0).getType());
assertEquals("id\tname\n1\ta\n2\tb\n3\tc\n", interpreterResultMessages.get(0).getData());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
assertEquals("<font color=red>Results are limited by 3.</font>\n", interpreterResultMessages.get(1).getData());
// z.show(matplotlib)
context = getInterpreterContext();
result = interpreter.interpret("import matplotlib.pyplot as plt\ndata=[1,1,2,3,4]\nplt.figure()\nplt.plot(data)\nz.show(plt)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
assertEquals(2, interpreterResultMessages.size());
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
assertEquals(InterpreterResult.Type.IMG, interpreterResultMessages.get(1).getType());
// clear output
context = getInterpreterContext();
result = interpreter.interpret("import time\nprint(\"Hello\")\ntime.sleep(0.5)\nz.getInterpreterContext().out().clear()\nprint(\"world\")\n", context);

View file

@ -64,7 +64,7 @@ public class IPySparkInterpreterTest {
p.setProperty("spark.submit.deployMode", "client");
p.setProperty("spark.app.name", "Zeppelin Test");
p.setProperty("zeppelin.spark.useHiveContext", "true");
p.setProperty("zeppelin.spark.maxResult", "1000");
p.setProperty("zeppelin.spark.maxResult", "3");
p.setProperty("zeppelin.spark.importImplicit", "true");
p.setProperty("zeppelin.pyspark.python", "python");
p.setProperty("zeppelin.dep.localrepo", Files.createTempDir().getAbsolutePath());

View file

@ -18,7 +18,9 @@ package org.apache.zeppelin.rest;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterNotFoundException;
import org.apache.zeppelin.interpreter.InterpreterProperty;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
@ -367,15 +369,22 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
Paragraph p3 = note.addNewParagraph(anonymous);
p3.setText("%spark.pyspark print(z.get(\"var_1\"))");
// resources across interpreter processes (via DistributedResourcePool)
Paragraph p4 = note.addNewParagraph(anonymous);
p4.setText("%python print(z.get('var_1'))");
note.run(p1.getId(), true);
note.run(p2.getId(), true);
note.run(p3.getId(), true);
note.run(p4.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());
assertEquals(Status.FINISHED, p2.getStatus());
assertEquals("hello world\n", p2.getResult().message().get(0).getData());
assertEquals(Status.FINISHED, p3.getStatus());
assertEquals("hello world\n", p3.getResult().message().get(0).getData());
assertEquals(Status.FINISHED, p4.getStatus());
assertEquals("hello world\n", p4.getResult().message().get(0).getData());
}
@Test
@ -502,8 +511,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
"[('1', 'check_1'), ('2', 'check_2')], defaultChecked=['2'])\n" +
"print(items[0])";
p.setText(code);
note.run(p.getId());
waitForFinish(p);
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
Iterator<String> formIter = p.settings.getForms().keySet().iterator();
@ -519,6 +527,51 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
assertEquals("2", result[2]);
}
@Test
public void testAngularObjects() throws IOException, InterpreterNotFoundException {
Note note = ZeppelinServer.notebook.createNote(anonymous);
Paragraph p1 = note.addNewParagraph(anonymous);
// add local angular object
p1.setText("%spark z.angularBind(\"name\", \"world\")");
note.run(p1.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());
List<AngularObject> angularObjects =
p1.getBindedInterpreter().getInterpreterGroup().getAngularObjectRegistry().getAll(note.getId(), null);
assertEquals(1, angularObjects.size());
assertEquals("name", angularObjects.get(0).getName());
assertEquals("world", angularObjects.get(0).get());
// remove local angular object
Paragraph p2 = note.addNewParagraph(anonymous);
p2.setText("%spark z.angularUnbind(\"name\")");
note.run(p2.getId(), true);
assertEquals(Status.FINISHED, p2.getStatus());
angularObjects =
p1.getBindedInterpreter().getInterpreterGroup().getAngularObjectRegistry().getAll(note.getId(), null);
assertEquals(0, angularObjects.size());
// add global angular object
Paragraph p3 = note.addNewParagraph(anonymous);
p3.setText("%spark z.angularBindGlobal(\"name2\", \"world2\")");
note.run(p3.getId(), true);
assertEquals(Status.FINISHED, p3.getStatus());
List<AngularObject> globalAngularObjects =
p3.getBindedInterpreter().getInterpreterGroup().getAngularObjectRegistry().getAll(null, null);
assertEquals(1, globalAngularObjects.size());
assertEquals("name2", globalAngularObjects.get(0).getName());
assertEquals("world2", globalAngularObjects.get(0).get());
// remove global angular object
Paragraph p4 = note.addNewParagraph(anonymous);
p4.setText("%spark z.angularUnbindGlobal(\"name2\")");
note.run(p4.getId(), true);
assertEquals(Status.FINISHED, p4.getStatus());
globalAngularObjects =
p4.getBindedInterpreter().getInterpreterGroup().getAngularObjectRegistry().getAll(note.getId(), null);
assertEquals(0, globalAngularObjects.size());
}
@Test
public void testConfInterpreter() throws IOException {
ZeppelinServer.notebook.getInterpreterSettingManager().close();