ZEPPELIN-1515. Notebook: HDFS as a backend storage (Read & Write Mode)

This commit is contained in:
Jeff Zhang 2017-06-30 14:48:22 +08:00
parent af18991367
commit b3e83abbd5
11 changed files with 418 additions and 90 deletions

View file

@ -67,6 +67,10 @@ if [[ -d "${ZEPPELIN_HOME}/zeppelin-server/target/classes" ]]; then
ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-server/target/classes"
fi
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
ZEPPELIN_CLASSPATH+=":${HADOOP_CONF_DIR}"
fi
# Add jdbc connector jar
# ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/jdbc/jars/jdbc-connector-jar"

View file

@ -73,6 +73,10 @@ addJarInDir "${ZEPPELIN_HOME}/zeppelin-web/target/lib"
ZEPPELIN_CLASSPATH="$CLASSPATH:$ZEPPELIN_CLASSPATH"
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
ZEPPELIN_CLASSPATH+=":${HADOOP_CONF_DIR}"
fi
if [[ ! -d "${ZEPPELIN_LOG_DIR}" ]]; then
echo "Log dir doesn't exist, create ${ZEPPELIN_LOG_DIR}"
$(mkdir -p "${ZEPPELIN_LOG_DIR}")

View file

@ -173,6 +173,26 @@
</property>
-->
<!-- Notebook storage layer using hdfs file system
<property>
<name>zeppelin.notebook.storage</name>
<value>org.apache.zeppelin.notebook.repo.HdfsNotebookRepo</value>
<description>hdfs notebook persistence layer implementation</description>
</property>
<property>
<name>zeppelin.hdfs.keytab</name>
<value></value>
<description>keytab for accessing kerberized hdfs</description>
</property>
<property>
<name>zeppelin.hdfs.principal</name>
<value></value>
<description>principal for accessing kerberized hdfs</description>
</property>
-->
<!-- For connecting your Zeppelin with ZeppelinHub -->
<!--
<property>

View file

@ -30,6 +30,7 @@ There are few notebook storage systems available for a use out of the box:
* (default) use local file system and version it using local Git repository - `GitNotebookRepo`
* all notes are saved in the notebook folder in your local File System - `VFSNotebookRepo`
* all notes are saved in the notebook folder in hdfs - `HdfsNotebookRepo`
* storage using Amazon S3 service - `S3NotebookRepo`
* storage using Azure service - `AzureNotebookRepo`
* storage using MongoDB - `MongoNotebookRepo`
@ -51,6 +52,22 @@ To enable versioning for all your local notebooks though a standard Git reposito
</property>
```
</br>
## Notebook Storage in Hdfs repository <a name="Hdfs"></a>
Notes may be stored in hdfs, so that multiple Zeppelin instances can share the same notes. It supports all the versions of hadoop 2.x. If you use `HdfsNotebookRepo`, then `zeppelin.notebook.dir` is the path on hdfs. And you need to specify `HADOOP_CONF_DIR` in `zeppelin-env.sh` so that zeppelin can find the right hadoop configuration files.
If your hadoop cluster is kerberized, then you need to specify `zeppelin.hdfs.keytab` and `zeppelin.hdfs.principal`
```
<property>
<name>zeppelin.notebook.storage</name>
<value>org.apache.zeppelin.notebook.repo.HdfsNotebookRepo</value>
<description>hdfs notebook persistence layer implementation</description>
</property>
```
</br>
## Notebook Storage in S3 <a name="S3"></a>

View file

@ -195,101 +195,12 @@
<artifactId>websocket-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<!--
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-jsp</artifactId>
<version>${jetty.version}</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet.jsp</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>org.apache.jasper.glassfish</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet.jsp</artifactId>
<version>2.2.0.v201112011158</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet</artifactId>
</exclusion>
</exclusions>
</dependency>
-->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop-common.version}</version>
<exclusions>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>jackrabbit-webdav</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jgit</groupId>
<artifactId>org.eclipse.jgit</artifactId>
</exclusion>
<exclusion>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>

View file

@ -36,6 +36,7 @@
<properties>
<!--library versions-->
<hadoop.version>2.6.0</hadoop.version>
<commons.lang3.version>3.4</commons.lang3.version>
<commons.vfs2.version>2.0</commons.vfs2.version>
<aws.sdk.s3.version>1.10.62</aws.sdk.s3.version>
@ -301,6 +302,71 @@
<version>1.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>jackrabbit-webdav</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jgit</groupId>
<artifactId>org.eclipse.jgit</artifactId>
</exclusion>
<exclusion>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</exclusion>
<exclusion>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>

View file

@ -681,7 +681,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_SERVER_XFRAME_OPTIONS("zeppelin.server.xframe.options", "SAMEORIGIN"),
ZEPPELIN_SERVER_JETTY_NAME("zeppelin.server.jetty.name", null),
ZEPPELIN_SERVER_STRICT_TRANSPORT("zeppelin.server.strict.transport", "max-age=631138519"),
ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1");
ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"),
ZEPPELIN_HDFS_KEYTAB("zeppelin.hdfs.keytab", ""),
ZEPPELIN_HDFS_PRINCIPAL("zeppelin.hdfs.principal", "");
private String varName;
@SuppressWarnings("rawtypes")

View file

@ -106,6 +106,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
public Note() {
generateId();
}
public Note(NotebookRepo repo, InterpreterFactory factory,

View file

@ -0,0 +1,200 @@
package org.apache.zeppelin.notebook.repo;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* NotebookRepos for hdfs.
*
* Assume the notebook directory structure is as following
* - notebookdir
* - noteId/note.json
* - noteId/note.json
* - noteId/note.json
*/
public class HdfsNotebookRepo implements NotebookRepo {
private static final Logger LOGGER = LoggerFactory.getLogger(HdfsNotebookRepo.class);
private Configuration hadoopConf;
private ZeppelinConfiguration zConf;
private boolean isSecurityEnabled = false;
private FileSystem fs;
private Path notebookDir;
public HdfsNotebookRepo(ZeppelinConfiguration zConf) throws IOException {
this.zConf = zConf;
this.hadoopConf = new Configuration();
this.notebookDir = new Path(zConf.getNotebookDir());
LOGGER.info("Use hdfs directory {} to store notebook", notebookDir);
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
if (isSecurityEnabled) {
String keytab = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HDFS_KEYTAB);
String principal = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HDFS_PRINCIPAL);
if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal)) {
throw new IOException("keytab and principal can not be empty, keytab: " + keytab
+ ", principal: " + principal);
}
UserGroupInformation.loginUserFromKeytab(principal, keytab);
}
this.fs = FileSystem.get(new Configuration());
if (!fs.exists(notebookDir)) {
fs.mkdirs(notebookDir);
LOGGER.info("Create notebook dir {} in hdfs", notebookDir.toString());
}
if (fs.isFile(notebookDir)) {
throw new IOException("notebookDir {} is file instead of directory, please remove it or " +
"specify another directory");
}
}
@Override
public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
return callHdfsOperation(new HdfsOperation<List<NoteInfo>>() {
@Override
public List<NoteInfo> call() throws IOException {
List<NoteInfo> noteInfos = new ArrayList<>();
for (FileStatus status : fs.globStatus(new Path(notebookDir, "*/note.json"))) {
NoteInfo noteInfo = new NoteInfo(status.getPath().getParent().getName(), "", null);
noteInfos.add(noteInfo);
}
return noteInfos;
}
});
}
@Override
public Note get(final String noteId, AuthenticationInfo subject) throws IOException {
return callHdfsOperation(new HdfsOperation<Note>() {
@Override
public Note call() throws IOException {
Path notePath = new Path(notebookDir.toString() + "/" + noteId + "/note.json");
LOGGER.debug("Read note from file: " + notePath);
ByteArrayOutputStream noteBytes = new ByteArrayOutputStream();
IOUtils.copyBytes(fs.open(notePath), noteBytes, hadoopConf);
return Note.fromJson(new String(noteBytes.toString(
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING))));
}
});
}
@Override
public void save(final Note note, AuthenticationInfo subject) throws IOException {
callHdfsOperation(new HdfsOperation<Void>() {
@Override
public Void call() throws IOException {
Path notePath = new Path(notebookDir.toString() + "/" + note.getId() + "/note.json");
Path tmpNotePath = new Path(notebookDir.toString() + "/" + note.getId() + "/.note.json");
LOGGER.debug("Saving note to file: " + notePath);
if (fs.exists(tmpNotePath)) {
fs.delete(tmpNotePath, true);
}
InputStream in = new ByteArrayInputStream(note.toJson().getBytes(
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
IOUtils.copyBytes(in, fs.create(tmpNotePath), hadoopConf);
fs.delete(notePath, true);
fs.rename(tmpNotePath, notePath);
return null;
}
});
}
@Override
public void remove(final String noteId, AuthenticationInfo subject) throws IOException {
callHdfsOperation(new HdfsOperation<Void>() {
@Override
public Void call() throws IOException {
Path noteFolder = new Path(notebookDir.toString() + "/" + noteId);
fs.delete(noteFolder, true);
return null;
}
});
}
@Override
public void close() {
LOGGER.warn("close is not implemented for HdfsNotebookRepo");
}
@Override
public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject)
throws IOException {
LOGGER.warn("checkpoint is not implemented for HdfsNotebookRepo");
return null;
}
@Override
public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException {
LOGGER.warn("get revId is not implemented for HdfsNotebookRepo");
return null;
}
@Override
public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject) {
LOGGER.warn("revisionHistory is not implemented for HdfsNotebookRepo");
return null;
}
@Override
public Note setNoteRevision(String noteId, String revId, AuthenticationInfo subject)
throws IOException {
LOGGER.warn("setNoteRevision is not implemented for HdfsNotebookRepo");
return null;
}
@Override
public List<NotebookRepoSettingsInfo> getSettings(AuthenticationInfo subject) {
LOGGER.warn("getSettings is not implemented for HdfsNotebookRepo");
return null;
}
@Override
public void updateSettings(Map<String, String> settings, AuthenticationInfo subject) {
LOGGER.warn("updateSettings is not implemented for HdfsNotebookRepo");
}
private interface HdfsOperation<T> {
T call() throws IOException;
}
public <T> T callHdfsOperation(final HdfsOperation<T> func) throws IOException {
if (isSecurityEnabled) {
UserGroupInformation.getLoginUser().reloginFromKeytab();
try {
return UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<T>() {
@Override
public T run() throws Exception {
return func.call();
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
} else {
return func.call();
}
}
}

View file

@ -81,6 +81,7 @@ public class NotebookRepoSync implements NotebookRepo {
Constructor<?> constructor = notebookStorageClass.getConstructor(
ZeppelinConfiguration.class);
repos.add((NotebookRepo) constructor.newInstance(conf));
LOG.info("Instantiate NotebookRepo: " + storageClassNames[i]);
} catch (ClassNotFoundException | NoSuchMethodException | SecurityException |
InstantiationException | IllegalAccessException | IllegalArgumentException |
InvocationTargetException e) {

View file

@ -0,0 +1,101 @@
package org.apache.zeppelin.notebook.repo;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class HdfsNotebookRepoTest {
private ZeppelinConfiguration zConf;
private Configuration hadoopConf;
private FileSystem fs;
private HdfsNotebookRepo hdfsNotebookRepo;
private String notebookDir;
private AuthenticationInfo authInfo = AuthenticationInfo.ANONYMOUS;
@Before
public void setUp() throws IOException {
notebookDir = Files.createTempDirectory("HdfsNotebookRepoTest").toFile().getAbsolutePath();
zConf = new ZeppelinConfiguration();
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir);
hadoopConf = new Configuration();
fs = FileSystem.get(hadoopConf);
hdfsNotebookRepo = new HdfsNotebookRepo(zConf);
}
@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(new File(notebookDir));
}
@Test
public void testBasics() throws IOException {
assertEquals(0, hdfsNotebookRepo.list(authInfo).size());
// create a new note
Note note = new Note();
note.setName("title_1");
Map<String, Object> config = new HashMap<>();
config.put("config_1", "value_1");
note.setConfig(config);
hdfsNotebookRepo.save(note, authInfo);
assertEquals(1, hdfsNotebookRepo.list(authInfo).size());
// read this note from hdfs
Note note_copy = hdfsNotebookRepo.get(note.getId(), authInfo);
assertEquals(note.getName(), note_copy.getName());
assertEquals(note.getConfig(), note_copy.getConfig());
// update this note
note.setName("title_2");
hdfsNotebookRepo.save(note, authInfo);
assertEquals(1, hdfsNotebookRepo.list(authInfo).size());
note_copy = hdfsNotebookRepo.get(note.getId(), authInfo);
assertEquals(note.getName(), note_copy.getName());
assertEquals(note.getConfig(), note_copy.getConfig());
// delete this note
hdfsNotebookRepo.remove(note.getId(), authInfo);
assertEquals(0, hdfsNotebookRepo.list(authInfo).size());
}
@Test
public void testComplicatedScenarios() throws IOException {
// scenario_1: notebook_dir is not clean. There're some unrecognized dir and file under notebook_dir
fs.mkdirs(new Path(notebookDir, "1/2"));
OutputStream out = fs.create(new Path(notebookDir, "1/a.json"));
out.close();
assertEquals(0, hdfsNotebookRepo.list(authInfo).size());
// scenario_2: note_folder is existed.
// create a new note
Note note = new Note();
note.setName("title_1");
Map<String, Object> config = new HashMap<>();
config.put("config_1", "value_1");
note.setConfig(config);
fs.mkdirs(new Path(notebookDir, note.getId()));
hdfsNotebookRepo.save(note, authInfo);
assertEquals(1, hdfsNotebookRepo.list(authInfo).size());
}
}