mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Implement one-way sync for notebook repos
This commit is contained in:
parent
73f1e48593
commit
566b0ed445
7 changed files with 106 additions and 10 deletions
|
|
@ -35,6 +35,7 @@ REM set ZEPPELIN_IDENT_STRING REM A string representing this instance of zep
|
|||
REM set ZEPPELIN_NICENESS REM The scheduling priority for daemons. Defaults to 0.
|
||||
REM set ZEPPELIN_INTERPRETER_LOCALREPO REM Local repository for interpreter's additional dependency loading
|
||||
REM set ZEPPELIN_NOTEBOOK_STORAGE REM Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
|
||||
REM set ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC REM If there are multiple notebook storages, set to true to treat the first one as the only source of truth (one-way sync). Defaults to false.
|
||||
|
||||
|
||||
REM Spark interpreter configuration
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@
|
|||
# export ZEPPELIN_NICENESS # The scheduling priority for daemons. Defaults to 0.
|
||||
# export ZEPPELIN_INTERPRETER_LOCALREPO # Local repository for interpreter's additional dependency loading
|
||||
# export ZEPPELIN_NOTEBOOK_STORAGE # Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
|
||||
# export ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC # If there are multiple notebook storages, set to true to treat the first one as the only source of truth (one-way sync). Defaults to false.
|
||||
|
||||
#### Spark interpreter configuration ####
|
||||
|
||||
|
|
|
|||
|
|
@ -164,6 +164,12 @@
|
|||
<description>notebook persistence layer implementation</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.notebook.one.way.sync</name>
|
||||
<value>false</value>
|
||||
<description>If there are multiple notebook storages, set to true to treat the first one as the only source of truth (one-way sync).</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.interpreter.dir</name>
|
||||
<value>interpreter</value>
|
||||
|
|
|
|||
|
|
@ -373,6 +373,12 @@ You can configure Apache Zeppelin with both **environment variables** in `conf/z
|
|||
<td>org.apache.zeppelin.notebook.repo.VFSNotebookRepo</td>
|
||||
<td>Comma separated list of notebook storage</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC</td>
|
||||
<td>zeppelin.notebook.one.way.sync</td>
|
||||
<td>false</td>
|
||||
<td>If there are multiple notebook storages, set to true to treat the first one as the only source of truth (one-way sync)</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>ZEPPELIN_INTERPRETERS</td>
|
||||
<td>zeppelin.interpreters</td>
|
||||
|
|
|
|||
|
|
@ -542,6 +542,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"),
|
||||
ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"),
|
||||
ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()),
|
||||
ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false),
|
||||
ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner",
|
||||
System.getProperty("os.name")
|
||||
.startsWith("Windows") ? "bin/interpreter.cmd" : "bin/interpreter.sh"),
|
||||
|
|
|
|||
|
|
@ -44,11 +44,13 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
private static final int maxRepoNum = 2;
|
||||
private static final String pushKey = "pushNoteIDs";
|
||||
private static final String pullKey = "pullNoteIDs";
|
||||
private static final String delDstKey = "delDstNoteIDs";
|
||||
|
||||
private static ZeppelinConfiguration config;
|
||||
private static final String defaultStorage = "org.apache.zeppelin.notebook.repo.VFSNotebookRepo";
|
||||
|
||||
private List<NotebookRepo> repos = new ArrayList<NotebookRepo>();
|
||||
private final boolean oneWaySync;
|
||||
|
||||
/**
|
||||
* @param noteIndex
|
||||
|
|
@ -58,6 +60,7 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
@SuppressWarnings("static-access")
|
||||
public NotebookRepoSync(ZeppelinConfiguration conf) {
|
||||
config = conf;
|
||||
oneWaySync = conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC);
|
||||
String allStorageClassNames = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE).trim();
|
||||
if (allStorageClassNames.isEmpty()) {
|
||||
allStorageClassNames = defaultStorage;
|
||||
|
|
@ -182,6 +185,8 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
Map<String, List<String>> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo);
|
||||
List<String> pushNoteIDs = noteIDs.get(pushKey);
|
||||
List<String> pullNoteIDs = noteIDs.get(pullKey);
|
||||
List<String> delDstNoteIDs = noteIDs.get(delDstKey);
|
||||
|
||||
if (!pushNoteIDs.isEmpty()) {
|
||||
LOG.info("Notes with the following IDs will be pushed");
|
||||
for (String id : pushNoteIDs) {
|
||||
|
|
@ -202,6 +207,16 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
LOG.info("Nothing to pull");
|
||||
}
|
||||
|
||||
if (!delDstNoteIDs.isEmpty()) {
|
||||
LOG.info("Notes with the following IDs will be deleted from dest");
|
||||
for (String id : delDstNoteIDs) {
|
||||
LOG.info("ID : " + id);
|
||||
}
|
||||
deleteNotes(delDstNoteIDs, dstRepo);
|
||||
} else {
|
||||
LOG.info("Nothing to delete from dest");
|
||||
}
|
||||
|
||||
LOG.info("Sync ended");
|
||||
}
|
||||
|
||||
|
|
@ -216,6 +231,12 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Removes every note listed in {@code ids} from the given repository,
 * one {@code repo.remove} call per ID, in list order.
 *
 * @param ids IDs of the notes to remove
 * @param repo repository the notes are removed from
 * @throws IOException presumably propagated from {@code repo.remove}; the loop stops at the
 *         first failing ID
 */
private void deleteNotes(List<String> ids, NotebookRepo repo) throws IOException {
|
||||
for (String id : ids) {
|
||||
// NOTE(review): the second argument is always null here; its meaning is defined by
// NotebookRepo.remove, which is not visible in this hunk -- confirm null is acceptable.
repo.remove(id, null);
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Returns the number of entries currently held in the {@code repos} list.
 *
 * @return size of {@code repos}
 */
public int getRepoCount() {
|
||||
return repos.size();
|
||||
}
|
||||
|
|
@ -237,6 +258,7 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
throws IOException {
|
||||
List <String> pushIDs = new ArrayList<String>();
|
||||
List <String> pullIDs = new ArrayList<String>();
|
||||
List <String> delDstIDs = new ArrayList<String>();
|
||||
|
||||
NoteInfo dnote;
|
||||
Date sdate, ddate;
|
||||
|
|
@ -246,14 +268,17 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
/* note exists in source and destination storage systems */
|
||||
sdate = lastModificationDate(sourceRepo.get(snote.getId(), null));
|
||||
ddate = lastModificationDate(destRepo.get(dnote.getId(), null));
|
||||
if (sdate.after(ddate)) {
|
||||
/* source contains more up to date note - push */
|
||||
pushIDs.add(snote.getId());
|
||||
LOG.info("Modified note is added to push list : " + sdate);
|
||||
} else if (sdate.compareTo(ddate) != 0) {
|
||||
/* destination contains more up to date note - pull */
|
||||
LOG.info("Modified note is added to pull list : " + ddate);
|
||||
pullIDs.add(snote.getId());
|
||||
|
||||
if (sdate.compareTo(ddate) != 0) {
|
||||
if (sdate.after(ddate) || oneWaySync) {
|
||||
/* source contains more up to date note - push */
|
||||
pushIDs.add(snote.getId());
|
||||
LOG.info("Modified note is added to push list : " + sdate);
|
||||
} else {
|
||||
/* destination contains more up to date note - pull */
|
||||
LOG.info("Modified note is added to pull list : " + ddate);
|
||||
pullIDs.add(snote.getId());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/* note exists in source storage, and absent in destination
|
||||
|
|
@ -266,14 +291,21 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
for (NoteInfo note : destNotes) {
|
||||
dnote = containsID(sourceNotes, note.getId());
|
||||
if (dnote == null) {
|
||||
/* note exists in destination storage, and absent in source - pull*/
|
||||
pullIDs.add(note.getId());
|
||||
/* note exists in destination storage, and absent in source */
|
||||
if (oneWaySync) {
|
||||
LOG.info("Extraneous note is added to delete dest list : " + note.getId());
|
||||
delDstIDs.add(note.getId());
|
||||
} else {
|
||||
LOG.info("Missing note is added to pull list : " + note.getId());
|
||||
pullIDs.add(note.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, List<String>> map = new HashMap<String, List<String>>();
|
||||
map.put(pushKey, pushIDs);
|
||||
map.put(pullKey, pullIDs);
|
||||
map.put(delDstKey, delDstIDs);
|
||||
return map;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -84,6 +84,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
|
|||
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath());
|
||||
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
|
||||
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.VFSNotebookRepo,org.apache.zeppelin.notebook.repo.mock.VFSNotebookRepoMock");
|
||||
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC.getVarName(), "false");
|
||||
LOG.info("main Note dir : " + mainNotePath);
|
||||
LOG.info("secondary note dir : " + secNotePath);
|
||||
conf = ZeppelinConfiguration.create();
|
||||
|
|
@ -220,6 +221,54 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
|
|||
assertEquals(1, notebookRepoSync.list(1, null).size());
|
||||
}
|
||||
|
||||
/**
 * Exercises one-way sync through {@code reloadAllNotes}: a note copied only into the
 * secondary storage is expected to be gone from it after a reload, while a note copied only
 * into the primary storage is expected to appear in both storages after a reload.
 */
@Test
|
||||
public void testOneWaySyncOnReloadedList() throws IOException, SchedulerException {
|
||||
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath());
|
||||
// turn one-way sync on before the configuration object is built; presumably
// ZeppelinConfiguration.create() picks the system property up -- the
// NotebookRepoSync constructor then reads the flag from that configuration
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC.getVarName(), "true");
|
||||
conf = ZeppelinConfiguration.create();
|
||||
notebookRepoSync = new NotebookRepoSync(conf);
|
||||
notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this, search,
|
||||
notebookAuthorization, credentials);
|
||||
|
||||
// check that both storage repos are empty
|
||||
assertTrue(notebookRepoSync.getRepoCount() > 1);
|
||||
assertEquals(0, notebookRepoSync.list(0, null).size());
|
||||
assertEquals(0, notebookRepoSync.list(1, null).size());
|
||||
|
||||
File srcDir = new File("src/test/resources/2A94M5J1Z");
|
||||
File destDir = new File(secNotebookDir + "/2A94M5J1Z");
|
||||
|
||||
// copy manually new notebook into secondary storage repo and check repos
|
||||
// NOTE(review): a failed copy is only logged here although the method already declares
// IOException; the assertions below would then fail with a less direct message --
// consider letting the exception propagate.
try {
|
||||
FileUtils.copyDirectory(srcDir, destDir);
|
||||
} catch (IOException e) {
|
||||
LOG.error(e.toString(), e);
|
||||
}
|
||||
assertEquals(0, notebookRepoSync.list(0, null).size());
|
||||
assertEquals(1, notebookRepoSync.list(1, null).size());
|
||||
|
||||
// after reloading the notebook should be wiped from secondary storage
|
||||
notebookSync.reloadAllNotes(null);
|
||||
assertEquals(0, notebookRepoSync.list(0, null).size());
|
||||
assertEquals(0, notebookRepoSync.list(1, null).size());
|
||||
|
||||
// same fixture, now targeted at the primary storage directory
destDir = new File(mainNotebookDir + "/2A94M5J1Z");
|
||||
|
||||
// copy manually new notebook into primary storage repo and check repos
|
||||
// NOTE(review): same log-and-continue handling of a failed copy as above.
try {
|
||||
FileUtils.copyDirectory(srcDir, destDir);
|
||||
} catch (IOException e) {
|
||||
LOG.error(e.toString(), e);
|
||||
}
|
||||
assertEquals(1, notebookRepoSync.list(0, null).size());
|
||||
assertEquals(0, notebookRepoSync.list(1, null).size());
|
||||
|
||||
// after reloading notebooks repos should be synchronized
|
||||
notebookSync.reloadAllNotes(null);
|
||||
assertEquals(1, notebookRepoSync.list(0, null).size());
|
||||
assertEquals(1, notebookRepoSync.list(1, null).size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckpointOneStorage() throws IOException, SchedulerException {
|
||||
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.GitNotebookRepo");
|
||||
|
|
|
|||
Loading…
Reference in a new issue