pull upstream master & fix some details

This commit is contained in:
hyonzin 2016-10-18 18:22:16 +09:00
commit 5fa270da51
56 changed files with 2382 additions and 489 deletions

View file

@ -149,6 +149,28 @@ elif [[ "${INTERPRETER_ID}" == "hbase" ]]; then
else
echo "HBASE_HOME and HBASE_CONF_DIR are not set, configuration might not be loaded"
fi
elif [[ "${INTERPRETER_ID}" == "pig" ]]; then
# autodetect HADOOP_CONF_HOME by heuristic
if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
elif [[ -d "/etc/hadoop/conf" ]]; then
export HADOOP_CONF_DIR="/etc/hadoop/conf"
fi
fi
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
fi
# autodetect TEZ_CONF_DIR
if [[ -n "${TEZ_CONF_DIR}" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${TEZ_CONF_DIR}"
elif [[ -d "/etc/tez/conf" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":/etc/tez/conf"
else
echo "TEZ_CONF_DIR is not set, configuration might not be loaded"
fi
fi
addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}"

View file

@ -32,6 +32,7 @@ kylin org.apache.zeppelin:zeppelin-kylin:0.6.1 Kylin in
lens org.apache.zeppelin:zeppelin-lens:0.6.1 Lens interpreter
livy org.apache.zeppelin:zeppelin-livy:0.6.1 Livy interpreter
md org.apache.zeppelin:zeppelin-markdown:0.6.1 Markdown support
pig org.apache.zeppelin:zeppelin-pig:0.6.1 Pig interpreter
postgresql org.apache.zeppelin:zeppelin-postgresql:0.6.1 Postgresql interpreter
python org.apache.zeppelin:zeppelin-python:0.6.1 Python interpreter
shell org.apache.zeppelin:zeppelin-shell:0.6.1 Shell command

View file

@ -190,7 +190,7 @@
<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter, org.apache.zeppelin.pig.PigQueryInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>

View file

@ -62,6 +62,7 @@
<li><a href="{{BASE_PATH}}/interpreter/lens.html">Lens</a></li>
<li><a href="{{BASE_PATH}}/interpreter/livy.html">Livy</a></li>
<li><a href="{{BASE_PATH}}/interpreter/markdown.html">Markdown</a></li>
<li><a href="{{BASE_PATH}}/interpreter/pig.html">Pig</a></li>
<li><a href="{{BASE_PATH}}/interpreter/python.html">Python</a></li>
<li><a href="{{BASE_PATH}}/interpreter/postgresql.html">Postgresql, HAWQ</a></li>
<li><a href="{{BASE_PATH}}/interpreter/r.html">R</a></li>
@ -118,8 +119,6 @@
<li><a href="{{BASE_PATH}}/development/howtocontributewebsite.html">How to contribute (website)</a></li>
</ul>
</li>
</ul>
<ul class="nav navbar-nav">
<li>
<a href="{{BASE_PATH}}/search.html" class="nav-search-link">
<span class="fa fa-search nav-search-icon"></span>

View file

@ -619,6 +619,10 @@ and (max-width: 1024px) {
.navbar-collapse.collapse {
padding-right: 0;
}
.navbar-fixed-top > .container {
width: 800px;
}
}
/* master branch docs dropdown menu */

97
docs/interpreter/pig.md Normal file
View file

@ -0,0 +1,97 @@
---
layout: page
title: "Pig Interpreter for Apache Zeppelin"
description: "Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs."
group: manual
---
{% include JB/setup %}
# Pig Interpreter for Apache Zeppelin
<div id="toc"></div>
## Overview
[Apache Pig](https://pig.apache.org/) is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turns enables them to handle very large data sets.
## Supported interpreter type
- `%pig.script` (default)
All the pig script can run in this type of interpreter, and display type is plain text.
- `%pig.query`
Almost the same as `%pig.script`. The only difference is that you don't need to add alias in the last statement. And the display type is table.
## Supported runtime mode
- Local
- MapReduce
- Tez (Only Tez 0.7 is supported)
## How to use
### How to setup Pig
- Local Mode
Nothing needs to be done for local mode
- MapReduce Mode
HADOOP\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`.
- Tez Mode
HADOOP\_CONF\_DIR and TEZ\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`.
### How to configure interpreter
At the Interpreters menu, you have to create a new Pig interpreter. Pig interpreter has below properties by default.
<table class="table-configuration">
<tr>
<th>Property</th>
<th>Default</th>
<th>Description</th>
</tr>
<tr>
<td>zeppelin.pig.execType</td>
<td>mapreduce</td>
<td>Execution mode for pig runtime. local | mapreduce | tez </td>
</tr>
<tr>
<td>zeppelin.pig.includeJobStats</td>
<td>false</td>
<td>whether display jobStats info in <code>%pig.script</code></td>
</tr>
<tr>
<td>zeppelin.pig.maxResult</td>
<td>1000</td>
<td>max row number displayed in <code>%pig.query</code></td>
</tr>
</table>
### Example
##### pig
```
%pig
raw_data = load 'dataset/sf_crime/train.csv' using PigStorage(',') as (Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,X,Y);
b = group raw_data all;
c = foreach b generate COUNT($1);
dump c;
```
##### pig.query
```
%pig.query
b = foreach raw_data generate Category;
c = group b by Category;
foreach c generate group as category, COUNT($1) as count;
```
Data is shared between `%pig` and `%pig.query`, so that you can do some common work in `%pig`, and do different kinds of query based on the data of `%pig`.

View file

@ -82,3 +82,49 @@ interpreter.start()
The above code will start interpreter thread inside your process. Once the interpreter is started you can configure zeppelin to connect to RemoteInterpreter by checking **Connect to existing process** checkbox and then provide **Host** and **Port** on which interpreter porocess is listening as shown in the image below:
<img src="../assets/themes/zeppelin/img/screenshots/existing_interpreter.png" width="450px">
## (Experimental) Interpreter Execution Hooks
Zeppelin allows for users to specify additional code to be executed by an interpreter at pre and post-paragraph code execution. This is primarily useful if you need to run the same set of code for all of the paragraphs within your notebook at specific times. Currently, this feature is only available for the spark and pyspark interpreters. To specify your hook code, you may use '`z.registerHook()`. For example, enter the following into one paragraph:
```python
%pyspark
z.registerHook("post_exec", "print 'This code should be executed before the parapgraph code!'")
z.registerHook("pre_exec", "print 'This code should be executed after the paragraph code!'")
```
These calls will not take into effect until the next time you run a paragraph. In another paragraph, enter
```python
%pyspark
print "This code should be entered into the paragraph by the user!"
```
The output should be:
```
This code should be executed before the paragraph code!
This code should be entered into the paragraph by the user!
This code should be executed after the paragraph code!
```
If you ever need to know the hook code, use `z.getHook()`:
```python
%pyspark
print z.getHook("post_exec")
```
```
print 'This code should be executed after the paragraph code!'
```
Any call to `z.registerHook()` will automatically overwrite what was previously registered. To completely unregister a hook event, use `z.unregisterHook(eventCode)`. Currently only `"post_exec"` and `"pre_exec"` are valid event codes for the Zeppelin Hook Registry system.
Finally, the hook registry is internally shared by other interpreters in the same group. This would allow for hook code for one interpreter REPL to be set by another as follows:
```scala
%spark
z.unregisterHook("post_exec", "pyspark")
```
The API is identical for both the spark (scala) and pyspark (python) implementations.
### Caveats
Calls to `z.registerHook("pre_exec", ...)` should be made with care. If there are errors in your specified hook code, this will cause the interpreter REPL to become unable to execute any code pass the pre-execute stage making it impossible for direct calls to `z.unregisterHook()` to take into effect. Current workarounds include calling `z.unregisterHook()` from a different interpreter REPL in the same interpreter group (see above) or manually restarting the interpreter group in the UI.

View file

@ -56,24 +56,6 @@ public class KylinInterpreter extends Interpreter {
static final Pattern KYLIN_TABLE_FORMAT_REGEX_LABEL = Pattern.compile("\"label\":\"(.*?)\"");
static final Pattern KYLIN_TABLE_FORMAT_REGEX = Pattern.compile("\"results\":\\[\\[\"(.*?)\"]]");
static {
Interpreter.register(
"kylin",
"kylin",
KylinInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(KYLIN_USERNAME, "ADMIN", "username for kylin user")
.add(KYLIN_PASSWORD, "KYLIN", "password for kylin user")
.add(KYLIN_QUERY_API_URL, "http://<host>:<port>/kylin/api/query", "Kylin API.")
.add(KYLIN_QUERY_PROJECT, "default", "kylin project name")
.add(KYLIN_QUERY_OFFSET, "0", "kylin query offset")
.add(KYLIN_QUERY_LIMIT, "5000", "kylin query limit")
.add(KYLIN_QUERY_ACCEPT_PARTIAL, "true", "The kylin query partial flag").build());
}
public KylinInterpreter(Properties property) {
super(property);
}

View file

@ -0,0 +1,54 @@
[
{
"group": "kylin",
"name": "kylin",
"className": "org.apache.zeppelin.kylin.KylinInterpreter",
"properties": {
"kylin.api.url": {
"envName": null,
"propertyName": "kylin.api.url",
"defaultValue": "http://localhost:7070/kylin/api/query",
"description": "Kylin API"
},
"kylin.api.user": {
"envName": null,
"propertyName": "kylin.api.user",
"defaultValue": "ADMIN",
"description": "username for kylin user"
},
"kylin.api.password": {
"envName": null,
"propertyName": "kylin.api.password",
"defaultValue": "KYLIN",
"description": "password for kylin user"
},
"kylin.query.project": {
"envName": null,
"propertyName": "kylin.query.project",
"defaultValue": "default",
"description": "kylin project name"
},
"kylin.query.offset": {
"envName": null,
"propertyName": "kylin.query.offset",
"defaultValue": "0",
"description": "kylin query offset"
},
"kylin.query.limit": {
"envName": null,
"propertyName": "kylin.query.limit",
"defaultValue": "5000",
"description": "kylin query limit"
},
"kylin.query.ispartial": {
"envName": null,
"propertyName": "kylin.query.ispartial",
"defaultValue": "true",
"description": "The kylin query partial flag"
}
},
"editor": {
"language": "sql"
}
}
]

View file

@ -20,8 +20,7 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.message.AbstractHttpMessage;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.kylin.KylinInterpreter;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayInputStream;
@ -33,19 +32,23 @@ import java.util.Properties;
import static org.junit.Assert.assertEquals;
public class KylinInterpreterTest {
@Before
public void setUp() throws Exception {
}
static final Properties kylinProperties = new Properties();
@After
public void tearDown() throws Exception {
@BeforeClass
public static void setUpClass() {
kylinProperties.put("kylin.api.url", "http://localhost:7070/kylin/api/query");
kylinProperties.put("kylin.api.user", "ADMIN");
kylinProperties.put("kylin.api.password", "KYLIN");
kylinProperties.put("kylin.query.project", "default");
kylinProperties.put("kylin.query.offset", "0");
kylinProperties.put("kylin.query.limit", "5000");
kylinProperties.put("kylin.query.ispartial", "true");
}
@Test
public void test(){
KylinInterpreter t = new MockKylinInterpreter(new Properties());
KylinInterpreter t = new MockKylinInterpreter(kylinProperties);
InterpreterResult result = t.interpret(
"select a.date,sum(b.measure) as measure from kylin_fact_table a " +
"inner join kylin_lookup_table b on a.date=b.date group by a.date", null);
@ -198,4 +201,4 @@ class MockEntity implements HttpEntity{
public void consumeContent() throws IOException {
}
}
}

184
pig/pom.xml Normal file
View file

@ -0,0 +1,184 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.7.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-pig</artifactId>
<packaging>jar</packaging>
<version>0.7.0-SNAPSHOT</version>
<name>Zeppelin: Apache Pig Interpreter</name>
<description>Zeppelin interpreter for Apache Pig</description>
<url>http://zeppelin.apache.org</url>
<properties>
<pig.version>0.16.0</pig.version>
<hadoop.version>2.6.0</hadoop.version>
<tez.version>0.7.0</tez.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<classifier>h2</classifier>
<version>${pig.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-api</artifactId>
<version>${tez.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-common</artifactId>
<version>${tez.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-dag</artifactId>
<version>${tez.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-runtime-library</artifactId>
<version>${tez.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-runtime-internals</artifactId>
<version>${tez.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-mapreduce</artifactId>
<version>${tez.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-yarn-timeline-history-with-acls</artifactId>
<version>${tez.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/pig
</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/pig
</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.pig;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
*
*/
public abstract class BasePigInterpreter extends Interpreter {
private static Logger LOGGER = LoggerFactory.getLogger(BasePigInterpreter.class);
protected ConcurrentHashMap<String, PigScriptListener> listenerMap = new ConcurrentHashMap<>();
public BasePigInterpreter(Properties property) {
super(property);
}
@Override
public void cancel(InterpreterContext context) {
LOGGER.info("Cancel paragraph:" + context.getParagraphId());
PigScriptListener listener = listenerMap.get(context.getParagraphId());
if (listener != null) {
Set<String> jobIds = listener.getJobIds();
if (jobIds.isEmpty()) {
LOGGER.info("No job is started, so can not cancel paragraph:" + context.getParagraphId());
}
for (String jobId : jobIds) {
LOGGER.info("Kill jobId:" + jobId);
HExecutionEngine engine =
(HExecutionEngine) getPigServer().getPigContext().getExecutionEngine();
try {
Field launcherField = HExecutionEngine.class.getDeclaredField("launcher");
launcherField.setAccessible(true);
Launcher launcher = (Launcher) launcherField.get(engine);
// It doesn't work for Tez Engine due to PIG-5035
launcher.killJob(jobId, new Configuration());
} catch (NoSuchFieldException | BackendException | IllegalAccessException e) {
LOGGER.error("Fail to cancel paragraph:" + context.getParagraphId(), e);
}
}
} else {
LOGGER.warn("No PigScriptListener found, can not cancel paragraph:"
+ context.getParagraphId());
}
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) {
PigScriptListener listener = listenerMap.get(context.getParagraphId());
if (listener != null) {
return listener.getProgress();
}
return 0;
}
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
PigInterpreter.class.getName() + this.hashCode());
}
public abstract PigServer getPigServer();
}

View file

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.pig;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pig.PigServer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.tools.pigstats.*;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
/**
* Pig interpreter for Zeppelin.
*/
public class PigInterpreter extends BasePigInterpreter {
private static Logger LOGGER = LoggerFactory.getLogger(PigInterpreter.class);
private PigServer pigServer;
private boolean includeJobStats = false;
public PigInterpreter(Properties property) {
super(property);
}
@Override
public void open() {
String execType = getProperty("zeppelin.pig.execType");
if (execType == null) {
execType = "mapreduce";
}
String includeJobStats = getProperty("zeppelin.pig.includeJobStats");
if (includeJobStats != null) {
this.includeJobStats = Boolean.parseBoolean(includeJobStats);
}
try {
pigServer = new PigServer(execType);
} catch (IOException e) {
LOGGER.error("Fail to initialize PigServer", e);
throw new RuntimeException("Fail to initialize PigServer", e);
}
}
@Override
public void close() {
pigServer = null;
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
// remember the origial stdout, because we will redirect stdout to capture
// the pig dump output.
PrintStream originalStdOut = System.out;
ByteArrayOutputStream bytesOutput = new ByteArrayOutputStream();
File tmpFile = null;
try {
tmpFile = PigUtils.createTempPigScript(cmd);
System.setOut(new PrintStream(bytesOutput));
// each thread should its own ScriptState & PigStats
ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState());
// reset PigStats, otherwise you may get the PigStats of last job in the same thread
// because PigStats is ThreadLocal variable
PigStats.start(pigServer.getPigContext().getExecutionEngine().instantiatePigStats());
PigScriptListener scriptListener = new PigScriptListener();
ScriptState.get().registerListener(scriptListener);
listenerMap.put(contextInterpreter.getParagraphId(), scriptListener);
pigServer.registerScript(tmpFile.getAbsolutePath());
} catch (IOException e) {
if (e instanceof FrontendException) {
FrontendException fe = (FrontendException) e;
if (!fe.getMessage().contains("Backend error :")) {
// If the error message contains "Backend error :", that means the exception is from
// backend.
LOGGER.error("Fail to run pig script.", e);
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
}
}
PigStats stats = PigStats.get();
if (stats != null) {
String errorMsg = PigUtils.extactJobStats(stats);
if (errorMsg != null) {
LOGGER.error("Fail to run pig script, " + errorMsg);
return new InterpreterResult(Code.ERROR, errorMsg);
}
}
LOGGER.error("Fail to run pig script.", e);
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
} finally {
System.setOut(originalStdOut);
listenerMap.remove(contextInterpreter.getParagraphId());
if (tmpFile != null) {
tmpFile.delete();
}
}
StringBuilder outputBuilder = new StringBuilder();
PigStats stats = PigStats.get();
if (stats != null && includeJobStats) {
String jobStats = PigUtils.extactJobStats(stats);
if (jobStats != null) {
outputBuilder.append(jobStats);
}
}
outputBuilder.append(bytesOutput.toString());
return new InterpreterResult(Code.SUCCESS, outputBuilder.toString());
}
public PigServer getPigServer() {
return pigServer;
}
}

View file

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.pig;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
/**
*
*/
public class PigQueryInterpreter extends BasePigInterpreter {
private static Logger LOGGER = LoggerFactory.getLogger(PigQueryInterpreter.class);
private PigServer pigServer;
private int maxResult;
public PigQueryInterpreter(Properties properties) {
super(properties);
}
@Override
public void open() {
pigServer = getPigInterpreter().getPigServer();
maxResult = Integer.parseInt(getProperty("zeppelin.pig.maxResult"));
}
@Override
public void close() {
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
// '-' is invalid for pig alias
String alias = "paragraph_" + context.getParagraphId().replace("-", "_");
String[] lines = st.split("\n");
List<String> queries = new ArrayList<String>();
for (int i = 0; i < lines.length; ++i) {
if (i == lines.length - 1) {
lines[i] = alias + " = " + lines[i];
}
queries.add(lines[i]);
}
StringBuilder resultBuilder = new StringBuilder("%table ");
try {
File tmpScriptFile = PigUtils.createTempPigScript(queries);
// each thread should its own ScriptState & PigStats
ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState());
// reset PigStats, otherwise you may get the PigStats of last job in the same thread
// because PigStats is ThreadLocal variable
PigStats.start(pigServer.getPigContext().getExecutionEngine().instantiatePigStats());
PigScriptListener scriptListener = new PigScriptListener();
ScriptState.get().registerListener(scriptListener);
listenerMap.put(context.getParagraphId(), scriptListener);
pigServer.registerScript(tmpScriptFile.getAbsolutePath());
Schema schema = pigServer.dumpSchema(alias);
boolean schemaKnown = (schema != null);
if (schemaKnown) {
for (int i = 0; i < schema.size(); ++i) {
Schema.FieldSchema field = schema.getField(i);
resultBuilder.append(field.alias);
if (i != schema.size() - 1) {
resultBuilder.append("\t");
}
}
resultBuilder.append("\n");
}
Iterator<Tuple> iter = pigServer.openIterator(alias);
boolean firstRow = true;
int index = 0;
while (iter.hasNext() && index <= maxResult) {
index++;
Tuple tuple = iter.next();
if (firstRow && !schemaKnown) {
for (int i = 0; i < tuple.size(); ++i) {
resultBuilder.append("c_" + i + "\t");
}
resultBuilder.append("\n");
firstRow = false;
}
resultBuilder.append(StringUtils.join(tuple, "\t"));
resultBuilder.append("\n");
}
if (index >= maxResult && iter.hasNext()) {
resultBuilder.append("\n<font color=red>Results are limited by " + maxResult + ".</font>");
}
} catch (IOException e) {
// Extract error in the following order
// 1. catch FrontendException, FrontendException happens in the query compilation phase.
// 2. PigStats, This is execution error
// 3. Other errors.
if (e instanceof FrontendException) {
FrontendException fe = (FrontendException) e;
if (!fe.getMessage().contains("Backend error :")) {
LOGGER.error("Fail to run pig query.", e);
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
}
}
PigStats stats = PigStats.get();
if (stats != null) {
String errorMsg = PigUtils.extactJobStats(stats);
if (errorMsg != null) {
return new InterpreterResult(Code.ERROR, errorMsg);
}
}
LOGGER.error("Fail to run pig query.", e);
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
} finally {
listenerMap.remove(context.getParagraphId());
}
return new InterpreterResult(Code.SUCCESS, resultBuilder.toString());
}
@Override
public PigServer getPigServer() {
return this.pigServer;
}
private PigInterpreter getPigInterpreter() {
LazyOpenInterpreter lazy = null;
PigInterpreter pig = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(PigInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
pig = (PigInterpreter) p;
if (lazy != null) {
lazy.open();
}
return pig;
}
}

View file

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.pig;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Set;
/**
*
*/
public class PigScriptListener implements PigProgressNotificationListener {
private static Logger LOGGER = LoggerFactory.getLogger(PigScriptListener.class);
private Set<String> jobIds = new HashSet();
private int progress;
@Override
public void initialPlanNotification(String scriptId, OperatorPlan<?> plan) {
}
@Override
public void launchStartedNotification(String scriptId, int numJobsToLaunch) {
}
@Override
public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) {
}
@Override
public void jobStartedNotification(String scriptId, String assignedJobId) {
this.jobIds.add(assignedJobId);
}
@Override
public void jobFinishedNotification(String scriptId, JobStats jobStats) {
}
@Override
public void jobFailedNotification(String scriptId, JobStats jobStats) {
}
@Override
public void outputCompletedNotification(String scriptId, OutputStats outputStats) {
}
@Override
public void progressUpdatedNotification(String scriptId, int progress) {
LOGGER.debug("scriptId:" + scriptId + ", progress:" + progress);
this.progress = progress;
}
@Override
public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
}
public Set<String> getJobIds() {
return jobIds;
}
public int getProgress() {
return progress;
}
}

View file

@ -0,0 +1,292 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.pig;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pig.PigRunner;
import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
import org.apache.pig.tools.pigstats.tez.TezDAGStats;
import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.lang.reflect.Field;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
*
*/
public class PigUtils {
private static Logger LOGGER = LoggerFactory.getLogger(PigUtils.class);
protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
public static File createTempPigScript(String content) throws IOException {
File tmpFile = File.createTempFile("zeppelin", "pig");
LOGGER.debug("Create pig script file:" + tmpFile.getAbsolutePath());
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
return tmpFile.getAbsoluteFile();
}
public static File createTempPigScript(List<String> lines) throws IOException {
return createTempPigScript(StringUtils.join(lines, "\n"));
}
public static String extactJobStats(PigStats stats) {
if (stats instanceof SimplePigStats) {
return extractFromSimplePigStats((SimplePigStats) stats);
} else if (stats instanceof TezPigScriptStats) {
return extractFromTezPigStats((TezPigScriptStats) stats);
} else {
throw new RuntimeException("Unrecognized stats type:" + stats.getClass().getSimpleName());
}
}
public static String extractFromSimplePigStats(SimplePigStats stats) {
try {
Field userIdField = PigStats.class.getDeclaredField("userId");
userIdField.setAccessible(true);
String userId = (String) (userIdField.get(stats));
Field startTimeField = PigStats.class.getDeclaredField("startTime");
startTimeField.setAccessible(true);
long startTime = (Long) (startTimeField.get(stats));
Field endTimeField = PigStats.class.getDeclaredField("endTime");
endTimeField.setAccessible(true);
long endTime = (Long) (endTimeField.get(stats));
if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) {
LOGGER.warn("unknown return code, can't display the results");
return null;
}
if (stats.getPigContext() == null) {
LOGGER.warn("unknown exec type, don't display the results");
return null;
}
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
StringBuilder sb = new StringBuilder();
sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
sb.append(stats.getHadoopVersion()).append("\t").append(stats.getPigVersion()).append("\t")
.append(userId).append("\t")
.append(sdf.format(new Date(startTime))).append("\t")
.append(sdf.format(new Date(endTime))).append("\t")
.append(stats.getFeatures()).append("\n");
sb.append("\n");
if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
sb.append("Success!\n");
} else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
sb.append("Some jobs have failed! Stop running all dependent jobs\n");
} else {
sb.append("Failed!\n");
}
sb.append("\n");
Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
jobPlanField.setAccessible(true);
PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stats);
if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS
|| stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
sb.append("Job Stats (time in seconds):\n");
sb.append(MRJobStats.SUCCESS_HEADER).append("\n");
List<JobStats> arr = jobPlan.getSuccessfulJobs();
for (JobStats js : arr) {
sb.append(js.getDisplayString());
}
sb.append("\n");
}
if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
|| stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
sb.append("Failed Jobs:\n");
sb.append(MRJobStats.FAILURE_HEADER).append("\n");
List<JobStats> arr = jobPlan.getFailedJobs();
for (JobStats js : arr) {
sb.append(js.getDisplayString());
}
sb.append("\n");
}
sb.append("Input(s):\n");
for (InputStats is : stats.getInputStats()) {
sb.append(is.getDisplayString());
}
sb.append("\n");
sb.append("Output(s):\n");
for (OutputStats ds : stats.getOutputStats()) {
sb.append(ds.getDisplayString());
}
sb.append("\nCounters:\n");
sb.append("Total records written : " + stats.getRecordWritten()).append("\n");
sb.append("Total bytes written : " + stats.getBytesWritten()).append("\n");
sb.append("Spillable Memory Manager spill count : "
+ stats.getSMMSpillCount()).append("\n");
sb.append("Total bags proactively spilled: "
+ stats.getProactiveSpillCountObjects()).append("\n");
sb.append("Total records proactively spilled: "
+ stats.getProactiveSpillCountRecords()).append("\n");
sb.append("\nJob DAG:\n").append(jobPlan.toString());
return "Script Statistics: \n" + sb.toString();
} catch (Exception e) {
LOGGER.error("Can not extract message from SimplePigStats", e);
return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e);
}
}
private static String extractFromTezPigStats(TezPigScriptStats stats) {
try {
Field userIdField = PigStats.class.getDeclaredField("userId");
userIdField.setAccessible(true);
String userId = (String) (userIdField.get(stats));
Field startTimeField = PigStats.class.getDeclaredField("startTime");
startTimeField.setAccessible(true);
long startTime = (Long) (startTimeField.get(stats));
Field endTimeField = PigStats.class.getDeclaredField("endTime");
endTimeField.setAccessible(true);
long endTime = (Long) (endTimeField.get(stats));
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
StringBuilder sb = new StringBuilder();
sb.append("\n");
sb.append(String.format("%1$20s: %2$-100s%n", "HadoopVersion", stats.getHadoopVersion()));
sb.append(String.format("%1$20s: %2$-100s%n", "PigVersion", stats.getPigVersion()));
sb.append(String.format("%1$20s: %2$-100s%n", "TezVersion", TezExecType.getTezVersion()));
sb.append(String.format("%1$20s: %2$-100s%n", "UserId", userId));
sb.append(String.format("%1$20s: %2$-100s%n", "FileName", stats.getFileName()));
sb.append(String.format("%1$20s: %2$-100s%n", "StartedAt", sdf.format(new Date(startTime))));
sb.append(String.format("%1$20s: %2$-100s%n", "FinishedAt", sdf.format(new Date(endTime))));
sb.append(String.format("%1$20s: %2$-100s%n", "Features", stats.getFeatures()));
sb.append("\n");
if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
sb.append("Success!\n");
} else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
sb.append("Some tasks have failed! Stop running all dependent tasks\n");
} else {
sb.append("Failed!\n");
}
sb.append("\n");
// Print diagnostic info in case of failure
if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
|| stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
if (stats.getErrorMessage() != null) {
String[] lines = stats.getErrorMessage().split("\n");
for (int i = 0; i < lines.length; i++) {
String s = lines[i].trim();
if (i == 0 || !org.apache.commons.lang.StringUtils.isEmpty(s)) {
sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s));
}
}
sb.append("\n");
}
}
Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
tezDAGStatsMapField.setAccessible(true);
Map<String, TezDAGStats> tezDAGStatsMap =
(Map<String, TezDAGStats>) tezDAGStatsMapField.get(stats);
int count = 0;
for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
sb.append("\n");
sb.append("DAG " + count++ + ":\n");
sb.append(dagStats.getDisplayString());
sb.append("\n");
}
sb.append("Input(s):\n");
for (InputStats is : stats.getInputStats()) {
sb.append(is.getDisplayString().trim()).append("\n");
}
sb.append("\n");
sb.append("Output(s):\n");
for (OutputStats os : stats.getOutputStats()) {
sb.append(os.getDisplayString().trim()).append("\n");
}
return "Script Statistics:\n" + sb.toString();
} catch (Exception e) {
LOGGER.error("Can not extract message from SimplePigStats", e);
return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e);
}
}
public static List<String> extractJobIds(PigStats stat) {
if (stat instanceof SimplePigStats) {
return extractJobIdsFromSimplePigStats((SimplePigStats) stat);
} else if (stat instanceof TezPigScriptStats) {
return extractJobIdsFromTezPigStats((TezPigScriptStats) stat);
} else {
throw new RuntimeException("Unrecognized stats type:" + stat.getClass().getSimpleName());
}
}
public static List<String> extractJobIdsFromSimplePigStats(SimplePigStats stat) {
List<String> jobIds = new ArrayList<>();
try {
Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
jobPlanField.setAccessible(true);
PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stat);
List<JobStats> arr = jobPlan.getJobList();
for (JobStats js : arr) {
jobIds.add(js.getJobId());
}
return jobIds;
} catch (Exception e) {
LOGGER.error("Can not extract jobIds from SimpelPigStats", e);
throw new RuntimeException("Can not extract jobIds from SimpelPigStats", e);
}
}
public static List<String> extractJobIdsFromTezPigStats(TezPigScriptStats stat) {
List<String> jobIds = new ArrayList<>();
try {
Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
tezDAGStatsMapField.setAccessible(true);
Map<String, TezDAGStats> tezDAGStatsMap =
(Map<String, TezDAGStats>) tezDAGStatsMapField.get(stat);
for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
LOGGER.debug("Tez JobId:" + dagStats.getJobId());
jobIds.add(dagStats.getJobId());
}
return jobIds;
} catch (Exception e) {
LOGGER.error("Can not extract jobIds from TezPigScriptStats", e);
throw new RuntimeException("Can not extract jobIds from TezPigScriptStats", e);
}
}
}

View file

@ -0,0 +1,46 @@
[
{
"group": "pig",
"name": "script",
"className": "org.apache.zeppelin.pig.PigInterpreter",
"properties": {
"zeppelin.pig.execType": {
"envName": null,
"propertyName": "zeppelin.pig.execType",
"defaultValue": "mapreduce",
"description": "local | mapreduce | tez"
},
"zeppelin.pig.includeJobStats": {
"envName": null,
"propertyName": "zeppelin.pig.includeJobStats",
"defaultValue": "false",
"description": "flag to include job stats in output"
}
},
"editor": {
"language": "pig"
}
},
{
"group": "pig",
"name": "query",
"className": "org.apache.zeppelin.pig.PigQueryInterpreter",
"properties": {
"zeppelin.pig.execType": {
"envName": null,
"propertyName": "zeppelin.pig.execType",
"defaultValue": "mapreduce",
"description": "local | mapreduce | tez"
},
"zeppelin.pig.maxResult": {
"envName": null,
"propertyName": "zeppelin.pig.maxResult",
"defaultValue": "1000",
"description": "max row number for %pig.query"
}
},
"editor": {
"language": "pig"
}
}
]

View file

@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.pig;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class PigInterpreterTest {
private PigInterpreter pigInterpreter;
private InterpreterContext context;
@Before
public void setUp() {
Properties properties = new Properties();
properties.put("zeppelin.pig.execType", "local");
pigInterpreter = new PigInterpreter(properties);
pigInterpreter.open();
context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null,
null, null);
}
@After
public void tearDown() {
pigInterpreter.close();
}
@Test
public void testBasics() throws IOException {
String content = "1\tandy\n"
+ "2\tpeter\n";
File tmpFile = File.createTempFile("zeppelin", "test");
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
// simple pig script using dump
String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+ "dump a;";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.type());
assertEquals(Code.SUCCESS, result.code());
assertTrue(result.message().contains("(1,andy)\n(2,peter)"));
// describe
pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
+ "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.type());
assertEquals(Code.SUCCESS, result.code());
assertTrue(result.message().contains("a: {id: int,name: bytearray}"));
// syntax error (compilation error)
pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
+ "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.type());
assertEquals(Code.ERROR, result.code());
assertTrue(result.message().contains("Syntax error, unexpected symbol at or near 'a'"));
// execution error
pigscript = "a = load 'invalid_path';"
+ "dump a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.type());
assertEquals(Code.ERROR, result.code());
assertTrue(result.message().contains("Input path does not exist"));
}
@Test
public void testIncludeJobStats() throws IOException {
Properties properties = new Properties();
properties.put("zeppelin.pig.execType", "local");
properties.put("zeppelin.pig.includeJobStats", "true");
pigInterpreter = new PigInterpreter(properties);
pigInterpreter.open();
String content = "1\tandy\n"
+ "2\tpeter\n";
File tmpFile = File.createTempFile("zeppelin", "test");
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
// simple pig script using dump
String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+ "dump a;";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.type());
assertEquals(Code.SUCCESS, result.code());
assertTrue(result.message().contains("Counters:"));
assertTrue(result.message().contains("(1,andy)\n(2,peter)"));
// describe
pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
+ "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.type());
assertEquals(Code.SUCCESS, result.code());
// no job is launched, so no jobStats
assertTrue(!result.message().contains("Counters:"));
assertTrue(result.message().contains("a: {id: int,name: bytearray}"));
// syntax error (compilation error)
pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
+ "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.type());
assertEquals(Code.ERROR, result.code());
// no job is launched, so no jobStats
assertTrue(!result.message().contains("Counters:"));
assertTrue(result.message().contains("Syntax error, unexpected symbol at or near 'a'"));
// execution error
pigscript = "a = load 'invalid_path';"
+ "dump a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(Type.TEXT, result.type());
assertEquals(Code.ERROR, result.code());
assertTrue(result.message().contains("Counters:"));
assertTrue(result.message().contains("Input path does not exist"));
}
}

View file

@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.pig;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
*
*/
public class PigQueryInterpreterTest {
private PigInterpreter pigInterpreter;
private PigQueryInterpreter pigQueryInterpreter;
private InterpreterContext context;
@Before
public void setUp() {
Properties properties = new Properties();
properties.put("zeppelin.pig.execType", "local");
properties.put("zeppelin.pig.maxResult", "20");
pigInterpreter = new PigInterpreter(properties);
pigQueryInterpreter = new PigQueryInterpreter(properties);
List<Interpreter> interpreters = new ArrayList();
interpreters.add(pigInterpreter);
interpreters.add(pigQueryInterpreter);
InterpreterGroup group = new InterpreterGroup();
group.put("note_id", interpreters);
pigInterpreter.setInterpreterGroup(group);
pigQueryInterpreter.setInterpreterGroup(group);
pigInterpreter.open();
pigQueryInterpreter.open();
context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null,
null, null);
}
@After
public void tearDown() {
pigInterpreter.close();
pigQueryInterpreter.close();
}
@Test
public void testBasics() throws IOException {
String content = "andy\tmale\t10\n"
+ "peter\tmale\t20\n"
+ "amy\tfemale\t14\n";
File tmpFile = File.createTempFile("zeppelin", "test");
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
// run script in PigInterpreter
String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (name, gender, age);\n"
+ "a2 = load 'invalid_path' as (name, gender, age);\n"
+ "dump a;";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().contains("(andy,male,10)\n(peter,male,20)\n(amy,female,14)"));
// run single line query in PigQueryInterpreter
String query = "foreach a generate name, age;";
result = pigQueryInterpreter.interpret(query, context);
assertEquals(InterpreterResult.Type.TABLE, result.type());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("name\tage\nandy\t10\npeter\t20\namy\t14\n", result.message());
// run multiple line query in PigQueryInterpreter
query = "b = group a by gender;\nforeach b generate group as gender, COUNT($1) as count;";
result = pigQueryInterpreter.interpret(query, context);
assertEquals(InterpreterResult.Type.TABLE, result.type());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("gender\tcount\nmale\t2\nfemale\t1\n", result.message());
// syntax error in PigQueryInterpereter
query = "b = group a by invalid_column;\nforeach b generate group as gender, COUNT($1) as count;";
result = pigQueryInterpreter.interpret(query, context);
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertTrue(result.message().contains("Projected field [invalid_column] does not exist in schema"));
// execution error in PigQueryInterpreter
query = "foreach a2 generate name, age;";
result = pigQueryInterpreter.interpret(query, context);
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertTrue(result.message().contains("Input path does not exist"));
}
@Test
public void testMaxResult() throws IOException {
StringBuilder content = new StringBuilder();
for (int i=0;i<30;++i) {
content.append(i + "\tname_" + i + "\n");
}
File tmpFile = File.createTempFile("zeppelin", "test");
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
// run script in PigInterpreter
String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id, name);";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// empty output
assertTrue(result.message().isEmpty());
// run single line query in PigQueryInterpreter
String query = "foreach a generate id;";
result = pigQueryInterpreter.interpret(query, context);
assertEquals(InterpreterResult.Type.TABLE, result.type());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().contains("id\n0\n1\n2"));
assertTrue(result.message().contains("Results are limited by 20"));
}
}

View file

@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
log4j.rootLogger = INFO, stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n

View file

@ -62,6 +62,7 @@
<module>shell</module>
<module>livy</module>
<module>hbase</module>
<module>pig</module>
<module>postgresql</module>
<module>jdbc</module>
<module>file</module>

View file

@ -49,6 +49,7 @@ import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.InterpreterProperty;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
@ -101,6 +102,7 @@ public class SparkInterpreter extends Interpreter {
private SparkConf conf;
private static SparkContext sc;
private static SQLContext sqlc;
private static InterpreterHookRegistry hooks;
private static SparkEnv env;
private static Object sparkSession; // spark 2.x
private static JobProgressListener sparkListener;
@ -813,8 +815,10 @@ public class SparkInterpreter extends Interpreter {
sqlc = getSQLContext();
dep = getDependencyResolver();
hooks = getInterpreterGroup().getInterpreterHookRegistry();
z = new ZeppelinContext(sc, sqlc, null, dep,
z = new ZeppelinContext(sc, sqlc, null, dep, hooks,
Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
interpret("@transient val _binder = new java.util.HashMap[String, Object]()");

View file

@ -28,11 +28,14 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.zeppelin.annotation.ZeppelinApi;
import org.apache.zeppelin.annotation.Experimental;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectWatcher;
@ -41,6 +44,7 @@ import org.apache.zeppelin.display.Input.ParamOption;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
@ -53,19 +57,33 @@ import scala.Unit;
* Spark context for zeppelin.
*/
public class ZeppelinContext {
// Map interpreter class name (to be used by hook registry) from
// given replName in parapgraph
private static final Map<String, String> interpreterClassMap;
static {
interpreterClassMap = new HashMap<String, String>();
interpreterClassMap.put("spark", "org.apache.zeppelin.spark.SparkInterpreter");
interpreterClassMap.put("sql", "org.apache.zeppelin.spark.SparkSqlInterpreter");
interpreterClassMap.put("dep", "org.apache.zeppelin.spark.DepInterpreter");
interpreterClassMap.put("pyspark", "org.apache.zeppelin.spark.PySparkInterpreter");
}
private SparkDependencyResolver dep;
private InterpreterContext interpreterContext;
private int maxResult;
private List<Class> supportedClasses;
private InterpreterHookRegistry hooks;
public ZeppelinContext(SparkContext sc, SQLContext sql,
InterpreterContext interpreterContext,
SparkDependencyResolver dep,
InterpreterHookRegistry hooks,
int maxResult) {
this.sc = sc;
this.sqlContext = sql;
this.interpreterContext = interpreterContext;
this.dep = dep;
this.hooks = hooks;
this.maxResult = maxResult;
this.supportedClasses = new ArrayList<>();
try {
@ -697,6 +715,90 @@ public class ZeppelinContext {
registry.remove(name, noteId, null);
}
/**
* Get the interpreter class name from name entered in paragraph
* @param replName if replName is a valid className, return that instead.
*/
public String getClassNameFromReplName(String replName) {
for (String name : interpreterClassMap.values()) {
if (replName.equals(name)) {
return replName;
}
}
if (replName.contains("spark.")) {
replName = replName.replace("spark.", "");
}
return interpreterClassMap.get(replName);
}
/**
* General function to register hook event
* @param event The type of event to hook to (pre_exec, post_exec)
* @param cmd The code to be executed by the interpreter on given event
* @param replName Name of the interpreter
*/
@Experimental
public void registerHook(String event, String cmd, String replName) {
String noteId = interpreterContext.getNoteId();
String className = getClassNameFromReplName(replName);
hooks.register(noteId, className, event, cmd);
}
/**
* registerHook() wrapper for current repl
* @param event The type of event to hook to (pre_exec, post_exec)
* @param cmd The code to be executed by the interpreter on given event
*/
@Experimental
public void registerHook(String event, String cmd) {
String className = interpreterContext.getClassName();
registerHook(event, cmd, className);
}
/**
* Get the hook code
* @param event The type of event to hook to (pre_exec, post_exec)
* @param replName Name of the interpreter
*/
@Experimental
public String getHook(String event, String replName) {
String noteId = interpreterContext.getNoteId();
String className = getClassNameFromReplName(replName);
return hooks.get(noteId, className, event);
}
/**
* getHook() wrapper for current repl
* @param event The type of event to hook to (pre_exec, post_exec)
*/
@Experimental
public String getHook(String event) {
String className = interpreterContext.getClassName();
return getHook(event, className);
}
/**
* Unbind code from given hook event
* @param event The type of event to hook to (pre_exec, post_exec)
* @param replName Name of the interpreter
*/
@Experimental
public void unregisterHook(String event, String replName) {
String noteId = interpreterContext.getNoteId();
String className = getClassNameFromReplName(replName);
hooks.unregister(noteId, className, event);
}
/**
* unregisterHook() wrapper for current repl
* @param event The type of event to hook to (pre_exec, post_exec)
*/
@Experimental
public void unregisterHook(String event) {
String className = interpreterContext.getClassName();
unregisterHook(event, className);
}
/**
* Add object into resource pool

View file

@ -80,16 +80,16 @@ class PyZeppelinContext(dict):
def get(self, key):
return self.__getitem__(key)
def input(self, name, defaultValue = ""):
def input(self, name, defaultValue=""):
return self.z.input(name, defaultValue)
def select(self, name, options, defaultValue = ""):
def select(self, name, options, defaultValue=""):
# auto_convert to ArrayList doesn't match the method signature on JVM side
tuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options))
iterables = gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(tuples)
return self.z.select(name, defaultValue, iterables)
def checkbox(self, name, options, defaultChecked = None):
def checkbox(self, name, options, defaultChecked=None):
if defaultChecked is None:
defaultChecked = list(map(lambda items: items[0], options))
optionTuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options))
@ -99,6 +99,23 @@ class PyZeppelinContext(dict):
checkedIterables = self.z.checkbox(name, defaultCheckedIterables, optionIterables)
return gateway.jvm.scala.collection.JavaConversions.asJavaCollection(checkedIterables)
def registerHook(self, event, cmd, replName=None):
if replName is None:
self.z.registerHook(event, cmd)
else:
self.z.registerHook(event, cmd, replName)
def unregisterHook(self, event, replName=None):
if replName is None:
self.z.unregisterHook(event)
else:
self.z.unregisterHook(event, replName)
def getHook(self, event, replName=None):
if replName is None:
return self.z.getHook(event)
return self.z.getHook(event, replName)
def __tupleToScalaTuple2(self, tuple):
if (len(tuple) == 2):
return gateway.jvm.scala.Tuple2(tuple[0], tuple[1])

View file

@ -116,7 +116,6 @@ The following components are provided under Apache License.
(Apache 2.0) Utility classes for Jetty (org.mortbay.jetty:jetty-util:6.1.26 - http://javadox.com/org.mortbay.jetty/jetty/6.1.26/overview-tree.html)
(Apache 2.0) Servlet API (org.mortbay.jetty:servlet-api:2.5-20081211 - https://en.wikipedia.org/wiki/Jetty_(web_server))
(Apache 2.0) Google HTTP Client Library for Java (com.google.http-client:google-http-client-jackson2:1.21.0 - https://github.com/google/google-http-java-client/tree/dev/google-http-client-jackson2)
(Apache 2.0) angular-esri-map (https://github.com/Esri/angular-esri-map)
(Apache 2.0) pegdown (org.pegdown:pegdown:1.6.0 - https://github.com/sirthias/pegdown)
(Apache 2.0) parboiled-java (org.parboiled:parboiled-java:1.1.7 - https://github.com/sirthias/parboiled)
(Apache 2.0) parboiled-core (org.parboiled:parboiled-core:1.1.7 - https://github.com/sirthias/parboiled)
@ -156,7 +155,15 @@ The following components are provided under Apache License.
(Apache 2.0) Tachyon Project Core (org.tachyonproject:tachyon:0.6.4 - http://tachyonproject.org/tachyon/)
(Apache 2.0) Tachyon Project Client (org.tachyonproject:tachyon-client:0.6.4 - http://tachyonproject.org/tachyon-client/)
(Apache 2.0) javax.inject (javax.inject:javax.inject:1 - http://code.google.com/p/atinject/)
(Apache 2.0) Apache Pig (org.apache.pig:0.16 - http://pig.apache.org)
(Apache 2.0) tez-api (org.apache.tez:tez-api:0.7.0 - http://tez.apache.org)
(Apache 2.0) tez-common (org.apache.tez:tez-common:0.7.0 - http://tez.apache.org)
(Apache 2.0) tez-dag (org.apache.tez:tez-dag:0.7.0 - http://tez.apache.org)
(Apache 2.0) tez-runtime-library (org.apache.tez:runtime-library:0.7.0 - http://tez.apache.org)
(Apache 2.0) tez-runtime-internals (org.apache.tez:tez-runtime-internals:0.7.0 - http://tez.apache.org)
(Apache 2.0) tez-mapreduce (org.apache.tez:tez-mapreduce:0.7.0 - http://tez.apache.org)
(Apache 2.0) tez-yarn-timeline-history-with-acls (org.apache.tez:tez-yarn-timeline-history-with-acls:0.7.0 - http://tez.apache.org)
========================================================================
MIT licenses
========================================================================

View file

@ -27,6 +27,7 @@ import java.util.Properties;
import com.google.gson.annotations.SerializedName;
import org.apache.zeppelin.annotation.ZeppelinApi;
import org.apache.zeppelin.annotation.Experimental;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -203,6 +204,71 @@ public abstract class Interpreter {
this.classloaderUrls = classloaderUrls;
}
/**
* General function to register hook event
* @param noteId - Note to bind hook to
* @param event The type of event to hook to (pre_exec, post_exec)
* @param cmd The code to be executed by the interpreter on given event
*/
@Experimental
public void registerHook(String noteId, String event, String cmd) {
InterpreterHookRegistry hooks = interpreterGroup.getInterpreterHookRegistry();
String className = getClassName();
hooks.register(noteId, className, event, cmd);
}
/**
* registerHook() wrapper for global scope
* @param event The type of event to hook to (pre_exec, post_exec)
* @param cmd The code to be executed by the interpreter on given event
*/
@Experimental
public void registerHook(String event, String cmd) {
registerHook(null, event, cmd);
}
/**
* Get the hook code
* @param noteId - Note to bind hook to
* @param event The type of event to hook to (pre_exec, post_exec)
*/
@Experimental
public String getHook(String noteId, String event) {
InterpreterHookRegistry hooks = interpreterGroup.getInterpreterHookRegistry();
String className = getClassName();
return hooks.get(noteId, className, event);
}
/**
* getHook() wrapper for global scope
* @param event The type of event to hook to (pre_exec, post_exec)
*/
@Experimental
public String getHook(String event) {
return getHook(null, event);
}
/**
* Unbind code from given hook event
* @param noteId - Note to bind hook to
* @param event The type of event to hook to (pre_exec, post_exec)
*/
@Experimental
public void unregisterHook(String noteId, String event) {
InterpreterHookRegistry hooks = interpreterGroup.getInterpreterHookRegistry();
String className = getClassName();
hooks.unregister(noteId, className, event);
}
/**
* unregisterHook() wrapper for global scope
* @param event The type of event to hook to (pre_exec, post_exec)
*/
@Experimental
public void unregisterHook(String event) {
unregisterHook(null, event);
}
@ZeppelinApi
public Interpreter getInterpreterInTheSameSessionByClassName(String className) {
synchronized (interpreterGroup) {

View file

@ -57,6 +57,7 @@ public class InterpreterContext {
private AngularObjectRegistry angularObjectRegistry;
private ResourcePool resourcePool;
private List<InterpreterContextRunner> runners;
private String className;
public InterpreterContext(String noteId,
String paragraphId,
@ -124,4 +125,11 @@ public class InterpreterContext {
return runners;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
}

View file

@ -45,6 +45,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
Logger LOGGER = Logger.getLogger(InterpreterGroup.class);
AngularObjectRegistry angularObjectRegistry;
InterpreterHookRegistry hookRegistry;
RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process
ResourcePool resourcePool;
boolean angularRegistryPushed = false;
@ -118,10 +119,18 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
public AngularObjectRegistry getAngularObjectRegistry() {
return angularObjectRegistry;
}
public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) {
this.angularObjectRegistry = angularObjectRegistry;
}
public InterpreterHookRegistry getInterpreterHookRegistry() {
return hookRegistry;
}
public void setInterpreterHookRegistry(InterpreterHookRegistry hookRegistry) {
this.hookRegistry = hookRegistry;
}
public RemoteInterpreterProcess getRemoteInterpreterProcess() {
return remoteInterpreterProcess;

View file

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
/**
* An interface for processing custom callback code into the interpreter.
*/
public interface InterpreterHookListener {
/**
* Prepends pre-execute hook code to the script that will be interpreted
*/
public void onPreExecute(String script);
/**
* Appends post-execute hook code to the script that will be interpreted
*/
public void onPostExecute(String script);
}

View file

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import java.util.HashMap;
import java.util.Map;
/**
* The InterpreterinterpreterHookRegistry specifies code to be conditionally executed by an
* interpreter. The constants defined in this class denote currently
* supported events. Each instance is bound to a single InterpreterGroup.
* Scope is determined on a per-note basis (except when null for global scope).
*/
public class InterpreterHookRegistry {
public static final String GLOBAL_KEY = "_GLOBAL_";
private String interpreterId;
private Map<String, Map<String, Map<String, String>>> registry =
new HashMap<String, Map<String, Map<String, String>>>();
/**
* hookRegistry constructor.
*
* @param interpreterId The Id of the InterpreterGroup instance to bind to
*/
public InterpreterHookRegistry(final String interpreterId) {
this.interpreterId = interpreterId;
}
/**
* Get the interpreterGroup id this instance is bound to
*/
public String getInterpreterId() {
return interpreterId;
}
/**
* Adds a note to the registry
*
* @param noteId The Id of the Note instance to add
*/
public void addNote(String noteId) {
synchronized (registry) {
if (registry.get(noteId) == null) {
registry.put(noteId, new HashMap<String, Map<String, String>>());
}
}
}
/**
* Adds a className to the registry
*
* @param noteId The note id
* @param className The name of the interpreter repl to map the hooks to
*/
public void addRepl(String noteId, String className) {
synchronized (registry) {
addNote(noteId);
if (registry.get(noteId).get(className) == null) {
registry.get(noteId).put(className, new HashMap<String, String>());
}
}
}
/**
* Register a hook for a specific event.
*
* @param noteId Denotes the note this instance belongs to
* @param className The name of the interpreter repl to map the hooks to
* @param event hook event (see constants defined in this class)
* @param cmd Code to be executed by the interpreter
*/
public void register(String noteId, String className,
String event, String cmd) throws IllegalArgumentException {
synchronized (registry) {
if (noteId == null) {
noteId = GLOBAL_KEY;
}
addRepl(noteId, className);
if (!event.equals(HookType.POST_EXEC) && !event.equals(HookType.PRE_EXEC) &&
!event.equals(HookType.POST_EXEC_DEV) && !event.equals(HookType.PRE_EXEC_DEV)) {
throw new IllegalArgumentException("Must be " + HookType.POST_EXEC + ", " +
HookType.POST_EXEC_DEV + ", " +
HookType.PRE_EXEC + " or " +
HookType.PRE_EXEC_DEV);
}
registry.get(noteId).get(className).put(event, cmd);
}
}
/**
* Unregister a hook for a specific event.
*
* @param noteId Denotes the note this instance belongs to
* @param className The name of the interpreter repl to map the hooks to
* @param event hook event (see constants defined in this class)
*/
public void unregister(String noteId, String className, String event) {
synchronized (registry) {
if (noteId == null) {
noteId = GLOBAL_KEY;
}
addRepl(noteId, className);
registry.get(noteId).get(className).remove(event);
}
}
/**
* Get a hook for a specific event.
*
* @param noteId Denotes the note this instance belongs to
* @param className The name of the interpreter repl to map the hooks to
* @param event hook event (see constants defined in this class)
*/
public String get(String noteId, String className, String event) {
synchronized (registry) {
if (noteId == null) {
noteId = GLOBAL_KEY;
}
addRepl(noteId, className);
return registry.get(noteId).get(className).get(event);
}
}
/**
* Container for hook event type constants
*/
public static final class HookType {
// Execute the hook code PRIOR to main paragraph code execution
public static final String PRE_EXEC = "pre_exec";
// Execute the hook code AFTER main paragraph code execution
public static final String POST_EXEC = "post_exec";
// Same as above but reserved for interpreter developers, in order to allow
// notebook users to use the above without overwriting registry settings
// that are initialized directly in subclasses of Interpreter.
public static final String PRE_EXEC_DEV = "pre_exec_dev";
public static final String POST_EXEC_DEV = "post_exec_dev";
}
}

View file

@ -147,4 +147,34 @@ public class LazyOpenInterpreter
public void setClassloaderUrls(URL [] urls) {
intp.setClassloaderUrls(urls);
}
@Override
public void registerHook(String noteId, String event, String cmd) {
intp.registerHook(noteId, event, cmd);
}
@Override
public void registerHook(String event, String cmd) {
intp.registerHook(event, cmd);
}
@Override
public String getHook(String noteId, String event) {
return intp.getHook(noteId, event);
}
@Override
public String getHook(String event) {
return intp.getHook(event);
}
@Override
public void unregisterHook(String noteId, String event) {
intp.unregisterHook(noteId, event);
}
@Override
public void unregisterHook(String event) {
intp.unregisterHook(event);
}
}

View file

@ -33,6 +33,8 @@ import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.*;
import org.apache.zeppelin.helium.*;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.interpreter.InterpreterHookListener;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.dev.ZeppelinDevServer;
import org.apache.zeppelin.interpreter.thrift.*;
@ -60,6 +62,7 @@ public class RemoteInterpreterServer
InterpreterGroup interpreterGroup;
AngularObjectRegistry angularObjectRegistry;
InterpreterHookRegistry hookRegistry;
DistributedResourcePool resourcePool;
private ApplicationLoader appLoader;
@ -152,7 +155,9 @@ public class RemoteInterpreterServer
if (interpreterGroup == null) {
interpreterGroup = new InterpreterGroup(interpreterGroupId);
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
hookRegistry = new InterpreterHookRegistry(interpreterGroup.getId());
resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
interpreterGroup.setInterpreterHookRegistry(hookRegistry);
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
interpreterGroup.setResourcePool(resourcePool);
@ -290,6 +295,7 @@ public class RemoteInterpreterServer
}
Interpreter intp = getInterpreter(noteId, className);
InterpreterContext context = convert(interpreterContext);
context.setClassName(intp.getClassName());
Scheduler scheduler = intp.getScheduler();
InterpretJobListener jobListener = new InterpretJobListener();
@ -383,10 +389,61 @@ public class RemoteInterpreterServer
return infos;
}
private void processInterpreterHooks(final String noteId) {
InterpreterHookListener hookListener = new InterpreterHookListener() {
@Override
public void onPreExecute(String script) {
String cmdDev = interpreter.getHook(noteId, HookType.PRE_EXEC_DEV);
String cmdUser = interpreter.getHook(noteId, HookType.PRE_EXEC);
// User defined hook should be executed before dev hook
List<String> cmds = Arrays.asList(cmdDev, cmdUser);
for (String cmd : cmds) {
if (cmd != null) {
script = cmd + '\n' + script;
}
}
InterpretJob.this.script = script;
}
@Override
public void onPostExecute(String script) {
String cmdDev = interpreter.getHook(noteId, HookType.POST_EXEC_DEV);
String cmdUser = interpreter.getHook(noteId, HookType.POST_EXEC);
// User defined hook should be executed after dev hook
List<String> cmds = Arrays.asList(cmdUser, cmdDev);
for (String cmd : cmds) {
if (cmd != null) {
script += '\n' + cmd;
}
}
InterpretJob.this.script = script;
}
};
hookListener.onPreExecute(script);
hookListener.onPostExecute(script);
}
@Override
protected Object jobRun() throws Throwable {
try {
InterpreterContext.set(context);
// Open the interpreter instance prior to calling interpret().
// This is necessary because the earliest we can register a hook
// is from within the open() method.
LazyOpenInterpreter lazy = (LazyOpenInterpreter) interpreter;
if (!lazy.isOpen()) {
lazy.open();
}
// Add hooks to script from registry.
// Global scope first, followed by notebook scope
processInterpreterHooks(null);
processInterpreterHooks(context.getNoteId());
InterpreterResult result = interpreter.interpret(script, context);
// data from context.out is prepended to InterpreterResult if both defined

View file

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
public class InterpreterHookRegistryTest {
@Test
public void testBasic() {
final String PRE_EXEC = InterpreterHookRegistry.HookType.PRE_EXEC;
final String POST_EXEC = InterpreterHookRegistry.HookType.POST_EXEC;
final String PRE_EXEC_DEV = InterpreterHookRegistry.HookType.PRE_EXEC_DEV;
final String POST_EXEC_DEV = InterpreterHookRegistry.HookType.POST_EXEC_DEV;
final String GLOBAL_KEY = InterpreterHookRegistry.GLOBAL_KEY;
final String noteId = "note";
final String className = "class";
final String preExecHook = "pre";
final String postExecHook = "post";
InterpreterHookRegistry registry = new InterpreterHookRegistry("intpId");
// Test register()
registry.register(noteId, className, PRE_EXEC, preExecHook);
registry.register(noteId, className, POST_EXEC, postExecHook);
registry.register(noteId, className, PRE_EXEC_DEV, preExecHook);
registry.register(noteId, className, POST_EXEC_DEV, postExecHook);
// Test get()
assertEquals(registry.get(noteId, className, PRE_EXEC), preExecHook);
assertEquals(registry.get(noteId, className, POST_EXEC), postExecHook);
assertEquals(registry.get(noteId, className, PRE_EXEC_DEV), preExecHook);
assertEquals(registry.get(noteId, className, POST_EXEC_DEV), postExecHook);
// Test Unregister
registry.unregister(noteId, className, PRE_EXEC);
registry.unregister(noteId, className, POST_EXEC);
registry.unregister(noteId, className, PRE_EXEC_DEV);
registry.unregister(noteId, className, POST_EXEC_DEV);
assertNull(registry.get(noteId, className, PRE_EXEC));
assertNull(registry.get(noteId, className, POST_EXEC));
assertNull(registry.get(noteId, className, PRE_EXEC_DEV));
assertNull(registry.get(noteId, className, POST_EXEC_DEV));
// Test Global Scope
registry.register(null, className, PRE_EXEC, preExecHook);
assertEquals(registry.get(GLOBAL_KEY, className, PRE_EXEC), preExecHook);
}
@Test(expected = IllegalArgumentException.class)
public void testValidEventCode() {
InterpreterHookRegistry registry = new InterpreterHookRegistry("intpId");
// Test that only valid event codes ("pre_exec", "post_exec") are accepted
registry.register("foo", "bar", "baz", "whatever");
}
}

View file

@ -161,6 +161,7 @@ public class NotebookRestApi {
AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
note.persist(subject);
notebookServer.broadcastNote(note);
notebookServer.broadcastNoteList(subject);
return new JsonResponse<>(Status.OK).build();
}
@ -307,7 +308,7 @@ public class NotebookRestApi {
/**
* Clone note REST API
*
* @param noteId ID of Notebook
* @param noteId ID of Note
* @return JSON with status.CREATED
* @throws IOException, CloneNotSupportedException, IllegalArgumentException
*/
@ -369,7 +370,7 @@ public class NotebookRestApi {
/**
* Get paragraph REST API
*
* @param noteId ID of Notebook
* @param noteId ID of Note
* @return JSON with information of the paragraph
* @throws IOException
*/
@ -434,7 +435,7 @@ public class NotebookRestApi {
/**
* Delete paragraph REST API
*
* @param noteId ID of Notebook
* @param noteId ID of Note
* @return JSON with status.OK
* @throws IOException
*/
@ -464,9 +465,9 @@ public class NotebookRestApi {
}
/**
* Run notebook jobs REST API
* Run note jobs REST API
*
* @param noteId ID of Notebook
* @param noteId ID of Note
* @return JSON with status.OK
* @throws IOException, IllegalArgumentException
*/
@ -493,9 +494,9 @@ public class NotebookRestApi {
}
/**
* Stop(delete) notebook jobs REST API
* Stop(delete) note jobs REST API
*
* @param noteId ID of Notebook
* @param noteId ID of Note
* @return JSON with status.OK
* @throws IOException, IllegalArgumentException
*/
@ -521,7 +522,7 @@ public class NotebookRestApi {
/**
* Get notebook job status REST API
*
* @param noteId ID of Notebook
* @param noteId ID of Note
* @return JSON with status.OK
* @throws IOException, IllegalArgumentException
*/
@ -542,7 +543,7 @@ public class NotebookRestApi {
/**
* Get notebook paragraph job status REST API
*
* @param noteId ID of Notebook
* @param noteId ID of Note
* @param paragraphId ID of Paragraph
* @return JSON with status.OK
* @throws IOException, IllegalArgumentException
@ -652,7 +653,7 @@ public class NotebookRestApi {
/**
* Stop(delete) paragraph job REST API
*
* @param noteId ID of Notebook
* @param noteId ID of Note
* @param paragraphId ID of Paragraph
* @return JSON with status.OK
* @throws IOException, IllegalArgumentException
@ -717,7 +718,7 @@ public class NotebookRestApi {
/**
* Remove cron job REST API
*
* @param noteId ID of Notebook
* @param noteId ID of Note
* @return JSON with status.OK
* @throws IOException, IllegalArgumentException
*/
@ -745,7 +746,7 @@ public class NotebookRestApi {
/**
* Get cron job REST API
*
* @param noteId ID of Notebook
* @param noteId ID of Note
* @return JSON with status.OK
* @throws IOException, IllegalArgumentException
*/

View file

@ -92,7 +92,7 @@ public class ZeppelinServer extends Application {
notebookWsServer, heliumApplicationFactory, depResolver);
this.notebookRepo = new NotebookRepoSync(conf);
this.notebookIndex = new LuceneSearch();
this.notebookAuthorization = new NotebookAuthorization(conf);
this.notebookAuthorization = NotebookAuthorization.init(conf);
this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
notebook = new Notebook(conf,
notebookRepo, schedulerFactory, replFactory, notebookWsServer,

View file

@ -60,6 +60,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@ -86,6 +87,8 @@ public class NotebookServer extends WebSocketServlet implements
Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create();
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
final Map<String, Queue<NotebookSocket>> userConnectedSockets =
new ConcurrentHashMap<String, Queue<NotebookSocket>>();
private Notebook notebook() {
return ZeppelinServer.notebook;
@ -161,6 +164,9 @@ public class NotebookServer extends WebSocketServlet implements
userAndRoles.addAll(roles);
}
}
if (StringUtils.isEmpty(conn.getUser())) {
addUserConnection(messagereceived.principal, conn);
}
AuthenticationInfo subject = new AuthenticationInfo(messagereceived.principal);
/** Lets be elegant here */
@ -268,6 +274,26 @@ public class NotebookServer extends WebSocketServlet implements
.getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason);
removeConnectionFromAllNote(conn);
connectedSockets.remove(conn);
removeUserConnection(conn.getUser(), conn);
}
private void removeUserConnection(String user, NotebookSocket conn) {
if (userConnectedSockets.containsKey(user)) {
userConnectedSockets.get(user).remove(conn);
} else {
LOG.warn("Closing connection that is absent in user connections");
}
}
private void addUserConnection(String user, NotebookSocket conn) {
conn.setUser(user);
if (userConnectedSockets.containsKey(user)) {
userConnectedSockets.get(user).add(conn);
} else {
Queue<NotebookSocket> socketQueue = new ConcurrentLinkedQueue<>();
socketQueue.add(conn);
userConnectedSockets.put(user, socketQueue);
}
}
protected Message deserializeMessage(String msg) {
@ -383,8 +409,12 @@ public class NotebookServer extends WebSocketServlet implements
}
}
private void broadcastAll(Message m) {
for (NotebookSocket conn : connectedSockets) {
private void multicastToUser(String user, Message m) {
if (!userConnectedSockets.containsKey(user)) {
LOG.warn("Broadcasting to user that is not in connections map");
return;
}
for (NotebookSocket conn: userConnectedSockets.get(user)) {
try {
conn.send(serializeMessage(m));
} catch (IOException e) {
@ -476,6 +506,7 @@ public class NotebookServer extends WebSocketServlet implements
LOG.error("Fail to reload notes from repository", e);
}
}
List<Note> notes = notebook.getAllNotes(subject);
List<Map<String, String>> notesInfo = new LinkedList<>();
for (Note note : notes) {
@ -504,8 +535,20 @@ public class NotebookServer extends WebSocketServlet implements
}
public void broadcastNoteList(AuthenticationInfo subject) {
if (subject == null) {
subject = new AuthenticationInfo(StringUtils.EMPTY);
}
//send first to requesting user
List<Map<String, String>> notesInfo = generateNotesInfo(false, subject);
broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo));
//to others afterwards
for (String user: userConnectedSockets.keySet()) {
if (subject.getUser() == user) {
continue;
}
notesInfo = generateNotebooksInfo(false, new AuthenticationInfo(user));
multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo));
}
}
public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject) {
@ -514,8 +557,21 @@ public class NotebookServer extends WebSocketServlet implements
}
public void broadcastReloadedNoteList(AuthenticationInfo subject) {
if (subject == null) {
subject = new AuthenticationInfo(StringUtils.EMPTY);
}
//reload and reply first to requesting user
List<Map<String, String>> notesInfo = generateNotesInfo(true, subject);
broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo));
//to others afterwards
for (String user: userConnectedSockets.keySet()) {
if (subject.getUser() == user) {
continue;
}
//reloaded already above; parameter - false
notesInfo = generateNotebooksInfo(false, new AuthenticationInfo(user));
multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo));
}
}
void permissionError(NotebookSocket conn, String op,

View file

@ -20,6 +20,7 @@ import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@ -32,12 +33,14 @@ public class NotebookSocket extends WebSocketAdapter {
private NotebookSocketListener listener;
private HttpServletRequest request;
private String protocol;
private String user;
public NotebookSocket(HttpServletRequest req, String protocol,
NotebookSocketListener listener) {
this.listener = listener;
this.request = req;
this.protocol = protocol;
this.user = StringUtils.EMPTY;
}
@Override
@ -69,4 +72,11 @@ public class NotebookSocket extends WebSocketAdapter {
connection.getRemote().sendString(serializeMessage);
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
}

View file

@ -32,6 +32,7 @@ import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
@ -341,7 +342,9 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
Map<String, Object> resp = gson.fromJson(get.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
}.getType());
List<Map<String, String>> body = (List<Map<String, String>>) resp.get("body");
assertEquals("List notebooks are equal", ZeppelinServer.notebook.getAllNotes().size(), body.size());
//TODO(khalid): anonymous or specific user notes?
AuthenticationInfo subject = new AuthenticationInfo("anonymous");
assertEquals("List notebooks are equal", ZeppelinServer.notebook.getAllNotes(subject).size(), body.size());
get.releaseConnection();
}

View file

@ -33,7 +33,6 @@
"handsontable": "~0.24.2",
"moment-duration-format": "^1.3.0",
"select2": "^4.0.3",
"angular-esri-map": "~2.0.0",
"github-markdown-css": "^2.4.0"
},
"devDependencies": {

View file

@ -33,8 +33,7 @@
'xeditable',
'ngToast',
'focus-if',
'ngResource',
'esri.map'
'ngResource'
])
.filter('breakFilter', function() {
return function(text) {

View file

@ -20,7 +20,7 @@ limitations under the License.
<button type="button" class="btn btn-default btn-sm"
ng-if="paragraph.result.type == 'TABLE'"
ng-class="{'active': isGraphMode('table')}"
ng-click="setGraphMode('table', true)"
ng-click="setGraphMode('table', true)"
tooltip="Table" tooltip-placement="bottom"><i class="fa fa-table"></i>
</button>
<button type="button" class="btn btn-default btn-sm"
@ -53,12 +53,6 @@ limitations under the License.
ng-click="setGraphMode('scatterChart', true)"
tooltip="Scatter Chart" tooltip-placement="bottom"><i class="cf cf-scatter-chart"></i>
</button>
<button type="button" class="btn btn-default btn-sm"
ng-if="paragraph.result.type == 'TABLE'"
ng-class="{'active': isGraphMode('map')}"
ng-click="setGraphMode('map', true)"
tooltip="Map" tooltip-placement="bottom"><i class="fa fa-map-marker"></i>
</button>
<button type="button"
ng-if="paragraph.result.type != 'TABLE'"

View file

@ -51,12 +51,4 @@ limitations under the License.
id="p{{paragraph.id}}_scatterChart">
<svg></svg>
</div>
<div ng-if="getGraphMode()=='map'" id="p{{paragraph.id}}_map"
ng-switch="paragraph.config.graph.map.isOnline">
<div ng-switch-when="true"></div>
<span class="map-offline-text" ng-switch-default>
<span>Maps require internet connectivity.</span>
</span>
</div>
</div>

View file

@ -32,7 +32,7 @@ limitations under the License.
</ul>
</div>
<div class="row" ng-if="getGraphMode()!='scatterChart' && getGraphMode()!='map'">
<div class="row" ng-if="getGraphMode()!='scatterChart'">
<div class="col-md-4">
<span class="columns lightBold">
Keys
@ -165,52 +165,4 @@ limitations under the License.
</span>
</div>
</div>
<div class="row" ng-if="getGraphMode()=='map'">
<div class="col-md-4">
<span class="columns lightBold">
Latitude
<ul data-drop="true"
ng-model="paragraph.config.graph.map.lat"
jqyoui-droppable="{onDrop:'onGraphOptionChange()'}"
class="list-unstyled">
<li ng-if="paragraph.config.graph.map.lat">
<div class="btn btn-primary btn-xs">
{{paragraph.config.graph.map.lat.name}} <span class="fa fa-close" ng-click="removeMapOptionLat($index)"></span>
</div>
</li>
</ul>
</span>
</div>
<div class="col-md-4">
<span class="columns lightBold">
Longitude
<ul data-drop="true"
ng-model="paragraph.config.graph.map.lng"
jqyoui-droppable="{onDrop:'onGraphOptionChange()'}"
class="list-unstyled">
<li ng-if="paragraph.config.graph.map.lng">
<div class="btn btn-primary btn-xs">
{{paragraph.config.graph.map.lng.name}} <span class="fa fa-close" ng-click="removeMapOptionLng($index)"></span>
</div>
</li>
</ul>
</span>
</div>
<div class="col-md-4">
<span class="columns lightBold">
Pin contents
<ul data-drop="true"
ng-model="paragraph.config.graph.map.pinCols"
jqyoui-droppable="{multiple:true, onDrop:'onGraphOptionChange()'}"
class="list-unstyled">
<li ng-repeat="col in paragraph.config.graph.map.pinCols">
<div class="btn btn-primary btn-xs">
{{col.name}} <span class="fa fa-close" ng-click="removeMapOptionPinInfo($index)"></span>
</div>
</li>
</ul>
</span>
</div>
</div>
</div>

View file

@ -30,13 +30,12 @@
'websocketMsgSrv',
'baseUrlSrv',
'ngToast',
'saveAsService',
'esriLoader'
'saveAsService'
];
function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $location,
$timeout, $compile, $http, $q, websocketMsgSrv,
baseUrlSrv, ngToast, saveAsService, esriLoader) {
baseUrlSrv, ngToast, saveAsService) {
var ANGULAR_FUNCTION_OBJECT_NAME_PREFIX = '_Z_ANGULAR_FUNC_';
$scope.parentNote = null;
$scope.paragraph = null;
@ -105,7 +104,6 @@
$scope.parentNote = note;
$scope.originalText = angular.copy(newParagraph.text);
$scope.chart = {};
$scope.baseMapOption = ['Streets', 'Satellite', 'Hybrid', 'Topo', 'Gray', 'Oceans', 'Terrain'];
$scope.colWidthOption = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
$scope.paragraphFocused = false;
if (newParagraph.focus) {
@ -257,22 +255,6 @@
config.graph.scatter = {};
}
if (!config.graph.map) {
config.graph.map = {};
}
if (!config.graph.map.baseMapType) {
config.graph.map.baseMapType = $scope.baseMapOption[0];
}
if (!config.graph.map.isOnline) {
config.graph.map.isOnline = true;
}
if (!config.graph.map.pinCols) {
config.graph.map.pinCols = [];
}
if (config.enabled === undefined) {
config.enabled = true;
}
@ -974,8 +956,6 @@
if (!type || type === 'table') {
setTable($scope.paragraph.result, refresh);
} else if (type === 'map') {
setMap($scope.paragraph.result, refresh);
} else {
setD3Chart(type, $scope.paragraph.result, refresh);
}
@ -1206,236 +1186,6 @@
$timeout(retryRenderer);
};
var setMap = function(data, refresh) {
var createPinMapLayer = function(pins, cb) {
esriLoader.require(['esri/layers/FeatureLayer'], function(FeatureLayer) {
var pinLayer = new FeatureLayer({
id: 'pins',
spatialReference: $scope.map.spatialReference,
geometryType: 'point',
source: pins,
fields: [],
objectIdField: '_ObjectID',
renderer: $scope.map.pinRenderer,
popupTemplate: {
title: '[{_lng}, {_lat}]',
content: [{
type: 'fields',
fieldInfos: []
}]
}
});
// add user-selected pin info fields to popup
var pinInfoCols = $scope.paragraph.config.graph.map.pinCols;
for (var i = 0; i < pinInfoCols.length; ++i) {
pinLayer.popupTemplate.content[0].fieldInfos.push({
fieldName: pinInfoCols[i].name,
visible: true
});
}
cb(pinLayer);
});
};
var getMapPins = function(cb) {
esriLoader.require(['esri/geometry/Point'], function(Point, FeatureLayer) {
var latCol = $scope.paragraph.config.graph.map.lat;
var lngCol = $scope.paragraph.config.graph.map.lng;
var pinInfoCols = $scope.paragraph.config.graph.map.pinCols;
var pins = [];
// construct objects for pins
if (latCol && lngCol && data.rows) {
for (var i = 0; i < data.rows.length; ++i) {
var row = data.rows[i];
var lng = row[lngCol.index];
var lat = row[latCol.index];
var pin = {
geometry: new Point({
longitude: lng,
latitude: lat,
spatialReference: $scope.map.spatialReference
}),
attributes: {
_ObjectID: i,
_lng: lng,
_lat: lat
}
};
// add pin info from user-selected columns
for (var j = 0; j < pinInfoCols.length; ++j) {
var col = pinInfoCols[j];
pin.attributes[col.name] = row[col.index];
}
pins.push(pin);
}
}
cb(pins);
});
};
var updateMapPins = function() {
var pinLayer = $scope.map.map.findLayerById('pins');
$scope.map.popup.close();
if (pinLayer) {
$scope.map.map.remove(pinLayer);
}
// add pins to map as layer
getMapPins(function(pins) {
createPinMapLayer(pins, function(pinLayer) {
$scope.map.map.add(pinLayer);
if (pinLayer.source.length > 0) {
$scope.map.goTo(pinLayer.source);
}
});
});
};
var createMap = function(mapdiv) {
// prevent zooming with the scroll wheel
var disableZoom = function(e) {
var evt = e || window.event;
evt.cancelBubble = true;
evt.returnValue = false;
if (evt.stopPropagation) {
evt.stopPropagation();
}
};
var eName = window.WheelEvent ? 'wheel' : // Modern browsers
window.MouseWheelEvent ? 'mousewheel' : // WebKit and IE
'DOMMouseScroll'; // Old Firefox
mapdiv.addEventListener(eName, disableZoom, true);
esriLoader.require(['esri/views/MapView',
'esri/Map',
'esri/renderers/SimpleRenderer',
'esri/symbols/SimpleMarkerSymbol'],
function(MapView, Map, SimpleRenderer, SimpleMarkerSymbol) {
$scope.map = new MapView({
container: mapdiv,
map: new Map({
basemap: $scope.paragraph.config.graph.map.baseMapType.toLowerCase()
}),
center: [-106.3468, 56.1304], // Canada (lng, lat)
zoom: 2,
pinRenderer: new SimpleRenderer({
symbol: new SimpleMarkerSymbol({
'color': [255, 0, 0, 0.5],
'size': 16.5,
'outline': {
'color': [0, 0, 0, 1],
'width': 1.125,
},
// map pin SVG path
'path': 'M16,3.5c-4.142,0-7.5,3.358-7.5,7.5c0,4.143,7.5,18.121,7.5,' +
'18.121S23.5,15.143,23.5,11C23.5,6.858,20.143,3.5,16,3.5z ' +
'M16,14.584c-1.979,0-3.584-1.604-3.584-3.584S14.021,7.416,' +
'16,7.416S19.584,9.021,19.584,11S17.979,14.584,16,14.584z'
})
})
});
$scope.map.on('click', function() {
// ArcGIS JS API 4.0 does not account for scrolling or position
// changes by default (this is a bug, to be fixed in the upcoming
// version 4.1; see https://geonet.esri.com/thread/177238#comment-609681).
// This results in a misaligned popup.
// Workaround: manually set popup position to match position of selected pin
if ($scope.map.popup.selectedFeature) {
$scope.map.popup.location = $scope.map.popup.selectedFeature.geometry;
}
});
$scope.map.then(updateMapPins);
});
};
var checkMapOnline = function(cb) {
// are we able to get a response from the ArcGIS servers?
var callback = function(res) {
var online = (res.status > 0);
$scope.paragraph.config.graph.map.isOnline = online;
cb(online);
};
$http.head('//services.arcgisonline.com/arcgis/', {
timeout: 5000,
withCredentials: false
}).then(callback, callback);
};
var renderMap = function() {
var mapdiv = angular.element('#p' + $scope.paragraph.id + '_map')
.css('height', $scope.paragraph.config.graph.height)
.children('div').get(0);
// on chart type change, destroy map to force reinitialization.
if ($scope.map && !refresh) {
$scope.map.map.destroy();
$scope.map.pinRenderer = null;
$scope.map = null;
}
var requireMapCSS = function() {
var url = '//js.arcgis.com/4.0/esri/css/main.css';
if (!angular.element('link[href="' + url + '"]').length) {
var link = document.createElement('link');
link.rel = 'stylesheet';
link.type = 'text/css';
link.href = url;
angular.element('head').append(link);
}
};
var requireMapJS = function(cb) {
if (!esriLoader.isLoaded()) {
esriLoader.bootstrap({
url: '//js.arcgis.com/4.0'
}).then(cb);
} else {
cb();
}
};
checkMapOnline(function(online) {
// we need an internet connection to use the map
if (online) {
// create map if not exists.
if (!$scope.map) {
requireMapCSS();
requireMapJS(function() {
createMap(mapdiv);
});
} else {
updateMapPins();
}
}
});
};
var retryRenderer = function() {
if (angular.element('#p' + $scope.paragraph.id + '_map div').length) {
try {
renderMap();
} catch (err) {
console.log('Map drawing error %o', err);
}
} else {
$timeout(retryRenderer,10);
}
};
$timeout(retryRenderer);
};
$scope.setMapBaseMap = function(bm) {
$scope.paragraph.config.graph.map.baseMapType = bm;
if ($scope.map) {
$scope.map.map.basemap = bm.toLowerCase();
}
};
$scope.isGraphMode = function(graphName) {
var activeAppId = _.get($scope.paragraph.config, 'helium.activeApp');
if ($scope.getResultType() === 'TABLE' && $scope.getGraphMode() === graphName && !activeAppId) {
@ -1498,24 +1248,6 @@
$scope.setGraphMode($scope.paragraph.config.graph.mode, true, false);
};
$scope.removeMapOptionLat = function(idx) {
$scope.paragraph.config.graph.map.lat = null;
clearUnknownColsFromGraphOption();
$scope.setGraphMode($scope.paragraph.config.graph.mode, true, false);
};
$scope.removeMapOptionLng = function(idx) {
$scope.paragraph.config.graph.map.lng = null;
clearUnknownColsFromGraphOption();
$scope.setGraphMode($scope.paragraph.config.graph.mode, true, false);
};
$scope.removeMapOptionPinInfo = function(idx) {
$scope.paragraph.config.graph.map.pinCols.splice(idx, 1);
clearUnknownColsFromGraphOption();
$scope.setGraphMode($scope.paragraph.config.graph.mode, true, false);
};
/* Clear unknown columns from graph option */
var clearUnknownColsFromGraphOption = function() {
var unique = function(list) {
@ -1574,10 +1306,6 @@
removeUnknown($scope.paragraph.config.graph.groups);
removeUnknownFromFields($scope.paragraph.config.graph.scatter);
unique($scope.paragraph.config.graph.map.pinCols);
removeUnknown($scope.paragraph.config.graph.map.pinCols);
removeUnknownFromFields($scope.paragraph.config.graph.map);
};
/* select default key and value if there're none selected */
@ -1598,23 +1326,6 @@
$scope.paragraph.config.graph.scatter.xAxis = $scope.paragraph.result.columnNames[0];
}
}
/* try to find columns for the map logitude and latitude */
var findDefaultMapCol = function(settingName, keyword) {
var col;
if (!$scope.paragraph.config.graph.map[settingName]) {
for (var i = 0; i < $scope.paragraph.result.columnNames.length; ++i) {
col = $scope.paragraph.result.columnNames[i];
if (col.name.toUpperCase().indexOf(keyword) !== -1) {
$scope.paragraph.config.graph.map[settingName] = col;
break;
}
}
}
};
findDefaultMapCol('lat', 'LAT');
findDefaultMapCol('lng', 'LONG');
};
var pivot = function(data) {

View file

@ -331,41 +331,12 @@ table.dataTable.table-condensed .sorting_desc:after {
.tableDisplay div {
}
.tableDisplay img:not(.esri-bitmap) {
.tableDisplay img {
display: block;
max-width: 100%;
height: auto;
}
.esri-display-object > svg {
overflow: visible;
}
.esri-popup > .esri-docked.esri-dock-to-bottom {
padding: 8px;
margin-top: 0px;
}
.esri-popup-main {
max-height: 100%;
}
span.map-offline-text {
display: table;
width: 100%;
height: 100%;
text-align: center;
}
span.map-offline-text > span {
display: table-cell;
vertical-align: middle;
font-size: 18px;
font-weight: 700;
font-family: "Helvetica Neue", Helvetica, Arial, sans-serif;
color: #212121;
}
.tableDisplay .btn-group span {
margin: 10px 0 0 10px;
font-size: 12px;
@ -383,8 +354,7 @@ span.map-offline-text > span {
}
.tableDisplay .option .columns,
div.esri-view {
.tableDisplay .option .columns {
height: 100%;
}

View file

@ -13,13 +13,15 @@ limitations under the License.
-->
<div id="noteImportModal" class="modal fade" role="dialog"
tabindex="-1">
<div class="modal-dialog">
tabindex="-1" data-backdrop="static" data-keyboard="false">
<div class="modal-dialog" >
<!-- Modal content-->
<div class="modal-content" id="NoteImportCtrl" ng-init="NoteImportInit">
<div class="modal-header">
<button type="button" class="close" data-dismiss="modal">&times;</button>
<!-- close button -->
<button type="button" class="close" data-dismiss="modal" ng-click="noteimportctrl.resetFlags()">&times;</button>
<h4 class="modal-title">Import new note</h4>
</div>
<div class="modal-body">
@ -57,7 +59,7 @@ limitations under the License.
<div class="form-group slide-right" ng-show="note.step2">
<label for="noteImportUrl">URL</label>
<input placeholder="Note name" type="text" class="form-control" id="noteImportUrl"
<input placeholder="Note url" type="text" class="form-control" id="noteImportUrl"
ng-model="note.importUrl" />
</div>

View file

@ -147,7 +147,6 @@ limitations under the License.
<script src="bower_components/handsontable/dist/handsontable.js"></script>
<script src="bower_components/moment-duration-format/lib/moment-duration-format.js"></script>
<script src="bower_components/select2/dist/js/select2.js"></script>
<script src="bower_components/angular-esri-map/dist/angular-esri-map.js"></script>
<!-- endbower -->
<!-- endbuild -->
<!-- build:js({.tmp,src}) scripts/scripts.js -->

View file

@ -65,7 +65,6 @@ module.exports = function(config) {
'bower_components/handsontable/dist/handsontable.js',
'bower_components/moment-duration-format/lib/moment-duration-format.js',
'bower_components/select2/dist/js/select2.js',
'bower_components/angular-esri-map/dist/angular-esri-map.js',
'bower_components/angular-mocks/angular-mocks.js',
// endbower
'src/app/app.js',

View file

@ -39,7 +39,7 @@ describe('Controller: ParagraphCtrl', function() {
'getResultType', 'loadTableData', 'setGraphMode', 'isGraphMode', 'onGraphOptionChange',
'removeGraphOptionKeys', 'removeGraphOptionValues', 'removeGraphOptionGroups', 'setGraphOptionValueAggr',
'removeScatterOptionXaxis', 'removeScatterOptionYaxis', 'removeScatterOptionGroup',
'removeScatterOptionSize', 'removeMapOptionLat', 'removeMapOptionLng', 'removeMapOptionPinInfo'];
'removeScatterOptionSize'];
functions.forEach(function(fn) {
it('check for scope functions to be defined : ' + fn, function() {

View file

@ -521,6 +521,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.alluxio.AlluxioInterpreter,"
+ "org.apache.zeppelin.file.HDFSFileInterpreter,"
+ "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
+ "org.apache.zeppelin.pig.PigInterpreter,"
+ "org.apache.zeppelin.pig.PigQueryInterpreter,"
+ "org.apache.zeppelin.flink.FlinkInterpreter,"
+ "org.apache.zeppelin.python.PythonInterpreter,"
+ "org.apache.zeppelin.python.PythonInterpreterPandasSql,"
@ -543,7 +545,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
+ "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
+ "scalding,jdbc,hbase,bigquery,beam"),
+ "scalding,jdbc,hbase,bigquery,beam,pig"),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
// use specified notebook (id) as homescreen

View file

@ -31,7 +31,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Sets;
@ -483,6 +482,7 @@ public class Notebook implements NoteEventListener {
}
List<NoteInfo> noteInfos = notebookRepo.list(subject);
for (NoteInfo info : noteInfos) {
loadNoteFromRepo(info.getId(), subject);
}
@ -533,7 +533,7 @@ public class Notebook implements NoteEventListener {
return noteList;
}
}
public List<Note> getAllNotes(AuthenticationInfo subject) {
final Set<String> entities = Sets.newHashSet();
if (subject != null) {
@ -643,7 +643,7 @@ public class Notebook implements NoteEventListener {
} else {
info.put("noteName", "Note " + jobNote.getId());
}
// set notebook type ( cron or normal )
// set note type ( cron or normal )
if (jobNote.getConfig().containsKey(CRON_TYPE_NOTEBOOK_KEYWORD) && !jobNote.getConfig()
.get(CRON_TYPE_NOTEBOOK_KEYWORD).equals("")) {
info.put("noteType", "cron");
@ -703,10 +703,10 @@ public class Notebook implements NoteEventListener {
long lastRunningUnixTime = 0;
Map<String, Object> info = new HashMap<>();
// set notebook ID
// set note ID
info.put("noteId", note.getId());
// set notebook Name
// set note Name
String noteName = note.getName();
if (noteName != null && !noteName.equals("")) {
info.put("noteName", note.getName());
@ -714,7 +714,7 @@ public class Notebook implements NoteEventListener {
info.put("noteName", "Note " + note.getId());
}
// set notebook type ( cron or normal )
// set note type ( cron or normal )
if (note.getConfig().containsKey(CRON_TYPE_NOTEBOOK_KEYWORD) && !note.getConfig()
.get(CRON_TYPE_NOTEBOOK_KEYWORD).equals("")) {
info.put("noteType", "cron");

View file

@ -17,9 +17,13 @@
package org.apache.zeppelin.notebook;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,29 +35,44 @@ import java.util.*;
*/
public class NotebookAuthorization {
private static final Logger LOG = LoggerFactory.getLogger(NotebookAuthorization.class);
private static NotebookAuthorization instance = null;
/*
* { "note1": { "owners": ["u1"], "readers": ["u1", "u2"], "writers": ["u1"] }, "note2": ... } }
*/
private Map<String, Map<String, Set<String>>> authInfo = new HashMap<>();
private ZeppelinConfiguration conf;
private Gson gson;
private String filePath;
private static Map<String, Map<String, Set<String>>> authInfo = new HashMap<>();
private static ZeppelinConfiguration conf;
private static Gson gson;
private static String filePath;
public NotebookAuthorization(ZeppelinConfiguration conf) {
this.conf = conf;
filePath = conf.getNotebookAuthorizationPath();
GsonBuilder builder = new GsonBuilder();
builder.setPrettyPrinting();
gson = builder.create();
try {
loadFromFile();
} catch (IOException e) {
LOG.error("Error loading NotebookAuthorization", e);
private NotebookAuthorization() {}
public static NotebookAuthorization init(ZeppelinConfiguration config) {
if (instance == null) {
instance = new NotebookAuthorization();
conf = config;
filePath = conf.getNotebookAuthorizationPath();
GsonBuilder builder = new GsonBuilder();
builder.setPrettyPrinting();
gson = builder.create();
try {
loadFromFile();
} catch (IOException e) {
LOG.error("Error loading NotebookAuthorization", e);
}
}
return instance;
}
private void loadFromFile() throws IOException {
public static NotebookAuthorization getInstance() {
if (instance == null) {
LOG.warn("Notebook authorization module was called without initialization,"
+ " initializing with default configuration");
init(ZeppelinConfiguration.create());
}
return instance;
}
private static void loadFromFile() throws IOException {
File settingFile = new File(filePath);
LOG.info(settingFile.getAbsolutePath());
if (!settingFile.exists()) {
@ -74,7 +93,7 @@ public class NotebookAuthorization {
String json = sb.toString();
NotebookAuthorizationInfoSaving info = gson.fromJson(json,
NotebookAuthorizationInfoSaving.class);
this.authInfo = info.authInfo;
authInfo = info.authInfo;
}
private void saveToFile() {
@ -225,4 +244,16 @@ public class NotebookAuthorization {
saveToFile();
}
public List<NoteInfo> filterByUser(List<NoteInfo> notes, AuthenticationInfo subject) {
final Set<String> entities = Sets.newHashSet();
if (subject != null) {
entities.add(subject.getUser());
}
return FluentIterable.from(notes).filter(new Predicate<NoteInfo>() {
@Override
public boolean apply(NoteInfo input) {
return input != null && isReader(input.getId(), entities);
}
}).toList();
}
}

View file

@ -31,6 +31,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.NotebookAuthorization;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
@ -178,12 +179,15 @@ public class NotebookRepoSync implements NotebookRepo {
*/
void sync(int sourceRepoIndex, int destRepoIndex, AuthenticationInfo subject) throws IOException {
LOG.info("Sync started");
NotebookAuthorization auth = NotebookAuthorization.getInstance();
NotebookRepo srcRepo = getRepo(sourceRepoIndex);
NotebookRepo dstRepo = getRepo(destRepoIndex);
List <NoteInfo> srcNotes = srcRepo.list(subject);
List <NoteInfo> allSrcNotes = srcRepo.list(subject);
List <NoteInfo> srcNotes = auth.filterByUser(allSrcNotes, subject);
List <NoteInfo> dstNotes = dstRepo.list(subject);
Map<String, List<String>> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo);
Map<String, List<String>> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo,
subject);
List<String> pushNoteIDs = noteIDs.get(pushKey);
List<String> pullNoteIDs = noteIDs.get(pullKey);
List<String> delDstNoteIDs = noteIDs.get(delDstKey);
@ -226,9 +230,13 @@ public class NotebookRepoSync implements NotebookRepo {
}
private void pushNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo localRepo,
NotebookRepo remoteRepo) throws IOException {
NotebookRepo remoteRepo) {
for (String id : ids) {
remoteRepo.save(localRepo.get(id, subject), subject);
try {
remoteRepo.save(localRepo.get(id, subject), subject);
} catch (IOException e) {
LOG.error("Failed to push note to storage, moving onto next one", e);
}
}
}
@ -256,7 +264,8 @@ public class NotebookRepoSync implements NotebookRepo {
}
private Map<String, List<String>> notesCheckDiff(List<NoteInfo> sourceNotes,
NotebookRepo sourceRepo, List<NoteInfo> destNotes, NotebookRepo destRepo)
NotebookRepo sourceRepo, List<NoteInfo> destNotes, NotebookRepo destRepo,
AuthenticationInfo subject)
throws IOException {
List <String> pushIDs = new ArrayList<String>();
List <String> pullIDs = new ArrayList<String>();
@ -268,8 +277,8 @@ public class NotebookRepoSync implements NotebookRepo {
dnote = containsID(destNotes, snote.getId());
if (dnote != null) {
/* note exists in source and destination storage systems */
sdate = lastModificationDate(sourceRepo.get(snote.getId(), null));
ddate = lastModificationDate(destRepo.get(dnote.getId(), null));
sdate = lastModificationDate(sourceRepo.get(snote.getId(), subject));
ddate = lastModificationDate(destRepo.get(dnote.getId(), subject));
if (sdate.compareTo(ddate) != 0) {
if (sdate.after(ddate) || oneWaySync) {

View file

@ -89,7 +89,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
NotebookAuthorization notebookAuthorization = new NotebookAuthorization(conf);
NotebookAuthorization notebookAuthorization = NotebookAuthorization.init(conf);
notebook = new Notebook(
conf,
notebookRepo,

View file

@ -92,7 +92,7 @@ public class NotebookTest implements JobListenerFactory{
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
notebookAuthorization = new NotebookAuthorization(conf);
notebookAuthorization = NotebookAuthorization.init(conf);
credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this, search,
@ -207,6 +207,7 @@ public class NotebookTest implements JobListenerFactory{
Notebook notebook2 = new Notebook(
conf, notebookRepo, schedulerFactory,
new InterpreterFactory(conf, null, null, null, depResolver), this, null, null, null);
assertEquals(1, notebook2.getAllNotes().size());
}
@ -588,7 +589,7 @@ public class NotebookTest implements JobListenerFactory{
// create a note and a paragraph
Note note = notebook.createNote(null);
NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
// empty owners, readers and writers means note is public
// empty owners, readers or writers means note is public
assertEquals(notebookAuthorization.isOwner(note.getId(),
new HashSet<String>(Arrays.asList("user2"))), true);
assertEquals(notebookAuthorization.isReader(note.getId(),
@ -873,6 +874,39 @@ public class NotebookTest implements JobListenerFactory{
assertEquals(1, notebook.getAllNotes(new AuthenticationInfo("user2")).size());
}
@Test
public void testGetAllNotesWithDifferentPermissions() throws IOException {
AuthenticationInfo user1 = new AuthenticationInfo("user1");
AuthenticationInfo user2 = new AuthenticationInfo("user2");
List<Note> notes1 = notebook.getAllNotes(user1);
List<Note> notes2 = notebook.getAllNotes(user2);
assertEquals(notes1.size(), 0);
assertEquals(notes2.size(), 0);
//creates note and sets user1 owner
Note note = notebook.createNote(user1);
// note is public since readers and writers empty
notes1 = notebook.getAllNotes(user1);
notes2 = notebook.getAllNotes(user2);
assertEquals(notes1.size(), 1);
assertEquals(notes2.size(), 1);
notebook.getNotebookAuthorization().setReaders(note.getId(), Sets.newHashSet("user1"));
//note is public since writers empty
notes1 = notebook.getAllNotes(user1);
notes2 = notebook.getAllNotes(user2);
assertEquals(notes1.size(), 1);
assertEquals(notes2.size(), 1);
notebook.getNotebookAuthorization().setWriters(note.getId(), Sets.newHashSet("user1"));
notes1 = notebook.getAllNotes(user1);
notes2 = notebook.getAllNotes(user2);
assertEquals(notes1.size(), 1);
assertEquals(notes2.size(), 0);
}
private void delete(File file){
if(file.isFile()) file.delete();
else if(file.isDirectory()){

View file

@ -99,7 +99,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
search = mock(SearchService.class);
notebookRepoSync = new NotebookRepoSync(conf);
notebookAuthorization = new NotebookAuthorization(conf);
notebookAuthorization = NotebookAuthorization.init(conf);
credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this, search,
notebookAuthorization, credentials);