[ZEPPELIN-4265]. Support more types of output for IPythonInterpreter

This commit is contained in:
Jeff Zhang 2019-07-19 17:11:50 +08:00
parent de4c0bc4a2
commit 2ebac75686
8 changed files with 250 additions and 325 deletions

View file

@ -88,35 +88,47 @@ public class IPythonClient {
LOGGER.debug("stream_execute code:\n" + request.getCode());
asyncStub.execute(request, new StreamObserver<ExecuteResponse>() {
int index = 0;
boolean isPreviousOutputImage = false;
@Override
public void onNext(ExecuteResponse executeResponse) {
LOGGER.debug("Interpreter Streaming Output: " + executeResponse.getType() +
"\t" + executeResponse.getOutput());
if (index != 0) {
try {
// We need to add line separator first, because zeppelin only recoginize the % at
// the line beginning.
interpreterOutput.write("\n".getBytes());
} catch (IOException e) {
LOGGER.error("Unexpected IOException", e);
}
}
if (executeResponse.getType() == OutputType.TEXT) {
try {
LOGGER.debug("Interpreter Streaming Output: " + executeResponse.getOutput());
if (isPreviousOutputImage) {
// add '\n' when switch from image to text
interpreterOutput.write("\n%text ".getBytes());
if (executeResponse.getOutput().startsWith("%")) {
// the output from ipython kernel maybe specify format already.
interpreterOutput.write((executeResponse.getOutput()).getBytes());
} else {
interpreterOutput.write(("%text " + executeResponse.getOutput()).getBytes());
}
isPreviousOutputImage = false;
interpreterOutput.write(executeResponse.getOutput().getBytes());
interpreterOutput.getInterpreterOutput().flush();
} catch (IOException e) {
LOGGER.error("Unexpected IOException", e);
}
}
if (executeResponse.getType() == OutputType.IMAGE) {
if (executeResponse.getType() == OutputType.PNG ||
executeResponse.getType() == OutputType.JPEG) {
try {
LOGGER.debug("Interpreter Streaming Output: IMAGE_DATA");
if (index != 0) {
// add '\n' if this is the not the first element. otherwise it would mix the image
// with the text
interpreterOutput.write("\n".getBytes());
}
interpreterOutput.write(("%img " + executeResponse.getOutput()).getBytes());
interpreterOutput.getInterpreterOutput().flush();
isPreviousOutputImage = true;
} catch (IOException e) {
LOGGER.error("Unexpected IOException", e);
}
}
if (executeResponse.getType() == OutputType.HTML) {
try {
interpreterOutput.write(("%html\n" + executeResponse.getOutput()).getBytes());
interpreterOutput.getInterpreterOutput().flush();
} catch (IOException e) {
LOGGER.error("Unexpected IOException", e);
}

View file

@ -53,7 +53,12 @@ enum IPythonStatus {
enum OutputType {
TEXT = 0;
IMAGE = 1;
PNG = 1;
JPEG = 2;
HTML = 3;
SVG = 4;
JSON = 5;
LaTeX = 6;
}
// The request message containing the code

View file

@ -23,14 +23,23 @@ import ipython_pb2_grpc
def run():
channel = grpc.insecure_channel('localhost:50053')
stub = ipython_pb2_grpc.IPythonStub(channel)
response = stub.execute(ipython_pb2.ExecuteRequest(code="import time\nfor i in range(1,4):\n\ttime.sleep(1)\n\tprint(i)\n" +
"%matplotlib inline\nimport matplotlib.pyplot as plt\ndata=[1,1,2,3,4]\nplt.figure()\nplt.plot(data)"))
response = stub.execute(ipython_pb2.ExecuteRequest(code="""
from bokeh.io import output_notebook
import bkzep
output_notebook(notebook_type='zeppelin')
import hvplot.streamz
from streamz.dataframe import Random
streaming_df = Random(freq='5ms')
streaming_df.hvplot(backlog=100, height=400, width=500) +\
streaming_df.hvplot.hexbin(x='x', y='z', backlog=2000, height=400, width=500);
"""))
for r in response:
print("output:" + r.output)
response = stub.execute(ipython_pb2.ExecuteRequest(code="range?"))
for r in response:
print(r)
if __name__ == '__main__':
run()

View file

@ -23,7 +23,6 @@ from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@ -35,7 +34,8 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='ipython.proto',
package='ipython',
syntax='proto3',
serialized_pb=_b('\n\ripython.proto\x12\x07ipython\"\x1e\n\x0e\x45xecuteRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\"l\n\x0f\x45xecuteResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.ipython.ExecuteStatus\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.ipython.OutputType\x12\x0e\n\x06output\x18\x03 \x01(\t\"\x0f\n\rCancelRequest\"\x10\n\x0e\x43\x61ncelResponse\"1\n\x11\x43ompletionRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\x12\x0e\n\x06\x63ursor\x18\x02 \x01(\x05\"%\n\x12\x43ompletionResponse\x12\x0f\n\x07matches\x18\x01 \x03(\t\"\x0f\n\rStatusRequest\"8\n\x0eStatusResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.ipython.IPythonStatus\"\r\n\x0bStopRequest\"\x0e\n\x0cStopResponse*\'\n\rExecuteStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01**\n\rIPythonStatus\x12\x0c\n\x08STARTING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01*!\n\nOutputType\x12\x08\n\x04TEXT\x10\x00\x12\t\n\x05IMAGE\x10\x01\x32\xc3\x02\n\x07IPython\x12@\n\x07\x65xecute\x12\x17.ipython.ExecuteRequest\x1a\x18.ipython.ExecuteResponse\"\x00\x30\x01\x12\x45\n\x08\x63omplete\x12\x1a.ipython.CompletionRequest\x1a\x1b.ipython.CompletionResponse\"\x00\x12;\n\x06\x63\x61ncel\x12\x16.ipython.CancelRequest\x1a\x17.ipython.CancelResponse\"\x00\x12;\n\x06status\x12\x16.ipython.StatusRequest\x1a\x17.ipython.StatusResponse\"\x00\x12\x35\n\x04stop\x12\x14.ipython.StopRequest\x1a\x15.ipython.StopResponse\"\x00\x42<\n org.apache.zeppelin.python.protoB\x0cIPythonProtoP\x01\xa2\x02\x07IPythonb\x06proto3')
serialized_options=_b('\n org.apache.zeppelin.python.protoB\014IPythonProtoP\001\242\002\007IPython'),
serialized_pb=_b('\n\ripython.proto\x12\x07ipython\"\x1e\n\x0e\x45xecuteRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\"l\n\x0f\x45xecuteResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.ipython.ExecuteStatus\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.ipython.OutputType\x12\x0e\n\x06output\x18\x03 \x01(\t\"\x0f\n\rCancelRequest\"\x10\n\x0e\x43\x61ncelResponse\"1\n\x11\x43ompletionRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\x12\x0e\n\x06\x63ursor\x18\x02 \x01(\x05\"%\n\x12\x43ompletionResponse\x12\x0f\n\x07matches\x18\x01 \x03(\t\"\x0f\n\rStatusRequest\"8\n\x0eStatusResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.ipython.IPythonStatus\"\r\n\x0bStopRequest\"\x0e\n\x0cStopResponse*\'\n\rExecuteStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01**\n\rIPythonStatus\x12\x0c\n\x08STARTING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01*Q\n\nOutputType\x12\x08\n\x04TEXT\x10\x00\x12\x07\n\x03PNG\x10\x01\x12\x08\n\x04JPEG\x10\x02\x12\x08\n\x04HTML\x10\x03\x12\x07\n\x03SVG\x10\x04\x12\x08\n\x04JSON\x10\x05\x12\t\n\x05LaTeX\x10\x06\x32\xc3\x02\n\x07IPython\x12@\n\x07\x65xecute\x12\x17.ipython.ExecuteRequest\x1a\x18.ipython.ExecuteResponse\"\x00\x30\x01\x12\x45\n\x08\x63omplete\x12\x1a.ipython.CompletionRequest\x1a\x1b.ipython.CompletionResponse\"\x00\x12;\n\x06\x63\x61ncel\x12\x16.ipython.CancelRequest\x1a\x17.ipython.CancelResponse\"\x00\x12;\n\x06status\x12\x16.ipython.StatusRequest\x1a\x17.ipython.StatusResponse\"\x00\x12\x35\n\x04stop\x12\x14.ipython.StopRequest\x1a\x15.ipython.StopResponse\"\x00\x42<\n org.apache.zeppelin.python.protoB\x0cIPythonProtoP\x01\xa2\x02\x07IPythonb\x06proto3')
)
_EXECUTESTATUS = _descriptor.EnumDescriptor(
@ -46,15 +46,15 @@ _EXECUTESTATUS = _descriptor.EnumDescriptor(
values=[
_descriptor.EnumValueDescriptor(
name='SUCCESS', index=0, number=0,
options=None,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='ERROR', index=1, number=1,
options=None,
serialized_options=None,
type=None),
],
containing_type=None,
options=None,
serialized_options=None,
serialized_start=399,
serialized_end=438,
)
@ -69,15 +69,15 @@ _IPYTHONSTATUS = _descriptor.EnumDescriptor(
values=[
_descriptor.EnumValueDescriptor(
name='STARTING', index=0, number=0,
options=None,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='RUNNING', index=1, number=1,
options=None,
serialized_options=None,
type=None),
],
containing_type=None,
options=None,
serialized_options=None,
serialized_start=440,
serialized_end=482,
)
@ -92,17 +92,37 @@ _OUTPUTTYPE = _descriptor.EnumDescriptor(
values=[
_descriptor.EnumValueDescriptor(
name='TEXT', index=0, number=0,
options=None,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='IMAGE', index=1, number=1,
options=None,
name='PNG', index=1, number=1,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='JPEG', index=2, number=2,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='HTML', index=3, number=3,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='SVG', index=4, number=4,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='JSON', index=5, number=5,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='LaTeX', index=6, number=6,
serialized_options=None,
type=None),
],
containing_type=None,
options=None,
serialized_options=None,
serialized_start=484,
serialized_end=517,
serialized_end=565,
)
_sym_db.RegisterEnumDescriptor(_OUTPUTTYPE)
@ -112,7 +132,12 @@ ERROR = 1
STARTING = 0
RUNNING = 1
TEXT = 0
IMAGE = 1
PNG = 1
JPEG = 2
HTML = 3
SVG = 4
JSON = 5
LaTeX = 6
@ -129,14 +154,14 @@ _EXECUTEREQUEST = _descriptor.Descriptor(
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@ -160,28 +185,28 @@ _EXECUTERESPONSE = _descriptor.Descriptor(
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='type', full_name='ipython.ExecuteResponse.type', index=1,
number=2, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='output', full_name='ipython.ExecuteResponse.output', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@ -205,7 +230,7 @@ _CANCELREQUEST = _descriptor.Descriptor(
nested_types=[],
enum_types=[
],
options=None,
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@ -229,7 +254,7 @@ _CANCELRESPONSE = _descriptor.Descriptor(
nested_types=[],
enum_types=[
],
options=None,
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@ -253,21 +278,21 @@ _COMPLETIONREQUEST = _descriptor.Descriptor(
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='cursor', full_name='ipython.CompletionRequest.cursor', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@ -291,14 +316,14 @@ _COMPLETIONRESPONSE = _descriptor.Descriptor(
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@ -322,7 +347,7 @@ _STATUSREQUEST = _descriptor.Descriptor(
nested_types=[],
enum_types=[
],
options=None,
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@ -346,14 +371,14 @@ _STATUSRESPONSE = _descriptor.Descriptor(
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@ -377,7 +402,7 @@ _STOPREQUEST = _descriptor.Descriptor(
nested_types=[],
enum_types=[
],
options=None,
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@ -401,7 +426,7 @@ _STOPRESPONSE = _descriptor.Descriptor(
nested_types=[],
enum_types=[
],
options=None,
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@ -500,252 +525,65 @@ StopResponse = _reflection.GeneratedProtocolMessageType('StopResponse', (_messag
_sym_db.RegisterMessage(StopResponse)
DESCRIPTOR.has_options = True
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n org.apache.zeppelin.python.protoB\014IPythonProtoP\001\242\002\007IPython'))
try:
# THESE ELEMENTS WILL BE DEPRECATED.
# Please use the generated *_pb2_grpc.py files instead.
import grpc
from grpc.beta import implementations as beta_implementations
from grpc.beta import interfaces as beta_interfaces
from grpc.framework.common import cardinality
from grpc.framework.interfaces.face import utilities as face_utilities
DESCRIPTOR._options = None
_IPYTHON = _descriptor.ServiceDescriptor(
name='IPython',
full_name='ipython.IPython',
file=DESCRIPTOR,
index=0,
serialized_options=None,
serialized_start=568,
serialized_end=891,
methods=[
_descriptor.MethodDescriptor(
name='execute',
full_name='ipython.IPython.execute',
index=0,
containing_service=None,
input_type=_EXECUTEREQUEST,
output_type=_EXECUTERESPONSE,
serialized_options=None,
),
_descriptor.MethodDescriptor(
name='complete',
full_name='ipython.IPython.complete',
index=1,
containing_service=None,
input_type=_COMPLETIONREQUEST,
output_type=_COMPLETIONRESPONSE,
serialized_options=None,
),
_descriptor.MethodDescriptor(
name='cancel',
full_name='ipython.IPython.cancel',
index=2,
containing_service=None,
input_type=_CANCELREQUEST,
output_type=_CANCELRESPONSE,
serialized_options=None,
),
_descriptor.MethodDescriptor(
name='status',
full_name='ipython.IPython.status',
index=3,
containing_service=None,
input_type=_STATUSREQUEST,
output_type=_STATUSRESPONSE,
serialized_options=None,
),
_descriptor.MethodDescriptor(
name='stop',
full_name='ipython.IPython.stop',
index=4,
containing_service=None,
input_type=_STOPREQUEST,
output_type=_STOPRESPONSE,
serialized_options=None,
),
])
_sym_db.RegisterServiceDescriptor(_IPYTHON)
class IPythonStub(object):
"""The IPython service definition.
"""
DESCRIPTOR.services_by_name['IPython'] = _IPYTHON
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.execute = channel.unary_stream(
'/ipython.IPython/execute',
request_serializer=ExecuteRequest.SerializeToString,
response_deserializer=ExecuteResponse.FromString,
)
self.complete = channel.unary_unary(
'/ipython.IPython/complete',
request_serializer=CompletionRequest.SerializeToString,
response_deserializer=CompletionResponse.FromString,
)
self.cancel = channel.unary_unary(
'/ipython.IPython/cancel',
request_serializer=CancelRequest.SerializeToString,
response_deserializer=CancelResponse.FromString,
)
self.status = channel.unary_unary(
'/ipython.IPython/status',
request_serializer=StatusRequest.SerializeToString,
response_deserializer=StatusResponse.FromString,
)
self.stop = channel.unary_unary(
'/ipython.IPython/stop',
request_serializer=StopRequest.SerializeToString,
response_deserializer=StopResponse.FromString,
)
class IPythonServicer(object):
"""The IPython service definition.
"""
def execute(self, request, context):
"""Sends code
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def complete(self, request, context):
"""Get completion
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def cancel(self, request, context):
"""Cancel the running statement
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def status(self, request, context):
"""Get ipython kernel status
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def stop(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_IPythonServicer_to_server(servicer, server):
rpc_method_handlers = {
'execute': grpc.unary_stream_rpc_method_handler(
servicer.execute,
request_deserializer=ExecuteRequest.FromString,
response_serializer=ExecuteResponse.SerializeToString,
),
'complete': grpc.unary_unary_rpc_method_handler(
servicer.complete,
request_deserializer=CompletionRequest.FromString,
response_serializer=CompletionResponse.SerializeToString,
),
'cancel': grpc.unary_unary_rpc_method_handler(
servicer.cancel,
request_deserializer=CancelRequest.FromString,
response_serializer=CancelResponse.SerializeToString,
),
'status': grpc.unary_unary_rpc_method_handler(
servicer.status,
request_deserializer=StatusRequest.FromString,
response_serializer=StatusResponse.SerializeToString,
),
'stop': grpc.unary_unary_rpc_method_handler(
servicer.stop,
request_deserializer=StopRequest.FromString,
response_serializer=StopResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'ipython.IPython', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
class BetaIPythonServicer(object):
"""The Beta API is deprecated for 0.15.0 and later.
It is recommended to use the GA API (classes and functions in this
file not marked beta) for all further purposes. This class was generated
only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0."""
"""The IPython service definition.
"""
def execute(self, request, context):
"""Sends code
"""
context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
def complete(self, request, context):
"""Get completion
"""
context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
def cancel(self, request, context):
"""Cancel the running statement
"""
context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
def status(self, request, context):
"""Get ipython kernel status
"""
context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
def stop(self, request, context):
# missing associated documentation comment in .proto file
pass
context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
class BetaIPythonStub(object):
"""The Beta API is deprecated for 0.15.0 and later.
It is recommended to use the GA API (classes and functions in this
file not marked beta) for all further purposes. This class was generated
only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0."""
"""The IPython service definition.
"""
def execute(self, request, timeout, metadata=None, with_call=False, protocol_options=None):
"""Sends code
"""
raise NotImplementedError()
def complete(self, request, timeout, metadata=None, with_call=False, protocol_options=None):
"""Get completion
"""
raise NotImplementedError()
complete.future = None
def cancel(self, request, timeout, metadata=None, with_call=False, protocol_options=None):
"""Cancel the running statement
"""
raise NotImplementedError()
cancel.future = None
def status(self, request, timeout, metadata=None, with_call=False, protocol_options=None):
"""Get ipython kernel status
"""
raise NotImplementedError()
status.future = None
def stop(self, request, timeout, metadata=None, with_call=False, protocol_options=None):
# missing associated documentation comment in .proto file
pass
raise NotImplementedError()
stop.future = None
def beta_create_IPython_server(servicer, pool=None, pool_size=None, default_timeout=None, maximum_timeout=None):
"""The Beta API is deprecated for 0.15.0 and later.
It is recommended to use the GA API (classes and functions in this
file not marked beta) for all further purposes. This function was
generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0"""
request_deserializers = {
('ipython.IPython', 'cancel'): CancelRequest.FromString,
('ipython.IPython', 'complete'): CompletionRequest.FromString,
('ipython.IPython', 'execute'): ExecuteRequest.FromString,
('ipython.IPython', 'status'): StatusRequest.FromString,
('ipython.IPython', 'stop'): StopRequest.FromString,
}
response_serializers = {
('ipython.IPython', 'cancel'): CancelResponse.SerializeToString,
('ipython.IPython', 'complete'): CompletionResponse.SerializeToString,
('ipython.IPython', 'execute'): ExecuteResponse.SerializeToString,
('ipython.IPython', 'status'): StatusResponse.SerializeToString,
('ipython.IPython', 'stop'): StopResponse.SerializeToString,
}
method_implementations = {
('ipython.IPython', 'cancel'): face_utilities.unary_unary_inline(servicer.cancel),
('ipython.IPython', 'complete'): face_utilities.unary_unary_inline(servicer.complete),
('ipython.IPython', 'execute'): face_utilities.unary_stream_inline(servicer.execute),
('ipython.IPython', 'status'): face_utilities.unary_unary_inline(servicer.status),
('ipython.IPython', 'stop'): face_utilities.unary_unary_inline(servicer.stop),
}
server_options = beta_implementations.server_options(request_deserializers=request_deserializers, response_serializers=response_serializers, thread_pool=pool, thread_pool_size=pool_size, default_timeout=default_timeout, maximum_timeout=maximum_timeout)
return beta_implementations.server(method_implementations, options=server_options)
def beta_create_IPython_stub(channel, host=None, metadata_transformer=None, pool=None, pool_size=None):
"""The Beta API is deprecated for 0.15.0 and later.
It is recommended to use the GA API (classes and functions in this
file not marked beta) for all further purposes. This function was
generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0"""
request_serializers = {
('ipython.IPython', 'cancel'): CancelRequest.SerializeToString,
('ipython.IPython', 'complete'): CompletionRequest.SerializeToString,
('ipython.IPython', 'execute'): ExecuteRequest.SerializeToString,
('ipython.IPython', 'status'): StatusRequest.SerializeToString,
('ipython.IPython', 'stop'): StopRequest.SerializeToString,
}
response_deserializers = {
('ipython.IPython', 'cancel'): CancelResponse.FromString,
('ipython.IPython', 'complete'): CompletionResponse.FromString,
('ipython.IPython', 'execute'): ExecuteResponse.FromString,
('ipython.IPython', 'status'): StatusResponse.FromString,
('ipython.IPython', 'stop'): StopResponse.FromString,
}
cardinalities = {
'cancel': cardinality.Cardinality.UNARY_UNARY,
'complete': cardinality.Cardinality.UNARY_UNARY,
'execute': cardinality.Cardinality.UNARY_STREAM,
'status': cardinality.Cardinality.UNARY_UNARY,
'stop': cardinality.Cardinality.UNARY_UNARY,
}
stub_options = beta_implementations.stub_options(host=host, metadata_transformer=metadata_transformer, request_serializers=request_serializers, response_deserializers=response_deserializers, thread_pool=pool, thread_pool_size=pool_size)
return beta_implementations.dynamic_stub(channel, 'ipython.IPython', cardinalities, options=stub_options)
except ImportError:
pass
# @@protoc_insertion_point(module_scope)

View file

@ -49,23 +49,37 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
print("execute code:\n")
print(request.code.encode('utf-8'))
sys.stdout.flush()
stdout_queue = queue.Queue(maxsize = 10)
stderr_queue = queue.Queue(maxsize = 10)
image_queue = queue.Queue(maxsize = 5)
text_queue = queue.Queue(maxsize = 10)
png_queue = queue.Queue(maxsize = 5)
jpeg_queue = queue.Queue(maxsize = 5)
html_queue = queue.Queue(maxsize = 10)
def _output_hook(msg):
msg_type = msg['header']['msg_type']
content = msg['content']
print("******************* CONTENT ******************")
print(str(content)[:400])
if msg_type == 'stream':
stdout_queue.put(content['text'])
text_queue.put(content['text'])
elif msg_type in ('display_data', 'execute_result'):
stdout_queue.put(content['data'].get('text/plain', ''))
if 'image/png' in content['data']:
image_queue.put(content['data']['image/png'])
if 'text/html' in content['data']:
html_queue.put(content['data']['text/html'])
elif 'image/png' in content['data']:
png_queue.put(content['data']['image/png'])
elif 'image/jpeg' in content['data']:
jpeg_queue.put(content['data']['image/jpeg'])
elif 'text/plain' in content['data']:
text_queue.put(content['data']['text/plain'])
elif 'application/javascript' in content['data']:
print('add to html queue: ' + str(content)[:100])
html_queue.put('<script> ' + content['data']['application/javascript'] + ' </script>\n')
elif 'application/vnd.holoviews_load.v0+json' in content['data']:
html_queue.put('<script> ' + content['data']['application/vnd.holoviews_load.v0+json'] + ' </script>\n')
elif msg_type == 'error':
stderr_queue.put('\n'.join(content['traceback']))
payload_reply = []
def execute_worker():
reply = self._kc.execute_interactive(request.code,
@ -80,21 +94,32 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
# Execution might be stuck there:
# https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
while t.is_alive() and self.isKernelAlive():
while not stdout_queue.empty():
output = stdout_queue.get()
while not text_queue.empty():
output = text_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
type=ipython_pb2.TEXT,
output=output)
while not html_queue.empty():
output = html_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
type=ipython_pb2.HTML,
output=output)
while not stderr_queue.empty():
output = stderr_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
type=ipython_pb2.TEXT,
output=output)
while not image_queue.empty():
output = image_queue.get()
while not png_queue.empty():
output = png_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
type=ipython_pb2.IMAGE,
type=ipython_pb2.PNG,
output=output)
while not jpeg_queue.empty():
output = jpeg_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
type=ipython_pb2.JPEG,
output=output)
# if kernel is not alive (should be same as thread is still alive), means that we face
# an unexpected issue.
@ -104,22 +129,31 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
return
while not stdout_queue.empty():
output = stdout_queue.get()
while not text_queue.empty():
output = text_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
type=ipython_pb2.TEXT,
output=output)
while not html_queue.empty():
output = html_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
type=ipython_pb2.HTML,
output=output)
while not stderr_queue.empty():
output = stderr_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
type=ipython_pb2.TEXT,
output=output)
while not image_queue.empty():
output = image_queue.get()
while not png_queue.empty():
output = png_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
type=ipython_pb2.IMAGE,
type=ipython_pb2.PNG,
output=output)
while not jpeg_queue.empty():
output = jpeg_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
type=ipython_pb2.JPEG,
output=output)
if payload_reply:
result = []
for payload in payload_reply[0]['content']['payload']:

View file

@ -185,7 +185,6 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
// check there must be one IMAGE output
boolean hasImageOutput = false;
boolean hasLineText = false;
boolean hasFigureText = false;
for (InterpreterResultMessage msg : interpreterResultMessages) {
if (msg.getType() == InterpreterResult.Type.IMG) {
hasImageOutput = true;
@ -194,14 +193,9 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
&& msg.getData().contains("matplotlib.lines.Line2D")) {
hasLineText = true;
}
if (msg.getType() == InterpreterResult.Type.TEXT
&& msg.getData().contains("matplotlib.figure.Figure")) {
hasFigureText = true;
}
}
assertTrue("No Image Output", hasImageOutput);
assertTrue("No Line Text", hasLineText);
assertTrue("No Figure Text", hasFigureText);
// bokeh
// bokeh initialization
@ -256,6 +250,35 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
assertTrue("No Image Output", hasImageOutput);
}
// TODO(zjffdu) Enable it after new altair is released with this PR.
// https://github.com/altair-viz/altair/pull/1620
//@Test
public void testHtmlOutput() throws InterpreterException, IOException {
// html output
InterpreterContext context = getInterpreterContext();
InterpreterResult result = interpreter.interpret(
" import altair as alt\n" +
" print(alt.renderers.active)\n" +
" alt.renderers.enable(\"colab\")\n" +
" import altair as alt\n" +
" # load a simple dataset as a pandas DataFrame\n" +
" from vega_datasets import data\n" +
" cars = data.cars()\n" +
" \n" +
" alt.Chart(cars).mark_point().encode(\n" +
" x='Horsepower',\n" +
" y='Miles_per_Gallon',\n" +
" color='Origin',\n" +
" ).interactive()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(2, context.out.size());
assertEquals(InterpreterResult.Type.TEXT,
context.out.toInterpreterResultMessage().get(0).getType());
assertEquals(InterpreterResult.Type.HTML,
context.out.toInterpreterResultMessage().get(1).getType());
}
@Test
public void testGrpcFrameSize() throws InterpreterException, IOException {
tearDown();

View file

@ -96,7 +96,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
File destFile = new File(destPath, srcFile.getName());
if (!destFile.exists() || !FileUtils.contentEquals(srcFile, destFile)) {
FileUtils.copyFile(srcFile, destFile);
logger.info("copy {} to {}", srcFile.getAbsolutePath(), destPath);
logger.debug("copy {} to {}", srcFile.getAbsolutePath(), destPath);
}
}
}
@ -114,7 +114,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
if (!destFile.exists() || !FileUtils.contentEquals(srcFile, destFile)) {
FileUtils.copyFile(srcFile, destFile);
logger.info("copy {} to {}", srcFile.getAbsolutePath(), destPath);
logger.debug("copy {} to {}", srcFile.getAbsolutePath(), destPath);
}
}
@ -142,7 +142,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
List<File> files = new LinkedList<>();
for (ArtifactResult artifactResult : listOfArtifact) {
files.add(artifactResult.getArtifact().getFile());
logger.info("load {}", artifactResult.getArtifact().getFile().getAbsolutePath());
logger.debug("load {}", artifactResult.getArtifact().getFile().getAbsolutePath());
}
return files;

View file

@ -102,6 +102,10 @@ public class VFSNotebookRepo implements NotebookRepo {
private Map<String, NoteInfo> listFolder(FileObject fileObject) throws IOException {
Map<String, NoteInfo> noteInfos = new HashMap<>();
if (fileObject.isFolder()) {
if (fileObject.getName().getBaseName().startsWith(".")) {
LOGGER.warn("Skip hidden folder: " + fileObject.getName().getPath());
return noteInfos;
}
for (FileObject child : fileObject.getChildren()) {
noteInfos.putAll(listFolder(child));
}