mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Pass required resource to run() method
This commit is contained in:
parent
fade3c1f0f
commit
03be3a1205
7 changed files with 58 additions and 52 deletions
|
|
@ -19,22 +19,19 @@ package org.apache.zeppelin.helium;
|
|||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.resource.ResourceSet;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Zeppelin Application base
|
||||
*/
|
||||
public abstract class Application {
|
||||
private final ResourceSet args;
|
||||
|
||||
private final ApplicationContext context;
|
||||
|
||||
public Application(ResourceSet args, ApplicationContext context) throws ApplicationException {
|
||||
this.args = args;
|
||||
public Application(ApplicationContext context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
public ResourceSet args() {
|
||||
return args;
|
||||
}
|
||||
|
||||
public ApplicationContext context() {
|
||||
return context;
|
||||
}
|
||||
|
|
@ -43,7 +40,8 @@ public abstract class Application {
|
|||
* This method can be invoked multiple times before unload(),
|
||||
* Either just after application selected or when paragraph re-run after application load
|
||||
*/
|
||||
public abstract void run() throws ApplicationException;
|
||||
public abstract void run(ResourceSet args)
|
||||
throws ApplicationException, IOException;
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -123,10 +123,9 @@ public class ApplicationLoader {
|
|||
ClassLoader cl = appClass.getClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(cl);
|
||||
try {
|
||||
Constructor<Application> constructor =
|
||||
appClass.getConstructor(ResourceSet.class, ApplicationContext.class);
|
||||
Constructor<Application> constructor = appClass.getConstructor(ApplicationContext.class);
|
||||
|
||||
Application app = new ClassLoaderApplication(constructor.newInstance(resources, context), cl);
|
||||
Application app = new ClassLoaderApplication(constructor.newInstance(context), cl);
|
||||
return app;
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
|
|
@ -135,7 +134,7 @@ public class ApplicationLoader {
|
|||
}
|
||||
}
|
||||
|
||||
private ResourceSet findRequiredResourceSet(
|
||||
public ResourceSet findRequiredResourceSet(
|
||||
String [][] requiredResources, String noteId, String paragraphId) {
|
||||
if (requiredResources == null || requiredResources.length == 0) {
|
||||
return new ResourceSet();
|
||||
|
|
|
|||
|
|
@ -25,18 +25,18 @@ public class ClassLoaderApplication extends Application {
|
|||
Application app;
|
||||
ClassLoader cl;
|
||||
public ClassLoaderApplication(Application app, ClassLoader cl) throws ApplicationException {
|
||||
super(null, null);
|
||||
super(app.context());
|
||||
this.app = app;
|
||||
this.cl = cl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws ApplicationException {
|
||||
public void run(ResourceSet args) throws ApplicationException {
|
||||
// instantiate
|
||||
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(cl);
|
||||
try {
|
||||
app.run();
|
||||
app.run(args);
|
||||
} catch (ApplicationException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
|
|
@ -69,14 +69,4 @@ public class ClassLoaderApplication extends Application {
|
|||
public Application getInnerApplication() {
|
||||
return app;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceSet args() {
|
||||
return app.args();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationContext context() {
|
||||
return app.context();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,8 +74,7 @@ public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
|
|||
logger.info("Create instance " + className);
|
||||
try {
|
||||
Class<?> appClass = ClassLoader.getSystemClassLoader().loadClass(className);
|
||||
Constructor<?> constructor = appClass.getConstructor(
|
||||
ResourceSet.class, ApplicationContext.class);
|
||||
Constructor<?> constructor = appClass.getConstructor(ApplicationContext.class);
|
||||
|
||||
// classPath will be ..../target/classes in dev mode most cases
|
||||
String classPath = appClass.getProtectionDomain().getCodeSource().getLocation().getPath();
|
||||
|
|
@ -83,7 +82,8 @@ public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
|
|||
context.out.addResourceSearchPath(classPath + "../../src/main/resources/");
|
||||
context.out.addResourceSearchPath(classPath + "../../src/test/resources/");
|
||||
|
||||
app = (Application) constructor.newInstance(resourceSet, getApplicationContext(context));
|
||||
ApplicationContext appContext = getApplicationContext(context);
|
||||
app = (Application) constructor.newInstance(appContext);
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
|
|
@ -94,8 +94,8 @@ public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
|
|||
logger.info("Run " + className);
|
||||
app.context().out.clear();
|
||||
app.context().out.setType(InterpreterResult.Type.ANGULAR);
|
||||
app.run();
|
||||
} catch (ApplicationException e) {
|
||||
app.run(resourceSet);
|
||||
} catch (IOException | ApplicationException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -72,8 +72,8 @@ public class RemoteInterpreterServer
|
|||
RemoteInterpreterEventClient eventClient = new RemoteInterpreterEventClient();
|
||||
private DependencyResolver depLoader;
|
||||
|
||||
private final Map<String, Application> runningApplications =
|
||||
Collections.synchronizedMap(new HashMap<String, Application>());
|
||||
private final Map<String, RunningApplication> runningApplications =
|
||||
Collections.synchronizedMap(new HashMap<String, RunningApplication>());
|
||||
|
||||
public RemoteInterpreterServer(int port) throws TTransportException {
|
||||
this.port = port;
|
||||
|
|
@ -776,7 +776,7 @@ public class RemoteInterpreterServer
|
|||
noteId,
|
||||
paragraphId);
|
||||
app = appLoader.load(pkgInfo, context);
|
||||
runningApplications.put(applicationInstanceId, app);
|
||||
runningApplications.put(applicationInstanceId, new RunningApplication(pkgInfo, app));
|
||||
return new RemoteApplicationResult(true, "");
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
|
|
@ -787,11 +787,11 @@ public class RemoteInterpreterServer
|
|||
@Override
|
||||
public RemoteApplicationResult unloadApplication(String applicationInstanceId)
|
||||
throws TException {
|
||||
Application app = runningApplications.remove(applicationInstanceId);
|
||||
if (app != null) {
|
||||
RunningApplication runningApplication = runningApplications.remove(applicationInstanceId);
|
||||
if (runningApplication != null) {
|
||||
try {
|
||||
logger.info("Unloading application {}", applicationInstanceId);
|
||||
app.unload();
|
||||
runningApplication.app.unload();
|
||||
} catch (ApplicationException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return new RemoteApplicationResult(false, e.getMessage());
|
||||
|
|
@ -805,20 +805,27 @@ public class RemoteInterpreterServer
|
|||
throws TException {
|
||||
logger.info("run application {}", applicationInstanceId);
|
||||
|
||||
Application app = runningApplications.get(applicationInstanceId);
|
||||
if (app == null) {
|
||||
RunningApplication runningApp = runningApplications.get(applicationInstanceId);
|
||||
if (runningApp == null) {
|
||||
logger.error("Application instance {} not exists", applicationInstanceId);
|
||||
return new RemoteApplicationResult(false, "Application instance does not exists");
|
||||
} else {
|
||||
ApplicationContext context = runningApp.app.context();
|
||||
try {
|
||||
app.context().out.clear();
|
||||
app.context().out.setType(InterpreterResult.Type.ANGULAR);
|
||||
app.run();
|
||||
String output = new String(app.context().out.toByteArray());
|
||||
logger.info("Update app output " + output);
|
||||
context.out.clear();
|
||||
context.out.setType(InterpreterResult.Type.ANGULAR);
|
||||
ResourceSet resource = appLoader.findRequiredResourceSet(
|
||||
runningApp.pkg.getResources(),
|
||||
context.getNoteId(),
|
||||
context.getParagraphId());
|
||||
for (Resource res : resource) {
|
||||
System.err.println("Resource " + res.get());
|
||||
}
|
||||
runningApp.app.run(resource);
|
||||
String output = new String(context.out.toByteArray());
|
||||
eventClient.onAppOutputUpdate(
|
||||
app.context().getNoteId(),
|
||||
app.context().getParagraphId(),
|
||||
context.getNoteId(),
|
||||
context.getParagraphId(),
|
||||
applicationInstanceId,
|
||||
output);
|
||||
return new RemoteApplicationResult(true, "");
|
||||
|
|
@ -826,5 +833,18 @@ public class RemoteInterpreterServer
|
|||
return new RemoteApplicationResult(false, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
private static class RunningApplication {
|
||||
public final Application app;
|
||||
public final HeliumPackage pkg;
|
||||
|
||||
public RunningApplication(HeliumPackage pkg, Application app) {
|
||||
this.app = app;
|
||||
this.pkg = pkg;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,14 +26,14 @@ public class MockApplication1 extends Application {
|
|||
boolean unloaded;
|
||||
int run;
|
||||
|
||||
public MockApplication1(ResourceSet args, ApplicationContext context) throws ApplicationException {
|
||||
super(args, context);
|
||||
public MockApplication1(ApplicationContext context) {
|
||||
super(context);
|
||||
unloaded = false;
|
||||
run = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
public void run(ResourceSet args) {
|
||||
run++;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,13 +23,12 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
public class HeliumTestApplication extends Application {
|
||||
AtomicInteger numRun = new AtomicInteger(0);
|
||||
public HeliumTestApplication(ResourceSet args, ApplicationContext context)
|
||||
throws ApplicationException {
|
||||
super(args, context);
|
||||
public HeliumTestApplication(ApplicationContext context) {
|
||||
super(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws ApplicationException {
|
||||
public void run(ResourceSet args) throws ApplicationException {
|
||||
try {
|
||||
context().out.clear();
|
||||
context().out.write("Hello world " + numRun.incrementAndGet());
|
||||
|
|
|
|||
Loading…
Reference in a new issue