Pass required resource to run() method

This commit is contained in:
Lee moon soo 2016-04-14 05:24:43 +01:00
parent fade3c1f0f
commit 03be3a1205
7 changed files with 58 additions and 52 deletions

View file

@ -19,22 +19,19 @@ package org.apache.zeppelin.helium;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.resource.ResourceSet;
import java.io.IOException;
/**
* Zeppelin Application base
*/
public abstract class Application {
private final ResourceSet args;
private final ApplicationContext context;
public Application(ResourceSet args, ApplicationContext context) throws ApplicationException {
this.args = args;
public Application(ApplicationContext context) {
this.context = context;
}
public ResourceSet args() {
return args;
}
public ApplicationContext context() {
return context;
}
@ -43,7 +40,8 @@ public abstract class Application {
* This method can be invoked multiple times before unload(),
* Either just after application selected or when paragraph re-run after application load
*/
public abstract void run() throws ApplicationException;
public abstract void run(ResourceSet args)
throws ApplicationException, IOException;
/**

View file

@ -123,10 +123,9 @@ public class ApplicationLoader {
ClassLoader cl = appClass.getClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
Constructor<Application> constructor =
appClass.getConstructor(ResourceSet.class, ApplicationContext.class);
Constructor<Application> constructor = appClass.getConstructor(ApplicationContext.class);
Application app = new ClassLoaderApplication(constructor.newInstance(resources, context), cl);
Application app = new ClassLoaderApplication(constructor.newInstance(context), cl);
return app;
} catch (Exception e) {
throw new ApplicationException(e);
@ -135,7 +134,7 @@ public class ApplicationLoader {
}
}
private ResourceSet findRequiredResourceSet(
public ResourceSet findRequiredResourceSet(
String [][] requiredResources, String noteId, String paragraphId) {
if (requiredResources == null || requiredResources.length == 0) {
return new ResourceSet();

View file

@ -25,18 +25,18 @@ public class ClassLoaderApplication extends Application {
Application app;
ClassLoader cl;
public ClassLoaderApplication(Application app, ClassLoader cl) throws ApplicationException {
super(null, null);
super(app.context());
this.app = app;
this.cl = cl;
}
@Override
public void run() throws ApplicationException {
public void run(ResourceSet args) throws ApplicationException {
// instantiate
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
app.run();
app.run(args);
} catch (ApplicationException e) {
throw e;
} catch (Exception e) {
@ -69,14 +69,4 @@ public class ClassLoaderApplication extends Application {
public Application getInnerApplication() {
return app;
}
@Override
public ResourceSet args() {
return app.args();
}
@Override
public ApplicationContext context() {
return app.context();
}
}

View file

@ -74,8 +74,7 @@ public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
logger.info("Create instance " + className);
try {
Class<?> appClass = ClassLoader.getSystemClassLoader().loadClass(className);
Constructor<?> constructor = appClass.getConstructor(
ResourceSet.class, ApplicationContext.class);
Constructor<?> constructor = appClass.getConstructor(ApplicationContext.class);
// classPath will be ..../target/classes in dev mode most cases
String classPath = appClass.getProtectionDomain().getCodeSource().getLocation().getPath();
@ -83,7 +82,8 @@ public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
context.out.addResourceSearchPath(classPath + "../../src/main/resources/");
context.out.addResourceSearchPath(classPath + "../../src/test/resources/");
app = (Application) constructor.newInstance(resourceSet, getApplicationContext(context));
ApplicationContext appContext = getApplicationContext(context);
app = (Application) constructor.newInstance(appContext);
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new InterpreterResult(Code.ERROR, e.getMessage());
@ -94,8 +94,8 @@ public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
logger.info("Run " + className);
app.context().out.clear();
app.context().out.setType(InterpreterResult.Type.ANGULAR);
app.run();
} catch (ApplicationException e) {
app.run(resourceSet);
} catch (IOException | ApplicationException e) {
logger.error(e.getMessage(), e);
return new InterpreterResult(Code.ERROR, e.getMessage());
}

View file

@ -72,8 +72,8 @@ public class RemoteInterpreterServer
RemoteInterpreterEventClient eventClient = new RemoteInterpreterEventClient();
private DependencyResolver depLoader;
private final Map<String, Application> runningApplications =
Collections.synchronizedMap(new HashMap<String, Application>());
private final Map<String, RunningApplication> runningApplications =
Collections.synchronizedMap(new HashMap<String, RunningApplication>());
public RemoteInterpreterServer(int port) throws TTransportException {
this.port = port;
@ -776,7 +776,7 @@ public class RemoteInterpreterServer
noteId,
paragraphId);
app = appLoader.load(pkgInfo, context);
runningApplications.put(applicationInstanceId, app);
runningApplications.put(applicationInstanceId, new RunningApplication(pkgInfo, app));
return new RemoteApplicationResult(true, "");
} catch (Exception e) {
logger.error(e.getMessage(), e);
@ -787,11 +787,11 @@ public class RemoteInterpreterServer
@Override
public RemoteApplicationResult unloadApplication(String applicationInstanceId)
throws TException {
Application app = runningApplications.remove(applicationInstanceId);
if (app != null) {
RunningApplication runningApplication = runningApplications.remove(applicationInstanceId);
if (runningApplication != null) {
try {
logger.info("Unloading application {}", applicationInstanceId);
app.unload();
runningApplication.app.unload();
} catch (ApplicationException e) {
logger.error(e.getMessage(), e);
return new RemoteApplicationResult(false, e.getMessage());
@ -805,20 +805,27 @@ public class RemoteInterpreterServer
throws TException {
logger.info("run application {}", applicationInstanceId);
Application app = runningApplications.get(applicationInstanceId);
if (app == null) {
RunningApplication runningApp = runningApplications.get(applicationInstanceId);
if (runningApp == null) {
logger.error("Application instance {} not exists", applicationInstanceId);
return new RemoteApplicationResult(false, "Application instance does not exists");
} else {
ApplicationContext context = runningApp.app.context();
try {
app.context().out.clear();
app.context().out.setType(InterpreterResult.Type.ANGULAR);
app.run();
String output = new String(app.context().out.toByteArray());
logger.info("Update app output " + output);
context.out.clear();
context.out.setType(InterpreterResult.Type.ANGULAR);
ResourceSet resource = appLoader.findRequiredResourceSet(
runningApp.pkg.getResources(),
context.getNoteId(),
context.getParagraphId());
for (Resource res : resource) {
System.err.println("Resource " + res.get());
}
runningApp.app.run(resource);
String output = new String(context.out.toByteArray());
eventClient.onAppOutputUpdate(
app.context().getNoteId(),
app.context().getParagraphId(),
context.getNoteId(),
context.getParagraphId(),
applicationInstanceId,
output);
return new RemoteApplicationResult(true, "");
@ -826,5 +833,18 @@ public class RemoteInterpreterServer
return new RemoteApplicationResult(false, e.getMessage());
}
}
}
private static class RunningApplication {
public final Application app;
public final HeliumPackage pkg;
public RunningApplication(HeliumPackage pkg, Application app) {
this.app = app;
this.pkg = pkg;
}
};
}

View file

@ -26,14 +26,14 @@ public class MockApplication1 extends Application {
boolean unloaded;
int run;
public MockApplication1(ResourceSet args, ApplicationContext context) throws ApplicationException {
super(args, context);
public MockApplication1(ApplicationContext context) {
super(context);
unloaded = false;
run = 0;
}
@Override
public void run() {
public void run(ResourceSet args) {
run++;
}

View file

@ -23,13 +23,12 @@ import java.util.concurrent.atomic.AtomicInteger;
public class HeliumTestApplication extends Application {
AtomicInteger numRun = new AtomicInteger(0);
public HeliumTestApplication(ResourceSet args, ApplicationContext context)
throws ApplicationException {
super(args, context);
public HeliumTestApplication(ApplicationContext context) {
super(context);
}
@Override
public void run() throws ApplicationException {
public void run(ResourceSet args) throws ApplicationException {
try {
context().out.clear();
context().out.write("Hello world " + numRun.incrementAndGet());