mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Helium application factory
This commit is contained in:
parent
b891b98ee5
commit
b239f1b96b
8 changed files with 265 additions and 2 deletions
|
|
@ -101,7 +101,7 @@ public class ApplicationLoader {
|
|||
* @throws Exception
|
||||
*/
|
||||
public Application load(HeliumPackage packageInfo, ApplicationContext context)
|
||||
throws Exception {
|
||||
throws Exception {
|
||||
if (packageInfo.getType() != HeliumPackage.Type.APPLICATION) {
|
||||
throw new ApplicationException(
|
||||
"Can't instantiate " + packageInfo.getType() + " package using ApplicationLoader");
|
||||
|
|
|
|||
|
|
@ -28,6 +28,9 @@ public class HeliumPackage {
|
|||
private String [][] resources; // resource classnames that requires
|
||||
// [[ .. and .. and .. ] or [ .. and .. and ..] ..]
|
||||
|
||||
/**
|
||||
* Type of package
|
||||
*/
|
||||
public static enum Type {
|
||||
INTERPRETER,
|
||||
NOTEBOOK_REPO,
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
|
|||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
||||
import org.apache.zeppelin.dep.DependencyResolver;
|
||||
import org.apache.zeppelin.helium.Helium;
|
||||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.notebook.Notebook;
|
||||
import org.apache.zeppelin.notebook.NotebookAuthorization;
|
||||
|
|
@ -74,12 +75,15 @@ public class ZeppelinServer extends Application {
|
|||
private SearchService notebookIndex;
|
||||
private NotebookAuthorization notebookAuthorization;
|
||||
private DependencyResolver depResolver;
|
||||
private Helium helium;
|
||||
|
||||
public ZeppelinServer() throws Exception {
|
||||
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
|
||||
this.depResolver = new DependencyResolver(
|
||||
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
|
||||
|
||||
this.helium = new Helium(conf.getHeliumConfPath());
|
||||
this.schedulerFactory = new SchedulerFactory();
|
||||
this.replFactory = new InterpreterFactory(conf, notebookWsServer,
|
||||
notebookWsServer, depResolver);
|
||||
|
|
|
|||
|
|
@ -342,6 +342,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
return getRelativeDir(String.format("%s/interpreter.json", getConfDir()));
|
||||
}
|
||||
|
||||
public String getHeliumConfPath() {
|
||||
return getRelativeDir(String.format("%s/helium.json", getConfDir()));
|
||||
}
|
||||
|
||||
public String getNotebookAuthorizationPath() {
|
||||
return getRelativeDir(String.format("%s/notebook-authorization.json", getConfDir()));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -115,5 +115,4 @@ public class Helium {
|
|||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,228 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
* HeliumApplicationFactory
|
||||
*/
|
||||
public class HeliumApplicationFactory {
|
||||
Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class);
|
||||
|
||||
private final Gson gson;
|
||||
Map<String, RunningApplicationInfo> apps =
|
||||
Collections.synchronizedMap(new HashMap<String, RunningApplicationInfo>());
|
||||
|
||||
|
||||
public HeliumApplicationFactory() {
|
||||
gson = new Gson();
|
||||
}
|
||||
|
||||
private static class RunningApplicationInfo {
|
||||
HeliumPackage pkg;
|
||||
Paragraph paragraph;
|
||||
RemoteInterpreterProcess process;
|
||||
|
||||
public RunningApplicationInfo(HeliumPackage pkg,
|
||||
RemoteInterpreterProcess process,
|
||||
Paragraph paragraph) {
|
||||
this.pkg = pkg;
|
||||
this.paragraph = paragraph;
|
||||
this.process = process;
|
||||
}
|
||||
|
||||
public HeliumPackage getPkg() {
|
||||
return pkg;
|
||||
}
|
||||
|
||||
public Paragraph getParagraph() {
|
||||
return paragraph;
|
||||
}
|
||||
|
||||
public RemoteInterpreterProcess getProcess() {
|
||||
return process;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static String generateApplicationId() {
|
||||
return "app_" + System.currentTimeMillis() + "_"
|
||||
+ new Random(System.currentTimeMillis()).nextInt();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Load pkg
|
||||
*/
|
||||
public void load(HeliumPackage pkg, Paragraph paragraph) throws ApplicationException {
|
||||
Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName());
|
||||
RemoteInterpreterProcess intpProcess = intp.getInterpreterGroup().getRemoteInterpreterProcess();
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
|
||||
String appId = generateApplicationId();
|
||||
String pkgInfo = gson.toJson(pkg);
|
||||
try {
|
||||
RemoteApplicationResult ret = client.loadApplication(
|
||||
appId,
|
||||
pkgInfo,
|
||||
paragraph.getNote().getId(),
|
||||
paragraph.getId());
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
RunningApplicationInfo appInfo = new RunningApplicationInfo(pkg, intpProcess, paragraph);
|
||||
apps.put(appId, appInfo);
|
||||
} else {
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Unload pkg
|
||||
*/
|
||||
public void unload(String appId) throws ApplicationException {
|
||||
RunningApplicationInfo appInfo = apps.get(appId);
|
||||
if (appInfo == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
RemoteInterpreterProcess intpProcess = appInfo.getProcess();
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
RemoteApplicationResult ret = client.unloadApplication(appId);
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
apps.remove(appId);
|
||||
} else {
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* run pkg
|
||||
*/
|
||||
public void run(String appId) throws ApplicationException {
|
||||
RunningApplicationInfo appInfo = apps.get(appId);
|
||||
if (appInfo == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
RemoteInterpreterProcess intpProcess = appInfo.getProcess();
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
RemoteApplicationResult ret = client.runApplication(appId);
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
// success
|
||||
} else {
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
public void unloadAllInTheInteprreterProcess(RemoteInterpreterProcess process) {
|
||||
for (String appId : apps.keySet()) {
|
||||
RunningApplicationInfo app = apps.get(appId);
|
||||
if (app.getProcess() == process) {
|
||||
try {
|
||||
unload(appId);
|
||||
} catch (ApplicationException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void unloadAllInTheNote(Note note) {
|
||||
for (String appId : apps.keySet()) {
|
||||
RunningApplicationInfo app = apps.get(appId);
|
||||
if (app.getParagraph().getNote().getId().equals(note.getId())) {
|
||||
try {
|
||||
unload(appId);
|
||||
} catch (ApplicationException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void unloadAllInTheParagraph(Paragraph paragraph) {
|
||||
for (String appId : apps.keySet()) {
|
||||
RunningApplicationInfo app = apps.get(appId);
|
||||
if (app.getParagraph().getId().equals(paragraph.getId())) {
|
||||
try {
|
||||
unload(appId);
|
||||
} catch (ApplicationException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.notebook;
|
||||
|
||||
/**
|
||||
* Running ApplicationState
|
||||
*/
|
||||
public class ApplicationState {
|
||||
}
|
||||
|
|
@ -53,6 +53,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
Date dateUpdated;
|
||||
private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc
|
||||
public final GUI settings; // form and parameter settings
|
||||
private Map<String, ApplicationState> apps; // helium applications running
|
||||
|
||||
@VisibleForTesting
|
||||
Paragraph() {
|
||||
|
|
@ -71,6 +72,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
dateUpdated = null;
|
||||
settings = new GUI();
|
||||
config = new HashMap<String, Object>();
|
||||
apps = new HashMap<String, ApplicationState>();
|
||||
}
|
||||
|
||||
private static String generateId() {
|
||||
|
|
|
|||
Loading…
Reference in a new issue