mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
sync -> async api
This commit is contained in:
parent
9f5c493e41
commit
4eaeea7205
6 changed files with 314 additions and 177 deletions
|
|
@ -22,5 +22,4 @@ package org.apache.zeppelin.helium;
|
|||
public interface ApplicationEventListener {
|
||||
public void onOutputAppend(String noteId, String paragraphId, String appId, String output);
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output);
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -712,7 +712,6 @@ public class RemoteInterpreterServer
|
|||
return new InterpreterOutput(new InterpreterOutputListener() {
|
||||
@Override
|
||||
public void onAppend(InterpreterOutput out, byte[] line) {
|
||||
logger.info("Append output ----------------" + new String(line));
|
||||
eventClient.onAppOutputAppend(noteId, paragraphId, appId, new String(line));
|
||||
}
|
||||
|
||||
|
|
@ -725,7 +724,6 @@ public class RemoteInterpreterServer
|
|||
|
||||
private ApplicationContext getApplicationContext(
|
||||
HeliumPackage packageInfo, String noteId, String paragraphId, String applicationInstanceId) {
|
||||
System.err.println("get app context ************");
|
||||
InterpreterOutput out = createAppOutput(noteId, paragraphId, applicationInstanceId);
|
||||
return new ApplicationContext(noteId, paragraphId, out);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,8 +30,12 @@ import org.apache.zeppelin.notebook.Notebook;
|
|||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import sun.reflect.annotation.ExceptionProxy;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* HeliumApplicationFactory
|
||||
|
|
@ -42,123 +46,79 @@ import java.util.*;
|
|||
* 4. example app
|
||||
* 5. dev mode
|
||||
* 6. app launcher
|
||||
* 7. offline mode. front-end table data / pivot panel access
|
||||
*/
|
||||
public class HeliumApplicationFactory implements ApplicationEventListener {
|
||||
Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class);
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class);
|
||||
private final ExecutorService executor;
|
||||
private final Gson gson;
|
||||
private Notebook notebook;
|
||||
private ApplicationEventListener applicationEventListener;
|
||||
|
||||
public HeliumApplicationFactory() {
|
||||
public HeliumApplicationFactory(ExecutorService executor) {
|
||||
gson = new Gson();
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
|
||||
private static String generateApplicationId(HeliumPackage pkg, Paragraph paragraph) {
|
||||
return "app_" + paragraph.getNote().getId() + "-" + paragraph.getId() + pkg.getName();
|
||||
}
|
||||
|
||||
private boolean isRemote(InterpreterGroup group) {
|
||||
return group.getAngularObjectRegistry() instanceof RemoteAngularObjectRegistry;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Load pkg
|
||||
* Load pkg and run task
|
||||
*/
|
||||
public String load(HeliumPackage pkg, Paragraph paragraph) throws ApplicationException {
|
||||
Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName());
|
||||
InterpreterGroup intpGroup = intp.getInterpreterGroup();
|
||||
RemoteInterpreterProcess intpProcess = intpGroup.getRemoteInterpreterProcess();
|
||||
if (intpProcess == null) {
|
||||
throw new ApplicationException("Target interpreter process is not running");
|
||||
}
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
|
||||
String appId = generateApplicationId(pkg, paragraph);
|
||||
String pkgInfo = gson.toJson(pkg);
|
||||
|
||||
ApplicationState appState = null;
|
||||
synchronized (paragraph.apps) {
|
||||
for (ApplicationState as : paragraph.apps) {
|
||||
if (as.getName().equals(pkg.getName())) {
|
||||
appState = as;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (appState == null) {
|
||||
appState = new ApplicationState(appId, pkg.getName());
|
||||
paragraph.apps.add(appState);
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (appState) {
|
||||
if (appState.getStatus() == ApplicationState.ApplicationStatus.LOADED) {
|
||||
logger.info("Application {} already loaded on paragraph {}",
|
||||
pkg.getName(), paragraph.getId());
|
||||
return appId;
|
||||
}
|
||||
appState.setStatus(ApplicationState.ApplicationStatus.LOADING);
|
||||
try {
|
||||
RemoteApplicationResult ret = client.loadApplication(
|
||||
appId,
|
||||
pkgInfo,
|
||||
paragraph.getNote().getId(),
|
||||
paragraph.getId());
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
appState.setStatus(ApplicationState.ApplicationStatus.LOADED);
|
||||
} else {
|
||||
appState.setStatus(ApplicationState.ApplicationStatus.UNLOADED);
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
}
|
||||
return appId;
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
appState.setStatus(ApplicationState.ApplicationStatus.UNLOADED);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
}
|
||||
public String loadAndRun(HeliumPackage pkg, Paragraph paragraph) {
|
||||
ApplicationState appState = paragraph.createOrGetApplicationState(pkg);
|
||||
executor.submit(new LoadApplication(appState, pkg, paragraph));
|
||||
return appState.getId();
|
||||
}
|
||||
|
||||
/**
|
||||
* Unload pkg
|
||||
* Load application and run in the remote process
|
||||
*/
|
||||
public void unload(Paragraph paragraph, String appName) throws ApplicationException {
|
||||
private class LoadApplication implements Runnable {
|
||||
private final HeliumPackage pkg;
|
||||
private final Paragraph paragraph;
|
||||
private final ApplicationState appState;
|
||||
|
||||
ApplicationState appsToUnload = null;
|
||||
public LoadApplication(ApplicationState appState, HeliumPackage pkg, Paragraph paragraph) {
|
||||
this.appState = appState;
|
||||
this.pkg = pkg;
|
||||
this.paragraph = paragraph;
|
||||
}
|
||||
|
||||
synchronized (paragraph.apps) {
|
||||
for (ApplicationState as : paragraph.apps) {
|
||||
if (as.getName().equals(appName)) {
|
||||
appsToUnload = as;
|
||||
break;
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// get interpreter process
|
||||
Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName());
|
||||
InterpreterGroup intpGroup = intp.getInterpreterGroup();
|
||||
RemoteInterpreterProcess intpProcess = intpGroup.getRemoteInterpreterProcess();
|
||||
if (intpProcess == null) {
|
||||
throw new ApplicationException("Target interpreter process is not running");
|
||||
}
|
||||
|
||||
// load application
|
||||
load(intpProcess, appState);
|
||||
|
||||
// run application
|
||||
RunApplication runTask = new RunApplication(paragraph, appState.getId());
|
||||
runTask.run();
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
|
||||
if (appState != null) {
|
||||
appState.setStatus(ApplicationState.Status.ERROR);
|
||||
appState.setOutput(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (appsToUnload == null) {
|
||||
logger.warn("Can not find {} to unload from {}", appName, paragraph.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (appsToUnload) {
|
||||
if (appsToUnload.getStatus() != ApplicationState.ApplicationStatus.LOADED) {
|
||||
throw new ApplicationException(
|
||||
"Can't unload application status " + appsToUnload.getStatus());
|
||||
}
|
||||
appsToUnload.setStatus(ApplicationState.ApplicationStatus.UNLOADING);
|
||||
RemoteInterpreterProcess intpProcess =
|
||||
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
|
||||
private void load(RemoteInterpreterProcess intpProcess, ApplicationState appState)
|
||||
throws Exception {
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
|
|
@ -167,76 +127,206 @@ public class HeliumApplicationFactory implements ApplicationEventListener {
|
|||
throw new ApplicationException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
RemoteApplicationResult ret = client.unloadApplication(appsToUnload.getId());
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
appsToUnload.setStatus(ApplicationState.ApplicationStatus.UNLOADED);
|
||||
} else {
|
||||
appsToUnload.setStatus(ApplicationState.ApplicationStatus.LOADED);
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
synchronized (appState) {
|
||||
if (appState.getStatus() == ApplicationState.Status.LOADED) {
|
||||
// already loaded
|
||||
return;
|
||||
}
|
||||
appState.setStatus(ApplicationState.Status.LOADING);
|
||||
try {
|
||||
String pkgInfo = gson.toJson(pkg);
|
||||
String appId = appState.getId();
|
||||
|
||||
RemoteApplicationResult ret = client.loadApplication(
|
||||
appId,
|
||||
pkgInfo,
|
||||
paragraph.getNote().getId(),
|
||||
paragraph.getId());
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
appState.setStatus(ApplicationState.Status.LOADED);
|
||||
} else {
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
throw e;
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
appsToUnload.setStatus(ApplicationState.ApplicationStatus.LOADED);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* run pkg
|
||||
* Get ApplicationState
|
||||
* @param paragraph
|
||||
* @param appId
|
||||
* @return
|
||||
*/
|
||||
public void run(Paragraph paragraph, String appName) throws ApplicationException {
|
||||
ApplicationState app = null;
|
||||
synchronized (paragraph.apps) {
|
||||
for (ApplicationState as : paragraph.apps) {
|
||||
if (as.getName().equals(appName)) {
|
||||
app = as;
|
||||
break;
|
||||
}
|
||||
}
|
||||
public ApplicationState get(Paragraph paragraph, String appId) {
|
||||
return paragraph.getApplicationState(appId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unload application
|
||||
* It does not remove ApplicationState
|
||||
*
|
||||
* @param paragraph
|
||||
* @param appId
|
||||
*/
|
||||
public void unload(Paragraph paragraph, String appId) {
|
||||
executor.execute(new UnloadApplication(paragraph, appId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Unload application task
|
||||
*/
|
||||
private class UnloadApplication implements Runnable {
|
||||
private final Paragraph paragraph;
|
||||
private final String appId;
|
||||
|
||||
public UnloadApplication(Paragraph paragraph, String appId) {
|
||||
this.paragraph = paragraph;
|
||||
this.appId = appId;
|
||||
}
|
||||
|
||||
if (app == null) {
|
||||
logger.warn("Can not find app {} from {}", appName, paragraph.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (app) {
|
||||
if (app.getStatus() != ApplicationState.ApplicationStatus.LOADED) {
|
||||
throw new ApplicationException(
|
||||
"Can't run application status " + app.getStatus());
|
||||
}
|
||||
RemoteInterpreterProcess intpProcess =
|
||||
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
@Override
|
||||
public void run() {
|
||||
ApplicationState appState = null;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
appState = paragraph.getApplicationState(appId);
|
||||
|
||||
if (appState == null) {
|
||||
logger.warn("Can not find {} to unload from {}", appId, paragraph.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
unload(appState);
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
RemoteApplicationResult ret = client.runApplication(app.getId());
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
// success
|
||||
} else {
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
logger.error(e.getMessage(), e);
|
||||
if (appState != null) {
|
||||
appState.setStatus(ApplicationState.Status.ERROR);
|
||||
appState.setOutput(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void unload(ApplicationState appsToUnload) throws ApplicationException {
|
||||
synchronized (appsToUnload) {
|
||||
if (appsToUnload.getStatus() != ApplicationState.Status.LOADED) {
|
||||
throw new ApplicationException(
|
||||
"Can't unload application status " + appsToUnload.getStatus());
|
||||
}
|
||||
appsToUnload.setStatus(ApplicationState.Status.UNLOADING);
|
||||
RemoteInterpreterProcess intpProcess =
|
||||
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
RemoteApplicationResult ret = client.unloadApplication(appsToUnload.getId());
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
appsToUnload.setStatus(ApplicationState.Status.UNLOADED);
|
||||
} else {
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run application
|
||||
* It does not remove ApplicationState
|
||||
*
|
||||
* @param paragraph
|
||||
* @param appId
|
||||
*/
|
||||
public void run(Paragraph paragraph, String appId) {
|
||||
executor.execute(new RunApplication(paragraph, appId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Run application task
|
||||
*/
|
||||
private class RunApplication implements Runnable {
|
||||
private final Paragraph paragraph;
|
||||
private final String appId;
|
||||
|
||||
public RunApplication(Paragraph paragraph, String appId) {
|
||||
this.paragraph = paragraph;
|
||||
this.appId = appId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
ApplicationState appState = null;
|
||||
try {
|
||||
appState = paragraph.getApplicationState(appId);
|
||||
|
||||
if (appState == null) {
|
||||
logger.warn("Can not find {} to unload from {}", appId, paragraph.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
run(appState);
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
if (appState != null) {
|
||||
appState.setStatus(ApplicationState.Status.ERROR);
|
||||
appState.setOutput(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void run(ApplicationState app) throws ApplicationException {
|
||||
synchronized (app) {
|
||||
if (app.getStatus() != ApplicationState.Status.LOADED) {
|
||||
throw new ApplicationException(
|
||||
"Can't run application status " + app.getStatus());
|
||||
}
|
||||
RemoteInterpreterProcess intpProcess =
|
||||
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
RemoteApplicationResult ret = client.runApplication(app.getId());
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
// success
|
||||
} else {
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void unloadAllInTheInterpreterProcess(RemoteInterpreterProcess process) {
|
||||
// TODO
|
||||
}
|
||||
|
|
@ -296,15 +386,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener {
|
|||
return null;
|
||||
}
|
||||
|
||||
ApplicationState appFound = null;
|
||||
synchronized (paragraph.apps) {
|
||||
for (ApplicationState app : paragraph.apps) {
|
||||
if (app.getId().equals(appId)) {
|
||||
appFound = app;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
ApplicationState appFound = paragraph.getApplicationState(appId);
|
||||
|
||||
return appFound;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,38 +16,45 @@
|
|||
*/
|
||||
package org.apache.zeppelin.notebook;
|
||||
|
||||
import org.apache.zeppelin.helium.Application;
|
||||
import org.apache.zeppelin.helium.HeliumPackage;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
|
||||
/**
|
||||
* Running ApplicationState
|
||||
* Current state of application
|
||||
*/
|
||||
public class ApplicationState {
|
||||
|
||||
/**
|
||||
* Status of Application
|
||||
*/
|
||||
public static enum ApplicationStatus {
|
||||
public static enum Status {
|
||||
LOADING,
|
||||
LOADED,
|
||||
UNLOADING,
|
||||
UNLOADED
|
||||
UNLOADED,
|
||||
ERROR
|
||||
};
|
||||
|
||||
Status status = Status.UNLOADED;
|
||||
|
||||
String id; // unique id for this instance similar to note id or paragraph id
|
||||
String id; // unique id for this instance. Similar to note id or paragraph id
|
||||
String name; // name of app
|
||||
ApplicationStatus status;
|
||||
String output;
|
||||
|
||||
public ApplicationState(String id, String name) {
|
||||
this.id = id;
|
||||
this.name = name;
|
||||
status = ApplicationStatus.UNLOADED;
|
||||
}
|
||||
|
||||
/**
|
||||
* After ApplicationState is restored from NotebookRepo,
|
||||
* such as after Zeppelin daemon starts or Notebook import,
|
||||
* Application status need to be reset.
|
||||
*/
|
||||
public void resetStatus() {
|
||||
if (status != Status.ERROR) {
|
||||
status = Status.UNLOADED;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
String compareName;
|
||||
|
|
@ -71,11 +78,11 @@ public class ApplicationState {
|
|||
return id;
|
||||
}
|
||||
|
||||
public void setStatus(ApplicationStatus status) {
|
||||
public void setStatus(Status status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public ApplicationStatus getStatus() {
|
||||
public Status getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.zeppelin.notebook;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.helium.HeliumPackage;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.display.Input;
|
||||
|
|
@ -53,7 +54,11 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
Date dateUpdated;
|
||||
private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc
|
||||
public final GUI settings; // form and parameter settings
|
||||
public final List<ApplicationState> apps = new LinkedList<ApplicationState>();
|
||||
|
||||
/**
|
||||
* Applicaiton states in this paragraph
|
||||
*/
|
||||
private final List<ApplicationState> apps = new LinkedList<ApplicationState>();
|
||||
|
||||
@VisibleForTesting
|
||||
Paragraph() {
|
||||
|
|
@ -391,4 +396,43 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
Paragraph paraClone = (Paragraph) this.clone();
|
||||
return paraClone;
|
||||
}
|
||||
|
||||
|
||||
private String getApplicationId(HeliumPackage pkg) {
|
||||
return "app_" + getNote().getId() + "-" + getId() + pkg.getName();
|
||||
}
|
||||
|
||||
public ApplicationState createOrGetApplicationState(HeliumPackage pkg) {
|
||||
synchronized (apps) {
|
||||
for (ApplicationState as : apps) {
|
||||
if (as.getName().equals(pkg.getName())) {
|
||||
return as;
|
||||
}
|
||||
}
|
||||
|
||||
String appId = getApplicationId(pkg);
|
||||
ApplicationState appState = new ApplicationState(appId, pkg.getName());
|
||||
apps.add(appState);
|
||||
return appState;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public ApplicationState getApplicationState(String appId) {
|
||||
synchronized (apps) {
|
||||
for (ApplicationState as : apps) {
|
||||
if (as.getId().equals(appId)) {
|
||||
return as;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public List<ApplicationState> getAllApplicationStates() {
|
||||
synchronized (apps) {
|
||||
return new LinkedList<ApplicationState>(apps);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
|
|||
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
|
||||
import org.apache.zeppelin.notebook.*;
|
||||
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
|
||||
import org.apache.zeppelin.scheduler.ExecutorFactory;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.apache.zeppelin.search.SearchService;
|
||||
|
|
@ -36,6 +37,8 @@ import org.junit.Test;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
|
@ -70,7 +73,9 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
|
|||
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
|
||||
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
|
||||
|
||||
heliumAppFactory = new HeliumApplicationFactory();
|
||||
|
||||
ExecutorService executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100);
|
||||
heliumAppFactory = new HeliumApplicationFactory(executor);
|
||||
|
||||
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
|
||||
factory = new InterpreterFactory(conf,
|
||||
|
|
@ -124,15 +129,17 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
|
|||
assertEquals("repl1: job", p1.getResult().message());
|
||||
|
||||
// when
|
||||
String appId = heliumAppFactory.load(pkg1, p1);
|
||||
heliumAppFactory.run(p1, pkg1.getName());
|
||||
assertEquals(0, p1.getAllApplicationStates().size());
|
||||
String appId = heliumAppFactory.loadAndRun(pkg1, p1);
|
||||
assertEquals(1, p1.getAllApplicationStates().size());
|
||||
ApplicationState app = p1.getApplicationState(appId);
|
||||
Thread.sleep(1000); // wait for enough time
|
||||
|
||||
// then
|
||||
Thread.sleep(1000);
|
||||
assertEquals("Hello world", p1.apps.get(0).getOutput());
|
||||
assertEquals("Hello world", app.getOutput());
|
||||
|
||||
// clean
|
||||
heliumAppFactory.unload(p1, pkg1.getName());
|
||||
heliumAppFactory.unload(p1, appId);
|
||||
notebook.removeNote(note1.getId());
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue