mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
use one queue and use sleep to improve ipython performance
This commit is contained in:
parent
79f751a0e0
commit
42f3ca097e
1 changed files with 38 additions and 83 deletions
|
|
@ -53,38 +53,45 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
|
|||
print("execute code:\n")
|
||||
print(request.code.encode('utf-8'))
|
||||
sys.stdout.flush()
|
||||
stderr_queue = queue.Queue(maxsize = 10)
|
||||
text_queue = queue.Queue(maxsize = 10)
|
||||
png_queue = queue.Queue(maxsize = 5)
|
||||
jpeg_queue = queue.Queue(maxsize = 5)
|
||||
html_queue = queue.Queue(maxsize = 10)
|
||||
|
||||
stream_reply_queue = queue.Queue(maxsize = 20)
|
||||
payload_reply = []
|
||||
def _output_hook(msg):
|
||||
msg_type = msg['header']['msg_type']
|
||||
content = msg['content']
|
||||
print("******************* CONTENT ******************")
|
||||
print(str(content)[:400])
|
||||
outstatus, outtype, outtype = ipython_pb2.SUCCESS, None, None
|
||||
if msg_type == 'stream':
|
||||
text_queue.put(content['text'])
|
||||
elif msg_type in ('display_data', 'execute_result'):
|
||||
if 'text/html' in content['data']:
|
||||
html_queue.put(content['data']['text/html'])
|
||||
elif 'image/png' in content['data']:
|
||||
png_queue.put(content['data']['image/png'])
|
||||
elif 'image/jpeg' in content['data']:
|
||||
jpeg_queue.put(content['data']['image/jpeg'])
|
||||
elif 'text/plain' in content['data']:
|
||||
text_queue.put(content['data']['text/plain'])
|
||||
elif 'application/javascript' in content['data']:
|
||||
print('add to html queue: ' + str(content)[:100])
|
||||
html_queue.put('<script> ' + content['data']['application/javascript'] + ' </script>\n')
|
||||
elif 'application/vnd.holoviews_load.v0+json' in content['data']:
|
||||
html_queue.put('<script> ' + content['data']['application/vnd.holoviews_load.v0+json'] + ' </script>\n')
|
||||
|
||||
outtype = ipython_pb2.TEXT
|
||||
output = content['text']
|
||||
elif msg_type == 'error':
|
||||
stderr_queue.put('\n'.join(content['traceback']))
|
||||
outstatus = ipython_pb2.ERROR
|
||||
outtype = ipython_pb2.TEXT
|
||||
output = '\n'.join(content['traceback'])
|
||||
elif msg_type in ('display_data', 'execute_result'):
|
||||
if 'image/jpeg' in content['data']:
|
||||
type = ipython_pb2.JPEG
|
||||
output = content['data']['image/jpeg']
|
||||
elif 'image/png' in content['data']:
|
||||
type = ipython_pb2.PNG
|
||||
output = content['data']['image/png']
|
||||
if 'text/plain' in content['data']:
|
||||
type = ipython_pb2.TEXT
|
||||
output = content['data']['text/plain']
|
||||
elif 'text/html' in content['data']:
|
||||
type = ipython_pb2.HTML
|
||||
output = content['data']['text/html']
|
||||
elif 'application/javascript' in content['data']:
|
||||
type = ipython_pb2.HTML
|
||||
output = '<script> ' + content['data']['application/javascript'] + ' </script>\n'
|
||||
print('add to html output: ' + str(content)[:100])
|
||||
elif 'application/vnd.holoviews_load.v0+json' in content['data']:
|
||||
type = ipython_pb2.HTML
|
||||
output = '<script> ' + content['data']['application/vnd.holoviews_load.v0+json'] + ' </script>\n'
|
||||
if outtype:
|
||||
stream_reply_queue.put(
|
||||
ipython_pb2.ExecuteResponse(status=outstatus, type=outtype, output=output)
|
||||
|
||||
payload_reply = []
|
||||
def execute_worker():
|
||||
reply = self._kc.execute_interactive(request.code,
|
||||
output_hook=_output_hook,
|
||||
|
|
@ -97,66 +104,14 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
|
|||
# We want to ensure that the kernel is alive because in case of OOM or other errors
|
||||
# Execution might be stuck there:
|
||||
# https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
|
||||
while t.is_alive() and self.isKernelAlive():
|
||||
while not text_queue.empty():
|
||||
output = text_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.TEXT,
|
||||
output=output)
|
||||
while not html_queue.empty():
|
||||
output = html_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.HTML,
|
||||
output=output)
|
||||
while not stderr_queue.empty():
|
||||
output = stderr_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
|
||||
type=ipython_pb2.TEXT,
|
||||
output=output)
|
||||
while not png_queue.empty():
|
||||
output = png_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.PNG,
|
||||
output=output)
|
||||
while not jpeg_queue.empty():
|
||||
output = jpeg_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.JPEG,
|
||||
output=output)
|
||||
while t.is_alive() and self.isKernelAlive() or not stream_reply_queue.empty():
|
||||
# Sleeping time to time to reduce cpu usage.
|
||||
# At worst it will bring a 0.05 delay for bunch of messages.
|
||||
# Overall it will improve performance.
|
||||
time.sleep(0.05)
|
||||
while not stream_reply_queue.empty():
|
||||
yield stream_reply_queue.get()
|
||||
|
||||
# if kernel is not alive (should be same as thread is still alive), means that we face
|
||||
# an unexpected issue.
|
||||
if not self.isKernelAlive() or t.is_alive():
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
|
||||
type=ipython_pb2.TEXT,
|
||||
output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
|
||||
return
|
||||
|
||||
while not text_queue.empty():
|
||||
output = text_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.TEXT,
|
||||
output=output)
|
||||
while not html_queue.empty():
|
||||
output = html_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.HTML,
|
||||
output=output)
|
||||
while not stderr_queue.empty():
|
||||
output = stderr_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
|
||||
type=ipython_pb2.TEXT,
|
||||
output=output)
|
||||
while not png_queue.empty():
|
||||
output = png_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.PNG,
|
||||
output=output)
|
||||
while not jpeg_queue.empty():
|
||||
output = jpeg_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.JPEG,
|
||||
output=output)
|
||||
if payload_reply:
|
||||
result = []
|
||||
for payload in payload_reply[0]['content']['payload']:
|
||||
|
|
|
|||
Loading…
Reference in a new issue