mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-3364. Followup of ZEPPELIN-3362, improve ZeppelinContext & add more test
This commit is contained in:
parent
5f88452d63
commit
891f1e111a
4 changed files with 92 additions and 17 deletions
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
import os, sys
|
||||
import warnings
|
||||
import base64
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
|
|
@ -34,7 +35,7 @@ class PyZeppelinContext(object):
|
|||
self.gateway = gateway
|
||||
self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption
|
||||
self.javaList = gateway.jvm.java.util.ArrayList
|
||||
self.max_result = 1000
|
||||
self.max_result = z.getMaxResult()
|
||||
self._displayhook = lambda *args: None
|
||||
self._setup_matplotlib()
|
||||
|
||||
|
|
@ -129,13 +130,13 @@ class PyZeppelinContext(object):
|
|||
# `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
|
||||
# and so a dependency on pandas
|
||||
self.show_dataframe(p, **kwargs)
|
||||
elif hasattr(p, '__call__'):
|
||||
p() #error reporting
|
||||
|
||||
else:
|
||||
print(str(p))
|
||||
|
||||
def show_dataframe(self, df, show_index=False, **kwargs):
|
||||
"""Pretty prints DF using Table Display System
|
||||
"""
|
||||
limit = len(df) > self.max_result
|
||||
exceed_limit = len(df) > self.max_result
|
||||
header_buf = StringIO("")
|
||||
if show_index:
|
||||
idx_name = str(df.index.name) if df.index.name is not None else ""
|
||||
|
|
@ -147,7 +148,7 @@ class PyZeppelinContext(object):
|
|||
header_buf.write("\n")
|
||||
|
||||
body_buf = StringIO("")
|
||||
rows = df.head(self.max_result).values if limit else df.values
|
||||
rows = df.head(self.max_result).values if exceed_limit else df.values
|
||||
index = df.index.values
|
||||
for idx, row in zip(index, rows):
|
||||
if show_index:
|
||||
|
|
@ -158,13 +159,12 @@ class PyZeppelinContext(object):
|
|||
body_buf.write("\t")
|
||||
body_buf.write(str(cell))
|
||||
body_buf.write("\n")
|
||||
body_buf.seek(0); header_buf.seek(0)
|
||||
#TODO(bzz): fix it, so it shows red notice, as in Spark
|
||||
print("%table " + header_buf.read() + body_buf.read()) # +
|
||||
# ("\n<font color=red>Results are limited by {}.</font>" \
|
||||
# .format(self.max_result) if limit else "")
|
||||
#)
|
||||
body_buf.seek(0)
|
||||
header_buf.seek(0)
|
||||
print("%table " + header_buf.read() + body_buf.read())
|
||||
body_buf.close(); header_buf.close()
|
||||
if exceed_limit:
|
||||
print("%html <font color=red>Results are limited by {}.</font>".format(self.max_result))
|
||||
|
||||
def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
|
||||
**kwargs):
|
||||
|
|
@ -176,7 +176,7 @@ class PyZeppelinContext(object):
|
|||
img_str = b"data:image/png;base64,"
|
||||
img_str += base64.b64encode(img.getvalue().strip())
|
||||
img_tag = "<img src={img} style='width={width};height:{height}'>"
|
||||
# Decoding is necessary for Python 3 compability
|
||||
# Decoding is necessary for Python 3 compatibility
|
||||
img_str = img_str.decode("ascii")
|
||||
img_str = img_tag.format(img=img_str, width=width, height=height)
|
||||
elif fmt == "svg":
|
||||
|
|
|
|||
|
|
@ -71,7 +71,9 @@ public class IPythonInterpreterTest {
|
|||
|
||||
@Test
|
||||
public void testIPython() throws IOException, InterruptedException, InterpreterException {
|
||||
startInterpreter(new Properties());
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("zeppelin.python.maxResult", "3");
|
||||
startInterpreter(properties);
|
||||
testInterpreter(interpreter);
|
||||
}
|
||||
|
||||
|
|
@ -454,9 +456,29 @@ public class IPythonInterpreterTest {
|
|||
result = interpreter.interpret("import pandas as pd\ndf = pd.DataFrame({'id':[1,2,3], 'name':['a','b','c']})\nz.show(df)", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
interpreterResultMessages = context.out.toInterpreterResultMessage();
|
||||
assertEquals(1, interpreterResultMessages.size());
|
||||
assertEquals(InterpreterResult.Type.TABLE, interpreterResultMessages.get(0).getType());
|
||||
assertEquals("id\tname\n1\ta\n2\tb\n3\tc\n", interpreterResultMessages.get(0).getData());
|
||||
|
||||
context = getInterpreterContext();
|
||||
result = interpreter.interpret("import pandas as pd\ndf = pd.DataFrame({'id':[1,2,3,4], 'name':['a','b','c', 'd']})\nz.show(df)", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
interpreterResultMessages = context.out.toInterpreterResultMessage();
|
||||
assertEquals(2, interpreterResultMessages.size());
|
||||
assertEquals(InterpreterResult.Type.TABLE, interpreterResultMessages.get(0).getType());
|
||||
assertEquals("id\tname\n1\ta\n2\tb\n3\tc\n", interpreterResultMessages.get(0).getData());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
|
||||
assertEquals("<font color=red>Results are limited by 3.</font>\n", interpreterResultMessages.get(1).getData());
|
||||
|
||||
// z.show(matplotlib)
|
||||
context = getInterpreterContext();
|
||||
result = interpreter.interpret("import matplotlib.pyplot as plt\ndata=[1,1,2,3,4]\nplt.figure()\nplt.plot(data)\nz.show(plt)", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
interpreterResultMessages = context.out.toInterpreterResultMessage();
|
||||
assertEquals(2, interpreterResultMessages.size());
|
||||
assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
|
||||
assertEquals(InterpreterResult.Type.IMG, interpreterResultMessages.get(1).getType());
|
||||
|
||||
// clear output
|
||||
context = getInterpreterContext();
|
||||
result = interpreter.interpret("import time\nprint(\"Hello\")\ntime.sleep(0.5)\nz.getInterpreterContext().out().clear()\nprint(\"world\")\n", context);
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ public class IPySparkInterpreterTest {
|
|||
p.setProperty("spark.submit.deployMode", "client");
|
||||
p.setProperty("spark.app.name", "Zeppelin Test");
|
||||
p.setProperty("zeppelin.spark.useHiveContext", "true");
|
||||
p.setProperty("zeppelin.spark.maxResult", "1000");
|
||||
p.setProperty("zeppelin.spark.maxResult", "3");
|
||||
p.setProperty("zeppelin.spark.importImplicit", "true");
|
||||
p.setProperty("zeppelin.pyspark.python", "python");
|
||||
p.setProperty("zeppelin.dep.localrepo", Files.createTempDir().getAbsolutePath());
|
||||
|
|
|
|||
|
|
@ -18,7 +18,9 @@ package org.apache.zeppelin.rest;
|
|||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterNotFoundException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterProperty;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
|
|
@ -367,15 +369,22 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
Paragraph p3 = note.addNewParagraph(anonymous);
|
||||
p3.setText("%spark.pyspark print(z.get(\"var_1\"))");
|
||||
|
||||
// resources across interpreter processes (via DistributedResourcePool)
|
||||
Paragraph p4 = note.addNewParagraph(anonymous);
|
||||
p4.setText("%python print(z.get('var_1'))");
|
||||
|
||||
note.run(p1.getId(), true);
|
||||
note.run(p2.getId(), true);
|
||||
note.run(p3.getId(), true);
|
||||
note.run(p4.getId(), true);
|
||||
|
||||
assertEquals(Status.FINISHED, p1.getStatus());
|
||||
assertEquals(Status.FINISHED, p2.getStatus());
|
||||
assertEquals("hello world\n", p2.getResult().message().get(0).getData());
|
||||
assertEquals(Status.FINISHED, p3.getStatus());
|
||||
assertEquals("hello world\n", p3.getResult().message().get(0).getData());
|
||||
assertEquals(Status.FINISHED, p4.getStatus());
|
||||
assertEquals("hello world\n", p4.getResult().message().get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -502,8 +511,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
"[('1', 'check_1'), ('2', 'check_2')], defaultChecked=['2'])\n" +
|
||||
"print(items[0])";
|
||||
p.setText(code);
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
note.run(p.getId(), true);
|
||||
|
||||
assertEquals(Status.FINISHED, p.getStatus());
|
||||
Iterator<String> formIter = p.settings.getForms().keySet().iterator();
|
||||
|
|
@ -519,6 +527,51 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
assertEquals("2", result[2]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAngularObjects() throws IOException, InterpreterNotFoundException {
|
||||
Note note = ZeppelinServer.notebook.createNote(anonymous);
|
||||
Paragraph p1 = note.addNewParagraph(anonymous);
|
||||
|
||||
// add local angular object
|
||||
p1.setText("%spark z.angularBind(\"name\", \"world\")");
|
||||
note.run(p1.getId(), true);
|
||||
assertEquals(Status.FINISHED, p1.getStatus());
|
||||
List<AngularObject> angularObjects =
|
||||
p1.getBindedInterpreter().getInterpreterGroup().getAngularObjectRegistry().getAll(note.getId(), null);
|
||||
assertEquals(1, angularObjects.size());
|
||||
assertEquals("name", angularObjects.get(0).getName());
|
||||
assertEquals("world", angularObjects.get(0).get());
|
||||
|
||||
// remove local angular object
|
||||
Paragraph p2 = note.addNewParagraph(anonymous);
|
||||
p2.setText("%spark z.angularUnbind(\"name\")");
|
||||
note.run(p2.getId(), true);
|
||||
assertEquals(Status.FINISHED, p2.getStatus());
|
||||
angularObjects =
|
||||
p1.getBindedInterpreter().getInterpreterGroup().getAngularObjectRegistry().getAll(note.getId(), null);
|
||||
assertEquals(0, angularObjects.size());
|
||||
|
||||
// add global angular object
|
||||
Paragraph p3 = note.addNewParagraph(anonymous);
|
||||
p3.setText("%spark z.angularBindGlobal(\"name2\", \"world2\")");
|
||||
note.run(p3.getId(), true);
|
||||
assertEquals(Status.FINISHED, p3.getStatus());
|
||||
List<AngularObject> globalAngularObjects =
|
||||
p3.getBindedInterpreter().getInterpreterGroup().getAngularObjectRegistry().getAll(null, null);
|
||||
assertEquals(1, globalAngularObjects.size());
|
||||
assertEquals("name2", globalAngularObjects.get(0).getName());
|
||||
assertEquals("world2", globalAngularObjects.get(0).get());
|
||||
|
||||
// remove global angular object
|
||||
Paragraph p4 = note.addNewParagraph(anonymous);
|
||||
p4.setText("%spark z.angularUnbindGlobal(\"name2\")");
|
||||
note.run(p4.getId(), true);
|
||||
assertEquals(Status.FINISHED, p4.getStatus());
|
||||
globalAngularObjects =
|
||||
p4.getBindedInterpreter().getInterpreterGroup().getAngularObjectRegistry().getAll(note.getId(), null);
|
||||
assertEquals(0, globalAngularObjects.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfInterpreter() throws IOException {
|
||||
ZeppelinServer.notebook.getInterpreterSettingManager().close();
|
||||
|
|
|
|||
Loading…
Reference in a new issue