mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-1515. Notebook: HDFS as a backend storage (Read & Write Mode)
This commit is contained in:
parent
af18991367
commit
b3e83abbd5
11 changed files with 418 additions and 90 deletions
|
|
@ -67,6 +67,10 @@ if [[ -d "${ZEPPELIN_HOME}/zeppelin-server/target/classes" ]]; then
|
|||
ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-server/target/classes"
|
||||
fi
|
||||
|
||||
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
|
||||
ZEPPELIN_CLASSPATH+=":${HADOOP_CONF_DIR}"
|
||||
fi
|
||||
|
||||
# Add jdbc connector jar
|
||||
# ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/jdbc/jars/jdbc-connector-jar"
|
||||
|
||||
|
|
|
|||
|
|
@ -73,6 +73,10 @@ addJarInDir "${ZEPPELIN_HOME}/zeppelin-web/target/lib"
|
|||
|
||||
ZEPPELIN_CLASSPATH="$CLASSPATH:$ZEPPELIN_CLASSPATH"
|
||||
|
||||
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
|
||||
ZEPPELIN_CLASSPATH+=":${HADOOP_CONF_DIR}"
|
||||
fi
|
||||
|
||||
if [[ ! -d "${ZEPPELIN_LOG_DIR}" ]]; then
|
||||
echo "Log dir doesn't exist, create ${ZEPPELIN_LOG_DIR}"
|
||||
$(mkdir -p "${ZEPPELIN_LOG_DIR}")
|
||||
|
|
|
|||
|
|
@ -173,6 +173,26 @@
|
|||
</property>
|
||||
-->
|
||||
|
||||
<!-- Notebook storage layer using hdfs file system
|
||||
<property>
|
||||
<name>zeppelin.notebook.storage</name>
|
||||
<value>org.apache.zeppelin.notebook.repo.HdfsNotebookRepo</value>
|
||||
<description>hdfs notebook persistence layer implementation</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.hdfs.keytab</name>
|
||||
<value></value>
|
||||
<description>keytab for accessing kerberized hdfs</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.hdfs.principal</name>
|
||||
<value></value>
|
||||
<description>principal for accessing kerberized hdfs</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!-- For connecting your Zeppelin with ZeppelinHub -->
|
||||
<!--
|
||||
<property>
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ There are few notebook storage systems available for a use out of the box:
|
|||
|
||||
* (default) use local file system and version it using local Git repository - `GitNotebookRepo`
|
||||
* all notes are saved in the notebook folder in your local File System - `VFSNotebookRepo`
|
||||
* all notes are saved in the notebook folder in hdfs - `HdfsNotebookRepo`
|
||||
* storage using Amazon S3 service - `S3NotebookRepo`
|
||||
* storage using Azure service - `AzureNotebookRepo`
|
||||
* storage using MongoDB - `MongoNotebookRepo`
|
||||
|
|
@ -51,6 +52,22 @@ To enable versioning for all your local notebooks though a standard Git reposito
|
|||
</property>
|
||||
```
|
||||
|
||||
</br>
|
||||
|
||||
## Notebook Storage in Hdfs repository <a name="Hdfs"></a>
|
||||
|
||||
Notes may be stored in hdfs, so that multiple Zeppelin instances can share the same notes. It supports all the versions of hadoop 2.x. If you use `HdfsNotebookRepo`, then `zeppelin.notebook.dir` is the path on hdfs. And you need to specify `HADOOP_CONF_DIR` in `zeppelin-env.sh` so that zeppelin can find the right hadoop configuration files.
|
||||
If your hadoop cluster is kerberized, then you need to specify `zeppelin.hdfs.keytab` and `zeppelin.hdfs.principal`
|
||||
|
||||
```
|
||||
<property>
|
||||
<name>zeppelin.notebook.storage</name>
|
||||
<value>org.apache.zeppelin.notebook.repo.HdfsNotebookRepo</value>
|
||||
<description>hdfs notebook persistence layer implementation</description>
|
||||
</property>
|
||||
```
|
||||
|
||||
|
||||
</br>
|
||||
|
||||
## Notebook Storage in S3 <a name="S3"></a>
|
||||
|
|
|
|||
|
|
@ -195,101 +195,12 @@
|
|||
<artifactId>websocket-server</artifactId>
|
||||
<version>${jetty.version}</version>
|
||||
</dependency>
|
||||
<!--
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-jsp</artifactId>
|
||||
<version>${jetty.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.eclipse.jetty.orbit</groupId>
|
||||
<artifactId>javax.servlet.jsp</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.eclipse.jetty.orbit</groupId>
|
||||
<artifactId>javax.servlet</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.eclipse.jetty.orbit</groupId>
|
||||
<artifactId>org.apache.jasper.glassfish</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.orbit</groupId>
|
||||
<artifactId>javax.servlet.jsp</artifactId>
|
||||
<version>2.2.0.v201112011158</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.eclipse.jetty.orbit</groupId>
|
||||
<artifactId>javax.servlet</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
-->
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<version>${hadoop-common.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-json</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-server</artifactId>
|
||||
</exclusion>
|
||||
|
||||
<exclusion>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.jackrabbit</groupId>
|
||||
<artifactId>jackrabbit-webdav</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>commons-httpclient</groupId>
|
||||
<artifactId>commons-httpclient</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.eclipse.jgit</groupId>
|
||||
<artifactId>org.eclipse.jgit</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.jcraft</groupId>
|
||||
<artifactId>jsch</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-compress</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.quartz-scheduler</groupId>
|
||||
<artifactId>quartz</artifactId>
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@
|
|||
|
||||
<properties>
|
||||
<!--library versions-->
|
||||
<hadoop.version>2.6.0</hadoop.version>
|
||||
<commons.lang3.version>3.4</commons.lang3.version>
|
||||
<commons.vfs2.version>2.0</commons.vfs2.version>
|
||||
<aws.sdk.s3.version>1.10.62</aws.sdk.s3.version>
|
||||
|
|
@ -301,6 +302,71 @@
|
|||
<version>1.5</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
<version>${hadoop.version}</version>
|
||||
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-json</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-server</artifactId>
|
||||
</exclusion>
|
||||
|
||||
<exclusion>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.jackrabbit</groupId>
|
||||
<artifactId>jackrabbit-webdav</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>commons-httpclient</groupId>
|
||||
<artifactId>commons-httpclient</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.eclipse.jgit</groupId>
|
||||
<artifactId>org.eclipse.jgit</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.jcraft</groupId>
|
||||
<artifactId>jsch</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-compress</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>xml-apis</groupId>
|
||||
<artifactId>xml-apis</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>xerces</groupId>
|
||||
<artifactId>xercesImpl</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
|||
|
|
@ -681,7 +681,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_SERVER_XFRAME_OPTIONS("zeppelin.server.xframe.options", "SAMEORIGIN"),
|
||||
ZEPPELIN_SERVER_JETTY_NAME("zeppelin.server.jetty.name", null),
|
||||
ZEPPELIN_SERVER_STRICT_TRANSPORT("zeppelin.server.strict.transport", "max-age=631138519"),
|
||||
ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1");
|
||||
ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"),
|
||||
|
||||
ZEPPELIN_HDFS_KEYTAB("zeppelin.hdfs.keytab", ""),
|
||||
ZEPPELIN_HDFS_PRINCIPAL("zeppelin.hdfs.principal", "");
|
||||
|
||||
private String varName;
|
||||
@SuppressWarnings("rawtypes")
|
||||
|
|
|
|||
|
|
@ -106,6 +106,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
|
|||
|
||||
|
||||
public Note() {
|
||||
generateId();
|
||||
}
|
||||
|
||||
public Note(NotebookRepo repo, InterpreterFactory factory,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,200 @@
|
|||
package org.apache.zeppelin.notebook.repo;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.NoteInfo;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* NotebookRepos for hdfs.
|
||||
*
|
||||
* Assume the notebook directory structure is as following
|
||||
* - notebookdir
|
||||
* - noteId/note.json
|
||||
* - noteId/note.json
|
||||
* - noteId/note.json
|
||||
*/
|
||||
public class HdfsNotebookRepo implements NotebookRepo {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(HdfsNotebookRepo.class);
|
||||
|
||||
|
||||
private Configuration hadoopConf;
|
||||
private ZeppelinConfiguration zConf;
|
||||
private boolean isSecurityEnabled = false;
|
||||
private FileSystem fs;
|
||||
private Path notebookDir;
|
||||
|
||||
public HdfsNotebookRepo(ZeppelinConfiguration zConf) throws IOException {
|
||||
this.zConf = zConf;
|
||||
this.hadoopConf = new Configuration();
|
||||
this.notebookDir = new Path(zConf.getNotebookDir());
|
||||
LOGGER.info("Use hdfs directory {} to store notebook", notebookDir);
|
||||
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
if (isSecurityEnabled) {
|
||||
String keytab = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HDFS_KEYTAB);
|
||||
String principal = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HDFS_PRINCIPAL);
|
||||
if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal)) {
|
||||
throw new IOException("keytab and principal can not be empty, keytab: " + keytab
|
||||
+ ", principal: " + principal);
|
||||
}
|
||||
UserGroupInformation.loginUserFromKeytab(principal, keytab);
|
||||
}
|
||||
|
||||
this.fs = FileSystem.get(new Configuration());
|
||||
if (!fs.exists(notebookDir)) {
|
||||
fs.mkdirs(notebookDir);
|
||||
LOGGER.info("Create notebook dir {} in hdfs", notebookDir.toString());
|
||||
}
|
||||
if (fs.isFile(notebookDir)) {
|
||||
throw new IOException("notebookDir {} is file instead of directory, please remove it or " +
|
||||
"specify another directory");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
|
||||
return callHdfsOperation(new HdfsOperation<List<NoteInfo>>() {
|
||||
@Override
|
||||
public List<NoteInfo> call() throws IOException {
|
||||
List<NoteInfo> noteInfos = new ArrayList<>();
|
||||
for (FileStatus status : fs.globStatus(new Path(notebookDir, "*/note.json"))) {
|
||||
NoteInfo noteInfo = new NoteInfo(status.getPath().getParent().getName(), "", null);
|
||||
noteInfos.add(noteInfo);
|
||||
}
|
||||
return noteInfos;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Note get(final String noteId, AuthenticationInfo subject) throws IOException {
|
||||
return callHdfsOperation(new HdfsOperation<Note>() {
|
||||
@Override
|
||||
public Note call() throws IOException {
|
||||
Path notePath = new Path(notebookDir.toString() + "/" + noteId + "/note.json");
|
||||
LOGGER.debug("Read note from file: " + notePath);
|
||||
ByteArrayOutputStream noteBytes = new ByteArrayOutputStream();
|
||||
IOUtils.copyBytes(fs.open(notePath), noteBytes, hadoopConf);
|
||||
return Note.fromJson(new String(noteBytes.toString(
|
||||
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING))));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void save(final Note note, AuthenticationInfo subject) throws IOException {
|
||||
callHdfsOperation(new HdfsOperation<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
Path notePath = new Path(notebookDir.toString() + "/" + note.getId() + "/note.json");
|
||||
Path tmpNotePath = new Path(notebookDir.toString() + "/" + note.getId() + "/.note.json");
|
||||
LOGGER.debug("Saving note to file: " + notePath);
|
||||
if (fs.exists(tmpNotePath)) {
|
||||
fs.delete(tmpNotePath, true);
|
||||
}
|
||||
InputStream in = new ByteArrayInputStream(note.toJson().getBytes(
|
||||
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
|
||||
IOUtils.copyBytes(in, fs.create(tmpNotePath), hadoopConf);
|
||||
fs.delete(notePath, true);
|
||||
fs.rename(tmpNotePath, notePath);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(final String noteId, AuthenticationInfo subject) throws IOException {
|
||||
callHdfsOperation(new HdfsOperation<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
Path noteFolder = new Path(notebookDir.toString() + "/" + noteId);
|
||||
fs.delete(noteFolder, true);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
LOGGER.warn("close is not implemented for HdfsNotebookRepo");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject)
|
||||
throws IOException {
|
||||
LOGGER.warn("checkpoint is not implemented for HdfsNotebookRepo");
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException {
|
||||
LOGGER.warn("get revId is not implemented for HdfsNotebookRepo");
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject) {
|
||||
LOGGER.warn("revisionHistory is not implemented for HdfsNotebookRepo");
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Note setNoteRevision(String noteId, String revId, AuthenticationInfo subject)
|
||||
throws IOException {
|
||||
LOGGER.warn("setNoteRevision is not implemented for HdfsNotebookRepo");
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NotebookRepoSettingsInfo> getSettings(AuthenticationInfo subject) {
|
||||
LOGGER.warn("getSettings is not implemented for HdfsNotebookRepo");
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateSettings(Map<String, String> settings, AuthenticationInfo subject) {
|
||||
LOGGER.warn("updateSettings is not implemented for HdfsNotebookRepo");
|
||||
}
|
||||
|
||||
private interface HdfsOperation<T> {
|
||||
T call() throws IOException;
|
||||
}
|
||||
|
||||
public <T> T callHdfsOperation(final HdfsOperation<T> func) throws IOException {
|
||||
if (isSecurityEnabled) {
|
||||
UserGroupInformation.getLoginUser().reloginFromKeytab();
|
||||
try {
|
||||
return UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<T>() {
|
||||
@Override
|
||||
public T run() throws Exception {
|
||||
return func.call();
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
} else {
|
||||
return func.call();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -81,6 +81,7 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
Constructor<?> constructor = notebookStorageClass.getConstructor(
|
||||
ZeppelinConfiguration.class);
|
||||
repos.add((NotebookRepo) constructor.newInstance(conf));
|
||||
LOG.info("Instantiate NotebookRepo: " + storageClassNames[i]);
|
||||
} catch (ClassNotFoundException | NoSuchMethodException | SecurityException |
|
||||
InstantiationException | IllegalAccessException | IllegalArgumentException |
|
||||
InvocationTargetException e) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,101 @@
|
|||
package org.apache.zeppelin.notebook.repo;
|
||||
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class HdfsNotebookRepoTest {
|
||||
|
||||
private ZeppelinConfiguration zConf;
|
||||
private Configuration hadoopConf;
|
||||
private FileSystem fs;
|
||||
private HdfsNotebookRepo hdfsNotebookRepo;
|
||||
private String notebookDir;
|
||||
private AuthenticationInfo authInfo = AuthenticationInfo.ANONYMOUS;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
notebookDir = Files.createTempDirectory("HdfsNotebookRepoTest").toFile().getAbsolutePath();
|
||||
zConf = new ZeppelinConfiguration();
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir);
|
||||
hadoopConf = new Configuration();
|
||||
fs = FileSystem.get(hadoopConf);
|
||||
hdfsNotebookRepo = new HdfsNotebookRepo(zConf);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
FileUtils.deleteDirectory(new File(notebookDir));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasics() throws IOException {
|
||||
assertEquals(0, hdfsNotebookRepo.list(authInfo).size());
|
||||
|
||||
// create a new note
|
||||
Note note = new Note();
|
||||
note.setName("title_1");
|
||||
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("config_1", "value_1");
|
||||
note.setConfig(config);
|
||||
hdfsNotebookRepo.save(note, authInfo);
|
||||
assertEquals(1, hdfsNotebookRepo.list(authInfo).size());
|
||||
|
||||
// read this note from hdfs
|
||||
Note note_copy = hdfsNotebookRepo.get(note.getId(), authInfo);
|
||||
assertEquals(note.getName(), note_copy.getName());
|
||||
assertEquals(note.getConfig(), note_copy.getConfig());
|
||||
|
||||
// update this note
|
||||
note.setName("title_2");
|
||||
hdfsNotebookRepo.save(note, authInfo);
|
||||
assertEquals(1, hdfsNotebookRepo.list(authInfo).size());
|
||||
note_copy = hdfsNotebookRepo.get(note.getId(), authInfo);
|
||||
assertEquals(note.getName(), note_copy.getName());
|
||||
assertEquals(note.getConfig(), note_copy.getConfig());
|
||||
|
||||
// delete this note
|
||||
hdfsNotebookRepo.remove(note.getId(), authInfo);
|
||||
assertEquals(0, hdfsNotebookRepo.list(authInfo).size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testComplicatedScenarios() throws IOException {
|
||||
// scenario_1: notebook_dir is not clean. There're some unrecognized dir and file under notebook_dir
|
||||
fs.mkdirs(new Path(notebookDir, "1/2"));
|
||||
OutputStream out = fs.create(new Path(notebookDir, "1/a.json"));
|
||||
out.close();
|
||||
|
||||
assertEquals(0, hdfsNotebookRepo.list(authInfo).size());
|
||||
|
||||
// scenario_2: note_folder is existed.
|
||||
// create a new note
|
||||
Note note = new Note();
|
||||
note.setName("title_1");
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("config_1", "value_1");
|
||||
note.setConfig(config);
|
||||
|
||||
fs.mkdirs(new Path(notebookDir, note.getId()));
|
||||
hdfsNotebookRepo.save(note, authInfo);
|
||||
assertEquals(1, hdfsNotebookRepo.list(authInfo).size());
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue