mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
feat: Support config for helium pkg in backend
This commit is contained in:
parent
0a0c56547a
commit
6910e97b15
8 changed files with 208 additions and 24 deletions
|
|
@ -18,8 +18,12 @@
|
|||
package org.apache.zeppelin.rest;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.JsonObject;
|
||||
import com.google.gson.JsonParseException;
|
||||
import com.google.gson.JsonSyntaxException;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.helium.Helium;
|
||||
import org.apache.zeppelin.helium.HeliumPackage;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
|
|
@ -34,6 +38,7 @@ import javax.ws.rs.core.Response;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Helium Rest Api
|
||||
|
|
@ -56,13 +61,34 @@ public class HeliumRestApi {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get all packages
|
||||
* @return
|
||||
* Get all package infos
|
||||
*/
|
||||
@GET
|
||||
@Path("all")
|
||||
public Response getAll() {
|
||||
return new JsonResponse(Response.Status.OK, "", helium.getAllPackageInfo()).build();
|
||||
@Path("package")
|
||||
public Response getAllPackageInfo() {
|
||||
return new JsonResponse(
|
||||
Response.Status.OK, "", helium.getAllPackageInfo()).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get single package info
|
||||
*/
|
||||
@GET
|
||||
@Path("package/{packageName}")
|
||||
public Response getSinglePackageInfo(@PathParam("packageName") String packageName) {
|
||||
if (StringUtils.isEmpty(packageName)) {
|
||||
return new JsonResponse(
|
||||
Response.Status.BAD_REQUEST,
|
||||
"Can't get package info for empty name").build();
|
||||
}
|
||||
|
||||
try {
|
||||
return new JsonResponse(
|
||||
Response.Status.OK, "", helium.getSinglePackageInfo(packageName)).build();
|
||||
} catch (RuntimeException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return new JsonResponse(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage()).build();
|
||||
}
|
||||
}
|
||||
|
||||
@GET
|
||||
|
|
@ -165,6 +191,62 @@ public class HeliumRestApi {
|
|||
return new JsonResponse(Response.Status.OK, order).build();
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("config")
|
||||
public Response getAllPackageConfigs() {
|
||||
try {
|
||||
Map<String, Map<String, Map<String, Object>>> config = helium.getAllPackageConfig();
|
||||
return new JsonResponse(Response.Status.OK, config).build();
|
||||
} catch (RuntimeException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return new JsonResponse(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage()).build();
|
||||
}
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("config/{packageName}/{packageVersion}")
|
||||
public Response getPackageConfig(@PathParam("packageName") String packageName,
|
||||
@PathParam("packageVersion") String packageVersion) {
|
||||
|
||||
if (StringUtils.isEmpty(packageName) || StringUtils.isEmpty(packageVersion)) {
|
||||
return new JsonResponse(Response.Status.BAD_REQUEST,
|
||||
"package name or version is empty"
|
||||
).build();
|
||||
}
|
||||
|
||||
try {
|
||||
Map<String, Object> config = helium.getPackageConfig(packageName, packageVersion);
|
||||
return new JsonResponse(Response.Status.OK, config).build();
|
||||
} catch (RuntimeException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return new JsonResponse(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage()).build();
|
||||
}
|
||||
}
|
||||
|
||||
@POST
|
||||
@Path("config/{packageName}/{packageVersion}")
|
||||
public Response updatePackageConfig(@PathParam("packageName") String packageName,
|
||||
@PathParam("packageVersion") String packageVersion,
|
||||
String rawConfig) {
|
||||
|
||||
Map<String, Object> packageConfig = null;
|
||||
|
||||
try {
|
||||
packageConfig = gson.fromJson(
|
||||
rawConfig, new TypeToken<Map<String, Object>>(){}.getType());
|
||||
helium.updatePackageConfig(packageName, packageVersion, packageConfig);
|
||||
} catch (JsonParseException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return new JsonResponse(Response.Status.BAD_REQUEST,
|
||||
e.getMessage()).build();
|
||||
} catch (IOException | RuntimeException e) {
|
||||
return new JsonResponse(Response.Status.INTERNAL_SERVER_ERROR,
|
||||
e.getMessage()).build();
|
||||
}
|
||||
|
||||
return new JsonResponse(Response.Status.OK, packageConfig).build();
|
||||
}
|
||||
|
||||
@POST
|
||||
@Path("order/visualization")
|
||||
public Response getVisualizationPackageOrder(String orderedPackageNameList) {
|
||||
|
|
|
|||
|
|
@ -126,7 +126,7 @@ public class ZeppelinServer extends Application {
|
|||
new File(conf.getRelativeDir("zeppelin-web/src/app/spell")));
|
||||
}
|
||||
|
||||
this.helium = new Helium(
|
||||
ZeppelinServer.helium = new Helium(
|
||||
conf.getHeliumConfPath(),
|
||||
conf.getHeliumRegistry(),
|
||||
new File(conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO),
|
||||
|
|
@ -177,7 +177,7 @@ public class ZeppelinServer extends Application {
|
|||
// Web UI
|
||||
final WebAppContext webApp = setupWebAppContext(contexts, conf);
|
||||
|
||||
// REST api
|
||||
// Create `ZeppelinServer` using reflection and setup REST Api
|
||||
setupRestApiContextHandler(webApp, conf);
|
||||
|
||||
// Notebook server
|
||||
|
|
|
|||
|
|
@ -2281,7 +2281,6 @@ public class NotebookServer extends WebSocketServlet
|
|||
resp.put("editor", notebook().getInterpreterSettingManager().
|
||||
getEditorSetting(interpreter, user, noteId, replName));
|
||||
conn.send(serializeMessage(resp));
|
||||
return;
|
||||
}
|
||||
|
||||
private void getInterpreterSettings(NotebookSocket conn, AuthenticationInfo subject)
|
||||
|
|
@ -2324,11 +2323,31 @@ public class NotebookServer extends WebSocketServlet
|
|||
.equals(WatcherSecurityKey.getKey()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Send websocket message to all connections regardless of notebook id
|
||||
*/
|
||||
private void broadcastToAllConnections(String serialized) {
|
||||
broadcastToAllConnectionsExcept(null, serialized);
|
||||
}
|
||||
|
||||
private void broadcastToAllConnectionsExcept(NotebookSocket exclude, String serialized) {
|
||||
synchronized (connectedSockets) {
|
||||
for (NotebookSocket conn: connectedSockets) {
|
||||
if (exclude != null && exclude.equals(conn)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
conn.send(serialized);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot broadcast message to watcher", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void broadcastToWatchers(String noteId, String subject, Message message) {
|
||||
synchronized (watcherSockets) {
|
||||
if (watcherSockets.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (NotebookSocket watcher : watcherSockets) {
|
||||
try {
|
||||
watcher.send(
|
||||
|
|
|
|||
0
zeppelin-web/src/components/helium/helium-conf.js
Normal file
0
zeppelin-web/src/components/helium/helium-conf.js
Normal file
0
zeppelin-web/src/components/helium/helium-package.js
Normal file
0
zeppelin-web/src/components/helium/helium-package.js
Normal file
|
|
@ -19,6 +19,7 @@ package org.apache.zeppelin.helium;
|
|||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.apache.zeppelin.resource.DistributedResourcePool;
|
||||
|
|
@ -144,7 +145,7 @@ public class Helium {
|
|||
}
|
||||
|
||||
private void clearNotExistsPackages() {
|
||||
Map<String, List<HeliumPackageSearchResult>> all = getAllPackageInfo(false);
|
||||
Map<String, List<HeliumPackageSearchResult>> all = getAllPackageInfo(false, null);
|
||||
|
||||
// clear visualization display order
|
||||
List<String> packageOrder = heliumConf.getBundleDisplayOrder();
|
||||
|
|
@ -166,10 +167,15 @@ public class Helium {
|
|||
}
|
||||
|
||||
public Map<String, List<HeliumPackageSearchResult>> getAllPackageInfo() {
|
||||
return getAllPackageInfo(true);
|
||||
return getAllPackageInfo(true, null);
|
||||
}
|
||||
|
||||
public Map<String, List<HeliumPackageSearchResult>> getAllPackageInfo(boolean refresh) {
|
||||
/**
|
||||
* @param refresh
|
||||
* @param packageName
|
||||
*/
|
||||
public Map<String, List<HeliumPackageSearchResult>> getAllPackageInfo(boolean refresh,
|
||||
String packageName) {
|
||||
Map<String, String> enabledPackageInfo = heliumConf.getEnabledPackages();
|
||||
|
||||
synchronized (registry) {
|
||||
|
|
@ -179,6 +185,12 @@ public class Helium {
|
|||
try {
|
||||
for (HeliumPackage pkg : r.getAll()) {
|
||||
String name = pkg.getName();
|
||||
|
||||
if (!StringUtils.isEmpty(packageName) &&
|
||||
!name.equals(packageName)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String artifact = enabledPackageInfo.get(name);
|
||||
boolean enabled = (artifact != null && artifact.equals(pkg.getArtifact()));
|
||||
|
||||
|
|
@ -192,8 +204,12 @@ public class Helium {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
for (String name : allPackages.keySet()) {
|
||||
if (!StringUtils.isEmpty(packageName) &&
|
||||
!name.equals(packageName)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
List<HeliumPackageSearchResult> pkgs = allPackages.get(name);
|
||||
String artifact = enabledPackageInfo.get(name);
|
||||
LinkedList<HeliumPackageSearchResult> newResults =
|
||||
|
|
@ -222,8 +238,18 @@ public class Helium {
|
|||
}
|
||||
}
|
||||
|
||||
public List<HeliumPackageSearchResult> getSinglePackageInfo(String packageName) {
|
||||
Map<String, List<HeliumPackageSearchResult>> result = getAllPackageInfo(false, packageName);
|
||||
|
||||
if (!result.containsKey(packageName)) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
return result.get(packageName);
|
||||
}
|
||||
|
||||
public HeliumPackageSearchResult getPackageInfo(String name, String artifact) {
|
||||
Map<String, List<HeliumPackageSearchResult>> infos = getAllPackageInfo(false);
|
||||
Map<String, List<HeliumPackageSearchResult>> infos = getAllPackageInfo(false, name);
|
||||
List<HeliumPackageSearchResult> packages = infos.get(name);
|
||||
if (artifact == null) {
|
||||
return packages.get(0);
|
||||
|
|
@ -278,6 +304,21 @@ public class Helium {
|
|||
save();
|
||||
}
|
||||
|
||||
public void updatePackageConfig(String pkgName, String pkgVersion,
|
||||
Map<String, Object> pkgConfig) throws IOException {
|
||||
heliumConf.updatePackageConfig(pkgName, pkgVersion, pkgConfig);
|
||||
|
||||
save();
|
||||
}
|
||||
|
||||
public Map<String, Map<String, Map<String, Object>>> getAllPackageConfig() {
|
||||
return heliumConf.getAllPackageConfigs();
|
||||
}
|
||||
|
||||
public Map<String, Object> getPackageConfig(String pkgName, String pkgVersion) {
|
||||
return heliumConf.getPackageConfig(pkgName, pkgVersion);
|
||||
}
|
||||
|
||||
public HeliumPackageSuggestion suggestApp(Paragraph paragraph) {
|
||||
HeliumPackageSuggestion suggestion = new HeliumPackageSuggestion();
|
||||
|
||||
|
|
@ -299,7 +340,7 @@ public class Helium {
|
|||
allResources = ResourcePoolUtils.getAllResources();
|
||||
}
|
||||
|
||||
for (List<HeliumPackageSearchResult> pkgs : getAllPackageInfo(false).values()) {
|
||||
for (List<HeliumPackageSearchResult> pkgs : getAllPackageInfo(false, null).values()) {
|
||||
for (HeliumPackageSearchResult pkg : pkgs) {
|
||||
if (pkg.getPkg().getType() == HeliumType.APPLICATION && pkg.isEnabled()) {
|
||||
ResourceSet resources = ApplicationLoader.findRequiredResourceSet(
|
||||
|
|
@ -327,7 +368,7 @@ public class Helium {
|
|||
* @return ordered list of enabled buildBundle package
|
||||
*/
|
||||
public List<HeliumPackage> getBundlePackagesToBundle() {
|
||||
Map<String, List<HeliumPackageSearchResult>> allPackages = getAllPackageInfo(false);
|
||||
Map<String, List<HeliumPackageSearchResult>> allPackages = getAllPackageInfo(false, null);
|
||||
List<String> visOrder = heliumConf.getBundleDisplayOrder();
|
||||
|
||||
List<HeliumPackage> orderedBundlePackages = new LinkedList<>();
|
||||
|
|
|
|||
|
|
@ -19,14 +19,19 @@ package org.apache.zeppelin.helium;
|
|||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Helium config. This object will be persisted to conf/heliumc.conf
|
||||
* Helium config. This object will be persisted to conf/helium.conf
|
||||
*/
|
||||
public class HeliumConf {
|
||||
// enabled packages {name, version}
|
||||
Map<String, String> enabled = Collections.synchronizedMap(new HashMap<String, String>());
|
||||
private Map<String, String> enabled = Collections.synchronizedMap(new HashMap<String, String>());
|
||||
|
||||
// config for versioned package {name {version {configKey configValue}}}
|
||||
private Map<String, Map<String, Map<String, Object>>> packageConfig =
|
||||
Collections.synchronizedMap(
|
||||
new HashMap<String, Map<String, Map<String, Object>>>());
|
||||
|
||||
// enabled visualization package display order
|
||||
List<String> bundleDisplayOrder = new LinkedList<>();
|
||||
private List<String> bundleDisplayOrder = new LinkedList<>();
|
||||
|
||||
public Map<String, String> getEnabledPackages() {
|
||||
return new HashMap<>(enabled);
|
||||
|
|
@ -40,6 +45,43 @@ public class HeliumConf {
|
|||
enabled.put(name, artifact);
|
||||
}
|
||||
|
||||
public void updatePackageConfig(String pkgName, String pkgVersion,
|
||||
Map<String, Object> newConfig) {
|
||||
if (!packageConfig.containsKey(pkgName)) {
|
||||
packageConfig.put(pkgName,
|
||||
Collections.synchronizedMap(new HashMap<String, Map<String, Object>>()));
|
||||
}
|
||||
|
||||
Map<String, Map<String, Object>> versionedConfig = packageConfig.get(pkgName);
|
||||
|
||||
versionedConfig.put(pkgVersion, newConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return versioned package config `{name, {version, {configKey, configVal}}}`
|
||||
*/
|
||||
public Map<String, Map<String, Map<String, Object>>> getAllPackageConfigs () {
|
||||
return packageConfig;
|
||||
}
|
||||
|
||||
public Map<String, Object> getPackageConfig (String pkgName, String pkgVersion) {
|
||||
if (!packageConfig.containsKey(pkgName)) {
|
||||
packageConfig.put(pkgName,
|
||||
Collections.synchronizedMap(new HashMap<String, Map<String, Object>>()));
|
||||
}
|
||||
|
||||
Map<String, Map<String, Object>> versionedConfig = packageConfig.get(pkgName);
|
||||
|
||||
if (!versionedConfig.containsKey(pkgVersion)) {
|
||||
versionedConfig.put(pkgVersion,
|
||||
Collections.synchronizedMap(new HashMap<String, Object>()));
|
||||
}
|
||||
|
||||
Map<String, Object> config = versionedConfig.get(pkgVersion);
|
||||
|
||||
return config;
|
||||
}
|
||||
|
||||
public void disablePackage(HeliumPackage pkg) {
|
||||
disablePackage(pkg.getName());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -122,7 +122,7 @@ public class HeliumTest {
|
|||
""));
|
||||
|
||||
// then
|
||||
assertEquals(1, helium.getAllPackageInfo(false).size());
|
||||
assertEquals(1, helium.getAllPackageInfo(false, null).size());
|
||||
|
||||
// when
|
||||
registry1.add(new HeliumPackage(
|
||||
|
|
@ -136,7 +136,7 @@ public class HeliumTest {
|
|||
""));
|
||||
|
||||
// then
|
||||
assertEquals(1, helium.getAllPackageInfo(false).size());
|
||||
assertEquals(2, helium.getAllPackageInfo(true).size());
|
||||
assertEquals(1, helium.getAllPackageInfo(false, null).size());
|
||||
assertEquals(2, helium.getAllPackageInfo(true, null).size());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue