Merge branch 'master' into ZEPPELIN-960

This commit is contained in:
CloverHearts 2016-08-17 11:16:36 +09:00
commit 1428795f80
9 changed files with 124 additions and 17 deletions

View file

@ -35,6 +35,7 @@ REM set ZEPPELIN_IDENT_STRING REM A string representing this instance of zep
REM set ZEPPELIN_NICENESS REM The scheduling priority for daemons. Defaults to 0.
REM set ZEPPELIN_INTERPRETER_LOCALREPO REM Local repository for interpreter's additional dependency loading
REM set ZEPPELIN_NOTEBOOK_STORAGE REM Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
REM set ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC REM If there are multiple notebook storages, should we treat the first one as the only source of truth?
REM Spark interpreter configuration

View file

@ -36,6 +36,7 @@
# export ZEPPELIN_NICENESS # The scheduling priority for daemons. Defaults to 0.
# export ZEPPELIN_INTERPRETER_LOCALREPO # Local repository for interpreter's additional dependency loading
# export ZEPPELIN_NOTEBOOK_STORAGE # Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
# export ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC # If there are multiple notebook storages, should we treat the first one as the only source of truth?
#### Spark interpreter configuration ####

View file

@ -164,6 +164,12 @@
<description>notebook persistence layer implementation</description>
</property>
<property>
<name>zeppelin.notebook.one.way.sync</name>
<value>false</value>
<description>If there are multiple notebook storages, should we treat the first one as the only source of truth?</description>
</property>
<property>
<name>zeppelin.interpreter.dir</name>
<value>interpreter</value>

View file

@ -374,6 +374,12 @@ You can configure Apache Zeppelin with both **environment variables** in `conf/z
<td>org.apache.zeppelin.notebook.repo.VFSNotebookRepo</td>
<td>Comma separated list of notebook storage</td>
</tr>
<tr>
<td>ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC</td>
<td>zeppelin.notebook.one.way.sync</td>
<td>false</td>
<td>If there are multiple notebook storages, should we treat the first one as the only source of truth?</td>
</tr>
<tr>
<td>ZEPPELIN_INTERPRETERS</td>
<td>zeppelin.interpreters</td>

View file

@ -110,10 +110,10 @@ object RInterpreter {
// These are the additional properties we need on top of the ones provided by the spark interpreters
lazy val props: Map[String, InterpreterProperty] = new InterpreterPropertyBuilder()
.add("rhadoop.cmd", SparkInterpreter.getSystemDefault("rhadoop.cmd", "HADOOP_CMD", ""), "Usually /usr/bin/hadoop")
.add("rhadooop.streamingjar", SparkInterpreter.getSystemDefault("rhadoop.cmd", "HADOOP_STREAMING", ""), "Usually /usr/lib/hadoop/contrib/streaming/hadoop-streaming-<version>.jar")
.add("rscala.debug", SparkInterpreter.getSystemDefault("rscala.debug","RSCALA_DEBUG", "false"), "Whether to turn on rScala debugging") // TEST: Implemented but not tested
.add("rscala.timeout", SparkInterpreter.getSystemDefault("rscala.timeout","RSCALA_TIMEOUT", "60"), "Timeout for rScala") // TEST: Implemented but not tested
.add("rhadoop.cmd", SparkInterpreter.getSystemDefault("HADOOP_CMD", "rhadoop.cmd", ""), "Usually /usr/bin/hadoop")
.add("rhadooop.streamingjar", SparkInterpreter.getSystemDefault("HADOOP_STREAMING", "rhadooop.streamingjar", ""), "Usually /usr/lib/hadoop/contrib/streaming/hadoop-streaming-<version>.jar")
.add("rscala.debug", SparkInterpreter.getSystemDefault("RSCALA_DEBUG", "rscala.debug","false"), "Whether to turn on rScala debugging") // TEST: Implemented but not tested
.add("rscala.timeout", SparkInterpreter.getSystemDefault("RSCALA_TIMEOUT", "rscala.timeout","60"), "Timeout for rScala") // TEST: Implemented but not tested
.build
def getProps() = {

View file

@ -543,6 +543,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"),
ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"),
ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()),
ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false),
ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner",
System.getProperty("os.name")
.startsWith("Windows") ? "bin/interpreter.cmd" : "bin/interpreter.sh"),

View file

@ -55,6 +55,7 @@ import com.google.gson.reflect.TypeToken;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.NullArgumentException;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonatype.aether.RepositoryException;
@ -345,6 +346,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
InputStreamReader isr = new InputStreamReader(fis);
BufferedReader bufferedReader = new BufferedReader(isr);
StringBuilder sb = new StringBuilder();
InterpreterSetting interpreterSettingObject;
String depClassPath = StringUtils.EMPTY;
String line;
while ((line = bufferedReader.readLine()) != null) {
sb.append(line);
@ -365,9 +368,14 @@ public class InterpreterFactory implements InterpreterGroupFactory {
setting.getOption().setRemote(true);
// Update transient information from InterpreterSettingRef
// TODO(jl): Check if reference of setting is null
setting.setPath(interpreterSettingsRef.get(setting.getGroup()).getPath());
interpreterSettingObject = interpreterSettingsRef.get(setting.getGroup());
if (interpreterSettingObject == null) {
logger.warn("can't get InterpreterSetting " +
"Information From loaded Interpreter Setting Ref - {} ", setting.getGroup());
continue;
}
depClassPath = interpreterSettingObject.getPath();
setting.setPath(depClassPath);
setting.setInterpreterGroupFactory(this);
loadInterpreterDependencies(setting);

View file

@ -44,11 +44,13 @@ public class NotebookRepoSync implements NotebookRepo {
private static final int maxRepoNum = 2;
private static final String pushKey = "pushNoteIDs";
private static final String pullKey = "pullNoteIDs";
private static final String delDstKey = "delDstNoteIDs";
private static ZeppelinConfiguration config;
private static final String defaultStorage = "org.apache.zeppelin.notebook.repo.VFSNotebookRepo";
private List<NotebookRepo> repos = new ArrayList<NotebookRepo>();
private final boolean oneWaySync;
/**
* @param noteIndex
@ -58,6 +60,7 @@ public class NotebookRepoSync implements NotebookRepo {
@SuppressWarnings("static-access")
public NotebookRepoSync(ZeppelinConfiguration conf) {
config = conf;
oneWaySync = conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC);
String allStorageClassNames = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE).trim();
if (allStorageClassNames.isEmpty()) {
allStorageClassNames = defaultStorage;
@ -182,6 +185,8 @@ public class NotebookRepoSync implements NotebookRepo {
Map<String, List<String>> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo);
List<String> pushNoteIDs = noteIDs.get(pushKey);
List<String> pullNoteIDs = noteIDs.get(pullKey);
List<String> delDstNoteIDs = noteIDs.get(delDstKey);
if (!pushNoteIDs.isEmpty()) {
LOG.info("Notes with the following IDs will be pushed");
for (String id : pushNoteIDs) {
@ -202,6 +207,16 @@ public class NotebookRepoSync implements NotebookRepo {
LOG.info("Nothing to pull");
}
if (!delDstNoteIDs.isEmpty()) {
LOG.info("Notes with the following IDs will be deleted from dest");
for (String id : delDstNoteIDs) {
LOG.info("ID : " + id);
}
deleteNotes(delDstNoteIDs, dstRepo);
} else {
LOG.info("Nothing to delete from dest");
}
LOG.info("Sync ended");
}
@ -216,6 +231,12 @@ public class NotebookRepoSync implements NotebookRepo {
}
}
private void deleteNotes(List<String> ids, NotebookRepo repo) throws IOException {
for (String id : ids) {
repo.remove(id, null);
}
}
public int getRepoCount() {
return repos.size();
}
@ -237,6 +258,7 @@ public class NotebookRepoSync implements NotebookRepo {
throws IOException {
List <String> pushIDs = new ArrayList<String>();
List <String> pullIDs = new ArrayList<String>();
List <String> delDstIDs = new ArrayList<String>();
NoteInfo dnote;
Date sdate, ddate;
@ -246,14 +268,18 @@ public class NotebookRepoSync implements NotebookRepo {
/* note exists in source and destination storage systems */
sdate = lastModificationDate(sourceRepo.get(snote.getId(), null));
ddate = lastModificationDate(destRepo.get(dnote.getId(), null));
if (sdate.after(ddate)) {
/* source contains more up to date note - push */
pushIDs.add(snote.getId());
LOG.info("Modified note is added to push list : " + sdate);
} else if (sdate.compareTo(ddate) != 0) {
/* destination contains more up to date note - pull */
LOG.info("Modified note is added to pull list : " + ddate);
pullIDs.add(snote.getId());
if (sdate.compareTo(ddate) != 0) {
if (sdate.after(ddate) || oneWaySync) {
/* if source contains more up to date note - push
* if oneWaySync is enabled, always push no matter who's newer */
pushIDs.add(snote.getId());
LOG.info("Modified note is added to push list : " + sdate);
} else {
/* destination contains more up to date note - pull */
LOG.info("Modified note is added to pull list : " + ddate);
pullIDs.add(snote.getId());
}
}
} else {
/* note exists in source storage, and absent in destination
@ -266,14 +292,23 @@ public class NotebookRepoSync implements NotebookRepo {
for (NoteInfo note : destNotes) {
dnote = containsID(sourceNotes, note.getId());
if (dnote == null) {
/* note exists in destination storage, and absent in source - pull*/
pullIDs.add(note.getId());
/* note exists in destination storage, and absent in source */
if (oneWaySync) {
/* if oneWaySync is enabled, delete the note from destination */
LOG.info("Extraneous note is added to delete dest list : " + note.getId());
delDstIDs.add(note.getId());
} else {
/* if oneWaySync is disabled, pull the note from destination */
LOG.info("Missing note is added to pull list : " + note.getId());
pullIDs.add(note.getId());
}
}
}
Map<String, List<String>> map = new HashMap<String, List<String>>();
map.put(pushKey, pushIDs);
map.put(pullKey, pullIDs);
map.put(delDstKey, delDstIDs);
return map;
}

View file

@ -84,6 +84,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.VFSNotebookRepo,org.apache.zeppelin.notebook.repo.mock.VFSNotebookRepoMock");
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC.getVarName(), "false");
LOG.info("main Note dir : " + mainNotePath);
LOG.info("secondary note dir : " + secNotePath);
conf = ZeppelinConfiguration.create();
@ -220,6 +221,54 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
assertEquals(1, notebookRepoSync.list(1, null).size());
}
@Test
public void testOneWaySyncOnReloadedList() throws IOException, SchedulerException {
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC.getVarName(), "true");
conf = ZeppelinConfiguration.create();
notebookRepoSync = new NotebookRepoSync(conf);
notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this, search,
notebookAuthorization, credentials);
// check that both storage repos are empty
assertTrue(notebookRepoSync.getRepoCount() > 1);
assertEquals(0, notebookRepoSync.list(0, null).size());
assertEquals(0, notebookRepoSync.list(1, null).size());
File srcDir = new File("src/test/resources/2A94M5J1Z");
File destDir = new File(secNotebookDir + "/2A94M5J1Z");
// copy manually new notebook into secondary storage repo and check repos
try {
FileUtils.copyDirectory(srcDir, destDir);
} catch (IOException e) {
LOG.error(e.toString(), e);
}
assertEquals(0, notebookRepoSync.list(0, null).size());
assertEquals(1, notebookRepoSync.list(1, null).size());
// after reloading the notebook should be wiped from secondary storage
notebookSync.reloadAllNotes(null);
assertEquals(0, notebookRepoSync.list(0, null).size());
assertEquals(0, notebookRepoSync.list(1, null).size());
destDir = new File(mainNotebookDir + "/2A94M5J1Z");
// copy manually new notebook into primary storage repo and check repos
try {
FileUtils.copyDirectory(srcDir, destDir);
} catch (IOException e) {
LOG.error(e.toString(), e);
}
assertEquals(1, notebookRepoSync.list(0, null).size());
assertEquals(0, notebookRepoSync.list(1, null).size());
// after reloading notebooks repos should be synchronized
notebookSync.reloadAllNotes(null);
assertEquals(1, notebookRepoSync.list(0, null).size());
assertEquals(1, notebookRepoSync.list(1, null).size());
}
@Test
public void testCheckpointOneStorage() throws IOException, SchedulerException {
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.GitNotebookRepo");