Merge remote-tracking branch 'origin/master' into ZEPPELIN-1320-2

This commit is contained in:
Prabhjyot Singh 2016-11-10 09:23:54 +05:30
commit 02c308423a
219 changed files with 7352 additions and 1959 deletions

5
.gitignore vendored
View file

@ -1,4 +1,5 @@
*.class
*.pyc
# Package Files #
*.jar
@ -6,7 +7,8 @@
*.ear
# interpreter
/interpreter/
/interpreter/*
!/interpreter/lib
# interpreter temp files
spark/derby.log
@ -21,6 +23,7 @@ lens/lens-cli-hist.log
conf/zeppelin-env.sh
conf/zeppelin-env.cmd
conf/zeppelin-site.xml
conf/shiro.ini
conf/keystore
conf/truststore
conf/interpreter.json

View file

@ -36,7 +36,7 @@ matrix:
include:
# Test License compliance using RAT tool
- jdk: "oraclejdk7"
env: SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.3" PROFILE="-Prat" BUILD_FLAG="clean" TEST_FLAG="org.apache.rat:apache-rat-plugin:check" TEST_PROJECTS=""
env: SCALA_VER="2.11" PROFILE="-Prat" BUILD_FLAG="clean" TEST_FLAG="org.apache.rat:apache-rat-plugin:check" TEST_PROJECTS=""
# Test all modules with spark 2.0.0 and scala 2.11
- jdk: "oraclejdk7"
@ -58,18 +58,6 @@ matrix:
- jdk: "oraclejdk7"
env: SCALA_VER="2.10" SPARK_VER="1.4.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.4 -Pr -Phadoop-2.3 -Ppyspark -Psparkr" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark,r -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
# Test spark module for 1.3.1
- jdk: "oraclejdk7"
env: SCALA_VER="2.10" SPARK_VER="1.3.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.3 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
# Test spark module for 1.2.2
- jdk: "oraclejdk7"
env: SCALA_VER="2.10" SPARK_VER="1.2.2" HADOOP_VER="2.3" PROFILE="-Pspark-1.2 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
# Test spark module for 1.1.1
- jdk: "oraclejdk7"
env: SCALA_VER="2.10" SPARK_VER="1.1.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.1 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
# Test selenium with spark module for 1.6.1
- jdk: "oraclejdk7"
env: TEST_SELENIUM="true" SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Phadoop-2.3 -Ppyspark -Pexamples" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.AbstractFunctionalSuite -DfailIfNoTests=false"

View file

@ -2,7 +2,7 @@
**Documentation:** [User Guide](http://zeppelin.apache.org/docs/latest/index.html)<br/>
**Mailing Lists:** [User and Dev mailing list](http://zeppelin.apache.org/community.html)<br/>
**Continuous Integration:** [![Build Status](https://secure.travis-ci.org/apache/zeppelin.png?branch=master)](https://travis-ci.org/apache/zeppelin) <br/>
**Continuous Integration:** [![Build Status](https://travis-ci.org/apache/zeppelin.svg?branch=master)](https://travis-ci.org/apache/zeppelin) <br/>
**Contributing:** [Contribution Guide](https://zeppelin.apache.org/contribution/contributions.html)<br/>
**Issue Tracker:** [Jira](https://issues.apache.org/jira/browse/ZEPPELIN)<br/>
**License:** [Apache 2.0](https://github.com/apache/zeppelin/blob/master/LICENSE)
@ -128,9 +128,6 @@ Available profiles are
-Pspark-1.6
-Pspark-1.5
-Pspark-1.4
-Pspark-1.3
-Pspark-1.2
-Pspark-1.1
-Pcassandra-spark-1.5
-Pcassandra-spark-1.4
-Pcassandra-spark-1.3
@ -192,7 +189,7 @@ enable 3rd party vendor repository (cloudera)
##### `-Pmapr[version]` (optional)
For the MapR Hadoop Distribution, these profiles will handle the Hadoop version. As MapR allows different versions of Spark to be installed, you should specify which version of Spark is installed on the cluster by adding a Spark profile (`-Pspark-1.2`, `-Pspark-1.3`, etc.) as needed.
For the MapR Hadoop Distribution, these profiles will handle the Hadoop version. As MapR allows different versions of Spark to be installed, you should specify which version of Spark is installed on the cluster by adding a Spark profile (`-Pspark-1.6`, `-Pspark-2.0`, etc.) as needed.
The correct Maven artifacts can be found for every version of MapR at http://doc.mapr.com
Available profiles are

View file

@ -135,7 +135,7 @@ public class AlluxioInterpreter extends Interpreter {
private String[] splitAndRemoveEmpty(String st, String splitSeparator) {
String[] voices = st.split(splitSeparator);
ArrayList<String> result = new ArrayList<String>();
ArrayList<String> result = new ArrayList<>();
for (String voice : voices) {
if (!voice.trim().isEmpty()) {
result.add(voice);
@ -145,7 +145,7 @@ public class AlluxioInterpreter extends Interpreter {
}
private String[] splitAndRemoveEmpty(String[] sts, String splitSeparator) {
ArrayList<String> result = new ArrayList<String>();
ArrayList<String> result = new ArrayList<>();
for (String st : sts) {
result.addAll(Arrays.asList(splitAndRemoveEmpty(st, splitSeparator)));
}

View file

@ -16,6 +16,9 @@
"defaultValue": "19998",
"description": "Alluxio master port"
}
},
"editor": {
"editOnDblClick": false
}
}
]
]

View file

@ -93,7 +93,7 @@ public class AlluxioInterpreterTest {
List expectedResultThree = Arrays.asList(
new InterpreterCompletion("copyFromLocal", "copyFromLocal"),
new InterpreterCompletion("copyToLocal", "copyToLocal"));
List expectedResultNone = new ArrayList<String>();
List expectedResultNone = new ArrayList<>();
List<InterpreterCompletion> resultOne = alluxioInterpreter.completion("c", 0);
List<InterpreterCompletion> resultTwo = alluxioInterpreter.completion("co", 0);

View file

@ -4,7 +4,9 @@
"name": "angular",
"className": "org.apache.zeppelin.angular.AngularInterpreter",
"properties": {
},
"editor": {
"editOnDblClick": true
}
}
]

View file

@ -5,7 +5,9 @@
"className": "org.apache.zeppelin.beam.BeamInterpreter",
"defaultInterpreter": true,
"properties": {
},
"editor": {
"editOnDblClick": false
}
}
]

View file

@ -24,7 +24,8 @@
}
},
"editor": {
"language": "sql"
"language": "sql",
"editOnDblClick": false
}
}
]

View file

@ -190,6 +190,9 @@
"defaultValue": "true",
"description": "Cassandra socket TCP no delay. Default = true"
}
},
"editor": {
"editOnDblClick": false
}
}
]

View file

@ -45,6 +45,10 @@ user3 = password4, role2
#ldapRealm.userDnTemplate = uid={0},ou=Users,dc=COMPANY,dc=COM
#ldapRealm.contextFactory.authenticationMechanism = SIMPLE
### A sample PAM configuration
#pamRealm=org.apache.zeppelin.realm.PamRealm
#pamRealm.service=sshd
### A sample for configuring ZeppelinHub Realm
#zeppelinHubRealm = org.apache.zeppelin.realm.ZeppelinHubRealm
## Url of ZeppelinHub
@ -78,5 +82,5 @@ admin = *
#/api/interpreter/** = authc, roles[admin]
#/api/configurations/** = authc, roles[admin]
#/api/credential/** = authc, roles[admin]
/** = anon
#/** = authc
#/** = anon
/** = authc

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 78 KiB

View file

@ -34,7 +34,7 @@ Interpreters in the same InterpreterGroup can reference each other. For example,
[InterpreterSetting](https://github.com/apache/zeppelin/blob/master/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java) is configuration of a given [InterpreterGroup](https://github.com/apache/zeppelin/blob/master/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java) and a unit of start/stop interpreter.
All Interpreters in the same InterpreterSetting are launched in a single, separate JVM process. The Interpreter communicates with Zeppelin engine via **[Thrift](https://github.com/apache/zeppelin/blob/master/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift)**.
In 'Separate Interpreter(scoped / isolated) for each note' mode which you can see at the **Interpreter Setting** menu when you create a new interpreter, new interpreter instance will be created per notebook. But it still runs on the same JVM while they're in the same InterpreterSettings.
In 'Separate Interpreter(scoped / isolated) for each note' mode which you can see at the **Interpreter Setting** menu when you create a new interpreter, new interpreter instance will be created per note. But it still runs on the same JVM while they're in the same InterpreterSettings.
## Make your own Interpreter
@ -48,7 +48,7 @@ There are three locations where you can store your interpreter group, name and o
{ZEPPELIN_INTERPRETER_DIR}/{YOUR_OWN_INTERPRETER_DIR}/interpreter-setting.json
```
Here is an example of `interpreter-setting.json` on your own interpreter. Note that if you don't specify editor object, your interpreter will use plain text mode for syntax highlighting.
Here is an example of `interpreter-setting.json` on your own interpreter.
```json
[
@ -71,7 +71,8 @@ Here is an example of `interpreter-setting.json` on your own interpreter. Note t
}, ...
},
"editor": {
"language": "your-syntax-highlight-language"
"language": "your-syntax-highlight-language",
"editOnDblClick": false
}
},
{
@ -98,15 +99,18 @@ The name of the interpreter is what you later write to identify a paragraph whic
some interpreter specific code...
```
## Programming Languages for Interpreter
If the interpreter uses a specific programming language (like Scala, Python, SQL), it is generally recommended to add a syntax highlighting supported for that to the notebook paragraph editor.
## Editor setting for Interpreter
You can add `editor` object to `interpreter-setting.json` file to specify paragraph editor settings.
To check out the list of languages supported, see the `mode-*.js` files under `zeppelin-web/bower_components/ace-builds/src-noconflict` or from [github.com/ajaxorg/ace-builds](https://github.com/ajaxorg/ace-builds/tree/master/src-noconflict).
### Language
If the interpreter uses a specific programming language (like Scala, Python, SQL), it is generally recommended to add a syntax highlighting supported for that to the note paragraph editor.
To check out the list of languages supported, see the `mode-*.js` files under `zeppelin-web/bower_components/ace-builds/src-noconflict` or from [github.com/ajaxorg/ace-builds](https://github.com/ajaxorg/ace-builds/tree/master/src-noconflict).
If you want to add a new set of syntax highlighting,
1. Add the `mode-*.js` file to <code>[zeppelin-web/bower.json](https://github.com/apache/zeppelin/blob/master/zeppelin-web/bower.json)</code> ( when built, <code>[zeppelin-web/src/index.html](https://github.com/apache/zeppelin/blob/master/zeppelin-web/src/index.html)</code> will be changed automatically. ).
2. Add `editor` object to `interpreter-setting.json` file. If you want to set your language to `java` for example, add:
1. Add the `mode-*.js` file to <code>[zeppelin-web/bower.json](https://github.com/apache/zeppelin/blob/master/zeppelin-web/bower.json)</code> (when built, <code>[zeppelin-web/src/index.html](https://github.com/apache/zeppelin/blob/master/zeppelin-web/src/index.html)</code> will be changed automatically).
2. Add `language` field to `editor` object. Note that if you don't specify language field, your interpreter will use plain text mode for syntax highlighting. Let's say you want to set your language to `java`, then add:
```
"editor": {
@ -114,6 +118,14 @@ If you want to add a new set of syntax highlighting,
}
```
### Edit on double click
If your interpreter uses mark-up language such as markdown or HTML, set `editOnDblClick` to `true` so that text editor opens on pargraph double click and closes on paragraph run. Otherwise set it to `false`.
```
"editor": {
"editOnDblClick": false
}
```
## Install your interpreter binary
Once you have built your interpreter, you can place it under the interpreter directory with all its dependencies.
@ -150,7 +162,7 @@ Now you are done and ready to use your interpreter.
## Use your interpreter
### 0.5.0
Inside of a notebook, `%[INTERPRETER_NAME]` directive will call your interpreter.
Inside of a note, `%[INTERPRETER_NAME]` directive will call your interpreter.
Note that the first interpreter configuration in zeppelin.interpreters will be the default one.
For example,
@ -163,7 +175,7 @@ println(a)
```
### 0.6.0 and later
Inside of a notebook, `%[INTERPRETER_GROUP].[INTERPRETER_NAME]` directive will call your interpreter.
Inside of a note, `%[INTERPRETER_GROUP].[INTERPRETER_NAME]` directive will call your interpreter.
You can omit either [INTERPRETER\_GROUP] or [INTERPRETER\_NAME]. If you omit [INTERPRETER\_NAME], then first available interpreter will be selected in the [INTERPRETER\_GROUP].
Likewise, if you skip [INTERPRETER\_GROUP], then [INTERPRETER\_NAME] will be chosen from default interpreter group.
@ -216,7 +228,7 @@ Checkout some interpreters released with Zeppelin by default.
We welcome contribution to a new interpreter. Please follow these few steps:
- First, check out the general contribution guide [here](https://zeppelin.apache.org/contribution/contributions.html).
- Follow the steps in [Make your own Interpreter](#make-your-own-interpreter) section above.
- Follow the steps in [Make your own Interpreter](#make-your-own-interpreter) section and [Editor setting for Interpreter](#editor-setting-for-interpreter) above.
- Add your interpreter as in the [Configure your interpreter](#configure-your-interpreter) section above; also add it to the example template [zeppelin-site.xml.template](https://github.com/apache/zeppelin/blob/master/conf/zeppelin-site.xml.template).
- Add tests! They are run by [Travis](https://travis-ci.org/apache/zeppelin) for all changes and it is important that they are self-contained.
- Include your interpreter as a module in [`pom.xml`](https://github.com/apache/zeppelin/blob/master/pom.xml).

View file

@ -39,6 +39,13 @@ With `%html` directive, Zeppelin treats your output as HTML
<img src="/assets/themes/zeppelin/img/screenshots/display_html.png" />
### Mathematical expressions
HTML display system automatically formats mathematical expression using [MathJax](https://www.mathjax.org/). You can use
`\\( INLINE EXPRESSION \\)` and `$$ EXPRESSION $$` to format. For example
<img src="/assets/themes/zeppelin/img/screenshots/display_formula.png" />
## Table
If you have data that row separated by '\n' (newline) and column separated by '\t' (tab) with first row as header row, for example

View file

@ -55,7 +55,7 @@ Stable binary packages are available on the [Apache Zeppelin Download Page](http
If you downloaded the default package, just unpack it in a directory of your choice and you're ready to go. If you downloaded the *net-install* package, you should manually [install additional interpreters](../manual/interpreterinstallation.html) first. You can also install everything by running `./bin/install-interpreter.sh --all`.
After unpacking, jump to the [Starting Apache Zeppelin with Command Line](#starting-apache-zeppelin-with-command-line).
After unpacking, jump to the [Starting Apache Zeppelin from Command Line](#starting-apache-zeppelin-from-the-command-line).
### Building from Source
@ -178,7 +178,7 @@ chdir /usr/share/zeppelin
exec bin/zeppelin-daemon.sh upstart
```
## Next Steps:
## Next Steps
Congratulations, you have successfully installed Apache Zeppelin! Here are two next steps you might find useful:

View file

@ -52,3 +52,4 @@ So, copying `notebook` and `conf` directory should be enough.
- From 0.7, we don't use `ZEPPELIN_JAVA_OPTS` as default value of `ZEPPELIN_INTP_JAVA_OPTS` and also the same for `ZEPPELIN_MEM`/`ZEPPELIN_INTP_MEM`. If user want to configure the jvm opts of interpreter process, please set `ZEPPELIN_INTP_JAVA_OPTS` and `ZEPPELIN_INTP_MEM` explicitly. If you don't set `ZEPPELIN_INTP_MEM`, Zeppelin will set it to `-Xms1024m -Xmx1024m -XX:MaxPermSize=512m` by default.
- Mapping from `%jdbc(prefix)` to `%prefix` is no longer available. Instead, you can use %[interpreter alias] with multiple interpreter setttings on GUI.
- Usage of `ZEPPELIN_PORT` is not supported in ssl mode. Instead use `ZEPPELIN_SSL_PORT` to configure the ssl port. Value from `ZEPPELIN_PORT` is used only when `ZEPPELIN_SSL` is set to `false`.
- The support on Spark 1.1.x to 1.3.x is deprecated.

220
docs/interpreter/mahout.md Normal file
View file

@ -0,0 +1,220 @@
---
layout: page
title: "Mahout Interpreter for Apache Zeppelin"
description: "Apache Mahout provides a unified API (the R-Like Scala DSL) for quickly creating machine learning algorithms on a variety of engines."
group: interpreter
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
{% include JB/setup %}
# Apache Mahout Interpreter for Apache Zeppelin
<div id="toc"></div>
## Installation
Apache Mahout is a collection of packages that enable machine learning and matrix algebra on underlying engines such as Apache Flink or Apache Spark. A convenience script for creating and configuring two Mahout enabled interpreters exists. The `%sparkMahout` and `%flinkMahout` interpreters do not exist by default but can be easily created using this script.
### Easy Installation
To quickly and easily get up and running using Apache Mahout, run the following command from the top-level directory of the Zeppelin install:
```bash
python scripts/mahout/add_mahout.py
```
This will create the `%sparkMahout` and `%flinkMahout` interpreters, and restart Zeppelin.
### Advanced Installation
The `add_mahout.py` script contains several command line arguments for advanced users.
<table class="table-configuration">
<tr>
<th>Argument</th>
<th>Description</th>
<th>Example</th>
</tr>
<tr>
<td>--zeppelin_home</td>
<td>This is the path to the Zeppelin installation. This flag is not needed if the script is run from the top-level installation directory or from the `zeppelin/scripts/mahout` directory.</td>
<td>/path/to/zeppelin</td>
</tr>
<tr>
<td>--mahout_home</td>
<td>If the user has already installed Mahout, this flag can set the path to `MAHOUT_HOME`. If this is set, downloading Mahout will be skipped.</td>
<td>/path/to/mahout_home</td>
</tr>
<tr>
<td>--restart_later</td>
<td>Restarting is necessary for updates to take effect. By default the script will restart Zeppelin for you- restart will be skipped if this flag is set.</td>
<td>NA</td>
</tr>
<tr>
<td>--force_download</td>
<td>This flag will force the script to re-download the binary even if it already exists. This is useful for previously failed downloads.</td>
<td>NA</td>
</tr>
<tr>
<td>--overwrite_existing</td>
<td>This flag will force the script to overwrite existing `%sparkMahout` and `%flinkMahout` interpreters. Useful when you want to just start over.</td>
<td>NA</td>
</tr>
</table>
__NOTE 1:__ Apache Mahout at this time only supports Spark 1.5 and Spark 1.6 and Scala 2.10. If the user is using another version of Spark (e.g. 2.0), the `%sparkMahout` will likely not work. The `%flinkMahout` interpreter will still work and the user is encouraged to develop with that engine as the code can be ported via copy and paste, as is evidenced by the tutorial notebook.
__NOTE 2:__ If using Apache Flink in cluster mode, the following libraries will also need to be coppied to `${FLINK_HOME}/lib`
- mahout-math-0.12.2.jar
- mahout-math-scala_2.10-0.12.2.jar
- mahout-flink_2.10-0.12.2.jar
- mahout-hdfs-0.12.2.jar
- [com.google.guava:guava:14.0.1](http://central.maven.org/maven2/com/google/guava/guava/14.0.1/guava-14.0.1.jar)
## Overview
The [Apache Mahout](http://mahout.apache.org/)™ project's goal is to build an environment for quickly creating scalable performant machine learning applications.
Apache Mahout software provides three major features:
- A simple and extensible programming environment and framework for building scalable algorithms
- A wide variety of premade algorithms for Scala + Apache Spark, H2O, Apache Flink
- Samsara, a vector math experimentation environment with R-like syntax which works at scale
In other words:
*Apache Mahout provides a unified API for quickly creating machine learning algorithms on a variety of engines.*
## How to use
When starting a session with Apache Mahout, depending on which engine you are using (Spark or Flink), a few imports must be made and a _Distributed Context_ must be declared. Copy and paste the following code and run once to get started.
### Flink
```scala
%flinkMahout
import org.apache.flink.api.scala._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.flinkbindings._
import org.apache.mahout.math._
import scalabindings._
import RLikeOps._
implicit val ctx = new FlinkDistributedContext(benv)
```
### Spark
```scala
%sparkMahout
import org.apache.mahout.math._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.sparkbindings._
implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = sc2sdc(sc)
```
### Same Code, Different Engines
After importing and setting up the distributed context, the Mahout R-Like DSL is consistent across engines. The following code will run in both `%flinkMahout` and `%sparkMahout`
```scala
val drmData = drmParallelize(dense(
(2, 2, 10.5, 10, 29.509541), // Apple Cinnamon Cheerios
(1, 2, 12, 12, 18.042851), // Cap'n'Crunch
(1, 1, 12, 13, 22.736446), // Cocoa Puffs
(2, 1, 11, 13, 32.207582), // Froot Loops
(1, 2, 12, 11, 21.871292), // Honey Graham Ohs
(2, 1, 16, 8, 36.187559), // Wheaties Honey Gold
(6, 2, 17, 1, 50.764999), // Cheerios
(3, 2, 13, 7, 40.400208), // Clusters
(3, 3, 13, 4, 45.811716)), numPartitions = 2)
drmData.collect(::, 0 until 4)
val drmX = drmData(::, 0 until 4)
val y = drmData.collect(::, 4)
val drmXtX = drmX.t %*% drmX
val drmXty = drmX.t %*% y
val XtX = drmXtX.collect
val Xty = drmXty.collect(::, 0)
val beta = solve(XtX, Xty)
```
## Leveraging Resource Pools and R for Visualization
Resource Pools are a powerful Zeppelin feature that lets us share information between interpreters. A fun trick is to take the output of our work in Mahout and analyze it in other languages.
### Setting up a Resource Pool in Flink
In Spark based interpreters resource pools are accessed via the ZeppelinContext API. To put and get things from the resource pool one can be done simple
```scala
val myVal = 1
z.put("foo", myVal)
val myFetchedVal = z.get("foo")
```
To add this functionality to a Flink based interpreter we declare the follwoing
```scala
%flinkMahout
import org.apache.zeppelin.interpreter.InterpreterContext
val z = InterpreterContext.get().getResourcePool()
```
Now we can access the resource pool in a consistent manner from the `%flinkMahout` interpreter.
### Passing a variable from Mahout to R and Plotting
In this simple example, we use Mahout (on Flink or Spark, the code is the same) to create a random matrix and then take the Sin of each element. We then randomly sample the matrix and create a tab separated string. Finally we pass that string to R where it is read as a .tsv file, and a DataFrame is created and plotted using native R plotting libraries.
```scala
val mxRnd = Matrices.symmetricUniformView(5000, 2, 1234)
val drmRand = drmParallelize(mxRnd)
val drmSin = drmRand.mapBlock() {case (keys, block) =>
val blockB = block.like()
for (i <- 0 until block.nrow) {
blockB(i, 0) = block(i, 0)
blockB(i, 1) = Math.sin((block(i, 0) * 8))
}
keys -> blockB
}
z.put("sinDrm", org.apache.mahout.math.drm.drmSampleToTSV(drmSin, 0.85))
```
And then in an R paragraph...
```r
%spark.r {"imageWidth": "400px"}
library("ggplot2")
sinStr = z.get("flinkSinDrm")
data <- read.table(text= sinStr, sep="\t", header=FALSE)
plot(data, col="red")
```

View file

@ -53,6 +53,12 @@ The following example demonstrates the basic usage of Markdown in a Zeppelin not
<img src="../assets/themes/zeppelin/img/docs-img/markdown-example.png" width="70%" />
## Mathematical expression
Markdown interpreter leverages %html display system internally. That means you can mix mathematical expressions with markdown syntax. For more information, please see [Mathematical Expression](../displaysystem/basicdisplaysystem.html#mathematical-expressions) section.
### Markdown4j Parser
`markdown4j` parser provides [YUML](http://yuml.me/) and [Websequence](https://www.websequencediagrams.com/) extensions

View file

@ -86,9 +86,26 @@ print("".join(z.checkbox("f3", [("o1","1"), ("o2","2")],["1"])))
* Code-completion is currently not implemented.
## Matplotlib integration
The python interpreter can display matplotlib graph with the function `z.show()`.
You need to have matplotlib module installed and a XServer running to use this functionality!
The python interpreter can display matplotlib figures inline automatically using the `pyplot` module:
```python
%python
import matplotlib.pyplot as plt
plt.plot([1, 2, 3])
```
This is the recommended method for using matplotlib from within a Zeppelin notebook. The output of this command will by default be converted to HTML by implicitly making use of the `%html` magic. Additional configuration can be achieved using the builtin `z.configure_mpl()` method. For example,
```python
z.configure_mpl(width=400, height=300, fmt='svg')
plt.plot([1, 2, 3])
```
Will produce a 400x300 image in SVG format, which by default are normally 600x400 and PNG respectively. In the future, another option called `angular` can be used to make it possible to update a plot produced from one paragraph directly from another (the output will be `%angular` instead of `%html`). However, this feature is already available in the `pyspark` interpreter. More details can be found in the included "Zeppelin Tutorial: Python - matplotlib basic" tutorial notebook.
If Zeppelin cannot find the matplotlib backend files (which should usually be found in `$ZEPPELIN_HOME/interpreter/lib/python`) in your `PYTHONPATH`, then the backend will automatically be set to agg, and the (otherwise deprecated) instructions below can be used for more limited inline plotting.
If you are unable to load the inline backend, use `z.show(plt)`:
```python
%python
import matplotlib.pyplot as plt

View file

@ -363,6 +363,11 @@ select * from ${table=defaultTableName} where text like '%${search}%'
To learn more about dynamic form, checkout [Dynamic Form](../manual/dynamicform.html).
## Matplotlib Integration (pyspark)
Both the `python` and `pyspark` interpreters have built-in support for inline visualization using `matplotlib`, a popular plotting library for python. More details can be found in the [python interpreter documentation](../interpreter/python.html), since matplotlib support is identical. More advanced interactive plotting can be done with pyspark through utilizing Zeppelin's built-in [Angular Display System](../displaysystem/back-end-angular.html), as shown below:
<img class="img-responsive" src="../assets/themes/zeppelin/img/docs-img/matplotlibAngularExample.gif" />
## Interpreter setting option
You can choose one of `shared`, `scoped` and `isolated` options wheh you configure Spark interpreter. Spark interpreter creates separated Scala compiler per each notebook but share a single SparkContext in `scoped` mode (experimental). It creates separated SparkContext per each notebook in `isolated` mode.

View file

@ -400,6 +400,16 @@ The role of registered interpreters, settings and interpreters group are describ
<td>Fail code</td>
<td> 500 </td>
</tr>
<tr>
<td>Sample JSON input (Optional)</td>
<td>
<pre>
{
"noteId": "2AVQJVC8N"
}
</pre>
</td>
</tr>
<tr>
<td>Sample JSON response</td>
<td>

View file

@ -493,7 +493,7 @@ If you work with Apache Zeppelin and find a need for an additional REST API, ple
<col width="200">
<tr>
<td>Description</td>
<td> This ```POST``` method runs the paragraph synchronously by given note and paragraph id. This API can return SUCCESS or ERROR depending on the outcome of the paragraph execution
<td>This ```POST``` method runs the paragraph synchronously by given note and paragraph id. This API can return SUCCESS or ERROR depending on the outcome of the paragraph execution
</td>
</tr>
<tr>
@ -808,6 +808,127 @@ If you work with Apache Zeppelin and find a need for an additional REST API, ple
</tr>
</table>
<br/>
### Update paragraph configuration
<table class="table-configuration">
<col width="200">
<tr>
<td>Description</td>
<td>This ```PUT``` method update paragraph configuration using given id so that user can change paragraph setting such as graph type, show or hide editor/result and paragraph size, etc. You can update certain fields you want, for example you can update <code>colWidth</code> field only by sending request with payload <code>{"colWidth": 12.0}</code>.
</td>
</tr>
<tr>
<td>URL</td>
<td>```http://[zeppelin-server]:[zeppelin-port]/api/notebook/[noteId]/paragraph/[paragraphId]/config```</td>
</tr>
<tr>
<td>Success code</td>
<td>200</td>
</tr>
<tr>
<td>Bad Request code</td>
<td>400</td>
</tr>
<tr>
<td>Forbidden code</td>
<td>403</td>
</tr>
<tr>
<td>Not Found code</td>
<td>404</td>
</tr>
<tr>
<td>Fail code</td>
<td>500</td>
</tr>
<tr>
<td>sample JSON input</td>
<td><pre>
{
"colWidth": 6.0,
"graph": {
"mode": "lineChart",
"height": 200.0,
"optionOpen": false,
"keys": [
{
"name": "age",
"index": 0.0,
"aggr": "sum"
}
],
"values": [
{
"name": "value",
"index": 1.0,
"aggr": "sum"
}
],
"groups": [],
"scatter": {}
},
"editorHide": true,
"editorMode": "ace/mode/markdown",
"tableHide": false
}</pre></td>
</tr>
<tr>
<td>sample JSON response</td>
<td><pre>
{
"status":"OK",
"message":"",
"body":{
"text":"%sql \nselect age, count(1) value\nfrom bank \nwhere age \u003c 30 \ngroup by age \norder by age",
"config":{
"colWidth":6.0,
"graph":{
"mode":"lineChart",
"height":200.0,
"optionOpen":false,
"keys":[
{
"name":"age",
"index":0.0,
"aggr":"sum"
}
],
"values":[
{
"name":"value",
"index":1.0,
"aggr":"sum"
}
],
"groups":[],
"scatter":{}
},
"tableHide":false,
"editorMode":"ace/mode/markdown",
"editorHide":true
},
"settings":{
"params":{},
"forms":{}
},
"apps":[],
"jobName":"paragraph_1423500782552_-1439281894",
"id":"20150210-015302_1492795503",
"result":{
"code":"SUCCESS",
"type":"TABLE",
"msg":"age\tvalue\n19\t4\n20\t3\n21\t7\n22\t9\n23\t20\n24\t24\n25\t44\n26\t77\n27\t94\n28\t103\n29\t97\n"
},
"dateCreated":"Feb 10, 2015 1:53:02 AM",
"dateStarted":"Jul 3, 2015 1:43:17 PM",
"dateFinished":"Jul 3, 2015 1:43:23 PM",
"status":"FINISHED",
"progressUpdateIntervalMs":500
}
}</pre></td>
</tr>
</table>
<br/>
### Move a paragraph to the specific index
<table class="table-configuration">
@ -835,7 +956,6 @@ If you work with Apache Zeppelin and find a need for an additional REST API, ple
</tr>
</table>
<br/>
### Delete a paragraph
<table class="table-configuration">
@ -934,7 +1054,8 @@ If you work with Apache Zeppelin and find a need for an additional REST API, ple
<td> Fail code</td>
<td> 500 </td>
</tr>
<td>sample JSON input</td>
<tr>
<td>sample JSON input</td>
<td><pre>
{
"paragraphs": [
@ -961,6 +1082,7 @@ If you work with Apache Zeppelin and find a need for an additional REST API, ple
"config": {},
"info": {}
}</pre></td>
</tr>
<tr>
<td>sample JSON response</td>
<td><pre>
@ -970,5 +1092,40 @@ If you work with Apache Zeppelin and find a need for an additional REST API, ple
"body": "2AZPHY918"
}</pre></td>
</tr>
</table>
<br />
### Clear all paragraph result
<table class="table-configuration">
<col width="200">
<tr>
<td>Description</td>
<td>This ```PUT``` method clear all paragraph results from note of given id.
</td>
</tr>
<tr>
<td>URL</td>
<td>```http://[zeppelin-server]:[zeppelin-port]/api/notebook/[noteId]/clear```</td>
</tr>
<tr>
<td>Success code</td>
<td>200</td>
</tr>
<tr>
<td>Forbidden code</td>
<td>401</td>
</tr>
<tr>
<td>Not Found code</td>
<td>404</td>
</tr>
<tr>
<td>Fail code</td>
<td>500</td>
</tr>
<tr>
<td>sample JSON response</td>
<td><pre>{"status": "OK"}</pre></td>
</tr>
</tr>
</table>

View file

@ -85,7 +85,7 @@ This instruction based on Ubuntu 14.04 LTS but may work with other OS with few c
}
location /ws { # For websocket support
proxy_pass http://zeppelin;
proxy_pass http://zeppelin/ws;
proxy_http_version 1.1;
proxy_set_header Upgrade websocket;
proxy_set_header Connection upgrade;
@ -130,4 +130,4 @@ This instruction based on Ubuntu 14.04 LTS but may work with other OS with few c
Another option is to have an authentication server that can verify user credentials in an LDAP server.
If an incoming request to the Zeppelin server does not have a cookie with user information encrypted with the authentication server public key, the user
is redirected to the authentication server. Once the user is verified, the authentication server redirects the browser to a specific URL in the Zeppelin server which sets the authentication cookie in the browser.
The end result is that all requests to the Zeppelin web server have the authentication cookie which contains user and groups information.
The end result is that all requests to the Zeppelin web server have the authentication cookie which contains user and groups information.

View file

@ -31,7 +31,11 @@ When you connect to Apache Zeppelin, you will be asked to enter your credentials
## Security Setup
You can setup **Zeppelin notebook authentication** in some simple steps.
### 1. Secure the HTTP channel
### 1. Enable Shiro
By default in `conf`, you will find `shiro.ini.template`, this file is used as an example and it is strongly recommended
to create a `shiro.ini` file by doing the following command line `cp conf/shiro.ini.template conf/shiro.ini`.
### 2. Secure the HTTP channel
To secure the HTTP channel, you have to change both **anon** and **authc** settings in `conf/shiro.ini`. In here, **anon** means "the access is anonymous" and **authc** means "formed auth security".
The default status of them is
@ -49,10 +53,10 @@ Deactivate the line "/** = anon" and activate the line "/** = authc" in `conf/sh
For the further information about `shiro.ini` file format, please refer to [Shiro Configuration](http://shiro.apache.org/configuration.html#Configuration-INISections).
### 2. Secure the Websocket channel
### 3. Secure the Websocket channel
Set to property **zeppelin.anonymous.allowed** to **false** in `conf/zeppelin-site.xml`. If you don't have this file yet, just copy `conf/zeppelin-site.xml.template` to `conf/zeppelin-site.xml`.
### 3. Start Zeppelin
### 4. Start Zeppelin
```
bin/zeppelin-daemon.sh start (or restart)
@ -60,7 +64,7 @@ bin/zeppelin-daemon.sh start (or restart)
Then you can browse Zeppelin at [http://localhost:8080](http://localhost:8080).
### 4. Login
### 5. Login
Finally, you can login using one of the below **username/password** combinations.
<center><img src="../assets/themes/zeppelin/img/docs-img/zeppelin-login.png"></center>
@ -94,7 +98,7 @@ ldapRealm.contextFactory.url = ldap://ldap.test.com:389
ldapRealm.userDnTemplate = uid={0},ou=Users,dc=COMPANY,dc=COM
ldapRealm.contextFactory.authenticationMechanism = SIMPLE
```
also define roles/groups that you want to have in system, like below;
```
@ -143,6 +147,19 @@ ldapRealm.userDnTemplate = uid={0},ou=Users,dc=COMPANY,dc=COM
ldapRealm.contextFactory.authenticationMechanism = SIMPLE
```
### PAM
[PAM](https://en.wikipedia.org/wiki/Pluggable_authentication_module) authentication support allows the reuse of existing authentication
moduls on the host where Zeppelin is running. On a typical system modules are configured per service for example sshd, passwd, etc. under `/etc/pam.d/`. You can
either reuse one of these services or create your own for Zeppelin. Activiting PAM authentication requires two parameters:
1. realm: The Shiro realm being used
2. service: The service configured under `/etc/pam.d/` to be used. The name here needs to be the same as the file name under `/etc/pam.d/`
```
[main]
pamRealm=org.apache.zeppelin.realm.PamRealm
pamRealm.service=sshd
```
### ZeppelinHub
[ZeppelinHub](https://www.zeppelinhub.com) is a service that synchronize your Apache Zeppelin notebooks and enables you to collaborate easily.
@ -159,8 +176,8 @@ securityManager.realms = $zeppelinHubRealm
> Note: ZeppelinHub is not releated to apache Zeppelin project.
## Secure your Zeppelin information (optional)
By default, anyone who defined in `[users]` can share **Interpreter Setting**, **Credential** and **Configuration** information in Apache Zeppelin.
Sometimes you might want to hide these information for your use case.
By default, anyone who defined in `[users]` can share **Interpreter Setting**, **Credential** and **Configuration** information in Apache Zeppelin.
Sometimes you might want to hide these information for your use case.
Since Shiro provides **url-based security**, you can hide the information by commenting or uncommenting these below lines in `conf/shiro.ini`.
```
@ -171,9 +188,8 @@ Since Shiro provides **url-based security**, you can hide the information by com
/api/credential/** = authc, roles[admin]
```
In this case, only who have `admin` role can see **Interpreter Setting**, **Credential** and **Configuration** information.
In this case, only who have `admin` role can see **Interpreter Setting**, **Credential** and **Configuration** information.
If you want to grant this permission to other users, you can change **roles[ ]** as you defined at `[users]` section.
<br/>
> **NOTE :** All of the above configurations are defined in the `conf/shiro.ini` file. This documentation is originally from [SECURITY-README.md](https://github.com/apache/zeppelin/blob/master/SECURITY-README.md).

View file

@ -28,6 +28,9 @@
"defaultValue": "10",
"description": "The size of the result set of a search query"
}
},
"editor": {
"editOnDblClick": false
}
}
]

View file

@ -22,6 +22,9 @@
"defaultValue": "1000",
"description": "Maximum number of lines of results fetched"
}
},
"editor": {
"editOnDblClick": false
}
}
]

View file

@ -123,7 +123,7 @@ public class HDFSFileInterpreterTest extends TestCase {
* Store command results from curl against a real file system
*/
class MockFileSystem {
HashMap<String, String> mfs = new HashMap<String, String>();
HashMap<String, String> mfs = new HashMap<>();
void addListStatusData() {
mfs.put("/?op=LISTSTATUS",
"{\"FileStatuses\":{\"FileStatus\":[\n" +

View file

@ -34,7 +34,7 @@
<description>Zeppelin flink support</description>
<properties>
<flink.version>1.1.2</flink.version>
<flink.version>1.1.3</flink.version>
<flink.akka.version>2.3.7</flink.akka.version>
<scala.macros.version>2.0.1</scala.macros.version>
</properties>

View file

@ -27,8 +27,12 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.*;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.zeppelin.interpreter.Interpreter;
@ -47,10 +51,12 @@ import scala.Option;
import scala.Some;
import scala.collection.JavaConversions;
import scala.collection.immutable.Nil;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.AbstractFunction0;
import scala.tools.nsc.Settings;
import scala.tools.nsc.interpreter.IMain;
import scala.tools.nsc.interpreter.Results;
import scala.tools.nsc.settings.MutableSettings;
import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
import scala.tools.nsc.settings.MutableSettings.PathSetting;
@ -175,12 +181,18 @@ public class FlinkInterpreter extends Interpreter {
pathSettings.v_$eq(classpath);
settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
.getContextClassLoader()));
BooleanSetting b = (BooleanSetting) settings.usejavacp();
b.v_$eq(true);
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
// To prevent 'File name too long' error on some file system.
MutableSettings.IntSetting numClassFileSetting = settings.maxClassfileName();
numClassFileSetting.v_$eq(128);
settings.scala$tools$nsc$settings$ScalaSettings$_setter_$maxClassfileName_$eq(
numClassFileSetting);
return settings;
}
@ -197,7 +209,7 @@ public class FlinkInterpreter extends Interpreter {
}
private List<File> classPath(ClassLoader cl) {
List<File> paths = new LinkedList<File>();
List<File> paths = new LinkedList<>();
if (cl == null) {
return paths;
}
@ -217,7 +229,7 @@ public class FlinkInterpreter extends Interpreter {
public Object getLastObject() {
Object obj = imain.lastRequest().lineRep().call(
"$result",
JavaConversions.asScalaBuffer(new LinkedList<Object>()));
JavaConversions.asScalaBuffer(new LinkedList<>()));
return obj;
}
@ -334,6 +346,20 @@ public class FlinkInterpreter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
if (localMode()) {
// In localMode we can cancel all running jobs,
// because the local cluster can only run one job at the time.
for (JobID job : this.localFlinkCluster.getCurrentlyRunningJobsJava()) {
logger.info("Stop job: " + job);
cancelJobLocalMode(job);
}
}
}
private void cancelJobLocalMode(JobID jobID){
FiniteDuration timeout = AkkaUtils.getTimeout(this.localFlinkCluster.configuration());
ActorGateway leader = this.localFlinkCluster.getLeaderGateway(timeout);
leader.ask(new JobManagerMessages.CancelJob(jobID), timeout);
}
@Override

View file

@ -18,7 +18,8 @@
}
},
"editor": {
"language": "scala"
"language": "scala",
"editOnDblClick": false
}
}
]

View file

@ -20,6 +20,9 @@
"defaultValue": "false",
"description": "Disable checks for unit and manual tests"
}
},
"editor": {
"editOnDblClick": false
}
}
]

View file

@ -178,7 +178,7 @@ public class IgniteInterpreter extends Interpreter {
public Object getLastObject() {
Object obj = imain.lastRequest().lineRep().call(
"$result",
JavaConversions.asScalaBuffer(new LinkedList<Object>()));
JavaConversions.asScalaBuffer(new LinkedList<>()));
return obj;
}

View file

@ -0,0 +1,312 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file provides a static (non-interactive) matplotlib plotting backend
# for zeppelin notebooks for use with the python/pyspark interpreters
from __future__ import print_function
import uuid
import warnings
import base64
from io import BytesIO
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
import mpl_config
import matplotlib
from matplotlib._pylab_helpers import Gcf
from matplotlib.backends.backend_agg import new_figure_manager, FigureCanvasAgg
from matplotlib.backend_bases import ShowBase, FigureManagerBase
from matplotlib.figure import Figure
########################################################################
#
# The following functions and classes are for pylab and implement
# window/figure managers, etc...
#
########################################################################
class Show(ShowBase):
"""
A callable object that displays the figures to the screen. Valid kwargs
include figure width and height (in units supported by the div tag), block
(allows users to override blocking behavior regardless of whether or not
interactive mode is enabled, currently unused) and close (Implicitly call
matplotlib.pyplot.close('all') with each call to show()).
"""
def __call__(self, close=None, block=None, **kwargs):
if close is None:
close = mpl_config.get('close')
try:
managers = Gcf.get_all_fig_managers()
if not managers:
return
# Tell zeppelin that the output will be html using the %html magic
# We want to do this only once to avoid seeing "%html" printed
# directly to the outout when multiple figures are displayed from
# one paragraph.
if mpl_config.get('angular'):
print('%angular')
else:
print('%html')
# Show all open figures
for manager in managers:
manager.show(**kwargs)
finally:
# This closes all the figures if close is set to True.
if close and Gcf.get_all_fig_managers():
Gcf.destroy_all()
class FigureCanvasZInline(FigureCanvasAgg):
"""
The canvas the figure renders into. Calls the draw and print fig
methods, creates the renderers, etc...
"""
def get_bytes(self, **kwargs):
"""
Get the byte representation of the figure.
Should only be used with jpg/png formats.
"""
# Make sure format is correct
fmt = kwargs.get('format', mpl_config.get('format'))
if fmt == 'svg':
raise ValueError("get_bytes() does not support svg, use png or jpg")
# Express the image as bytes
buf = BytesIO()
self.print_figure(buf, **kwargs)
byte_str = b"data:image/%s;base64," %fmt
byte_str += base64.b64encode(buf.getvalue())
# Python3 forces all strings to default to unicode, but for raster image
# formats (eg png, jpg), we want to work with bytes. Thus this step is
# needed to ensure compatability for all python versions.
byte_str = byte_str.decode('ascii')
buf.close()
return byte_str
def get_svg(self, **kwargs):
"""
Get the svg representation of the figure.
Should only be used with svg format.
"""
# Make sure format is correct
fmt = kwargs.get('format', mpl_config.get('format'))
if fmt != 'svg':
raise ValueError("get_svg() does not support png or jpg, use svg")
# For SVG the data string has to be unicode, not bytes
buf = StringIO()
self.print_figure(buf, **kwargs)
svg_str = buf.getvalue()
buf.close()
return svg_str
def draw_idle(self, *args, **kwargs):
"""
Called when the figure gets updated (eg through a plotting command).
This is overriden to allow open figures to be reshown after they
are updated when mpl_config.get('close') is False.
"""
if not self._is_idle_drawing:
with self._idle_draw_cntx():
self.draw(*args, **kwargs)
draw_if_interactive()
class FigureManagerZInline(FigureManagerBase):
"""
Wrap everything up into a window for the pylab interface
"""
def __init__(self, canvas, num):
FigureManagerBase.__init__(self, canvas, num)
self.fig_id = "figure_{0}".format(uuid.uuid4().hex)
self._shown = False
def angular_bind(self, **kwargs):
"""
Bind figure data to Zeppelin's Angular Object Registry.
If mpl_config("angular") is True and PY4J is supported, this allows
for the possibility to interactively update a figure from a separate
paragraph without having to display it multiple times.
"""
# This doesn't work for SVG so make sure it's not our format
fmt = kwargs.get('format', mpl_config.get('format'))
if fmt == 'svg':
return
# Get the figure data as a byte array
src = self.canvas.get_bytes(**kwargs)
# Flag to determine whether or not to use
# zeppelin's angular display system
angular = mpl_config.get('angular')
# ZeppelinContext instance (requires PY4J)
context = mpl_config.get('context')
# Finally we must ensure that automatic closing is set to False,
# as otherwise using the angular display system is pointless
close = mpl_config.get('close')
# If above conditions are met, bind the figure data to
# the Angular Object Registry.
if not close and angular:
if hasattr(context, 'angularBind'):
# Binding is performed through figure ID to ensure this works
# if multiple figures are open
context.angularBind(self.fig_id, src)
# Zeppelin will automatically replace this value even if it
# is updated from another pargraph thanks to the {{}} notation
src = "{{%s}}" %self.fig_id
else:
warnings.warn("Cannot bind figure to Angular Object Registry. "
"Check if PY4J is installed.")
return src
def angular_unbind(self):
"""
Unbind figure from angular display system.
"""
context = mpl_config.get('context')
if hasattr(context, 'angularUnbind'):
context.angularUnbind(self.fig_id)
def destroy(self):
"""
Called when close=True or implicitly by pyplot.close().
Overriden to automatically clean up the angular object registry.
"""
self.angular_unbind()
def show(self, **kwargs):
if not self._shown:
zdisplay(self.canvas.figure, **kwargs)
else:
self.canvas.draw_idle()
self.angular_bind(**kwargs)
self._shown = True
def draw_if_interactive():
"""
If interactive mode is on, this allows for updating properties of
the figure when each new plotting command is called.
"""
manager = Gcf.get_active()
interactive = matplotlib.is_interactive()
angular = mpl_config.get('angular')
# Don't bother continuing if we aren't in interactive mode
# or if there are no active figures. Also pointless to continue
# in angular mode as we don't want to reshow the figure.
if not interactive or angular or manager is None:
return
# Allow for figure to be reshown if close is false since
# this function call implies that it has been updated
if not mpl_config.get('close'):
manager._shown = False
def new_figure_manager(num, *args, **kwargs):
"""
Create a new figure manager instance
"""
# if a main-level app must be created, this (and
# new_figure_manager_given_figure) is the usual place to
# do it -- see backend_wx, backend_wxagg and backend_tkagg for
# examples. Not all GUIs require explicit instantiation of a
# main-level app (egg backend_gtk, backend_gtkagg) for pylab
FigureClass = kwargs.pop('FigureClass', Figure)
thisFig = FigureClass(*args, **kwargs)
return new_figure_manager_given_figure(num, thisFig)
def new_figure_manager_given_figure(num, figure):
"""
Create a new figure manager instance for the given figure.
"""
canvas = FigureCanvasZInline(figure)
manager = FigureManagerZInline(canvas, num)
return manager
########################################################################
#
# Backend specific functions
#
########################################################################
def zdisplay(fig, **kwargs):
"""
Publishes a matplotlib figure to the notebook paragraph output.
"""
# kwargs can be width or height (in units supported by div tag)
width = kwargs.pop('width', 'auto')
height = kwargs.pop('height', 'auto')
fmt = kwargs.get('format', mpl_config.get('format'))
# Check if format is supported
supported_formats = mpl_config.get('supported_formats')
if fmt not in supported_formats:
raise ValueError("Unsupported format %s" %fmt)
# For SVG the data string has to be unicode, not bytes
if fmt == 'svg':
img = fig.canvas.get_svg(**kwargs)
# This is needed to ensure the SVG image is the correct size.
# We should find a better way to do this...
width = '{}px'.format(mpl_config.get('width'))
height = '{}px'.format(mpl_config.get('height'))
else:
# Express the image as bytes
src = fig.canvas.manager.angular_bind(**kwargs)
img = "<img src={src} style='width={width};height:{height}'>"
img = img.format(src=src, width=width, height=height)
# Print the image to the notebook paragraph via the %html magic
html = "<div style='width:{width};height:{height}'>{img}<div>"
print(html.format(width=width, height=height, img=img))
def displayhook():
"""
Called post paragraph execution if interactive mode is on
"""
if matplotlib.is_interactive():
show()
########################################################################
#
# Now just provide the standard names that backend.__init__ is expecting
#
########################################################################
# Create a reference to the show function we are using. This is what actually
# gets called by matplotlib.pyplot.show().
show = Show()
# Default FigureCanvas and FigureManager classes to use from the backend
FigureCanvas = FigureCanvasZInline
FigureManager = FigureManagerZInline

View file

@ -0,0 +1,95 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This module provides utitlites for users to configure the inline plotting
# backend through a PyZeppelinContext instance (eg, through z.configure_mpl())
import matplotlib
def configure(**kwargs):
"""
Generic configure function.
Usage: configure(prop1='foo', prop2='bar', ...)
Currently supported zeppelin-specific properties are:
interactive - If true show all figures without explicit call to show()
via a post-execute hook.
angular - If true, bind figures to angular display system.
close - If true, close all figures once shown.
width, height - Default width / height of the figure in pixels.
fontsize - Font size.
dpi - dpi of the figure.
fmt - Figure format
supported_formats - Supported Figure formats ()
context - ZeppelinContext instance (requires PY4J)
"""
_config.update(**kwargs)
# Broadcast relevant changes to matplotlib RC
_on_config_change()
def get(key):
"""
Get the configuration info given a key
"""
return _config[key]
def _on_config_change():
# dpi
dpi = _config['dpi']
matplotlib.rcParams['savefig.dpi'] = dpi
matplotlib.rcParams['figure.dpi'] = dpi
# Width and height
width = float(_config['width']) / dpi
height = float(_config['height']) / dpi
matplotlib.rcParams['figure.figsize'] = (width, height)
# Font size
fontsize = _config['fontsize']
matplotlib.rcParams['font.size'] = fontsize
# Default Figure Format
fmt = _config['format']
supported_formats = _config['supported_formats']
if fmt not in supported_formats:
raise ValueError("Unsupported format %s" %fmt)
matplotlib.rcParams['savefig.format'] = fmt
# Interactive mode
interactive = _config['interactive']
matplotlib.interactive(interactive)
def _init_config():
dpi = matplotlib.rcParams['savefig.dpi']
fmt = matplotlib.rcParams['savefig.format']
width, height = matplotlib.rcParams['figure.figsize']
fontsize = matplotlib.rcParams['font.size']
_config['dpi'] = dpi
_config['format'] = fmt
_config['width'] = width*dpi
_config['height'] = height*dpi
_config['fontsize'] = fontsize
_config['close'] = True
_config['interactive'] = matplotlib.is_interactive()
_config['angular'] = False
_config['supported_formats'] = ['png', 'jpg', 'svg']
_config['context'] = None
_config = {}
_init_config()

View file

@ -66,7 +66,8 @@
}
},
"editor": {
"language": "sql"
"language": "sql",
"editOnDblClick": false
}
}
]

View file

@ -48,7 +48,8 @@
}
},
"editor": {
"language": "sql"
"language": "sql",
"editOnDblClick": false
}
}
]

View file

@ -76,9 +76,9 @@ public class LensInterpreter extends Interpreter {
private static Pattern s_queryExecutePattern = Pattern.compile(".*query\\s+execute\\s+(.*)");
private static Map<String, ExecutionDetail> s_paraToQH =
new ConcurrentHashMap<String, ExecutionDetail> (); //tracks paragraphId -> Lens QueryHandle
new ConcurrentHashMap<> (); //tracks paragraphId -> Lens QueryHandle
private static Map<LensClient, Boolean> s_clientMap =
new ConcurrentHashMap<LensClient, Boolean>();
new ConcurrentHashMap<>();
private int m_maxResults;
private int m_maxThreads;

View file

@ -41,6 +41,8 @@
<assertj.version>1.7.0</assertj.version>
<mockito.version>1.9.5</mockito.version>
<livy.version>0.2.0</livy.version>
<spark.version>1.5.2</spark.version>
<hadoop.version>2.6.0</hadoop.version>
</properties>
<dependencies>
@ -111,38 +113,218 @@
<groupId>com.cloudera.livy</groupId>
<artifactId>livy-integration-test</artifactId>
<version>${livy.version}</version>
<scope>compile</scope>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.cloudera.livy</groupId>
<artifactId>livy-test-lib</artifactId>
<version>${livy.version}</version>
<scope>compile</scope>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.10</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.cloudera.livy</groupId>
<artifactId>livy-core</artifactId>
<version>${livy.version}</version>
<scope>compile</scope>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.10</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
@ -250,9 +432,10 @@
</goals>
<configuration>
<target>
<delete file="${project.build.directory}/unit-tests.log" quiet="true" />
<delete file="${project.build.directory}/jacoco.exec" quiet="true" />
<delete dir="${project.build.directory}/tmp" quiet="true" />
<delete file="${project.build.directory}/unit-tests.log"
quiet="true"/>
<delete file="${project.build.directory}/jacoco.exec" quiet="true"/>
<delete dir="${project.build.directory}/tmp" quiet="true"/>
</target>
</configuration>
</execution>
@ -265,7 +448,7 @@
</goals>
<configuration>
<target>
<mkdir dir="${project.build.directory}/tmp" />
<mkdir dir="${project.build.directory}/tmp"/>
</target>
</configuration>
</execution>

View file

@ -57,7 +57,7 @@ public class LivyHelper {
public Integer createSession(InterpreterContext context, String kind) throws Exception {
try {
Map<String, String> conf = new HashMap<String, String>();
Map<String, String> conf = new HashMap<>();
Iterator<Entry<Object, Object>> it = property.entrySet().iterator();
while (it.hasNext()) {
@ -352,16 +352,16 @@ public class LivyHelper {
ResponseEntity<String> response = null;
try {
if (method.equals("POST")) {
HttpEntity<String> entity = new HttpEntity<String>(jsonData, headers);
HttpEntity<String> entity = new HttpEntity<>(jsonData, headers);
response = restTemplate.exchange(targetURL, HttpMethod.POST, entity, String.class);
paragraphHttpMap.put(paragraphId, response);
} else if (method.equals("GET")) {
HttpEntity<String> entity = new HttpEntity<String>(headers);
HttpEntity<String> entity = new HttpEntity<>(headers);
response = restTemplate.exchange(targetURL, HttpMethod.GET, entity, String.class);
paragraphHttpMap.put(paragraphId, response);
} else if (method.equals("DELETE")) {
HttpEntity<String> entity = new HttpEntity<String>(headers);
HttpEntity<String> entity = new HttpEntity<>(headers);
response = restTemplate.exchange(targetURL, HttpMethod.DELETE, entity, String.class);
}
} catch (HttpClientErrorException e) {

View file

@ -93,8 +93,18 @@
"description": "Whether display app info"
}
},
"option": {
"remote": true,
"port": -1,
"perNote": "shared",
"perUser": "scoped",
"isExistingProcess": false,
"setPermission": false,
"users": []
},
"editor": {
"language": "scala"
"language": "scala",
"editOnDblClick": false
}
},
{
@ -114,8 +124,18 @@
"description": "Execute multiple SQL concurrently if set true."
}
},
"option": {
"remote": true,
"port": -1,
"perNote": "shared",
"perUser": "scoped",
"isExistingProcess": false,
"setPermission": false,
"users": []
},
"editor": {
"language": "sql"
"language": "sql",
"editOnDblClick": false
}
},
{
@ -124,8 +144,18 @@
"className": "org.apache.zeppelin.livy.LivyPySparkInterpreter",
"properties": {
},
"option": {
"remote": true,
"port": -1,
"perNote": "shared",
"perUser": "scoped",
"isExistingProcess": false,
"setPermission": false,
"users": []
},
"editor": {
"language": "python"
"language": "python",
"editOnDblClick": false
}
},
{
@ -134,8 +164,18 @@
"className": "org.apache.zeppelin.livy.LivySparkRInterpreter",
"properties": {
},
"option": {
"remote": true,
"port": -1,
"perNote": "shared",
"perUser": "scoped",
"isExistingProcess": false,
"setPermission": false,
"users": []
},
"editor": {
"language": "r"
"language": "r",
"editOnDblClick": false
}
}
]

View file

@ -12,7 +12,8 @@
}
},
"editor": {
"language": "markdown"
"language": "markdown",
"editOnDblClick": true
}
}
]

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -68,7 +68,7 @@ public class PigQueryInterpreter extends BasePigInterpreter {
// '-' is invalid for pig alias
String alias = "paragraph_" + context.getParagraphId().replace("-", "_");
String[] lines = st.split("\n");
List<String> queries = new ArrayList<String>();
List<String> queries = new ArrayList<>();
for (int i = 0; i < lines.length; ++i) {
if (i == lines.length - 1) {
lines[i] = alias + " = " + lines[i];

View file

@ -18,7 +18,8 @@
}
},
"editor": {
"language": "pig"
"language": "pig",
"editOnDblClick": false
}
},
{
@ -40,7 +41,8 @@
}
},
"editor": {
"language": "pig"
"language": "pig",
"editOnDblClick": false
}
}
]

View file

@ -460,6 +460,9 @@
<fileset>
<directory>interpreter</directory>
<followSymlinks>false</followSymlinks>
<excludes>
<exclude>lib/**</exclude>
</excludes>
</fileset>
</filesets>
</configuration>

View file

@ -324,7 +324,7 @@ public class PostgreSqlInterpreter extends Interpreter {
@Override
public List<InterpreterCompletion> completion(String buf, int cursor) {
List<CharSequence> candidates = new ArrayList<CharSequence>();
List<CharSequence> candidates = new ArrayList<>();
if (sqlCompleter != null && sqlCompleter.complete(buf, cursor, candidates) >= 0) {
List completion = Lists.transform(candidates, sequenceToStringTransformer);
return completion;

View file

@ -65,7 +65,7 @@ public class SqlCompleter extends StringsCompleter {
}
};
private Set<String> modelCompletions = new HashSet<String>();
private Set<String> modelCompletions = new HashSet<>();
public SqlCompleter(Set<String> allCompletions, Set<String> dataModelCompletions) {
super(allCompletions);
@ -150,7 +150,7 @@ public class SqlCompleter extends StringsCompleter {
keywords += "," + driverKeywords.toUpperCase();
}
Set<String> completions = new TreeSet<String>();
Set<String> completions = new TreeSet<>();
// Add the keywords from the current JDBC connection
@ -193,7 +193,7 @@ public class SqlCompleter extends StringsCompleter {
public static Set<String> getDataModelMetadataCompletions(Connection connection)
throws SQLException {
Set<String> completions = new TreeSet<String>();
Set<String> completions = new TreeSet<>();
getColumnNames(connection.getMetaData(), completions);
getSchemaNames(connection.getMetaData(), completions);
return completions;

View file

@ -39,7 +39,7 @@ public class SqlCompleterTest extends BasicJDBCTestCaseAdapter {
private Logger logger = LoggerFactory.getLogger(SqlCompleterTest.class);
private final static Set<String> EMPTY = new HashSet<String>();
private final static Set<String> EMPTY = new HashSet<>();
private CompleterTester tester;
@ -157,7 +157,7 @@ public class SqlCompleterTest extends BasicJDBCTestCaseAdapter {
private void expectedCompletions(String buffer, int cursor, Set<String> expected) {
ArrayList<CharSequence> candidates = new ArrayList<CharSequence>();
ArrayList<CharSequence> candidates = new ArrayList<>();
completer.complete(buffer, cursor, candidates);

View file

@ -47,6 +47,6 @@ mvn -Dpython.test.exclude='' test -pl python -am
* JavaBuilder can't send SIGINT signal to interrupt paragraph execution. Therefore interpreter directly send a `kill SIGINT PID` to python process to interrupt execution. Python process catch SIGINT signal with some code defined in bootstrap.py
* Matplotlib display feature is made with SVG export (in string) and then displays it with html code.
* Matplotlib figures are displayed inline with the notebook automatically using a built-in backend for zeppelin in conjunction with a post-execute hook.
* `%python.sql` support for Pandas DataFrames is optional and provided using https://github.com/yhat/pandasql if user have one installed
* `%python.sql` support for Pandas DataFrames is optional and provided using https://github.com/yhat/pandasql if user have one installed

View file

@ -36,7 +36,8 @@
<py4j.version>0.9.2</py4j.version>
<python.test.exclude>
**/PythonInterpreterWithPythonInstalledTest.java,
**/PythonInterpreterPandasSqlTest.java
**/PythonInterpreterPandasSqlTest.java,
**/PythonInterpreterMatplotlibTest.java
</python.test.exclude>
</properties>

View file

@ -32,6 +32,8 @@ import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
@ -68,6 +70,12 @@ public class PythonInterpreter extends Interpreter {
@Override
public void open() {
// Add matplotlib display hook
InterpreterGroup intpGroup = getInterpreterGroup();
if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
}
LOG.info("Starting Python interpreter ---->");
LOG.info("Python path is set to:" + property.getProperty(ZEPPELIN_PYTHON));

View file

@ -16,10 +16,11 @@
# PYTHON 2 / 3 compatibility :
# bootstrap.py must be runnable with Python 2 or 3
# Remove interactive mode displayhook
import os
import sys
import signal
import base64
import warnings
from io import BytesIO
try:
from StringIO import StringIO
@ -117,6 +118,7 @@ class PyZeppelinContext(object):
def __init__(self):
self.max_result = 1000
self._displayhook = lambda *args: None
def input(self, name, defaultValue=""):
print(self.errorMsg)
@ -137,11 +139,14 @@ class PyZeppelinContext(object):
elif hasattr(p, '__call__'):
p() #error reporting
def show_dataframe(self, df, **kwargs):
def show_dataframe(self, df, show_index=False, **kwargs):
"""Pretty prints DF using Table Display System
"""
limit = len(df) > self.max_result
header_buf = StringIO("")
if show_index:
idx_name = str(df.index.name) if df.index.name is not None else ""
header_buf.write(idx_name + "\t")
header_buf.write(str(df.columns[0]))
for col in df.columns[1:]:
header_buf.write("\t")
@ -150,7 +155,11 @@ class PyZeppelinContext(object):
body_buf = StringIO("")
rows = df.head(self.max_result).values if limit else df.values
for row in rows:
index = df.index.values
for idx, row in zip(index, rows):
if show_index:
body_buf.write("%html <strong>{}</strong>".format(idx))
body_buf.write("\t")
body_buf.write(str(row[0]))
for cell in row[1:]:
body_buf.write("\t")
@ -164,7 +173,7 @@ class PyZeppelinContext(object):
#)
body_buf.close(); header_buf.close()
def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
**kwargs):
"""Matplotlib show function
"""
@ -187,6 +196,39 @@ class PyZeppelinContext(object):
html = "%html <div style='width:{width};height:{height}'>{img}<div>"
print(html.format(width=width, height=height, img=img_str))
img.close()
def configure_mpl(self, **kwargs):
import mpl_config
mpl_config.configure(**kwargs)
def _setup_matplotlib(self):
# If we don't have matplotlib installed don't bother continuing
try:
import matplotlib
except ImportError:
pass
# Make sure custom backends are available in the PYTHONPATH
rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
if mpl_path not in sys.path:
sys.path.append(mpl_path)
# Finally check if backend exists, and if so configure as appropriate
try:
matplotlib.use('module://backend_zinline')
import backend_zinline
# Everything looks good so make config assuming that we are using
# an inline backend
self._displayhook = backend_zinline.displayhook
self.configure_mpl(width=600, height=400, dpi=72,
fontsize=10, interactive=True, format='png')
except ImportError:
# Fall back to Agg if no custom backend installed
matplotlib.use('Agg')
warnings.warn("Unable to load inline matplotlib backend, "
"falling back to Agg")
z = PyZeppelinContext()
z._setup_matplotlib()

View file

@ -18,13 +18,19 @@
}
},
"editor": {
"language": "python"
"language": "python",
"editOnDblClick": false
}
},
{
"group": "python",
"name": "sql",
"className": "org.apache.zeppelin.python.PythonInterpreterPandasSql",
"properties": { }
"properties": {
},
"editor":{
"language": "sql",
"editOnDblClick": false
}
}
]

View file

@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.*;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
/**
* In order for this test to work, test env must have installed:
* <ol>
* - <li>Python</li>
* - <li>Matplotlib</li>
* <ol>
*
* Your PYTHONPATH should also contain the directory of the Matplotlib
* backend files. Usually these can be found in $ZEPPELIN_HOME/interpreter/lib/python.
*
* To run manually on such environment, use:
* <code>
* mvn -Dpython.test.exclude='' test -pl python -am
* </code>
*/
public class PythonInterpreterMatplotlibTest {
private InterpreterGroup intpGroup;
private PythonInterpreter python;
private InterpreterContext context;
@Before
public void setUp() throws Exception {
Properties p = new Properties();
p.setProperty("zeppelin.python", "python");
p.setProperty("zeppelin.python.maxResult", "100");
intpGroup = new InterpreterGroup();
python = new PythonInterpreter(p);
python.setInterpreterGroup(intpGroup);
python.open();
List<Interpreter> interpreters = new LinkedList<>();
interpreters.add(python);
intpGroup.put("note", interpreters);
context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null), null,
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(
new InterpreterOutputListener() {
@Override public void onAppend(InterpreterOutput out, byte[] line) {}
@Override public void onUpdate(InterpreterOutput out, byte[] output) {}
}));
}
@Test
public void dependenciesAreInstalled() {
// matplotlib
InterpreterResult ret = python.interpret("import matplotlib", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
// inline backend
ret = python.interpret("import backend_zinline", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
}
@Test
public void showPlot() {
// Simple plot test
InterpreterResult ret;
ret = python.interpret("import matplotlib.pyplot as plt", context);
ret = python.interpret("z.configure_mpl(interactive=False)", context);
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret = python.interpret("plt.show()", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message(), Type.HTML, ret.type());
assertTrue(ret.message().contains("data:image/png;base64"));
assertTrue(ret.message().contains("<div>"));
}
@Test
// Test for when configuration is set to auto-close figures after show().
public void testClose() {
InterpreterResult ret;
InterpreterResult ret1;
InterpreterResult ret2;
ret = python.interpret("import matplotlib.pyplot as plt", context);
ret = python.interpret("z.configure_mpl(interactive=False)", context);
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret1 = python.interpret("plt.show()", context);
// Second call to show() should print nothing, and Type should be TEXT.
// This is because when close=True, there should be no living instances
// of FigureManager, causing show() to return before setting the output
// type to HTML.
ret = python.interpret("plt.show()", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message(), Type.TEXT, ret.type());
assertTrue(ret.message().equals(""));
// Now test that new plot is drawn. It should be identical to the
// previous one.
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret2 = python.interpret("plt.show()", context);
assertTrue(ret1.message().equals(ret2.message()));
}
@Test
// Test for when configuration is set to not auto-close figures after show().
public void testNoClose() {
InterpreterResult ret;
InterpreterResult ret1;
InterpreterResult ret2;
ret = python.interpret("import matplotlib.pyplot as plt", context);
ret = python.interpret("z.configure_mpl(interactive=False, close=False)", context);
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret1 = python.interpret("plt.show()", context);
// Second call to show() should print nothing, and Type should be HTML.
// This is because when close=False, there should be living instances
// of FigureManager, causing show() to set the output
// type to HTML even though the figure is inactive.
ret = python.interpret("plt.show()", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message(), Type.HTML, ret.type());
assertTrue(ret.message().equals(""));
// Now test that plot can be reshown if it is updated. It should be
// different from the previous one because it will plot the same line
// again but in a different color.
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret2 = python.interpret("plt.show()", context);
assertTrue(!ret1.message().equals(ret2.message()));
}
}

View file

@ -159,17 +159,20 @@ public class PythonInterpreterPandasSqlTest {
ret = python.interpret("import pandas as pd", context);
ret = python.interpret("import numpy as np", context);
// given a Pandas DataFrame with non-text data
// given a Pandas DataFrame with an index and non-text data
ret = python.interpret("index = pd.Index([10, 11, 12, 13], name='index_name')", context);
ret = python.interpret("d1 = {1 : [np.nan, 1, 2, 3], 'two' : [3., 4., 5., 6.7]}", context);
ret = python.interpret("df1 = pd.DataFrame(d1)", context);
ret = python.interpret("df1 = pd.DataFrame(d1, index=index)", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
// when
ret = python.interpret("z.show(df1)", context);
ret = python.interpret("z.show(df1, show_index=True)", context);
// then
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message(), Type.TABLE, ret.type());
assertTrue(ret.message().indexOf("index_name") == 0);
assertTrue(ret.message().indexOf("13") > 0);
assertTrue(ret.message().indexOf("nan") > 0);
assertTrue(ret.message().indexOf("6.7") > 0);
}

View file

@ -0,0 +1,290 @@
# /**
# * Licensed to the Apache Software Foundation (ASF) under one
# * or more contributor license agreements. See the NOTICE file
# * distributed with this work for additional information
# * regarding copyright ownership. The ASF licenses this file
# * to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# * with the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# */
import argparse
import json
from os.path import isfile
from os import getcwd
from subprocess import call, check_call
#######################################################################################################################
# I put these here so it will (hopeully) be easy(er) to bump versions / maintain
# If there is demand, we could easily make parts or all comand line arguments as well
#######################################################################################################################
tar_name = "apache-mahout-distribution-0.12.2.tar.gz"
mahout_bin_url = "http://apache.osuosl.org/mahout/0.12.2/%s" % tar_name
mahout_version = "0.12.2"
parser = argparse.ArgumentParser()
parser.add_argument("--force_download", help="force download Apache Mahout", action="store_true")
parser.add_argument("--restart_later", help="force download Apache Mahout", action="store_true")
parser.add_argument("--zeppelin_home", help="path to ZEPPELIN_HOME")
parser.add_argument("--mahout_home", help="path to MAHOUT_HOME, use this if you have already installed Apache Mahout")
parser.add_argument("--overwrite_existing", help="if %sparkMahout or %flinkMahout exist, delete them and create new ones. Otherwise Fail.", action="store_true")
args = parser.parse_args()
class ZeppelinTerpWrangler:
def __init__(self, interpreter_json_path):
self.interpreter_json_path = interpreter_json_path
def _getTerpID(self, terpName):
terp_id = None
for k, v in self.interpreter_json['interpreterSettings'].iteritems():
if v['name'] == terpName:
terp_id = k
break
return terp_id
def _terpExists(self, terpName):
terp_id = self._getTerpID(terpName)
if terp_id == None:
return False
return True
def createTerp(self, original_terp_name, new_terp_name, overwrite_existing=True ):
new_terp_id = new_terp_name
if self._terpExists(new_terp_name):
print "Found existing '%s' interpreter..." % new_terp_name
if overwrite_existing:
print "deleting %s from interpreter.json" %new_terp_name
del self.interpreter_json['interpreterSettings'][self._getTerpID(new_terp_name)]
else:
print "exiting program."
exit(1)
orig_terp_id = self._getTerpID(original_terp_name)
from copy import deepcopy
self.interpreter_json['interpreterSettings'][new_terp_id] = deepcopy(
self.interpreter_json['interpreterSettings'][orig_terp_id])
self.interpreter_json['interpreterSettings'][new_terp_id]['name'] = new_terp_name
self.interpreter_json['interpreterSettings'][new_terp_id]['id'] = new_terp_id
print "created new interpreter '%s' from interpreter '%s" % (new_terp_name, original_terp_name)
def _readTerpJson(self):
with open(self.interpreter_json_path) as f:
self.interpreter_json = json.load(f)
def _writeTerpJson(self):
with open(self.interpreter_json_path, 'wb') as f:
json.dump(self.interpreter_json, f, sort_keys=True, indent=4)
def _updateTerpProp(self, terpName, property, value):
terp_id = self._getTerpID(terpName)
self.interpreter_json['interpreterSettings'][terp_id]['properties'][property] = value
def _addTerpDep(self, terpName="", dep="", exclusions=None):
if self.interpreter_json == {}:
print "no interpreter.json loaded, reading last one downloaded"
self._readTerpJson()
terp_id = self._getTerpID(terpName)
deps = self.interpreter_json['interpreterSettings'][terp_id]['dependencies']
dep_dict = {
u'groupArtifactVersion': dep,
u'local': False
}
if exclusions != None:
dep_dict["exclusions"] = exclusions
deps.append(dep_dict)
## Remove Duplicate Dependencies
seen = set()
new_deps = list()
for d in deps:
t = d.items()
if t[0] not in seen:
seen.add(t[0])
new_deps.append(d)
self.interpreter_json['interpreterSettings'][terp_id]['dependencies'] = new_deps
def addMahoutConfig(self, terpName, mahout_home, mahout_version = "0.12.2"):
print "updating '%s' with Apache Mahout dependencies and settings" % terpName
terpDeps = ["%s/mahout-math-%s.jar" % (mahout_home, mahout_version),
"%s/mahout-math-scala_2.10-%s.jar" % (mahout_home, mahout_version)]
if "spark" in terpName.lower():
configs = {
"spark.kryo.referenceTracking": "false",
"spark.kryo.registrator": "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator",
"spark.kryoserializer.buffer": "32k",
"spark.kryoserializer.buffer.max": "600m",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
}
terpDeps.append('%s/mahout-spark_2.10-%s-dependency-reduced.jar' % (mahout_home, mahout_version))
terpDeps.append("%s/mahout-spark_2.10-%s.jar" % (mahout_home, mahout_version))
terpDeps.append("%s/mahout-spark-shell_2.10-%s.jar" % (mahout_home, mahout_version))
if "flink" in terpName.lower():
configs = {
"taskmanager.numberOfTaskSlots" : "12"
}
addlDeps = [
"%s/mahout-flink_2.10-%s.jar" % (mahout_home, mahout_version),
"%s/mahout-hdfs-%s.jar" % (mahout_home, mahout_version),
"com.google.guava:guava:14.0.1"
#"%s/guava-14.0.1.jar" % mahout_home ## reuired in lib dir if running against cluster
]
for t in addlDeps:
terpDeps.append(t)
for k, v in configs.iteritems():
self._updateTerpProp(terpName, k, v)
for t in terpDeps:
self._addTerpDep(terpName, t)
#######################################################################################################################
# Need to be sure we know where Zeppelin Top directory is so we can edit conf files
#
#######################################################################################################################
def valid_zeppelin_home(path):
return isfile(path + "/bin/zeppelin-daemon.sh")
if args.zeppelin_home == None:
zeppelin_home = getcwd()
if (zeppelin_home.split("/")[-1] == "bin") and (isfile("zeppelin-daemon.sh")):
print "we're in the zeppelin/bin"
zeppelin_home = "/".join(zeppelin_home.split("/")[:-1])
print "--zeppelin_home not specified, using %s" % zeppelin_home
else:
zeppelin_home = args.zeppelin_home
if not valid_zeppelin_home(zeppelin_home):
print "%s does not appear to be a valid ZEPPELIN_HOME - e.g. the top level directory of the ZEPPELIN install" % zeppelin_home
exit(1)
else:
print "ZEPPELIN_HOME validated"
interpreter_json_path = zeppelin_home + "/conf/interpreter.json"
if not isfile(interpreter_json_path):
print "interpreter.json doesn't exist. Checking weather Zeppelin is running."
status = call(["bin/zeppelin-daemon.sh", 'status'], cwd=zeppelin_home)
if status == 1:
print "Zeppelin doesn't appear to be running- it is possible that Zeppelin has never been run (interpreter.json is created when Zeppelin is run)"
print "I'm going to try to start Zeppelin to create interpreter.json"
call(["bin/zeppelin-daemon.sh", 'start'], cwd=zeppelin_home)
from time import sleep
sleep(3)
else:
print "We're in the correct top-level directory, Zeppelin appears to be running, but there is no 'interpreter.json'. \
\nThis is a confusing case. Please try restarting Zeppelin, but if that doesn't work reach out on the mailing list."
if isfile(interpreter_json_path):
z = ZeppelinTerpWrangler(interpreter_json_path)
else:
print "'interpreter.json' not found in %s/conf" % args.zeppelin_home
exit(1)
#######################################################################################################################
# If --mahout_home not set, download and untar Mahout in to ZEPPELIN_HOME
# Set MAHOUT_HOME to ZEPPELIN_HOME/<mahout_untar_dir>
#######################################################################################################################
def download_mahout():
if args.force_download:
print "--force_download: OK, deleting existing tar if it exists."
call(["rm", "%s/%s" % (zeppelin_home, tar_name)])
return True
elif isfile("%s/%s" % (zeppelin_home, tar_name)):
print "%s found, skipping download" % tar_name
return False
elif args.mahout_home:
print "--mahout_home set, skipping download"
return False
else:
return True
if download_mahout():
check_call(['wget', mahout_bin_url], cwd= zeppelin_home)
check_call(['tar', 'xzf', tar_name], cwd= zeppelin_home)
if args.mahout_home:
mahout_home = args.mahout_home
else:
mahout_home = zeppelin_home + "/" + ".".join(tar_name.split(".")[:-2])
#######################################################################################################################
# Create new interpreters
#######################################################################################################################
z._readTerpJson()
z.createTerp("spark", "sparkMahout", args.overwrite_existing)
z.createTerp("flink", "flinkMahout", args.overwrite_existing)
z.addMahoutConfig("sparkMahout", mahout_home, mahout_version)
z.addMahoutConfig("flinkMahout", mahout_home, mahout_version)
z._writeTerpJson()
#######################################################################################################################
# Add "export MAHOUT_HOME=... to conf/zeppelin-env.sh
# Create if doesn't exist.
#######################################################################################################################
mahout_home_str = '\nexport MAHOUT_HOME=%s\n' % (mahout_home)
zeppelin_env_sh_path = '%s/conf/zeppelin-env.sh' % zeppelin_home
if isfile(zeppelin_env_sh_path):
with open(zeppelin_env_sh_path, 'rb') as f:
zeppelin_env_sh = f.readlines()
if any(["export MAHOUT_HOME=" in line for line in zeppelin_env_sh]):
print "'export MAHOUT_HOME=...' already exists in zeppelin_env.sh, not appending"
else:
print "appending '%s' to conf/zeppelin-env.sh" % mahout_home_str
with open(zeppelin_env_sh_path, 'a') as f:
f.write(mahout_home_str)
else:
print "appending '%s' to conf/zeppelin-env.sh" % mahout_home_str
with open(zeppelin_env_sh_path, 'wb') as f:
f.write(mahout_home_str)
#######################################################################################################################
# You have to restart Apache Zeppelin for new terps to show up... do this for user unless the specified otherwise
#
#######################################################################################################################
if not args.restart_later:
print "restarting Apache Zeppelin to load new interpreters..."
check_call(["bin/zeppelin-daemon.sh", 'restart'], cwd= zeppelin_home)
else:
print "--restart_later flag detected: remember to restart Zeppelin to see new Mahout interpreters!!"
#######################################################################################################################
# Good bye
#######################################################################################################################
print "---------------------------------------------------------------------------------------------------------------"
print "all done! Thanks for using Apache Mahout"
print "bye"

View file

@ -60,7 +60,7 @@ public class ShellInterpreter extends Interpreter {
@Override
public void open() {
LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY));
executors = new ConcurrentHashMap<String, DefaultExecutor>();
executors = new ConcurrentHashMap<>();
if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) {
ShellSecurityImpl.createSecureConfiguration(getProperty(), shell);
}

View file

@ -30,7 +30,8 @@
}
},
"editor": {
"language": "sh"
"language": "sh",
"editOnDblClick": false
}
}
]

View file

@ -450,38 +450,6 @@
</build>
<profiles>
<profile>
<id>spark-1.1</id>
<dependencies>
</dependencies>
<properties>
<spark.version>1.1.1</spark.version>
<akka.version>2.2.3-shaded-protobuf</akka.version>
</properties>
</profile>
<profile>
<id>spark-1.2</id>
<dependencies>
</dependencies>
<properties>
<spark.version>1.2.1</spark.version>
</properties>
</profile>
<profile>
<id>spark-1.3</id>
<properties>
<spark.version>1.3.1</spark.version>
</properties>
<dependencies>
</dependencies>
</profile>
<profile>
<id>spark-1.4</id>
<properties>

View file

@ -153,7 +153,7 @@ public class DepInterpreter extends Interpreter {
settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
// set classloader for scala compiler
settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
.getContextClassLoader()));
BooleanSetting b = (BooleanSetting) settings.usejavacp();
@ -219,7 +219,7 @@ public class DepInterpreter extends Interpreter {
public Object getLastObject() {
IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest");
Object obj = r.lineRep().call("$result",
JavaConversions.asScalaBuffer(new LinkedList<Object>()));
JavaConversions.asScalaBuffer(new LinkedList<>()));
return obj;
}
@ -290,7 +290,7 @@ public class DepInterpreter extends Interpreter {
Candidates ret = c.complete(buf, cursor);
List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
List<InterpreterCompletion> completions = new LinkedList<InterpreterCompletion>();
List<InterpreterCompletion> completions = new LinkedList<>();
for (String candidate : candidates) {
completions.add(new InterpreterCompletion(candidate, candidate));
@ -298,7 +298,7 @@ public class DepInterpreter extends Interpreter {
return completions;
} else {
return new LinkedList<InterpreterCompletion>();
return new LinkedList<>();
}
}
@ -314,7 +314,7 @@ public class DepInterpreter extends Interpreter {
}
private List<File> classPath(ClassLoader cl) {
List<File> paths = new LinkedList<File>();
List<File> paths = new LinkedList<>();
if (cl == null) {
return paths;
}

View file

@ -48,10 +48,12 @@ import org.apache.spark.sql.SQLContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
import org.slf4j.Logger;
@ -111,11 +113,16 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
@Override
public void open() {
// Add matplotlib display hook
InterpreterGroup intpGroup = getInterpreterGroup();
if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
}
DepInterpreter depInterpreter = getDepInterpreter();
// load libraries from Dependency Interpreter
URL [] urls = new URL[0];
List<URL> urlList = new LinkedList<URL>();
List<URL> urlList = new LinkedList<>();
if (depInterpreter != null) {
SparkDependencyContext depc = depInterpreter.getDependencyContext();
@ -165,6 +172,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
private Map setupPySparkEnv() throws IOException{
Map env = EnvironmentUtils.getProcEnvironment();
if (!env.containsKey("PYTHONPATH")) {
SparkConf conf = getSparkConf();
env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":"));
}
return env;
}
private void createGatewayServerAndStartScript() {
// create python script
createPythonScript();
@ -196,10 +212,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
executor.setStreamHandler(streamHandler);
executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
try {
Map env = EnvironmentUtils.getProcEnvironment();
Map env = setupPySparkEnv();
executor.execute(cmd, env, this);
pythonscriptRunning = true;
} catch (IOException e) {

View file

@ -504,6 +504,7 @@ public class SparkInterpreter extends Interpreter {
conf.set("spark.files", conf.get("spark.yarn.dist.files"));
}
conf.set("spark.submit.pyArchives", Joiner.on(":").join(pythonLibs));
conf.set("spark.submit.pyFiles", Joiner.on(",").join(pythonLibUris));
}
// Distributes needed libraries to workers
@ -596,7 +597,7 @@ public class SparkInterpreter extends Interpreter {
}
String[] argsArray = args.split(" ");
LinkedList<String> argList = new LinkedList<String>();
LinkedList<String> argList = new LinkedList<>();
for (String arg : argsArray) {
argList.add(arg);
}
@ -719,7 +720,7 @@ public class SparkInterpreter extends Interpreter {
// set classloader for scala compiler
settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
.getContextClassLoader()));
BooleanSetting b = (BooleanSetting) settings.usejavacp();
b.v_$eq(true);
@ -957,7 +958,7 @@ public class SparkInterpreter extends Interpreter {
}
private List<File> classPath(ClassLoader cl) {
List<File> paths = new LinkedList<File>();
List<File> paths = new LinkedList<>();
if (cl == null) {
return paths;
}
@ -978,7 +979,7 @@ public class SparkInterpreter extends Interpreter {
public List<InterpreterCompletion> completion(String buf, int cursor) {
if (completer == null) {
logger.warn("Can't find completer");
return new LinkedList<InterpreterCompletion>();
return new LinkedList<>();
}
if (buf.length() < cursor) {
@ -994,7 +995,7 @@ public class SparkInterpreter extends Interpreter {
Candidates ret = c.complete(completionText, cursor);
List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
List<InterpreterCompletion> completions = new LinkedList<InterpreterCompletion>();
List<InterpreterCompletion> completions = new LinkedList<>();
for (String candidate : candidates) {
completions.add(new InterpreterCompletion(candidate, candidate));
@ -1067,7 +1068,7 @@ public class SparkInterpreter extends Interpreter {
return null;
}
Object obj = r.lineRep().call("$result",
JavaConversions.asScalaBuffer(new LinkedList<Object>()));
JavaConversions.asScalaBuffer(new LinkedList<>()));
return obj;
}

View file

@ -61,7 +61,7 @@ public class ZeppelinContext {
// given replName in parapgraph
private static final Map<String, String> interpreterClassMap;
static {
interpreterClassMap = new HashMap<String, String>();
interpreterClassMap = new HashMap<>();
interpreterClassMap.put("spark", "org.apache.zeppelin.spark.SparkInterpreter");
interpreterClassMap.put("sql", "org.apache.zeppelin.spark.SparkSqlInterpreter");
interpreterClassMap.put("dep", "org.apache.zeppelin.spark.DepInterpreter");
@ -134,7 +134,7 @@ public class ZeppelinContext {
@ZeppelinApi
public scala.collection.Iterable<Object> checkbox(String name,
scala.collection.Iterable<Tuple2<Object, String>> options) {
List<Object> allChecked = new LinkedList<Object>();
List<Object> allChecked = new LinkedList<>();
for (Tuple2<Object, String> option : asJavaIterable(options)) {
allChecked.add(option._1());
}
@ -400,7 +400,7 @@ public class ZeppelinContext {
@ZeppelinApi
public List<String> listParagraphs() {
List<String> paragraphs = new LinkedList<String>();
List<String> paragraphs = new LinkedList<>();
for (InterpreterContextRunner r : interpreterContext.getRunners()) {
paragraphs.add(r.getParagraphId());

View file

@ -49,16 +49,16 @@ import scala.Console;
*
*/
public class SparkDependencyContext {
List<Dependency> dependencies = new LinkedList<Dependency>();
List<Repository> repositories = new LinkedList<Repository>();
List<Dependency> dependencies = new LinkedList<>();
List<Repository> repositories = new LinkedList<>();
List<File> files = new LinkedList<File>();
List<File> filesDist = new LinkedList<File>();
List<File> files = new LinkedList<>();
List<File> filesDist = new LinkedList<>();
private RepositorySystem system = Booter.newRepositorySystem();
private RepositorySystemSession session;
private RemoteRepository mavenCentral = Booter.newCentralRepository();
private RemoteRepository mavenLocal = Booter.newLocalRepository();
private List<RemoteRepository> additionalRepos = new LinkedList<RemoteRepository>();
private List<RemoteRepository> additionalRepos = new LinkedList<>();
public SparkDependencyContext(String localRepoPath, String additionalRemoteRepository) {
session = Booter.newRepositorySystemSession(system, localRepoPath);
@ -88,11 +88,11 @@ public class SparkDependencyContext {
public void reset() {
Console.println("DepInterpreter(%dep) deprecated. "
+ "Remove dependencies and repositories through GUI interpreter menu instead.");
dependencies = new LinkedList<Dependency>();
repositories = new LinkedList<Repository>();
dependencies = new LinkedList<>();
repositories = new LinkedList<>();
files = new LinkedList<File>();
filesDist = new LinkedList<File>();
files = new LinkedList<>();
filesDist = new LinkedList<>();
}
private void addRepoFromProperty(String listOfRepo) {

View file

@ -114,7 +114,7 @@ public class SparkDependencyResolver extends AbstractDependencyResolver {
}
// NOTE: Must use reflection until this is exposed/fixed upstream in Scala
List<String> classPaths = new LinkedList<String>();
List<String> classPaths = new LinkedList<>();
for (URL url : urls) {
classPaths.add(url.getPath());
}
@ -151,7 +151,7 @@ public class SparkDependencyResolver extends AbstractDependencyResolver {
private MergedClassPath<AbstractFile> mergeUrlsIntoClassPath(JavaPlatform platform, URL[] urls) {
IndexedSeq<ClassPath<AbstractFile>> entries =
((MergedClassPath<AbstractFile>) platform.classPath()).entries();
List<ClassPath<AbstractFile>> cp = new LinkedList<ClassPath<AbstractFile>>();
List<ClassPath<AbstractFile>> cp = new LinkedList<>();
for (int i = 0; i < entries.size(); i++) {
cp.add(entries.apply(i));
@ -200,7 +200,7 @@ public class SparkDependencyResolver extends AbstractDependencyResolver {
return loadFromMvn(artifact, excludes, addSparkContext);
} else {
loadFromFs(artifact, addSparkContext);
LinkedList<String> libs = new LinkedList<String>();
LinkedList<String> libs = new LinkedList<>();
libs.add(artifact);
return libs;
}
@ -224,8 +224,8 @@ public class SparkDependencyResolver extends AbstractDependencyResolver {
private List<String> loadFromMvn(String artifact, Collection<String> excludes,
boolean addSparkContext) throws Exception {
List<String> loadedLibs = new LinkedList<String>();
Collection<String> allExclusions = new LinkedList<String>();
List<String> loadedLibs = new LinkedList<>();
Collection<String> allExclusions = new LinkedList<>();
allExclusions.addAll(excludes);
allExclusions.addAll(Arrays.asList(exclusions));
@ -244,8 +244,8 @@ public class SparkDependencyResolver extends AbstractDependencyResolver {
}
}
List<URL> newClassPathList = new LinkedList<URL>();
List<File> files = new LinkedList<File>();
List<URL> newClassPathList = new LinkedList<>();
List<File> files = new LinkedList<>();
for (ArtifactResult artifactResult : listOfArtifact) {
logger.info("Load " + artifactResult.getArtifact().getGroupId() + ":"
+ artifactResult.getArtifact().getArtifactId() + ":"
@ -302,7 +302,7 @@ public class SparkDependencyResolver extends AbstractDependencyResolver {
}
public static Collection<String> inferScalaVersion(Collection<String> artifact) {
List<String> list = new LinkedList<String>();
List<String> list = new LinkedList<>();
for (String a : artifact) {
list.add(inferScalaVersion(a));
}

View file

@ -56,7 +56,8 @@
}
},
"editor": {
"language": "scala"
"language": "scala",
"editOnDblClick": false
}
},
{
@ -90,7 +91,8 @@
}
},
"editor": {
"language": "sql"
"language": "sql",
"editOnDblClick": false
}
},
{
@ -112,7 +114,8 @@
}
},
"editor": {
"language": "scala"
"language": "scala",
"editOnDblClick": false
}
},
{
@ -128,7 +131,8 @@
}
},
"editor": {
"language": "python"
"language": "python",
"editOnDblClick": false
}
}
]

View file

@ -15,7 +15,7 @@
# limitations under the License.
#
import sys, getopt, traceback, json, re
import os, sys, getopt, traceback, json, re
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
from py4j.protocol import Py4JJavaError
@ -50,6 +50,7 @@ class Logger(object):
class PyZeppelinContext(dict):
def __init__(self, zc):
self.z = zc
self._displayhook = lambda *args: None
def show(self, obj):
from pyspark.sql import DataFrame
@ -116,6 +117,39 @@ class PyZeppelinContext(dict):
return self.z.getHook(event)
return self.z.getHook(event, replName)
def _setup_matplotlib(self):
# If we don't have matplotlib installed don't bother continuing
try:
import matplotlib
except ImportError:
return
# Make sure custom backends are available in the PYTHONPATH
rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
if mpl_path not in sys.path:
sys.path.append(mpl_path)
# Finally check if backend exists, and if so configure as appropriate
try:
matplotlib.use('module://backend_zinline')
import backend_zinline
# Everything looks good so make config assuming that we are using
# an inline backend
self._displayhook = backend_zinline.displayhook
self.configure_mpl(width=600, height=400, dpi=72, fontsize=10,
interactive=True, format='png', context=self.z)
except ImportError:
# Fall back to Agg if no custom backend installed
matplotlib.use('Agg')
warnings.warn("Unable to load inline matplotlib backend, "
"falling back to Agg")
def configure_mpl(self, **kwargs):
import mpl_config
mpl_config.configure(**kwargs)
def __tupleToScalaTuple2(self, tuple):
if (len(tuple) == 2):
return gateway.jvm.scala.Tuple2(tuple[0], tuple[1])
@ -244,6 +278,7 @@ sqlContext = sqlc
completion = PySparkCompletion(intp)
z = PyZeppelinContext(intp.getZeppelinContext())
z._setup_matplotlib()
while True :
req = intp.getStatements()
@ -251,6 +286,22 @@ while True :
stmts = req.statements().split("\n")
jobGroup = req.jobGroup()
final_code = []
# Get post-execute hooks
try:
global_hook = intp.getHook('post_exec_dev')
except:
global_hook = None
try:
user_hook = z.getHook('post_exec')
except:
user_hook = None
nhooks = 0
for hook in (global_hook, user_hook):
if hook:
nhooks += 1
for s in stmts:
if s == None:
@ -268,7 +319,9 @@ while True :
# so that the last statement's evaluation will be printed to stdout
sc.setJobGroup(jobGroup, "Zeppelin")
code = compile('\n'.join(final_code), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
to_run_exec, to_run_single = code.body[:-1], code.body[-1:]
to_run_hooks = code.body[-nhooks:]
to_run_exec, to_run_single = (code.body[:-(nhooks + 1)],
[code.body[-(nhooks + 1)]])
try:
for node in to_run_exec:
@ -280,6 +333,11 @@ while True :
mod = ast.Interactive([node])
code = compile(mod, '<stdin>', 'single')
exec(code)
for node in to_run_hooks:
mod = ast.Module([node])
code = compile(mod, '<stdin>', 'exec')
exec(code)
except:
raise Exception(traceback.format_exc())

View file

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import static org.junit.Assert.*;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class PySparkInterpreterTest {
public static SparkInterpreter sparkInterpreter;
public static PySparkInterpreter pySparkInterpreter;
public static InterpreterGroup intpGroup;
private File tmpDir;
public static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterTest.class);
private InterpreterContext context;
public static Properties getPySparkTestProperties() {
Properties p = new Properties();
p.setProperty("master", "local[*]");
p.setProperty("spark.app.name", "Zeppelin Test");
p.setProperty("zeppelin.spark.useHiveContext", "true");
p.setProperty("zeppelin.spark.maxResult", "1000");
p.setProperty("zeppelin.spark.importImplicit", "true");
p.setProperty("zeppelin.pyspark.python", "python");
return p;
}
/**
* Get spark version number as a numerical value.
* eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
*/
public static int getSparkVersionNumber() {
if (sparkInterpreter == null) {
return 0;
}
String[] split = sparkInterpreter.getSparkContext().version().split("\\.");
int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
return version;
}
@Before
public void setUp() throws Exception {
tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo");
tmpDir.mkdirs();
intpGroup = new InterpreterGroup();
intpGroup.put("note", new LinkedList<Interpreter>());
if (sparkInterpreter == null) {
sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
intpGroup.get("note").add(sparkInterpreter);
sparkInterpreter.setInterpreterGroup(intpGroup);
sparkInterpreter.open();
}
if (pySparkInterpreter == null) {
pySparkInterpreter = new PySparkInterpreter(getPySparkTestProperties());
intpGroup.get("note").add(pySparkInterpreter);
pySparkInterpreter.setInterpreterGroup(intpGroup);
pySparkInterpreter.open();
}
context = new InterpreterContext("note", "id", "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
}
}));
}
@After
public void tearDown() throws Exception {
delete(tmpDir);
}
private void delete(File file) {
if (file.isFile()) file.delete();
else if (file.isDirectory()) {
File[] files = file.listFiles();
if (files != null && files.length > 0) {
for (File f : files) {
delete(f);
}
}
file.delete();
}
}
@Test
public void testBasicIntp() {
if (getSparkVersionNumber() > 11) {
assertEquals(InterpreterResult.Code.SUCCESS,
pySparkInterpreter.interpret("a = 1\n", context).code());
}
}
}

View file

@ -16,28 +16,15 @@
# limitations under the License.
#
if [[ "$#" -ne 2 ]]; then
echo "usage) $0 [spark version] [hadoop version]"
echo " eg) $0 1.3.1 2.6"
exit 1
exit 0
fi
SPARK_VERSION="${1}"
HADOOP_VERSION="${2}"
echo "${SPARK_VERSION}" | grep "^1.[123].[0-9]" > /dev/null
if [[ "$?" -eq 0 ]]; then
echo "${SPARK_VERSION}" | grep "^1.[12].[0-9]" > /dev/null
if [[ "$?" -eq 0 ]]; then
SPARK_VER_RANGE="<=1.2"
else
SPARK_VER_RANGE="<=1.3"
fi
else
SPARK_VER_RANGE=">1.3"
fi
set -xe
MAX_DOWNLOAD_TIME_SEC=590
@ -75,30 +62,13 @@ if [[ ! -d "${SPARK_HOME}" ]]; then
ls -la .
echo "${SPARK_CACHE} does not have ${SPARK_ARCHIVE} downloading ..."
# download archive if not cached
if [[ "${SPARK_VERSION}" = "1.1.1" || "${SPARK_VERSION}" = "1.2.2" || "${SPARK_VERSION}" = "1.3.1" || "${SPARK_VERSION}" = "1.4.1" ]]; then
echo "${SPARK_VERSION} being downloaded from archives"
# spark old versions are only available only on the archives (prior to 1.5.2)
STARTTIME=`date +%s`
#timeout -s KILL "${MAX_DOWNLOAD_TIME_SEC}" wget "http://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_ARCHIVE}.tgz"
download_with_retry "http://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_ARCHIVE}.tgz"
ENDTIME=`date +%s`
DOWNLOADTIME="$((ENDTIME-STARTTIME))"
else
echo "${SPARK_VERSION} being downloaded from mirror"
# spark 1.5.2 and up and later can be downloaded from mirror
# get download address from mirror
MIRROR_INFO=$(curl -s "http://www.apache.org/dyn/closer.cgi/spark/spark-${SPARK_VERSION}/${SPARK_ARCHIVE}.tgz?asjson=1")
PREFFERED=$(echo "${MIRROR_INFO}" | grep preferred | sed 's/[^"]*.preferred.: .\([^"]*\).*/\1/g')
PATHINFO=$(echo "${MIRROR_INFO}" | grep path_info | sed 's/[^"]*.path_info.: .\([^"]*\).*/\1/g')
STARTTIME=`date +%s`
#timeout -s KILL "${MAX_DOWNLOAD_TIME_SEC}" wget -q "${PREFFERED}${PATHINFO}"
download_with_retry "${PREFFERED}${PATHINFO}"
ENDTIME=`date +%s`
DOWNLOADTIME="$((ENDTIME-STARTTIME))"
fi
# download spark from archive if not cached
echo "${SPARK_VERSION} being downloaded from archives"
STARTTIME=`date +%s`
#timeout -s KILL "${MAX_DOWNLOAD_TIME_SEC}" wget "http://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_ARCHIVE}.tgz"
download_with_retry "http://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_ARCHIVE}.tgz"
ENDTIME=`date +%s`
DOWNLOADTIME="$((ENDTIME-STARTTIME))"
fi
# extract archive in un-cached root, clean-up on failure

View file

@ -23,18 +23,12 @@ Apache Zeppelin is distributed as a single gzip archive with the following struc
Zeppelin
├── bin
│ ├── zeppelin.sh
│ └── seppelin-deamon.sh
│ └── zeppelin-daemon.sh
├── conf
├── interpreter
├── lib
├── licenses
├── zan-repo
│ ├── txt.wordcount
│ ├── vis.bubble
│ ├── vis.gchart
│ ├── ml.something
│ └── ...
├── zeppelin-server-<verion>.jar
├── notebook
└── zeppelin-web-<verion>.war
```
@ -42,4 +36,4 @@ Zeppelin
We use `maven-assembly-plugin` to build it, see `zeppelin-distribution/src/assemble/distribution.xml ` for details.
>**IMPORTANT:** `_/lib_` subdirectory contains all transitive dependencies of the `zeppelin-distribution` module,
automatically resolved by maven, except for explicitly excluded `_server_` and `_web_` Zeppelin sub-modules.
automatically resolved by maven, except for explicitly excluded `_web_` Zeppelin sub-modules.

View file

@ -29,11 +29,10 @@
<dependencySets>
<dependencySet>
<!-- Enable access to all projects in the current multimodule build!
<!-- Enable access to all projects in the current multimodule build!
<useAllReactorProjects>true</useAllReactorProjects> -->
<!-- Now, select which projects to include in this module-set. -->
<includes>
<include>org.apache.zeppelin:zeppelin-server</include>
<include>org.apache.zeppelin:zeppelin-web</include>
</includes>
<useProjectArtifact>false</useProjectArtifact>
@ -44,7 +43,6 @@
<useProjectArtifact>false</useProjectArtifact>
<excludes>
<exclude>${project.groupId}:zeppelin-web</exclude>
<exclude>${project.groupId}:zeppelin-server</exclude>
</excludes>
</dependencySet>
</dependencySets>
@ -94,4 +92,3 @@
</fileSets>-->
</assembly>

View file

@ -163,6 +163,8 @@ The following components are provided under Apache License.
(Apache 2.0) tez-runtime-internals (org.apache.tez:tez-runtime-internals:0.7.0 - http://tez.apache.org)
(Apache 2.0) tez-mapreduce (org.apache.tez:tez-mapreduce:0.7.0 - http://tez.apache.org)
(Apache 2.0) tez-yarn-timeline-history-with-acls (org.apache.tez:tez-yarn-timeline-history-with-acls:0.7.0 - http://tez.apache.org)
(Apache 2.0) jna (net.java.dev.jna:jna:4.1.0 https://github.com/java-native-access/jna)
(Apache 2.0) MathJax v2.7.0 - https://github.com/mathjax/MathJax/blob/2.7.0/LICENSE
========================================================================
MIT licenses
@ -208,7 +210,8 @@ The following components are provided under the MIT License.
(The MIT License) JUL to SLF4J bridge (org.slf4j:jul-to-slf4j:1.7.16 - http://www.slf4j.org)
(The MIT License) angular-resource (angular-resource - https://github.com/angular/angular.js/tree/master/src/ngResource)
(The MIT License) minimal-json (com.eclipsesource.minimal-json:minimal-json:0.9.4 - https://github.com/ralfstx/minimal-json)
(The MIT License) pyrolite (net.razorvine:pyrolite:4.9) - https://github.com/irmen/Pyrolite/blob/v4.9/LICENSE
(The MIT License) pyrolite (net.razorvine:pyrolite:4.9) - https://github.com/irmen/Pyrolite/blob/v4.9/LICENSE)
(The MIT License) libpam4j (org.kohsuke:libpam4j:1.8 https://github.com/kohsuke/libpam4j/blob/master/src/main/java/org/jvnet/libpam/PAM.java)
========================================================================
BSD-style licenses

View file

@ -37,7 +37,7 @@ import org.sonatype.aether.resolution.ArtifactResult;
*/
public abstract class AbstractDependencyResolver {
protected RepositorySystem system = Booter.newRepositorySystem();
protected List<RemoteRepository> repos = new LinkedList<RemoteRepository>();
protected List<RemoteRepository> repos = new LinkedList<>();
protected RepositorySystemSession session;
public AbstractDependencyResolver(String localRepoPath) {

View file

@ -31,7 +31,7 @@ public class Dependency {
public Dependency(String groupArtifactVersion) {
this.groupArtifactVersion = groupArtifactVersion;
exclusions = new LinkedList<String>();
exclusions = new LinkedList<>();
}
@Override

View file

@ -42,11 +42,11 @@ import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter;
*
*/
public class DependencyContext {
List<Dependency> dependencies = new LinkedList<Dependency>();
List<Repository> repositories = new LinkedList<Repository>();
List<Dependency> dependencies = new LinkedList<>();
List<Repository> repositories = new LinkedList<>();
List<File> files = new LinkedList<File>();
List<File> filesDist = new LinkedList<File>();
List<File> files = new LinkedList<>();
List<File> filesDist = new LinkedList<>();
private RepositorySystem system = Booter.newRepositorySystem();
private RepositorySystemSession session;
private RemoteRepository mavenCentral = Booter.newCentralRepository();
@ -73,11 +73,11 @@ public class DependencyContext {
}
public void reset() {
dependencies = new LinkedList<Dependency>();
repositories = new LinkedList<Repository>();
dependencies = new LinkedList<>();
repositories = new LinkedList<>();
files = new LinkedList<File>();
filesDist = new LinkedList<File>();
files = new LinkedList<>();
filesDist = new LinkedList<>();
}

View file

@ -70,7 +70,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
throws RepositoryException, IOException {
if (StringUtils.isBlank(artifact)) {
// Skip dependency loading if artifact is empty
return new LinkedList<File>();
return new LinkedList<>();
}
// <groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
@ -78,7 +78,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
if (numSplits >= 3 && numSplits <= 6) {
return loadFromMvn(artifact, excludes);
} else {
LinkedList<File> libs = new LinkedList<File>();
LinkedList<File> libs = new LinkedList<>();
libs.add(new File(artifact));
return libs;
}
@ -90,7 +90,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
public List<File> load(String artifact, Collection<String> excludes, File destPath)
throws RepositoryException, IOException {
List<File> libs = new LinkedList<File>();
List<File> libs = new LinkedList<>();
if (StringUtils.isNotBlank(artifact)) {
libs = load(artifact, excludes);
@ -123,7 +123,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
private List<File> loadFromMvn(String artifact, Collection<String> excludes)
throws RepositoryException {
Collection<String> allExclusions = new LinkedList<String>();
Collection<String> allExclusions = new LinkedList<>();
allExclusions.addAll(excludes);
allExclusions.addAll(Arrays.asList(exclusions));
@ -142,7 +142,7 @@ public class DependencyResolver extends AbstractDependencyResolver {
}
}
List<File> files = new LinkedList<File>();
List<File> files = new LinkedList<>();
for (ArtifactResult artifactResult : listOfArtifact) {
files.add(artifactResult.getArtifact().getFile());
logger.info("load {}", artifactResult.getArtifact().getFile().getAbsolutePath());

View file

@ -37,7 +37,7 @@ public class TransferListener extends AbstractTransferListener {
Logger logger = LoggerFactory.getLogger(TransferListener.class);
private PrintStream out;
private Map<TransferResource, Long> downloads = new ConcurrentHashMap<TransferResource, Long>();
private Map<TransferResource, Long> downloads = new ConcurrentHashMap<>();
private int lastLength;

View file

@ -38,8 +38,7 @@ public class AngularObject<T> {
private T object;
private transient AngularObjectListener listener;
private transient List<AngularObjectWatcher> watchers
= new LinkedList<AngularObjectWatcher>();
private transient List<AngularObjectWatcher> watchers = new LinkedList<>();
private String noteId; // noteId belonging to. null for global scope
private String paragraphId; // paragraphId belongs to. null for notebook scope
@ -175,7 +174,7 @@ public class AngularObject<T> {
}
final Logger logger = LoggerFactory.getLogger(AngularObject.class);
List<AngularObjectWatcher> ws = new LinkedList<AngularObjectWatcher>();
List<AngularObjectWatcher> ws = new LinkedList<>();
synchronized (watchers) {
ws.addAll(watchers);
}

View file

@ -32,8 +32,7 @@ import java.util.Map;
* - Global scope : Shared to all notebook that uses the same interpreter group
*/
public class AngularObjectRegistry {
Map<String, Map<String, AngularObject>> registry =
new HashMap<String, Map<String, AngularObject>>();
Map<String, Map<String, AngularObject>> registry = new HashMap<>();
private final String GLOBAL_KEY = "_GLOBAL_";
private AngularObjectRegistryListener listener;
private String interpreterId;
@ -209,7 +208,7 @@ public class AngularObjectRegistry {
* @return all angularobject in the scope
*/
public List<AngularObject> getAll(String noteId, String paragraphId) {
List<AngularObject> all = new LinkedList<AngularObject>();
List<AngularObject> all = new LinkedList<>();
synchronized (registry) {
Map<String, AngularObject> r = getRegistryForKey(noteId, paragraphId);
if (r != null) {
@ -228,7 +227,7 @@ public class AngularObjectRegistry {
* @return
*/
public List<AngularObject> getAllWithGlobal(String noteId) {
List<AngularObject> all = new LinkedList<AngularObject>();
List<AngularObject> all = new LinkedList<>();
synchronized (registry) {
Map<String, AngularObject> global = getRegistryForKey(null, null);
if (global != null) {

View file

@ -32,8 +32,8 @@ import org.apache.zeppelin.display.Input.ParamOption;
*/
public class GUI implements Serializable {
Map<String, Object> params = new HashMap<String, Object>(); // form parameters from client
Map<String, Input> forms = new TreeMap<String, Input>(); // form configuration
Map<String, Object> params = new HashMap<>(); // form parameters from client
Map<String, Input> forms = new TreeMap<>(); // form configuration
public GUI() {
@ -86,7 +86,7 @@ public class GUI implements Serializable {
checked = defaultChecked;
}
forms.put(id, new Input(id, defaultChecked, "checkbox", options));
Collection<Object> filtered = new LinkedList<Object>();
Collection<Object> filtered = new LinkedList<>();
for (Object o : checked) {
if (isValidOption(o, options)) {
filtered.add(o);
@ -105,6 +105,6 @@ public class GUI implements Serializable {
}
public void clear() {
this.forms = new TreeMap<String, Input>();
this.forms = new TreeMap<>();
}
}

View file

@ -292,7 +292,7 @@ public class Input implements Serializable {
}
public static Map<String, Input> extractSimpleQueryParam(String script) {
Map<String, Input> params = new HashMap<String, Input>();
Map<String, Input> params = new HashMap<>();
if (script == null) {
return params;
}
@ -331,7 +331,7 @@ public class Input implements Serializable {
}
Collection<Object> checked = value instanceof Collection ? (Collection<Object>) value
: Arrays.asList((Object[]) value);
List<Object> validChecked = new LinkedList<Object>();
List<Object> validChecked = new LinkedList<>();
for (Object o : checked) { // filter out obsolete checked values
for (ParamOption option : input.getOptions()) {
if (option.getValue().equals(o)) {
@ -387,14 +387,14 @@ public class Input implements Serializable {
public static String[] split(String str, String escapeSeq, char escapeChar, String[] blockStart,
String[] blockEnd, String[] splitters, boolean includeSplitter) {
List<String> splits = new ArrayList<String>();
List<String> splits = new ArrayList<>();
String curString = "";
boolean escape = false; // true when escape char is found
int lastEscapeOffset = -1;
int blockStartPos = -1;
List<Integer> blockStack = new LinkedList<Integer>();
List<Integer> blockStack = new LinkedList<>();
for (int i = 0; i < str.length(); i++) {
char c = str.charAt(i);

View file

@ -200,7 +200,7 @@ public class ApplicationLoader {
}
// Create Application classloader
List<URL> urlList = new LinkedList<URL>();
List<URL> urlList = new LinkedList<>();
// load artifact
if (packageInfo.getArtifact() != null) {

View file

@ -319,6 +319,7 @@ public abstract class Interpreter {
private Map<String, InterpreterProperty> properties;
private Map<String, Object> editor;
private String path;
private InterpreterOption option;
public RegisteredInterpreter(String name, String group, String className,
Map<String, InterpreterProperty> properties) {
@ -376,6 +377,9 @@ public abstract class Interpreter {
return getGroup() + "." + getName();
}
public InterpreterOption getOption() {
return option;
}
}
/**

View file

@ -30,8 +30,7 @@ import org.apache.zeppelin.resource.ResourcePool;
* Interpreter context
*/
public class InterpreterContext {
private static final ThreadLocal<InterpreterContext> threadIC =
new ThreadLocal<InterpreterContext>();
private static final ThreadLocal<InterpreterContext> threadIC = new ThreadLocal<>();
public final InterpreterOutput out;

View file

@ -55,7 +55,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
// List<Interpreter>>();
private static final Map<String, InterpreterGroup> allInterpreterGroups =
new ConcurrentHashMap<String, InterpreterGroup>();
new ConcurrentHashMap<>();
public static InterpreterGroup getByInterpreterGroupId(String id) {
return allInterpreterGroups.get(id);
@ -147,7 +147,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
*/
public void close() {
LOGGER.info("Close interpreter group " + getId());
List<Interpreter> intpToClose = new LinkedList<Interpreter>();
List<Interpreter> intpToClose = new LinkedList<>();
for (List<Interpreter> intpGroupForNote : this.values()) {
intpToClose.addAll(intpGroupForNote);
}
@ -168,7 +168,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
if (intpToClose == null) {
return;
}
List<Thread> closeThreads = new LinkedList<Thread>();
List<Thread> closeThreads = new LinkedList<>();
for (final Interpreter intp : intpToClose) {
Thread t = new Thread() {
@ -219,7 +219,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
*/
public void destroy() {
LOGGER.info("Destroy interpreter group " + getId());
List<Interpreter> intpToDestroy = new LinkedList<Interpreter>();
List<Interpreter> intpToDestroy = new LinkedList<>();
for (List<Interpreter> intpGroupForNote : this.values()) {
intpToDestroy.addAll(intpGroupForNote);
}
@ -241,7 +241,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
return;
}
List<Thread> destroyThreads = new LinkedList<Thread>();
List<Thread> destroyThreads = new LinkedList<>();
for (final Interpreter intp : intpToDestroy) {
Thread t = new Thread() {

View file

@ -29,8 +29,7 @@ import java.util.Map;
public class InterpreterHookRegistry {
public static final String GLOBAL_KEY = "_GLOBAL_";
private String interpreterId;
private Map<String, Map<String, Map<String, String>>> registry =
new HashMap<String, Map<String, Map<String, String>>>();
private Map<String, Map<String, Map<String, String>>> registry = new HashMap<>();
/**
* hookRegistry constructor.

View file

@ -17,8 +17,6 @@
package org.apache.zeppelin.interpreter;
import com.google.common.base.Preconditions;
import java.util.List;
/**
@ -86,9 +84,12 @@ public class InterpreterOption {
}
public InterpreterOption(boolean remote, String perUser, String perNote) {
Preconditions.checkNotNull(remote);
Preconditions.checkNotNull(perUser);
Preconditions.checkNotNull(perNote);
if (perUser == null) {
throw new NullPointerException("perUser can not be null.");
}
if (perNote == null) {
throw new NullPointerException("perNote can not be null.");
}
this.remote = remote;
this.perUser = perUser;

View file

@ -39,7 +39,7 @@ public class InterpreterOutput extends OutputStream {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
private final List<Object> outList = new LinkedList<Object>();
private final List<Object> outList = new LinkedList<>();
private InterpreterOutputChangeWatcher watcher;
private final InterpreterOutputListener flushListener;
private InterpreterResult.Type type = InterpreterResult.Type.TEXT;
@ -185,7 +185,7 @@ public class InterpreterOutput extends OutputStream {
public byte[] toByteArray() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
List<Object> all = new LinkedList<Object>();
List<Object> all = new LinkedList<>();
synchronized (outList) {
all.addAll(outList);
@ -213,11 +213,17 @@ public class InterpreterOutput extends OutputStream {
return out.toByteArray();
}
private boolean typeShouldBeDetected() {
return getType() == InterpreterResult.Type.TABLE ? false : true;
}
public void flush() throws IOException {
synchronized (outList) {
buffer.flush();
byte[] bytes = buffer.toByteArray();
bytes = detectTypeFromLine(bytes);
if (typeShouldBeDetected()) {
bytes = detectTypeFromLine(bytes);
}
if (bytes != null) {
outList.add(bytes);
if (type == InterpreterResult.Type.TEXT) {

View file

@ -44,8 +44,8 @@ public class InterpreterOutputChangeWatcher extends Thread {
Logger logger = LoggerFactory.getLogger(InterpreterOutputChangeWatcher.class);
private WatchService watcher;
private final List<File> watchFiles = new LinkedList<File>();
private final Map<WatchKey, File> watchKeys = new HashMap<WatchKey, File>();
private final List<File> watchFiles = new LinkedList<>();
private final Map<WatchKey, File> watchKeys = new HashMap<>();
private InterpreterOutputChangeListener listener;
private boolean stop;

View file

@ -24,7 +24,7 @@ import java.util.Map;
* InterpreterPropertyBuilder
*/
public class InterpreterPropertyBuilder {
Map<String, InterpreterProperty> properties = new HashMap<String, InterpreterProperty>();
Map<String, InterpreterProperty> properties = new HashMap<>();
public InterpreterPropertyBuilder add(String name, String defaultValue, String description){
properties.put(name, new InterpreterProperty(defaultValue, description));

View file

@ -119,7 +119,7 @@ public class InterpreterResult implements Serializable {
private TreeMap<Integer, Type> buildIndexMap(String msg) {
int lastIndexOftypes = 0;
TreeMap<Integer, Type> typesLastIndexInMsg = new TreeMap<Integer, Type>();
TreeMap<Integer, Type> typesLastIndexInMsg = new TreeMap<>();
Type[] types = Type.values();
for (Type t : types) {
lastIndexOftypes = getIndexOfType(msg, t);

View file

@ -105,7 +105,7 @@ public class DevInterpreter extends Interpreter {
@Override
public List<InterpreterCompletion> completion(String buf, int cursor) {
return new LinkedList<InterpreterCompletion>();
return new LinkedList<>();
}
public InterpreterContext getLastInterpretContext() {

View file

@ -37,7 +37,7 @@ import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
public class ClientFactory extends BasePooledObjectFactory<Client>{
private String host;
private int port;
Map<Client, TSocket> clientSocketMap = new HashMap<Client, TSocket>();
Map<Client, TSocket> clientSocketMap = new HashMap<>();
public ClientFactory(String host, int port) {
this.host = host;
@ -64,7 +64,7 @@ public class ClientFactory extends BasePooledObjectFactory<Client>{
@Override
public PooledObject<Client> wrap(Client client) {
return new DefaultPooledObject<Client>(client);
return new DefaultPooledObject<>(client);
}
@Override

View file

@ -35,7 +35,7 @@ public class InterpreterContextRunnerPool {
private Map<String, List<InterpreterContextRunner>> interpreterContextRunners;
public InterpreterContextRunnerPool() {
interpreterContextRunners = new HashMap<String, List<InterpreterContextRunner>>();
interpreterContextRunners = new HashMap<>();
}

View file

@ -155,7 +155,7 @@ public class RemoteInterpreter extends Interpreter {
}
private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
Map<String, String> env = new HashMap<String, String>();
Map<String, String> env = new HashMap<>();
for (Object key : property.keySet()) {
if (isEnvString((String) key)) {
env.put((String) key, property.getProperty((String) key));

View file

@ -43,9 +43,9 @@ import java.util.Map;
*/
public class RemoteInterpreterEventClient implements ResourcePoolConnector {
private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEvent.class);
private final List<RemoteInterpreterEvent> eventQueue = new LinkedList<RemoteInterpreterEvent>();
private final List<ResourceSet> getAllResourceResponse = new LinkedList<ResourceSet>();
private final Map<ResourceId, Object> getResourceResponse = new HashMap<ResourceId, Object>();
private final List<RemoteInterpreterEvent> eventQueue = new LinkedList<>();
private final List<ResourceSet> getAllResourceResponse = new LinkedList<>();
private final Map<ResourceId, Object> getResourceResponse = new HashMap<>();
private final Gson gson = new Gson();
/**
@ -79,7 +79,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
* notify angularObject removal
*/
public void angularObjectRemove(String name, String noteId, String paragraphId) {
Map<String, String> removeObject = new HashMap<String, String>();
Map<String, String> removeObject = new HashMap<>();
removeObject.put("name", name);
removeObject.put("noteId", noteId);
removeObject.put("paragraphId", paragraphId);
@ -213,7 +213,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
}
public void onInterpreterOutputAppend(String noteId, String paragraphId, String output) {
Map<String, String> appendOutput = new HashMap<String, String>();
Map<String, String> appendOutput = new HashMap<>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("data", output);
@ -224,7 +224,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
}
public void onInterpreterOutputUpdate(String noteId, String paragraphId, String output) {
Map<String, String> appendOutput = new HashMap<String, String>();
Map<String, String> appendOutput = new HashMap<>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("data", output);
@ -243,7 +243,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
}
public void onAppOutputAppend(String noteId, String paragraphId, String appId, String output) {
Map<String, String> appendOutput = new HashMap<String, String>();
Map<String, String> appendOutput = new HashMap<>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("appId", appId);
@ -256,7 +256,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
public void onAppOutputUpdate(String noteId, String paragraphId, String appId, String output) {
Map<String, String> appendOutput = new HashMap<String, String>();
Map<String, String> appendOutput = new HashMap<>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("appId", appId);
@ -268,7 +268,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
}
public void onAppStatusUpdate(String noteId, String paragraphId, String appId, String status) {
Map<String, String> appendOutput = new HashMap<String, String>();
Map<String, String> appendOutput = new HashMap<>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("appId", appId);

View file

@ -211,7 +211,7 @@ public class RemoteInterpreterEventPoller extends Thread {
boolean broken = false;
try {
client = interpreterProcess.getClient();
List<String> resourceList = new LinkedList<String>();
List<String> resourceList = new LinkedList<>();
Gson gson = new Gson();
for (Resource r : resourceSet) {
resourceList.add(gson.toJson(r));

Some files were not shown because too many files have changed in this diff Show more