mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Add support for using Azure storage
This commit is contained in:
parent
025acd3641
commit
ce61f0e917
4 changed files with 247 additions and 0 deletions
|
|
@ -83,6 +83,33 @@
|
|||
</property>
|
||||
-->
|
||||
|
||||
<!-- If using Azure for storage use the following settings -->
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.notebook.azure.user</name>
|
||||
<value>user</value>
|
||||
<description>user name for Azure folder structure</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.notebook.azure.share</name>
|
||||
<value>zeppelin</value>
|
||||
<description>share name for notebook storage</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.notebook.azure.connectionString</name>
|
||||
<value>DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey></value>
|
||||
<description>share name for notebook storage</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.notebook.storage</name>
|
||||
<value>org.apache.zeppelin.notebook.repo.AzureNotebookRepo</value>
|
||||
<description>notebook persistence layer implementation</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!-- For versioning your local norebook storage using Git repository
|
||||
<property>
|
||||
<name>zeppelin.notebook.storage</name>
|
||||
|
|
|
|||
|
|
@ -58,6 +58,26 @@
|
|||
<version>1.10.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<artifactId>azure-storage</artifactId>
|
||||
<version>4.0.0</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
|
|
|
|||
|
|
@ -472,6 +472,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE("zeppelin.notebook.homescreen.hide", false),
|
||||
ZEPPELIN_NOTEBOOK_S3_BUCKET("zeppelin.notebook.s3.bucket", "zeppelin"),
|
||||
ZEPPELIN_NOTEBOOK_S3_USER("zeppelin.notebook.s3.user", "user"),
|
||||
ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING("zeppelin.notebook.azure.connectionString", null),
|
||||
ZEPPELIN_NOTEOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"),
|
||||
ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()),
|
||||
ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", "bin/interpreter.sh"),
|
||||
// Decide when new note is created, interpreter settings will be binded automatically or not.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,198 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.notebook.repo;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.file.*;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.NoteInfo;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.io.*;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Azure storage backend for notebooks
|
||||
*/
|
||||
public class AzureNotebookRepo implements NotebookRepo {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(S3NotebookRepo.class);
|
||||
|
||||
private final ZeppelinConfiguration conf;
|
||||
private final String user;
|
||||
private final String shareName;
|
||||
private final CloudFileDirectory rootDir;
|
||||
|
||||
public AzureNotebookRepo(ZeppelinConfiguration conf)
|
||||
throws URISyntaxException, InvalidKeyException, StorageException {
|
||||
this.conf = conf;
|
||||
user = conf.getUser();
|
||||
shareName = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEOOK_AZURE_SHARE);
|
||||
|
||||
CloudStorageAccount account = CloudStorageAccount.parse(
|
||||
conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING));
|
||||
CloudFileClient client = account.createCloudFileClient();
|
||||
CloudFileShare share = client.getShareReference(shareName);
|
||||
share.createIfNotExists();
|
||||
|
||||
CloudFileDirectory userDir = share.getRootDirectoryReference().getDirectoryReference(user);
|
||||
userDir.createIfNotExists();
|
||||
|
||||
rootDir = userDir.getDirectoryReference("notebook");
|
||||
rootDir.createIfNotExists();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NoteInfo> list() throws IOException {
|
||||
List<NoteInfo> infos = new LinkedList<NoteInfo>();
|
||||
NoteInfo info = null;
|
||||
|
||||
for (ListFileItem item : rootDir.listFilesAndDirectories()) {
|
||||
if (item.getClass() == CloudFileDirectory.class) {
|
||||
CloudFileDirectory dir = (CloudFileDirectory) item;
|
||||
|
||||
try {
|
||||
if (dir.getFileReference("note.json").exists()) {
|
||||
info = new NoteInfo(getNote(dir.getName()));
|
||||
|
||||
if (info != null) {
|
||||
infos.add(info);
|
||||
}
|
||||
}
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.error("Error enumerating notebooks", e);
|
||||
} catch (StorageException e) {
|
||||
LOG.error("Error enumerating notebooks", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return infos;
|
||||
}
|
||||
|
||||
private Note getNote(String noteId) throws IOException {
|
||||
InputStream ins = null;
|
||||
|
||||
try {
|
||||
CloudFileDirectory dir = rootDir.getDirectoryReference(noteId);
|
||||
CloudFile file = dir.getFileReference("note.json");
|
||||
|
||||
ins = file.openRead();
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.error("Error reading file", e);
|
||||
return null;
|
||||
} catch (StorageException e) {
|
||||
LOG.error("Error reading file", e);
|
||||
return null;
|
||||
}
|
||||
|
||||
String json = IOUtils.toString(ins,
|
||||
conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING));
|
||||
ins.close();
|
||||
|
||||
GsonBuilder gsonBuilder = new GsonBuilder();
|
||||
gsonBuilder.setPrettyPrinting();
|
||||
Gson gson = gsonBuilder.create();
|
||||
|
||||
Note note = gson.fromJson(json, Note.class);
|
||||
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
if (p.getStatus() == Job.Status.PENDING || p.getStatus() == Job.Status.RUNNING) {
|
||||
p.setStatus(Job.Status.ABORT);
|
||||
}
|
||||
}
|
||||
|
||||
return note;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Note get(String noteId) throws IOException {
|
||||
return getNote(noteId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void save(Note note) throws IOException {
|
||||
GsonBuilder gsonBuilder = new GsonBuilder();
|
||||
gsonBuilder.setPrettyPrinting();
|
||||
Gson gson = gsonBuilder.create();
|
||||
String json = gson.toJson(note);
|
||||
|
||||
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||
Writer writer = new OutputStreamWriter(output);
|
||||
writer.write(json);
|
||||
writer.close();
|
||||
output.close();
|
||||
|
||||
byte[] buffer = output.toByteArray();
|
||||
|
||||
try {
|
||||
CloudFileDirectory dir = rootDir.getDirectoryReference(note.getId());
|
||||
dir.createIfNotExists();
|
||||
|
||||
CloudFile cloudFile = dir.getFileReference("note.json");
|
||||
cloudFile.uploadFromByteArray(buffer, 0, buffer.length);
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.error("Error saving file", e);
|
||||
} catch (StorageException e) {
|
||||
LOG.error("Error saving file", e);
|
||||
}
|
||||
}
|
||||
|
||||
// unfortunately, we need to use a recursive delete here
|
||||
private void delete(ListFileItem item) throws StorageException {
|
||||
if (item.getClass() == CloudFileDirectory.class) {
|
||||
CloudFileDirectory dir = (CloudFileDirectory) item;
|
||||
|
||||
for (ListFileItem subItem : dir.listFilesAndDirectories()) {
|
||||
delete(subItem);
|
||||
}
|
||||
|
||||
dir.deleteIfExists();
|
||||
} else if (item.getClass() == CloudFile.class) {
|
||||
CloudFile file = (CloudFile) item;
|
||||
|
||||
file.deleteIfExists();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(String noteId) throws IOException {
|
||||
try {
|
||||
CloudFileDirectory dir = rootDir.getDirectoryReference(noteId);
|
||||
|
||||
delete(dir);
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.error("Error deleting file", e);
|
||||
} catch (StorageException e) {
|
||||
LOG.error("Error deleting file", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue