sync -> async api

This commit is contained in:
Lee moon soo 2016-04-03 05:18:54 +09:00
parent 9f5c493e41
commit 4eaeea7205
6 changed files with 314 additions and 177 deletions

View file

@ -22,5 +22,4 @@ package org.apache.zeppelin.helium;
public interface ApplicationEventListener {
public void onOutputAppend(String noteId, String paragraphId, String appId, String output);
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output);
}

View file

@ -712,7 +712,6 @@ public class RemoteInterpreterServer
return new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
logger.info("Append output ----------------" + new String(line));
eventClient.onAppOutputAppend(noteId, paragraphId, appId, new String(line));
}
@ -725,7 +724,6 @@ public class RemoteInterpreterServer
private ApplicationContext getApplicationContext(
HeliumPackage packageInfo, String noteId, String paragraphId, String applicationInstanceId) {
System.err.println("get app context ************");
InterpreterOutput out = createAppOutput(noteId, paragraphId, applicationInstanceId);
return new ApplicationContext(noteId, paragraphId, out);
}

View file

@ -30,8 +30,12 @@ import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.annotation.ExceptionProxy;
import java.util.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
* HeliumApplicationFactory
@ -42,123 +46,79 @@ import java.util.*;
* 4. example app
* 5. dev mode
* 6. app launcher
* 7. offline mode. front-end table data / pivot panel access
*/
public class HeliumApplicationFactory implements ApplicationEventListener {
Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class);
private final Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class);
private final ExecutorService executor;
private final Gson gson;
private Notebook notebook;
private ApplicationEventListener applicationEventListener;
public HeliumApplicationFactory() {
public HeliumApplicationFactory(ExecutorService executor) {
gson = new Gson();
this.executor = executor;
}
private static String generateApplicationId(HeliumPackage pkg, Paragraph paragraph) {
return "app_" + paragraph.getNote().getId() + "-" + paragraph.getId() + pkg.getName();
}
private boolean isRemote(InterpreterGroup group) {
return group.getAngularObjectRegistry() instanceof RemoteAngularObjectRegistry;
}
/**
* Load pkg
* Load pkg and run task
*/
public String load(HeliumPackage pkg, Paragraph paragraph) throws ApplicationException {
Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName());
InterpreterGroup intpGroup = intp.getInterpreterGroup();
RemoteInterpreterProcess intpProcess = intpGroup.getRemoteInterpreterProcess();
if (intpProcess == null) {
throw new ApplicationException("Target interpreter process is not running");
}
RemoteInterpreterService.Client client;
try {
client = intpProcess.getClient();
} catch (Exception e) {
throw new ApplicationException(e);
}
String appId = generateApplicationId(pkg, paragraph);
String pkgInfo = gson.toJson(pkg);
ApplicationState appState = null;
synchronized (paragraph.apps) {
for (ApplicationState as : paragraph.apps) {
if (as.getName().equals(pkg.getName())) {
appState = as;
break;
}
}
if (appState == null) {
appState = new ApplicationState(appId, pkg.getName());
paragraph.apps.add(appState);
}
}
synchronized (appState) {
if (appState.getStatus() == ApplicationState.ApplicationStatus.LOADED) {
logger.info("Application {} already loaded on paragraph {}",
pkg.getName(), paragraph.getId());
return appId;
}
appState.setStatus(ApplicationState.ApplicationStatus.LOADING);
try {
RemoteApplicationResult ret = client.loadApplication(
appId,
pkgInfo,
paragraph.getNote().getId(),
paragraph.getId());
if (ret.isSuccess()) {
appState.setStatus(ApplicationState.ApplicationStatus.LOADED);
} else {
appState.setStatus(ApplicationState.ApplicationStatus.UNLOADED);
throw new ApplicationException(ret.getMsg());
}
return appId;
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
appState.setStatus(ApplicationState.ApplicationStatus.UNLOADED);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
}
public String loadAndRun(HeliumPackage pkg, Paragraph paragraph) {
ApplicationState appState = paragraph.createOrGetApplicationState(pkg);
executor.submit(new LoadApplication(appState, pkg, paragraph));
return appState.getId();
}
/**
* Unload pkg
* Load application and run in the remote process
*/
public void unload(Paragraph paragraph, String appName) throws ApplicationException {
private class LoadApplication implements Runnable {
private final HeliumPackage pkg;
private final Paragraph paragraph;
private final ApplicationState appState;
ApplicationState appsToUnload = null;
public LoadApplication(ApplicationState appState, HeliumPackage pkg, Paragraph paragraph) {
this.appState = appState;
this.pkg = pkg;
this.paragraph = paragraph;
}
synchronized (paragraph.apps) {
for (ApplicationState as : paragraph.apps) {
if (as.getName().equals(appName)) {
appsToUnload = as;
break;
@Override
public void run() {
try {
// get interpreter process
Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName());
InterpreterGroup intpGroup = intp.getInterpreterGroup();
RemoteInterpreterProcess intpProcess = intpGroup.getRemoteInterpreterProcess();
if (intpProcess == null) {
throw new ApplicationException("Target interpreter process is not running");
}
// load application
load(intpProcess, appState);
// run application
RunApplication runTask = new RunApplication(paragraph, appState.getId());
runTask.run();
} catch (Exception e) {
logger.error(e.getMessage(), e);
if (appState != null) {
appState.setStatus(ApplicationState.Status.ERROR);
appState.setOutput(e.getMessage());
}
}
}
if (appsToUnload == null) {
logger.warn("Can not find {} to unload from {}", appName, paragraph.getId());
return;
}
synchronized (appsToUnload) {
if (appsToUnload.getStatus() != ApplicationState.ApplicationStatus.LOADED) {
throw new ApplicationException(
"Can't unload application status " + appsToUnload.getStatus());
}
appsToUnload.setStatus(ApplicationState.ApplicationStatus.UNLOADING);
RemoteInterpreterProcess intpProcess =
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
private void load(RemoteInterpreterProcess intpProcess, ApplicationState appState)
throws Exception {
RemoteInterpreterService.Client client;
try {
@ -167,76 +127,206 @@ public class HeliumApplicationFactory implements ApplicationEventListener {
throw new ApplicationException(e);
}
try {
RemoteApplicationResult ret = client.unloadApplication(appsToUnload.getId());
if (ret.isSuccess()) {
appsToUnload.setStatus(ApplicationState.ApplicationStatus.UNLOADED);
} else {
appsToUnload.setStatus(ApplicationState.ApplicationStatus.LOADED);
throw new ApplicationException(ret.getMsg());
synchronized (appState) {
if (appState.getStatus() == ApplicationState.Status.LOADED) {
// already loaded
return;
}
appState.setStatus(ApplicationState.Status.LOADING);
try {
String pkgInfo = gson.toJson(pkg);
String appId = appState.getId();
RemoteApplicationResult ret = client.loadApplication(
appId,
pkgInfo,
paragraph.getNote().getId(),
paragraph.getId());
if (ret.isSuccess()) {
appState.setStatus(ApplicationState.Status.LOADED);
} else {
throw new ApplicationException(ret.getMsg());
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
throw e;
} finally {
intpProcess.releaseClient(client);
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
appsToUnload.setStatus(ApplicationState.ApplicationStatus.LOADED);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
}
}
/**
* run pkg
* Get ApplicationState
* @param paragraph
* @param appId
* @return
*/
public void run(Paragraph paragraph, String appName) throws ApplicationException {
ApplicationState app = null;
synchronized (paragraph.apps) {
for (ApplicationState as : paragraph.apps) {
if (as.getName().equals(appName)) {
app = as;
break;
}
}
public ApplicationState get(Paragraph paragraph, String appId) {
return paragraph.getApplicationState(appId);
}
/**
* Unload application
* It does not remove ApplicationState
*
* @param paragraph
* @param appId
*/
public void unload(Paragraph paragraph, String appId) {
executor.execute(new UnloadApplication(paragraph, appId));
}
/**
* Unload application task
*/
private class UnloadApplication implements Runnable {
private final Paragraph paragraph;
private final String appId;
public UnloadApplication(Paragraph paragraph, String appId) {
this.paragraph = paragraph;
this.appId = appId;
}
if (app == null) {
logger.warn("Can not find app {} from {}", appName, paragraph.getId());
return;
}
synchronized (app) {
if (app.getStatus() != ApplicationState.ApplicationStatus.LOADED) {
throw new ApplicationException(
"Can't run application status " + app.getStatus());
}
RemoteInterpreterProcess intpProcess =
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
RemoteInterpreterService.Client client;
@Override
public void run() {
ApplicationState appState = null;
try {
client = intpProcess.getClient();
appState = paragraph.getApplicationState(appId);
if (appState == null) {
logger.warn("Can not find {} to unload from {}", appId, paragraph.getId());
return;
}
unload(appState);
} catch (Exception e) {
throw new ApplicationException(e);
}
try {
RemoteApplicationResult ret = client.runApplication(app.getId());
if (ret.isSuccess()) {
// success
} else {
throw new ApplicationException(ret.getMsg());
logger.error(e.getMessage(), e);
if (appState != null) {
appState.setStatus(ApplicationState.Status.ERROR);
appState.setOutput(e.getMessage());
}
}
}
private void unload(ApplicationState appsToUnload) throws ApplicationException {
synchronized (appsToUnload) {
if (appsToUnload.getStatus() != ApplicationState.Status.LOADED) {
throw new ApplicationException(
"Can't unload application status " + appsToUnload.getStatus());
}
appsToUnload.setStatus(ApplicationState.Status.UNLOADING);
RemoteInterpreterProcess intpProcess =
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
RemoteInterpreterService.Client client;
try {
client = intpProcess.getClient();
} catch (Exception e) {
throw new ApplicationException(e);
}
try {
RemoteApplicationResult ret = client.unloadApplication(appsToUnload.getId());
if (ret.isSuccess()) {
appsToUnload.setStatus(ApplicationState.Status.UNLOADED);
} else {
throw new ApplicationException(ret.getMsg());
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
}
}
/**
* Run application
* It does not remove ApplicationState
*
* @param paragraph
* @param appId
*/
public void run(Paragraph paragraph, String appId) {
executor.execute(new RunApplication(paragraph, appId));
}
/**
* Run application task
*/
private class RunApplication implements Runnable {
private final Paragraph paragraph;
private final String appId;
public RunApplication(Paragraph paragraph, String appId) {
this.paragraph = paragraph;
this.appId = appId;
}
@Override
public void run() {
ApplicationState appState = null;
try {
appState = paragraph.getApplicationState(appId);
if (appState == null) {
logger.warn("Can not find {} to unload from {}", appId, paragraph.getId());
return;
}
run(appState);
} catch (Exception e) {
logger.error(e.getMessage(), e);
if (appState != null) {
appState.setStatus(ApplicationState.Status.ERROR);
appState.setOutput(e.getMessage());
}
}
}
private void run(ApplicationState app) throws ApplicationException {
synchronized (app) {
if (app.getStatus() != ApplicationState.Status.LOADED) {
throw new ApplicationException(
"Can't run application status " + app.getStatus());
}
RemoteInterpreterProcess intpProcess =
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
RemoteInterpreterService.Client client;
try {
client = intpProcess.getClient();
} catch (Exception e) {
throw new ApplicationException(e);
}
try {
RemoteApplicationResult ret = client.runApplication(app.getId());
if (ret.isSuccess()) {
// success
} else {
throw new ApplicationException(ret.getMsg());
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
}
}
}
public void unloadAllInTheInterpreterProcess(RemoteInterpreterProcess process) {
// TODO
}
@ -296,15 +386,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener {
return null;
}
ApplicationState appFound = null;
synchronized (paragraph.apps) {
for (ApplicationState app : paragraph.apps) {
if (app.getId().equals(appId)) {
appFound = app;
break;
}
}
}
ApplicationState appFound = paragraph.getApplicationState(appId);
return appFound;
}

View file

@ -16,38 +16,45 @@
*/
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.helium.Application;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
/**
* Running ApplicationState
* Current state of application
*/
public class ApplicationState {
/**
* Status of Application
*/
public static enum ApplicationStatus {
public static enum Status {
LOADING,
LOADED,
UNLOADING,
UNLOADED
UNLOADED,
ERROR
};
Status status = Status.UNLOADED;
String id; // unique id for this instance similar to note id or paragraph id
String id; // unique id for this instance. Similar to note id or paragraph id
String name; // name of app
ApplicationStatus status;
String output;
public ApplicationState(String id, String name) {
this.id = id;
this.name = name;
status = ApplicationStatus.UNLOADED;
}
/**
* After ApplicationState is restored from NotebookRepo,
* such as after Zeppelin daemon starts or Notebook import,
* Application status need to be reset.
*/
public void resetStatus() {
if (status != Status.ERROR) {
status = Status.UNLOADED;
}
}
@Override
public boolean equals(Object o) {
String compareName;
@ -71,11 +78,11 @@ public class ApplicationState {
return id;
}
public void setStatus(ApplicationStatus status) {
public void setStatus(Status status) {
this.status = status;
}
public ApplicationStatus getStatus() {
public Status getStatus() {
return status;
}

View file

@ -18,6 +18,7 @@
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input;
@ -53,7 +54,11 @@ public class Paragraph extends Job implements Serializable, Cloneable {
Date dateUpdated;
private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc
public final GUI settings; // form and parameter settings
public final List<ApplicationState> apps = new LinkedList<ApplicationState>();
/**
* Applicaiton states in this paragraph
*/
private final List<ApplicationState> apps = new LinkedList<ApplicationState>();
@VisibleForTesting
Paragraph() {
@ -391,4 +396,43 @@ public class Paragraph extends Job implements Serializable, Cloneable {
Paragraph paraClone = (Paragraph) this.clone();
return paraClone;
}
private String getApplicationId(HeliumPackage pkg) {
return "app_" + getNote().getId() + "-" + getId() + pkg.getName();
}
public ApplicationState createOrGetApplicationState(HeliumPackage pkg) {
synchronized (apps) {
for (ApplicationState as : apps) {
if (as.getName().equals(pkg.getName())) {
return as;
}
}
String appId = getApplicationId(pkg);
ApplicationState appState = new ApplicationState(appId, pkg.getName());
apps.add(appState);
return appState;
}
}
public ApplicationState getApplicationState(String appId) {
synchronized (apps) {
for (ApplicationState as : apps) {
if (as.getId().equals(appId)) {
return as;
}
}
}
return null;
}
public List<ApplicationState> getAllApplicationStates() {
synchronized (apps) {
return new LinkedList<ApplicationState>(apps);
}
}
}

View file

@ -26,6 +26,7 @@ import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
import org.apache.zeppelin.scheduler.ExecutorFactory;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
@ -36,6 +37,8 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
@ -70,7 +73,9 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
heliumAppFactory = new HeliumApplicationFactory();
ExecutorService executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100);
heliumAppFactory = new HeliumApplicationFactory(executor);
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
factory = new InterpreterFactory(conf,
@ -124,15 +129,17 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
assertEquals("repl1: job", p1.getResult().message());
// when
String appId = heliumAppFactory.load(pkg1, p1);
heliumAppFactory.run(p1, pkg1.getName());
assertEquals(0, p1.getAllApplicationStates().size());
String appId = heliumAppFactory.loadAndRun(pkg1, p1);
assertEquals(1, p1.getAllApplicationStates().size());
ApplicationState app = p1.getApplicationState(appId);
Thread.sleep(1000); // wait for enough time
// then
Thread.sleep(1000);
assertEquals("Hello world", p1.apps.get(0).getOutput());
assertEquals("Hello world", app.getOutput());
// clean
heliumAppFactory.unload(p1, pkg1.getName());
heliumAppFactory.unload(p1, appId);
notebook.removeNote(note1.getId());
}