# Conflicts:
#	docs/security/authentication.md
This commit is contained in:
Jesang Yoon 2016-03-21 15:04:42 +09:00
commit 90219f737c
57 changed files with 4206 additions and 286 deletions

1
.gitignore vendored
View file

@ -12,6 +12,7 @@
spark/derby.log
spark/metastore_db
spark-1.*-bin-hadoop*
zeppelin-server/derby.log
lens/lens-cli-hist.log

View file

@ -24,7 +24,7 @@
# export ZEPPELIN_INTP_JAVA_OPTS # zeppelin interpreter process jvm options. Default = ZEPPELIN_JAVA_OPTS
# export ZEPPELIN_LOG_DIR # Where log files are stored. PWD by default.
# export ZEPPELIN_PID_DIR # The pid files are stored. /tmp by default.
# export ZEPPELIN_PID_DIR # The pid files are stored. ${ZEPPELIN_HOME}/run by default.
# export ZEPPELIN_WAR_TEMPDIR # The location of jetty temporary directory.
# export ZEPPELIN_NOTEBOOK_DIR # Where notebook saved
# export ZEPPELIN_NOTEBOOK_HOMESCREEN # Id of notebook to be displayed in homescreen. ex) 2A94M5J1Z

View file

@ -138,7 +138,7 @@
<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>

View file

@ -46,6 +46,7 @@
<li><a href="{{BASE_PATH}}/interpreter/flink.html">Flink</a></li>
<li><a href="{{BASE_PATH}}/interpreter/geode.html">Geode</a></li>
<li><a href="{{BASE_PATH}}/interpreter/hbase.html">HBase</a></li>
<li><a href="{{BASE_PATH}}/interpreter/hdfs.html">HDFS</a></li>
<li><a href="{{BASE_PATH}}/interpreter/hive.html">Hive</a></li>
<li><a href="{{BASE_PATH}}/interpreter/ignite.html">Ignite</a></li>
<li><a href="{{BASE_PATH}}/interpreter/jdbc.html">JDBC</a></li>

56
docs/interpreter/hdfs.md Normal file
View file

@ -0,0 +1,56 @@
---
layout: page
title: "HDFS File System Interpreter"
description: ""
group: manual
---
{% include JB/setup %}
## HDFS File System Interpreter for Apache Zeppelin
[Hadoop File System](http://hadoop.apache.org/) is a distributed, fault tolerant file system part of the hadoop project and is often used as storage for distributed processing engines like [Hadoop MapReduce](http://hadoop.apache.org/) and [Apache Spark](http://spark.apache.org/) or underlying file systems like [Alluxio](http://www.alluxio.org/).
## Configuration
<table class="table-configuration">
<tr>
<th>Property</th>
<th>Default</th>
<th>Description</th>
</tr>
<tr>
<td>hdfs.url</td>
<td>http://localhost:50070/webhdfs/v1/</td>
<td>The URL for WebHDFS</td>
</tr>
<tr>
<td>hdfs.user</td>
<td>hdfs</td>
<td>The WebHDFS user</td>
</tr>
<tr>
<td>hdfs.maxlength</td>
<td>1000</td>
<td>Maximum number of lines of results fetched</td>
</tr>
</table>
<br/>
This interpreter connects to HDFS using the HTTP WebHDFS interface.
It supports the basic shell file commands applied to HDFS, it currently only supports browsing.
* You can use <i>ls [PATH]</i> and <i>ls -l [PATH]</i> to list a directory. If the path is missing, then the current directory is listed. <i>ls </i> supports a <i>-h</i> flag for human readable file sizes.
* You can use <i>cd [PATH]</i> to change your current directory by giving a relative or an absolute path.
* You can invoke <i>pwd</i> to see your current directory.
> **Tip :** Use ( Ctrl + . ) for autocompletion.
### Create Interpreter
In a notebook, to enable the **HDFS** interpreter, click the **Gear** icon and select **HDFS**.
#### WebHDFS REST API
You can confirm that you're able to access the WebHDFS API by running a curl command against the WebHDFS end point provided to the interpreter.
Here is an example:
$> curl "http://localhost:50070/webhdfs/v1/?op=LISTSTATUS"

View file

@ -62,7 +62,7 @@ This instruction based on Ubuntu 14.04 LTS but may work with other OS with few c
# Zeppelin Website
server {
listen 80;
listen [YOUR-ZEPPELIN-WEB-SERVER-PORT];
listen 443 ssl; # optional, to serve HTTPS connection
server_name [YOUR-ZEPPELIN-SERVER-HOST]; # for example: zeppelin.mycompany.com
@ -87,7 +87,7 @@ This instruction based on Ubuntu 14.04 LTS but may work with other OS with few c
# Zeppelin Websocket
server {
listen 8091 ssl;
listen [YOUR-ZEPPELIN-WEBSOCKET-PORT] ssl; # add ssl is optional, to serve HTTPS connection
server_name [YOUR-ZEPPELIN-SERVER-HOST]; # for example: zeppelin.mycompany.com
ssl_certificate [PATH-TO-YOUR-CERT-FILE]; # optional, to serve HTTPS connection
@ -134,7 +134,7 @@ This instruction based on Ubuntu 14.04 LTS but may work with other OS with few c
1. More security consideration
* Using HTTPS connection with Basic Authentication is highly recommended since basic auth without encryption may expose your important credential information over the network.
* Using [Shiro Security feature built-into Zeppelin](https://github.com/apache/incubator-zeppelin/pull/53) is recommended if you prefer all-in-one solution for authentication but NGINX may provides ad-hoc solution for re-use authentication served by your system's NGINX server or in case of you need to separate authentication from zeppelin server.
* Using [Shiro Security feature built-into Zeppelin](https://github.com/apache/incubator-zeppelin/blob/master/SECURITY-README.md) is recommended if you prefer all-in-one solution for authentication but NGINX may provides ad-hoc solution for re-use authentication served by your system's NGINX server or in case of you need to separate authentication from zeppelin server.
* It is recommended to isolate direct connection to Zeppelin server from public internet or external services to secure your zeppelin instance from unexpected attack or problems caused by public zone.
### Another option

146
file/pom.xml Normal file
View file

@ -0,0 +1,146 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.6.0-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-file</artifactId>
<packaging>jar</packaging>
<version>0.6.0-incubating-SNAPSHOT</version>
<name>Zeppelin File System Interpreters</name>
<url>http://www.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-common</artifactId>
<version>2.22.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/file</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/file</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<!--<includeScope>runtime</includeScope>-->
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.file;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
/**
* File interpreter for Zeppelin.
*
*/
public abstract class FileInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(FileInterpreter.class);
String currentDir = null;
CommandArgs args = null;
public FileInterpreter(Properties property) {
super(property);
currentDir = new String("/");
}
/**
* Handling the arguments of the command
*/
public class CommandArgs {
public String input = null;
public String command = null;
public ArrayList<String> args = null;
public HashSet<Character> flags = null;
public CommandArgs(String cmd) {
input = cmd;
args = new ArrayList();
flags = new HashSet();
}
private void parseArg(String arg) {
if (arg.charAt(0) == '-') { // handle flags
for (int i = 0; i < arg.length(); i++) {
Character c = arg.charAt(i);
flags.add(c);
}
} else { // handle other args
args.add(arg);
}
}
public void parseArgs() {
if (input == null)
return;
StringTokenizer st = new StringTokenizer(input);
if (st.hasMoreTokens()) {
command = st.nextToken();
while (st.hasMoreTokens())
parseArg(st.nextToken());
}
}
}
// Functions that each file system implementation must override
public abstract String listAll(String path);
public abstract boolean isDirectory(String path);
// Combine paths, takes care of arguments such as ..
protected String getNewPath(String argument){
Path arg = Paths.get(argument);
Path ret = arg.isAbsolute() ? arg : Paths.get(currentDir, argument);
return ret.normalize().toString();
}
// Handle the command handling uniformly across all file systems
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
logger.info("Run File command '" + cmd + "'");
args = new CommandArgs(cmd);
args.parseArgs();
if (args.command == null) {
logger.info("Error: No command");
return new InterpreterResult(Code.ERROR, Type.TEXT, "No command");
}
// Simple parsing of the command
if (args.command.equals("cd")) {
String newPath = !args.args.isEmpty() ? getNewPath(args.args.get(0)) : currentDir;
if (!isDirectory(newPath))
return new InterpreterResult(Code.ERROR, Type.TEXT, newPath + ": No such directory");
currentDir = newPath;
return new InterpreterResult(Code.SUCCESS, Type.TEXT, "OK");
} else if (args.command.equals("ls")) {
String newPath = !args.args.isEmpty() ? getNewPath(args.args.get(0)) : currentDir;
try {
String results = listAll(newPath);
return new InterpreterResult(Code.SUCCESS, Type.TEXT, results);
} catch (Exception e) {
logger.error("Error listing files in path " + newPath, e);
return new InterpreterResult(Code.ERROR, Type.TEXT, e.getMessage());
}
} else if (args.command.equals("pwd")) {
return new InterpreterResult(Code.SUCCESS, Type.TEXT, currentDir);
} else {
return new InterpreterResult(Code.ERROR, Type.TEXT, "Unknown command");
}
}
@Override
public void cancel(InterpreterContext context) {
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
FileInterpreter.class.getName() + this.hashCode());
}
@Override
public List<String> completion(String buf, int cursor) {
return null;
}
}

View file

@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.file;
import java.net.URL;
import java.net.HttpURLConnection;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.ws.rs.core.UriBuilder;
import org.slf4j.Logger;
/**
* Definition and HTTP invocation methods for all WebHDFS commands
*
*/
public class HDFSCommand {
/**
* Type of HTTP request
*/
public enum HttpType {
GET,
PUT
}
/**
* Definition of WebHDFS operator
*/
public class Op {
public String op;
public HttpType cmd;
public int minArgs;
public Op(String op, HttpType cmd, int minArgs) {
this.op = op;
this.cmd = cmd;
this.minArgs = minArgs;
}
}
/**
* Definition of argument to an operator
*/
public class Arg {
public String key;
public String value;
public Arg(String key, String value) {
this.key = key;
this.value = value;
}
}
// How to connect to WebHDFS
String url = null;
String user = null;
int maxLength = 0;
Logger logger;
// Define all the commands available
public Op getFileStatus = new Op("GETFILESTATUS", HttpType.GET, 0);
public Op listStatus = new Op("LISTSTATUS", HttpType.GET, 0);
public HDFSCommand(String url, String user, Logger logger, int maxLength) {
super();
this.url = url;
this.user = user;
this.maxLength = maxLength;
this.logger = logger;
}
public String checkArgs(Op op, String path, Arg[] args) throws Exception {
if (op == null ||
path == null ||
(op.minArgs > 0 &&
(args == null ||
args.length != op.minArgs)))
{
String a = "";
a = (op != null) ? a + op.op + "\n" : a;
a = (path != null) ? a + path + "\n" : a;
a = (args != null) ? a + args + "\n" : a;
return a;
}
return null;
}
// The operator that runs all commands
public String runCommand(Op op, String path, Arg[] args) throws Exception {
// Check arguments
String error = checkArgs(op, path, args);
if (error != null) {
logger.error("Bad arguments to command: " + error);
return "ERROR: BAD ARGS";
}
// Build URI
UriBuilder builder = UriBuilder
.fromPath(url)
.path(path)
.queryParam("op", op.op);
if (args != null) {
for (Arg a : args) {
builder = builder.queryParam(a.key, a.value);
}
}
java.net.URI uri = builder.build();
// Connect and get response string
URL hdfsUrl = uri.toURL();
HttpURLConnection con = (HttpURLConnection) hdfsUrl.openConnection();
if (op.cmd == HttpType.GET) {
con.setRequestMethod("GET");
int responseCode = con.getResponseCode();
logger.info("Sending 'GET' request to URL : " + hdfsUrl);
logger.info("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(
new InputStreamReader(con.getInputStream()));
String inputLine;
StringBuffer response = new StringBuffer();
int i = 0;
while ((inputLine = in.readLine()) != null) {
if (inputLine.length() < maxLength)
response.append(inputLine);
i++;
if (i >= maxLength)
break;
}
in.close();
return response.toString();
}
return null;
}
}

View file

@ -0,0 +1,330 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.file;
import java.text.SimpleDateFormat;
import java.util.*;
import com.google.gson.Gson;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
/**
* HDFS implementation of File interpreter for Zeppelin.
*
*/
public class HDFSFileInterpreter extends FileInterpreter {
static final String HDFS_URL = "hdfs.url";
static final String HDFS_USER = "hdfs.user";
static final String HDFS_MAXLENGTH = "hdfs.maxlength";
static {
Interpreter.register(
"hdfs",
"file",
HDFSFileInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(HDFS_URL, "http://localhost:50070/webhdfs/v1/", "The URL for WebHDFS")
.add(HDFS_USER, "hdfs", "The WebHDFS user")
.add(HDFS_MAXLENGTH, "1000", "Maximum number of lines of results fetched").build());
}
Exception exceptionOnConnect = null;
HDFSCommand cmd = null;
Gson gson = null;
public void prepare() {
String userName = getProperty(HDFS_USER);
String hdfsUrl = getProperty(HDFS_URL);
int i = Integer.parseInt(getProperty(HDFS_MAXLENGTH));
cmd = new HDFSCommand(hdfsUrl, userName, logger, i);
gson = new Gson();
}
public HDFSFileInterpreter(Properties property){
super(property);
prepare();
}
/**
* Status of one file
*
* matches returned JSON
*/
public class OneFileStatus {
public long accessTime;
public int blockSize;
public int childrenNum;
public int fileId;
public String group;
public long length;
public long modificationTime;
public String owner;
public String pathSuffix;
public String permission;
public int replication;
public int storagePolicy;
public String type;
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("\nAccessTime = " + accessTime);
sb.append("\nBlockSize = " + blockSize);
sb.append("\nChildrenNum = " + childrenNum);
sb.append("\nFileId = " + fileId);
sb.append("\nGroup = " + group);
sb.append("\nLength = " + length);
sb.append("\nModificationTime = " + modificationTime);
sb.append("\nOwner = " + owner);
sb.append("\nPathSuffix = " + pathSuffix);
sb.append("\nPermission = " + permission);
sb.append("\nReplication = " + replication);
sb.append("\nStoragePolicy = " + storagePolicy);
sb.append("\nType = " + type);
return sb.toString();
}
}
/**
* Status of one file
*
* matches returned JSON
*/
public class SingleFileStatus {
public OneFileStatus FileStatus;
}
/**
* Status of all files in a directory
*
* matches returned JSON
*/
public class MultiFileStatus {
public OneFileStatus[] FileStatus;
}
/**
* Status of all files in a directory
*
* matches returned JSON
*/
public class AllFileStatus {
public MultiFileStatus FileStatuses;
}
// tests whether we're able to connect to HDFS
private void testConnection() {
try {
if (isDirectory("/"))
logger.info("Successfully created WebHDFS connection");
} catch (Exception e) {
logger.error("testConnection: Cannot open WebHDFS connection. Bad URL: " + "/", e);
exceptionOnConnect = e;
}
}
@Override
public void open() {
testConnection();
}
@Override
public void close() {
}
private String listDir(String path) throws Exception {
return cmd.runCommand(cmd.listStatus, path, null);
}
private String listPermission(OneFileStatus fs){
StringBuilder sb = new StringBuilder();
sb.append(fs.type.equalsIgnoreCase("Directory") ? 'd' : '-');
int p = Integer.parseInt(fs.permission, 16);
sb.append(((p & 0x400) == 0) ? '-' : 'r');
sb.append(((p & 0x200) == 0) ? '-' : 'w');
sb.append(((p & 0x100) == 0) ? '-' : 'x');
sb.append(((p & 0x40) == 0) ? '-' : 'r');
sb.append(((p & 0x20) == 0) ? '-' : 'w');
sb.append(((p & 0x10) == 0) ? '-' : 'x');
sb.append(((p & 0x4) == 0) ? '-' : 'r');
sb.append(((p & 0x2) == 0) ? '-' : 'w');
sb.append(((p & 0x1) == 0) ? '-' : 'x');
return sb.toString();
}
private String listDate(OneFileStatus fs) {
return new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(fs.modificationTime));
}
private String ListOne(String path, OneFileStatus fs) {
if (args.flags.contains(new Character('l'))) {
StringBuilder sb = new StringBuilder();
sb.append(listPermission(fs) + "\t");
sb.append(((fs.replication == 0) ? "-" : fs.replication) + "\t ");
sb.append(fs.owner + "\t");
sb.append(fs.group + "\t");
if (args.flags.contains(new Character('h'))){ //human readable
sb.append(humanReadableByteCount(fs.length) + "\t\t");
} else {
sb.append(fs.length + "\t");
}
sb.append(listDate(fs) + "GMT\t");
sb.append((path.length() == 1) ? path + fs.pathSuffix : path + '/' + fs.pathSuffix);
return sb.toString();
}
return fs.pathSuffix;
}
private String humanReadableByteCount(long bytes) {
int unit = 1024;
if (bytes < unit) return bytes + " B";
int exp = (int) (Math.log(bytes) / Math.log(unit));
String pre = "KMGTPE".charAt(exp - 1) + "";
return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
}
public String listFile(String filePath) {
try {
String str = cmd.runCommand(cmd.getFileStatus, filePath, null);
SingleFileStatus sfs = gson.fromJson(str, SingleFileStatus.class);
if (sfs != null) {
return ListOne(filePath, sfs.FileStatus);
}
} catch (Exception e) {
logger.error("listFile: " + filePath, e);
}
return "No such File or directory";
}
public String listAll(String path) {
String all = "";
if (exceptionOnConnect != null)
return "Error connecting to provided endpoint.";
try {
//see if directory.
if (isDirectory(path)) {
String sfs = listDir(path);
if (sfs != null) {
AllFileStatus allFiles = gson.fromJson(sfs, AllFileStatus.class);
if (allFiles != null &&
allFiles.FileStatuses != null &&
allFiles.FileStatuses.FileStatus != null)
{
for (OneFileStatus fs : allFiles.FileStatuses.FileStatus)
all = all + ListOne(path, fs) + '\n';
}
}
return all;
} else {
return listFile(path);
}
} catch (Exception e) {
logger.error("listall: listDir " + path, e);
throw new InterpreterException("Could not find file or directory:\t" + path);
}
}
public boolean isDirectory(String path) {
boolean ret = false;
if (exceptionOnConnect != null)
return ret;
try {
String str = cmd.runCommand(cmd.getFileStatus, path, null);
SingleFileStatus sfs = gson.fromJson(str, SingleFileStatus.class);
if (sfs != null)
return sfs.FileStatus.type.equals("DIRECTORY");
} catch (Exception e) {
logger.error("IsDirectory: " + path, e);
return false;
}
return ret;
}
@Override
public List<String> completion(String buf, int cursor) {
logger.info("Completion request at position\t" + cursor + " in string " + buf);
final List<String> suggestions = new ArrayList<>();
if (StringUtils.isEmpty(buf)) {
suggestions.add("ls");
suggestions.add("cd");
suggestions.add("pwd");
return suggestions;
}
//part of a command == no spaces
if (buf.split(" ").length == 1){
if ("cd".contains(buf)) suggestions.add("cd");
if ("ls".contains(buf)) suggestions.add("ls");
if ("pwd".contains(buf)) suggestions.add("pwd");
return suggestions;
}
// last word will contain the path we're working with.
String lastToken = buf.substring(buf.lastIndexOf(" ") + 1);
if (lastToken.startsWith("-")) { //flag not path
return null;
}
String localPath = ""; //all things before the last '/'
String unfinished = lastToken; //unfished filenames or directories
if (lastToken.contains("/")) {
localPath = lastToken.substring(0, lastToken.lastIndexOf('/') + 1);
unfinished = lastToken.substring(lastToken.lastIndexOf('/') + 1);
}
String globalPath = getNewPath(localPath); //adjust for cwd
if (isDirectory(globalPath)){
try {
String fileStatusString = listDir(globalPath);
if (fileStatusString != null) {
AllFileStatus allFiles = gson.fromJson(fileStatusString, AllFileStatus.class);
if (allFiles != null &&
allFiles.FileStatuses != null &&
allFiles.FileStatuses.FileStatus != null)
{
for (OneFileStatus fs : allFiles.FileStatuses.FileStatus) {
if (fs.pathSuffix.contains(unfinished)) {
//only suggest the text after the last .
String beforeLastPeriod = unfinished.substring(0, unfinished.lastIndexOf('.') + 1);
//beforeLastPeriod should be the start of fs.pathSuffix, so take the end of it.
String suggestedFinish = fs.pathSuffix.substring(beforeLastPeriod.length());
suggestions.add(suggestedFinish);
}
}
return suggestions;
}
}
} catch (Exception e) {
logger.error("listall: listDir " + globalPath, e);
return null;
}
} else {
logger.info("path is not a directory. No values suggested.");
}
//Error in string.
return null;
}
}

View file

@ -0,0 +1,209 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.file;
import com.google.gson.Gson;
import junit.framework.TestCase;
import static org.junit.Assert.*;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.Test;
import org.slf4j.Logger;
import java.util.HashMap;
import java.util.Properties;
import java.lang.Override;
import java.lang.String;
/**
* Tests Interpreter by running pre-determined commands against mock file system
*
*/
public class HDFSFileInterpreterTest extends TestCase {
@Test
public void test() {
HDFSFileInterpreter t = new MockHDFSFileInterpreter(new Properties());
t.open();
// We have info for /, /user, /tmp, /mr-history/done
// Ensure
// 1. ls -l works
// 2. paths (. and ..) are correctly handled
// 3. flags and arguments to commands are correctly handled
InterpreterResult result1 = t.interpret("ls -l /", null);
assertEquals(result1.type(), InterpreterResult.Type.TEXT);
InterpreterResult result2 = t.interpret("ls -l /./user/..", null);
assertEquals(result2.type(), InterpreterResult.Type.TEXT);
assertEquals(result1.message(), result2.message());
// Ensure you can do cd and after that the ls uses current directory correctly
InterpreterResult result3 = t.interpret("cd user", null);
assertEquals(result3.type(), InterpreterResult.Type.TEXT);
assertEquals(result3.message(), "OK");
InterpreterResult result4 = t.interpret("ls", null);
assertEquals(result4.type(), InterpreterResult.Type.TEXT);
InterpreterResult result5 = t.interpret("ls /user", null);
assertEquals(result5.type(), InterpreterResult.Type.TEXT);
assertEquals(result4.message(), result5.message());
// Ensure pwd works correctly
InterpreterResult result6 = t.interpret("pwd", null);
assertEquals(result6.type(), InterpreterResult.Type.TEXT);
assertEquals(result6.message(), "/user");
// Move a couple of levels and check we're in the right place
InterpreterResult result7 = t.interpret("cd ../mr-history/done", null);
assertEquals(result7.type(), InterpreterResult.Type.TEXT);
assertEquals(result7.message(), "OK");
InterpreterResult result8 = t.interpret("ls -l ", null);
assertEquals(result8.type(), InterpreterResult.Type.TEXT);
InterpreterResult result9 = t.interpret("ls -l /mr-history/done", null);
assertEquals(result9.type(), InterpreterResult.Type.TEXT);
assertEquals(result8.message(), result9.message());
InterpreterResult result10 = t.interpret("cd ../..", null);
assertEquals(result10.type(), InterpreterResult.Type.TEXT);
assertEquals(result7.message(), "OK");
InterpreterResult result11 = t.interpret("ls -l ", null);
assertEquals(result11.type(), InterpreterResult.Type.TEXT);
// we should be back to first result after all this navigation
assertEquals(result1.message(), result11.message());
t.close();
}
}
/**
* Store command results from curl against a real file system
*/
class MockFileSystem {
HashMap<String, String> mfs = new HashMap<String, String>();
void addListStatusData() {
mfs.put("/?op=LISTSTATUS",
"{\"FileStatuses\":{\"FileStatus\":[\n" +
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16389,\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1438548219672,\"owner\":\"yarn\",\"pathSuffix\":\"app-logs\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" +
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16395,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548030045,\"owner\":\"hdfs\",\"pathSuffix\":\"hdp\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" +
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16390,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547985336,\"owner\":\"mapred\",\"pathSuffix\":\"mapred\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" +
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":2,\"fileId\":16392,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547985346,\"owner\":\"hdfs\",\"pathSuffix\":\"mr-history\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" +
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16400,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548089725,\"owner\":\"hdfs\",\"pathSuffix\":\"system\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" +
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16386,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548150089,\"owner\":\"hdfs\",\"pathSuffix\":\"tmp\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" +
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16387,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547921792,\"owner\":\"hdfs\",\"pathSuffix\":\"user\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" +
"]}}"
);
mfs.put("/user?op=LISTSTATUS",
"{\"FileStatuses\":{\"FileStatus\":[\n" +
" {\"accessTime\":0,\"blockSize\":0,\"childrenNum\":4,\"fileId\":16388,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253161263,\"owner\":\"ambari-qa\",\"pathSuffix\":\"ambari-qa\",\"permission\":\"770\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" +
" ]}}"
);
mfs.put("/tmp?op=LISTSTATUS",
"{\"FileStatuses\":{\"FileStatus\":[\n" +
" {\"accessTime\":1441253097489,\"blockSize\":134217728,\"childrenNum\":0,\"fileId\":16400,\"group\":\"hdfs\",\"length\":1645,\"modificationTime\":1441253097517,\"owner\":\"hdfs\",\"pathSuffix\":\"ida8c06540_date040315\",\"permission\":\"755\",\"replication\":3,\"storagePolicy\":0,\"type\":\"FILE\"}\n" +
" ]}}"
);
mfs.put("/mr-history/done?op=LISTSTATUS",
"{\"FileStatuses\":{\"FileStatus\":[\n" +
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16433,\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1441253197481,\"owner\":\"mapred\",\"pathSuffix\":\"2015\",\"permission\":\"770\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" +
"]}}"
);
}
void addGetFileStatusData() {
mfs.put("/?op=GETFILESTATUS",
"{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":7,\"fileId\":16385,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548089725,\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}");
mfs.put("/user?op=GETFILESTATUS",
"{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16387,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253043188,\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}");
mfs.put("/tmp?op=GETFILESTATUS",
"{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16386,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253097489,\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}");
mfs.put("/mr-history/done?op=GETFILESTATUS",
"{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16393,\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1441253197480,\"owner\":\"mapred\",\"pathSuffix\":\"\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}");
}
public void addMockData(HDFSCommand.Op op) {
if (op.op.equals("LISTSTATUS")) {
addListStatusData();
} else if (op.op.equals("GETFILESTATUS")) {
addGetFileStatusData();
}
// do nothing
}
public String get(String key) {
return mfs.get(key);
}
}
/**
* Run commands against mock file system that simulates webhdfs responses
*/
class MockHDFSCommand extends HDFSCommand {
MockFileSystem fs = null;
public MockHDFSCommand(String url, String user, Logger logger) {
super(url, user, logger, 1000);
fs = new MockFileSystem();
fs.addMockData(getFileStatus);
fs.addMockData(listStatus);
}
@Override
public String runCommand(Op op, String path, Arg[] args) throws Exception {
String error = checkArgs(op, path, args);
assertNull(error);
String c = path + "?op=" + op.op;
if (args != null) {
for (Arg a : args) {
c += "&" + a.key + "=" + a.value;
}
}
return fs.get(c);
}
}
/**
* Mock Interpreter - uses Mock HDFS command
*/
class MockHDFSFileInterpreter extends HDFSFileInterpreter {
@Override
public void prepare() {
// Run commands against mock File System instead of WebHDFS
cmd = new MockHDFSCommand("", "", logger);
gson = new Gson();
}
public MockHDFSFileInterpreter(Properties property) {
super(property);
}
}

View file

@ -98,6 +98,7 @@
<module>postgresql</module>
<module>jdbc</module>
<module>tajo</module>
<module>file</module>
<module>flink</module>
<module>ignite</module>
<module>kylin</module>
@ -193,6 +194,13 @@
<version>2.4</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>

View file

@ -93,7 +93,7 @@ public class SparkInterpreter extends Interpreter {
getSystemDefault("MASTER", "spark.master", "local[*]"),
"Spark master uri. ex) spark://masterhost:7077")
.add("spark.executor.memory",
getSystemDefault(null, "spark.executor.memory", "512m"),
getSystemDefault(null, "spark.executor.memory", ""),
"Executor memory per worker instance. ex) 512m, 32g")
.add("spark.cores.max",
getSystemDefault(null, "spark.cores.max", ""),
@ -774,13 +774,34 @@ public class SparkInterpreter extends Interpreter {
context.out.clear();
Code r = null;
String incomplete = "";
boolean inComment = false;
for (int l = 0; l < linesToRun.length; l++) {
String s = linesToRun[l];
// check if next line starts with "." (but not ".." or "./") it is treated as an invocation
if (l + 1 < linesToRun.length) {
String nextLine = linesToRun[l + 1].trim();
if (nextLine.startsWith(".") && !nextLine.startsWith("..") && !nextLine.startsWith("./")) {
boolean continuation = false;
if (nextLine.isEmpty()
|| nextLine.startsWith("//") // skip empty line or comment
|| nextLine.startsWith("}")
|| nextLine.startsWith("object")) { // include "} object" for Scala companion object
continuation = true;
} else if (!inComment && nextLine.startsWith("/*")) {
inComment = true;
continuation = true;
} else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
inComment = false;
continuation = true;
} else if (nextLine.length() > 1
&& nextLine.charAt(0) == '.'
&& nextLine.charAt(1) != '.' // ".."
&& nextLine.charAt(1) != '/') { // "./"
continuation = true;
} else if (inComment) {
continuation = true;
}
if (continuation) {
incomplete += s + "\n";
continue;
}

View file

@ -371,7 +371,11 @@ public class ZeppelinContext {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
String noteId = interpreterContext.getNoteId();
// try get local object
AngularObject ao = registry.get(name, interpreterContext.getNoteId(), null);
AngularObject paragraphAo = registry.get(name, noteId, interpreterContext.getParagraphId());
AngularObject noteAo = registry.get(name, noteId, null);
AngularObject ao = paragraphAo != null ? paragraphAo : noteAo;
if (ao == null) {
// then global object
ao = registry.get(name, null, null);

View file

@ -41,6 +41,9 @@ class Logger(object):
def reset(self):
self.out = ""
def flush(self):
pass
class PyZeppelinContext(dict):
def __init__(self, zc):

View file

@ -141,6 +141,17 @@ public class SparkInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code());
}
@Test
public void testNextLineComments() {
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n/*comment here\n*/.toInt", context).code());
}
@Test
public void testNextLineCompanionObject() {
String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter {\n def apply(x: Long) = new Counter()\n}";
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret(code, context).code());
}
@Test
public void testEndWithComment() {
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());

View file

@ -57,6 +57,7 @@ The following components are provided under Apache License.
(Apache 2.0) javax.servlet (org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016 - http://www.eclipse.org/jetty)
(Apache 2.0) Joda-Time (joda-time:joda-time:2.8.1 - http://www.joda.org/joda-time/)
(Apache 2.0) Jackson (org.codehaus.jackson:jackson-core-asl:1.9.13 - http://jackson.codehaus.org/)
(Apache 2.0) Javassist (org.javassist:javassist:jar:3.18.1-GA:compile - http://jboss-javassist.github.io/javassist/)
(Apache 2.0) JetS3t (net.java.dev.jets3t:jets3t:jar:0.9.3) - http://www.jets3t.org/
(Apache 2.0) Jetty (org.eclipse.jetty:jetty - http://www.eclipse.org/jetty)
(Apache 2.0) mx4j (mx4j:mx4j:jar:3.0.2) - http://mx4j.sourceforge.net/
@ -187,7 +188,10 @@ CDDL license
The following components are provided under the CDDL License.
(CDDL 1.0) javax.activation (javax.activation:activation:jar:1.1.1 - http://java.sun.com/javase/technologies/desktop/javabeans/jaf/index.jsp)
(CDDL 1.0) java.annotation (javax.annotation:javax.annotation-api:jar:1.2:compile - http://jcp.org/en/jsr/detail?id=250)
(CDDL 1.1) Jersey (com.sun.jersey:jersey:jar:1.9 - https://jersey.java.net/)
(CDDL 1.1) jersey-core (org.glassfish.jersey.core:jersey-core:2.22.2 - https://jersey.java.net/)
(CDDL 1.1) hk2 (org.glassfish.hk2 - https://hk2.java.net/2.5.0-b03/)

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.display;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import org.apache.zeppelin.scheduler.ExecutorFactory;
@ -43,6 +44,15 @@ public class AngularObject<T> {
private String noteId; // noteId belonging to. null for global scope
private String paragraphId; // paragraphId belongs to. null for notebook scope
/**
* Public constructor, neccessary for the deserialization when using Thrift angularRegistryPush()
* Without public constructor, GSON library will instantiate the AngularObject using
* serialization so the <strong>watchers</strong> list won't be initialized and will throw
* NullPointerException the first time it is accessed
*/
public AngularObject() {
}
/**
* To create new AngularObject, use AngularObjectRegistry.add()
*
@ -111,17 +121,17 @@ public class AngularObject<T> {
@Override
public boolean equals(Object o) {
if (o instanceof AngularObject) {
AngularObject ao = (AngularObject) o;
if (noteId == null && ao.noteId == null ||
(noteId != null && ao.noteId != null && noteId.equals(ao.noteId))) {
if (paragraphId == null && ao.paragraphId == null ||
(paragraphId != null && ao.paragraphId != null && paragraphId.equals(ao.paragraphId))) {
return name.equals(ao.name);
}
}
}
return false;
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AngularObject<?> that = (AngularObject<?>) o;
return Objects.equals(name, that.name) &&
Objects.equals(noteId, that.noteId) &&
Objects.equals(paragraphId, that.paragraphId);
}
@Override
public int hashCode() {
return Objects.hash(name, noteId, paragraphId);
}
/**
@ -232,4 +242,14 @@ public class AngularObject<T> {
}
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("AngularObject{");
sb.append("noteId='").append(noteId).append('\'');
sb.append(", paragraphId='").append(paragraphId).append('\'');
sb.append(", object=").append(object);
sb.append(", name='").append(name).append('\'');
sb.append('}');
return sb.toString();
}
}

View file

@ -246,4 +246,12 @@ public class AngularObjectRegistry {
public String getInterpreterGroupId() {
return interpreterId;
}
public Map<String, Map<String, AngularObject>> getRegistry() {
return registry;
}
public void setRegistry(Map<String, Map<String, AngularObject>> registry) {
this.registry = registry;
}
}

View file

@ -48,6 +48,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
AngularObjectRegistry angularObjectRegistry;
RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process
ResourcePool resourcePool;
boolean angularRegistryPushed = false;
// map [notebook session, Interpreters in the group], to support per note session interpreters
//Map<String, List<Interpreter>> interpreters = new ConcurrentHashMap<String,
@ -254,4 +255,12 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
public ResourcePool getResourcePool() {
return resourcePool;
}
public boolean isAngularRegistryPushed() {
return angularRegistryPushed;
}
public void setAngularRegistryPushed(boolean angularRegistryPushed) {
this.angularRegistryPushed = angularRegistryPushed;
}
}

View file

@ -63,7 +63,7 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
Gson gson = new Gson();
RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
if (!remoteInterpreterProcess.isRunning()) {
return null;
return super.add(name, o, noteId, paragraphId, true);
}
Client client = null;
@ -97,7 +97,7 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
paragraphId) {
RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
if (!remoteInterpreterProcess.isRunning()) {
return null;
return super.remove(name, noteId, paragraphId);
}
Client client = null;

View file

@ -20,6 +20,8 @@ package org.apache.zeppelin.interpreter.remote;
import java.util.*;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
@ -128,10 +130,11 @@ public class RemoteInterpreter extends Interpreter {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
interpreterProcess.reference(getInterpreterGroup());
final InterpreterGroup interpreterGroup = getInterpreterGroup();
interpreterProcess.reference(interpreterGroup);
interpreterProcess.setMaxPoolSize(
Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
String groupId = getInterpreterGroup().getId();
String groupId = interpreterGroup.getId();
synchronized (interpreterProcess) {
Client client = null;
@ -146,7 +149,14 @@ public class RemoteInterpreter extends Interpreter {
logger.info("Create remote interpreter {}", getClassName());
property.put("zeppelin.interpreter.localRepo", localRepoPath);
client.createInterpreter(groupId, noteId,
getClassName(), (Map) property);
getClassName(), (Map) property);
// Push angular object loaded from JSON file to remote interpreter
if (!interpreterGroup.isAngularRegistryPushed()) {
pushAngularObjectRegistryToRemote(client);
interpreterGroup.setAngularRegistryPushed(true);
}
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
@ -387,4 +397,30 @@ public class RemoteInterpreter extends Interpreter {
Type.valueOf(result.getType()),
result.getMsg());
}
/**
* Push local angular object registry to
* remote interpreter. This method should be
* call ONLY inside the init() method
* @param client
* @throws TException
*/
void pushAngularObjectRegistryToRemote(Client client) throws TException {
final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
.getAngularObjectRegistry();
if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
.getRegistry();
logger.info("Push local angular object registry from ZeppelinServer to" +
" remote interpreter group {}", this.getInterpreterGroup().getId());
final java.lang.reflect.Type registryType = new TypeToken<Map<String,
Map<String, AngularObject>>>() {}.getType();
Gson gson = new Gson();
client.angularRegistryPush(gson.toJson(registry, registryType));
}
}
}

View file

@ -199,7 +199,7 @@ public class RemoteInterpreterEventPoller extends Thread {
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
List<String> resourceList = client.resoucePoolGetAll();
List<String> resourceList = client.resourcePoolGetAll();
Gson gson = new Gson();
for (String res : resourceList) {
resourceSet.add(gson.fromJson(res, Resource.class));
@ -260,7 +260,10 @@ public class RemoteInterpreterEventPoller extends Thread {
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
ByteBuffer res = client.resourceGet(resourceId.getName());
ByteBuffer res = client.resourceGet(
resourceId.getNoteId(),
resourceId.getParagraphId(),
resourceId.getName());
Object o = Resource.deserializeObject(res);
return o;
} catch (Exception e) {

View file

@ -343,12 +343,22 @@ public class RemoteInterpreterServer
}
String interpreterResultMessage = result.message();
InterpreterResult combinedResult;
if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
message += interpreterResultMessage;
return new InterpreterResult(result.code(), result.type(), message);
combinedResult = new InterpreterResult(result.code(), result.type(), message);
} else {
return new InterpreterResult(result.code(), outputType, message);
combinedResult = new InterpreterResult(result.code(), outputType, message);
}
// put result into resource pool
context.getResourcePool().put(
context.getNoteId(),
context.getParagraphId(),
WellKnownResourceName.ParagraphResult.toString(),
combinedResult);
return combinedResult;
} finally {
InterpreterContext.remove();
}
@ -636,7 +646,7 @@ public class RemoteInterpreterServer
}
@Override
public List<String> resoucePoolGetAll() throws TException {
public List<String> resourcePoolGetAll() throws TException {
logger.debug("Request getAll from ZeppelinServer");
ResourceSet resourceSet = resourcePool.getAll(false);
@ -651,9 +661,17 @@ public class RemoteInterpreterServer
}
@Override
public ByteBuffer resourceGet(String resourceName) throws TException {
public boolean resourceRemove(String noteId, String paragraphId, String resourceName)
throws TException {
Resource resource = resourcePool.remove(noteId, paragraphId, resourceName);
return resource != null;
}
@Override
public ByteBuffer resourceGet(String noteId, String paragraphId, String resourceName)
throws TException {
logger.debug("Request resourceGet {} from ZeppelinServer", resourceName);
Resource resource = resourcePool.get(resourceName, false);
Resource resource = resourcePool.get(noteId, paragraphId, resourceName, false);
if (resource == null || resource.get() == null || !resource.isSerializable()) {
return ByteBuffer.allocate(0);
@ -666,4 +684,16 @@ public class RemoteInterpreterServer
}
}
}
@Override
public void angularRegistryPush(String registryAsString) throws TException {
try {
Map<String, Map<String, AngularObject>> deserializedRegistry = gson
.fromJson(registryAsString,
new TypeToken<Map<String, Map<String, AngularObject>>>() { }.getType());
interpreterGroup.getAngularObjectRegistry().setRegistry(deserializedRegistry);
} catch (Exception e) {
logger.info("Exception in RemoteInterpreterServer while angularRegistryPush, nolock", e);
}
}
}

View file

@ -1,22 +1,5 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.3)
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
@ -51,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-02-16")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-17")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

View file

@ -1,22 +1,5 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.3)
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
@ -51,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-02-16")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-17")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");

View file

@ -1,22 +1,5 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.3)
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
@ -37,7 +20,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
RESOURCE_POOL_GET_ALL(6),
RESOURCE_GET(7),
OUTPUT_APPEND(8),
OUTPUT_UPDATE(9);
OUTPUT_UPDATE(9),
ANGULAR_REGISTRY_PUSH(10);
private final int value;
@ -76,6 +60,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
return OUTPUT_APPEND;
case 9:
return OUTPUT_UPDATE;
case 10:
return ANGULAR_REGISTRY_PUSH;
default:
return null;
}

View file

@ -1,22 +1,5 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.3)
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
@ -51,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-02-16")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-17")
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");

View file

@ -33,6 +33,11 @@ public class DistributedResourcePool extends LocalResourcePool {
return get(name, true);
}
@Override
public Resource get(String noteId, String paragraphId, String name) {
return get(noteId, paragraphId, name, true);
}
/**
* get resource by name.
* @param name
@ -58,6 +63,35 @@ public class DistributedResourcePool extends LocalResourcePool {
}
}
/**
* get resource by name.
* @param name
* @param remote false only return from local resource
* @return null if resource not found.
*/
public Resource get(String noteId, String paragraphId, String name, boolean remote) {
// try local first
Resource resource = super.get(noteId, paragraphId, name);
if (resource != null) {
return resource;
}
if (remote) {
ResourceSet resources = connector.getAllResources()
.filterByNoteId(noteId)
.filterByParagraphId(paragraphId)
.filterByName(name);
if (resources.isEmpty()) {
return null;
} else {
return resources.get(0);
}
} else {
return null;
}
}
@Override
public ResourceSet getAll() {
return getAll(true);

View file

@ -52,6 +52,12 @@ public class LocalResourcePool implements ResourcePool {
return resources.get(resourceId);
}
@Override
public Resource get(String noteId, String paragraphId, String name) {
ResourceId resourceId = new ResourceId(resourcePoolId, noteId, paragraphId, name);
return resources.get(resourceId);
}
@Override
public ResourceSet getAll() {
return new ResourceSet(resources.values());
@ -70,8 +76,21 @@ public class LocalResourcePool implements ResourcePool {
resources.put(resourceId, resource);
}
@Override
public void put(String noteId, String paragraphId, String name, Object object) {
ResourceId resourceId = new ResourceId(resourcePoolId, noteId, paragraphId, name);
Resource resource = new Resource(resourceId, object);
resources.put(resourceId, resource);
}
@Override
public Resource remove(String name) {
return resources.remove(new ResourceId(resourcePoolId, name));
}
@Override
public Resource remove(String noteId, String paragraphId, String name) {
return resources.remove(new ResourceId(resourcePoolId, noteId, paragraphId, name));
}
}

View file

@ -22,9 +22,20 @@ package org.apache.zeppelin.resource;
public class ResourceId {
private final String resourcePoolId;
private final String name;
private final String noteId;
private final String paragraphId;
ResourceId(String resourcePoolId, String name) {
this.resourcePoolId = resourcePoolId;
this.noteId = null;
this.paragraphId = null;
this.name = name;
}
ResourceId(String resourcePoolId, String noteId, String paragraphId, String name) {
this.resourcePoolId = resourcePoolId;
this.noteId = noteId;
this.paragraphId = paragraphId;
this.name = name;
}
@ -36,16 +47,35 @@ public class ResourceId {
return name;
}
public String getNoteId() {
return noteId;
}
public String getParagraphId() {
return paragraphId;
}
@Override
public int hashCode() {
return (resourcePoolId + name).hashCode();
return (resourcePoolId + noteId + paragraphId + name).hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof ResourceId) {
ResourceId r = (ResourceId) o;
return (r.name.equals(name) && r.resourcePoolId.equals(resourcePoolId));
return equals(r.name, name) && equals(r.resourcePoolId, resourcePoolId) &&
equals(r.noteId, noteId) && equals(r.paragraphId, paragraphId);
} else {
return false;
}
}
private boolean equals(String a, String b) {
if (a == null && b == null) {
return true;
} else if (a != null && b != null) {
return a.equals(b);
} else {
return false;
}

View file

@ -33,6 +33,15 @@ public interface ResourcePool {
*/
public Resource get(String name);
/**
* Get resource from name
* @param noteId
* @param paragraphId
* @param name Resource name
* @return null if resource not found
*/
public Resource get(String noteId, String paragraphId, String name);
/**
* Get all resources
* @return
@ -46,10 +55,31 @@ public interface ResourcePool {
*/
public void put(String name, Object object);
/**
* Put an object into resource pool
* Given noteId and paragraphId is identifying resource along with name.
* Object will be automatically removed on related note or paragraph removal.
*
* @param noteId
* @param paragraphId
* @param name
* @param object
*/
public void put(String noteId, String paragraphId, String name, Object object);
/**
* Remove object
* @param name Resource name to remove
* @return removed Resource. null if resource not found
*/
public Resource remove(String name);
/**
* Remove object
* @param noteId
* @param paragraphId
* @param name Resource name to remove
* @return removed Resource. null if resource not found
*/
public Resource remove(String noteId, String paragraphId, String name);
}

View file

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.resource;
import com.google.gson.Gson;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.slf4j.Logger;
import java.util.List;
/**
* Utilities for ResourcePool
*/
public class ResourcePoolUtils {
static Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolUtils.class);
public static ResourceSet getAllResources() {
return getAllResourcesExcept(null);
}
public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) {
ResourceSet resourceSet = new ResourceSet();
for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
if (interpreterGroupExcludsion != null &&
intpGroup.getId().equals(interpreterGroupExcludsion)) {
continue;
}
RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
if (remoteInterpreterProcess == null) {
ResourcePool localPool = intpGroup.getResourcePool();
if (localPool != null) {
resourceSet.addAll(localPool.getAll());
}
} else if (remoteInterpreterProcess.isRunning()) {
RemoteInterpreterService.Client client = null;
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
List<String> resourceList = client.resourcePoolGetAll();
Gson gson = new Gson();
for (String res : resourceList) {
resourceSet.add(gson.fromJson(res, Resource.class));
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
broken = true;
} finally {
if (client != null) {
intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
}
}
}
}
return resourceSet;
}
public static void removeResourcesBelongsToNote(String noteId) {
removeResourcesBelongsToParagraph(noteId, null);
}
public static void removeResourcesBelongsToParagraph(String noteId, String paragraphId) {
for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
ResourceSet resourceSet = new ResourceSet();
RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
if (remoteInterpreterProcess == null) {
ResourcePool localPool = intpGroup.getResourcePool();
if (localPool != null) {
resourceSet.addAll(localPool.getAll());
}
if (noteId != null) {
resourceSet = resourceSet.filterByNoteId(noteId);
}
if (paragraphId != null) {
resourceSet = resourceSet.filterByParagraphId(paragraphId);
}
for (Resource r : resourceSet) {
localPool.remove(
r.getResourceId().getNoteId(),
r.getResourceId().getParagraphId(),
r.getResourceId().getName());
}
} else if (remoteInterpreterProcess.isRunning()) {
RemoteInterpreterService.Client client = null;
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
List<String> resourceList = client.resourcePoolGetAll();
Gson gson = new Gson();
for (String res : resourceList) {
resourceSet.add(gson.fromJson(res, Resource.class));
}
if (noteId != null) {
resourceSet = resourceSet.filterByNoteId(noteId);
}
if (paragraphId != null) {
resourceSet = resourceSet.filterByParagraphId(paragraphId);
}
for (Resource r : resourceSet) {
client.resourceRemove(
r.getResourceId().getNoteId(),
r.getResourceId().getParagraphId(),
r.getResourceId().getName());
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
broken = true;
} finally {
if (client != null) {
intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
}
}
}
}
}
}

View file

@ -72,4 +72,34 @@ public class ResourceSet extends LinkedList<Resource> {
}
return result;
}
public ResourceSet filterByNoteId(String noteId) {
ResourceSet result = new ResourceSet();
for (Resource r : this) {
if (equals(r.getResourceId().getNoteId(), noteId)) {
result.add(r);
}
}
return result;
}
public ResourceSet filterByParagraphId(String paragraphId) {
ResourceSet result = new ResourceSet();
for (Resource r : this) {
if (equals(r.getResourceId().getParagraphId(), paragraphId)) {
result.add(r);
}
}
return result;
}
private boolean equals(String a, String b) {
if (a == null && b == null) {
return true;
} else if (a != null && b != null) {
return a.equals(b);
} else {
return false;
}
}
}

View file

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.resource;
/**
* Well known resource names in ResourcePool
*/
public enum WellKnownResourceName {
ParagraphResult("zeppelin.paragraph.result"); // paragraph run result
String name;
WellKnownResourceName(String name) {
this.name = name;
}
public String toString() {
return name;
}
}

View file

@ -47,7 +47,8 @@ enum RemoteInterpreterEventType {
RESOURCE_POOL_GET_ALL = 6,
RESOURCE_GET = 7
OUTPUT_APPEND = 8,
OUTPUT_UPDATE = 9
OUTPUT_UPDATE = 9,
ANGULAR_REGISTRY_PUSH=10
}
struct RemoteInterpreterEvent {
@ -76,12 +77,15 @@ service RemoteInterpreterService {
// as a response, ZeppelinServer send serialized value of resource
void resourceResponseGet(1: string resourceId, 2: binary object);
// get all resources in the interpreter process
list<string> resoucePoolGetAll();
list<string> resourcePoolGetAll();
// get value of resource
binary resourceGet(1: string resourceName);
binary resourceGet(1: string noteId, 2: string paragraphId, 3: string resourceName);
// remove resource
bool resourceRemove(1: string noteId, 2: string paragraphId, 3:string resourceName);
void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string
object);
void angularObjectAdd(1: string name, 2: string noteId, 3: string paragraphId, 4: string object);
void angularObjectRemove(1: string name, 2: string noteId, 3: string paragraphId);
void angularRegistryPush(1: string registry);
}

View file

@ -28,7 +28,10 @@ import java.util.Map;
import java.util.Properties;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
@ -42,6 +45,10 @@ import org.apache.zeppelin.scheduler.Scheduler;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
public class RemoteInterpreterTest {
@ -664,4 +671,29 @@ public class RemoteInterpreterTest {
assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler());
}
@Test
public void should_push_local_angular_repo_to_remote() throws Exception {
//Given
final Client client = Mockito.mock(Client.class);
final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), "noteId",
MockInterpreterA.class.getName(), "runner", "path","localRepo", env, 10 * 1000, null);
final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null);
registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId");
interpreterGroup.setAngularObjectRegistry(registry);
intr.setInterpreterGroup(interpreterGroup);
final java.lang.reflect.Type registryType = new TypeToken<Map<String,
Map<String, AngularObject>>>() {}.getType();
final Gson gson = new Gson();
final String expected = gson.toJson(registry.getRegistry(), registryType);
//When
intr.pushAngularObjectRegistryToRemote(client);
//Then
Mockito.verify(client).angularRegistryPush(expected);
}
}

View file

@ -29,6 +29,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
public class MockInterpreterResourcePool extends Interpreter {
@ -61,9 +62,18 @@ public class MockInterpreterResourcePool extends Interpreter {
public InterpreterResult interpret(String st, InterpreterContext context) {
String[] stmt = st.split(" ");
String cmd = stmt[0];
String noteId = null;
String paragraphId = null;
String name = null;
if (stmt.length >= 2) {
name = stmt[1];
String[] npn = stmt[1].split(":");
if (npn.length == 3) {
noteId = npn[0];
paragraphId = npn[1];
name = npn[2];
} else {
name = stmt[1];
}
}
String value = null;
if (stmt.length == 3) {
@ -73,11 +83,16 @@ public class MockInterpreterResourcePool extends Interpreter {
ResourcePool resourcePool = context.getResourcePool();
Object ret = null;
if (cmd.equals("put")) {
resourcePool.put(name, value);
resourcePool.put(noteId, paragraphId, name, value);
} else if (cmd.equalsIgnoreCase("get")) {
ret = resourcePool.get(name).get();
Resource resource = resourcePool.get(noteId, paragraphId, name);
if (resource != null) {
ret = resourcePool.get(noteId, paragraphId, name).get();
} else {
ret = "";
}
} else if (cmd.equals("remove")) {
ret = resourcePool.remove(name);
ret = resourcePool.remove(noteId, paragraphId, name);
} else if (cmd.equals("getAll")) {
ret = resourcePool.getAll();
}

View file

@ -136,12 +136,13 @@ public class DistributedResourcePoolTest {
InterpreterResult ret;
intp1.interpret("put key1 value1", context);
intp2.interpret("put key2 value2", context);
int numInterpreterResult = 2;
ret = intp1.interpret("getAll", context);
assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
assertEquals(numInterpreterResult + 2, gson.fromJson(ret.message(), ResourceSet.class).size());
ret = intp2.interpret("getAll", context);
assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
assertEquals(numInterpreterResult + 2, gson.fromJson(ret.message(), ResourceSet.class).size());
ret = intp1.interpret("get key1", context);
assertEquals("value1", gson.fromJson(ret.message(), String.class));
@ -201,4 +202,44 @@ public class DistributedResourcePoolTest {
assertEquals("value1", pool1.getAll().get(0).get());
assertEquals("value2", pool1.getAll().get(1).get());
}
@Test
public void testResourcePoolUtils() {
Gson gson = new Gson();
InterpreterResult ret;
// when create some resources
intp1.interpret("put note1:paragraph1:key1 value1", context);
intp1.interpret("put note1:paragraph2:key1 value2", context);
intp2.interpret("put note2:paragraph1:key1 value1", context);
intp2.interpret("put note2:paragraph2:key2 value2", context);
int numInterpreterResult = 2;
// then get all resources.
assertEquals(numInterpreterResult + 4, ResourcePoolUtils.getAllResources().size());
// when remove all resources from note1
ResourcePoolUtils.removeResourcesBelongsToNote("note1");
// then resources should be removed.
assertEquals(numInterpreterResult + 2, ResourcePoolUtils.getAllResources().size());
assertEquals("", gson.fromJson(
intp1.interpret("get note1:paragraph1:key1", context).message(),
String.class));
assertEquals("", gson.fromJson(
intp1.interpret("get note1:paragraph2:key1", context).message(),
String.class));
// when remove all resources from note2:paragraph1
ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1");
// then 1
assertEquals(numInterpreterResult + 1, ResourcePoolUtils.getAllResources().size());
assertEquals("value2", gson.fromJson(
intp1.interpret("get note2:paragraph2:key2", context).message(),
String.class));
}
}

View file

@ -191,6 +191,11 @@
</exclusions>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>

View file

@ -103,6 +103,8 @@ public class Message {
ANGULAR_OBJECT_UPDATED, // [c-s] angular object value updated,
ANGULAR_OBJECT_CLIENT_BIND, // [c-s] angular object updated from AngularJS z object
LIST_CONFIGURATIONS, // [c-s] ask all key/value pairs of configurations
CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations
// @param settings serialized Map<String, String> object
@ -131,4 +133,17 @@ public class Message {
public Object get(String k) {
return data.get(k);
}
public <T> T getType(String key) {
return (T) data.get(key);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Message{");
sb.append("data=").append(data);
sb.append(", op=").append(op);
sb.append('}');
return sb.toString();
}
}

View file

@ -16,6 +16,15 @@
*/
package org.apache.zeppelin.socket;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.http.HttpServletRequest;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
@ -25,6 +34,8 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
@ -43,13 +54,6 @@ import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Zeppelin websocket service.
*
@ -99,6 +103,11 @@ public class NotebookServer extends WebSocketServlet implements
LOG.debug("RECEIVE PRINCIPAL << " + messagereceived.principal);
LOG.debug("RECEIVE TICKET << " + messagereceived.ticket);
LOG.debug("RECEIVE ROLES << " + messagereceived.roles);
if (LOG.isTraceEnabled()) {
LOG.trace("RECEIVE MSG = " + messagereceived);
}
String ticket = TicketContainer.instance.getTicket(messagereceived.principal);
if (ticket != null && !ticket.equals(messagereceived.ticket))
throw new Exception("Invalid ticket " + messagereceived.ticket + " != " + ticket);
@ -178,6 +187,9 @@ public class NotebookServer extends WebSocketServlet implements
case ANGULAR_OBJECT_UPDATED:
angularObjectUpdated(conn, userAndRoles, notebook, messagereceived);
break;
case ANGULAR_OBJECT_CLIENT_BIND:
angularObjectClientBind(conn, userAndRoles, notebook, messagereceived);
break;
case LIST_CONFIGURATIONS:
sendAllConfigurations(conn, userAndRoles, notebook);
break;
@ -205,7 +217,7 @@ public class NotebookServer extends WebSocketServlet implements
return gson.fromJson(msg, Message.class);
}
private String serializeMessage(Message m) {
protected String serializeMessage(Message m) {
return gson.toJson(m);
}
@ -716,6 +728,91 @@ public class NotebookServer extends WebSocketServlet implements
}
}
/**
* Push the given Angular variable to the target
* interpreter angular registry given a noteId
* and a paragraph id
* @param conn
* @param notebook
* @param fromMessage
* @throws Exception
*/
protected void angularObjectClientBind(NotebookSocket conn, HashSet<String> userAndRoles,
Notebook notebook, Message fromMessage)
throws Exception {
String noteId = fromMessage.getType("noteId");
String varName = fromMessage.getType("name");
Object varValue = fromMessage.get("value");
String paragraphId = fromMessage.getType("paragraphId");
Note note = notebook.getNote(noteId);
if (paragraphId == null) {
throw new IllegalArgumentException("target paragraph not specified for " +
"angular value bind");
}
if (note != null) {
final InterpreterGroup interpreterGroup = findInterpreterGroupForParagraph(note,
paragraphId);
final AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {
RemoteAngularObjectRegistry remoteRegistry = (RemoteAngularObjectRegistry) registry;
pushAngularObjectToRemoteRegistry(noteId, paragraphId, varName, varValue, remoteRegistry,
interpreterGroup.getId(), conn);
} else {
pushAngularObjectToLocalRepo(noteId, paragraphId, varName, varValue, registry,
interpreterGroup.getId(), conn);
}
}
}
private InterpreterGroup findInterpreterGroupForParagraph(Note note, String paragraphId)
throws Exception {
final Paragraph paragraph = note.getParagraph(paragraphId);
if (paragraph == null) {
throw new IllegalArgumentException("Unknown paragraph with id : " + paragraphId);
}
return paragraph.getCurrentRepl().getInterpreterGroup();
}
private void pushAngularObjectToRemoteRegistry(String noteId, String paragraphId,
String varName, Object varValue, RemoteAngularObjectRegistry remoteRegistry,
String interpreterGroupId, NotebookSocket conn) {
final AngularObject ao = remoteRegistry.addAndNotifyRemoteProcess(varName, varValue,
noteId, paragraphId);
this.broadcastExcept(
noteId,
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId)
.put("noteId", noteId)
.put("paragraphId", paragraphId),
conn);
}
private void pushAngularObjectToLocalRepo(String noteId, String paragraphId, String varName,
Object varValue, AngularObjectRegistry registry,
String interpreterGroupId, NotebookSocket conn) {
AngularObject angularObject = registry.get(varName, noteId, paragraphId);
if (angularObject == null) {
angularObject = registry.add(varName, varValue, noteId, paragraphId);
} else {
angularObject.set(varValue, true);
}
this.broadcastExcept(
noteId,
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", angularObject)
.put("interpreterGroupId", interpreterGroupId)
.put("noteId", noteId)
.put("paragraphId", paragraphId),
conn);
}
private void moveParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");

View file

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.display;
public class AngularObjectBuilder {
public static <T> AngularObject<T> build(String varName, T value, String noteId,
String paragraphId) {
return new AngularObject<>(varName, value, noteId, paragraphId, null);
}
}

View file

@ -20,8 +20,13 @@
package org.apache.zeppelin.socket;
import com.google.gson.Gson;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectBuilder;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
@ -36,8 +41,10 @@ import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.List;
import static java.util.Arrays.asList;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@ -157,6 +164,104 @@ public class NotebookServerTest extends AbstractTestRestApi {
notebook.removeNote(note.getId());
}
@Test
public void should_bind_angular_object_to_remote_for_paragraphs() throws Exception {
//Given
final String varName = "name";
final String value = "DuyHai DOAN";
final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_BIND)
.put("noteId", "noteId")
.put("name", varName)
.put("value", value)
.put("paragraphId", "paragraphId");
final NotebookServer server = new NotebookServer();
final Notebook notebook = mock(Notebook.class);
final Note note = mock(Note.class, RETURNS_DEEP_STUBS);
when(notebook.getNote("noteId")).thenReturn(note);
final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS);
when(note.getParagraph("paragraphId")).thenReturn(paragraph);
final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class);
final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup");
mdGroup.setAngularObjectRegistry(mdRegistry);
when(paragraph.getCurrentRepl().getInterpreterGroup()).thenReturn(mdGroup);
final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraphId");
when(mdRegistry.addAndNotifyRemoteProcess(varName, value, "noteId", "paragraphId")).thenReturn(ao1);
NotebookSocket conn = mock(NotebookSocket.class);
NotebookSocket otherConn = mock(NotebookSocket.class);
final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE)
.put("angularObject", ao1)
.put("interpreterGroupId", "mdGroup")
.put("noteId", "noteId")
.put("paragraphId", "paragraphId"));
server.noteSocketMap.put("noteId", asList(conn, otherConn));
// When
server.angularObjectClientBind(conn, new HashSet<String>(), notebook, messageReceived);
// Then
verify(mdRegistry, never()).addAndNotifyRemoteProcess(varName, value, "noteId", null);
verify(otherConn).send(mdMsg1);
}
@Test
public void should_bind_angular_object_to_local_for_paragraphs() throws Exception {
//Given
final String varName = "name";
final String value = "DuyHai DOAN";
final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_BIND)
.put("noteId", "noteId")
.put("name", varName)
.put("value", value)
.put("paragraphId", "paragraphId");
final NotebookServer server = new NotebookServer();
final Notebook notebook = mock(Notebook.class);
final Note note = mock(Note.class, RETURNS_DEEP_STUBS);
when(notebook.getNote("noteId")).thenReturn(note);
final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS);
when(note.getParagraph("paragraphId")).thenReturn(paragraph);
final AngularObjectRegistry mdRegistry = mock(AngularObjectRegistry.class);
final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup");
mdGroup.setAngularObjectRegistry(mdRegistry);
when(paragraph.getCurrentRepl().getInterpreterGroup()).thenReturn(mdGroup);
final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraphId");
when(mdRegistry.add(varName, value, "noteId", "paragraphId")).thenReturn(ao1);
NotebookSocket conn = mock(NotebookSocket.class);
NotebookSocket otherConn = mock(NotebookSocket.class);
final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE)
.put("angularObject", ao1)
.put("interpreterGroupId", "mdGroup")
.put("noteId", "noteId")
.put("paragraphId", "paragraphId"));
server.noteSocketMap.put("noteId", asList(conn, otherConn));
// When
server.angularObjectClientBind(conn, new HashSet<String>(), notebook, messageReceived);
// Then
verify(otherConn).send(mdMsg1);
}
private NotebookSocket createWebSocket() {
NotebookSocket sock = mock(NotebookSocket.class);
when(sock.getRequest()).thenReturn(createHttpServletRequest());

View file

@ -158,7 +158,7 @@ limitations under the License.
<span style="position:relative; top:2px; margin-right:4px; cursor:pointer;"
ng-click="togglePermissions()"
tooltip-placement="bottom" tooltip="Note permissions">
<i class="fa fa-lock" ng-style="{color: showSetting ? '#3071A9' : 'black' }"></i>
<i class="fa fa-lock" ng-style="{color: showPermissions ? '#3071A9' : 'black' }"></i>
</span>
<span class="btn-group">

View file

@ -14,30 +14,6 @@ limitations under the License.
<!-- Here the controller <NotebookCtrl> is not needed because explicitly set in the app.js (route) -->
<div ng-include src="'app/notebook/notebook-actionBar.html'"></div>
<div style="padding-top: 36px;">
<!-- permissions -->
<div ng-if="showPermissions" class="permissions">
<div>
<h4>Note Permissions (Only note owners can change)</h4>
</div>
<hr />
<div>
<p>
Enter comma separated users and groups in the fields. <br />
Empty field (*) implies anyone can do the operation.
</p>
<div class="permissionsForm"
data-ng-model="permissions">
<p>Owners : <input ng-list ng-model="permissions.owners" placeholder="*"> Owners can change permissions, read and write the note. </p>
<p>Readers : <input ng-list ng-model="permissions.readers" placeholder="*"> Readers can only read the note.</p>
<p>Writers : <input ng-list ng-model="permissions.writers" placeholder="*"> Writers can read and write the note.</p>
</div>
</div>
<br />
<div>
<button class="btn btn-primary" ng-click="savePermissions()">Save</button>
<button class="btn btn-default" ng-click="closePermissions()">Cancel</button>
</div>
</div>
<!-- settings -->
<div ng-if="showSetting" class="setting">
<div>
@ -81,6 +57,31 @@ limitations under the License.
</div>
</div>
<!-- permissions -->
<div ng-if="showPermissions" class="permissions">
<div>
<h4>Note Permissions (Only note owners can change)</h4>
</div>
<hr />
<div>
<p>
Enter comma separated users and groups in the fields. <br />
Empty field (*) implies anyone can do the operation.
</p>
<div class="permissionsForm"
data-ng-model="permissions">
<p>Owners : <input ng-list ng-model="permissions.owners" placeholder="*"> Owners can change permissions, read and write the note. </p>
<p>Readers : <input ng-list ng-model="permissions.readers" placeholder="*"> Readers can only read the note.</p>
<p>Writers : <input ng-list ng-model="permissions.writers" placeholder="*"> Writers can read and write the note.</p>
</div>
</div>
<br />
<div>
<button class="btn btn-primary" ng-click="savePermissions()">Save</button>
<button class="btn btn-default" ng-click="closePermissions()">Cancel</button>
</div>
</div>
<div class="note-jump"></div>
<!-- Include the paragraphs according to the note -->

View file

@ -23,11 +23,22 @@ angular.module('zeppelinWebApp')
$scope.editor = null;
var paragraphScope = $rootScope.$new(true, $rootScope);
// to keep backward compatibility
$scope.compiledScope = paragraphScope;
var angularObjectRegistry = {};
paragraphScope.z = {
// Example: z.angularBind('my_var', 'Test Value', '20150213-231621_168813393')
angularBind: function(varName, value, paragraphId) {
// Only push to server if there paragraphId is defined
if (paragraphId) {
websocketMsgSrv.clientBindAngularObject($routeParams.noteId, varName, value, paragraphId);
}
}
};
var angularObjectRegistry = {};
var editorModes = {
'ace/mode/scala': /^%spark/,

View file

@ -70,6 +70,18 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope,
});
},
clientBindAngularObject: function(noteId, name, value, paragraphId) {
websocketEvents.sendNewEvent({
op: 'ANGULAR_OBJECT_CLIENT_BIND',
data: {
noteId: noteId,
name: name,
value: value,
paragraphId: paragraphId
}
});
},
cancelParagraphRun: function(paragraphId) {
websocketEvents.sendNewEvent({op: 'CANCEL_PARAGRAPH', data: {id: paragraphId}});
},

View file

@ -450,6 +450,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.shell.ShellInterpreter,"
+ "org.apache.zeppelin.hive.HiveInterpreter,"
+ "org.apache.zeppelin.alluxio.AlluxioInterpreter,"
+ "org.apache.zeppelin.file.HDFSFileInterpreter,"
+ "org.apache.zeppelin.phoenix.PhoenixInterpreter,"
+ "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
+ "org.apache.zeppelin.tajo.TajoInterpreter,"

View file

@ -434,6 +434,8 @@ public class InterpreterFactory {
angularObjectRegistry = new AngularObjectRegistry(
id,
angularObjectRegistryListener);
// TODO(moon) : create distributed resource pool for local interpreters and set
}
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);

View file

@ -32,6 +32,7 @@ import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.utility.IdHashes;
import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
@ -208,6 +209,8 @@ public class Note implements Serializable, JobListener {
* @return a paragraph that was deleted, or <code>null</code> otherwise
*/
public Paragraph removeParagraph(String paragraphId) {
removeAllAngularObjectInParagraph(paragraphId);
ResourcePoolUtils.removeResourcesBelongsToParagraph(id(), paragraphId);
synchronized (paragraphs) {
Iterator<Paragraph> i = paragraphs.iterator();
while (i.hasNext()) {
@ -220,7 +223,7 @@ public class Note implements Serializable, JobListener {
}
}
removeAllAngularObjectInParagraph(paragraphId);
return null;
}

View file

@ -40,6 +40,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.quartz.CronScheduleBuilder;
@ -307,6 +308,8 @@ public class Notebook {
}
}
ResourcePoolUtils.removeResourcesBelongsToNote(id);
try {
note.unpersist();
} catch (IOException e) {

View file

@ -35,6 +35,8 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
/**
* Paragraph is a representation of an execution unit.
*
@ -52,6 +54,13 @@ public class Paragraph extends Job implements Serializable, Cloneable {
private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc
public final GUI settings; // form and parameter settings
@VisibleForTesting
Paragraph() {
super(generateId(), null);
config = new HashMap<>();
settings = new GUI();
}
public Paragraph(Note note, JobListener listener, NoteInterpreterLoader replLoader) {
super(generateId(), listener);
this.note = note;
@ -163,6 +172,10 @@ public class Paragraph extends Job implements Serializable, Cloneable {
return replLoader.get(name);
}
public Interpreter getCurrentRepl() {
return getRepl(getRequiredReplName());
}
public List<String> completion(String buffer, int cursor) {
String replName = getRequiredReplName(buffer);
if (replName != null) {

View file

@ -45,13 +45,20 @@ public class MockInterpreter1 extends Interpreter{
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
InterpreterResult result;
if ("getId".equals(st)) {
// get unique id of this interpreter instance
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
} else {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
}
if (context.getResourcePool() != null) {
context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result", result);
}
return result;
}
@Override

View file

@ -45,7 +45,19 @@ public class MockInterpreter2 extends Interpreter{
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: "+st);
InterpreterResult result;
if ("getId".equals(st)) {
// get unique id of this interpreter instance
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
} else {
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st);
}
if (context.getResourcePool() != null) {
context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result", result);
}
return result;
}
@Override

View file

@ -38,6 +38,8 @@ import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
@ -325,6 +327,33 @@ public class NotebookTest implements JobListenerFactory{
assertEquals(cp.getResult().message(), p.getResult().message());
}
@Test
public void testResourceRemovealOnParagraphNoteRemove() throws IOException {
Note note = notebook.createNote();
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId()));
}
Paragraph p1 = note.addParagraph();
p1.setText("hello");
Paragraph p2 = note.addParagraph();
p2.setText("%mock2 world");
note.runAll();
while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield();
while(p2.isTerminated()==false || p2.getResult()==null) Thread.yield();
assertEquals(2, ResourcePoolUtils.getAllResources().size());
// remove a paragraph
note.removeParagraph(p1.getId());
assertEquals(1, ResourcePoolUtils.getAllResources().size());
// remove note
notebook.removeNote(note.id());
assertEquals(0, ResourcePoolUtils.getAllResources().size());
}
@Test
public void testAngularObjectRemovalOnNotebookRemove() throws InterruptedException,
IOException {