Helium application factory

This commit is contained in:
Lee moon soo 2016-03-19 08:25:30 -07:00
parent b891b98ee5
commit b239f1b96b
8 changed files with 265 additions and 2 deletions

View file

@ -101,7 +101,7 @@ public class ApplicationLoader {
* @throws Exception
*/
public Application load(HeliumPackage packageInfo, ApplicationContext context)
throws Exception {
throws Exception {
if (packageInfo.getType() != HeliumPackage.Type.APPLICATION) {
throw new ApplicationException(
"Can't instantiate " + packageInfo.getType() + " package using ApplicationLoader");

View file

@ -28,6 +28,9 @@ public class HeliumPackage {
private String [][] resources; // resource classnames that requires
// [[ .. and .. and .. ] or [ .. and .. and ..] ..]
/**
* Type of package
*/
public static enum Type {
INTERPRETER,
NOTEBOOK_REPO,

View file

@ -31,6 +31,7 @@ import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.helium.Helium;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookAuthorization;
@ -74,12 +75,15 @@ public class ZeppelinServer extends Application {
private SearchService notebookIndex;
private NotebookAuthorization notebookAuthorization;
private DependencyResolver depResolver;
private Helium helium;
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
this.depResolver = new DependencyResolver(
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
this.helium = new Helium(conf.getHeliumConfPath());
this.schedulerFactory = new SchedulerFactory();
this.replFactory = new InterpreterFactory(conf, notebookWsServer,
notebookWsServer, depResolver);

View file

@ -342,6 +342,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getRelativeDir(String.format("%s/interpreter.json", getConfDir()));
}
public String getHeliumConfPath() {
return getRelativeDir(String.format("%s/helium.json", getConfDir()));
}
public String getNotebookAuthorizationPath() {
return getRelativeDir(String.format("%s/notebook-authorization.json", getConfDir()));
}

View file

@ -115,5 +115,4 @@ public class Helium {
}
return list;
}
}

View file

@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.helium;
import com.google.gson.Gson;
import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Paragraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
* HeliumApplicationFactory
*/
public class HeliumApplicationFactory {
Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class);
private final Gson gson;
Map<String, RunningApplicationInfo> apps =
Collections.synchronizedMap(new HashMap<String, RunningApplicationInfo>());
public HeliumApplicationFactory() {
gson = new Gson();
}
private static class RunningApplicationInfo {
HeliumPackage pkg;
Paragraph paragraph;
RemoteInterpreterProcess process;
public RunningApplicationInfo(HeliumPackage pkg,
RemoteInterpreterProcess process,
Paragraph paragraph) {
this.pkg = pkg;
this.paragraph = paragraph;
this.process = process;
}
public HeliumPackage getPkg() {
return pkg;
}
public Paragraph getParagraph() {
return paragraph;
}
public RemoteInterpreterProcess getProcess() {
return process;
}
}
private static String generateApplicationId() {
return "app_" + System.currentTimeMillis() + "_"
+ new Random(System.currentTimeMillis()).nextInt();
}
/**
* Load pkg
*/
public void load(HeliumPackage pkg, Paragraph paragraph) throws ApplicationException {
Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName());
RemoteInterpreterProcess intpProcess = intp.getInterpreterGroup().getRemoteInterpreterProcess();
RemoteInterpreterService.Client client;
try {
client = intpProcess.getClient();
} catch (Exception e) {
throw new ApplicationException(e);
}
String appId = generateApplicationId();
String pkgInfo = gson.toJson(pkg);
try {
RemoteApplicationResult ret = client.loadApplication(
appId,
pkgInfo,
paragraph.getNote().getId(),
paragraph.getId());
if (ret.isSuccess()) {
RunningApplicationInfo appInfo = new RunningApplicationInfo(pkg, intpProcess, paragraph);
apps.put(appId, appInfo);
} else {
throw new ApplicationException(ret.getMsg());
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
}
/**
* Unload pkg
*/
public void unload(String appId) throws ApplicationException {
RunningApplicationInfo appInfo = apps.get(appId);
if (appInfo == null) {
return;
}
RemoteInterpreterProcess intpProcess = appInfo.getProcess();
RemoteInterpreterService.Client client;
try {
client = intpProcess.getClient();
} catch (Exception e) {
throw new ApplicationException(e);
}
try {
RemoteApplicationResult ret = client.unloadApplication(appId);
if (ret.isSuccess()) {
apps.remove(appId);
} else {
throw new ApplicationException(ret.getMsg());
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
}
/**
* run pkg
*/
public void run(String appId) throws ApplicationException {
RunningApplicationInfo appInfo = apps.get(appId);
if (appInfo == null) {
return;
}
RemoteInterpreterProcess intpProcess = appInfo.getProcess();
RemoteInterpreterService.Client client;
try {
client = intpProcess.getClient();
} catch (Exception e) {
throw new ApplicationException(e);
}
try {
RemoteApplicationResult ret = client.runApplication(appId);
if (ret.isSuccess()) {
// success
} else {
throw new ApplicationException(ret.getMsg());
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
}
public void unloadAllInTheInteprreterProcess(RemoteInterpreterProcess process) {
for (String appId : apps.keySet()) {
RunningApplicationInfo app = apps.get(appId);
if (app.getProcess() == process) {
try {
unload(appId);
} catch (ApplicationException e) {
logger.error(e.getMessage(), e);
}
}
}
}
public void unloadAllInTheNote(Note note) {
for (String appId : apps.keySet()) {
RunningApplicationInfo app = apps.get(appId);
if (app.getParagraph().getNote().getId().equals(note.getId())) {
try {
unload(appId);
} catch (ApplicationException e) {
logger.error(e.getMessage(), e);
}
}
}
}
public void unloadAllInTheParagraph(Paragraph paragraph) {
for (String appId : apps.keySet()) {
RunningApplicationInfo app = apps.get(appId);
if (app.getParagraph().getId().equals(paragraph.getId())) {
try {
unload(appId);
} catch (ApplicationException e) {
logger.error(e.getMessage(), e);
}
}
}
}
}

View file

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook;
/**
* Running ApplicationState
*/
public class ApplicationState {
}

View file

@ -53,6 +53,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
Date dateUpdated;
private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc
public final GUI settings; // form and parameter settings
private Map<String, ApplicationState> apps; // helium applications running
@VisibleForTesting
Paragraph() {
@ -71,6 +72,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
dateUpdated = null;
settings = new GUI();
config = new HashMap<String, Object>();
apps = new HashMap<String, ApplicationState>();
}
private static String generateId() {