mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
move synchronize near the thread check
This commit is contained in:
parent
616f0122fe
commit
f7cae95381
1 changed files with 35 additions and 37 deletions
|
|
@ -86,45 +86,43 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
|
|||
|
||||
payload_reply = []
|
||||
def execute_worker():
|
||||
with self._lock:
|
||||
reply = self._kc.execute_interactive(request.code,
|
||||
output_hook=_output_hook,
|
||||
timeout=None)
|
||||
payload_reply.append(reply)
|
||||
reply = self._kc.execute_interactive(request.code,
|
||||
output_hook=_output_hook,
|
||||
timeout=None)
|
||||
payload_reply.append(reply)
|
||||
|
||||
t = threading.Thread(name="ConsumerThread", target=execute_worker)
|
||||
t.start()
|
||||
|
||||
# We want to ensure that the kernel is alive because in case of OOM or other errors
|
||||
# Execution might be stuck there:
|
||||
# https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
|
||||
while t.is_alive() and self.isKernelAlive():
|
||||
while not text_queue.empty():
|
||||
output = text_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.TEXT,
|
||||
output=output)
|
||||
while not html_queue.empty():
|
||||
output = html_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.HTML,
|
||||
output=output)
|
||||
while not stderr_queue.empty():
|
||||
output = stderr_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
|
||||
type=ipython_pb2.TEXT,
|
||||
output=output)
|
||||
while not png_queue.empty():
|
||||
output = png_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.PNG,
|
||||
output=output)
|
||||
while not jpeg_queue.empty():
|
||||
output = jpeg_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.JPEG,
|
||||
output=output)
|
||||
|
||||
with self._lock:
|
||||
t.start()
|
||||
# We want to ensure that the kernel is alive because in case of OOM or other errors
|
||||
# Execution might be stuck there:
|
||||
# https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
|
||||
while t.is_alive() and self.isKernelAlive():
|
||||
while not text_queue.empty():
|
||||
output = text_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.TEXT,
|
||||
output=output)
|
||||
while not html_queue.empty():
|
||||
output = html_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.HTML,
|
||||
output=output)
|
||||
while not stderr_queue.empty():
|
||||
output = stderr_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
|
||||
type=ipython_pb2.TEXT,
|
||||
output=output)
|
||||
while not png_queue.empty():
|
||||
output = png_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.PNG,
|
||||
output=output)
|
||||
while not jpeg_queue.empty():
|
||||
output = jpeg_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
type=ipython_pb2.JPEG,
|
||||
output=output)
|
||||
|
||||
# if kernel is not alive (should be same as thread is still alive), means that we face
|
||||
# an unexpected issue.
|
||||
|
|
|
|||
Loading…
Reference in a new issue