ApplicationLoader

This commit is contained in:
Lee moon soo 2016-03-06 18:55:17 -08:00
parent 7424af247b
commit 568ee541db
9 changed files with 716 additions and 0 deletions

View file

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.helium;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.resource.ResourceSet;
/**
* Zeppelin Application base
*/
public abstract class Application {
private final ResourceSet args;
private final ApplicationContext context;
public Application(ResourceSet args, ApplicationContext context) throws ApplicationException {
this.args = args;
this.context = context;
}
public ResourceSet args() {
return args;
}
public ApplicationContext context() {
return context;
}
/**
* This method can be invoked multiple times before unload(),
* Either just after application selected or when paragraph re-run after application load
*/
public abstract void run() throws ApplicationException;
/**
* this method is invoked just before application is removed
*/
public abstract void unload() throws ApplicationException;
}

View file

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.helium;
/**
* ApplicationContext
*/
public class ApplicationContext {
private final String noteId;
private final String paragraphId;
public ApplicationContext(String noteId, String paragraphId) {
this.noteId = noteId;
this.paragraphId = paragraphId;
}
public String getNoteId() {
return noteId;
}
public String getParagraphId() {
return paragraphId;
}
}

View file

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.helium;
/**
* Application exception
*/
public class ApplicationException extends Exception {
public ApplicationException(String s) {
super(s);
}
public ApplicationException(Exception e) {
super(e);
}
public ApplicationException() {
}
}

View file

@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.helium;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.resource.DistributedResourcePool;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.ResourceSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.lang.reflect.Constructor;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.*;
/**
* Load application
*/
public class ApplicationLoader {
Logger logger = LoggerFactory.getLogger(ApplicationLoader.class);
private final DependencyResolver depResolver;
private final ResourcePool resourcePool;
private final Map<HeliumPackageInfo, Class<Application>> cached;
private final Map<RunningApplication, Application> runningApplications;
public ApplicationLoader(ResourcePool resourcePool, DependencyResolver depResolver) {
this.depResolver = depResolver;
this.resourcePool = resourcePool;
cached = Collections.synchronizedMap(
new HashMap<HeliumPackageInfo, Class<Application>>());
runningApplications = new HashMap<RunningApplication, Application>();
}
/**
* Information of loaded application
*/
private static class RunningApplication {
HeliumPackageInfo packageInfo;
String noteId;
String paragraphId;
public RunningApplication(HeliumPackageInfo packageInfo, String noteId, String paragraphId) {
this.packageInfo = packageInfo;
this.noteId = noteId;
this.paragraphId = paragraphId;
}
public HeliumPackageInfo getPackageInfo() {
return packageInfo;
}
public String getNoteId() {
return noteId;
}
public String getParagraphId() {
return paragraphId;
}
@Override
public int hashCode() {
return (paragraphId + noteId + packageInfo.getArtifact() + packageInfo.getClassName())
.hashCode();
}
@Override
public boolean equals(Object o) {
if (!(o instanceof RunningApplication)) {
return false;
}
RunningApplication r = (RunningApplication) o;
return packageInfo.equals(r.getPackageInfo()) && paragraphId.equals(r.getParagraphId()) &&
noteId.equals(r.getNoteId());
}
}
/**
*
* Instantiate application
*
* @param packageInfo
* @param context
* @return
* @throws Exception
*/
public Application load(HeliumPackageInfo packageInfo, ApplicationContext context)
throws Exception {
if (packageInfo.getType() != HeliumPackageInfo.Type.APPLICATION) {
throw new ApplicationException(
"Can't instantiate " + packageInfo.getType() + " package using ApplicationLoader");
}
// check if already loaded
RunningApplication key =
new RunningApplication(packageInfo, context.getNoteId(), context.getParagraphId());
synchronized (runningApplications) {
if (runningApplications.containsKey(key)) {
return runningApplications.get(key);
}
}
// get resource required by this package
ResourceSet resources = findRequiredResourceSet(packageInfo.getResources(),
context.getNoteId(), context.getParagraphId());
// load class
Class<Application> appClass = loadClass(packageInfo);
// instantiate
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
ClassLoader cl = appClass.getClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
Constructor<Application> constructor =
appClass.getConstructor(ResourceSet.class, ApplicationContext.class);
synchronized (runningApplications) {
if (!runningApplications.containsKey(key)) {
logger.info("Load {} {} from note {} paragraph {}",
packageInfo.getArtifact(),
packageInfo.getClassName(),
context.getNoteId(),
context.getParagraphId());
Application app = new ClassLoaderApplication(
constructor.newInstance(resources, context),
cl);
runningApplications.put(key, app);
return app;
} else {
return runningApplications.get(key);
}
}
} catch (Exception e) {
throw new ApplicationException(e);
} finally {
Thread.currentThread().setContextClassLoader(oldcl);
}
}
public void unload(HeliumPackageInfo packageInfo, ApplicationContext context)
throws ApplicationException {
Application appToUnload = null;
synchronized (runningApplications) {
RunningApplication key
= new RunningApplication(packageInfo, context.getNoteId(), context.getParagraphId());
if (runningApplications.containsKey(key)) {
appToUnload = runningApplications.remove(key);
}
}
if (appToUnload != null) {
logger.info("Unload {} {} from note {} paragraph {}",
packageInfo.getArtifact(),
packageInfo.getClassName(),
context.getNoteId(),
context.getParagraphId());
appToUnload.unload();
}
}
public Application get(HeliumPackageInfo packageInfo, ApplicationContext context) {
synchronized (runningApplications) {
RunningApplication key
= new RunningApplication(packageInfo, context.getNoteId(), context.getParagraphId());
return runningApplications.get(key);
}
}
private ResourceSet findRequiredResourceSet(
String [][] requiredResources, String noteId, String paragraphId)
throws ApplicationException {
if (requiredResources == null || requiredResources.length == 0) {
return new ResourceSet();
}
String localResourcePoolId = resourcePool.id();
ResourceSet args = new ResourceSet();
ResourceSet allResources;
if (resourcePool instanceof DistributedResourcePool) {
allResources = ((DistributedResourcePool) resourcePool).getAll(false);
} else {
allResources = resourcePool.getAll();
}
allResources = allResources.filterByNoteId(noteId).filterByParagraphId(paragraphId);
for (String [] requires : requiredResources) {
args.clear();
for (String require : requires) {
boolean found = false;
for (Resource r : allResources) {
if (r.getClassName().equals(require)) {
args.add(r);
found = true;
break;
}
}
if (found == false) {
break;
}
}
if (args.size() == requires.length) {
return args;
}
}
throw new ApplicationException("Can not find available resources");
}
private Class<Application> loadClass(HeliumPackageInfo packageInfo) throws Exception {
if (cached.containsKey(packageInfo)) {
return cached.get(packageInfo);
}
// Create Application classloader
List<URL> urlList = new LinkedList<URL>();
// load artifact
if (packageInfo.getArtifact() != null) {
List<File> paths = depResolver.load(packageInfo.getArtifact());
if (paths != null) {
for (File path : paths) {
urlList.add(path.toURI().toURL());
}
}
}
URLClassLoader applicationClassLoader =
new URLClassLoader(
urlList.toArray(new URL[]{}),
Thread.currentThread().getContextClassLoader());
Class<Application> cls =
(Class<Application>) applicationClassLoader.loadClass(packageInfo.getClassName());
cached.put(packageInfo, cls);
return cls;
}
}

View file

@ -0,0 +1,56 @@
package org.apache.zeppelin.helium;
import org.apache.zeppelin.resource.ResourceSet;
/**
* Application wrapper
*/
public class ClassLoaderApplication extends Application {
Application app;
ClassLoader cl;
public ClassLoaderApplication(Application app, ClassLoader cl) throws ApplicationException {
super(null, null);
this.app = app;
this.cl = cl;
}
@Override
public void run() throws ApplicationException {
// instantiate
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
app.run();
} catch (ApplicationException e) {
throw e;
} catch (Exception e) {
throw new ApplicationException(e);
} finally {
Thread.currentThread().setContextClassLoader(oldcl);
}
}
@Override
public void unload() throws ApplicationException {
// instantiate
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
app.unload();
} catch (ApplicationException e) {
throw e;
} catch (Exception e) {
throw new ApplicationException(e);
} finally {
Thread.currentThread().setContextClassLoader(oldcl);
}
}
public ClassLoader getClassLoader() {
return cl;
}
public Application getInnerApplication() {
return app;
}
}

View file

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.helium;
/**
* Helium package information
*/
public class HeliumPackageInfo {
private Type type;
private String name; // user friendly name of this application
private String description; // description
private String artifact; // artifact name e.g) groupId:artifactId:versionId
private String className; // entry point
private String [][] resources; // resource classnames that requires
// [[ .. and .. and .. ] or [ .. and .. and ..] ..]
public static enum Type {
INTERPRETER,
NOTEBOOK_REPO,
APPLICATION
}
public HeliumPackageInfo(Type type,
String name,
String description,
String artifact,
String className,
String[][] resources) {
this.type = type;
this.name = name;
this.description = description;
this.artifact = artifact;
this.className = className;
this.resources = resources;
}
@Override
public int hashCode() {
return (type.toString() + artifact + className).hashCode();
}
@Override
public boolean equals(Object o) {
if (!(o instanceof HeliumPackageInfo)) {
return false;
}
HeliumPackageInfo info = (HeliumPackageInfo) o;
return type == info.type && artifact.equals(info.artifact) && className.equals(info.className);
}
public Type getType() {
return type;
}
public String getName() {
return name;
}
public String getDescription() {
return description;
}
public String getArtifact() {
return artifact;
}
public String getClassName() {
return className;
}
public String[][] getResources() {
return resources;
}
}

View file

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.helium;
import java.net.URI;
/**
* Helium registry
*/
public abstract class HeliumRegistry {
private final URI uri;
public HeliumRegistry(URI uri) {
this.uri = uri;
}
public URI uri() {
return uri;
}
}

View file

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.helium;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import static org.junit.Assert.*;
public class ApplicationLoaderTest {
private File tmpDir;
@Before
public void setUp() {
tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
tmpDir.mkdirs();
}
@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(tmpDir);
}
@Test
public void loadUnloadApplication() throws Exception {
LocalResourcePool resourcePool = new LocalResourcePool("pool1");
DependencyResolver dep = new DependencyResolver(tmpDir.getAbsolutePath());
ApplicationLoader appLoader = new ApplicationLoader(resourcePool, dep);
HeliumPackageInfo pkg1 = createPackageInfo(MockApplication1.class.getName(), "artifact1");
ApplicationContext context1 = createContext("note1", "paragraph1");
// app not loaded yet
assertEquals(null, appLoader.get(pkg1, context1));
// load application
MockApplication1 app = (MockApplication1) ((ClassLoaderApplication)
appLoader.load(pkg1, context1)).getInnerApplication();
// then
assertFalse(app.isUnloaded());
assertEquals(0, app.getNumRun());
assertNotNull(appLoader.get(pkg1, context1));
// unload application
appLoader.unload(pkg1, context1);
// then
assertTrue(app.isUnloaded());
assertEquals(0, app.getNumRun());
}
public HeliumPackageInfo createPackageInfo(String className, String artifact) {
HeliumPackageInfo app1 = new HeliumPackageInfo(
HeliumPackageInfo.Type.APPLICATION,
"name1",
"desc1",
artifact,
className,
new String[][]{{}});
return app1;
}
public ApplicationContext createContext(String noteId, String paragraphId) {
ApplicationContext context1 = new ApplicationContext(
noteId,
paragraphId);
return context1;
}
}

View file

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.helium;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.resource.ResourceSet;
/**
* Mock application
*/
public class MockApplication1 extends Application {
boolean unloaded;
int run;
public MockApplication1(ResourceSet args, ApplicationContext context) throws ApplicationException {
super(args, context);
unloaded = false;
run = 0;
}
@Override
public void run() {
run++;
}
@Override
public void unload() {
unloaded = true;
}
public boolean isUnloaded() {
return unloaded;
}
public int getNumRun() {
return run;
}
}