Update Application Status from RemoteInterpreterProcess event

This commit is contained in:
Lee moon soo 2016-05-07 22:11:35 -07:00
parent f07ada1d94
commit 8186daf58e
12 changed files with 38 additions and 10 deletions

View file

@ -278,4 +278,19 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
RemoteInterpreterEventType.APP_STATUS_UPDATE,
gson.toJson(appendOutput)));
}
/**
* Wait for eventQueue becomes empty
*/
public void waitForEventQueueBecomesEmpty() {
synchronized (eventQueue) {
while (!eventQueue.isEmpty()) {
try {
eventQueue.wait(100);
} catch (InterruptedException e) {
// ignore exception
}
}
}
}
}

View file

@ -92,6 +92,7 @@ public class RemoteInterpreterServer
@Override
public void shutdown() throws TException {
eventClient.waitForEventQueueBecomesEmpty();
if (interpreterGroup != null) {
interpreterGroup.close();
interpreterGroup.destroy();
@ -249,6 +250,7 @@ public class RemoteInterpreterServer
// see NoteInterpreterLoader.SHARED_SESSION
if (appInfo.noteId.equals(noteId) || noteId.equals("shared_session")) {
try {
logger.info("Unload App {} ", appInfo.pkg.getName());
appInfo.app.unload();
// see ApplicationState.Status.UNLOADED
eventClient.onAppStatusUpdate(appInfo.noteId, appInfo.paragraphId, appId, "UNLOADED");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-5-7")
public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-5-7")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-5-7")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");

View file

@ -80,6 +80,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
return OUTPUT_UPDATE;
case 10:
return ANGULAR_REGISTRY_PUSH;
case 11:
return APP_STATUS_UPDATE;
default:
return null;
}

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-5-7")
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-5-7")
public class RemoteInterpreterService {
public interface Iface {

View file

@ -48,7 +48,8 @@ enum RemoteInterpreterEventType {
RESOURCE_GET = 7
OUTPUT_APPEND = 8,
OUTPUT_UPDATE = 9,
ANGULAR_REGISTRY_PUSH=10
ANGULAR_REGISTRY_PUSH = 10,
APP_STATUS_UPDATE = 11,
}
struct RemoteInterpreterEvent {

View file

@ -36,8 +36,6 @@ import java.util.concurrent.ExecutorService;
/**
* HeliumApplicationFactory
*
* TODO(moon): unload apps on interpreter restart
*/
public class HeliumApplicationFactory implements ApplicationEventListener, NotebookEventListener {
private final Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class);
@ -376,6 +374,11 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
@Override
public void onStatusChange(String noteId, String paragraphId, String appId, String status) {
ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId);
if (appToUpdate != null) {
appToUpdate.setStatus(ApplicationState.Status.valueOf(status));
}
if (applicationEventListener != null) {
applicationEventListener.onStatusChange(noteId, paragraphId, appId, status);
}

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.InterpreterGroup;
/**
* Current state of application

View file

@ -76,7 +76,6 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
heliumAppFactory = new HeliumApplicationFactory();
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
factory = new InterpreterFactory(conf,
new InterpreterOption(true), null, null, heliumAppFactory, depResolver);
@ -257,7 +256,12 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
Thread.yield();
}
// when unbind interpreter
// wait until application is executed
while (!"Hello world 1".equals(app.getOutput())) {
Thread.yield();
}
// when restart interpreter
factory.restart(factory.getDefaultInterpreterSettingList().get(0));
while (app.getStatus() == ApplicationState.Status.LOADED) {
Thread.yield();