mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Update Application Status from RemoteInterpreterProcess event
This commit is contained in:
parent
f07ada1d94
commit
8186daf58e
12 changed files with 38 additions and 10 deletions
|
|
@ -278,4 +278,19 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
|
|||
RemoteInterpreterEventType.APP_STATUS_UPDATE,
|
||||
gson.toJson(appendOutput)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for eventQueue becomes empty
|
||||
*/
|
||||
public void waitForEventQueueBecomesEmpty() {
|
||||
synchronized (eventQueue) {
|
||||
while (!eventQueue.isEmpty()) {
|
||||
try {
|
||||
eventQueue.wait(100);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore exception
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -92,6 +92,7 @@ public class RemoteInterpreterServer
|
|||
|
||||
@Override
|
||||
public void shutdown() throws TException {
|
||||
eventClient.waitForEventQueueBecomesEmpty();
|
||||
if (interpreterGroup != null) {
|
||||
interpreterGroup.close();
|
||||
interpreterGroup.destroy();
|
||||
|
|
@ -249,6 +250,7 @@ public class RemoteInterpreterServer
|
|||
// see NoteInterpreterLoader.SHARED_SESSION
|
||||
if (appInfo.noteId.equals(noteId) || noteId.equals("shared_session")) {
|
||||
try {
|
||||
logger.info("Unload App {} ", appInfo.pkg.getName());
|
||||
appInfo.app.unload();
|
||||
// see ApplicationState.Status.UNLOADED
|
||||
eventClient.onAppStatusUpdate(appInfo.noteId, appInfo.paragraphId, appId, "UNLOADED");
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-5-7")
|
||||
public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-5-7")
|
||||
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-5-7")
|
||||
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
|
||||
|
||||
|
|
|
|||
|
|
@ -80,6 +80,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
|
|||
return OUTPUT_UPDATE;
|
||||
case 10:
|
||||
return ANGULAR_REGISTRY_PUSH;
|
||||
case 11:
|
||||
return APP_STATUS_UPDATE;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-5-7")
|
||||
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-5-7")
|
||||
public class RemoteInterpreterService {
|
||||
|
||||
public interface Iface {
|
||||
|
|
|
|||
|
|
@ -48,7 +48,8 @@ enum RemoteInterpreterEventType {
|
|||
RESOURCE_GET = 7
|
||||
OUTPUT_APPEND = 8,
|
||||
OUTPUT_UPDATE = 9,
|
||||
ANGULAR_REGISTRY_PUSH=10
|
||||
ANGULAR_REGISTRY_PUSH = 10,
|
||||
APP_STATUS_UPDATE = 11,
|
||||
}
|
||||
|
||||
struct RemoteInterpreterEvent {
|
||||
|
|
|
|||
|
|
@ -36,8 +36,6 @@ import java.util.concurrent.ExecutorService;
|
|||
|
||||
/**
|
||||
* HeliumApplicationFactory
|
||||
*
|
||||
* TODO(moon): unload apps on interpreter restart
|
||||
*/
|
||||
public class HeliumApplicationFactory implements ApplicationEventListener, NotebookEventListener {
|
||||
private final Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class);
|
||||
|
|
@ -376,6 +374,11 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
|
||||
@Override
|
||||
public void onStatusChange(String noteId, String paragraphId, String appId, String status) {
|
||||
ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId);
|
||||
if (appToUpdate != null) {
|
||||
appToUpdate.setStatus(ApplicationState.Status.valueOf(status));
|
||||
}
|
||||
|
||||
if (applicationEventListener != null) {
|
||||
applicationEventListener.onStatusChange(noteId, paragraphId, appId, status);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.zeppelin.notebook;
|
||||
|
||||
import org.apache.zeppelin.helium.HeliumPackage;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
|
||||
/**
|
||||
* Current state of application
|
||||
|
|
|
|||
|
|
@ -76,7 +76,6 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
|
|||
|
||||
|
||||
heliumAppFactory = new HeliumApplicationFactory();
|
||||
|
||||
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
|
||||
factory = new InterpreterFactory(conf,
|
||||
new InterpreterOption(true), null, null, heliumAppFactory, depResolver);
|
||||
|
|
@ -257,7 +256,12 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
|
|||
Thread.yield();
|
||||
}
|
||||
|
||||
// when unbind interpreter
|
||||
// wait until application is executed
|
||||
while (!"Hello world 1".equals(app.getOutput())) {
|
||||
Thread.yield();
|
||||
}
|
||||
|
||||
// when restart interpreter
|
||||
factory.restart(factory.getDefaultInterpreterSettingList().get(0));
|
||||
while (app.getStatus() == ApplicationState.Status.LOADED) {
|
||||
Thread.yield();
|
||||
|
|
|
|||
Loading…
Reference in a new issue