mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Cache online registry under local-repo for offline support
This commit is contained in:
parent
b2985e9c8c
commit
65c6092e03
4 changed files with 75 additions and 25 deletions
|
|
@ -124,6 +124,8 @@ public class ZeppelinServer extends Application {
|
|||
this.helium = new Helium(
|
||||
conf.getHeliumConfPath(),
|
||||
conf.getHeliumRegistry(),
|
||||
new File(
|
||||
conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO), "helium_registry_cache"),
|
||||
heliumVisualizationFactory,
|
||||
heliumApplicationFactory);
|
||||
|
||||
|
|
|
|||
|
|
@ -42,6 +42,8 @@ public class Helium {
|
|||
private final HeliumConf heliumConf;
|
||||
private final String heliumConfPath;
|
||||
private final String registryPaths;
|
||||
private final File registryCacheDir;
|
||||
|
||||
private final Gson gson;
|
||||
private final HeliumVisualizationFactory visualizationFactory;
|
||||
private final HeliumApplicationFactory applicationFactory;
|
||||
|
|
@ -49,11 +51,13 @@ public class Helium {
|
|||
public Helium(
|
||||
String heliumConfPath,
|
||||
String registryPaths,
|
||||
File registryCacheDir,
|
||||
HeliumVisualizationFactory visualizationFactory,
|
||||
HeliumApplicationFactory applicationFactory)
|
||||
throws IOException {
|
||||
this.heliumConfPath = heliumConfPath;
|
||||
this.registryPaths = registryPaths;
|
||||
this.registryCacheDir = registryCacheDir;
|
||||
this.visualizationFactory = visualizationFactory;
|
||||
this.applicationFactory = applicationFactory;
|
||||
|
||||
|
|
@ -102,7 +106,7 @@ public class Helium {
|
|||
for (String uri : paths) {
|
||||
if (uri.startsWith("http://") || uri.startsWith("https://")) {
|
||||
logger.info("Add helium online registry {}", uri);
|
||||
registry.add(new HeliumOnlineRegistry(uri, uri));
|
||||
registry.add(new HeliumOnlineRegistry(uri, uri, registryCacheDir));
|
||||
} else {
|
||||
logger.info("Add helium local registry {}", uri);
|
||||
registry.add(new HeliumLocalRegistry(uri, uri));
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package org.apache.zeppelin.helium;
|
|||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
|
|
@ -26,10 +27,7 @@ import org.apache.zeppelin.util.Util;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.*;
|
||||
import java.net.URL;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
|
@ -52,38 +50,84 @@ import java.util.Map;
|
|||
public class HeliumOnlineRegistry extends HeliumRegistry {
|
||||
Logger logger = LoggerFactory.getLogger(HeliumOnlineRegistry.class);
|
||||
private final Gson gson;
|
||||
private final File registryCacheFile;
|
||||
|
||||
public HeliumOnlineRegistry(String name, String uri) {
|
||||
public HeliumOnlineRegistry(String name, String uri, File registryCacheDir) {
|
||||
super(name, uri);
|
||||
registryCacheDir.mkdirs();
|
||||
this.registryCacheFile = new File(registryCacheDir, name);
|
||||
gson = new Gson();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HeliumPackage> getAll() throws IOException {
|
||||
public synchronized List<HeliumPackage> getAll() throws IOException {
|
||||
HttpClient client = HttpClientBuilder.create()
|
||||
.setUserAgent("ApacheZeppelin/" + Util.getVersion())
|
||||
.build();
|
||||
HttpGet get = new HttpGet(uri());
|
||||
HttpResponse response = client.execute(get);
|
||||
if (response.getStatusLine().getStatusCode() != 200) {
|
||||
throw new IOException(uri() + " returned " + response.getStatusLine().toString());
|
||||
HttpResponse response;
|
||||
|
||||
try {
|
||||
response = client.execute(get);
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage());
|
||||
return readFromCache();
|
||||
}
|
||||
|
||||
BufferedReader reader = new BufferedReader(
|
||||
new InputStreamReader(response.getEntity().getContent()));
|
||||
List<Map<String, Map<String, HeliumPackage>>> packages = gson.fromJson(
|
||||
reader,
|
||||
new TypeToken<List<Map<String, Map<String, HeliumPackage>>>>() {
|
||||
}.getType());
|
||||
reader.close();
|
||||
|
||||
List<HeliumPackage> packageList = new LinkedList<>();
|
||||
if (response.getStatusLine().getStatusCode() != 200) {
|
||||
// try read from cache
|
||||
logger.error(uri() + " returned " + response.getStatusLine().toString());
|
||||
return readFromCache();
|
||||
} else {
|
||||
List<HeliumPackage> packageList = new LinkedList<>();
|
||||
|
||||
for (Map<String, Map<String, HeliumPackage>> pkg : packages) {
|
||||
for (Map<String, HeliumPackage> versions : pkg.values()) {
|
||||
packageList.addAll(versions.values());
|
||||
BufferedReader reader;
|
||||
reader = new BufferedReader(
|
||||
new InputStreamReader(response.getEntity().getContent()));
|
||||
|
||||
List<Map<String, Map<String, HeliumPackage>>> packages = gson.fromJson(
|
||||
reader,
|
||||
new TypeToken<List<Map<String, Map<String, HeliumPackage>>>>() {
|
||||
}.getType());
|
||||
reader.close();
|
||||
|
||||
for (Map<String, Map<String, HeliumPackage>> pkg : packages) {
|
||||
for (Map<String, HeliumPackage> versions : pkg.values()) {
|
||||
packageList.addAll(versions.values());
|
||||
}
|
||||
}
|
||||
|
||||
writeToCache(packageList);
|
||||
return packageList;
|
||||
}
|
||||
}
|
||||
|
||||
private List<HeliumPackage> readFromCache() {
|
||||
synchronized (registryCacheFile) {
|
||||
if (registryCacheFile.isFile()) {
|
||||
try {
|
||||
return gson.fromJson(
|
||||
new FileReader(registryCacheFile),
|
||||
new TypeToken<List<HeliumPackage>>() {
|
||||
}.getType());
|
||||
} catch (FileNotFoundException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return new LinkedList<>();
|
||||
}
|
||||
} else {
|
||||
return new LinkedList<>();
|
||||
}
|
||||
}
|
||||
return packageList;
|
||||
}
|
||||
|
||||
private void writeToCache(List<HeliumPackage> pkg) throws IOException {
|
||||
synchronized (registryCacheFile) {
|
||||
if (registryCacheFile.exists()) {
|
||||
registryCacheFile.delete();
|
||||
}
|
||||
String jsonToCache = gson.toJson(pkg);
|
||||
FileUtils.writeStringToFile(registryCacheFile, jsonToCache);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ public class HeliumTest {
|
|||
// given
|
||||
File heliumConf = new File(tmpDir, "helium.conf");
|
||||
Helium helium = new Helium(heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(),
|
||||
null, null);
|
||||
null, null, null);
|
||||
assertFalse(heliumConf.exists());
|
||||
HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1");
|
||||
helium.addRegistry(registry1);
|
||||
|
|
@ -68,7 +68,7 @@ public class HeliumTest {
|
|||
|
||||
// then
|
||||
Helium heliumRestored = new Helium(
|
||||
heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null);
|
||||
heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
|
||||
assertEquals(2, heliumRestored.getAllRegistry().size());
|
||||
}
|
||||
|
||||
|
|
@ -76,7 +76,7 @@ public class HeliumTest {
|
|||
public void testRestoreRegistryInstances() throws IOException, URISyntaxException, TaskRunnerException {
|
||||
File heliumConf = new File(tmpDir, "helium.conf");
|
||||
Helium helium = new Helium(
|
||||
heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null);
|
||||
heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
|
||||
HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1");
|
||||
HeliumTestRegistry registry2 = new HeliumTestRegistry("r2", "r2");
|
||||
helium.addRegistry(registry1);
|
||||
|
|
|
|||
Loading…
Reference in a new issue