ZEPPELIN-3362. Unify ZeppelinContext of PythonInterpreter & IPythonInterpreter

This commit is contained in:
Jeff Zhang 2018-03-23 17:23:59 +08:00
parent 3eea57ab26
commit b5dcbc9f0b
11 changed files with 289 additions and 462 deletions

View file

@ -210,6 +210,22 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
throw new IOException("Fail to setup JVMGateway\n" + response.getOutput());
}
input =
getClass().getClassLoader().getResourceAsStream("python/zeppelin_context.py");
lines = IOUtils.readLines(input);
response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
.setCode(StringUtils.join(lines, System.lineSeparator())).build());
if (response.getStatus() == ExecuteStatus.ERROR) {
throw new IOException("Fail to import ZeppelinContext\n" + response.getOutput());
}
response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
.setCode("z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)")
.build());
if (response.getStatus() == ExecuteStatus.ERROR) {
throw new IOException("Fail to setup ZeppelinContext\n" + response.getOutput());
}
if (additionalPythonInitFile != null) {
input = getClass().getClassLoader().getResourceAsStream(additionalPythonInitFile);
lines = IOUtils.readLines(input);

View file

@ -65,6 +65,7 @@ import py4j.GatewayServer;
public class PythonInterpreter extends Interpreter implements ExecuteResultHandler {
private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class);
public static final String ZEPPELIN_PYTHON = "python/zeppelin_python.py";
public static final String ZEPPELIN_CONTEXT = "python/zeppelin_context.py";
public static final String ZEPPELIN_PY4JPATH = "interpreter/python/py4j-0.9.2/src";
public static final String ZEPPELIN_PYTHON_LIBS = "interpreter/lib/python";
public static final String DEFAULT_ZEPPELIN_PYTHON = "python";
@ -125,7 +126,11 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
}
copyFile(out, ZEPPELIN_PYTHON);
logger.info("File {} created", scriptPath);
// copy zeppelin_context.py as well
File zOut = new File(out.getParent() + "/zeppelin_context.py");
copyFile(zOut, ZEPPELIN_CONTEXT);
logger.info("File {} , {} created", scriptPath, zOut.getAbsolutePath());
}
public String getScriptPath() {
@ -181,7 +186,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
cmd.addArgument(getLocalIp(), false);
executor = new DefaultExecutor();
outputStream = new InterpreterOutputStream(logger);
outputStream = new InterpreterOutputStream(LOG);
PipedOutputStream ps = new PipedOutputStream();
in = null;
try {

View file

@ -17,130 +17,8 @@
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
from io import BytesIO
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
class PyZeppelinContext(object):
""" A context impl that uses Py4j to communicate to JVM
"""
def __init__(self, z):
self.z = z
self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption
self.javaList = gateway.jvm.java.util.ArrayList
self.max_result = z.getMaxResult()
def getInterpreterContext(self):
return self.z.getInterpreterContext()
def input(self, name, defaultValue=""):
return self.z.input(name, defaultValue)
def textbox(self, name, defaultValue=""):
return self.z.textbox(name, defaultValue)
def noteTextbox(self, name, defaultValue=""):
return self.z.noteTextbox(name, defaultValue)
def select(self, name, options, defaultValue=""):
return self.z.select(name, defaultValue, self.getParamOptions(options))
def noteSelect(self, name, options, defaultValue=""):
return self.z.noteSelect(name, defaultValue, self.getParamOptions(options))
def checkbox(self, name, options, defaultChecked=[]):
return self.z.checkbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
def noteCheckbox(self, name, options, defaultChecked=[]):
return self.z.noteCheckbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
def getParamOptions(self, options):
javaOptions = gateway.new_array(self.paramOption, len(options))
i = 0
for tuple in options:
javaOptions[i] = self.paramOption(tuple[0], tuple[1])
i += 1
return javaOptions
def getDefaultChecked(self, defaultChecked):
javaDefaultChecked = self.javaList()
for check in defaultChecked:
javaDefaultChecked.append(check)
return javaDefaultChecked
def show(self, p, **kwargs):
if type(p).__name__ == "DataFrame": # does not play well with sub-classes
# `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
# and so a dependency on pandas
self.show_dataframe(p, **kwargs)
elif hasattr(p, '__call__'):
p() #error reporting
def show_dataframe(self, df, show_index=False, **kwargs):
"""Pretty prints DF using Table Display System
"""
limit = len(df) > self.max_result
header_buf = StringIO("")
if show_index:
idx_name = str(df.index.name) if df.index.name is not None else ""
header_buf.write(idx_name + "\t")
header_buf.write(str(df.columns[0]))
for col in df.columns[1:]:
header_buf.write("\t")
header_buf.write(str(col))
header_buf.write("\n")
body_buf = StringIO("")
rows = df.head(self.max_result).values if limit else df.values
index = df.index.values
for idx, row in zip(index, rows):
if show_index:
body_buf.write("%html <strong>{}</strong>".format(idx))
body_buf.write("\t")
body_buf.write(str(row[0]))
for cell in row[1:]:
body_buf.write("\t")
body_buf.write(str(cell))
body_buf.write("\n")
body_buf.seek(0); header_buf.seek(0)
#TODO(bzz): fix it, so it shows red notice, as in Spark
print("%table " + header_buf.read() + body_buf.read()) # +
# ("\n<font color=red>Results are limited by {}.</font>" \
# .format(self.max_result) if limit else "")
#)
body_buf.close(); header_buf.close()
def registerHook(self, event, cmd, replName=None):
if replName is None:
self.z.registerHook(event, cmd)
else:
self.z.registerHook(event, cmd, replName)
def unregisterHook(self, event, replName=None):
if replName is None:
self.z.unregisterHook(event)
else:
self.z.unregisterHook(event, replName)
def registerNoteHook(self, event, cmd, noteId, replName=None):
if replName is None:
self.z.registerNoteHook(event, cmd, noteId)
else:
self.z.registerNoteHook(event, cmd, noteId, replName)
def unregisterNoteHook(self, event, noteId, replName=None):
if replName is None:
self.z.unregisterNoteHook(event, noteId)
else:
self.z.unregisterNoteHook(event, noteId, replName)
# start JVM gateway
client = GatewayClient(address='127.0.0.1', port=${JVM_GATEWAY_PORT})
gateway = JavaGateway(client)
java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
intp = gateway.entry_point
z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext())

View file

@ -0,0 +1,224 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os, sys
import warnings
from io import BytesIO
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
class PyZeppelinContext(object):
""" A context impl that uses Py4j to communicate to JVM
"""
def __init__(self, z, gateway):
self.z = z
self.gateway = gateway
self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption
self.javaList = gateway.jvm.java.util.ArrayList
self.max_result = 1000
self._displayhook = lambda *args: None
self._setup_matplotlib()
# By implementing special methods it makes operating on it more Pythonic
def __setitem__(self, key, item):
self.z.put(key, item)
def __getitem__(self, key):
return self.z.get(key)
def __delitem__(self, key):
self.z.remove(key)
def __contains__(self, item):
return self.z.containsKey(item)
def add(self, key, value):
self.__setitem__(key, value)
def put(self, key, value):
self.__setitem__(key, value)
def get(self, key):
return self.__getitem__(key)
def getInterpreterContext(self):
return self.z.getInterpreterContext()
def input(self, name, defaultValue=""):
return self.z.input(name, defaultValue)
def textbox(self, name, defaultValue=""):
return self.z.textbox(name, defaultValue)
def noteTextbox(self, name, defaultValue=""):
return self.z.noteTextbox(name, defaultValue)
def select(self, name, options, defaultValue=""):
return self.z.select(name, defaultValue, self.getParamOptions(options))
def noteSelect(self, name, options, defaultValue=""):
return self.z.noteSelect(name, defaultValue, self.getParamOptions(options))
def checkbox(self, name, options, defaultChecked=[]):
return self.z.checkbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
def noteCheckbox(self, name, options, defaultChecked=[]):
return self.z.noteCheckbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
def registerHook(self, event, cmd, replName=None):
if replName is None:
self.z.registerHook(event, cmd)
else:
self.z.registerHook(event, cmd, replName)
def unregisterHook(self, event, replName=None):
if replName is None:
self.z.unregisterHook(event)
else:
self.z.unregisterHook(event, replName)
def registerNoteHook(self, event, cmd, noteId, replName=None):
if replName is None:
self.z.registerNoteHook(event, cmd, noteId)
else:
self.z.registerNoteHook(event, cmd, noteId, replName)
def unregisterNoteHook(self, event, noteId, replName=None):
if replName is None:
self.z.unregisterNoteHook(event, noteId)
else:
self.z.unregisterNoteHook(event, noteId, replName)
def getParamOptions(self, options):
javaOptions = self.gateway.new_array(self.paramOption, len(options))
i = 0
for tuple in options:
javaOptions[i] = self.paramOption(tuple[0], tuple[1])
i += 1
return javaOptions
def getDefaultChecked(self, defaultChecked):
javaDefaultChecked = self.javaList()
for check in defaultChecked:
javaDefaultChecked.append(check)
return javaDefaultChecked
def show(self, p, **kwargs):
if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot":
self.show_matplotlib(p, **kwargs)
elif type(p).__name__ == "DataFrame": # does not play well with sub-classes
# `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
# and so a dependency on pandas
self.show_dataframe(p, **kwargs)
elif hasattr(p, '__call__'):
p() #error reporting
def show_dataframe(self, df, show_index=False, **kwargs):
"""Pretty prints DF using Table Display System
"""
limit = len(df) > self.max_result
header_buf = StringIO("")
if show_index:
idx_name = str(df.index.name) if df.index.name is not None else ""
header_buf.write(idx_name + "\t")
header_buf.write(str(df.columns[0]))
for col in df.columns[1:]:
header_buf.write("\t")
header_buf.write(str(col))
header_buf.write("\n")
body_buf = StringIO("")
rows = df.head(self.max_result).values if limit else df.values
index = df.index.values
for idx, row in zip(index, rows):
if show_index:
body_buf.write("%html <strong>{}</strong>".format(idx))
body_buf.write("\t")
body_buf.write(str(row[0]))
for cell in row[1:]:
body_buf.write("\t")
body_buf.write(str(cell))
body_buf.write("\n")
body_buf.seek(0); header_buf.seek(0)
#TODO(bzz): fix it, so it shows red notice, as in Spark
print("%table " + header_buf.read() + body_buf.read()) # +
# ("\n<font color=red>Results are limited by {}.</font>" \
# .format(self.max_result) if limit else "")
#)
body_buf.close(); header_buf.close()
def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
**kwargs):
"""Matplotlib show function
"""
if fmt == "png":
img = BytesIO()
p.savefig(img, format=fmt)
img_str = b"data:image/png;base64,"
img_str += base64.b64encode(img.getvalue().strip())
img_tag = "<img src={img} style='width={width};height:{height}'>"
# Decoding is necessary for Python 3 compability
img_str = img_str.decode("ascii")
img_str = img_tag.format(img=img_str, width=width, height=height)
elif fmt == "svg":
img = StringIO()
p.savefig(img, format=fmt)
img_str = img.getvalue()
else:
raise ValueError("fmt must be 'png' or 'svg'")
html = "%html <div style='width:{width};height:{height}'>{img}<div>"
print(html.format(width=width, height=height, img=img_str))
img.close()
def configure_mpl(self, **kwargs):
import mpl_config
mpl_config.configure(**kwargs)
def _setup_matplotlib(self):
# If we don't have matplotlib installed don't bother continuing
try:
import matplotlib
except ImportError:
return
# Make sure custom backends are available in the PYTHONPATH
rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
if mpl_path not in sys.path:
sys.path.append(mpl_path)
# Finally check if backend exists, and if so configure as appropriate
try:
matplotlib.use('module://backend_zinline')
import backend_zinline
# Everything looks good so make config assuming that we are using
# an inline backend
self._displayhook = backend_zinline.displayhook
self.configure_mpl(width=600, height=400, dpi=72, fontsize=10,
interactive=True, format='png', context=self.z)
except ImportError:
# Fall back to Agg if no custom backend installed
matplotlib.use('Agg')
warnings.warn("Unable to load inline matplotlib backend, "
"falling back to Agg")

View file

@ -47,182 +47,6 @@ class Logger(object):
def flush(self):
pass
class PyZeppelinContext(object):
""" A context impl that uses Py4j to communicate to JVM
"""
def __init__(self, z):
self.z = z
self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption
self.javaList = gateway.jvm.java.util.ArrayList
self.max_result = 1000
self._displayhook = lambda *args: None
self._setup_matplotlib()
def getInterpreterContext(self):
return self.z.getInterpreterContext()
def input(self, name, defaultValue=""):
return self.z.input(name, defaultValue)
def textbox(self, name, defaultValue=""):
return self.z.textbox(name, defaultValue)
def noteTextbox(self, name, defaultValue=""):
return self.z.noteTextbox(name, defaultValue)
def select(self, name, options, defaultValue=""):
return self.z.select(name, defaultValue, self.getParamOptions(options))
def noteSelect(self, name, options, defaultValue=""):
return self.z.noteSelect(name, defaultValue, self.getParamOptions(options))
def checkbox(self, name, options, defaultChecked=[]):
return self.z.checkbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
def noteCheckbox(self, name, options, defaultChecked=[]):
return self.z.noteCheckbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
def registerHook(self, event, cmd, replName=None):
if replName is None:
self.z.registerHook(event, cmd)
else:
self.z.registerHook(event, cmd, replName)
def unregisterHook(self, event, replName=None):
if replName is None:
self.z.unregisterHook(event)
else:
self.z.unregisterHook(event, replName)
def registerNoteHook(self, event, cmd, noteId, replName=None):
if replName is None:
self.z.registerNoteHook(event, cmd, noteId)
else:
self.z.registerNoteHook(event, cmd, noteId, replName)
def unregisterNoteHook(self, event, noteId, replName=None):
if replName is None:
self.z.unregisterNoteHook(event, noteId)
else:
self.z.unregisterNoteHook(event, noteId, replName)
def getParamOptions(self, options):
javaOptions = gateway.new_array(self.paramOption, len(options))
i = 0
for tuple in options:
javaOptions[i] = self.paramOption(tuple[0], tuple[1])
i += 1
return javaOptions
def getDefaultChecked(self, defaultChecked):
javaDefaultChecked = self.javaList()
for check in defaultChecked:
javaDefaultChecked.append(check)
return javaDefaultChecked
def show(self, p, **kwargs):
if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot":
self.show_matplotlib(p, **kwargs)
elif type(p).__name__ == "DataFrame": # does not play well with sub-classes
# `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
# and so a dependency on pandas
self.show_dataframe(p, **kwargs)
elif hasattr(p, '__call__'):
p() #error reporting
def show_dataframe(self, df, show_index=False, **kwargs):
"""Pretty prints DF using Table Display System
"""
limit = len(df) > self.max_result
header_buf = StringIO("")
if show_index:
idx_name = str(df.index.name) if df.index.name is not None else ""
header_buf.write(idx_name + "\t")
header_buf.write(str(df.columns[0]))
for col in df.columns[1:]:
header_buf.write("\t")
header_buf.write(str(col))
header_buf.write("\n")
body_buf = StringIO("")
rows = df.head(self.max_result).values if limit else df.values
index = df.index.values
for idx, row in zip(index, rows):
if show_index:
body_buf.write("%html <strong>{}</strong>".format(idx))
body_buf.write("\t")
body_buf.write(str(row[0]))
for cell in row[1:]:
body_buf.write("\t")
body_buf.write(str(cell))
body_buf.write("\n")
body_buf.seek(0); header_buf.seek(0)
#TODO(bzz): fix it, so it shows red notice, as in Spark
print("%table " + header_buf.read() + body_buf.read()) # +
# ("\n<font color=red>Results are limited by {}.</font>" \
# .format(self.max_result) if limit else "")
#)
body_buf.close(); header_buf.close()
def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
**kwargs):
"""Matplotlib show function
"""
if fmt == "png":
img = BytesIO()
p.savefig(img, format=fmt)
img_str = b"data:image/png;base64,"
img_str += base64.b64encode(img.getvalue().strip())
img_tag = "<img src={img} style='width={width};height:{height}'>"
# Decoding is necessary for Python 3 compability
img_str = img_str.decode("ascii")
img_str = img_tag.format(img=img_str, width=width, height=height)
elif fmt == "svg":
img = StringIO()
p.savefig(img, format=fmt)
img_str = img.getvalue()
else:
raise ValueError("fmt must be 'png' or 'svg'")
html = "%html <div style='width:{width};height:{height}'>{img}<div>"
print(html.format(width=width, height=height, img=img_str))
img.close()
def configure_mpl(self, **kwargs):
import mpl_config
mpl_config.configure(**kwargs)
def _setup_matplotlib(self):
# If we don't have matplotlib installed don't bother continuing
try:
import matplotlib
except ImportError:
return
# Make sure custom backends are available in the PYTHONPATH
rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
if mpl_path not in sys.path:
sys.path.append(mpl_path)
# Finally check if backend exists, and if so configure as appropriate
try:
matplotlib.use('module://backend_zinline')
import backend_zinline
# Everything looks good so make config assuming that we are using
# an inline backend
self._displayhook = backend_zinline.displayhook
self.configure_mpl(width=600, height=400, dpi=72,
fontsize=10, interactive=True, format='png')
except ImportError:
# Fall back to Agg if no custom backend installed
matplotlib.use('Agg')
warnings.warn("Unable to load inline matplotlib backend, "
"falling back to Agg")
def handler_stop_signals(sig, frame):
sys.exit("Got signal : " + str(sig))
@ -236,14 +60,16 @@ if len(sys.argv) >= 3:
_zcUserQueryNameSpace = {}
client = GatewayClient(address=host, port=int(sys.argv[1]))
#gateway = JavaGateway(client, auto_convert = True)
gateway = JavaGateway(client)
intp = gateway.entry_point
intp.onPythonScriptInitialized(os.getpid())
java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext())
from zeppelin_context import PyZeppelinContext
z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)
__zeppelin__._setup_matplotlib()
_zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__

View file

@ -78,7 +78,7 @@ public class IPythonInterpreterTest {
@Test
public void testGrpcFrameSize() throws InterpreterException, IOException {
Properties properties = new Properties();
properties.setProperty("zeppelin.ipython.grpc.message_size", "4");
properties.setProperty("zeppelin.ipython.grpc.message_size", "200");
startInterpreter(properties);
// to make this test can run under both python2 and python3
@ -86,11 +86,11 @@ public class IPythonInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
InterpreterContext context = getInterpreterContext();
result = interpreter.interpret("print(11111111111111111111111111111)", context);
result = interpreter.interpret("print('1'*300)", context);
assertEquals(InterpreterResult.Code.ERROR, result.code());
List<InterpreterResultMessage> interpreterResultMessages = context.out.toInterpreterResultMessage();
assertEquals(1, interpreterResultMessages.size());
assertTrue(interpreterResultMessages.get(0).getData().contains("Frame size 32 exceeds maximum: 4"));
assertTrue(interpreterResultMessages.get(0).getData().contains("Frame size 304 exceeds maximum: 200"));
// next call continue work
result = interpreter.interpret("print(1)", context);
@ -99,14 +99,14 @@ public class IPythonInterpreterTest {
close();
// increase framesize to make it work
properties.setProperty("zeppelin.ipython.grpc.message_size", "40");
properties.setProperty("zeppelin.ipython.grpc.message_size", "500");
startInterpreter(properties);
// to make this test can run under both python2 and python3
result = interpreter.interpret("from __future__ import print_function", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
context = getInterpreterContext();
result = interpreter.interpret("print(11111111111111111111111111111)", context);
result = interpreter.interpret("print('1'*300)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}

View file

@ -29,3 +29,4 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.zeppelin.python.IPythonInterpreter=DEBUG
log4j.logger.org.apache.zeppelin.python.IPythonClient=DEBUG
log4j.logger.org.apache.zeppelin.python=DEBUG

View file

@ -114,6 +114,16 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
throw new InterpreterException(e);
}
try {
FileOutputStream outStream = new FileOutputStream(out.getParent() + "/zeppelin_context.py");
IOUtils.copy(
classLoader.getResourceAsStream("python/zeppelin_context.py"),
outStream);
outStream.close();
} catch (IOException e) {
throw new InterpreterException(e);
}
LOGGER.info("File {} created", scriptPath);
}

View file

@ -54,8 +54,8 @@ else:
class IPySparkZeppelinContext(PyZeppelinContext):
def __init__(self, z):
super(IPySparkZeppelinContext, self).__init__(z)
def __init__(self, z, gateway):
super(IPySparkZeppelinContext, self).__init__(z, gateway)
def show(self, obj):
from pyspark.sql import DataFrame
@ -64,4 +64,4 @@ class IPySparkZeppelinContext(PyZeppelinContext):
else:
super(IPySparkZeppelinContext, self).show(obj)
z = __zeppelin__ = IPySparkZeppelinContext(intp.getZeppelinContext())
z = __zeppelin__ = IPySparkZeppelinContext(intp.getZeppelinContext(), gateway)

View file

@ -41,155 +41,6 @@ class Logger(object):
pass
class PyZeppelinContext(dict):
def __init__(self, zc):
self.z = zc
self._displayhook = lambda *args: None
def show(self, obj):
from pyspark.sql import DataFrame
if isinstance(obj, DataFrame):
print(self.z.showData(obj._jdf))
else:
print(str(obj))
# By implementing special methods it makes operating on it more Pythonic
def __setitem__(self, key, item):
self.z.put(key, item)
def __getitem__(self, key):
return self.z.get(key)
def __delitem__(self, key):
self.z.remove(key)
def __contains__(self, item):
return self.z.containsKey(item)
def add(self, key, value):
self.__setitem__(key, value)
def put(self, key, value):
self.__setitem__(key, value)
def get(self, key):
return self.__getitem__(key)
def getInterpreterContext(self):
return self.z.getInterpreterContext()
def input(self, name, defaultValue=""):
return self.z.input(name, defaultValue)
def textbox(self, name, defaultValue=""):
return self.z.textbox(name, defaultValue)
def noteTextbox(self, name, defaultValue=""):
return self.z.noteTextbox(name, defaultValue)
def select(self, name, options, defaultValue=""):
# auto_convert to ArrayList doesn't match the method signature on JVM side
return self.z.select(name, defaultValue, self.getParamOptions(options))
def noteSelect(self, name, options, defaultValue=""):
return self.z.noteSelect(name, defaultValue, self.getParamOptions(options))
def checkbox(self, name, options, defaultChecked=None):
optionsIterable = self.getParamOptions(options)
defaultCheckedIterables = self.getDefaultChecked(defaultChecked)
checkedItems = gateway.jvm.scala.collection.JavaConversions.seqAsJavaList(self.z.checkbox(name, defaultCheckedIterables, optionsIterable))
result = []
for checkedItem in checkedItems:
result.append(checkedItem)
return result;
def noteCheckbox(self, name, options, defaultChecked=None):
optionsIterable = self.getParamOptions(options)
defaultCheckedIterables = self.getDefaultChecked(defaultChecked)
checkedItems = gateway.jvm.scala.collection.JavaConversions.seqAsJavaList(self.z.noteCheckbox(name, defaultCheckedIterables, optionsIterable))
result = []
for checkedItem in checkedItems:
result.append(checkedItem)
return result;
def getParamOptions(self, options):
tuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options))
return gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(tuples)
def getDefaultChecked(self, defaultChecked):
if defaultChecked is None:
defaultChecked = []
return gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(defaultChecked)
def registerHook(self, event, cmd, replName=None):
if replName is None:
self.z.registerHook(event, cmd)
else:
self.z.registerHook(event, cmd, replName)
def unregisterHook(self, event, replName=None):
if replName is None:
self.z.unregisterHook(event)
else:
self.z.unregisterHook(event, replName)
def registerNoteHook(self, event, cmd, noteId, replName=None):
if replName is None:
self.z.registerNoteHook(event, cmd, noteId)
else:
self.z.registerNoteHook(event, cmd, noteId, replName)
def unregisterNoteHook(self, event, noteId, replName=None):
if replName is None:
self.z.unregisterNoteHook(event, noteId)
else:
self.z.unregisterNoteHook(event, noteId, replName)
def getHook(self, event, replName=None):
if replName is None:
return self.z.getHook(event)
return self.z.getHook(event, replName)
def _setup_matplotlib(self):
# If we don't have matplotlib installed don't bother continuing
try:
import matplotlib
except ImportError:
return
# Make sure custom backends are available in the PYTHONPATH
rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
if mpl_path not in sys.path:
sys.path.append(mpl_path)
# Finally check if backend exists, and if so configure as appropriate
try:
matplotlib.use('module://backend_zinline')
import backend_zinline
# Everything looks good so make config assuming that we are using
# an inline backend
self._displayhook = backend_zinline.displayhook
self.configure_mpl(width=600, height=400, dpi=72, fontsize=10,
interactive=True, format='png', context=self.z)
except ImportError:
# Fall back to Agg if no custom backend installed
matplotlib.use('Agg')
warnings.warn("Unable to load inline matplotlib backend, "
"falling back to Agg")
def configure_mpl(self, **kwargs):
import mpl_config
mpl_config.configure(**kwargs)
def __tupleToScalaTuple2(self, tuple):
if (len(tuple) == 2):
return gateway.jvm.scala.Tuple2(tuple[0], tuple[1])
else:
raise IndexError("options must be a list of tuple of 2")
class SparkVersion(object):
SPARK_1_4_0 = 10400
SPARK_1_3_0 = 10300
@ -322,7 +173,24 @@ completion = __zeppelin_completion__ = PySparkCompletion(intp)
_zcUserQueryNameSpace["completion"] = completion
_zcUserQueryNameSpace["__zeppelin_completion__"] = __zeppelin_completion__
z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext())
from zeppelin_context import PyZeppelinContext
#TODO(zjffdu) merge it with IPySparkZeppelinContext
class PySparkZeppelinContext(PyZeppelinContext):
def __init__(self, z, gateway):
super(PySparkZeppelinContext, self).__init__(z, gateway)
def show(self, obj):
from pyspark.sql import DataFrame
if isinstance(obj, DataFrame):
print(self.z.showData(obj._jdf))
else:
super(PySparkZeppelinContext, self).show(obj)
z = __zeppelin__ = PySparkZeppelinContext(intp.getZeppelinContext(), gateway)
__zeppelin__._setup_matplotlib()
_zcUserQueryNameSpace["z"] = z
_zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__

View file

@ -60,8 +60,7 @@ public class PluginManager {
(Class.forName(notebookRepoClassName).newInstance());
return notebookRepo;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
LOGGER.warn("Fail to instantiate notebookrepo:" + notebookRepoClassName, e);
return null;
LOGGER.warn("Fail to instantiate notebookrepo from classpath directly:" + notebookRepoClassName, e);
}
}