diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index b9b6b0a822..8cec32a6f3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -266,4 +266,16 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector { RemoteInterpreterEventType.OUTPUT_UPDATE, gson.toJson(appendOutput))); } + + public void onAppStatusUpdate(String noteId, String paragraphId, String appId, String status) { + Map appendOutput = new HashMap(); + appendOutput.put("noteId", noteId); + appendOutput.put("paragraphId", paragraphId); + appendOutput.put("appId", appId); + appendOutput.put("status", status); + + sendEvent(new RemoteInterpreterEvent( + RemoteInterpreterEventType.APP_STATUS_UPDATE, + gson.toJson(appendOutput))); + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index 9d94d97a8a..48c14d50bd 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -164,7 +164,7 @@ public class RemoteInterpreterEventPoller extends Thread { } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) { // on output update Map outputAppend = gson.fromJson( - event.getData(), new TypeToken>() {}.getType()); + event.getData(), new TypeToken>() {}.getType()); String noteId = outputAppend.get("noteId"); String paragraphId = outputAppend.get("paragraphId"); String outputToUpdate = outputAppend.get("data"); @@ -175,6 +175,17 @@ public class RemoteInterpreterEventPoller extends Thread { } else { appListener.onOutputUpdated(noteId, paragraphId, appId, outputToUpdate); } + } else if (event.getType() == RemoteInterpreterEventType.APP_STATUS_UPDATE) { + // on output update + Map appStatusUpdate = gson.fromJson( + event.getData(), new TypeToken>() {}.getType()); + + String noteId = appStatusUpdate.get("noteId"); + String paragraphId = appStatusUpdate.get("paragraphId"); + String appId = appStatusUpdate.get("appId"); + String status = appStatusUpdate.get("status"); + + appListener.onStatusChange(noteId, paragraphId, appId, status); } logger.debug("Event from remoteproceess {}", event.getType()); } catch (Exception e) { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index ffb658ef2a..6cc7e89365 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -242,6 +242,23 @@ public class RemoteInterpreterServer @Override public void close(String noteId, String className) throws TException { + // unload all applications + for (String appId : runningApplications.keySet()) { + RunningApplication appInfo = runningApplications.get(appId); + + // see NoteInterpreterLoader.SHARED_SESSION + if (appInfo.noteId.equals(noteId) || noteId.equals("shared_session")) { + try { + appInfo.app.unload(); + // see ApplicationState.Status.UNLOADED + eventClient.onAppStatusUpdate(appInfo.noteId, appInfo.paragraphId, appId, "UNLOADED"); + } catch (ApplicationException e) { + logger.error(e.getMessage(), e); + } + } + } + + // close interpreters synchronized (interpreterGroup) { List interpreters = interpreterGroup.get(noteId); if (interpreters != null) { @@ -790,7 +807,9 @@ public class RemoteInterpreterServer noteId, paragraphId); app = appLoader.load(pkgInfo, context); - runningApplications.put(applicationInstanceId, new RunningApplication(pkgInfo, app)); + runningApplications.put( + applicationInstanceId, + new RunningApplication(pkgInfo, app, noteId, paragraphId)); return new RemoteApplicationResult(true, ""); } catch (Exception e) { logger.error(e.getMessage(), e); @@ -855,10 +874,17 @@ public class RemoteInterpreterServer private static class RunningApplication { public final Application app; public final HeliumPackage pkg; + public final String noteId; + public final String paragraphId; - public RunningApplication(HeliumPackage pkg, Application app) { + public RunningApplication(HeliumPackage pkg, + Application app, + String noteId, + String paragraphId) { this.app = app; this.pkg = pkg; + this.noteId = noteId; + this.paragraphId = paragraphId; } }; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index 66631d2f2a..32dfef6c5b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -38,7 +38,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { RESOURCE_GET(7), OUTPUT_APPEND(8), OUTPUT_UPDATE(9), - ANGULAR_REGISTRY_PUSH(10); + ANGULAR_REGISTRY_PUSH(10), + APP_STATUS_UPDATE(11); private final int value; diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java index 9a882757cc..6e499d400c 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java @@ -229,6 +229,47 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory { notebook.removeNote(note1.getId()); } + + @Test + public void testUnloadOnInterpreterRestart() throws IOException { + // given + HeliumPackage pkg1 = new HeliumPackage(HeliumPackage.Type.APPLICATION, + "name1", + "desc1", + "", + HeliumTestApplication.class.getName(), + new String[][]{}); + + Note note1 = notebook.createNote(); + notebook.bindInterpretersToNote(note1.id(), factory.getDefaultInterpreterSettingList()); + + Paragraph p1 = note1.addParagraph(); + + // make sure interpreter process running + p1.setText("job"); + note1.run(p1.getId()); + while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield(); + + assertEquals(0, p1.getAllApplicationStates().size()); + String appId = heliumAppFactory.loadAndRun(pkg1, p1); + ApplicationState app = p1.getApplicationState(appId); + while (app.getStatus() != ApplicationState.Status.LOADED) { + Thread.yield(); + } + + // when unbind interpreter + factory.restart(factory.getDefaultInterpreterSettingList().get(0)); + while (app.getStatus() == ApplicationState.Status.LOADED) { + Thread.yield(); + } + + // then + assertEquals(ApplicationState.Status.UNLOADED, app.getStatus()); + + // clean + notebook.removeNote(note1.getId()); + } + @Override public ParagraphJobListener getParagraphJobListener(Note note) { return new ParagraphJobListener() {