mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-3362. Unify ZeppelinContext of PythonInterpreter & IPythonInterpreter
This commit is contained in:
parent
3eea57ab26
commit
b5dcbc9f0b
11 changed files with 289 additions and 462 deletions
|
|
@ -210,6 +210,22 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
|
|||
throw new IOException("Fail to setup JVMGateway\n" + response.getOutput());
|
||||
}
|
||||
|
||||
input =
|
||||
getClass().getClassLoader().getResourceAsStream("python/zeppelin_context.py");
|
||||
lines = IOUtils.readLines(input);
|
||||
response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
|
||||
.setCode(StringUtils.join(lines, System.lineSeparator())).build());
|
||||
if (response.getStatus() == ExecuteStatus.ERROR) {
|
||||
throw new IOException("Fail to import ZeppelinContext\n" + response.getOutput());
|
||||
}
|
||||
|
||||
response = ipythonClient.block_execute(ExecuteRequest.newBuilder()
|
||||
.setCode("z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)")
|
||||
.build());
|
||||
if (response.getStatus() == ExecuteStatus.ERROR) {
|
||||
throw new IOException("Fail to setup ZeppelinContext\n" + response.getOutput());
|
||||
}
|
||||
|
||||
if (additionalPythonInitFile != null) {
|
||||
input = getClass().getClassLoader().getResourceAsStream(additionalPythonInitFile);
|
||||
lines = IOUtils.readLines(input);
|
||||
|
|
|
|||
|
|
@ -65,6 +65,7 @@ import py4j.GatewayServer;
|
|||
public class PythonInterpreter extends Interpreter implements ExecuteResultHandler {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class);
|
||||
public static final String ZEPPELIN_PYTHON = "python/zeppelin_python.py";
|
||||
public static final String ZEPPELIN_CONTEXT = "python/zeppelin_context.py";
|
||||
public static final String ZEPPELIN_PY4JPATH = "interpreter/python/py4j-0.9.2/src";
|
||||
public static final String ZEPPELIN_PYTHON_LIBS = "interpreter/lib/python";
|
||||
public static final String DEFAULT_ZEPPELIN_PYTHON = "python";
|
||||
|
|
@ -125,7 +126,11 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
}
|
||||
|
||||
copyFile(out, ZEPPELIN_PYTHON);
|
||||
logger.info("File {} created", scriptPath);
|
||||
// copy zeppelin_context.py as well
|
||||
File zOut = new File(out.getParent() + "/zeppelin_context.py");
|
||||
copyFile(zOut, ZEPPELIN_CONTEXT);
|
||||
|
||||
logger.info("File {} , {} created", scriptPath, zOut.getAbsolutePath());
|
||||
}
|
||||
|
||||
public String getScriptPath() {
|
||||
|
|
@ -181,7 +186,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
cmd.addArgument(getLocalIp(), false);
|
||||
|
||||
executor = new DefaultExecutor();
|
||||
outputStream = new InterpreterOutputStream(logger);
|
||||
outputStream = new InterpreterOutputStream(LOG);
|
||||
PipedOutputStream ps = new PipedOutputStream();
|
||||
in = null;
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -17,130 +17,8 @@
|
|||
|
||||
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
|
||||
|
||||
from io import BytesIO
|
||||
try:
|
||||
from StringIO import StringIO
|
||||
except ImportError:
|
||||
from io import StringIO
|
||||
|
||||
class PyZeppelinContext(object):
|
||||
""" A context impl that uses Py4j to communicate to JVM
|
||||
"""
|
||||
|
||||
def __init__(self, z):
|
||||
self.z = z
|
||||
self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption
|
||||
self.javaList = gateway.jvm.java.util.ArrayList
|
||||
self.max_result = z.getMaxResult()
|
||||
|
||||
def getInterpreterContext(self):
|
||||
return self.z.getInterpreterContext()
|
||||
|
||||
def input(self, name, defaultValue=""):
|
||||
return self.z.input(name, defaultValue)
|
||||
|
||||
def textbox(self, name, defaultValue=""):
|
||||
return self.z.textbox(name, defaultValue)
|
||||
|
||||
def noteTextbox(self, name, defaultValue=""):
|
||||
return self.z.noteTextbox(name, defaultValue)
|
||||
|
||||
def select(self, name, options, defaultValue=""):
|
||||
return self.z.select(name, defaultValue, self.getParamOptions(options))
|
||||
|
||||
def noteSelect(self, name, options, defaultValue=""):
|
||||
return self.z.noteSelect(name, defaultValue, self.getParamOptions(options))
|
||||
|
||||
def checkbox(self, name, options, defaultChecked=[]):
|
||||
return self.z.checkbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
|
||||
|
||||
def noteCheckbox(self, name, options, defaultChecked=[]):
|
||||
return self.z.noteCheckbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
|
||||
|
||||
def getParamOptions(self, options):
|
||||
javaOptions = gateway.new_array(self.paramOption, len(options))
|
||||
i = 0
|
||||
for tuple in options:
|
||||
javaOptions[i] = self.paramOption(tuple[0], tuple[1])
|
||||
i += 1
|
||||
return javaOptions
|
||||
|
||||
def getDefaultChecked(self, defaultChecked):
|
||||
javaDefaultChecked = self.javaList()
|
||||
for check in defaultChecked:
|
||||
javaDefaultChecked.append(check)
|
||||
return javaDefaultChecked
|
||||
|
||||
def show(self, p, **kwargs):
|
||||
if type(p).__name__ == "DataFrame": # does not play well with sub-classes
|
||||
# `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
|
||||
# and so a dependency on pandas
|
||||
self.show_dataframe(p, **kwargs)
|
||||
elif hasattr(p, '__call__'):
|
||||
p() #error reporting
|
||||
|
||||
def show_dataframe(self, df, show_index=False, **kwargs):
|
||||
"""Pretty prints DF using Table Display System
|
||||
"""
|
||||
limit = len(df) > self.max_result
|
||||
header_buf = StringIO("")
|
||||
if show_index:
|
||||
idx_name = str(df.index.name) if df.index.name is not None else ""
|
||||
header_buf.write(idx_name + "\t")
|
||||
header_buf.write(str(df.columns[0]))
|
||||
for col in df.columns[1:]:
|
||||
header_buf.write("\t")
|
||||
header_buf.write(str(col))
|
||||
header_buf.write("\n")
|
||||
|
||||
body_buf = StringIO("")
|
||||
rows = df.head(self.max_result).values if limit else df.values
|
||||
index = df.index.values
|
||||
for idx, row in zip(index, rows):
|
||||
if show_index:
|
||||
body_buf.write("%html <strong>{}</strong>".format(idx))
|
||||
body_buf.write("\t")
|
||||
body_buf.write(str(row[0]))
|
||||
for cell in row[1:]:
|
||||
body_buf.write("\t")
|
||||
body_buf.write(str(cell))
|
||||
body_buf.write("\n")
|
||||
body_buf.seek(0); header_buf.seek(0)
|
||||
#TODO(bzz): fix it, so it shows red notice, as in Spark
|
||||
print("%table " + header_buf.read() + body_buf.read()) # +
|
||||
# ("\n<font color=red>Results are limited by {}.</font>" \
|
||||
# .format(self.max_result) if limit else "")
|
||||
#)
|
||||
body_buf.close(); header_buf.close()
|
||||
|
||||
def registerHook(self, event, cmd, replName=None):
|
||||
if replName is None:
|
||||
self.z.registerHook(event, cmd)
|
||||
else:
|
||||
self.z.registerHook(event, cmd, replName)
|
||||
|
||||
def unregisterHook(self, event, replName=None):
|
||||
if replName is None:
|
||||
self.z.unregisterHook(event)
|
||||
else:
|
||||
self.z.unregisterHook(event, replName)
|
||||
|
||||
def registerNoteHook(self, event, cmd, noteId, replName=None):
|
||||
if replName is None:
|
||||
self.z.registerNoteHook(event, cmd, noteId)
|
||||
else:
|
||||
self.z.registerNoteHook(event, cmd, noteId, replName)
|
||||
|
||||
def unregisterNoteHook(self, event, noteId, replName=None):
|
||||
if replName is None:
|
||||
self.z.unregisterNoteHook(event, noteId)
|
||||
else:
|
||||
self.z.unregisterNoteHook(event, noteId, replName)
|
||||
|
||||
# start JVM gateway
|
||||
client = GatewayClient(address='127.0.0.1', port=${JVM_GATEWAY_PORT})
|
||||
gateway = JavaGateway(client)
|
||||
java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
|
||||
intp = gateway.entry_point
|
||||
z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext())
|
||||
|
||||
|
|
|
|||
224
python/src/main/resources/python/zeppelin_context.py
Normal file
224
python/src/main/resources/python/zeppelin_context.py
Normal file
|
|
@ -0,0 +1,224 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
import os, sys
|
||||
import warnings
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
try:
|
||||
from StringIO import StringIO
|
||||
except ImportError:
|
||||
from io import StringIO
|
||||
|
||||
class PyZeppelinContext(object):
|
||||
""" A context impl that uses Py4j to communicate to JVM
|
||||
"""
|
||||
|
||||
def __init__(self, z, gateway):
|
||||
self.z = z
|
||||
self.gateway = gateway
|
||||
self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption
|
||||
self.javaList = gateway.jvm.java.util.ArrayList
|
||||
self.max_result = 1000
|
||||
self._displayhook = lambda *args: None
|
||||
self._setup_matplotlib()
|
||||
|
||||
# By implementing special methods it makes operating on it more Pythonic
|
||||
def __setitem__(self, key, item):
|
||||
self.z.put(key, item)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.z.get(key)
|
||||
|
||||
def __delitem__(self, key):
|
||||
self.z.remove(key)
|
||||
|
||||
def __contains__(self, item):
|
||||
return self.z.containsKey(item)
|
||||
|
||||
def add(self, key, value):
|
||||
self.__setitem__(key, value)
|
||||
|
||||
def put(self, key, value):
|
||||
self.__setitem__(key, value)
|
||||
|
||||
def get(self, key):
|
||||
return self.__getitem__(key)
|
||||
|
||||
def getInterpreterContext(self):
|
||||
return self.z.getInterpreterContext()
|
||||
|
||||
def input(self, name, defaultValue=""):
|
||||
return self.z.input(name, defaultValue)
|
||||
|
||||
def textbox(self, name, defaultValue=""):
|
||||
return self.z.textbox(name, defaultValue)
|
||||
|
||||
def noteTextbox(self, name, defaultValue=""):
|
||||
return self.z.noteTextbox(name, defaultValue)
|
||||
|
||||
def select(self, name, options, defaultValue=""):
|
||||
return self.z.select(name, defaultValue, self.getParamOptions(options))
|
||||
|
||||
def noteSelect(self, name, options, defaultValue=""):
|
||||
return self.z.noteSelect(name, defaultValue, self.getParamOptions(options))
|
||||
|
||||
def checkbox(self, name, options, defaultChecked=[]):
|
||||
return self.z.checkbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
|
||||
|
||||
def noteCheckbox(self, name, options, defaultChecked=[]):
|
||||
return self.z.noteCheckbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
|
||||
|
||||
def registerHook(self, event, cmd, replName=None):
|
||||
if replName is None:
|
||||
self.z.registerHook(event, cmd)
|
||||
else:
|
||||
self.z.registerHook(event, cmd, replName)
|
||||
|
||||
def unregisterHook(self, event, replName=None):
|
||||
if replName is None:
|
||||
self.z.unregisterHook(event)
|
||||
else:
|
||||
self.z.unregisterHook(event, replName)
|
||||
|
||||
def registerNoteHook(self, event, cmd, noteId, replName=None):
|
||||
if replName is None:
|
||||
self.z.registerNoteHook(event, cmd, noteId)
|
||||
else:
|
||||
self.z.registerNoteHook(event, cmd, noteId, replName)
|
||||
|
||||
def unregisterNoteHook(self, event, noteId, replName=None):
|
||||
if replName is None:
|
||||
self.z.unregisterNoteHook(event, noteId)
|
||||
else:
|
||||
self.z.unregisterNoteHook(event, noteId, replName)
|
||||
|
||||
def getParamOptions(self, options):
|
||||
javaOptions = self.gateway.new_array(self.paramOption, len(options))
|
||||
i = 0
|
||||
for tuple in options:
|
||||
javaOptions[i] = self.paramOption(tuple[0], tuple[1])
|
||||
i += 1
|
||||
return javaOptions
|
||||
|
||||
def getDefaultChecked(self, defaultChecked):
|
||||
javaDefaultChecked = self.javaList()
|
||||
for check in defaultChecked:
|
||||
javaDefaultChecked.append(check)
|
||||
return javaDefaultChecked
|
||||
|
||||
def show(self, p, **kwargs):
|
||||
if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot":
|
||||
self.show_matplotlib(p, **kwargs)
|
||||
elif type(p).__name__ == "DataFrame": # does not play well with sub-classes
|
||||
# `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
|
||||
# and so a dependency on pandas
|
||||
self.show_dataframe(p, **kwargs)
|
||||
elif hasattr(p, '__call__'):
|
||||
p() #error reporting
|
||||
|
||||
def show_dataframe(self, df, show_index=False, **kwargs):
|
||||
"""Pretty prints DF using Table Display System
|
||||
"""
|
||||
limit = len(df) > self.max_result
|
||||
header_buf = StringIO("")
|
||||
if show_index:
|
||||
idx_name = str(df.index.name) if df.index.name is not None else ""
|
||||
header_buf.write(idx_name + "\t")
|
||||
header_buf.write(str(df.columns[0]))
|
||||
for col in df.columns[1:]:
|
||||
header_buf.write("\t")
|
||||
header_buf.write(str(col))
|
||||
header_buf.write("\n")
|
||||
|
||||
body_buf = StringIO("")
|
||||
rows = df.head(self.max_result).values if limit else df.values
|
||||
index = df.index.values
|
||||
for idx, row in zip(index, rows):
|
||||
if show_index:
|
||||
body_buf.write("%html <strong>{}</strong>".format(idx))
|
||||
body_buf.write("\t")
|
||||
body_buf.write(str(row[0]))
|
||||
for cell in row[1:]:
|
||||
body_buf.write("\t")
|
||||
body_buf.write(str(cell))
|
||||
body_buf.write("\n")
|
||||
body_buf.seek(0); header_buf.seek(0)
|
||||
#TODO(bzz): fix it, so it shows red notice, as in Spark
|
||||
print("%table " + header_buf.read() + body_buf.read()) # +
|
||||
# ("\n<font color=red>Results are limited by {}.</font>" \
|
||||
# .format(self.max_result) if limit else "")
|
||||
#)
|
||||
body_buf.close(); header_buf.close()
|
||||
|
||||
def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
|
||||
**kwargs):
|
||||
"""Matplotlib show function
|
||||
"""
|
||||
if fmt == "png":
|
||||
img = BytesIO()
|
||||
p.savefig(img, format=fmt)
|
||||
img_str = b"data:image/png;base64,"
|
||||
img_str += base64.b64encode(img.getvalue().strip())
|
||||
img_tag = "<img src={img} style='width={width};height:{height}'>"
|
||||
# Decoding is necessary for Python 3 compability
|
||||
img_str = img_str.decode("ascii")
|
||||
img_str = img_tag.format(img=img_str, width=width, height=height)
|
||||
elif fmt == "svg":
|
||||
img = StringIO()
|
||||
p.savefig(img, format=fmt)
|
||||
img_str = img.getvalue()
|
||||
else:
|
||||
raise ValueError("fmt must be 'png' or 'svg'")
|
||||
|
||||
html = "%html <div style='width:{width};height:{height}'>{img}<div>"
|
||||
print(html.format(width=width, height=height, img=img_str))
|
||||
img.close()
|
||||
|
||||
def configure_mpl(self, **kwargs):
|
||||
import mpl_config
|
||||
mpl_config.configure(**kwargs)
|
||||
|
||||
def _setup_matplotlib(self):
|
||||
# If we don't have matplotlib installed don't bother continuing
|
||||
try:
|
||||
import matplotlib
|
||||
except ImportError:
|
||||
return
|
||||
|
||||
# Make sure custom backends are available in the PYTHONPATH
|
||||
rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
|
||||
mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
|
||||
if mpl_path not in sys.path:
|
||||
sys.path.append(mpl_path)
|
||||
|
||||
# Finally check if backend exists, and if so configure as appropriate
|
||||
try:
|
||||
matplotlib.use('module://backend_zinline')
|
||||
import backend_zinline
|
||||
|
||||
# Everything looks good so make config assuming that we are using
|
||||
# an inline backend
|
||||
self._displayhook = backend_zinline.displayhook
|
||||
self.configure_mpl(width=600, height=400, dpi=72, fontsize=10,
|
||||
interactive=True, format='png', context=self.z)
|
||||
except ImportError:
|
||||
# Fall back to Agg if no custom backend installed
|
||||
matplotlib.use('Agg')
|
||||
warnings.warn("Unable to load inline matplotlib backend, "
|
||||
"falling back to Agg")
|
||||
|
|
@ -47,182 +47,6 @@ class Logger(object):
|
|||
def flush(self):
|
||||
pass
|
||||
|
||||
|
||||
class PyZeppelinContext(object):
|
||||
""" A context impl that uses Py4j to communicate to JVM
|
||||
"""
|
||||
|
||||
def __init__(self, z):
|
||||
self.z = z
|
||||
self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption
|
||||
self.javaList = gateway.jvm.java.util.ArrayList
|
||||
self.max_result = 1000
|
||||
self._displayhook = lambda *args: None
|
||||
self._setup_matplotlib()
|
||||
|
||||
def getInterpreterContext(self):
|
||||
return self.z.getInterpreterContext()
|
||||
|
||||
def input(self, name, defaultValue=""):
|
||||
return self.z.input(name, defaultValue)
|
||||
|
||||
def textbox(self, name, defaultValue=""):
|
||||
return self.z.textbox(name, defaultValue)
|
||||
|
||||
def noteTextbox(self, name, defaultValue=""):
|
||||
return self.z.noteTextbox(name, defaultValue)
|
||||
|
||||
def select(self, name, options, defaultValue=""):
|
||||
return self.z.select(name, defaultValue, self.getParamOptions(options))
|
||||
|
||||
def noteSelect(self, name, options, defaultValue=""):
|
||||
return self.z.noteSelect(name, defaultValue, self.getParamOptions(options))
|
||||
|
||||
def checkbox(self, name, options, defaultChecked=[]):
|
||||
return self.z.checkbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
|
||||
|
||||
def noteCheckbox(self, name, options, defaultChecked=[]):
|
||||
return self.z.noteCheckbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
|
||||
|
||||
def registerHook(self, event, cmd, replName=None):
|
||||
if replName is None:
|
||||
self.z.registerHook(event, cmd)
|
||||
else:
|
||||
self.z.registerHook(event, cmd, replName)
|
||||
|
||||
def unregisterHook(self, event, replName=None):
|
||||
if replName is None:
|
||||
self.z.unregisterHook(event)
|
||||
else:
|
||||
self.z.unregisterHook(event, replName)
|
||||
|
||||
def registerNoteHook(self, event, cmd, noteId, replName=None):
|
||||
if replName is None:
|
||||
self.z.registerNoteHook(event, cmd, noteId)
|
||||
else:
|
||||
self.z.registerNoteHook(event, cmd, noteId, replName)
|
||||
|
||||
def unregisterNoteHook(self, event, noteId, replName=None):
|
||||
if replName is None:
|
||||
self.z.unregisterNoteHook(event, noteId)
|
||||
else:
|
||||
self.z.unregisterNoteHook(event, noteId, replName)
|
||||
|
||||
def getParamOptions(self, options):
|
||||
javaOptions = gateway.new_array(self.paramOption, len(options))
|
||||
i = 0
|
||||
for tuple in options:
|
||||
javaOptions[i] = self.paramOption(tuple[0], tuple[1])
|
||||
i += 1
|
||||
return javaOptions
|
||||
|
||||
def getDefaultChecked(self, defaultChecked):
|
||||
javaDefaultChecked = self.javaList()
|
||||
for check in defaultChecked:
|
||||
javaDefaultChecked.append(check)
|
||||
return javaDefaultChecked
|
||||
|
||||
def show(self, p, **kwargs):
|
||||
if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot":
|
||||
self.show_matplotlib(p, **kwargs)
|
||||
elif type(p).__name__ == "DataFrame": # does not play well with sub-classes
|
||||
# `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
|
||||
# and so a dependency on pandas
|
||||
self.show_dataframe(p, **kwargs)
|
||||
elif hasattr(p, '__call__'):
|
||||
p() #error reporting
|
||||
|
||||
def show_dataframe(self, df, show_index=False, **kwargs):
|
||||
"""Pretty prints DF using Table Display System
|
||||
"""
|
||||
limit = len(df) > self.max_result
|
||||
header_buf = StringIO("")
|
||||
if show_index:
|
||||
idx_name = str(df.index.name) if df.index.name is not None else ""
|
||||
header_buf.write(idx_name + "\t")
|
||||
header_buf.write(str(df.columns[0]))
|
||||
for col in df.columns[1:]:
|
||||
header_buf.write("\t")
|
||||
header_buf.write(str(col))
|
||||
header_buf.write("\n")
|
||||
|
||||
body_buf = StringIO("")
|
||||
rows = df.head(self.max_result).values if limit else df.values
|
||||
index = df.index.values
|
||||
for idx, row in zip(index, rows):
|
||||
if show_index:
|
||||
body_buf.write("%html <strong>{}</strong>".format(idx))
|
||||
body_buf.write("\t")
|
||||
body_buf.write(str(row[0]))
|
||||
for cell in row[1:]:
|
||||
body_buf.write("\t")
|
||||
body_buf.write(str(cell))
|
||||
body_buf.write("\n")
|
||||
body_buf.seek(0); header_buf.seek(0)
|
||||
#TODO(bzz): fix it, so it shows red notice, as in Spark
|
||||
print("%table " + header_buf.read() + body_buf.read()) # +
|
||||
# ("\n<font color=red>Results are limited by {}.</font>" \
|
||||
# .format(self.max_result) if limit else "")
|
||||
#)
|
||||
body_buf.close(); header_buf.close()
|
||||
|
||||
def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
|
||||
**kwargs):
|
||||
"""Matplotlib show function
|
||||
"""
|
||||
if fmt == "png":
|
||||
img = BytesIO()
|
||||
p.savefig(img, format=fmt)
|
||||
img_str = b"data:image/png;base64,"
|
||||
img_str += base64.b64encode(img.getvalue().strip())
|
||||
img_tag = "<img src={img} style='width={width};height:{height}'>"
|
||||
# Decoding is necessary for Python 3 compability
|
||||
img_str = img_str.decode("ascii")
|
||||
img_str = img_tag.format(img=img_str, width=width, height=height)
|
||||
elif fmt == "svg":
|
||||
img = StringIO()
|
||||
p.savefig(img, format=fmt)
|
||||
img_str = img.getvalue()
|
||||
else:
|
||||
raise ValueError("fmt must be 'png' or 'svg'")
|
||||
|
||||
html = "%html <div style='width:{width};height:{height}'>{img}<div>"
|
||||
print(html.format(width=width, height=height, img=img_str))
|
||||
img.close()
|
||||
|
||||
def configure_mpl(self, **kwargs):
|
||||
import mpl_config
|
||||
mpl_config.configure(**kwargs)
|
||||
|
||||
def _setup_matplotlib(self):
|
||||
# If we don't have matplotlib installed don't bother continuing
|
||||
try:
|
||||
import matplotlib
|
||||
except ImportError:
|
||||
return
|
||||
# Make sure custom backends are available in the PYTHONPATH
|
||||
rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
|
||||
mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
|
||||
if mpl_path not in sys.path:
|
||||
sys.path.append(mpl_path)
|
||||
|
||||
# Finally check if backend exists, and if so configure as appropriate
|
||||
try:
|
||||
matplotlib.use('module://backend_zinline')
|
||||
import backend_zinline
|
||||
|
||||
# Everything looks good so make config assuming that we are using
|
||||
# an inline backend
|
||||
self._displayhook = backend_zinline.displayhook
|
||||
self.configure_mpl(width=600, height=400, dpi=72,
|
||||
fontsize=10, interactive=True, format='png')
|
||||
except ImportError:
|
||||
# Fall back to Agg if no custom backend installed
|
||||
matplotlib.use('Agg')
|
||||
warnings.warn("Unable to load inline matplotlib backend, "
|
||||
"falling back to Agg")
|
||||
|
||||
|
||||
def handler_stop_signals(sig, frame):
|
||||
sys.exit("Got signal : " + str(sig))
|
||||
|
||||
|
|
@ -236,14 +60,16 @@ if len(sys.argv) >= 3:
|
|||
_zcUserQueryNameSpace = {}
|
||||
client = GatewayClient(address=host, port=int(sys.argv[1]))
|
||||
|
||||
#gateway = JavaGateway(client, auto_convert = True)
|
||||
gateway = JavaGateway(client)
|
||||
|
||||
intp = gateway.entry_point
|
||||
intp.onPythonScriptInitialized(os.getpid())
|
||||
|
||||
java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
|
||||
z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext())
|
||||
|
||||
from zeppelin_context import PyZeppelinContext
|
||||
|
||||
z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)
|
||||
__zeppelin__._setup_matplotlib()
|
||||
|
||||
_zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ public class IPythonInterpreterTest {
|
|||
@Test
|
||||
public void testGrpcFrameSize() throws InterpreterException, IOException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("zeppelin.ipython.grpc.message_size", "4");
|
||||
properties.setProperty("zeppelin.ipython.grpc.message_size", "200");
|
||||
startInterpreter(properties);
|
||||
|
||||
// to make this test can run under both python2 and python3
|
||||
|
|
@ -86,11 +86,11 @@ public class IPythonInterpreterTest {
|
|||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
||||
InterpreterContext context = getInterpreterContext();
|
||||
result = interpreter.interpret("print(11111111111111111111111111111)", context);
|
||||
result = interpreter.interpret("print('1'*300)", context);
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
List<InterpreterResultMessage> interpreterResultMessages = context.out.toInterpreterResultMessage();
|
||||
assertEquals(1, interpreterResultMessages.size());
|
||||
assertTrue(interpreterResultMessages.get(0).getData().contains("Frame size 32 exceeds maximum: 4"));
|
||||
assertTrue(interpreterResultMessages.get(0).getData().contains("Frame size 304 exceeds maximum: 200"));
|
||||
|
||||
// next call continue work
|
||||
result = interpreter.interpret("print(1)", context);
|
||||
|
|
@ -99,14 +99,14 @@ public class IPythonInterpreterTest {
|
|||
close();
|
||||
|
||||
// increase framesize to make it work
|
||||
properties.setProperty("zeppelin.ipython.grpc.message_size", "40");
|
||||
properties.setProperty("zeppelin.ipython.grpc.message_size", "500");
|
||||
startInterpreter(properties);
|
||||
// to make this test can run under both python2 and python3
|
||||
result = interpreter.interpret("from __future__ import print_function", getInterpreterContext());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
||||
context = getInterpreterContext();
|
||||
result = interpreter.interpret("print(11111111111111111111111111111)", context);
|
||||
result = interpreter.interpret("print('1'*300)", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -29,3 +29,4 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
|
|||
log4j.rootLogger=INFO, stdout
|
||||
log4j.logger.org.apache.zeppelin.python.IPythonInterpreter=DEBUG
|
||||
log4j.logger.org.apache.zeppelin.python.IPythonClient=DEBUG
|
||||
log4j.logger.org.apache.zeppelin.python=DEBUG
|
||||
|
|
@ -114,6 +114,16 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
FileOutputStream outStream = new FileOutputStream(out.getParent() + "/zeppelin_context.py");
|
||||
IOUtils.copy(
|
||||
classLoader.getResourceAsStream("python/zeppelin_context.py"),
|
||||
outStream);
|
||||
outStream.close();
|
||||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
LOGGER.info("File {} created", scriptPath);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -54,8 +54,8 @@ else:
|
|||
|
||||
class IPySparkZeppelinContext(PyZeppelinContext):
|
||||
|
||||
def __init__(self, z):
|
||||
super(IPySparkZeppelinContext, self).__init__(z)
|
||||
def __init__(self, z, gateway):
|
||||
super(IPySparkZeppelinContext, self).__init__(z, gateway)
|
||||
|
||||
def show(self, obj):
|
||||
from pyspark.sql import DataFrame
|
||||
|
|
@ -64,4 +64,4 @@ class IPySparkZeppelinContext(PyZeppelinContext):
|
|||
else:
|
||||
super(IPySparkZeppelinContext, self).show(obj)
|
||||
|
||||
z = __zeppelin__ = IPySparkZeppelinContext(intp.getZeppelinContext())
|
||||
z = __zeppelin__ = IPySparkZeppelinContext(intp.getZeppelinContext(), gateway)
|
||||
|
|
|
|||
|
|
@ -41,155 +41,6 @@ class Logger(object):
|
|||
pass
|
||||
|
||||
|
||||
class PyZeppelinContext(dict):
|
||||
def __init__(self, zc):
|
||||
self.z = zc
|
||||
self._displayhook = lambda *args: None
|
||||
|
||||
def show(self, obj):
|
||||
from pyspark.sql import DataFrame
|
||||
if isinstance(obj, DataFrame):
|
||||
print(self.z.showData(obj._jdf))
|
||||
else:
|
||||
print(str(obj))
|
||||
|
||||
# By implementing special methods it makes operating on it more Pythonic
|
||||
def __setitem__(self, key, item):
|
||||
self.z.put(key, item)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.z.get(key)
|
||||
|
||||
def __delitem__(self, key):
|
||||
self.z.remove(key)
|
||||
|
||||
def __contains__(self, item):
|
||||
return self.z.containsKey(item)
|
||||
|
||||
def add(self, key, value):
|
||||
self.__setitem__(key, value)
|
||||
|
||||
def put(self, key, value):
|
||||
self.__setitem__(key, value)
|
||||
|
||||
def get(self, key):
|
||||
return self.__getitem__(key)
|
||||
|
||||
def getInterpreterContext(self):
|
||||
return self.z.getInterpreterContext()
|
||||
|
||||
def input(self, name, defaultValue=""):
|
||||
return self.z.input(name, defaultValue)
|
||||
|
||||
def textbox(self, name, defaultValue=""):
|
||||
return self.z.textbox(name, defaultValue)
|
||||
|
||||
def noteTextbox(self, name, defaultValue=""):
|
||||
return self.z.noteTextbox(name, defaultValue)
|
||||
|
||||
def select(self, name, options, defaultValue=""):
|
||||
# auto_convert to ArrayList doesn't match the method signature on JVM side
|
||||
return self.z.select(name, defaultValue, self.getParamOptions(options))
|
||||
|
||||
def noteSelect(self, name, options, defaultValue=""):
|
||||
return self.z.noteSelect(name, defaultValue, self.getParamOptions(options))
|
||||
|
||||
def checkbox(self, name, options, defaultChecked=None):
|
||||
optionsIterable = self.getParamOptions(options)
|
||||
defaultCheckedIterables = self.getDefaultChecked(defaultChecked)
|
||||
checkedItems = gateway.jvm.scala.collection.JavaConversions.seqAsJavaList(self.z.checkbox(name, defaultCheckedIterables, optionsIterable))
|
||||
result = []
|
||||
for checkedItem in checkedItems:
|
||||
result.append(checkedItem)
|
||||
return result;
|
||||
|
||||
def noteCheckbox(self, name, options, defaultChecked=None):
|
||||
optionsIterable = self.getParamOptions(options)
|
||||
defaultCheckedIterables = self.getDefaultChecked(defaultChecked)
|
||||
checkedItems = gateway.jvm.scala.collection.JavaConversions.seqAsJavaList(self.z.noteCheckbox(name, defaultCheckedIterables, optionsIterable))
|
||||
result = []
|
||||
for checkedItem in checkedItems:
|
||||
result.append(checkedItem)
|
||||
return result;
|
||||
|
||||
def getParamOptions(self, options):
|
||||
tuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options))
|
||||
return gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(tuples)
|
||||
|
||||
def getDefaultChecked(self, defaultChecked):
|
||||
if defaultChecked is None:
|
||||
defaultChecked = []
|
||||
return gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(defaultChecked)
|
||||
|
||||
def registerHook(self, event, cmd, replName=None):
|
||||
if replName is None:
|
||||
self.z.registerHook(event, cmd)
|
||||
else:
|
||||
self.z.registerHook(event, cmd, replName)
|
||||
|
||||
def unregisterHook(self, event, replName=None):
|
||||
if replName is None:
|
||||
self.z.unregisterHook(event)
|
||||
else:
|
||||
self.z.unregisterHook(event, replName)
|
||||
|
||||
def registerNoteHook(self, event, cmd, noteId, replName=None):
|
||||
if replName is None:
|
||||
self.z.registerNoteHook(event, cmd, noteId)
|
||||
else:
|
||||
self.z.registerNoteHook(event, cmd, noteId, replName)
|
||||
|
||||
def unregisterNoteHook(self, event, noteId, replName=None):
|
||||
if replName is None:
|
||||
self.z.unregisterNoteHook(event, noteId)
|
||||
else:
|
||||
self.z.unregisterNoteHook(event, noteId, replName)
|
||||
|
||||
def getHook(self, event, replName=None):
|
||||
if replName is None:
|
||||
return self.z.getHook(event)
|
||||
return self.z.getHook(event, replName)
|
||||
|
||||
def _setup_matplotlib(self):
|
||||
# If we don't have matplotlib installed don't bother continuing
|
||||
try:
|
||||
import matplotlib
|
||||
except ImportError:
|
||||
return
|
||||
|
||||
# Make sure custom backends are available in the PYTHONPATH
|
||||
rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
|
||||
mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
|
||||
if mpl_path not in sys.path:
|
||||
sys.path.append(mpl_path)
|
||||
|
||||
# Finally check if backend exists, and if so configure as appropriate
|
||||
try:
|
||||
matplotlib.use('module://backend_zinline')
|
||||
import backend_zinline
|
||||
|
||||
# Everything looks good so make config assuming that we are using
|
||||
# an inline backend
|
||||
self._displayhook = backend_zinline.displayhook
|
||||
self.configure_mpl(width=600, height=400, dpi=72, fontsize=10,
|
||||
interactive=True, format='png', context=self.z)
|
||||
except ImportError:
|
||||
# Fall back to Agg if no custom backend installed
|
||||
matplotlib.use('Agg')
|
||||
warnings.warn("Unable to load inline matplotlib backend, "
|
||||
"falling back to Agg")
|
||||
|
||||
def configure_mpl(self, **kwargs):
|
||||
import mpl_config
|
||||
mpl_config.configure(**kwargs)
|
||||
|
||||
def __tupleToScalaTuple2(self, tuple):
|
||||
if (len(tuple) == 2):
|
||||
return gateway.jvm.scala.Tuple2(tuple[0], tuple[1])
|
||||
else:
|
||||
raise IndexError("options must be a list of tuple of 2")
|
||||
|
||||
|
||||
class SparkVersion(object):
|
||||
SPARK_1_4_0 = 10400
|
||||
SPARK_1_3_0 = 10300
|
||||
|
|
@ -322,7 +173,24 @@ completion = __zeppelin_completion__ = PySparkCompletion(intp)
|
|||
_zcUserQueryNameSpace["completion"] = completion
|
||||
_zcUserQueryNameSpace["__zeppelin_completion__"] = __zeppelin_completion__
|
||||
|
||||
z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext())
|
||||
|
||||
from zeppelin_context import PyZeppelinContext
|
||||
|
||||
#TODO(zjffdu) merge it with IPySparkZeppelinContext
|
||||
class PySparkZeppelinContext(PyZeppelinContext):
|
||||
|
||||
def __init__(self, z, gateway):
|
||||
super(PySparkZeppelinContext, self).__init__(z, gateway)
|
||||
|
||||
def show(self, obj):
|
||||
from pyspark.sql import DataFrame
|
||||
if isinstance(obj, DataFrame):
|
||||
print(self.z.showData(obj._jdf))
|
||||
else:
|
||||
super(PySparkZeppelinContext, self).show(obj)
|
||||
|
||||
z = __zeppelin__ = PySparkZeppelinContext(intp.getZeppelinContext(), gateway)
|
||||
|
||||
__zeppelin__._setup_matplotlib()
|
||||
_zcUserQueryNameSpace["z"] = z
|
||||
_zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__
|
||||
|
|
|
|||
|
|
@ -60,8 +60,7 @@ public class PluginManager {
|
|||
(Class.forName(notebookRepoClassName).newInstance());
|
||||
return notebookRepo;
|
||||
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
|
||||
LOGGER.warn("Fail to instantiate notebookrepo:" + notebookRepoClassName, e);
|
||||
return null;
|
||||
LOGGER.warn("Fail to instantiate notebookrepo from classpath directly:" + notebookRepoClassName, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue