mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
HeliumRegistry
This commit is contained in:
parent
568ee541db
commit
b891b98ee5
19 changed files with 4106 additions and 1079 deletions
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.zeppelin.helium;
|
||||
|
||||
import org.apache.zeppelin.dep.DependencyResolver;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.resource.DistributedResourcePool;
|
||||
import org.apache.zeppelin.resource.Resource;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
|
|
@ -39,32 +38,30 @@ public class ApplicationLoader {
|
|||
|
||||
private final DependencyResolver depResolver;
|
||||
private final ResourcePool resourcePool;
|
||||
private final Map<HeliumPackageInfo, Class<Application>> cached;
|
||||
private final Map<RunningApplication, Application> runningApplications;
|
||||
private final Map<HeliumPackage, Class<Application>> cached;
|
||||
|
||||
public ApplicationLoader(ResourcePool resourcePool, DependencyResolver depResolver) {
|
||||
this.depResolver = depResolver;
|
||||
this.resourcePool = resourcePool;
|
||||
cached = Collections.synchronizedMap(
|
||||
new HashMap<HeliumPackageInfo, Class<Application>>());
|
||||
runningApplications = new HashMap<RunningApplication, Application>();
|
||||
new HashMap<HeliumPackage, Class<Application>>());
|
||||
}
|
||||
|
||||
/**
|
||||
* Information of loaded application
|
||||
*/
|
||||
private static class RunningApplication {
|
||||
HeliumPackageInfo packageInfo;
|
||||
HeliumPackage packageInfo;
|
||||
String noteId;
|
||||
String paragraphId;
|
||||
|
||||
public RunningApplication(HeliumPackageInfo packageInfo, String noteId, String paragraphId) {
|
||||
public RunningApplication(HeliumPackage packageInfo, String noteId, String paragraphId) {
|
||||
this.packageInfo = packageInfo;
|
||||
this.noteId = noteId;
|
||||
this.paragraphId = paragraphId;
|
||||
}
|
||||
|
||||
public HeliumPackageInfo getPackageInfo() {
|
||||
public HeliumPackage getPackageInfo() {
|
||||
return packageInfo;
|
||||
}
|
||||
|
||||
|
|
@ -103,9 +100,9 @@ public class ApplicationLoader {
|
|||
* @return
|
||||
* @throws Exception
|
||||
*/
|
||||
public Application load(HeliumPackageInfo packageInfo, ApplicationContext context)
|
||||
public Application load(HeliumPackage packageInfo, ApplicationContext context)
|
||||
throws Exception {
|
||||
if (packageInfo.getType() != HeliumPackageInfo.Type.APPLICATION) {
|
||||
if (packageInfo.getType() != HeliumPackage.Type.APPLICATION) {
|
||||
throw new ApplicationException(
|
||||
"Can't instantiate " + packageInfo.getType() + " package using ApplicationLoader");
|
||||
}
|
||||
|
|
@ -113,11 +110,6 @@ public class ApplicationLoader {
|
|||
// check if already loaded
|
||||
RunningApplication key =
|
||||
new RunningApplication(packageInfo, context.getNoteId(), context.getParagraphId());
|
||||
synchronized (runningApplications) {
|
||||
if (runningApplications.containsKey(key)) {
|
||||
return runningApplications.get(key);
|
||||
}
|
||||
}
|
||||
|
||||
// get resource required by this package
|
||||
ResourceSet resources = findRequiredResourceSet(packageInfo.getResources(),
|
||||
|
|
@ -134,23 +126,8 @@ public class ApplicationLoader {
|
|||
Constructor<Application> constructor =
|
||||
appClass.getConstructor(ResourceSet.class, ApplicationContext.class);
|
||||
|
||||
synchronized (runningApplications) {
|
||||
if (!runningApplications.containsKey(key)) {
|
||||
logger.info("Load {} {} from note {} paragraph {}",
|
||||
packageInfo.getArtifact(),
|
||||
packageInfo.getClassName(),
|
||||
context.getNoteId(),
|
||||
context.getParagraphId());
|
||||
|
||||
Application app = new ClassLoaderApplication(
|
||||
constructor.newInstance(resources, context),
|
||||
cl);
|
||||
runningApplications.put(key, app);
|
||||
return app;
|
||||
} else {
|
||||
return runningApplications.get(key);
|
||||
}
|
||||
}
|
||||
Application app = new ClassLoaderApplication(constructor.newInstance(resources, context), cl);
|
||||
return app;
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
|
|
@ -158,36 +135,6 @@ public class ApplicationLoader {
|
|||
}
|
||||
}
|
||||
|
||||
public void unload(HeliumPackageInfo packageInfo, ApplicationContext context)
|
||||
throws ApplicationException {
|
||||
Application appToUnload = null;
|
||||
synchronized (runningApplications) {
|
||||
RunningApplication key
|
||||
= new RunningApplication(packageInfo, context.getNoteId(), context.getParagraphId());
|
||||
|
||||
if (runningApplications.containsKey(key)) {
|
||||
appToUnload = runningApplications.remove(key);
|
||||
}
|
||||
}
|
||||
|
||||
if (appToUnload != null) {
|
||||
logger.info("Unload {} {} from note {} paragraph {}",
|
||||
packageInfo.getArtifact(),
|
||||
packageInfo.getClassName(),
|
||||
context.getNoteId(),
|
||||
context.getParagraphId());
|
||||
appToUnload.unload();
|
||||
}
|
||||
}
|
||||
|
||||
public Application get(HeliumPackageInfo packageInfo, ApplicationContext context) {
|
||||
synchronized (runningApplications) {
|
||||
RunningApplication key
|
||||
= new RunningApplication(packageInfo, context.getNoteId(), context.getParagraphId());
|
||||
return runningApplications.get(key);
|
||||
}
|
||||
}
|
||||
|
||||
private ResourceSet findRequiredResourceSet(
|
||||
String [][] requiredResources, String noteId, String paragraphId)
|
||||
throws ApplicationException {
|
||||
|
|
@ -234,7 +181,7 @@ public class ApplicationLoader {
|
|||
}
|
||||
|
||||
|
||||
private Class<Application> loadClass(HeliumPackageInfo packageInfo) throws Exception {
|
||||
private Class<Application> loadClass(HeliumPackage packageInfo) throws Exception {
|
||||
if (cached.containsKey(packageInfo)) {
|
||||
return cached.get(packageInfo);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,9 +17,9 @@
|
|||
package org.apache.zeppelin.helium;
|
||||
|
||||
/**
|
||||
* Helium package information
|
||||
* Helium package definition
|
||||
*/
|
||||
public class HeliumPackageInfo {
|
||||
public class HeliumPackage {
|
||||
private Type type;
|
||||
private String name; // user friendly name of this application
|
||||
private String description; // description
|
||||
|
|
@ -34,12 +34,12 @@ public class HeliumPackageInfo {
|
|||
APPLICATION
|
||||
}
|
||||
|
||||
public HeliumPackageInfo(Type type,
|
||||
String name,
|
||||
String description,
|
||||
String artifact,
|
||||
String className,
|
||||
String[][] resources) {
|
||||
public HeliumPackage(Type type,
|
||||
String name,
|
||||
String description,
|
||||
String artifact,
|
||||
String className,
|
||||
String[][] resources) {
|
||||
this.type = type;
|
||||
this.name = name;
|
||||
this.description = description;
|
||||
|
|
@ -55,11 +55,11 @@ public class HeliumPackageInfo {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (!(o instanceof HeliumPackageInfo)) {
|
||||
if (!(o instanceof HeliumPackage)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
HeliumPackageInfo info = (HeliumPackageInfo) o;
|
||||
HeliumPackage info = (HeliumPackage) o;
|
||||
return type == info.type && artifact.equals(info.artifact) && className.equals(info.className);
|
||||
}
|
||||
|
||||
|
|
@ -29,13 +29,12 @@ import org.apache.thrift.TException;
|
|||
import org.apache.thrift.server.TThreadPoolServer;
|
||||
import org.apache.thrift.transport.TServerSocket;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.zeppelin.dep.DependencyResolver;
|
||||
import org.apache.zeppelin.display.*;
|
||||
import org.apache.zeppelin.helium.*;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.interpreter.thrift.*;
|
||||
import org.apache.zeppelin.resource.*;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
|
|
@ -61,6 +60,8 @@ public class RemoteInterpreterServer
|
|||
InterpreterGroup interpreterGroup;
|
||||
AngularObjectRegistry angularObjectRegistry;
|
||||
DistributedResourcePool resourcePool;
|
||||
private ApplicationLoader appLoader;
|
||||
|
||||
Gson gson = new Gson();
|
||||
|
||||
RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
|
||||
|
|
@ -69,6 +70,11 @@ public class RemoteInterpreterServer
|
|||
private TThreadPoolServer server;
|
||||
|
||||
RemoteInterpreterEventClient eventClient = new RemoteInterpreterEventClient();
|
||||
private DependencyResolver depLoader;
|
||||
|
||||
private final Map<String, Application> runningApplications =
|
||||
Collections.synchronizedMap(new HashMap<String, Application>());
|
||||
|
||||
|
||||
public RemoteInterpreterServer(int port) throws TTransportException {
|
||||
this.port = port;
|
||||
|
|
@ -137,14 +143,17 @@ public class RemoteInterpreterServer
|
|||
|
||||
@Override
|
||||
public void createInterpreter(String interpreterGroupId, String noteId, String
|
||||
className,
|
||||
Map<String, String> properties) throws TException {
|
||||
className, Map<String, String> properties) throws TException {
|
||||
if (interpreterGroup == null) {
|
||||
interpreterGroup = new InterpreterGroup(interpreterGroupId);
|
||||
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
|
||||
resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
|
||||
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
|
||||
interpreterGroup.setResourcePool(resourcePool);
|
||||
|
||||
String localRepoPath = properties.get("zeppelin.interpreter.localRepo");
|
||||
depLoader = new DependencyResolver(localRepoPath);
|
||||
appLoader = new ApplicationLoader(resourcePool, depLoader);
|
||||
}
|
||||
|
||||
try {
|
||||
|
|
@ -696,4 +705,73 @@ public class RemoteInterpreterServer
|
|||
logger.info("Exception in RemoteInterpreterServer while angularRegistryPush, nolock", e);
|
||||
}
|
||||
}
|
||||
|
||||
private ApplicationContext getApplicationContext(
|
||||
HeliumPackage packageInfo, String noteId, String paragraphId) {
|
||||
return new ApplicationContext(noteId, paragraphId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteApplicationResult loadApplication(
|
||||
String applicationInstanceId, String packageInfo, String noteId, String paragraphId)
|
||||
throws TException {
|
||||
if (runningApplications.containsKey(applicationInstanceId)) {
|
||||
logger.warn("Application instance {} is already running");
|
||||
return new RemoteApplicationResult(true, "");
|
||||
}
|
||||
HeliumPackage pkgInfo = gson.fromJson(packageInfo, HeliumPackage.class);
|
||||
ApplicationContext context = getApplicationContext(pkgInfo, noteId, paragraphId);
|
||||
try {
|
||||
Application app = null;
|
||||
logger.info(
|
||||
"Loading application {}({}). artifact={}, className={} into note={}, paragraph={}",
|
||||
pkgInfo.getName(),
|
||||
applicationInstanceId,
|
||||
pkgInfo.getArtifact(),
|
||||
pkgInfo.getClassName(),
|
||||
noteId,
|
||||
paragraphId);
|
||||
app = appLoader.load(pkgInfo, context);
|
||||
runningApplications.put(applicationInstanceId, app);
|
||||
return new RemoteApplicationResult(true, "");
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return new RemoteApplicationResult(false, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteApplicationResult unloadApplication(String applicationInstanceId)
|
||||
throws TException {
|
||||
Application app = runningApplications.remove(applicationInstanceId);
|
||||
if (app != null) {
|
||||
try {
|
||||
logger.info("Unloading application {}", applicationInstanceId);
|
||||
app.unload();
|
||||
} catch (ApplicationException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return new RemoteApplicationResult(false, e.getMessage());
|
||||
}
|
||||
}
|
||||
return new RemoteApplicationResult(true, "");
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteApplicationResult runApplication(String applicationInstanceId)
|
||||
throws TException {
|
||||
logger.info("run application {}", applicationInstanceId);
|
||||
|
||||
Application app = runningApplications.get(applicationInstanceId);
|
||||
if (app == null) {
|
||||
logger.error("Application instance {} not exists", applicationInstanceId);
|
||||
return new RemoteApplicationResult(false, "Application instance does not exists");
|
||||
} else {
|
||||
try {
|
||||
app.run();
|
||||
return new RemoteApplicationResult(true, "");
|
||||
} catch (ApplicationException e) {
|
||||
return new RemoteApplicationResult(false, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,518 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* Autogenerated by Thrift Compiler (0.9.2)
|
||||
*
|
||||
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||
* @generated
|
||||
*/
|
||||
package org.apache.zeppelin.interpreter.thrift;
|
||||
|
||||
import org.apache.thrift.scheme.IScheme;
|
||||
import org.apache.thrift.scheme.SchemeFactory;
|
||||
import org.apache.thrift.scheme.StandardScheme;
|
||||
|
||||
import org.apache.thrift.scheme.TupleScheme;
|
||||
import org.apache.thrift.protocol.TTupleProtocol;
|
||||
import org.apache.thrift.protocol.TProtocolException;
|
||||
import org.apache.thrift.EncodingUtils;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.async.AsyncMethodCallback;
|
||||
import org.apache.thrift.server.AbstractNonblockingServer.*;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.EnumMap;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Collections;
|
||||
import java.util.BitSet;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import javax.annotation.Generated;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
|
||||
public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult");
|
||||
|
||||
private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.BOOL, (short)1);
|
||||
private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.STRING, (short)2);
|
||||
|
||||
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
|
||||
static {
|
||||
schemes.put(StandardScheme.class, new RemoteApplicationResultStandardSchemeFactory());
|
||||
schemes.put(TupleScheme.class, new RemoteApplicationResultTupleSchemeFactory());
|
||||
}
|
||||
|
||||
public boolean success; // required
|
||||
public String msg; // required
|
||||
|
||||
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
|
||||
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
|
||||
SUCCESS((short)1, "success"),
|
||||
MSG((short)2, "msg");
|
||||
|
||||
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
|
||||
|
||||
static {
|
||||
for (_Fields field : EnumSet.allOf(_Fields.class)) {
|
||||
byName.put(field.getFieldName(), field);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches fieldId, or null if its not found.
|
||||
*/
|
||||
public static _Fields findByThriftId(int fieldId) {
|
||||
switch(fieldId) {
|
||||
case 1: // SUCCESS
|
||||
return SUCCESS;
|
||||
case 2: // MSG
|
||||
return MSG;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches fieldId, throwing an exception
|
||||
* if it is not found.
|
||||
*/
|
||||
public static _Fields findByThriftIdOrThrow(int fieldId) {
|
||||
_Fields fields = findByThriftId(fieldId);
|
||||
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
|
||||
return fields;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches name, or null if its not found.
|
||||
*/
|
||||
public static _Fields findByName(String name) {
|
||||
return byName.get(name);
|
||||
}
|
||||
|
||||
private final short _thriftId;
|
||||
private final String _fieldName;
|
||||
|
||||
_Fields(short thriftId, String fieldName) {
|
||||
_thriftId = thriftId;
|
||||
_fieldName = fieldName;
|
||||
}
|
||||
|
||||
public short getThriftFieldId() {
|
||||
return _thriftId;
|
||||
}
|
||||
|
||||
public String getFieldName() {
|
||||
return _fieldName;
|
||||
}
|
||||
}
|
||||
|
||||
// isset id assignments
|
||||
private static final int __SUCCESS_ISSET_ID = 0;
|
||||
private byte __isset_bitfield = 0;
|
||||
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
|
||||
static {
|
||||
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
|
||||
tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT,
|
||||
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
|
||||
tmpMap.put(_Fields.MSG, new org.apache.thrift.meta_data.FieldMetaData("msg", org.apache.thrift.TFieldRequirementType.DEFAULT,
|
||||
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
|
||||
metaDataMap = Collections.unmodifiableMap(tmpMap);
|
||||
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(RemoteApplicationResult.class, metaDataMap);
|
||||
}
|
||||
|
||||
public RemoteApplicationResult() {
|
||||
}
|
||||
|
||||
public RemoteApplicationResult(
|
||||
boolean success,
|
||||
String msg)
|
||||
{
|
||||
this();
|
||||
this.success = success;
|
||||
setSuccessIsSet(true);
|
||||
this.msg = msg;
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs a deep copy on <i>other</i>.
|
||||
*/
|
||||
public RemoteApplicationResult(RemoteApplicationResult other) {
|
||||
__isset_bitfield = other.__isset_bitfield;
|
||||
this.success = other.success;
|
||||
if (other.isSetMsg()) {
|
||||
this.msg = other.msg;
|
||||
}
|
||||
}
|
||||
|
||||
public RemoteApplicationResult deepCopy() {
|
||||
return new RemoteApplicationResult(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
setSuccessIsSet(false);
|
||||
this.success = false;
|
||||
this.msg = null;
|
||||
}
|
||||
|
||||
public boolean isSuccess() {
|
||||
return this.success;
|
||||
}
|
||||
|
||||
public RemoteApplicationResult setSuccess(boolean success) {
|
||||
this.success = success;
|
||||
setSuccessIsSet(true);
|
||||
return this;
|
||||
}
|
||||
|
||||
public void unsetSuccess() {
|
||||
__isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __SUCCESS_ISSET_ID);
|
||||
}
|
||||
|
||||
/** Returns true if field success is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSetSuccess() {
|
||||
return EncodingUtils.testBit(__isset_bitfield, __SUCCESS_ISSET_ID);
|
||||
}
|
||||
|
||||
public void setSuccessIsSet(boolean value) {
|
||||
__isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __SUCCESS_ISSET_ID, value);
|
||||
}
|
||||
|
||||
public String getMsg() {
|
||||
return this.msg;
|
||||
}
|
||||
|
||||
public RemoteApplicationResult setMsg(String msg) {
|
||||
this.msg = msg;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void unsetMsg() {
|
||||
this.msg = null;
|
||||
}
|
||||
|
||||
/** Returns true if field msg is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSetMsg() {
|
||||
return this.msg != null;
|
||||
}
|
||||
|
||||
public void setMsgIsSet(boolean value) {
|
||||
if (!value) {
|
||||
this.msg = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void setFieldValue(_Fields field, Object value) {
|
||||
switch (field) {
|
||||
case SUCCESS:
|
||||
if (value == null) {
|
||||
unsetSuccess();
|
||||
} else {
|
||||
setSuccess((Boolean)value);
|
||||
}
|
||||
break;
|
||||
|
||||
case MSG:
|
||||
if (value == null) {
|
||||
unsetMsg();
|
||||
} else {
|
||||
setMsg((String)value);
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public Object getFieldValue(_Fields field) {
|
||||
switch (field) {
|
||||
case SUCCESS:
|
||||
return Boolean.valueOf(isSuccess());
|
||||
|
||||
case MSG:
|
||||
return getMsg();
|
||||
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSet(_Fields field) {
|
||||
if (field == null) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
||||
switch (field) {
|
||||
case SUCCESS:
|
||||
return isSetSuccess();
|
||||
case MSG:
|
||||
return isSetMsg();
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object that) {
|
||||
if (that == null)
|
||||
return false;
|
||||
if (that instanceof RemoteApplicationResult)
|
||||
return this.equals((RemoteApplicationResult)that);
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean equals(RemoteApplicationResult that) {
|
||||
if (that == null)
|
||||
return false;
|
||||
|
||||
boolean this_present_success = true;
|
||||
boolean that_present_success = true;
|
||||
if (this_present_success || that_present_success) {
|
||||
if (!(this_present_success && that_present_success))
|
||||
return false;
|
||||
if (this.success != that.success)
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean this_present_msg = true && this.isSetMsg();
|
||||
boolean that_present_msg = true && that.isSetMsg();
|
||||
if (this_present_msg || that_present_msg) {
|
||||
if (!(this_present_msg && that_present_msg))
|
||||
return false;
|
||||
if (!this.msg.equals(that.msg))
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
List<Object> list = new ArrayList<Object>();
|
||||
|
||||
boolean present_success = true;
|
||||
list.add(present_success);
|
||||
if (present_success)
|
||||
list.add(success);
|
||||
|
||||
boolean present_msg = true && (isSetMsg());
|
||||
list.add(present_msg);
|
||||
if (present_msg)
|
||||
list.add(msg);
|
||||
|
||||
return list.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(RemoteApplicationResult other) {
|
||||
if (!getClass().equals(other.getClass())) {
|
||||
return getClass().getName().compareTo(other.getClass().getName());
|
||||
}
|
||||
|
||||
int lastComparison = 0;
|
||||
|
||||
lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess());
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
if (isSetSuccess()) {
|
||||
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success);
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
}
|
||||
lastComparison = Boolean.valueOf(isSetMsg()).compareTo(other.isSetMsg());
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
if (isSetMsg()) {
|
||||
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.msg, other.msg);
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public _Fields fieldForId(int fieldId) {
|
||||
return _Fields.findByThriftId(fieldId);
|
||||
}
|
||||
|
||||
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
|
||||
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
|
||||
}
|
||||
|
||||
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
|
||||
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder("RemoteApplicationResult(");
|
||||
boolean first = true;
|
||||
|
||||
sb.append("success:");
|
||||
sb.append(this.success);
|
||||
first = false;
|
||||
if (!first) sb.append(", ");
|
||||
sb.append("msg:");
|
||||
if (this.msg == null) {
|
||||
sb.append("null");
|
||||
} else {
|
||||
sb.append(this.msg);
|
||||
}
|
||||
first = false;
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public void validate() throws org.apache.thrift.TException {
|
||||
// check for required fields
|
||||
// check for sub-struct validity
|
||||
}
|
||||
|
||||
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
|
||||
try {
|
||||
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
|
||||
} catch (org.apache.thrift.TException te) {
|
||||
throw new java.io.IOException(te);
|
||||
}
|
||||
}
|
||||
|
||||
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
|
||||
try {
|
||||
// it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
|
||||
__isset_bitfield = 0;
|
||||
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
|
||||
} catch (org.apache.thrift.TException te) {
|
||||
throw new java.io.IOException(te);
|
||||
}
|
||||
}
|
||||
|
||||
private static class RemoteApplicationResultStandardSchemeFactory implements SchemeFactory {
|
||||
public RemoteApplicationResultStandardScheme getScheme() {
|
||||
return new RemoteApplicationResultStandardScheme();
|
||||
}
|
||||
}
|
||||
|
||||
private static class RemoteApplicationResultStandardScheme extends StandardScheme<RemoteApplicationResult> {
|
||||
|
||||
public void read(org.apache.thrift.protocol.TProtocol iprot, RemoteApplicationResult struct) throws org.apache.thrift.TException {
|
||||
org.apache.thrift.protocol.TField schemeField;
|
||||
iprot.readStructBegin();
|
||||
while (true)
|
||||
{
|
||||
schemeField = iprot.readFieldBegin();
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
|
||||
break;
|
||||
}
|
||||
switch (schemeField.id) {
|
||||
case 1: // SUCCESS
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
|
||||
struct.success = iprot.readBool();
|
||||
struct.setSuccessIsSet(true);
|
||||
} else {
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
break;
|
||||
case 2: // MSG
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
|
||||
struct.msg = iprot.readString();
|
||||
struct.setMsgIsSet(true);
|
||||
} else {
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
iprot.readFieldEnd();
|
||||
}
|
||||
iprot.readStructEnd();
|
||||
|
||||
// check for required fields of primitive type, which can't be checked in the validate method
|
||||
struct.validate();
|
||||
}
|
||||
|
||||
public void write(org.apache.thrift.protocol.TProtocol oprot, RemoteApplicationResult struct) throws org.apache.thrift.TException {
|
||||
struct.validate();
|
||||
|
||||
oprot.writeStructBegin(STRUCT_DESC);
|
||||
oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
|
||||
oprot.writeBool(struct.success);
|
||||
oprot.writeFieldEnd();
|
||||
if (struct.msg != null) {
|
||||
oprot.writeFieldBegin(MSG_FIELD_DESC);
|
||||
oprot.writeString(struct.msg);
|
||||
oprot.writeFieldEnd();
|
||||
}
|
||||
oprot.writeFieldStop();
|
||||
oprot.writeStructEnd();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class RemoteApplicationResultTupleSchemeFactory implements SchemeFactory {
|
||||
public RemoteApplicationResultTupleScheme getScheme() {
|
||||
return new RemoteApplicationResultTupleScheme();
|
||||
}
|
||||
}
|
||||
|
||||
private static class RemoteApplicationResultTupleScheme extends TupleScheme<RemoteApplicationResult> {
|
||||
|
||||
@Override
|
||||
public void write(org.apache.thrift.protocol.TProtocol prot, RemoteApplicationResult struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol oprot = (TTupleProtocol) prot;
|
||||
BitSet optionals = new BitSet();
|
||||
if (struct.isSetSuccess()) {
|
||||
optionals.set(0);
|
||||
}
|
||||
if (struct.isSetMsg()) {
|
||||
optionals.set(1);
|
||||
}
|
||||
oprot.writeBitSet(optionals, 2);
|
||||
if (struct.isSetSuccess()) {
|
||||
oprot.writeBool(struct.success);
|
||||
}
|
||||
if (struct.isSetMsg()) {
|
||||
oprot.writeString(struct.msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void read(org.apache.thrift.protocol.TProtocol prot, RemoteApplicationResult struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol iprot = (TTupleProtocol) prot;
|
||||
BitSet incoming = iprot.readBitSet(2);
|
||||
if (incoming.get(0)) {
|
||||
struct.success = iprot.readBool();
|
||||
struct.setSuccessIsSet(true);
|
||||
}
|
||||
if (incoming.get(1)) {
|
||||
struct.msg = iprot.readString();
|
||||
struct.setMsgIsSet(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-17")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
|
||||
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-17")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
|
||||
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-17")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-18")
|
||||
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
|
||||
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -56,6 +56,11 @@ struct RemoteInterpreterEvent {
|
|||
2: string data // json serialized data
|
||||
}
|
||||
|
||||
struct RemoteApplicationResult {
|
||||
1: bool success,
|
||||
2: string msg
|
||||
}
|
||||
|
||||
service RemoteInterpreterService {
|
||||
void createInterpreter(1: string intpGroupId, 2: string noteId, 3: string className, 4: map<string, string> properties);
|
||||
|
||||
|
|
@ -88,4 +93,8 @@ service RemoteInterpreterService {
|
|||
void angularObjectAdd(1: string name, 2: string noteId, 3: string paragraphId, 4: string object);
|
||||
void angularObjectRemove(1: string name, 2: string noteId, 3: string paragraphId);
|
||||
void angularRegistryPush(1: string registry);
|
||||
|
||||
RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string noteId, 4: string paragraphId);
|
||||
RemoteApplicationResult unloadApplication(1: string applicationInstanceId);
|
||||
RemoteApplicationResult runApplication(1: string applicationInstanceId);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ package org.apache.zeppelin.helium;
|
|||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.zeppelin.dep.DependencyResolver;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.resource.LocalResourcePool;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
|
@ -46,36 +45,33 @@ public class ApplicationLoaderTest {
|
|||
|
||||
@Test
|
||||
public void loadUnloadApplication() throws Exception {
|
||||
// given
|
||||
LocalResourcePool resourcePool = new LocalResourcePool("pool1");
|
||||
DependencyResolver dep = new DependencyResolver(tmpDir.getAbsolutePath());
|
||||
ApplicationLoader appLoader = new ApplicationLoader(resourcePool, dep);
|
||||
|
||||
HeliumPackageInfo pkg1 = createPackageInfo(MockApplication1.class.getName(), "artifact1");
|
||||
HeliumPackage pkg1 = createPackageInfo(MockApplication1.class.getName(), "artifact1");
|
||||
ApplicationContext context1 = createContext("note1", "paragraph1");
|
||||
|
||||
// app not loaded yet
|
||||
assertEquals(null, appLoader.get(pkg1, context1));
|
||||
|
||||
// load application
|
||||
// when load application
|
||||
MockApplication1 app = (MockApplication1) ((ClassLoaderApplication)
|
||||
appLoader.load(pkg1, context1)).getInnerApplication();
|
||||
|
||||
// then
|
||||
assertFalse(app.isUnloaded());
|
||||
assertEquals(0, app.getNumRun());
|
||||
assertNotNull(appLoader.get(pkg1, context1));
|
||||
|
||||
// unload application
|
||||
appLoader.unload(pkg1, context1);
|
||||
// when unload
|
||||
app.unload();
|
||||
|
||||
// then
|
||||
assertTrue(app.isUnloaded());
|
||||
assertEquals(0, app.getNumRun());
|
||||
}
|
||||
|
||||
public HeliumPackageInfo createPackageInfo(String className, String artifact) {
|
||||
HeliumPackageInfo app1 = new HeliumPackageInfo(
|
||||
HeliumPackageInfo.Type.APPLICATION,
|
||||
public HeliumPackage createPackageInfo(String className, String artifact) {
|
||||
HeliumPackage app1 = new HeliumPackage(
|
||||
HeliumPackage.Type.APPLICATION,
|
||||
"name1",
|
||||
"desc1",
|
||||
artifact,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,119 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Manages helium packages
|
||||
*/
|
||||
public class Helium {
|
||||
Logger logger = LoggerFactory.getLogger(Helium.class);
|
||||
private List<HeliumRegistry> registry = new LinkedList<HeliumRegistry>();
|
||||
|
||||
private final HeliumConf heliumConf;
|
||||
private final String heliumConfPath;
|
||||
private final Gson gson;
|
||||
|
||||
public Helium(String heliumConfPath) throws IOException {
|
||||
this.heliumConfPath = heliumConfPath;
|
||||
|
||||
GsonBuilder builder = new GsonBuilder();
|
||||
builder.setPrettyPrinting();
|
||||
builder.registerTypeAdapter(
|
||||
HeliumRegistry.class, new HeliumRegistrySerializer());
|
||||
gson = builder.create();
|
||||
|
||||
heliumConf = loadConf(heliumConfPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add HeliumRegistry
|
||||
* @param registry
|
||||
*/
|
||||
public void addRegistry(HeliumRegistry registry) {
|
||||
synchronized (this.registry) {
|
||||
this.registry.add(registry);
|
||||
}
|
||||
}
|
||||
|
||||
public List<HeliumRegistry> getAllRegistry() {
|
||||
synchronized (this.registry) {
|
||||
List list = new LinkedList<HeliumRegistry>();
|
||||
for (HeliumRegistry r : registry) {
|
||||
list.add(r);
|
||||
}
|
||||
return list;
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized HeliumConf loadConf(String path) throws IOException {
|
||||
File heliumConfFile = new File(path);
|
||||
if (!heliumConfFile.isFile()) {
|
||||
logger.warn("{} does not exists", path);
|
||||
return new HeliumConf();
|
||||
} else {
|
||||
String jsonString = FileUtils.readFileToString(heliumConfFile);
|
||||
HeliumConf conf = gson.fromJson(jsonString, HeliumConf.class);
|
||||
this.registry = conf.getRegistry();
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void save() throws IOException {
|
||||
String jsonString;
|
||||
synchronized (registry) {
|
||||
heliumConf.setRegistry(registry);
|
||||
jsonString = gson.toJson(heliumConf);
|
||||
}
|
||||
|
||||
File heliumConfFile = new File(heliumConfPath);
|
||||
if (!heliumConfFile.exists()) {
|
||||
heliumConfFile.createNewFile();
|
||||
}
|
||||
|
||||
FileUtils.writeStringToFile(heliumConfFile, jsonString);
|
||||
}
|
||||
|
||||
public List<HeliumPackageSearchResult> getAllPackageInfo() {
|
||||
List<HeliumPackageSearchResult> list = new LinkedList<HeliumPackageSearchResult>();
|
||||
synchronized (registry) {
|
||||
for (HeliumRegistry r : registry) {
|
||||
try {
|
||||
for (HeliumPackage pkg : r.getAll()) {
|
||||
list.add(new HeliumPackageSearchResult(r.name(), pkg));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Helium config. This object will be persisted to conf/heliumc.conf
|
||||
*/
|
||||
public class HeliumConf {
|
||||
List<HeliumRegistry> registry = new LinkedList<HeliumRegistry>();
|
||||
|
||||
public List<HeliumRegistry> getRegistry() {
|
||||
return registry;
|
||||
}
|
||||
|
||||
public void setRegistry(List<HeliumRegistry> registry) {
|
||||
this.registry = registry;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Simple Helium registry on local filesystem
|
||||
*/
|
||||
public class HeliumLocalRegistry extends HeliumRegistry {
|
||||
Logger logger = LoggerFactory.getLogger(HeliumLocalRegistry.class);
|
||||
|
||||
private final Gson gson;
|
||||
|
||||
public HeliumLocalRegistry(String name, URI uri) {
|
||||
super(name, uri);
|
||||
gson = new Gson();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized List<HeliumPackage> getAll() throws IOException {
|
||||
List<HeliumPackage> result = new LinkedList<HeliumPackage>();
|
||||
|
||||
File file = new File(uri());
|
||||
File [] files = file.listFiles();
|
||||
if (files == null) {
|
||||
return result;
|
||||
}
|
||||
|
||||
for (File f : files) {
|
||||
if (f.getName().startsWith(".")) {
|
||||
continue;
|
||||
}
|
||||
|
||||
HeliumPackage pkgInfo = readPackageInfo(f);
|
||||
if (pkgInfo != null) {
|
||||
result.add(pkgInfo);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private HeliumPackage readPackageInfo(File f) {
|
||||
try {
|
||||
return gson.fromJson(FileUtils.readFileToString(f), HeliumPackage.class);
|
||||
} catch (IOException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
/**
|
||||
* search result
|
||||
*/
|
||||
public class HeliumPackageSearchResult {
|
||||
private final String registry;
|
||||
private final HeliumPackage pkg;
|
||||
|
||||
/**
|
||||
* Create search result item
|
||||
* @param registry registry name
|
||||
* @param pkg package information
|
||||
*/
|
||||
public HeliumPackageSearchResult(String registry, HeliumPackage pkg) {
|
||||
this.registry = registry;
|
||||
this.pkg = pkg;
|
||||
}
|
||||
|
||||
public String getRegistry() {
|
||||
return registry;
|
||||
}
|
||||
|
||||
public HeliumPackage getPkg() {
|
||||
return pkg;
|
||||
}
|
||||
}
|
||||
|
|
@ -16,20 +16,26 @@
|
|||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Helium registry
|
||||
* Helium package registry
|
||||
*/
|
||||
public abstract class HeliumRegistry {
|
||||
private final String name;
|
||||
private final URI uri;
|
||||
|
||||
public HeliumRegistry(URI uri) {
|
||||
public HeliumRegistry(String name, URI uri) {
|
||||
this.name = name;
|
||||
this.uri = uri;
|
||||
}
|
||||
|
||||
public String name() {
|
||||
return name;
|
||||
}
|
||||
public URI uri() {
|
||||
return uri;
|
||||
}
|
||||
|
||||
public abstract List<HeliumPackage> getAll() throws IOException;
|
||||
}
|
||||
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import com.google.gson.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Type;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
/**
|
||||
* HeliumRegistrySerializer (and deserializer) for gson
|
||||
*/
|
||||
public class HeliumRegistrySerializer
|
||||
implements JsonSerializer<HeliumRegistry>, JsonDeserializer<HeliumRegistry> {
|
||||
Logger logger = LoggerFactory.getLogger(HeliumRegistrySerializer.class);
|
||||
|
||||
@Override
|
||||
public HeliumRegistry deserialize(JsonElement json,
|
||||
Type type,
|
||||
JsonDeserializationContext jsonDeserializationContext)
|
||||
throws JsonParseException {
|
||||
JsonObject jsonObject = json.getAsJsonObject();
|
||||
String className = jsonObject.get("class").getAsString();
|
||||
URI uri = null;
|
||||
try {
|
||||
uri = new URI(jsonObject.get("uri").getAsString());
|
||||
} catch (URISyntaxException e) {
|
||||
new JsonParseException(e);
|
||||
}
|
||||
String name = jsonObject.get("name").getAsString();
|
||||
|
||||
try {
|
||||
logger.info("Restore helium registry {} {} {}", name, className, uri);
|
||||
Class<HeliumRegistry> cls =
|
||||
(Class<HeliumRegistry>) getClass().getClassLoader().loadClass(className);
|
||||
Constructor<HeliumRegistry> constructor = cls.getConstructor(String.class, URI.class);
|
||||
HeliumRegistry registry = constructor.newInstance(name, uri);
|
||||
return registry;
|
||||
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
|
||||
InstantiationException | InvocationTargetException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public JsonElement serialize(HeliumRegistry heliumRegistry,
|
||||
Type type,
|
||||
JsonSerializationContext jsonSerializationContext) {
|
||||
JsonObject json = new JsonObject();
|
||||
json.addProperty("class", heliumRegistry.getClass().getName());
|
||||
json.addProperty("uri", heliumRegistry.uri().toString());
|
||||
json.addProperty("name", heliumRegistry.name());
|
||||
return json;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class HeliumLocalRegistryTest {
|
||||
private File tmpDir;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
|
||||
tmpDir.mkdirs();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
FileUtils.deleteDirectory(tmpDir);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAllPackage() throws IOException {
|
||||
// given
|
||||
File r1Path = new File(tmpDir, "r1");
|
||||
HeliumLocalRegistry r1 = new HeliumLocalRegistry("r1", r1Path.toURI());
|
||||
assertEquals(0, r1.getAll().size());
|
||||
|
||||
// when
|
||||
Gson gson = new Gson();
|
||||
HeliumPackage pkg1 = new HeliumPackage(HeliumPackage.Type.APPLICATION,
|
||||
"app1",
|
||||
"desc1",
|
||||
"artifact1",
|
||||
"classname1",
|
||||
new String[][]{});
|
||||
FileUtils.writeStringToFile(new File(r1Path, "pkg1.json"), gson.toJson(pkg1));
|
||||
|
||||
// then
|
||||
assertEquals(1, r1.getAll().size());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,98 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class HeliumTest {
|
||||
private File tmpDir;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
|
||||
tmpDir.mkdirs();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
FileUtils.deleteDirectory(tmpDir);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSaveLoadConf() throws IOException, URISyntaxException {
|
||||
// given
|
||||
File heliumConf = new File(tmpDir, "helium.conf");
|
||||
Helium helium = new Helium(heliumConf.getAbsolutePath());
|
||||
assertFalse(heliumConf.exists());
|
||||
HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", new URI("file:///r1"));
|
||||
helium.addRegistry(registry1);
|
||||
assertEquals(1, helium.getAllRegistry().size());
|
||||
assertEquals(0, helium.getAllPackageInfo().size());
|
||||
|
||||
// when
|
||||
helium.save();
|
||||
|
||||
// then
|
||||
assertTrue(heliumConf.exists());
|
||||
|
||||
// then
|
||||
Helium heliumRestored = new Helium(heliumConf.getAbsolutePath());
|
||||
assertEquals(1, heliumRestored.getAllRegistry().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestoreRegistryInstances() throws IOException, URISyntaxException {
|
||||
File heliumConf = new File(tmpDir, "helium.conf");
|
||||
Helium helium = new Helium(heliumConf.getAbsolutePath());
|
||||
HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", new URI("file:///r1"));
|
||||
HeliumTestRegistry registry2 = new HeliumTestRegistry("r2", new URI("file:///r2"));
|
||||
helium.addRegistry(registry1);
|
||||
helium.addRegistry(registry2);
|
||||
|
||||
// when
|
||||
registry1.add(new HeliumPackage(
|
||||
HeliumPackage.Type.APPLICATION,
|
||||
"name1",
|
||||
"desc1",
|
||||
"artifact1",
|
||||
"className1",
|
||||
new String[][]{}));
|
||||
|
||||
registry2.add(new HeliumPackage(
|
||||
HeliumPackage.Type.APPLICATION,
|
||||
"name2",
|
||||
"desc2",
|
||||
"artifact2",
|
||||
"className2",
|
||||
new String[][]{}));
|
||||
|
||||
// then
|
||||
assertEquals(2, helium.getAllPackageInfo().size());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
public class HeliumTestRegistry extends HeliumRegistry {
|
||||
List<HeliumPackage> infos = new LinkedList<HeliumPackage>();
|
||||
|
||||
public HeliumTestRegistry(String name, URI uri) {
|
||||
super(name, uri);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HeliumPackage> getAll() throws IOException {
|
||||
return infos;
|
||||
}
|
||||
|
||||
public void add(HeliumPackage info) {
|
||||
infos.add(info);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue