ZEPPELIN-2993. Code Refactoring of ZEPPELIN-1515 follow up

This commit is contained in:
Jeff Zhang 2017-09-14 13:23:33 +08:00
parent 3fb67f9cc5
commit 45d1e9b6f1
6 changed files with 29 additions and 22 deletions

View file

@ -173,11 +173,11 @@
</property>
-->
<!-- Notebook storage layer using hdfs file system
<!-- Notebook storage layer using hadoop compatible file system
<property>
<name>zeppelin.notebook.storage</name>
<value>org.apache.zeppelin.notebook.repo.HdfsNotebookRepo</value>
<description>hdfs notebook persistence layer implementation</description>
<value>org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo</value>
<description>Hadoop compatible file system notebook persistence layer implementation, such as local file system, hdfs, azure wasb, s3, etc.</description>
</property>
<property>

View file

@ -30,7 +30,7 @@ There are few notebook storage systems available for a use out of the box:
* (default) use local file system and version it using local Git repository - `GitNotebookRepo`
* all notes are saved in the notebook folder in your local File System - `VFSNotebookRepo`
* all notes are saved in the notebook folder in hdfs - `HdfsNotebookRepo`
* all notes are saved in the notebook folder in a hadoop compatible file system - `FileSystemNotebookRepo`
* storage using Amazon S3 service - `S3NotebookRepo`
* storage using Azure service - `AzureNotebookRepo`
* storage using MongoDB - `MongoNotebookRepo`
@ -54,16 +54,16 @@ To enable versioning for all your local notebooks though a standard Git reposito
</br>
## Notebook Storage in Hdfs repository <a name="Hdfs"></a>
## Notebook Storage in hadoop compatible file system repository <a name="Hdfs"></a>
Notes may be stored in hdfs, so that multiple Zeppelin instances can share the same notes. It supports all the versions of hadoop 2.x. If you use `HdfsNotebookRepo`, then `zeppelin.notebook.dir` is the path on hdfs. And you need to specify `HADOOP_CONF_DIR` in `zeppelin-env.sh` so that zeppelin can find the right hadoop configuration files.
Notes may be stored in a hadoop compatible file system such as hdfs, so that multiple Zeppelin instances can share the same notes. All versions of hadoop 2.x are supported. If you use `FileSystemNotebookRepo`, then `zeppelin.notebook.dir` is the path on the hadoop compatible file system. You also need to specify `HADOOP_CONF_DIR` in `zeppelin-env.sh` so that Zeppelin can find the right hadoop configuration files.
If your hadoop cluster is kerberized, then you need to specify `zeppelin.hdfs.keytab` and `zeppelin.hdfs.principal`.
```
<property>
<name>zeppelin.notebook.storage</name>
<value>org.apache.zeppelin.notebook.repo.HdfsNotebookRepo</value>
<description>hdfs notebook persistence layer implementation</description>
<value>org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo</value>
<description>hadoop compatible file system notebook persistence layer implementation</description>
</property>
```

View file

@ -43,7 +43,7 @@ public class RemoteInterpreterServerTest {
@Test
public void testStartStop() throws InterruptedException, IOException, TException {
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true);
assertEquals(false, server.isRunning());
server.start();

View file

@ -18,6 +18,8 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@ -33,9 +35,8 @@ import java.util.Map;
* - noteId/note.json
* - noteId/note.json
*/
public class HdfsNotebookRepo implements NotebookRepo {
private static final Logger LOGGER = LoggerFactory.getLogger(HdfsNotebookRepo.class);
public class FileSystemNotebookRepo implements NotebookRepo {
private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemNotebookRepo.class);
private Configuration hadoopConf;
private ZeppelinConfiguration zConf;
@ -43,11 +44,10 @@ public class HdfsNotebookRepo implements NotebookRepo {
private FileSystem fs;
private Path notebookDir;
public HdfsNotebookRepo(ZeppelinConfiguration zConf) throws IOException {
public FileSystemNotebookRepo(ZeppelinConfiguration zConf) throws IOException {
this.zConf = zConf;
this.hadoopConf = new Configuration();
this.notebookDir = new Path(zConf.getNotebookDir());
LOGGER.info("Use hdfs directory {} to store notebook", notebookDir);
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
if (isSecurityEnabled) {
String keytab = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HDFS_KEYTAB);
@ -59,7 +59,14 @@ public class HdfsNotebookRepo implements NotebookRepo {
UserGroupInformation.loginUserFromKeytab(principal, keytab);
}
this.fs = FileSystem.get(new Configuration());
try {
this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), new Configuration());
LOGGER.info("Creating FileSystem: " + this.fs.getClass().getCanonicalName());
this.notebookDir = fs.makeQualified(new Path(zConf.getNotebookDir()));
LOGGER.info("Using folder {} to store notebook", notebookDir);
} catch (URISyntaxException e) {
throw new IOException(e);
}
if (!fs.exists(notebookDir)) {
fs.mkdirs(notebookDir);
LOGGER.info("Create notebook dir {} in hdfs", notebookDir.toString());
@ -68,7 +75,6 @@ public class HdfsNotebookRepo implements NotebookRepo {
throw new IOException("notebookDir {} is file instead of directory, please remove it or " +
"specify another directory");
}
}
@Override
@ -180,7 +186,7 @@ public class HdfsNotebookRepo implements NotebookRepo {
T call() throws IOException;
}
public <T> T callHdfsOperation(final HdfsOperation<T> func) throws IOException {
public synchronized <T> T callHdfsOperation(final HdfsOperation<T> func) throws IOException {
if (isSecurityEnabled) {
UserGroupInformation.getLoginUser().reloginFromKeytab();
try {

View file

@ -218,6 +218,7 @@ public class VFSNotebookRepo implements NotebookRepo {
@Override
public synchronized void save(Note note, AuthenticationInfo subject) throws IOException {
LOG.info("Saving note:" + note.getId());
String json = note.toJson();
FileObject rootDir = getRootDir();

View file

@ -21,23 +21,23 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
public class HdfsNotebookRepoTest {
public class FileSystemNotebookRepoTest {
private ZeppelinConfiguration zConf;
private Configuration hadoopConf;
private FileSystem fs;
private HdfsNotebookRepo hdfsNotebookRepo;
private FileSystemNotebookRepo hdfsNotebookRepo;
private String notebookDir;
private AuthenticationInfo authInfo = AuthenticationInfo.ANONYMOUS;
@Before
public void setUp() throws IOException {
notebookDir = Files.createTempDirectory("HdfsNotebookRepoTest").toFile().getAbsolutePath();
notebookDir = Files.createTempDirectory("FileSystemNotebookRepoTest").toFile().getAbsolutePath();
zConf = new ZeppelinConfiguration();
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir);
hadoopConf = new Configuration();
fs = FileSystem.get(hadoopConf);
hdfsNotebookRepo = new HdfsNotebookRepo(zConf);
hdfsNotebookRepo = new FileSystemNotebookRepo(zConf);
}
@After