Cache online registry under local-repo for offline support

This commit is contained in:
Lee moon soo 2017-01-23 14:16:15 -08:00
parent b2985e9c8c
commit 65c6092e03
4 changed files with 75 additions and 25 deletions

View file

@ -124,6 +124,8 @@ public class ZeppelinServer extends Application {
this.helium = new Helium(
conf.getHeliumConfPath(),
conf.getHeliumRegistry(),
new File(
conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO), "helium_registry_cache"),
heliumVisualizationFactory,
heliumApplicationFactory);

View file

@ -42,6 +42,8 @@ public class Helium {
private final HeliumConf heliumConf;
private final String heliumConfPath;
private final String registryPaths;
private final File registryCacheDir;
private final Gson gson;
private final HeliumVisualizationFactory visualizationFactory;
private final HeliumApplicationFactory applicationFactory;
@ -49,11 +51,13 @@ public class Helium {
public Helium(
String heliumConfPath,
String registryPaths,
File registryCacheDir,
HeliumVisualizationFactory visualizationFactory,
HeliumApplicationFactory applicationFactory)
throws IOException {
this.heliumConfPath = heliumConfPath;
this.registryPaths = registryPaths;
this.registryCacheDir = registryCacheDir;
this.visualizationFactory = visualizationFactory;
this.applicationFactory = applicationFactory;
@ -102,7 +106,7 @@ public class Helium {
for (String uri : paths) {
if (uri.startsWith("http://") || uri.startsWith("https://")) {
logger.info("Add helium online registry {}", uri);
registry.add(new HeliumOnlineRegistry(uri, uri));
registry.add(new HeliumOnlineRegistry(uri, uri, registryCacheDir));
} else {
logger.info("Add helium local registry {}", uri);
registry.add(new HeliumLocalRegistry(uri, uri));

View file

@ -18,6 +18,7 @@ package org.apache.zeppelin.helium;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.io.FileUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
@ -26,10 +27,7 @@ import org.apache.zeppelin.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.*;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
@ -52,38 +50,84 @@ import java.util.Map;
public class HeliumOnlineRegistry extends HeliumRegistry {
Logger logger = LoggerFactory.getLogger(HeliumOnlineRegistry.class);
private final Gson gson;
private final File registryCacheFile;
public HeliumOnlineRegistry(String name, String uri) {
public HeliumOnlineRegistry(String name, String uri, File registryCacheDir) {
super(name, uri);
registryCacheDir.mkdirs();
this.registryCacheFile = new File(registryCacheDir, name);
gson = new Gson();
}
@Override
public List<HeliumPackage> getAll() throws IOException {
public synchronized List<HeliumPackage> getAll() throws IOException {
HttpClient client = HttpClientBuilder.create()
.setUserAgent("ApacheZeppelin/" + Util.getVersion())
.build();
HttpGet get = new HttpGet(uri());
HttpResponse response = client.execute(get);
if (response.getStatusLine().getStatusCode() != 200) {
throw new IOException(uri() + " returned " + response.getStatusLine().toString());
HttpResponse response;
try {
response = client.execute(get);
} catch (Exception e) {
logger.error(e.getMessage());
return readFromCache();
}
BufferedReader reader = new BufferedReader(
new InputStreamReader(response.getEntity().getContent()));
List<Map<String, Map<String, HeliumPackage>>> packages = gson.fromJson(
reader,
new TypeToken<List<Map<String, Map<String, HeliumPackage>>>>() {
}.getType());
reader.close();
List<HeliumPackage> packageList = new LinkedList<>();
if (response.getStatusLine().getStatusCode() != 200) {
// try read from cache
logger.error(uri() + " returned " + response.getStatusLine().toString());
return readFromCache();
} else {
List<HeliumPackage> packageList = new LinkedList<>();
for (Map<String, Map<String, HeliumPackage>> pkg : packages) {
for (Map<String, HeliumPackage> versions : pkg.values()) {
packageList.addAll(versions.values());
BufferedReader reader;
reader = new BufferedReader(
new InputStreamReader(response.getEntity().getContent()));
List<Map<String, Map<String, HeliumPackage>>> packages = gson.fromJson(
reader,
new TypeToken<List<Map<String, Map<String, HeliumPackage>>>>() {
}.getType());
reader.close();
for (Map<String, Map<String, HeliumPackage>> pkg : packages) {
for (Map<String, HeliumPackage> versions : pkg.values()) {
packageList.addAll(versions.values());
}
}
writeToCache(packageList);
return packageList;
}
}
private List<HeliumPackage> readFromCache() {
synchronized (registryCacheFile) {
if (registryCacheFile.isFile()) {
try {
return gson.fromJson(
new FileReader(registryCacheFile),
new TypeToken<List<HeliumPackage>>() {
}.getType());
} catch (FileNotFoundException e) {
logger.error(e.getMessage(), e);
return new LinkedList<>();
}
} else {
return new LinkedList<>();
}
}
return packageList;
}
private void writeToCache(List<HeliumPackage> pkg) throws IOException {
synchronized (registryCacheFile) {
if (registryCacheFile.exists()) {
registryCacheFile.delete();
}
String jsonToCache = gson.toJson(pkg);
FileUtils.writeStringToFile(registryCacheFile, jsonToCache);
}
}
}

View file

@ -53,7 +53,7 @@ public class HeliumTest {
// given
File heliumConf = new File(tmpDir, "helium.conf");
Helium helium = new Helium(heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(),
null, null);
null, null, null);
assertFalse(heliumConf.exists());
HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1");
helium.addRegistry(registry1);
@ -68,7 +68,7 @@ public class HeliumTest {
// then
Helium heliumRestored = new Helium(
heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null);
heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
assertEquals(2, heliumRestored.getAllRegistry().size());
}
@ -76,7 +76,7 @@ public class HeliumTest {
public void testRestoreRegistryInstances() throws IOException, URISyntaxException, TaskRunnerException {
File heliumConf = new File(tmpDir, "helium.conf");
Helium helium = new Helium(
heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null);
heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1");
HeliumTestRegistry registry2 = new HeliumTestRegistry("r2", "r2");
helium.addRegistry(registry1);