mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
handle kernel crash
This commit is contained in:
parent
4219d55234
commit
5fe84dfef9
1 changed files with 32 additions and 12 deletions
|
|
@ -16,6 +16,7 @@
|
|||
from __future__ import print_function
|
||||
|
||||
import jupyter_client
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
|
@ -25,8 +26,6 @@ import grpc
|
|||
import ipython_pb2
|
||||
import ipython_pb2_grpc
|
||||
|
||||
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
|
||||
|
||||
is_py2 = sys.version[0] == '2'
|
||||
if is_py2:
|
||||
import Queue as queue
|
||||
|
|
@ -34,8 +33,6 @@ else:
|
|||
import queue as queue
|
||||
|
||||
|
||||
TIMEOUT = 60*60*24*365*100 # 100 years
|
||||
|
||||
class IPython(ipython_pb2_grpc.IPythonServicer):
|
||||
|
||||
def __init__(self, server):
|
||||
|
|
@ -73,13 +70,16 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
|
|||
def execute_worker():
|
||||
reply = self._kc.execute_interactive(request.code,
|
||||
output_hook=_output_hook,
|
||||
timeout=TIMEOUT)
|
||||
timeout=None)
|
||||
payload_reply.append(reply)
|
||||
|
||||
t = threading.Thread(name="ConsumerThread", target=execute_worker)
|
||||
t.start()
|
||||
|
||||
while t.is_alive():
|
||||
# Also check that the kernel is still alive: after an out-of-memory kill or another fatal error,
|
||||
# the blocking execution call might otherwise stay stuck here:
|
||||
# https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
|
||||
while t.is_alive() and self.isKernelAlive():
|
||||
while not stdout_queue.empty():
|
||||
output = stdout_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
|
|
@ -96,6 +96,14 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
|
|||
type=ipython_pb2.IMAGE,
|
||||
output=output)
|
||||
|
||||
# If the kernel is no longer alive (which should coincide with the worker thread still being alive),
|
||||
# we have hit an unexpected failure: report an error and stop.
|
||||
if not self.isKernelAlive() or t.is_alive():
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
|
||||
type=ipython_pb2.TEXT,
|
||||
output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
|
||||
return
|
||||
|
||||
while not stdout_queue.empty():
|
||||
output = stdout_queue.get()
|
||||
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
|
||||
|
|
@ -127,15 +135,21 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
|
|||
return ipython_pb2.CancelResponse()
|
||||
|
||||
def complete(self, request, context):
|
||||
reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=TIMEOUT)
|
||||
reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=None)
|
||||
return ipython_pb2.CompletionResponse(matches=reply['content']['matches'])
|
||||
|
||||
def status(self, request, context):
|
||||
return ipython_pb2.StatusResponse(status = self._status)
|
||||
|
||||
def isKernelAlive(self):
|
||||
return self._km.is_alive()
|
||||
|
||||
def terminate(self):
|
||||
self._km.shutdown_kernel()
|
||||
|
||||
def stop(self, request, context):
|
||||
self._server.stop(0)
|
||||
sys.exit(0)
|
||||
self.terminate()
|
||||
return ipython_pb2.StopResponse()
|
||||
|
||||
|
||||
def serve(port):
|
||||
|
|
@ -146,10 +160,16 @@ def serve(port):
|
|||
server.start()
|
||||
ipython.start()
|
||||
try:
|
||||
while True:
|
||||
time.sleep(_ONE_DAY_IN_SECONDS)
|
||||
while ipython.isKernelAlive():
|
||||
time.sleep(5)
|
||||
except KeyboardInterrupt:
|
||||
server.stop(0)
|
||||
print("interrupted")
|
||||
finally:
|
||||
print("shutdown")
|
||||
# Allow 2 seconds for all in-flight requests to complete.
|
||||
server.stop(2)
|
||||
ipython.terminate()
|
||||
os._exit(0)
|
||||
|
||||
if __name__ == '__main__':
|
||||
serve(sys.argv[1])
|
||||
|
|
|
|||
Loading…
Reference in a new issue