mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge branch 'master' into ZEPPELIN-960
This commit is contained in:
commit
1428795f80
9 changed files with 124 additions and 17 deletions
|
|
@ -35,6 +35,7 @@ REM set ZEPPELIN_IDENT_STRING REM A string representing this instance of zep
|
|||
REM set ZEPPELIN_NICENESS REM The scheduling priority for daemons. Defaults to 0.
|
||||
REM set ZEPPELIN_INTERPRETER_LOCALREPO REM Local repository for interpreter's additional dependency loading
|
||||
REM set ZEPPELIN_NOTEBOOK_STORAGE REM Pluggable notebook storage class. Two classes can be configured at once, with a sync between them (e.g. local and remote).
|
||||
REM set ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC REM If there are multiple notebook storages, should we treat the first one as the only source of truth?
|
||||
|
||||
|
||||
REM Spark interpreter configuration
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@
|
|||
# export ZEPPELIN_NICENESS # The scheduling priority for daemons. Defaults to 0.
|
||||
# export ZEPPELIN_INTERPRETER_LOCALREPO # Local repository for interpreter's additional dependency loading
|
||||
# export ZEPPELIN_NOTEBOOK_STORAGE # Pluggable notebook storage class. Two classes can be configured at once, with a sync between them (e.g. local and remote).
|
||||
# export ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC # If there are multiple notebook storages, should we treat the first one as the only source of truth?
|
||||
|
||||
#### Spark interpreter configuration ####
|
||||
|
||||
|
|
|
|||
|
|
@ -164,6 +164,12 @@
|
|||
<description>notebook persistence layer implementation</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.notebook.one.way.sync</name>
|
||||
<value>false</value>
|
||||
<description>If there are multiple notebook storages, should we treat the first one as the only source of truth?</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.interpreter.dir</name>
|
||||
<value>interpreter</value>
|
||||
|
|
|
|||
|
|
@ -374,6 +374,12 @@ You can configure Apache Zeppelin with both **environment variables** in `conf/z
|
|||
<td>org.apache.zeppelin.notebook.repo.VFSNotebookRepo</td>
|
||||
<td>Comma separated list of notebook storage</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC</td>
|
||||
<td>zeppelin.notebook.one.way.sync</td>
|
||||
<td>false</td>
|
||||
<td>If there are multiple notebook storages, should we treat the first one as the only source of truth?</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>ZEPPELIN_INTERPRETERS</td>
|
||||
<td>zeppelin.interpreters</td>
|
||||
|
|
|
|||
|
|
@ -110,10 +110,10 @@ object RInterpreter {
|
|||
|
||||
// These are the additional properties we need on top of the ones provided by the spark interpreters
|
||||
lazy val props: Map[String, InterpreterProperty] = new InterpreterPropertyBuilder()
|
||||
.add("rhadoop.cmd", SparkInterpreter.getSystemDefault("rhadoop.cmd", "HADOOP_CMD", ""), "Usually /usr/bin/hadoop")
|
||||
.add("rhadooop.streamingjar", SparkInterpreter.getSystemDefault("rhadoop.cmd", "HADOOP_STREAMING", ""), "Usually /usr/lib/hadoop/contrib/streaming/hadoop-streaming-<version>.jar")
|
||||
.add("rscala.debug", SparkInterpreter.getSystemDefault("rscala.debug","RSCALA_DEBUG", "false"), "Whether to turn on rScala debugging") // TEST: Implemented but not tested
|
||||
.add("rscala.timeout", SparkInterpreter.getSystemDefault("rscala.timeout","RSCALA_TIMEOUT", "60"), "Timeout for rScala") // TEST: Implemented but not tested
|
||||
.add("rhadoop.cmd", SparkInterpreter.getSystemDefault("HADOOP_CMD", "rhadoop.cmd", ""), "Usually /usr/bin/hadoop")
|
||||
.add("rhadooop.streamingjar", SparkInterpreter.getSystemDefault("HADOOP_STREAMING", "rhadooop.streamingjar", ""), "Usually /usr/lib/hadoop/contrib/streaming/hadoop-streaming-<version>.jar")
|
||||
.add("rscala.debug", SparkInterpreter.getSystemDefault("RSCALA_DEBUG", "rscala.debug","false"), "Whether to turn on rScala debugging") // TEST: Implemented but not tested
|
||||
.add("rscala.timeout", SparkInterpreter.getSystemDefault("RSCALA_TIMEOUT", "rscala.timeout","60"), "Timeout for rScala") // TEST: Implemented but not tested
|
||||
.build
|
||||
|
||||
def getProps() = {
|
||||
|
|
|
|||
|
|
@ -543,6 +543,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"),
|
||||
ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"),
|
||||
ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()),
|
||||
ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false),
|
||||
ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner",
|
||||
System.getProperty("os.name")
|
||||
.startsWith("Windows") ? "bin/interpreter.cmd" : "bin/interpreter.sh"),
|
||||
|
|
|
|||
|
|
@ -55,6 +55,7 @@ import com.google.gson.reflect.TypeToken;
|
|||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang.ArrayUtils;
|
||||
import org.apache.commons.lang.NullArgumentException;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.sonatype.aether.RepositoryException;
|
||||
|
|
@ -345,6 +346,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
InputStreamReader isr = new InputStreamReader(fis);
|
||||
BufferedReader bufferedReader = new BufferedReader(isr);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
InterpreterSetting interpreterSettingObject;
|
||||
String depClassPath = StringUtils.EMPTY;
|
||||
String line;
|
||||
while ((line = bufferedReader.readLine()) != null) {
|
||||
sb.append(line);
|
||||
|
|
@ -365,9 +368,14 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
setting.getOption().setRemote(true);
|
||||
|
||||
// Update transient information from InterpreterSettingRef
|
||||
// TODO(jl): Check if reference of setting is null
|
||||
|
||||
setting.setPath(interpreterSettingsRef.get(setting.getGroup()).getPath());
|
||||
interpreterSettingObject = interpreterSettingsRef.get(setting.getGroup());
|
||||
if (interpreterSettingObject == null) {
|
||||
logger.warn("can't get InterpreterSetting " +
|
||||
"Information From loaded Interpreter Setting Ref - {} ", setting.getGroup());
|
||||
continue;
|
||||
}
|
||||
depClassPath = interpreterSettingObject.getPath();
|
||||
setting.setPath(depClassPath);
|
||||
|
||||
setting.setInterpreterGroupFactory(this);
|
||||
loadInterpreterDependencies(setting);
|
||||
|
|
|
|||
|
|
@ -44,11 +44,13 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
private static final int maxRepoNum = 2;
|
||||
private static final String pushKey = "pushNoteIDs";
|
||||
private static final String pullKey = "pullNoteIDs";
|
||||
private static final String delDstKey = "delDstNoteIDs";
|
||||
|
||||
private static ZeppelinConfiguration config;
|
||||
private static final String defaultStorage = "org.apache.zeppelin.notebook.repo.VFSNotebookRepo";
|
||||
|
||||
private List<NotebookRepo> repos = new ArrayList<NotebookRepo>();
|
||||
private final boolean oneWaySync;
|
||||
|
||||
/**
|
||||
* @param noteIndex
|
||||
|
|
@ -58,6 +60,7 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
@SuppressWarnings("static-access")
|
||||
public NotebookRepoSync(ZeppelinConfiguration conf) {
|
||||
config = conf;
|
||||
oneWaySync = conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC);
|
||||
String allStorageClassNames = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE).trim();
|
||||
if (allStorageClassNames.isEmpty()) {
|
||||
allStorageClassNames = defaultStorage;
|
||||
|
|
@ -182,6 +185,8 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
Map<String, List<String>> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo);
|
||||
List<String> pushNoteIDs = noteIDs.get(pushKey);
|
||||
List<String> pullNoteIDs = noteIDs.get(pullKey);
|
||||
List<String> delDstNoteIDs = noteIDs.get(delDstKey);
|
||||
|
||||
if (!pushNoteIDs.isEmpty()) {
|
||||
LOG.info("Notes with the following IDs will be pushed");
|
||||
for (String id : pushNoteIDs) {
|
||||
|
|
@ -202,6 +207,16 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
LOG.info("Nothing to pull");
|
||||
}
|
||||
|
||||
if (!delDstNoteIDs.isEmpty()) {
|
||||
LOG.info("Notes with the following IDs will be deleted from dest");
|
||||
for (String id : delDstNoteIDs) {
|
||||
LOG.info("ID : " + id);
|
||||
}
|
||||
deleteNotes(delDstNoteIDs, dstRepo);
|
||||
} else {
|
||||
LOG.info("Nothing to delete from dest");
|
||||
}
|
||||
|
||||
LOG.info("Sync ended");
|
||||
}
|
||||
|
||||
|
|
@ -216,6 +231,12 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Removes every note whose ID is listed in {@code ids} from {@code repo}, in list order.
 *
 * @param ids IDs of the notes to remove
 * @param repo repository the notes are removed from
 * @throws IOException declared on the method; nothing is caught here, so an exception
 *     raised while removing one note ends the loop and the remaining IDs are not processed
 */
private void deleteNotes(List<String> ids, NotebookRepo repo) throws IOException {
|
||||
for (String id : ids) {
|
||||
// NOTE(review): the second argument is always null here - presumably the subject/auth
// context of the caller; confirm against NotebookRepo.remove.
repo.remove(id, null);
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Reports how many entries the {@code repos} list currently holds.
 *
 * @return {@code repos.size()}
 */
public int getRepoCount() {
|
||||
return repos.size();
|
||||
}
|
||||
|
|
@ -237,6 +258,7 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
throws IOException {
|
||||
List <String> pushIDs = new ArrayList<String>();
|
||||
List <String> pullIDs = new ArrayList<String>();
|
||||
List <String> delDstIDs = new ArrayList<String>();
|
||||
|
||||
NoteInfo dnote;
|
||||
Date sdate, ddate;
|
||||
|
|
@ -246,14 +268,18 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
/* note exists in source and destination storage systems */
|
||||
sdate = lastModificationDate(sourceRepo.get(snote.getId(), null));
|
||||
ddate = lastModificationDate(destRepo.get(dnote.getId(), null));
|
||||
if (sdate.after(ddate)) {
|
||||
/* source contains more up to date note - push */
|
||||
pushIDs.add(snote.getId());
|
||||
LOG.info("Modified note is added to push list : " + sdate);
|
||||
} else if (sdate.compareTo(ddate) != 0) {
|
||||
/* destination contains more up to date note - pull */
|
||||
LOG.info("Modified note is added to pull list : " + ddate);
|
||||
pullIDs.add(snote.getId());
|
||||
|
||||
if (sdate.compareTo(ddate) != 0) {
|
||||
if (sdate.after(ddate) || oneWaySync) {
|
||||
/* if source contains more up to date note - push
|
||||
* if oneWaySync is enabled, always push no matter who's newer */
|
||||
pushIDs.add(snote.getId());
|
||||
LOG.info("Modified note is added to push list : " + sdate);
|
||||
} else {
|
||||
/* destination contains more up to date note - pull */
|
||||
LOG.info("Modified note is added to pull list : " + ddate);
|
||||
pullIDs.add(snote.getId());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/* note exists in source storage, and absent in destination
|
||||
|
|
@ -266,14 +292,23 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
for (NoteInfo note : destNotes) {
|
||||
dnote = containsID(sourceNotes, note.getId());
|
||||
if (dnote == null) {
|
||||
/* note exists in destination storage, and absent in source - pull*/
|
||||
pullIDs.add(note.getId());
|
||||
/* note exists in destination storage, and absent in source */
|
||||
if (oneWaySync) {
|
||||
/* if oneWaySync is enabled, delete the note from destination */
|
||||
LOG.info("Extraneous note is added to delete dest list : " + note.getId());
|
||||
delDstIDs.add(note.getId());
|
||||
} else {
|
||||
/* if oneWaySync is disabled, pull the note from destination */
|
||||
LOG.info("Missing note is added to pull list : " + note.getId());
|
||||
pullIDs.add(note.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, List<String>> map = new HashMap<String, List<String>>();
|
||||
map.put(pushKey, pushIDs);
|
||||
map.put(pullKey, pullIDs);
|
||||
map.put(delDstKey, delDstIDs);
|
||||
return map;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -84,6 +84,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
|
|||
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath());
|
||||
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
|
||||
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.VFSNotebookRepo,org.apache.zeppelin.notebook.repo.mock.VFSNotebookRepoMock");
|
||||
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC.getVarName(), "false");
|
||||
LOG.info("main Note dir : " + mainNotePath);
|
||||
LOG.info("secondary note dir : " + secNotePath);
|
||||
conf = ZeppelinConfiguration.create();
|
||||
|
|
@ -220,6 +221,54 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
|
|||
assertEquals(1, notebookRepoSync.list(1, null).size());
|
||||
}
|
||||
|
||||
/**
 * Exercises reloading with {@code ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC} set to "true".
 *
 * Two scenarios are asserted through the per-repo list sizes:
 * a note copied only into the secondary directory (repo index 1) is gone from it after
 * {@code reloadAllNotes}, and a note copied only into the main directory (repo index 0)
 * is present in both repos after {@code reloadAllNotes}.
 */
@Test
|
||||
public void testOneWaySyncOnReloadedList() throws IOException, SchedulerException {
|
||||
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath());
|
||||
// the flag is set before the configuration object is created below
// (presumably so that ZeppelinConfiguration.create() picks it up - confirm)
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC.getVarName(), "true");
|
||||
conf = ZeppelinConfiguration.create();
|
||||
notebookRepoSync = new NotebookRepoSync(conf);
|
||||
notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this, search,
|
||||
notebookAuthorization, credentials);
|
||||
|
||||
// check that more than one storage repo is configured and that both are empty
|
||||
assertTrue(notebookRepoSync.getRepoCount() > 1);
|
||||
assertEquals(0, notebookRepoSync.list(0, null).size());
|
||||
assertEquals(0, notebookRepoSync.list(1, null).size());
|
||||
|
||||
File srcDir = new File("src/test/resources/2A94M5J1Z");
|
||||
File destDir = new File(secNotebookDir + "/2A94M5J1Z");
|
||||
|
||||
// manually copy a new notebook into the secondary storage repo and check repos
|
||||
try {
|
||||
FileUtils.copyDirectory(srcDir, destDir);
|
||||
} catch (IOException e) {
|
||||
// NOTE(review): a failed copy is only logged here, not rethrown; the list-size
// assertions that follow are what would surface it.
LOG.error(e.toString(), e);
|
||||
}
|
||||
assertEquals(0, notebookRepoSync.list(0, null).size());
|
||||
assertEquals(1, notebookRepoSync.list(1, null).size());
|
||||
|
||||
// after reloading, the notebook should be wiped from secondary storage
|
||||
notebookSync.reloadAllNotes(null);
|
||||
assertEquals(0, notebookRepoSync.list(0, null).size());
|
||||
assertEquals(0, notebookRepoSync.list(1, null).size());
|
||||
|
||||
destDir = new File(mainNotebookDir + "/2A94M5J1Z");
|
||||
|
||||
// manually copy a new notebook into the primary storage repo and check repos
|
||||
try {
|
||||
FileUtils.copyDirectory(srcDir, destDir);
|
||||
} catch (IOException e) {
|
||||
// NOTE(review): same log-only handling as the first copy above.
LOG.error(e.toString(), e);
|
||||
}
|
||||
assertEquals(1, notebookRepoSync.list(0, null).size());
|
||||
assertEquals(0, notebookRepoSync.list(1, null).size());
|
||||
|
||||
// after reloading, both repos should hold the notebook
|
||||
notebookSync.reloadAllNotes(null);
|
||||
assertEquals(1, notebookRepoSync.list(0, null).size());
|
||||
assertEquals(1, notebookRepoSync.list(1, null).size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckpointOneStorage() throws IOException, SchedulerException {
|
||||
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.GitNotebookRepo");
|
||||
|
|
|
|||
Loading…
Reference in a new issue