Merge from master

This commit is contained in:
Rajat Venkatesh 2016-01-09 22:19:32 +05:30
commit 4f6bd04bf7
64 changed files with 1707 additions and 494 deletions

View file

@ -22,17 +22,24 @@ before_install:
- "sh -e /etc/init.d/xvfb start"
install:
- mvn package -DskipTests -Pspark-1.5 -Phadoop-2.3 -Ppyspark -B
- mvn package -DskipTests -Pspark-1.6 -Phadoop-2.3 -Ppyspark -Pscalding -B
before_script:
-
script:
# spark 1.6
- mvn package -Pbuild-distr -Pspark-1.6 -Phadoop-2.3 -Ppyspark -Pscalding -B
- ./testing/startSparkCluster.sh 1.6.0 2.3
- echo "export SPARK_HOME=`pwd`/spark-1.6.0-bin-hadoop2.3" > conf/zeppelin-env.sh
- mvn verify -Pusing-packaged-distr -Pspark-1.6 -Phadoop-2.3 -Ppyspark -Pscalding -B
- ./testing/stopSparkCluster.sh 1.6.0 2.3
# spark 1.5
- mvn package -Pbuild-distr -Pspark-1.5 -Phadoop-2.3 -Ppyspark -B
- rm -rf `pwd`/interpreter/spark
- mvn package -DskipTests -Pspark-1.5 -Phadoop-2.3 -Ppyspark -B -pl 'zeppelin-interpreter,spark-dependencies,spark'
- ./testing/startSparkCluster.sh 1.5.2 2.3
- echo "export SPARK_HOME=`pwd`/spark-1.5.2-bin-hadoop2.3" > conf/zeppelin-env.sh
- mvn verify -Pusing-packaged-distr -Pspark-1.5 -Phadoop-2.3 -Ppyspark -B
- mvn package -Pspark-1.5 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,zeppelin-zengine,zeppelin-server' -Dtest=org.apache.zeppelin.rest.*Test -DfailIfNoTests=false
- ./testing/stopSparkCluster.sh 1.5.2 2.3
# spark 1.4
- rm -rf `pwd`/interpreter/spark

View file

@ -1,55 +0,0 @@
Publish Zeppelin Distribution Package
------
A. Build package
Following command will create package zeppelin-VERSION.tar.gz under _zeppelin-distribution/target_ directory.
mvn clean package -P build-distr
B. Upload to S3 bucket ~~web server~~
~~scp zeppelin-distribution/target/zeppelin-VERSION.tar.gz root@www.nflabs.com:/var/www/html/pub/zeppelin/~~
mvn package -P publish-distr
C. Edit www.zeppelin-project.org
Edit download page to have link to new release.
Publish javadoc
-------
Generate javadoc with following command
mvn javadoc:javadoc
mv "zeppelin-zengine/target/site/apidocs" "ZEPPELIN_HOMEPAGE/docs/zengine-api/VERSION"
and publish the web.
Publish Maven artifact
------------
**Publish to snapshot repository**
mvn -DperformRelease=true deploy
**Publish to release repository**
mvn -DperformRelease=true release:clean
mvn -DperformRelease=true release:prepare
mvn -DperformRelease=true release:perform
Artifact is now in staging repository.
Connect https://oss.sonatype.org/ , select staging repository and click "close" -> "release" will finally release it.
**Reference**
https://docs.sonatype.org/display/Repository/How+To+Generate+PGP+Signatures+With+Maven
https://docs.sonatype.org/display/Repository/Sonatype+OSS+Maven+Repository+Usage+Guide#SonatypeOSSMavenRepositoryUsageGuide-1a.TermsofServiceRepository%3ACentralRepositoryTermsofServiceforSubmitters

View file

@ -67,6 +67,7 @@ Set spark major version
Available profiles are
```
-Pspark-1.6
-Pspark-1.5
-Pspark-1.4
-Pspark-1.3
@ -134,13 +135,13 @@ Here're some examples:
```
# basic build
mvn clean package -Pspark-1.5 -Phadoop-2.4 -Pyarn -Ppyspark
mvn clean package -Pspark-1.6 -Phadoop-2.4 -Pyarn -Ppyspark
# spark-cassandra integration
mvn clean package -Pcassandra-spark-1.5 -Dhadoop.version=2.6.0 -Phadoop-2.6 -DskipTests
# with CDH
mvn clean package -Pspark-1.2 -Dhadoop.version=2.5.0-cdh5.3.0 -Phadoop-2.4 -Pvendor-repo -DskipTests
mvn clean package -Pspark-1.5 -Dhadoop.version=2.6.0-cdh5.5.0 -Phadoop-2.6 -Pvendor-repo -DskipTests
# with MapR
mvn clean package -Pspark-1.5 -Pmapr50 -DskipTests
@ -153,6 +154,11 @@ mvn clean package -Pspark-1.5 -Pmapr50 -DskipTests
mvn clean package -Dignite.version=1.1.0-incubating -DskipTests
```
#### Scalding Interpreter
```
mvn clean package -Pscalding -DskipTests
```
### Configure
If you wish to configure Zeppelin option (like port number), configure the following files:

View file

@ -81,8 +81,11 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
# This will evantually passes SPARK_APP_JAR to classpath of SparkIMain
ZEPPELIN_CLASSPATH=${SPARK_APP_JAR}
pattern="$SPARK_HOME/python/lib/py4j-*-src.zip"
py4j=($pattern)
# pick the first match py4j zip - there should only be one
export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
export PYTHONPATH="${py4j[0]}:$PYTHONPATH"
else
# add Hadoop jars into classpath
if [[ -n "${HADOOP_HOME}" ]]; then
@ -95,7 +98,11 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
fi
addJarInDir "${INTERPRETER_DIR}/dep"
PYSPARKPATH="${ZEPPELIN_HOME}/interpreter/spark/pyspark/pyspark.zip:${ZEPPELIN_HOME}/interpreter/spark/pyspark/py4j-0.8.2.1-src.zip"
pattern="${ZEPPELIN_HOME}/interpreter/spark/pyspark/py4j-*-src.zip"
py4j=($pattern)
# pick the first match py4j zip - there should only be one
PYSPARKPATH="${ZEPPELIN_HOME}/interpreter/spark/pyspark/pyspark.zip:${py4j[0]}"
if [[ -z "${PYTHONPATH}" ]]; then
export PYTHONPATH="${PYSPARKPATH}"

View file

@ -105,7 +105,7 @@
<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter, org.apache.zeppelin.hbase.HbaseInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>

View file

@ -36,6 +36,8 @@
<li>
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Interpreter <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="{{BASE_PATH}}/manual/interpreters.html">Overview</a></li>
<li role="separator" class="divider"></li>
<li><a href="{{BASE_PATH}}/interpreter/cassandra.html">Cassandra</a></li>
<li><a href="{{BASE_PATH}}/interpreter/elasticsearch.html">Elasticsearch</a></li>
<li><a href="{{BASE_PATH}}/interpreter/flink.html">Flink</a></li>
@ -45,6 +47,7 @@
<li><a href="{{BASE_PATH}}/interpreter/lens.html">Lens</a></li>
<li><a href="{{BASE_PATH}}/interpreter/markdown.html">Markdown</a></li>
<li><a href="{{BASE_PATH}}/interpreter/postgresql.html">Postgresql, hawq</a></li>
<li><a href="{{BASE_PATH}}/interpreter/scalding.html">Scalding</a></li>
<li><a href="{{BASE_PATH}}/pleasecontribute.html">Shell</a></li>
<li><a href="{{BASE_PATH}}/interpreter/spark.html">Spark</a></li>
<li><a href="{{BASE_PATH}}/pleasecontribute.html">Tajo</a></li>

Binary file not shown.

After

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 119 KiB

View file

Before

Width:  |  Height:  |  Size: 54 KiB

After

Width:  |  Height:  |  Size: 54 KiB

View file

Before

Width:  |  Height:  |  Size: 113 KiB

After

Width:  |  Height:  |  Size: 113 KiB

View file

Before

Width:  |  Height:  |  Size: 56 KiB

After

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 96 KiB

View file

@ -1,72 +0,0 @@
---
layout: page
title: "Docs"
description: ""
group: nav-right
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
{% include JB/setup %}
### Install
* [Install](./install/install.html)
* [YARN Install](./install/yarn_install.html)
* [Virtual Machine Install](./install/virtual_machine.html)
### Tutorial
* [Tutorial](./tutorial/tutorial.html)
### Interpreter
**[Interpreters in zeppelin](manual/interpreters.html)**
* [cassandra](./interpreter/cassandra.html)
* [flink](./interpreter/flink.html)
* [geode](./interpreter/geode.html)
* [hive](./interpreter/hive.html)
* [ignite](./interpreter/ignite.html)
* [lens](./interpreter/lens.html)
* [md](./interpreter/markdown.html)
* [postgresql, hawq](./interpreter/postgresql.html)
* [sh](./pleasecontribute.html)
* [spark](./interpreter/spark.html)
* [tajo](./pleasecontribute.html)
* [elasticsearch](./interpreter/elasticsearch.html)
### Storage
* [S3 Storage](./storage/storage.html)
### Display System
* [text](./displaysystem/display.html)
* [html](./displaysystem/display.html#html)
* [table](./displaysystem/table.html)
* [angular](./displaysystem/angular.html) (Beta)
### Manual
* [Dynamic Form](./manual/dynamicform.html)
* [Notebook as Homepage](./manual/notebookashomepage.html)
### REST API
* [Interpreter API](./rest-api/rest-interpreter.html)
* [Notebook API](./rest-api/rest-notebook.html)
### Development
* [Writing Zeppelin Interpreter](./development/writingzeppelininterpreter.html)
* [How to contribute (code)](./development/howtocontribute.html)
* [How to contribute (website)](./development/howtocontributewebsite.html)

View file

@ -176,12 +176,6 @@ Configuration can be done by both environment variable(conf/zeppelin-env.sh) and
<td>org.apache.zeppelin.notebook.repo.VFSNotebookRepo</td>
<td>Comma separated list of notebook storage</td>
</tr>
<tr>
<td>ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE</td>
<td>zeppelin.notebook.reloadAllNotesFromStorage</td>
<td>false</td>
<td>Notebook list and contents will be always loaded from repository if set true. If set false, modified notebooks or new notebooks added on file system level won't be reflected on Zeppelin till user restarts Zeppelin.</td>
</tr>
<tr>
<td>ZEPPELIN_INTERPRETERS</td>
<td>zeppelin.interpreters</td>

View file

@ -110,6 +110,7 @@ With the `search` command, you can send a search query to Elasticsearch. There a
* This is a shortcut to a query like that: `{ "query": { "query_string": { "query": "__HERE YOUR QUERY__", "analyze_wildcard": true } } }`
* See [Elasticsearch query string syntax](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#query-string-syntax) for more details about the content of such a query.
```bash
| %elasticsearch
| search /index1,index2,.../type1,type2,... <JSON document containing the query or query_string elements>
@ -124,6 +125,9 @@ If you want to modify the size of the result set, you can add a line that is set
```
> A search query can also contain [aggregations](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html). If there is at least one aggregation, the result of the first aggregation is shown, otherwise, you get the search hits.
Examples:
* With a JSON query:
@ -134,6 +138,15 @@ Examples:
|
| %elasticsearch
| search /logs { "query": { "query_string": { "query": "request.method:GET AND status:200" } } }
|
| %elasticsearch
| search /logs { "aggs": {
| "content_length_stats": {
| "extended_stats": {
| "field": "content_length"
| }
| }
| } }
```
* With query_string elements:
@ -159,16 +172,17 @@ Suppose we have a JSON document:
"url": "/zeppelin/4cd001cd-c517-4fa9-b8e5-a06b8f4056c4",
"headers": [ "Accept: *.*", "Host: apache.org"]
},
"status": "403"
"status": "403",
"content_length": 1234
}
```
The data will be flattened like this:
date | request.headers[0] | request.headers[1] | request.method | request.url | status
-----|--------------------|--------------------|----------------|-------------|-------
2015-12-08T21:03:13.588Z | Accept: \*.\* | Host: apache.org | GET | /zeppelin/4cd001cd-c517-4fa9-b8e5-a06b8f4056c4 | 403
content_length | date | request.headers[0] | request.headers[1] | request.method | request.url | status
---------------|------|--------------------|--------------------|----------------|-------------|-------
1234 | 2015-12-08T21:03:13.588Z | Accept: \*.\* | Host: apache.org | GET | /zeppelin/4cd001cd-c517-4fa9-b8e5-a06b8f4056c4 | 403
Examples:
@ -185,6 +199,12 @@ Examples:
* With a query string:
![Elasticsearch - Search with query string](../assets/themes/zeppelin/img/docs-img/elasticsearch-query-string.png)
* With a query containing a multi-value metric aggregation:
![Elasticsearch - Search with aggregation (multi-value metric)](../assets/themes/zeppelin/img/docs-img/elasticsearch-agg-multi-value-metric.png)
* With a query containing a multi-bucket aggregation:
![Elasticsearch - Search with aggregation (multi-bucket)](../assets/themes/zeppelin/img/docs-img/elasticsearch-agg-multi-bucket-pie.png)
#### count
With the `count` command, you can count documents available in some indices and types. You can also provide a query.

View file

@ -0,0 +1,78 @@
---
layout: page
title: "Scalding Interpreter"
description: ""
group: manual
---
{% include JB/setup %}
## Scalding Interpreter for Apache Zeppelin
[Scalding](https://github.com/twitter/scalding) is an open source Scala library for writing MapReduce jobs.
### Building the Scalding Interpreter
You have to first build the Scalding interpreter by enable the **scalding** profile as follows:
```
mvn clean package -Pscalding -DskipTests
```
### Enabling the Scalding Interpreter
In a notebook, to enable the **Scalding** interpreter, click on the **Gear** icon,select **Scalding**, and hit **Save**.
<center>
![Interpreter Binding](../assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png)
![Interpreter Selection](../assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png)
</center>
### Configuring the Interpreter
Zeppelin comes with a pre-configured Scalding interpreter in local mode, so you do not need to install anything.
### Testing the Interpreter
In example, by using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, we will count words (of course!), and plot a graph of the top 10 words in the book.
```
%scalding
import scala.io.Source
// Get the Alice in Wonderland book from gutenberg.org:
val alice = Source.fromURL("http://www.gutenberg.org/files/11/11.txt").getLines
val aliceLineNum = alice.zipWithIndex.toList
val alicePipe = TypedPipe.from(aliceLineNum)
// Now get a list of words for the book:
val aliceWords = alicePipe.flatMap { case (text, _) => text.split("\\s+").toList }
// Now lets add a count for each word:
val aliceWithCount = aliceWords.filterNot(_.equals("")).map { word => (word, 1L) }
// let's sum them for each word:
val wordCount = aliceWithCount.group.sum
print ("Here are the top 10 words\n")
val top10 = wordCount
.groupAll
.sortBy { case (word, count) => -count }
.take(10)
top10.dump
```
```
%scalding
val table = "words\t count\n" + top10.toIterator.map{case (k, (word, count)) => s"$word\t$count"}.mkString("\n")
print("%table " + table)
```
If you click on the icon for the pie chart, you should be able to see a chart like this:
![Scalding - Pie - Chart](../assets/themes/zeppelin/img/docs-img/scalding-pie.png)
### Current Status & Future Work
The current implementation of the Scalding interpreter does not support canceling jobs, or fine-grained progress updates.
The pre-configured Scalding interpreter only supports Scalding in local mode. Hadoop mode for Scalding is currently unsupported, and will be future work (contributions welcome!).

View file

@ -17,17 +17,10 @@
package org.apache.zeppelin.elasticsearch;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import com.github.wnameless.json.flattener.JsonFlattener;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@ -43,16 +36,21 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.wnameless.json.flattener.JsonFlattener;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParseException;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
/**
@ -313,7 +311,8 @@ public class ElasticsearchInterpreter extends Interpreter {
* Processes a "search" request.
*
* @param urlItems Items of the URL
* @param data May contains the limit and the JSON of the request
* @param data May contains the JSON of the request
* @param size Limit of result set
* @return Result of the search request, it contains a tab-formatted string of the matching hits
*/
private InterpreterResult processSearch(String[] urlItems, String data, int size) {
@ -325,10 +324,7 @@ public class ElasticsearchInterpreter extends Interpreter {
final SearchResponse response = searchData(urlItems, data, size);
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TABLE,
buildResponseMessage(response.getHits().getHits()));
return buildResponseMessage(response);
}
/**
@ -419,7 +415,39 @@ public class ElasticsearchInterpreter extends Interpreter {
return response;
}
private String buildResponseMessage(SearchHit[] hits) {
private InterpreterResult buildAggResponseMessage(Aggregations aggregations) {
// Only the result of the first aggregation is returned
//
final Aggregation agg = aggregations.asList().get(0);
InterpreterResult.Type resType = InterpreterResult.Type.TEXT;
String resMsg = "";
if (agg instanceof InternalMetricsAggregation) {
resMsg = XContentHelper.toString((InternalMetricsAggregation) agg).toString();
}
else if (agg instanceof InternalSingleBucketAggregation) {
resMsg = XContentHelper.toString((InternalSingleBucketAggregation) agg).toString();
}
else if (agg instanceof InternalMultiBucketAggregation) {
final StringBuffer buffer = new StringBuffer("key\tdoc_count");
final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg;
for (MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
buffer.append("\n")
.append(bucket.getKeyAsString())
.append("\t")
.append(bucket.getDocCount());
}
resType = InterpreterResult.Type.TABLE;
resMsg = buffer.toString();
}
return new InterpreterResult(InterpreterResult.Code.SUCCESS, resType, resMsg);
}
private String buildSearchHitsResponseMessage(SearchHit[] hits) {
if (hits == null || hits.length == 0) {
return "";
@ -462,4 +490,18 @@ public class ElasticsearchInterpreter extends Interpreter {
return buffer.toString();
}
private InterpreterResult buildResponseMessage(SearchResponse response) {
final Aggregations aggregations = response.getAggregations();
if (aggregations != null && aggregations.asList().size() > 0) {
return buildAggResponseMessage(aggregations);
}
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TABLE,
buildSearchHitsResponseMessage(response.getHits().getHits()));
}
}

View file

@ -17,31 +17,27 @@
package org.apache.zeppelin.elasticsearch;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
public class ElasticsearchInterpreterTest {
private static Client elsClient;
@ -49,7 +45,7 @@ public class ElasticsearchInterpreterTest {
private static ElasticsearchInterpreter interpreter;
private static final String[] METHODS = { "GET", "PUT", "DELETE", "POST" };
private static final String[] STATUS = { "200", "404", "500", "403" };
private static final int[] STATUS = { 200, 404, 500, 403 };
private static final String ELS_CLUSTER_NAME = "zeppelin-elasticsearch-interpreter-test";
private static final String ELS_HOST = "localhost";
@ -71,6 +67,14 @@ public class ElasticsearchInterpreterTest {
elsNode = NodeBuilder.nodeBuilder().settings(settings).node();
elsClient = elsNode.client();
elsClient.admin().indices().prepareCreate("logs")
.addMapping("http", jsonBuilder()
.startObject().startObject("http").startObject("properties")
.startObject("content_length")
.field("type", "integer")
.endObject()
.endObject().endObject().endObject()).get();
for (int i = 0; i < 50; i++) {
elsClient.prepareIndex("logs", "http", "" + i)
@ -84,6 +88,7 @@ public class ElasticsearchInterpreterTest {
.field("headers", Arrays.asList("Accept: *.*", "Host: apache.org"))
.endObject()
.field("status", STATUS[RandomUtils.nextInt(STATUS.length)])
.field("content_length", RandomUtils.nextInt(2000))
)
.get();
}
@ -147,6 +152,31 @@ public class ElasticsearchInterpreterTest {
res = interpreter.interpret("search /logs status:404", null);
assertEquals(Code.SUCCESS, res.code());
}
@Test
public void testAgg() {
// Single-value metric
InterpreterResult res = interpreter.interpret("search /logs { \"aggs\" : { \"distinct_status_count\" : " +
" { \"cardinality\" : { \"field\" : \"status\" } } } }", null);
assertEquals(Code.SUCCESS, res.code());
// Multi-value metric
res = interpreter.interpret("search /logs { \"aggs\" : { \"content_length_stats\" : " +
" { \"extended_stats\" : { \"field\" : \"content_length\" } } } }", null);
assertEquals(Code.SUCCESS, res.code());
// Single bucket
res = interpreter.interpret("search /logs { \"aggs\" : { " +
" \"200_OK\" : { \"filter\" : { \"term\": { \"status\": \"200\" } }, " +
" \"aggs\" : { \"avg_length\" : { \"avg\" : { \"field\" : \"content_length\" } } } } } }", null);
assertEquals(Code.SUCCESS, res.code());
// Multi-buckets
res = interpreter.interpret("search /logs { \"aggs\" : { \"status_count\" : " +
" { \"terms\" : { \"field\" : \"status\" } } } }", null);
assertEquals(Code.SUCCESS, res.code());
}
@Test
public void testIndex() {

View file

@ -33,7 +33,7 @@
<url>http://zeppelin.incubator.apache.org</url>
<properties>
<ignite.version>1.4.0</ignite.version>
<ignite.version>1.5.0.final</ignite.version>
<ignite.scala.binary.version>2.10</ignite.scala.binary.version>
<ignite.scala.version>2.10.4</ignite.scala.version>
</properties>

View file

@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.zeppelin.interpreter.InterpreterContext;
@ -59,6 +60,7 @@ public class IgniteSqlInterpreterTest {
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setDiscoverySpi(discoSpi);
cfg.setPeerClassLoadingEnabled(true);
cfg.setMarshaller(new OptimizedMarshaller());
cfg.setGridName("test");

21
pom.xml
View file

@ -628,6 +628,13 @@
</modules>
</profile>
<profile>
<id>scalding</id>
<modules>
<module>scalding</module>
</modules>
</profile>
<profile>
<id>build-distr</id>
<activation>
@ -688,7 +695,7 @@
<plugins>
<plugin>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.4</version>
<version>1.6</version>
<executions>
<execution>
<id>sign-artifacts</id>
@ -703,15 +710,5 @@
</build>
</profile>
</profiles>
<distributionManagement>
<site>
<id>Website</id>
<url>${site_url}</url>
</site>
<repository>
<id>${repoid}</id>
<name>${reponame}</name>
<url>${repourl}</url>
</repository>
</distributionManagement>
</project>

202
scalding/pom.xml Normal file
View file

@ -0,0 +1,202 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.6.0-incubating-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-scalding</artifactId>
<packaging>jar</packaging>
<version>0.6.0-incubating-SNAPSHOT</version>
<name>Zeppelin: Scalding interpreter</name>
<url>http://zeppelin.incubator.apache.org</url>
<properties>
<scala.version>2.10.4</scala.version>
<hadoop.version>2.3.0</hadoop.version>
<scalding.version>0.15.1-RC13</scalding.version>
</properties>
<repositories>
<repository>
<id>conjars</id>
<name>Concurrent Maven Repo</name>
<url>http://conjars.org/repo</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>scalding-core_2.10</artifactId>
<version>${scalding.version}</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>scalding-repl_2.10</artifactId>
<version>${scalding.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Scalding REPL needs org.apache.hadoop.conf.Configuration even in local mode -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/scalding</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/scalding</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<!-- Plugin to compile Scala code -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,288 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.scalding;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.net.URL;
import java.net.URLClassLoader;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Console;
import scala.Some;
import scala.None;
import scala.tools.nsc.Settings;
import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
import scala.tools.nsc.settings.MutableSettings.PathSetting;
/**
* Scalding interpreter for Zeppelin. Based off the Spark interpreter code.
*
*/
public class ScaldingInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(ScaldingInterpreter.class);
public static final List<String> NO_COMPLETION =
Collections.unmodifiableList(new ArrayList<String>());
static {
Interpreter.register("scalding", ScaldingInterpreter.class.getName());
}
private ScaldingILoop interpreter;
private ByteArrayOutputStream out;
private Map<String, Object> binder;
public ScaldingInterpreter(Properties property) {
super(property);
out = new ByteArrayOutputStream();
}
@Override
public void open() {
URL[] urls = getClassloaderUrls();
// Very nice discussion about how scala compiler handle classpath
// https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0
/*
* > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new
* Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through
* nsc.Settings.classpath.
*
* >> val settings = new Settings() >> settings.usejavacp.value = true >>
* settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >>
* val in = new Interpreter(settings) { >> override protected def parentClassLoader =
* getClass.getClassLoader >> } >> in.setContextClassLoader()
*/
Settings settings = new Settings();
// set classpath for scala compiler
PathSetting pathSettings = settings.classpath();
String classpath = "";
List<File> paths = currentClassPath();
for (File f : paths) {
if (classpath.length() > 0) {
classpath += File.pathSeparator;
}
classpath += f.getAbsolutePath();
}
if (urls != null) {
for (URL u : urls) {
if (classpath.length() > 0) {
classpath += File.pathSeparator;
}
classpath += u.getFile();
}
}
pathSettings.v_$eq(classpath);
settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
// set classloader for scala compiler
settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
.getContextClassLoader()));
BooleanSetting b = (BooleanSetting) settings.usejavacp();
b.v_$eq(true);
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
/* Scalding interpreter */
PrintStream printStream = new PrintStream(out);
interpreter = new ScaldingILoop(null, new PrintWriter(out));
interpreter.settings_$eq(settings);
interpreter.createInterpreter();
interpreter.intp().
interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
binder = (Map<String, Object>) getValue("_binder");
binder.put("out", printStream);
}
private Object getValue(String name) {
Object ret = interpreter.intp().valueOfTerm(name);
if (ret instanceof None) {
return null;
} else if (ret instanceof Some) {
return ((Some) ret).get();
} else {
return ret;
}
}
private List<File> currentClassPath() {
List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
if (cps != null) {
for (String cp : cps) {
paths.add(new File(cp));
}
}
return paths;
}
private List<File> classPath(ClassLoader cl) {
List<File> paths = new LinkedList<File>();
if (cl == null) {
return paths;
}
if (cl instanceof URLClassLoader) {
URLClassLoader ucl = (URLClassLoader) cl;
URL[] urls = ucl.getURLs();
if (urls != null) {
for (URL url : urls) {
paths.add(new File(url.getFile()));
}
}
}
return paths;
}
@Override
public void close() {
interpreter.intp().close();
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
logger.info("Running Scalding command '" + cmd + "'");
if (cmd == null || cmd.trim().length() == 0) {
return new InterpreterResult(Code.SUCCESS);
}
return interpret(cmd.split("\n"), contextInterpreter);
}
public InterpreterResult interpret(String[] lines, InterpreterContext context) {
synchronized (this) {
InterpreterResult r = interpretInput(lines);
return r;
}
}
public InterpreterResult interpretInput(String[] lines) {
// add print("") to make sure not finishing with comment
// see https://github.com/NFLabs/zeppelin/issues/151
String[] linesToRun = new String[lines.length + 1];
for (int i = 0; i < lines.length; i++) {
linesToRun[i] = lines[i];
}
linesToRun[lines.length] = "print(\"\")";
Console.setOut((java.io.PrintStream) binder.get("out"));
out.reset();
Code r = null;
String incomplete = "";
for (int l = 0; l < linesToRun.length; l++) {
String s = linesToRun[l];
// check if next line starts with "." (but not ".." or "./") it is treated as an invocation
if (l + 1 < linesToRun.length) {
String nextLine = linesToRun[l + 1].trim();
if (nextLine.startsWith(".") && !nextLine.startsWith("..") && !nextLine.startsWith("./")) {
incomplete += s + "\n";
continue;
}
}
scala.tools.nsc.interpreter.Results.Result res = null;
try {
res = interpreter.intp().interpret(incomplete + s);
} catch (Exception e) {
logger.error("Interpreter exception: ", e);
return new InterpreterResult(Code.ERROR, e.getMessage());
}
r = getResultCode(res);
if (r == Code.ERROR) {
Console.flush();
return new InterpreterResult(r, out.toString());
} else if (r == Code.INCOMPLETE) {
incomplete += s + "\n";
} else {
incomplete = "";
}
}
if (r == Code.INCOMPLETE) {
return new InterpreterResult(r, "Incomplete expression");
} else {
Console.flush();
return new InterpreterResult(r, out.toString());
}
}
private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
return Code.SUCCESS;
} else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
return Code.INCOMPLETE;
} else {
return Code.ERROR;
}
}
@Override
public void cancel(InterpreterContext context) {
// not implemented
}
@Override
public FormType getFormType() {
return FormType.NATIVE;
}
@Override
public int getProgress(InterpreterContext context) {
// fine-grained progress not implemented - return 0
return 0;
}
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
ScaldingInterpreter.class.getName() + this.hashCode());
}
@Override
public List<String> completion(String buf, int cursor) {
return NO_COMPLETION;
}
}

View file

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.scalding;
import java.io.{BufferedReader, File, FileReader}
import scala.tools.nsc.GenericRunnerSettings
import scala.tools.nsc.interpreter.{ILoop, IR, JPrintWriter}
/**
* A class providing Scalding specific commands for inclusion in the Scalding REPL.
* This is currently forked from Scalding, but should eventually make it into Scalding itself:
* https://github.com/twitter/scalding/blob/develop/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala
*/
class ScaldingILoop(in0: Option[BufferedReader], out: JPrintWriter)
extends ILoop(in0, out) {
// def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
// def this() = this(None, new JPrintWriter(Console.out, true))
settings = new GenericRunnerSettings({ s => echo(s) })
override def printWelcome() {
val fc = Console.YELLOW
val wc = Console.RED
def wrapFlames(s: String) = s.replaceAll("[()]+", fc + "$0" + wc)
echo(fc +
" ( \n" +
" )\\ ) ( ( \n" +
"(()/( ) )\\ )\\ ) ( ( ( \n" +
" /(_)) ( ( /( ((_)(()/( )\\ ( )\\))( \n" +
"(_)) )\\ )( )) _ ((_)(( ) )\\ ) (( ))\\ \n".replaceAll("_", wc + "_" + fc) + wc +
wrapFlames("/ __|((_) ((_)_ | | _| | (_) _(_(( (_()_) \n") +
wrapFlames("\\__ \\/ _| / _` || |/ _` | | || ' \\))/ _` \\ \n") +
"|___/\\__| \\__,_||_|\\__,_| |_||_||_| \\__, | \n" +
" |___/ ")
}
/**
* Commands specific to the Scalding REPL. To define a new command use one of the following
* factory methods:
* - `LoopCommand.nullary` for commands that take no arguments
* - `LoopCommand.cmd` for commands that take one string argument
* - `LoopCommand.varargs` for commands that take multiple string arguments
*/
private val scaldingCommands: List[LoopCommand] = List()
/**
* Change the shell prompt to read scalding&gt;
*
* @return a prompt string to use for this REPL.
*/
override def prompt: String = Console.BLUE + "\nscalding> " + Console.RESET
private[this] def addImports(ids: String*): IR.Result =
if (ids.isEmpty) IR.Success
else intp.interpret("import " + ids.mkString(", "))
/**
* Search for files with the given name in all directories from current directory
* up to root.
*/
private def findAllUpPath(filename: String): List[File] =
Iterator.iterate(System.getProperty("user.dir"))(new File(_).getParent)
.takeWhile(_ != "/")
.flatMap(new File(_).listFiles.filter(_.toString.endsWith(filename)))
.toList
/**
* Gets the list of commands that this REPL supports.
*
* @return a list of the command supported by this REPL.
*/
override def commands: List[LoopCommand] = super.commands ++ scaldingCommands
protected def imports: List[String] = List(
"com.twitter.scalding._",
"com.twitter.scalding.ReplImplicits._",
"com.twitter.scalding.ReplImplicitContext._",
"com.twitter.scalding.ReplState._")
override def createInterpreter() {
super.createInterpreter()
intp.beQuietDuring {
addImports(imports: _*)
settings match {
case s: GenericRunnerSettings =>
findAllUpPath(".scalding_repl").reverse.foreach {
f => s.loadfiles.appendToValue(f.toString)
}
case _ => ()
}
}
}
}

View file

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.scalding;
import static org.junit.Assert.*;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
/**
* Tests for the Scalding interpreter for Zeppelin.
*
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ScaldingInterpreterTest {
public static ScaldingInterpreter repl;
private InterpreterContext context;
private File tmpDir;
@Before
public void setUp() throws Exception {
tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo");
tmpDir.mkdirs();
if (repl == null) {
Properties p = new Properties();
repl = new ScaldingInterpreter(p);
repl.open();
}
InterpreterGroup intpGroup = new InterpreterGroup();
context = new InterpreterContext("note", "id", "title", "text",
new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>());
}
@After
public void tearDown() throws Exception {
delete(tmpDir);
repl.close();
}
private void delete(File file) {
if (file.isFile()) file.delete();
else if (file.isDirectory()) {
File[] files = file.listFiles();
if (files != null && files.length > 0) {
for (File f : files) {
delete(f);
}
}
file.delete();
}
}
@Test
public void testBasicIntp() {
assertEquals(InterpreterResult.Code.SUCCESS,
repl.interpret("val a = 1\nval b = 2", context).code());
// when interpret incomplete expression
InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
assertTrue(incomplete.message().length() > 0); // expecting some error
// message
}
@Test
public void testBasicScalding() {
assertEquals(InterpreterResult.Code.SUCCESS,
repl.interpret("case class Sale(state: String, name: String, sale: Int)\n" +
"val salesList = List(Sale(\"CA\", \"A\", 60), Sale(\"CA\", \"A\", 20), Sale(\"VA\", \"B\", 15))\n" +
"val salesPipe = TypedPipe.from(salesList)\n" +
"val results = salesPipe.map{x => (1, Set(x.state), x.sale)}.\n" +
" groupAll.sum.values.map{ case(count, set, sum) => (count, set.size, sum) }\n" +
"results.dump",
context).code());
}
@Test
public void testNextLineInvocation() {
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code());
}
@Test
public void testEndWithComment() {
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
}
@Test
public void testReferencingUndefinedVal() {
InterpreterResult result = repl.interpret("def category(min: Int) = {"
+ " if (0 <= value) \"error\"" + "}", context);
assertEquals(Code.ERROR, result.code());
}
}

View file

@ -33,7 +33,7 @@
<name>Zeppelin: Spark dependencies</name>
<description>Zeppelin spark support</description>
<url>http://zeppelin.incubator.apache.org</url>
<properties>
<spark.version>1.4.1</spark.version>
@ -130,117 +130,117 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>${yarn.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${yarn.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
<version>${yarn.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${yarn.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>${yarn.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${yarn.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
<version>${yarn.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${yarn.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependencies>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
@ -489,7 +489,7 @@
<dependencies>
</dependencies>
</profile>
<profile>
<id>cassandra-spark-1.5</id>
<properties>
@ -513,7 +513,21 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-1.6</id>
<properties>
<spark.version>1.6.0</spark.version>
<py4j.version>0.9</py4j.version>
<akka.group>com.typesafe.akka</akka.group>
<akka.version>2.3.11</akka.version>
<protobuf.version>2.5.0</protobuf.version>
</properties>
<dependencies>
</dependencies>
</profile>
<profile>
<id>hadoop-0.23</id>
<!-- SPARK-1121: Adds an explicit dependency on Avro to work around a
@ -731,10 +745,6 @@
<profile>
<id>pyspark</id>
<properties>
<spark.download.url>http://archive.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}.tgz
</spark.download.url>
</properties>
<build>
<plugins>
<plugin>

View file

@ -33,16 +33,70 @@
<name>Zeppelin: Spark</name>
<description>Zeppelin spark support</description>
<url>http://zeppelin.incubator.apache.org</url>
<properties>
<spark.version>1.4.1</spark.version>
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>
<hadoop.version>2.3.0</hadoop.version>
<py4j.version>0.8.2.1</py4j.version>
</properties>
<profiles>
<profile>
<id>vendor-repo</id>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
</profile>
<profile>
<id>spark-1.1</id>
<properties>
<spark.version>1.1.1</spark.version>
</properties>
</profile>
<profile>
<id>spark-1.2</id>
<properties>
<spark.version>1.2.1</spark.version>
</properties>
</profile>
<profile>
<id>spark-1.3</id>
<properties>
<spark.version>1.3.1</spark.version>
</properties>
</profile>
<profile>
<id>spark-1.4</id>
<properties>
<spark.version>1.4.1</spark.version>
</properties>
</profile>
<profile>
<id>spark-1.5</id>
<properties>
<spark.version>1.5.2</spark.version>
</properties>
</profile>
<profile>
<id>spark-1.6</id>
<properties>
<spark.version>1.6.0</spark.version>
<py4j.version>0.9</py4j.version>
</properties>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
@ -72,7 +126,7 @@
<artifactId>guava</artifactId>
<version>14.0.1</version>
</dependency>
<!-- Aether :: maven dependency resolution -->
<dependency>
<groupId>org.apache.maven</groupId>
@ -294,18 +348,6 @@
</dependency>
</dependencies>
<profiles>
<profile>
<id>vendor-repo</id>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
</profile>
</profiles>
<build>
<plugins>
<plugin>
@ -397,7 +439,7 @@
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>

View file

@ -317,7 +317,8 @@ public class SparkInterpreter extends Interpreter {
"python" + File.separator + "lib");
}
String[] pythonLibs = new String[]{"pyspark.zip", "py4j-0.8.2.1-src.zip"};
//Only one of py4j-0.9-src.zip and py4j-0.8.2.1-src.zip should exist
String[] pythonLibs = new String[]{"pyspark.zip", "py4j-0.9-src.zip", "py4j-0.8.2.1-src.zip"};
ArrayList<String> pythonLibUris = new ArrayList<>();
for (String lib : pythonLibs) {
File libFile = new File(pysparkPath, lib);

View file

@ -32,9 +32,10 @@ public class SparkVersion {
public static final SparkVersion SPARK_1_4_0 = SparkVersion.fromVersionString("1.4.0");
public static final SparkVersion SPARK_1_5_0 = SparkVersion.fromVersionString("1.5.0");
public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0");
public static final SparkVersion SPARK_1_7_0 = SparkVersion.fromVersionString("1.7.0");
public static final SparkVersion MIN_SUPPORTED_VERSION = SPARK_1_0_0;
public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_1_6_0;
public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_1_7_0;
private int version;
private String versionString;

View file

@ -295,30 +295,30 @@ public class ZeppelinContext extends HashMap<String, Object> {
try {
take = df.getClass().getMethod("take", int.class);
rows = (Object[]) take.invoke(df, maxResult + 1);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException | ClassCastException e) {
sc.clearJobGroup();
throw new InterpreterException(e);
}
String msg = null;
List<Attribute> columns = null;
// get field names
Method queryExecution;
QueryExecution qe;
try {
queryExecution = df.getClass().getMethod("queryExecution");
qe = (QueryExecution) queryExecution.invoke(df);
// Use reflection because of classname returned by queryExecution changes from
// Spark <1.5.2 org.apache.spark.sql.SQLContext$QueryExecution
// Spark 1.6.0> org.apache.spark.sql.hive.HiveContext$QueryExecution
Object qe = df.getClass().getMethod("queryExecution").invoke(df);
Object a = qe.getClass().getMethod("analyzed").invoke(qe);
scala.collection.Seq seq = (scala.collection.Seq) a.getClass().getMethod("output").invoke(a);
columns = (List<Attribute>) scala.collection.JavaConverters.seqAsJavaListConverter(seq)
.asJava();
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
throw new InterpreterException(e);
}
List<Attribute> columns =
scala.collection.JavaConverters.asJavaListConverter(
qe.analyzed().output()).asJava();
String msg = null;
for (Attribute col : columns) {
if (msg == null) {
msg = col.name();

View file

@ -111,7 +111,7 @@ class PySparkCompletion:
def getGlobalCompletion(self):
objectDefList = []
try:
for completionItem in list(globals().iterkeys()):
for completionItem in list(globals().keys()):
objectDefList.append(completionItem)
except:
return None
@ -119,18 +119,20 @@ class PySparkCompletion:
return objectDefList
def getMethodCompletion(self, text_value):
objectDefList = []
execResult = locals()
if text_value == None:
return None
completion_target = text_value
try:
if len(completion_target) <= 0:
return None
if text_value[-1] == ".":
completion_target = text_value[:-1]
exec("%s = %s(%s)" % ("objectDefList", "dir", completion_target))
exec("{} = dir({})".format("objectDefList", completion_target), globals(), execResult)
except:
return None
else:
return objectDefList
return list(execResult['objectDefList'])
def getCompletion(self, text_value):
@ -147,9 +149,9 @@ class PySparkCompletion:
for completionItem in list(objectCompletionList):
completionList.add(completionItem)
if len(completionList) <= 0:
print ""
print("")
else:
print json.dumps(filter(lambda x : not re.match("^__.*", x), list(completionList)))
print(json.dumps(list(filter(lambda x : not re.match("^__.*", x), list(completionList)))))
output = Logger()

View file

@ -1,4 +1,4 @@
(Apache 2.0) nvd3.js v1.1.15-beta (http://nvd3.org/) - https://github.com/novus/nvd3/blob/v1.1.15-beta/LICENSE.md
(Apache 2.0) nvd3.js v1.7.1 (http://nvd3.org/) - https://github.com/novus/nvd3/blob/v1.7.1/LICENSE.md
(Apache 2.0) gson v2.2 (com.google.code.gson:gson:jar:2.2 - https://github.com/google/gson) - https://github.com/google/gson/blob/gson-2.2/LICENSE
(Apache 2.0) Amazon Web Services SDK for Java v1.10.1 (https://aws.amazon.com/sdk-for-java/) - https://raw.githubusercontent.com/aws/aws-sdk-java/1.10.1/LICENSE.txt
(Apache 2.0) JavaEWAH v0.7.9 (https://github.com/lemire/javaewah) - https://github.com/lemire/javaewah/blob/master/LICENSE-2.0.txt
@ -10,11 +10,11 @@ The following components are provided under Apache License.
(Apache 2.0) Apache Commons Logging (commons-logging:commons-logging:1.1.1 - http://commons.apache.org/proper/commons-logging/)
(Apache 2.0) Apache Commons Codec (commons-codec:commons-codec:1.5 - http://commons.apache.org/proper/commons-codec/)
(Apache 2.0) Apache Commons Collections (commons-collections:commons-collections:3.2.1 - http://commons.apache.org/proper/commons-configuration/)
(Apache 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.9 - http://commons.apache.org/proper/commons-compress/)
(Apache 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.9 - http://commons.apache.org/proper/commons-compress/)
(Apache 2.0) Apache Commons Configuration (commons-configuration:commons-configuration:1.9 - http://commons.apache.org/configuration/)
(Apache 2.0) Apache Commons CLI (commons-cli:commons-cli:1.2 - http://commons.apache.org/cli/)
(Apache 2.0) Apache Commons Exec (commons-exec:commons-exec:1.3 - http://commons.apache.org/exec/)
(Apache 2.0) Http Components (org.apache.httpcomponents:httpcore:4.3.3 - https://github.com/apache/httpclient)
(Apache 2.0) Http Components (org.apache.httpcomponents:httpcore:4.3.3 - https://github.com/apache/httpclient)
(Apache 2.0) Http Components (org.apache.httpcomponents:httpclient:4.3.6 - https://github.com/apache/httpclient)
(Apache 2.0) Apache Commons Lang (org.apache.commons:commons-lang:2.5 - http://commons.apache.org/proper/commons-lang/)
(Apache 2.0) Apache Commons Lang 3 (org.apache.commons:commons-lang3:3.4 - http://commons.apache.org/proper/commons-lang/)
@ -54,7 +54,7 @@ The following components are provided under Apache License.
(Apache 2.0) Jackson (com.fasterxml.jackson.core:jackson-annotations:2.5.0 - https://github.com/FasterXML/jackson-core)
(Apache 2.0) Jackson (com.fasterxml.jackson.core:jackson-databind:2.5.3 - https://github.com/FasterXML/jackson-core)
(Apache 2.0) javax.servlet (org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016 - http://www.eclipse.org/jetty)
(Apache 2.0) Joda-Time (joda-time:joda-time:2.8.1 - http://www.joda.org/joda-time/)
(Apache 2.0) Joda-Time (joda-time:joda-time:2.8.1 - http://www.joda.org/joda-time/)
(Apache 2.0) Jackson (org.codehaus.jackson:jackson-core-asl:1.9.13 - http://jackson.codehaus.org/)
(Apache 2.0) JetS3t (net.java.dev.jets3t:jets3t:jar:0.9.3) - http://www.jets3t.org/
(Apache 2.0) Jetty (org.eclipse.jetty:jetty - http://www.eclipse.org/jetty)
@ -111,6 +111,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
(The MIT License) Angular Websocket v1.0.13 (http://angularclass.github.io/angular-websocket/) - https://github.com/AngularClass/angular-websocket/blob/v1.0.13/LICENSE
(The MIT License) UI.Ace v0.1.1 (http://angularclass.github.io/angular-websocket/) - https://github.com/angular-ui/ui-ace/blob/master/LICENSE
(The MIT License) jquery.scrollTo v1.4.13 (https://github.com/flesler/jquery.scrollTo) - https://github.com/flesler/jquery.scrollTo/blob/1.4.13/LICENSE
(The MIT License) jquery.floatThead v1.3.2 (https://github.com/mkoryak/floatThead) - https://github.com/mkoryak/floatThead/blob/master/license.txt
(The MIT License) angular-dragdrop v1.0.8 (http://codef0rmer.github.io/angular-dragdrop/#/) - https://github.com/codef0rmer/angular-dragdrop/blob/v1.0.8/LICENSE
(The MIT License) perfect-scrollbar v0.5.4 (http://noraesae.github.io/perfect-scrollbar/) - https://github.com/noraesae/perfect-scrollbar/tree/0.5.4
(The MIT License) ng-sortable v1.1.9 (https://github.com/a5hik/ng-sortable) - https://github.com/a5hik/ng-sortable/blob/1.1.9/LICENSE
@ -128,7 +129,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
The following components are provided under the MIT License.
(The MIT License) Objenesis (org.objenesis:objenesis:2.1 - https://github.com/easymock/objenesis) - Copyright (c) 2006-2015 the original author and authors
(The MIT License) JCL 1.1.1 implemented over SLF4J (org.slf4j:jcl-over-slf4j:1.7.5 - http://www.slf4j.org)
(The MIT License) JCL 1.1.1 implemented over SLF4J (org.slf4j:jcl-over-slf4j:1.7.5 - http://www.slf4j.org)
(The MIT License) JUL to SLF4J bridge (org.slf4j:jul-to-slf4j:1.7.5 - http://www.slf4j.org)
(The MIT License) angular-resource (angular-resource - https://github.com/angular/angular.js/tree/master/src/ngResource)
(The MIT License) minimal-json (com.eclipsesource.minimal-json:minimal-json:0.9.4 - https://github.com/ralfstx/minimal-json)
@ -155,7 +156,7 @@ The following components are provided under the BSD-style License.
(New BSD License) JGit (org.eclipse.jgit:org.eclipse.jgit:jar:4.1.1.201511131810-r - https://eclipse.org/jgit/)
(New BSD License) Kryo (com.esotericsoftware.kryo:kryo:2.21 - http://code.google.com/p/kryo/)
(New BSD License) MinLog (com.esotericsoftware.minlog:minlog:1.2 - http://code.google.com/p/minlog/)
(New BSD License) ReflectASM (com.esotericsoftware.reflectasm:reflectasm:1.07 - http://code.google.com/p/reflectasm/)
(New BSD License) ReflectASM (com.esotericsoftware.reflectasm:reflectasm:1.07 - http://code.google.com/p/reflectasm/)
(BSD-like) Scala Library (org.scala-lang:scala-library:2.10.4 - http://www.scala-lang.org/)
(BSD-like) Scalap (org.scala-lang:scalap:2.10.4 - http://www.scala-lang.org/)
(BSD-like) (The BSD License) jline (org.scala-lang:jline:2.10.4 - http://www.scala-lang.org/)
@ -165,7 +166,7 @@ The following components are provided under the BSD-style License.
(BSD-like) ASM (asm:asm:jar:3.1 - http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
(New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
(New BSD License) Markdown4j (org.commonjava.googlecode.markdown4j:markdown4j:jar:2.2-cj-1.0 - https://code.google.com/p/markdown4j/)
========================================================================
@ -211,4 +212,3 @@ Creative Commons CC0 (http://creativecommons.org/publicdomain/zero/1.0/)
(CC0 1.0 Universal) JSR166e (com.twitter:jsr166e:1.1.0 - http://github.com/twitter/jsr166e)
(Public Domain, per Creative Commons CC0) HdrHistogram (org.hdrhistogram:HdrHistogram:2.1.6 - http://hdrhistogram.github.io/HdrHistogram/)

View file

@ -1,4 +1,4 @@
Copyright (c) 2011, 2012 Novus Partners, Inc.
Copyright (c) 2011-2014 Novus Partners, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.remote;
import java.util.List;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
@ -77,15 +78,19 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
}
Client client = null;
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
client.angularObjectAdd(name, noteId, gson.toJson(o));
return super.add(name, o, noteId, true);
} catch (TException e) {
broken = true;
logger.error("Error", e);
} catch (Exception e) {
logger.error("Error", e);
} finally {
if (client != null) {
remoteInterpreterProcess.releaseClient(client);
remoteInterpreterProcess.releaseClient(client, broken);
}
}
return null;
@ -106,15 +111,19 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
}
Client client = null;
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
client.angularObjectRemove(name, noteId);
return super.remove(name, noteId);
} catch (TException e) {
broken = true;
logger.error("Error", e);
} catch (Exception e) {
logger.error("Error", e);
} finally {
if (client != null) {
remoteInterpreterProcess.releaseClient(client);
remoteInterpreterProcess.releaseClient(client, broken);
}
}
return null;

View file

@ -131,6 +131,7 @@ public class RemoteInterpreter extends Interpreter {
throw new InterpreterException(e1);
}
boolean broken = false;
try {
for (Interpreter intp : this.getInterpreterGroup()) {
logger.info("Create remote interpreter {}", intp.getClassName());
@ -138,9 +139,10 @@ public class RemoteInterpreter extends Interpreter {
}
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}
}
@ -158,14 +160,19 @@ public class RemoteInterpreter extends Interpreter {
public void close() {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
Client client = null;
boolean broken = false;
try {
client = interpreterProcess.getClient();
client.close(className);
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} catch (Exception e1) {
throw new InterpreterException(e1);
} finally {
if (client != null) {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
getInterpreterProcess().dereference();
}
@ -195,6 +202,7 @@ public class RemoteInterpreter extends Interpreter {
interpreterContextRunnerPool.addAll(noteId, runners);
}
boolean broken = false;
try {
GUI settings = context.getGui();
RemoteInterpreterResult remoteResult = client.interpret(className, st, convert(context));
@ -215,9 +223,10 @@ public class RemoteInterpreter extends Interpreter {
InterpreterResult result = convert(remoteResult);
return result;
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}
@ -231,12 +240,14 @@ public class RemoteInterpreter extends Interpreter {
throw new InterpreterException(e1);
}
boolean broken = false;
try {
client.cancel(className, convert(context));
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}
@ -257,13 +268,15 @@ public class RemoteInterpreter extends Interpreter {
throw new InterpreterException(e1);
}
boolean broken = false;
try {
formType = FormType.valueOf(client.getFormType(className));
return formType;
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}
@ -277,12 +290,14 @@ public class RemoteInterpreter extends Interpreter {
throw new InterpreterException(e1);
}
boolean broken = false;
try {
return client.getProgress(className, convert(context));
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}
@ -297,12 +312,14 @@ public class RemoteInterpreter extends Interpreter {
throw new InterpreterException(e1);
}
boolean broken = false;
try {
return client.completion(className, buf, cursor);
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}

View file

@ -66,16 +66,18 @@ public class RemoteInterpreterEventPoller extends Thread {
}
RemoteInterpreterEvent event = null;
boolean broken = false;
try {
event = client.getEvent();
} catch (TException e) {
broken = true;
logger.error("Can't get RemoteInterpreterEvent", e);
waitQuietly();
continue;
} finally {
interpreterProcess.releaseClient(client, broken);
}
interpreterProcess.releaseClient(client);
Gson gson = new Gson();
AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();

View file

@ -140,7 +140,27 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
}
public void releaseClient(Client client) {
clientPool.returnObject(client);
releaseClient(client, false);
}
public void releaseClient(Client client, boolean broken) {
if (broken) {
releaseBrokenClient(client);
} else {
try {
clientPool.returnObject(client);
} catch (Exception e) {
logger.warn("exception occurred during releasing thrift client", e);
}
}
}
public void releaseBrokenClient(Client client) {
try {
clientPool.invalidateObject(client);
} catch (Exception e) {
logger.warn("exception occurred during releasing thrift client", e);
}
}
public int dereference() {
@ -159,7 +179,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
// safely ignore exception while client.shutdown() may terminates remote process
} finally {
if (client != null) {
releaseClient(client);
// no longer used
releaseBrokenClient(client);
}
}
@ -250,13 +271,15 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
logger.error("Can't update angular object", e);
}
boolean broken = false;
try {
Gson gson = new Gson();
client.angularObjectUpdate(name, noteId, gson.toJson(o));
} catch (TException e) {
broken = true;
logger.error("Can't update angular object", e);
} finally {
releaseClient(client);
releaseClient(client, broken);
}
}

View file

@ -251,6 +251,7 @@ public class RemoteScheduler implements Scheduler {
return Status.ERROR;
}
boolean broken = false;
try {
String statusStr = client.getStatus(job.getId());
if ("Unknown".equals(statusStr)) {
@ -265,6 +266,7 @@ public class RemoteScheduler implements Scheduler {
listener.afterStatusChange(job, null, status);
return status;
} catch (TException e) {
broken = true;
logger.error("Can't get status information", e);
lastStatus = Status.ERROR;
return Status.ERROR;
@ -273,7 +275,7 @@ public class RemoteScheduler implements Scheduler {
lastStatus = Status.ERROR;
return Status.ERROR;
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}
}

View file

@ -131,7 +131,7 @@ public class NotebookRestApi {
@GET
@Path("/")
public Response getNotebookList() throws IOException {
List<Map<String, String>> notesInfo = notebookServer.generateNotebooksInfo();
List<Map<String, String>> notesInfo = notebookServer.generateNotebooksInfo(false);
return new JsonResponse<>(Status.OK, "", notesInfo ).build();
}

View file

@ -86,6 +86,7 @@ public class Message {
// @param completions list of string
LIST_NOTES, // [c-s] ask list of note
RELOAD_NOTES_FROM_REPO, // [c-s] reload notes from repo
NOTES_INFO, // [s-c] list of note infos
// @param notes serialized List<NoteInfo> object

View file

@ -101,6 +101,9 @@ public class NotebookServer extends WebSocketServlet implements
case LIST_NOTES:
broadcastNoteList();
break;
case RELOAD_NOTES_FROM_REPO:
broadcastReloadedNoteList();
break;
case GET_HOME_NOTE:
sendHomeNote(conn, notebook);
break;
@ -291,7 +294,7 @@ public class NotebookServer extends WebSocketServlet implements
}
}
public List<Map<String, String>> generateNotebooksInfo (){
public List<Map<String, String>> generateNotebooksInfo(boolean needsReload) {
Notebook notebook = notebook();
ZeppelinConfiguration conf = notebook.getConf();
@ -299,6 +302,14 @@ public class NotebookServer extends WebSocketServlet implements
boolean hideHomeScreenNotebookFromList = conf
.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE);
if (needsReload) {
try {
notebook.reloadAllNotes();
} catch (IOException e) {
LOG.error("Fail to reload notes from repository");
}
}
List<Note> notes = notebook.getAllNotes();
List<Map<String, String>> notesInfo = new LinkedList<>();
for (Note note : notes) {
@ -321,8 +332,12 @@ public class NotebookServer extends WebSocketServlet implements
}
public void broadcastNoteList() {
List<Map<String, String>> notesInfo = generateNotebooksInfo(false);
broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
}
List<Map<String, String>> notesInfo = generateNotebooksInfo();
public void broadcastReloadedNoteList() {
List<Map<String, String>> notesInfo = generateNotebooksInfo(true);
broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
}
@ -401,7 +416,8 @@ public class NotebookServer extends WebSocketServlet implements
return cronUpdated;
}
private void createNote(WebSocket conn, Notebook notebook, Message message) throws IOException {
private void createNote(NotebookSocket conn, Notebook notebook, Message message)
throws IOException {
Note note = notebook.createNote();
note.addParagraph(); // it's an empty note. so add one paragraph
if (message != null) {
@ -414,7 +430,7 @@ public class NotebookServer extends WebSocketServlet implements
note.persist();
addConnectionToNote(note.id(), (NotebookSocket) conn);
broadcastNote(note);
conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note)));
broadcastNoteList();
}
@ -458,7 +474,7 @@ public class NotebookServer extends WebSocketServlet implements
String name = (String) fromMessage.get("name");
Note newNote = notebook.cloneNote(noteId, name);
addConnectionToNote(newNote.id(), (NotebookSocket) conn);
broadcastNote(newNote);
conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote)));
broadcastNoteList();
}

View file

@ -165,11 +165,14 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
}
assertEquals("<p>markdown</p>\n", p.getResult().message());
// restart interpreter
for (InterpreterSetting setting : note.getNoteReplLoader().getInterpreterSettings()) {
if (setting.getName().equals("md")) {
// restart
ZeppelinServer.notebook.getInterpreterFactory().restart(setting.id());
// Call Restart Interpreter REST API
PutMethod put = httpPut("/interpreter/setting/restart/" + setting.id(), "");
assertThat("test interpreter restart:", put, isAllowed());
put.releaseConnection();
break;
}
}

View file

@ -17,7 +17,7 @@
"ace-builds": "1.1.9",
"angular-ui-ace": "0.1.1",
"jquery.scrollTo": "~1.4.13",
"nvd3": "~1.1.15-beta",
"nvd3": "~1.7.1",
"angular-dragdrop": "~1.0.8",
"perfect-scrollbar": "~0.5.4",
"ng-sortable": "~1.1.9",
@ -29,7 +29,8 @@
"angular-filter": "~0.5.4",
"ngtoast": "~1.5.5",
"ng-focus-if": "~1.0.2",
"bootstrap3-dialog": "bootstrap-dialog#~1.34.7"
"bootstrap3-dialog": "bootstrap-dialog#~1.34.7",
"floatThead": "~1.3.2"
},
"devDependencies": {
"angular-mocks": "1.3.8"

View file

@ -22,6 +22,8 @@ angular.module('zeppelinWebApp').controller('HomeCtrl', function($scope, noteboo
vm.notebookHome = false;
vm.staticHome = false;
$scope.isReloading = false;
var initHome = function() {
websocketMsgSrv.getHomeNotebook();
@ -46,4 +48,13 @@ angular.module('zeppelinWebApp').controller('HomeCtrl', function($scope, noteboo
vm.notebookHome = false;
}
});
$scope.$on('setNoteMenu', function(event, notes) {
$scope.isReloadingNotes = false;
});
$scope.reloadNotebookList = function() {
websocketMsgSrv.reloadAllNotesFromRepo();
$scope.isReloadingNotes = true;
};
});

View file

@ -26,7 +26,12 @@ limitations under the License.
<div class="row">
<div class="col-md-4">
<h4>Notebook</h4>
<h4>Notebook
<i ng-class="isReloadingNotes ? 'fa fa-refresh fa-spin' : 'fa fa-refresh'"
ng-style="!isReloadingNotes && {'cursor': 'pointer'}" style="font-size: 13px;"
ng-click="reloadNotebookList();">
</i>
</h4>
<div>
<h5><a href="" data-toggle="modal" data-target="#noteImportModal" style="text-decoration: none;">

View file

@ -100,6 +100,12 @@ limitations under the License.
{{note.info.cron}}
</p>
</div>
<div>
<span>- auto-restart interpreter on cron execution </span>
<input type="checkbox"
ng-model="note.config.releaseresource"
ng-click="setReleaseResource(note.config.releaseresource)">
</div>
</div>
</li>
</ul>

View file

@ -107,7 +107,6 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl',
// register mouseevent handler for focus paragraph
document.addEventListener('click', $scope.focusParagraphOnClick);
$scope.keyboardShortcut = function(keyEvent) {
// handle keyevent
if (!$scope.viewOnly) {
@ -282,6 +281,12 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl',
$scope.setConfig();
};
/** Set release resource for this note **/
$scope.setReleaseResource = function(value) {
$scope.note.config.releaseresource = value;
$scope.setConfig();
};
/** Update note config **/
$scope.setConfig = function(config) {
if(config) {

View file

@ -38,7 +38,7 @@ limitations under the License.
data-toggle="dropdown"
type="button">
</span>
<ul class="dropdown-menu" role="menu" style="width:200px;">
<ul class="dropdown-menu" role="menu" style="width:200px;z-index:1002">
<li>
<a ng-click="$event.stopPropagation()" class="dropdown"><span class="fa fa-arrows-h"></span> Width
<form style="display:inline; margin-left:5px;">

View file

@ -841,9 +841,8 @@ angular.module('zeppelinWebApp')
if ($scope.paragraph.id === paragraphId) {
// focus editor
if (!$scope.paragraph.config.editorHide) {
$scope.editor.focus();
if (!mouseEvent) {
$scope.editor.focus();
// move cursor to the first row (or the last row)
var row;
if (cursorPos >= 0) {
@ -1021,8 +1020,7 @@ angular.module('zeppelinWebApp')
var renderTable = function() {
var html = '';
html += '<table class="table table-hover table-condensed" style="top: 0; position: absolute;">';
html += '<table class="table table-hover table-condensed">';
html += ' <thead>';
html += ' <tr style="background-color: #F6F6F6; font-weight: bold;">';
for (var titleIndex in $scope.paragraph.result.columnNames) {
@ -1030,10 +1028,7 @@ angular.module('zeppelinWebApp')
}
html += ' </tr>';
html += ' </thead>';
html += '</table>';
html += '<table class="table table-hover table-condensed" style="margin-top: 31px;">';
html += ' <tbody>';
for (var r in $scope.paragraph.result.msgTable) {
var row = $scope.paragraph.result.msgTable[r];
html += ' <tr>';
@ -1048,19 +1043,38 @@ angular.module('zeppelinWebApp')
}
html += ' </tr>';
}
html += ' </tbody>';
html += '</table>';
angular.element('#p' + $scope.paragraph.id + '_table').html(html);
if ($scope.paragraph.result.msgTable.length > 10000) {
angular.element('#p' + $scope.paragraph.id + '_table').css('overflow', 'scroll');
// set table height
var height = $scope.paragraph.config.graph.height;
angular.element('#p' + $scope.paragraph.id + '_table').css('height', height);
} else {
var dataTable = angular.element('#p' + $scope.paragraph.id + '_table .table');
dataTable.floatThead({
scrollContainer: function (dataTable) {
return angular.element('#p' + $scope.paragraph.id + '_table');
}
});
angular.element('#p' + $scope.paragraph.id + '_table .table').on('remove', function () {
angular.element('#p' + $scope.paragraph.id + '_table .table').floatThead('destroy');
});
angular.element('#p' + $scope.paragraph.id + '_table').css('position', 'relative');
angular.element('#p' + $scope.paragraph.id + '_table').css('height', '100%');
angular.element('#p' + $scope.paragraph.id + '_table').perfectScrollbar('destroy');
angular.element('#p' + $scope.paragraph.id + '_table').perfectScrollbar();
angular.element('.ps-scrollbar-y-rail').css('z-index', '1002');
// set table height
var psHeight = $scope.paragraph.config.graph.height;
angular.element('#p' + $scope.paragraph.id + '_table').css('height', psHeight);
angular.element('#p' + $scope.paragraph.id + '_table').perfectScrollbar('update');
}
// set table height
var height = $scope.paragraph.config.graph.height;
angular.element('#p' + $scope.paragraph.id + '_table').height(height);
};
var retryRenderer = function() {
@ -1124,7 +1138,7 @@ angular.module('zeppelinWebApp')
$scope.chart[type].yAxis.tickFormat(function(d) {return xAxisTickFormat(d, yLabels);});
// configure how the tooltip looks.
$scope.chart[type].tooltipContent(function(key, x, y, data) {
$scope.chart[type].tooltipContent(function(key, x, y, graph, data) {
var tooltipContent = '<h3>' + key + '</h3>';
if ($scope.paragraph.config.graph.scatter.size &&
$scope.isValidSizeOption($scope.paragraph.config.graph.scatter, $scope.paragraph.result.rows)) {
@ -1135,9 +1149,8 @@ angular.module('zeppelinWebApp')
});
$scope.chart[type].showDistX(true)
.showDistY(true)
.showDistY(true);
//handle the problem of tooltip not showing when muliple points have same value.
.scatter.useVoronoi(false);
} else {
var p = pivot(data);
if (type === 'pieChart') {

View file

@ -27,3 +27,8 @@ body {
.nv-controlsWrap {
display: block;
}
.paragraph-col .focused {
box-shadow: 0px 2px 7px rgba(0, 0, 0, 0.3);
border-color: white;
}

View file

@ -34,14 +34,6 @@ angular.module('zeppelinWebApp').controller('NotenameCtrl', function($scope, $ro
vm.createNote();
};
$scope.$on('setNoteContent', function(event, note) {
//a hack, to make it run only after notebook creation
//it should not run i.e in case of linking to the paragraph
if (note && $location.path().indexOf(note.id) < 0) {
$location.path('notebook/' + note.id);
}
});
vm.preVisible = function(clone) {
var generatedName = vm.generateName();
$scope.note.notename = 'Note ' + generatedName;

View file

@ -13,7 +13,7 @@
*/
'use strict';
angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, $websocket, baseUrlSrv) {
angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, $websocket, $location, baseUrlSrv) {
var websocketCalls = {};
websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketUrl());
@ -46,6 +46,8 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope,
var data = payload.data;
if (op === 'NOTE') {
$rootScope.$broadcast('setNoteContent', data.note);
} else if (op === 'NEW_NOTE') {
$location.path('notebook/' + data.note.id);
} else if (op === 'NOTES_INFO') {
$rootScope.$broadcast('setNoteMenu', data.notes);
} else if (op === 'PARAGRAPH') {

View file

@ -32,10 +32,15 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope,
cloneNotebook: function(noteIdToClone, newNoteName ) {
websocketEvents.sendNewEvent({op: 'CLONE_NOTE', data: {id: noteIdToClone, name: newNoteName}});
},
getNotebookList: function() {
websocketEvents.sendNewEvent({op: 'LIST_NOTES'});
},
reloadAllNotesFromRepo: function() {
websocketEvents.sendNewEvent({op: 'RELOAD_NOTES_FROM_REPO'});
},
getNotebook: function(noteId) {
websocketEvents.sendNewEvent({op: 'GET_NOTE', data: {id: noteId}});
},

View file

@ -37,7 +37,7 @@ limitations under the License.
<!-- build:css(.) styles/vendor.css -->
<!-- bower:css -->
<link rel="stylesheet" href="bower_components/bootstrap/dist/css/bootstrap.css" />
<link rel="stylesheet" href="bower_components/nvd3/src/nv.d3.css" />
<link rel="stylesheet" href="bower_components/nvd3/build/nv.d3.css" />
<link rel="stylesheet" href="bower_components/perfect-scrollbar/src/perfect-scrollbar.css" />
<link rel="stylesheet" href="bower_components/ng-sortable/dist/ng-sortable.css" />
<link rel="stylesheet" href="bower_components/angular-xeditable/dist/css/xeditable.css" />
@ -110,7 +110,7 @@ limitations under the License.
<script src="bower_components/angular-ui-ace/ui-ace.js"></script>
<script src="bower_components/jquery.scrollTo/jquery.scrollTo.js"></script>
<script src="bower_components/d3/d3.js"></script>
<script src="bower_components/nvd3/nv.d3.js"></script>
<script src="bower_components/nvd3/build/nv.d3.js"></script>
<script src="bower_components/jquery-ui/jquery-ui.js"></script>
<script src="bower_components/angular-dragdrop/src/angular-dragdrop.js"></script>
<script src="bower_components/perfect-scrollbar/src/perfect-scrollbar.js"></script>
@ -124,6 +124,8 @@ limitations under the License.
<script src="bower_components/ngtoast/dist/ngToast.js"></script>
<script src="bower_components/ng-focus-if/focusIf.js"></script>
<script src="bower_components/bootstrap3-dialog/dist/js/bootstrap-dialog.min.js"></script>
<script src="bower_components/floatThead/dist/jquery.floatThead.js"></script>
<script src="bower_components/floatThead/dist/jquery.floatThead.min.js"></script>
<!-- endbower -->
<!-- endbuild -->
<!-- build:js({.tmp,src}) scripts/scripts.js -->

View file

@ -43,7 +43,7 @@ module.exports = function(config) {
'bower_components/angular-ui-ace/ui-ace.js',
'bower_components/jquery.scrollTo/jquery.scrollTo.js',
'bower_components/d3/d3.js',
'bower_components/nvd3/nv.d3.js',
'bower_components/nvd3/build/nv.d3.js',
'bower_components/jquery-ui/jquery-ui.js',
'bower_components/angular-dragdrop/src/angular-dragdrop.js',
'bower_components/perfect-scrollbar/src/perfect-scrollbar.js',
@ -57,6 +57,8 @@ module.exports = function(config) {
'bower_components/ngtoast/dist/ngToast.js',
'bower_components/ng-focus-if/focusIf.js',
'bower_components/bootstrap3-dialog/dist/js/bootstrap-dialog.min.js',
'bower_components/floatThead/dist/jquery.floatThead.js',
'bower_components/floatThead/dist/jquery.floatThead.min.js',
'bower_components/angular-mocks/angular-mocks.js',
// endbower
'src/app/app.js',

View file

@ -411,7 +411,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.geode.GeodeOqlInterpreter,"
+ "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
+ "org.apache.zeppelin.kylin.KylinInterpreter,"
+ "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter"),
+ "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,"
+ "org.apache.zeppelin.scalding.ScaldingInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
@ -423,10 +424,6 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_NOTEBOOK_S3_BUCKET("zeppelin.notebook.s3.bucket", "zeppelin"),
ZEPPELIN_NOTEBOOK_S3_USER("zeppelin.notebook.s3.user", "user"),
ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()),
// Notebook list and contents will be always loaded from repository if set true.
// If set false, modified notebooks or new notebooks added on file system level
// won't be reflected on Zeppelin till user restarts Zeppelin.
ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE("zeppelin.notebook.reloadAllNotesFromStorage", false),
ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", "bin/interpreter.sh"),
// Decide when new note is created, interpreter settings will be binded automatically or not.
ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true),

View file

@ -519,6 +519,9 @@ public class InterpreterFactory {
synchronized (interpreterSettings) {
InterpreterSetting intpsetting = interpreterSettings.get(id);
if (intpsetting != null) {
stopJobAllInterpreter(intpsetting);
intpsetting.getInterpreterGroup().close();
intpsetting.getInterpreterGroup().destroy();
@ -541,20 +544,7 @@ public class InterpreterFactory {
InterpreterSetting intpsetting = interpreterSettings.get(id);
if (intpsetting != null) {
for (Interpreter intp : intpsetting.getInterpreterGroup()) {
for (Job job : intp.getScheduler().getJobsRunning()) {
job.abort();
job.setStatus(Status.ABORT);
logger.info("Job " + job.getJobName() + " aborted ");
}
for (Job job : intp.getScheduler().getJobsWaiting()) {
job.abort();
job.setStatus(Status.ABORT);
logger.info("Job " + job.getJobName() + " aborted ");
}
}
stopJobAllInterpreter(intpsetting);
intpsetting.getInterpreterGroup().close();
intpsetting.getInterpreterGroup().destroy();
@ -570,6 +560,22 @@ public class InterpreterFactory {
}
}
private void stopJobAllInterpreter(InterpreterSetting intpsetting) {
if (intpsetting != null) {
for (Interpreter intp : intpsetting.getInterpreterGroup()) {
for (Job job : intp.getScheduler().getJobsRunning()) {
job.abort();
job.setStatus(Status.ABORT);
logger.info("Job " + job.getJobName() + " aborted ");
}
for (Job job : intp.getScheduler().getJobsWaiting()) {
job.abort();
job.setStatus(Status.ABORT);
logger.info("Job " + job.getJobName() + " aborted ");
}
}
}
}
public void close() {
List<Thread> closeThreads = new LinkedList<Thread>();

View file

@ -38,6 +38,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.quartz.CronScheduleBuilder;
@ -330,10 +331,18 @@ public class Notebook {
* @return
* @throws IOException
*/
private void reloadAllNotes() throws IOException {
public void reloadAllNotes() throws IOException {
synchronized (notes) {
notes.clear();
}
if (notebookRepo instanceof NotebookRepoSync) {
NotebookRepoSync mainRepo = (NotebookRepoSync) notebookRepo;
if (mainRepo.getRepoCount() > 1) {
mainRepo.sync();
}
}
List<NoteInfo> noteInfos = notebookRepo.list();
for (NoteInfo info : noteInfos) {
loadNoteFromRepo(info.getId());
@ -366,13 +375,6 @@ public class Notebook {
}
public List<Note> getAllNotes() {
if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE)) {
try {
reloadAllNotes();
} catch (IOException e) {
logger.error("Cannot reload notes from storage", e);
}
}
synchronized (notes) {
List<Note> noteList = new ArrayList<Note>(notes.values());
Collections.sort(noteList, new Comparator<Note>() {
@ -413,6 +415,26 @@ public class Notebook {
String noteId = context.getJobDetail().getJobDataMap().getString("noteId");
Note note = notebook.getNote(noteId);
note.runAll();
while (!note.getLastParagraph().isTerminated()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
boolean releaseResource = false;
try {
releaseResource = (boolean) note.getConfig().get("releaseresource");
} catch (java.lang.ClassCastException e) {
e.printStackTrace();
}
if (releaseResource) {
for (InterpreterSetting setting : note.getNoteReplLoader().getInterpreterSettings()) {
notebook.getInterpreterFactory().restart(setting.id());
}
}
}
}

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.notebook.repo;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@ -41,7 +42,9 @@ public class NotebookRepoSync implements NotebookRepo {
private static final int maxRepoNum = 2;
private static final String pushKey = "pushNoteIDs";
private static final String pullKey = "pullNoteIDs";
private static ZeppelinConfiguration config;
private static final String defaultStorage = "org.apache.zeppelin.notebook.repo.VFSNotebookRepo";
private List<NotebookRepo> repos = new ArrayList<NotebookRepo>();
@ -50,28 +53,60 @@ public class NotebookRepoSync implements NotebookRepo {
* @param (conf)
* @throws - Exception
*/
public NotebookRepoSync(ZeppelinConfiguration conf) throws Exception {
@SuppressWarnings("static-access")
public NotebookRepoSync(ZeppelinConfiguration conf) {
config = conf;
String allStorageClassNames = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE).trim();
if (allStorageClassNames.isEmpty()) {
throw new IOException("Empty ZEPPELIN_NOTEBOOK_STORAGE conf parameter");
allStorageClassNames = defaultStorage;
LOG.warn("Empty ZEPPELIN_NOTEBOOK_STORAGE conf parameter, using default {}", defaultStorage);
}
String[] storageClassNames = allStorageClassNames.split(",");
if (storageClassNames.length > getMaxRepoNum()) {
throw new IOException("Unsupported number of storage classes (" +
storageClassNames.length + ") in ZEPPELIN_NOTEBOOK_STORAGE");
LOG.warn("Unsupported number {} of storage classes in ZEPPELIN_NOTEBOOK_STORAGE : {}\n" +
"first {} will be used", storageClassNames.length, allStorageClassNames, getMaxRepoNum());
}
for (int i = 0; i < storageClassNames.length; i++) {
for (int i = 0; i < Math.min(storageClassNames.length, getMaxRepoNum()); i++) {
@SuppressWarnings("static-access")
Class<?> notebookStorageClass = getClass().forName(storageClassNames[i].trim());
Class<?> notebookStorageClass;
try {
notebookStorageClass = getClass().forName(storageClassNames[i].trim());
Constructor<?> constructor = notebookStorageClass.getConstructor(
ZeppelinConfiguration.class);
repos.add((NotebookRepo) constructor.newInstance(conf));
} catch (ClassNotFoundException | NoSuchMethodException | SecurityException |
InstantiationException | IllegalAccessException | IllegalArgumentException |
InvocationTargetException e) {
LOG.warn("Failed to initialize {} notebook storage class {}", storageClassNames[i], e);
}
}
// couldn't initialize any storage, use default
if (getRepoCount() == 0) {
LOG.info("No storages could be initialized, using default {} storage", defaultStorage);
initializeDefaultStorage(conf);
}
if (getRepoCount() > 1) {
try {
sync(0, 1);
} catch (IOException e) {
LOG.warn("Failed to sync with secondary storage on start {}", e);
}
}
}
@SuppressWarnings("static-access")
private void initializeDefaultStorage(ZeppelinConfiguration conf) {
Class<?> notebookStorageClass;
try {
notebookStorageClass = getClass().forName(defaultStorage);
Constructor<?> constructor = notebookStorageClass.getConstructor(
ZeppelinConfiguration.class);
repos.add((NotebookRepo) constructor.newInstance(conf));
}
if (getRepoCount() > 1) {
sync(0, 1);
} catch (ClassNotFoundException | NoSuchMethodException | SecurityException |
InstantiationException | IllegalAccessException | IllegalArgumentException |
InvocationTargetException e) {
LOG.warn("Failed to initialize {} notebook storage class {}", defaultStorage, e);
}
}
@ -80,9 +115,6 @@ public class NotebookRepoSync implements NotebookRepo {
*/
@Override
public List<NoteInfo> list() throws IOException {
if (config.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE) && getRepoCount() > 1) {
sync(0, 1);
}
return getRepo(0).list();
}
@ -182,7 +214,7 @@ public class NotebookRepoSync implements NotebookRepo {
}
}
int getRepoCount() {
public int getRepoCount() {
return repos.size();
}
@ -190,7 +222,7 @@ public class NotebookRepoSync implements NotebookRepo {
return maxRepoNum;
}
private NotebookRepo getRepo(int repoIndex) throws IOException {
NotebookRepo getRepo(int repoIndex) throws IOException {
if (repoIndex < 0 || repoIndex >= getRepoCount()) {
throw new IOException("Storage repo index is out of range");
}

View file

@ -38,6 +38,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
@ -47,7 +48,6 @@ import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.search.LuceneSearch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -122,42 +122,39 @@ public class NotebookTest implements JobListenerFactory{
}
@Test
public void testGetAllNotes() throws IOException {
// get all notes after copy the {notebookId}/note.json into notebookDir
public void testReloadAllNotes() throws IOException {
File srcDir = new File("src/test/resources/2A94M5J1Z");
File destDir = new File(notebookDir.getAbsolutePath() + "/2A94M5J1Z");
// copy the notebook
try {
FileUtils.copyDirectory(srcDir, destDir);
} catch (IOException e) {
e.printStackTrace();
}
Note copiedNote = notebookRepo.get("2A94M5J1Z");
// when ZEPPELIN_NOTEBOOK_GET_FROM_REPO set to be false
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE.getVarName(), "false");
// doesn't have copied notebook in memory before reloading
List<Note> notes = notebook.getAllNotes();
assertEquals(notes.size(), 0);
// when ZEPPELIN_NOTEBOOK_GET_FROM_REPO set to be true
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE.getVarName(), "true");
// load copied notebook on memory when reloadAllNotes() is called
Note copiedNote = notebookRepo.get("2A94M5J1Z");
notebook.reloadAllNotes();
notes = notebook.getAllNotes();
assertEquals(notes.size(), 1);
assertEquals(notes.get(0).id(), copiedNote.id());
assertEquals(notes.get(0).getName(), copiedNote.getName());
assertEquals(notes.get(0).getParagraphs(), copiedNote.getParagraphs());
// get all notes after remove the {notebookId}/note.json from notebookDir
// when ZEPPELIN_NOTEBOOK_GET_FROM_REPO set to be false
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE.getVarName(), "false");
// delete the notebook
FileUtils.deleteDirectory(destDir);
// keep notebook in memory before reloading
notes = notebook.getAllNotes();
assertEquals(notes.size(), 1);
// when ZEPPELIN_NOTEBOOK_GET_FROM_REPO set to be true
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE.getVarName(), "true");
// delete notebook from notebook list when reloadAllNotes() is called
notebook.reloadAllNotes();
notes = notebook.getAllNotes();
assertEquals(notes.size(), 0);
}
@ -249,6 +246,47 @@ public class NotebookTest implements JobListenerFactory{
assertEquals(dateFinished, p.getDateFinished());
}
@Test
public void testAutoRestartInterpreterAfterSchedule() throws InterruptedException, IOException{
// create a note and a paragraph
Note note = notebook.createNote();
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
Paragraph p = note.addParagraph();
Map config = new HashMap<String, Object>();
p.setConfig(config);
p.setText("p1");
// set cron scheduler, once a second
config = note.getConfig();
config.put("enabled", true);
config.put("cron", "* * * * * ?");
config.put("releaseresource", "true");
note.setConfig(config);
notebook.refreshCron(note.id());
while (p.getStatus() != Status.FINISHED) {
Thread.sleep(100);
}
Date dateFinished = p.getDateFinished();
assertNotNull(dateFinished);
// restart interpreter
for (InterpreterSetting setting : note.getNoteReplLoader().getInterpreterSettings()) {
notebook.getInterpreterFactory().restart(setting.id());
}
Thread.sleep(1000);
while (p.getStatus() != Status.FINISHED) {
Thread.sleep(100);
}
assertNotEquals(dateFinished, p.getDateFinished());
// remove cron scheduler.
config.put("cron", null);
note.setConfig(config);
notebook.refreshCron(note.id());
}
@Test
public void testCloneNote() throws IOException, CloneNotSupportedException,
InterruptedException {

View file

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook.repo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.notebook.repo.mock.VFSNotebookRepoMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NotebookRepoSyncInitializationTest {
private static final Logger LOG = LoggerFactory.getLogger(NotebookRepoSyncInitializationTest.class);
private String validFirstStorageClass = "org.apache.zeppelin.notebook.repo.VFSNotebookRepo";
private String validSecondStorageClass = "org.apache.zeppelin.notebook.repo.mock.VFSNotebookRepoMock";
private String invalidStorageClass = "org.apache.zeppelin.notebook.repo.DummyNotebookRepo";
private String validOneStorageConf = validFirstStorageClass;
private String validTwoStorageConf = validFirstStorageClass + "," + validSecondStorageClass;
private String invalidTwoStorageConf = validFirstStorageClass + "," + invalidStorageClass;
private String unsupportedStorageConf = validFirstStorageClass + "," + validSecondStorageClass + "," + validSecondStorageClass;
private String emptyStorageConf = "";
@Before
public void setUp(){
//setup routine
}
@After
public void tearDown() {
//tear-down routine
}
@Test
public void validInitOneStorageTest() throws IOException {
// no need to initialize folder due to one storage
// set confs
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), validOneStorageConf);
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
// create repo
NotebookRepoSync notebookRepoSync = new NotebookRepoSync(conf);
// check proper initialization of one storage
assertEquals(notebookRepoSync.getRepoCount(), 1);
assertTrue(notebookRepoSync.getRepo(0) instanceof VFSNotebookRepo);
}
@Test
public void validInitTwoStorageTest() throws IOException {
// initialize folders for each storage
String zpath = System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis();
File mainZepDir = new File(zpath);
mainZepDir.mkdirs();
new File(mainZepDir, "conf").mkdirs();
String mainNotePath = zpath+"/notebook";
String secNotePath = mainNotePath + "_secondary";
File mainNotebookDir = new File(mainNotePath);
File secNotebookDir = new File(secNotePath);
mainNotebookDir.mkdirs();
secNotebookDir.mkdirs();
// set confs
System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), mainZepDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), validTwoStorageConf);
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
// create repo
NotebookRepoSync notebookRepoSync = new NotebookRepoSync(conf);
// check that both initialized
assertEquals(notebookRepoSync.getRepoCount(), 2);
assertTrue(notebookRepoSync.getRepo(0) instanceof VFSNotebookRepo);
assertTrue(notebookRepoSync.getRepo(1) instanceof VFSNotebookRepoMock);
}
@Test
public void invalidInitTwoStorageTest() throws IOException {
// set confs
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), invalidTwoStorageConf);
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
// create repo
NotebookRepoSync notebookRepoSync = new NotebookRepoSync(conf);
// check that second didn't initialize
LOG.info(" " + notebookRepoSync.getRepoCount());
assertEquals(notebookRepoSync.getRepoCount(), 1);
assertTrue(notebookRepoSync.getRepo(0) instanceof VFSNotebookRepo);
}
@Test
public void initUnsupportedNumberStoragesTest() throws IOException {
// initialize folders for each storage, currently for 2 only
String zpath = System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis();
File mainZepDir = new File(zpath);
mainZepDir.mkdirs();
new File(mainZepDir, "conf").mkdirs();
String mainNotePath = zpath+"/notebook";
String secNotePath = mainNotePath + "_secondary";
File mainNotebookDir = new File(mainNotePath);
File secNotebookDir = new File(secNotePath);
mainNotebookDir.mkdirs();
secNotebookDir.mkdirs();
// set confs
System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), mainZepDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), unsupportedStorageConf);
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
// create repo
NotebookRepoSync notebookRepoSync = new NotebookRepoSync(conf);
// check that first two storages initialized instead of three
assertEquals(notebookRepoSync.getRepoCount(), 2);
assertTrue(notebookRepoSync.getRepo(0) instanceof VFSNotebookRepo);
assertTrue(notebookRepoSync.getRepo(1) instanceof VFSNotebookRepoMock);
}
@Test
public void initEmptyStorageTest() throws IOException {
// set confs
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), emptyStorageConf);
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
// create repo
NotebookRepoSync notebookRepoSync = new NotebookRepoSync(conf);
// check initialization of one default storage
assertEquals(notebookRepoSync.getRepoCount(), 1);
assertTrue(notebookRepoSync.getRepo(0) instanceof VFSNotebookRepo);
}
@Test
public void initOneDummyStorageTest() throws IOException {
// set confs
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), invalidStorageClass);
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
// create repo
NotebookRepoSync notebookRepoSync = new NotebookRepoSync(conf);
// check initialization of one default storage instead of invalid one
assertEquals(notebookRepoSync.getRepoCount(), 1);
assertTrue(notebookRepoSync.getRepo(0) instanceof VFSNotebookRepo);
}
}

View file

@ -184,41 +184,32 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
assertEquals(p1.getId(), notebookRepoSync.get(1,
notebookRepoSync.list(1).get(0).getId()).getLastParagraph().getId());
}
@Test
public void testSyncOnList() throws IOException {
/* check that both storage repos are empty */
assertTrue(notebookRepoSync.getRepoCount() > 1);
assertEquals(0, notebookRepoSync.list(0).size());
assertEquals(0, notebookRepoSync.list(1).size());
File srcDir = new File("src/test/resources/2A94M5J1Z");
File destDir = new File(secNotebookDir + "/2A94M5J1Z");
/* copy manually new notebook into secondary storage repo and check repos */
public void testSyncOnReloadedList() throws IOException {
/* check that both storage repos are empty */
assertTrue(notebookRepoSync.getRepoCount() > 1);
assertEquals(0, notebookRepoSync.list(0).size());
assertEquals(0, notebookRepoSync.list(1).size());
File srcDir = new File("src/test/resources/2A94M5J1Z");
File destDir = new File(secNotebookDir + "/2A94M5J1Z");
/* copy manually new notebook into secondary storage repo and check repos */
try {
FileUtils.copyDirectory(srcDir, destDir);
} catch (IOException e) {
e.printStackTrace();
}
FileUtils.copyDirectory(srcDir, destDir);
} catch (IOException e) {
e.printStackTrace();
}
assertEquals(0, notebookRepoSync.list(0).size());
assertEquals(1, notebookRepoSync.list(1).size());
/* Although new notebook is added to secondary storage it's not displayed
* on list() with ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE set to false
*/
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE.getVarName(), "false");
assertEquals(0, notebookRepoSync.list().size());
/* notebook is synced after ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE variable is set to true */
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE.getVarName(), "true");
assertEquals(1, notebookRepoSync.list().size());
// After reloading notebooks repos should be synchronized
notebookSync.reloadAllNotes();
assertEquals(1, notebookRepoSync.list(0).size());
assertEquals(1, notebookRepoSync.list(1).size());
assertEquals(1, notebookRepoSync.list(1).size());
}
static void delete(File file){
if(file.isFile()) file.delete();
else if(file.isDirectory()){
@ -230,8 +221,8 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
}
file.delete();
}
}
@Override
public JobListener getParagraphJobListener(Note note) {
return new JobListener(){