mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
UnloadApp on interpreter restart
This commit is contained in:
parent
3b891deb58
commit
83eba9826e
5 changed files with 95 additions and 4 deletions
|
|
@ -266,4 +266,16 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
|
|||
RemoteInterpreterEventType.OUTPUT_UPDATE,
|
||||
gson.toJson(appendOutput)));
|
||||
}
|
||||
|
||||
public void onAppStatusUpdate(String noteId, String paragraphId, String appId, String status) {
|
||||
Map<String, String> appendOutput = new HashMap<String, String>();
|
||||
appendOutput.put("noteId", noteId);
|
||||
appendOutput.put("paragraphId", paragraphId);
|
||||
appendOutput.put("appId", appId);
|
||||
appendOutput.put("status", status);
|
||||
|
||||
sendEvent(new RemoteInterpreterEvent(
|
||||
RemoteInterpreterEventType.APP_STATUS_UPDATE,
|
||||
gson.toJson(appendOutput)));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -164,7 +164,7 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
|
||||
// on output update
|
||||
Map<String, String> outputAppend = gson.fromJson(
|
||||
event.getData(), new TypeToken<Map<String, String>>() {}.getType());
|
||||
event.getData(), new TypeToken<Map<String, String>>() {}.getType());
|
||||
String noteId = outputAppend.get("noteId");
|
||||
String paragraphId = outputAppend.get("paragraphId");
|
||||
String outputToUpdate = outputAppend.get("data");
|
||||
|
|
@ -175,6 +175,17 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
} else {
|
||||
appListener.onOutputUpdated(noteId, paragraphId, appId, outputToUpdate);
|
||||
}
|
||||
} else if (event.getType() == RemoteInterpreterEventType.APP_STATUS_UPDATE) {
|
||||
// on output update
|
||||
Map<String, String> appStatusUpdate = gson.fromJson(
|
||||
event.getData(), new TypeToken<Map<String, String>>() {}.getType());
|
||||
|
||||
String noteId = appStatusUpdate.get("noteId");
|
||||
String paragraphId = appStatusUpdate.get("paragraphId");
|
||||
String appId = appStatusUpdate.get("appId");
|
||||
String status = appStatusUpdate.get("status");
|
||||
|
||||
appListener.onStatusChange(noteId, paragraphId, appId, status);
|
||||
}
|
||||
logger.debug("Event from remoteproceess {}", event.getType());
|
||||
} catch (Exception e) {
|
||||
|
|
|
|||
|
|
@ -242,6 +242,23 @@ public class RemoteInterpreterServer
|
|||
|
||||
@Override
|
||||
public void close(String noteId, String className) throws TException {
|
||||
// unload all applications
|
||||
for (String appId : runningApplications.keySet()) {
|
||||
RunningApplication appInfo = runningApplications.get(appId);
|
||||
|
||||
// see NoteInterpreterLoader.SHARED_SESSION
|
||||
if (appInfo.noteId.equals(noteId) || noteId.equals("shared_session")) {
|
||||
try {
|
||||
appInfo.app.unload();
|
||||
// see ApplicationState.Status.UNLOADED
|
||||
eventClient.onAppStatusUpdate(appInfo.noteId, appInfo.paragraphId, appId, "UNLOADED");
|
||||
} catch (ApplicationException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// close interpreters
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
if (interpreters != null) {
|
||||
|
|
@ -790,7 +807,9 @@ public class RemoteInterpreterServer
|
|||
noteId,
|
||||
paragraphId);
|
||||
app = appLoader.load(pkgInfo, context);
|
||||
runningApplications.put(applicationInstanceId, new RunningApplication(pkgInfo, app));
|
||||
runningApplications.put(
|
||||
applicationInstanceId,
|
||||
new RunningApplication(pkgInfo, app, noteId, paragraphId));
|
||||
return new RemoteApplicationResult(true, "");
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
|
|
@ -855,10 +874,17 @@ public class RemoteInterpreterServer
|
|||
private static class RunningApplication {
|
||||
public final Application app;
|
||||
public final HeliumPackage pkg;
|
||||
public final String noteId;
|
||||
public final String paragraphId;
|
||||
|
||||
public RunningApplication(HeliumPackage pkg, Application app) {
|
||||
public RunningApplication(HeliumPackage pkg,
|
||||
Application app,
|
||||
String noteId,
|
||||
String paragraphId) {
|
||||
this.app = app;
|
||||
this.pkg = pkg;
|
||||
this.noteId = noteId;
|
||||
this.paragraphId = paragraphId;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
|
|||
RESOURCE_GET(7),
|
||||
OUTPUT_APPEND(8),
|
||||
OUTPUT_UPDATE(9),
|
||||
ANGULAR_REGISTRY_PUSH(10);
|
||||
ANGULAR_REGISTRY_PUSH(10),
|
||||
APP_STATUS_UPDATE(11);
|
||||
|
||||
private final int value;
|
||||
|
||||
|
|
|
|||
|
|
@ -229,6 +229,47 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
|
|||
notebook.removeNote(note1.getId());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testUnloadOnInterpreterRestart() throws IOException {
|
||||
// given
|
||||
HeliumPackage pkg1 = new HeliumPackage(HeliumPackage.Type.APPLICATION,
|
||||
"name1",
|
||||
"desc1",
|
||||
"",
|
||||
HeliumTestApplication.class.getName(),
|
||||
new String[][]{});
|
||||
|
||||
Note note1 = notebook.createNote();
|
||||
notebook.bindInterpretersToNote(note1.id(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
Paragraph p1 = note1.addParagraph();
|
||||
|
||||
// make sure interpreter process running
|
||||
p1.setText("job");
|
||||
note1.run(p1.getId());
|
||||
while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield();
|
||||
|
||||
assertEquals(0, p1.getAllApplicationStates().size());
|
||||
String appId = heliumAppFactory.loadAndRun(pkg1, p1);
|
||||
ApplicationState app = p1.getApplicationState(appId);
|
||||
while (app.getStatus() != ApplicationState.Status.LOADED) {
|
||||
Thread.yield();
|
||||
}
|
||||
|
||||
// when unbind interpreter
|
||||
factory.restart(factory.getDefaultInterpreterSettingList().get(0));
|
||||
while (app.getStatus() == ApplicationState.Status.LOADED) {
|
||||
Thread.yield();
|
||||
}
|
||||
|
||||
// then
|
||||
assertEquals(ApplicationState.Status.UNLOADED, app.getStatus());
|
||||
|
||||
// clean
|
||||
notebook.removeNote(note1.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ParagraphJobListener getParagraphJobListener(Note note) {
|
||||
return new ParagraphJobListener() {
|
||||
|
|
|
|||
Loading…
Reference in a new issue