mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Small cleanup: JavaDoc, annotations, warning for NotebookRepos
This commit is contained in:
parent
c39bdb836a
commit
cf0b4bc1dc
3 changed files with 61 additions and 59 deletions
|
|
@ -60,8 +60,6 @@ import org.slf4j.LoggerFactory;
|
|||
/**
|
||||
* Main class of Zeppelin.
|
||||
*
|
||||
* @author Leemoonsoo
|
||||
*
|
||||
*/
|
||||
|
||||
public class ZeppelinServer extends Application {
|
||||
|
|
|
|||
|
|
@ -36,14 +36,15 @@ import org.slf4j.LoggerFactory;
|
|||
/**
|
||||
* Notebook repository sync with remote storage
|
||||
*/
|
||||
public class NotebookRepoSync implements NotebookRepo{
|
||||
private List<NotebookRepo> repos = new ArrayList<NotebookRepo>();
|
||||
public class NotebookRepoSync implements NotebookRepo {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(NotebookRepoSync.class);
|
||||
private static final int maxRepoNum = 2;
|
||||
private static final String pushKey = "pushNoteIDs";
|
||||
private static final String pullKey = "pullNoteIDs";
|
||||
private static ZeppelinConfiguration config;
|
||||
|
||||
private List<NotebookRepo> repos = new ArrayList<NotebookRepo>();
|
||||
|
||||
/**
|
||||
* @param (conf)
|
||||
* @throws - Exception
|
||||
|
|
@ -51,18 +52,19 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
public NotebookRepoSync(ZeppelinConfiguration conf) throws Exception {
|
||||
|
||||
config = conf;
|
||||
|
||||
|
||||
String allStorageClassNames = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE).trim();
|
||||
if (allStorageClassNames.isEmpty()) {
|
||||
throw new IOException("Empty ZEPPELIN_NOTEBOOK_STORAGE conf parameter");
|
||||
}
|
||||
String[] storageClassNames = allStorageClassNames.split(",");
|
||||
if (storageClassNames.length > getMaxRepoNum()) {
|
||||
throw new IOException("Unsupported number of storage classes (" +
|
||||
throw new IOException("Unsupported number of storage classes (" +
|
||||
storageClassNames.length + ") in ZEPPELIN_NOTEBOOK_STORAGE");
|
||||
}
|
||||
|
||||
for (int i = 0; i < storageClassNames.length; i++) {
|
||||
@SuppressWarnings("static-access")
|
||||
Class<?> notebookStorageClass = getClass().forName(storageClassNames[i].trim());
|
||||
Constructor<?> constructor = notebookStorageClass.getConstructor(
|
||||
ZeppelinConfiguration.class);
|
||||
|
|
@ -73,20 +75,26 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
}
|
||||
}
|
||||
|
||||
/* by default lists from first repository */
|
||||
/**
|
||||
* Lists Notebooks from the first repository
|
||||
*/
|
||||
@Override
|
||||
public List<NoteInfo> list() throws IOException {
|
||||
if (config.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE) && getRepoCount() > 1) {
|
||||
sync(0, 1);
|
||||
}
|
||||
return getRepo(0).list();
|
||||
}
|
||||
|
||||
|
||||
/* list from specific repo (for tests) */
|
||||
List<NoteInfo> list(int repoIndex) throws IOException {
|
||||
return getRepo(repoIndex).list();
|
||||
}
|
||||
|
||||
/* by default returns from first repository */
|
||||
/**
|
||||
* Returns from Notebook from the first repository
|
||||
*/
|
||||
@Override
|
||||
public Note get(String noteId) throws IOException {
|
||||
return getRepo(0).get(noteId);
|
||||
}
|
||||
|
|
@ -95,8 +103,11 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
Note get(int repoIndex, String noteId) throws IOException {
|
||||
return getRepo(repoIndex).get(noteId);
|
||||
}
|
||||
|
||||
/* by default saves to all repos */
|
||||
|
||||
/**
|
||||
* Saves to all repositories
|
||||
*/
|
||||
@Override
|
||||
public void save(Note note) throws IOException {
|
||||
getRepo(0).save(note);
|
||||
if (getRepoCount() > 1) {
|
||||
|
|
@ -114,6 +125,7 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
getRepo(repoIndex).save(note);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(String noteId) throws IOException {
|
||||
for (NotebookRepo repo : repos) {
|
||||
repo.remove(noteId);
|
||||
|
|
@ -122,16 +134,16 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
}
|
||||
|
||||
/**
|
||||
* copy new/updated notes from source to destination storage
|
||||
* copy new/updated notes from source to destination storage
|
||||
* @throws IOException
|
||||
*/
|
||||
public void sync(int sourceRepoIndex, int destRepoIndex) throws IOException {
|
||||
void sync(int sourceRepoIndex, int destRepoIndex) throws IOException {
|
||||
LOG.info("Sync started");
|
||||
NotebookRepo sourceRepo = getRepo(sourceRepoIndex);
|
||||
NotebookRepo destRepo = getRepo(destRepoIndex);
|
||||
List <NoteInfo> sourceNotes = sourceRepo.list();
|
||||
List <NoteInfo> destNotes = destRepo.list();
|
||||
|
||||
|
||||
Map<String, List<String>> noteIDs = notesCheckDiff(sourceNotes,
|
||||
sourceRepo,
|
||||
destNotes,
|
||||
|
|
@ -147,7 +159,7 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
} else {
|
||||
LOG.info("Nothing to push");
|
||||
}
|
||||
|
||||
|
||||
if (!pullNoteIDs.isEmpty()) {
|
||||
LOG.info("Notes with the following IDs will be pulled");
|
||||
for (String id : pullNoteIDs) {
|
||||
|
|
@ -157,16 +169,16 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
} else {
|
||||
LOG.info("Nothing to pull");
|
||||
}
|
||||
|
||||
|
||||
LOG.info("Sync ended");
|
||||
}
|
||||
|
||||
public void sync() throws IOException {
|
||||
sync(0, 1);
|
||||
}
|
||||
|
||||
|
||||
private void pushNotes(List<String> ids, NotebookRepo localRepo,
|
||||
NotebookRepo remoteRepo) throws IOException {
|
||||
NotebookRepo remoteRepo) throws IOException {
|
||||
for (String id : ids) {
|
||||
remoteRepo.save(localRepo.get(id));
|
||||
}
|
||||
|
|
@ -175,7 +187,7 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
int getRepoCount() {
|
||||
return repos.size();
|
||||
}
|
||||
|
||||
|
||||
int getMaxRepoNum() {
|
||||
return maxRepoNum;
|
||||
}
|
||||
|
|
@ -186,14 +198,13 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
}
|
||||
return repos.get(repoIndex);
|
||||
}
|
||||
|
||||
private Map<String, List<String>> notesCheckDiff(List <NoteInfo> sourceNotes,
|
||||
NotebookRepo sourceRepo,
|
||||
List <NoteInfo> destNotes,
|
||||
NotebookRepo destRepo) throws IOException {
|
||||
|
||||
private Map<String, List<String>> notesCheckDiff(List<NoteInfo> sourceNotes,
|
||||
NotebookRepo sourceRepo, List<NoteInfo> destNotes, NotebookRepo destRepo)
|
||||
throws IOException {
|
||||
List <String> pushIDs = new ArrayList<String>();
|
||||
List <String> pullIDs = new ArrayList<String>();
|
||||
|
||||
|
||||
NoteInfo dnote;
|
||||
Date sdate, ddate;
|
||||
for (NoteInfo snote : sourceNotes) {
|
||||
|
|
@ -218,7 +229,7 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
pushIDs.add(snote.getId());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
for (NoteInfo note : destNotes) {
|
||||
dnote = containsID(sourceNotes, note.getId());
|
||||
if (dnote == null) {
|
||||
|
|
@ -226,14 +237,14 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
pullIDs.add(note.getId());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Map<String, List<String>> map = new HashMap<String, List<String>>();
|
||||
map.put(pushKey, pushIDs);
|
||||
map.put(pullKey, pullIDs);
|
||||
return map;
|
||||
}
|
||||
|
||||
private NoteInfo containsID(List <NoteInfo> notes, String id) {
|
||||
private NoteInfo containsID(List <NoteInfo> notes, String id) {
|
||||
for (NoteInfo note : notes) {
|
||||
if (note.getId().equals(id)) {
|
||||
return note;
|
||||
|
|
@ -248,7 +259,7 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
private Date lastModificationDate(Note note) {
|
||||
Date latest = new Date(0L);
|
||||
Date tempCreated, tempStarted, tempFinished;
|
||||
|
||||
|
||||
for (Paragraph paragraph : note.getParagraphs()) {
|
||||
tempCreated = paragraph.getDateCreated();
|
||||
tempStarted = paragraph.getDateStarted();
|
||||
|
|
@ -266,7 +277,8 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
}
|
||||
return latest;
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private void printParagraphs(Note note) {
|
||||
LOG.info("Note name : " + note.getName());
|
||||
LOG.info("Note ID : " + note.id());
|
||||
|
|
@ -274,7 +286,7 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
printParagraph(p);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void printParagraph(Paragraph paragraph) {
|
||||
LOG.info("Date created : " + paragraph.getDateCreated());
|
||||
LOG.info("Date started : " + paragraph.getDateStarted());
|
||||
|
|
@ -282,7 +294,8 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
LOG.info("Paragraph ID : " + paragraph.getId());
|
||||
LOG.info("Paragraph title : " + paragraph.getTitle());
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private void printNoteInfos(List <NoteInfo> notes) {
|
||||
LOG.info("The following is a list of note infos");
|
||||
for (NoteInfo note : notes) {
|
||||
|
|
@ -295,7 +308,7 @@ public class NotebookRepoSync implements NotebookRepo{
|
|||
LOG.info("ID : " + note.getId());
|
||||
Map<String, Object> configs = note.getConfig();
|
||||
for (Map.Entry<String, Object> entry : configs.entrySet()) {
|
||||
LOG.info("Config Key = " + entry.getKey() + " , Value = " +
|
||||
LOG.info("Config Key = " + entry.getKey() + " , Value = " +
|
||||
entry.getValue().toString() + "of class " + entry.getClass());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.auth.AWSCredentialsProviderChain;
|
||||
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
|
|
@ -52,13 +51,10 @@ import com.google.gson.Gson;
|
|||
import com.google.gson.GsonBuilder;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author vgmartinez
|
||||
*
|
||||
* Backend for storing Notebooks on S3
|
||||
*/
|
||||
public class S3NotebookRepo implements NotebookRepo {
|
||||
|
||||
Logger logger = LoggerFactory.getLogger(S3NotebookRepo.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(S3NotebookRepo.class);
|
||||
|
||||
// Use a credential provider chain so that instance profiles can be utilized
|
||||
// on an EC2 instance. The order of locations where credentials are searched
|
||||
|
|
@ -75,13 +71,11 @@ public class S3NotebookRepo implements NotebookRepo {
|
|||
// shared by all AWS SDKs and the AWS CLI
|
||||
// 4. Instance profile credentials delivered through the Amazon EC2 metadata service
|
||||
private AmazonS3 s3client = new AmazonS3Client(new DefaultAWSCredentialsProviderChain());
|
||||
|
||||
private static String bucketName = "";
|
||||
private String user = "";
|
||||
|
||||
|
||||
|
||||
private ZeppelinConfiguration conf;
|
||||
|
||||
|
||||
public S3NotebookRepo(ZeppelinConfiguration conf) throws IOException {
|
||||
this.conf = conf;
|
||||
user = conf.getUser();
|
||||
|
|
@ -96,11 +90,11 @@ public class S3NotebookRepo implements NotebookRepo {
|
|||
ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
|
||||
.withBucketName(bucketName)
|
||||
.withPrefix(user + "/" + "notebook");
|
||||
ObjectListing objectListing;
|
||||
ObjectListing objectListing;
|
||||
do {
|
||||
objectListing = s3client.listObjects(listObjectsRequest);
|
||||
|
||||
for (S3ObjectSummary objectSummary :
|
||||
|
||||
for (S3ObjectSummary objectSummary :
|
||||
objectListing.getObjectSummaries()) {
|
||||
if (objectSummary.getKey().contains("note.json")) {
|
||||
try {
|
||||
|
|
@ -109,22 +103,21 @@ public class S3NotebookRepo implements NotebookRepo {
|
|||
infos.add(info);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("Can't read note ", e);
|
||||
LOG.error("Can't read note ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
listObjectsRequest.setMarker(objectListing.getNextMarker());
|
||||
} while (objectListing.isTruncated());
|
||||
} catch (AmazonServiceException ase) {
|
||||
|
||||
|
||||
} catch (AmazonClientException ace) {
|
||||
logger.info("Caught an AmazonClientException, " +
|
||||
LOG.info("Caught an AmazonClientException, " +
|
||||
"which means the client encountered " +
|
||||
"an internal error while trying to communicate" +
|
||||
" with S3, " +
|
||||
"such as not being able to access the network.");
|
||||
logger.info("Error Message: " + ace.getMessage());
|
||||
LOG.info("Error Message: " + ace.getMessage());
|
||||
}
|
||||
return infos;
|
||||
}
|
||||
|
|
@ -133,10 +126,10 @@ public class S3NotebookRepo implements NotebookRepo {
|
|||
GsonBuilder gsonBuilder = new GsonBuilder();
|
||||
gsonBuilder.setPrettyPrinting();
|
||||
Gson gson = gsonBuilder.create();
|
||||
|
||||
|
||||
S3Object s3object = s3client.getObject(new GetObjectRequest(
|
||||
bucketName, key));
|
||||
|
||||
|
||||
InputStream ins = s3object.getObjectContent();
|
||||
String json = IOUtils.toString(ins, conf.getString(ConfVars.ZEPPELIN_ENCODING));
|
||||
ins.close();
|
||||
|
|
@ -167,20 +160,18 @@ public class S3NotebookRepo implements NotebookRepo {
|
|||
Gson gson = gsonBuilder.create();
|
||||
String json = gson.toJson(note);
|
||||
String key = user + "/" + "notebook" + "/" + note.id() + "/" + "note.json";
|
||||
|
||||
|
||||
File file = File.createTempFile("note", "json");
|
||||
file.deleteOnExit();
|
||||
Writer writer = new OutputStreamWriter(new FileOutputStream(file));
|
||||
|
||||
|
||||
writer.write(json);
|
||||
writer.close();
|
||||
s3client.putObject(new PutObjectRequest(
|
||||
bucketName, key, file));
|
||||
s3client.putObject(new PutObjectRequest(bucketName, key, file));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void remove(String noteId) throws IOException {
|
||||
|
||||
String key = user + "/" + "notebook" + "/" + noteId;
|
||||
final ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
|
||||
.withBucketName(bucketName).withPrefix(key);
|
||||
|
|
|
|||
Loading…
Reference in a new issue