diff --git a/python/src/main/resources/grpc/python/ipython_server.py b/python/src/main/resources/grpc/python/ipython_server.py
index 4b68efdf27..36e0a13099 100644
--- a/python/src/main/resources/grpc/python/ipython_server.py
+++ b/python/src/main/resources/grpc/python/ipython_server.py
@@ -16,6 +16,7 @@
 from __future__ import print_function
 
 import jupyter_client
+import os
 import sys
 import threading
 import time
@@ -25,8 +26,6 @@ import grpc
 import ipython_pb2
 import ipython_pb2_grpc
 
-_ONE_DAY_IN_SECONDS = 60 * 60 * 24
-
 is_py2 = sys.version[0] == '2'
 if is_py2:
     import Queue as queue
@@ -34,8 +33,6 @@ else:
     import queue as queue
 
 
-TIMEOUT = 60*60*24*365*100 # 100 years
-
 class IPython(ipython_pb2_grpc.IPythonServicer):
 
     def __init__(self, server):
@@ -73,13 +70,16 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
         def execute_worker():
             reply = self._kc.execute_interactive(request.code,
                                                  output_hook=_output_hook,
-                                                 timeout=TIMEOUT)
+                                                 timeout=None)
             payload_reply.append(reply)
 
         t = threading.Thread(name="ConsumerThread", target=execute_worker)
         t.start()
 
-        while t.is_alive():
+        # We want to ensure that the kernel is alive because in case of OOM or other errors
+        # Execution might be stuck there:
+        # https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
+        while t.is_alive() and self.isKernelAlive():
             while not stdout_queue.empty():
                 output = stdout_queue.get()
                 yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
@@ -96,6 +96,14 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
                                                   type=ipython_pb2.IMAGE,
                                                   output=output)
 
+        # if kernel is not alive (should be same as thread is still alive), means that we face
+        # an unexpected issue.
+        if not self.isKernelAlive() or t.is_alive():
+            yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
+                                              type=ipython_pb2.TEXT,
+                                              output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
+            return
+
         while not stdout_queue.empty():
             output = stdout_queue.get()
             yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
@@ -127,15 +135,21 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
         return ipython_pb2.CancelResponse()
 
     def complete(self, request, context):
-        reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=TIMEOUT)
+        reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=None)
         return ipython_pb2.CompletionResponse(matches=reply['content']['matches'])
 
     def status(self, request, context):
         return ipython_pb2.StatusResponse(status = self._status)
 
+    def isKernelAlive(self):
+        return self._km.is_alive()
+
+    def terminate(self):
+        self._km.shutdown_kernel()
+
     def stop(self, request, context):
-        self._server.stop(0)
-        sys.exit(0)
+        self.terminate()
+        return ipython_pb2.StopResponse()
 
 
 def serve(port):
@@ -146,10 +160,16 @@ def serve(port):
     server.start()
     ipython.start()
     try:
-        while True:
-            time.sleep(_ONE_DAY_IN_SECONDS)
+        while ipython.isKernelAlive():
+            time.sleep(5)
     except KeyboardInterrupt:
-        server.stop(0)
+        print("interrupted")
+    finally:
+        print("shutdown")
+        # we let 2 sc for all request to be complete
+        server.stop(2)
+        ipython.terminate()
+        os._exit(0)
 
 if __name__ == '__main__':
     serve(sys.argv[1])