Add support for using Azure storage

This commit is contained in:
Silvio Fiorito 2016-02-04 22:04:05 -05:00
parent 025acd3641
commit ce61f0e917
4 changed files with 247 additions and 0 deletions

View file

@ -83,6 +83,33 @@
</property>
-->
<!-- If using Azure for storage use the following settings -->
<!--
<property>
<name>zeppelin.notebook.azure.user</name>
<value>user</value>
<description>user name for Azure folder structure</description>
</property>
<property>
<name>zeppelin.notebook.azure.share</name>
<value>zeppelin</value>
<description>share name for notebook storage</description>
</property>
<property>
<name>zeppelin.notebook.azure.connectionString</name>
<value>DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey></value>
<description>share name for notebook storage</description>
</property>
<property>
<name>zeppelin.notebook.storage</name>
<value>org.apache.zeppelin.notebook.repo.AzureNotebookRepo</value>
<description>notebook persistence layer implementation</description>
</property>
-->
<!-- For versioning your local norebook storage using Git repository
<property>
<name>zeppelin.notebook.storage</name>

View file

@ -58,6 +58,26 @@
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>4.0.0</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View file

@ -472,6 +472,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE("zeppelin.notebook.homescreen.hide", false),
ZEPPELIN_NOTEBOOK_S3_BUCKET("zeppelin.notebook.s3.bucket", "zeppelin"),
ZEPPELIN_NOTEBOOK_S3_USER("zeppelin.notebook.s3.user", "user"),
ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING("zeppelin.notebook.azure.connectionString", null),
ZEPPELIN_NOTEOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"),
ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()),
ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", "bin/interpreter.sh"),
// Decide when new note is created, interpreter settings will be binded automatically or not.

View file

@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook.repo;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.file.*;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.scheduler.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.LinkedList;
import java.util.List;
/**
* Azure storage backend for notebooks
*/
public class AzureNotebookRepo implements NotebookRepo {
private static final Logger LOG = LoggerFactory.getLogger(S3NotebookRepo.class);
private final ZeppelinConfiguration conf;
private final String user;
private final String shareName;
private final CloudFileDirectory rootDir;
public AzureNotebookRepo(ZeppelinConfiguration conf)
throws URISyntaxException, InvalidKeyException, StorageException {
this.conf = conf;
user = conf.getUser();
shareName = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEOOK_AZURE_SHARE);
CloudStorageAccount account = CloudStorageAccount.parse(
conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING));
CloudFileClient client = account.createCloudFileClient();
CloudFileShare share = client.getShareReference(shareName);
share.createIfNotExists();
CloudFileDirectory userDir = share.getRootDirectoryReference().getDirectoryReference(user);
userDir.createIfNotExists();
rootDir = userDir.getDirectoryReference("notebook");
rootDir.createIfNotExists();
}
@Override
public List<NoteInfo> list() throws IOException {
List<NoteInfo> infos = new LinkedList<NoteInfo>();
NoteInfo info = null;
for (ListFileItem item : rootDir.listFilesAndDirectories()) {
if (item.getClass() == CloudFileDirectory.class) {
CloudFileDirectory dir = (CloudFileDirectory) item;
try {
if (dir.getFileReference("note.json").exists()) {
info = new NoteInfo(getNote(dir.getName()));
if (info != null) {
infos.add(info);
}
}
} catch (URISyntaxException e) {
LOG.error("Error enumerating notebooks", e);
} catch (StorageException e) {
LOG.error("Error enumerating notebooks", e);
}
}
}
return infos;
}
private Note getNote(String noteId) throws IOException {
InputStream ins = null;
try {
CloudFileDirectory dir = rootDir.getDirectoryReference(noteId);
CloudFile file = dir.getFileReference("note.json");
ins = file.openRead();
} catch (URISyntaxException e) {
LOG.error("Error reading file", e);
return null;
} catch (StorageException e) {
LOG.error("Error reading file", e);
return null;
}
String json = IOUtils.toString(ins,
conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING));
ins.close();
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.setPrettyPrinting();
Gson gson = gsonBuilder.create();
Note note = gson.fromJson(json, Note.class);
for (Paragraph p : note.getParagraphs()) {
if (p.getStatus() == Job.Status.PENDING || p.getStatus() == Job.Status.RUNNING) {
p.setStatus(Job.Status.ABORT);
}
}
return note;
}
@Override
public Note get(String noteId) throws IOException {
return getNote(noteId);
}
@Override
public void save(Note note) throws IOException {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.setPrettyPrinting();
Gson gson = gsonBuilder.create();
String json = gson.toJson(note);
ByteArrayOutputStream output = new ByteArrayOutputStream();
Writer writer = new OutputStreamWriter(output);
writer.write(json);
writer.close();
output.close();
byte[] buffer = output.toByteArray();
try {
CloudFileDirectory dir = rootDir.getDirectoryReference(note.getId());
dir.createIfNotExists();
CloudFile cloudFile = dir.getFileReference("note.json");
cloudFile.uploadFromByteArray(buffer, 0, buffer.length);
} catch (URISyntaxException e) {
LOG.error("Error saving file", e);
} catch (StorageException e) {
LOG.error("Error saving file", e);
}
}
// unfortunately, we need to use a recursive delete here
private void delete(ListFileItem item) throws StorageException {
if (item.getClass() == CloudFileDirectory.class) {
CloudFileDirectory dir = (CloudFileDirectory) item;
for (ListFileItem subItem : dir.listFilesAndDirectories()) {
delete(subItem);
}
dir.deleteIfExists();
} else if (item.getClass() == CloudFile.class) {
CloudFile file = (CloudFile) item;
file.deleteIfExists();
}
}
@Override
public void remove(String noteId) throws IOException {
try {
CloudFileDirectory dir = rootDir.getDirectoryReference(noteId);
delete(dir);
} catch (URISyntaxException e) {
LOG.error("Error deleting file", e);
} catch (StorageException e) {
LOG.error("Error deleting file", e);
}
}
@Override
public void close() {
}
}