Merge pull request #6 from apache/master

Merging
This commit is contained in:
swakrish 2016-01-21 13:58:52 -08:00
commit 042d9afe31
71 changed files with 2446 additions and 1165 deletions

View file

@ -39,6 +39,29 @@ You can also use this small bookmarklet tool to fill your Pull Request fields au
javascript:(function() {var e = document.getElementById('pull_request_body');if (e) {e.value += '### What is this PR for?\nA few sentences describing the overall goals of the pull request\'s commits.\n\n### What type of PR is it?\n[Bug Fix | Improvement | Feature | Documentation | Hot Fix | Refactoring]\n\n### Todos\n* [ ] - Task\n\n### Is there a relevant Jira issue?\n\n### How should this be tested?\nOutline the steps to test the PR here.\n\n### Screenshots (if appropriate)\n\n### Questions:\n* Does the licenses files need update?\n* Is there breaking changes for older versions?\n* Does this needs documentation?';}})();
```
## Testing a Pull Request
You can also test and review a particular Pull Request. Here are two useful ways.
* Using a utility provided from Zeppelin.
```
dev/test_zeppelin_pr.py [# of PR]
```
For example, if you want to test `#513`, then the command will be:
```
dev/test_zeppelin_pr.py 513
```
* Another way is using [github/hub](https://github.com/github/hub).
```
hub checkout https://github.com/apache/incubator-zeppelin/pull/[# of PR]
```
The above two methods will help you test and review Pull Requests.
## Source Control Workflow
Zeppelin follows [Fork & Pull] (https://github.com/sevntu-checkstyle/sevntu.checkstyle/wiki/Development-workflow-with-Git:-Fork,-Branching,-Commits,-and-Pull-Request) model.
@ -53,7 +76,6 @@ When a Pull Request is submitted, it is being merged or rejected by following re
* Committer can initiate lazy consensus ("Merge if there is no more discussion") and the code can be merged after certain time (normally 24 hours) when there is no review exists.
* Contributor can ping reviewers (including committer) by commenting 'Ready to review' or suitable indication.
## Becoming a Committer
The PPMC adds new committers from the active contributors, based on their contribution to Zeppelin. The qualifications for new committers include:

File diff suppressed because it is too large Load diff

View file

@ -6,13 +6,10 @@ group: manual
---
{% include JB/setup %}
## Elasticsearch Interpreter for Apache Zeppelin
[Elasticsearch](https://www.elastic.co/products/elasticsearch) is a highly scalable open-source full-text search and analytics engine. It allows you to store, search, and analyze big volumes of data quickly and in near real time. It is generally used as the underlying engine/technology that powers applications that have complex search features and requirements.
<br />
## 1. Configuration
## Configuration
<table class="table-configuration">
<tr>
<th>Property</th>
@ -45,24 +42,20 @@ group: manual
![Interpreter configuration](../assets/themes/zeppelin/img/docs-img/elasticsearch-config.png)
</center>
> **Note #1 :** You can add more properties to configure the Elasticsearch client.
> **Note #2 :** If you use Shield, you can add a property named `shield.user` with a value containing the name and the password ( format: `username:password` ). For more details about Shield configuration, consult the [Shield reference guide](https://www.elastic.co/guide/en/shield/current/_using_elasticsearch_java_clients_with_shield.html). Do not forget, to copy the shield client jar in the interpreter directory (`ZEPPELIN_HOME/interpreters/elasticsearch`).
<br />
## 2. Enabling the Elasticsearch Interpreter
## Enabling the Elasticsearch Interpreter
In a notebook, to enable the **Elasticsearch** interpreter, click the **Gear** icon and select **Elasticsearch**.
<br />
## 3. Using the Elasticsearch Interpreter
## Using the Elasticsearch Interpreter
In a paragraph, use `%elasticsearch` to select the Elasticsearch interpreter and then input all commands. To get the list of available commands, use `help`.
```bash
| %elasticsearch
| help
%elasticsearch
help
Elasticsearch interpreter:
General format: <command> /<indices>/<types>/<id> <option> <JSON>
- indices: list of indices separated by commas (depends on the command)
@ -84,19 +77,17 @@ Commands:
> **Tip :** Use ( Ctrl + . ) for autocompletion.
### Get
With the `get` command, you can find a document by id. The result is a JSON document.
```bash
| %elasticsearch
| get /index/type/id
%elasticsearch
get /index/type/id
```
Example:
![Elasticsearch - Get](../assets/themes/zeppelin/img/docs-img/elasticsearch-get.png)
### Search
With the `search` command, you can send a search query to Elasticsearch. There are two formats of query:
@ -106,53 +97,50 @@ With the `search` command, you can send a search query to Elasticsearch. There a
* This is a shortcut to a query like that: `{ "query": { "query_string": { "query": "__HERE YOUR QUERY__", "analyze_wildcard": true } } }`
* See [Elasticsearch query string syntax](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#query-string-syntax) for more details about the content of such a query.
```bash
| %elasticsearch
| search /index1,index2,.../type1,type2,... <JSON document containing the query or query_string elements>
%elasticsearch
search /index1,index2,.../type1,type2,... <JSON document containing the query or query_string elements>
```
If you want to modify the size of the result set, you can add a line that is setting the size, before your search command.
```bash
| %elasticsearch
| size 50
| search /index1,index2,.../type1,type2,... <JSON document containing the query or query_string elements>
%elasticsearch
size 50
search /index1,index2,.../type1,type2,... <JSON document containing the query or query_string elements>
```
> A search query can also contain [aggregations](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html). If there is at least one aggregation, the result of the first aggregation is shown, otherwise, you get the search hits.
Examples:
* With a JSON query:
```bash
| %elasticsearch
| search / { "query": { "match_all": { } } }
|
| %elasticsearch
| search /logs { "query": { "query_string": { "query": "request.method:GET AND status:200" } } }
|
| %elasticsearch
| search /logs { "aggs": {
| "content_length_stats": {
| "extended_stats": {
| "field": "content_length"
| }
| }
| } }
%elasticsearch
search / { "query": { "match_all": { } } }
%elasticsearch
search /logs { "query": { "query_string": { "query": "request.method:GET AND status:200" } } }
%elasticsearch
search /logs { "aggs": {
"content_length_stats": {
"extended_stats": {
"field": "content_length"
}
}
} }
```
* With query_string elements:
```bash
| %elasticsearch
| search /logs request.method:GET AND status:200
|
| %elasticsearch
| search /logs (404 AND (POST OR DELETE))
%elasticsearch
search /logs request.method:GET AND status:200
%elasticsearch
search /logs (404 AND (POST OR DELETE))
```
> **Important** : a document in Elasticsearch is a JSON document, so it is hierarchical, not flat as a row in a SQL table.
@ -199,13 +187,12 @@ Examples:
* With a query containing a multi-bucket aggregation:
![Elasticsearch - Search with aggregation (multi-bucket)](../assets/themes/zeppelin/img/docs-img/elasticsearch-agg-multi-bucket-pie.png)
### Count
With the `count` command, you can count documents available in some indices and types. You can also provide a query.
```bash
| %elasticsearch
| count /index1,index2,.../type1,type2,... <JSON document containing the query OR a query string>
%elasticsearch
count /index1,index2,.../type1,type2,... <JSON document containing the query OR a query string>
```
Examples:
@ -216,34 +203,30 @@ Examples:
* With a query:
![Elasticsearch - Count with query](../assets/themes/zeppelin/img/docs-img/elasticsearch-count-with-query.png)
### Index
With the `index` command, you can insert/update a document in Elasticsearch.
```bash
| %elasticsearch
| index /index/type/id <JSON document>
|
| %elasticsearch
| index /index/type <JSON document>
%elasticsearch
index /index/type/id <JSON document>
%elasticsearch
index /index/type <JSON document>
```
### Delete
With the `delete` command, you can delete a document.
```bash
| %elasticsearch
| delete /index/type/id
%elasticsearch
delete /index/type/id
```
### Apply Zeppelin Dynamic Forms
You can leverage [Zeppelin Dynamic Form]({{BASE_PATH}}/manual/dynamicform.html) inside your queries. You can use both the `text input` and `select form` parameterization features.
```bash
| %elasticsearch
| size ${limit=10}
| search /index/type { "query": { "match_all": { } } }
%elasticsearch
size ${limit=10}
search /index/type { "query": { "match_all": { } } }
```

View file

@ -6,11 +6,9 @@ group: manual
---
{% include JB/setup %}
## Flink interpreter for Apache Zeppelin
[Apache Flink](https://flink.apache.org) is an open source platform for distributed stream and batch data processing. Flinks core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink also builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.
<br>
## How to start local Flink cluster, to test the interpreter
Zeppelin comes with pre-configured flink-local interpreter, which starts Flink in a local mode on your machine, so you do not need to install anything.
@ -38,10 +36,8 @@ At the "Interpreters" menu, you have to create a new Flink interpreter and provi
For more information about Flink configuration, you can find it [here](https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html).
## How to test it's working
In example, by using the [Zeppelin notebook](https://www.zeppelinhub.com/viewer/notebooks/aHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL05GTGFicy96ZXBwZWxpbi1ub3RlYm9va3MvbWFzdGVyL25vdGVib29rcy8yQVFFREs1UEMvbm90ZS5qc29u) is from Till Rohrmann's presentation [Interactive data analysis with Apache Flink](http://www.slideshare.net/tillrohrmann/data-analysis-49806564) for Apache Flink Meetup.
```
%sh
rm 10.txt.utf-8

View file

@ -6,10 +6,7 @@ group: manual
---
{% include JB/setup %}
## Geode/Gemfire OQL Interpreter for Apache Zeppelin
<br/>
<table class="table-configuration">
<tr>
<th>Name</th>
@ -23,7 +20,6 @@ group: manual
</tr>
</table>
<br/>
This interpreter supports the [Geode](http://geode.incubator.apache.org/) [Object Query Language (OQL)](http://geode-docs.cfapps.io/docs/developing/querying_basics/oql_compared_to_sql.html). With the OQL-based querying language:
[<img align="right" src="http://img.youtube.com/vi/zvzzA9GXu3Q/3.jpg" alt="zeppelin-view" hspace="10" width="200"></img>](https://www.youtube.com/watch?v=zvzzA9GXu3Q)
@ -38,7 +34,6 @@ This interpreter supports the [Geode](http://geode.incubator.apache.org/) [Objec
This [Video Tutorial](https://www.youtube.com/watch?v=zvzzA9GXu3Q) illustrates some of the features provided by the `Geode Interpreter`.
### Create Interpreter
By default Zeppelin creates one `Geode/OQL` instance. You can remove it or create more instances.
Multiple Geode instances can be created, each configured to the same or different backend Geode cluster. But over time a `Notebook` can have only one Geode interpreter instance `bound`. That means you _cannot_ connect to different Geode clusters in the same `Notebook`. This is a known Zeppelin limitation.
@ -53,38 +48,35 @@ In the `Notebook` click on the `settings` icon in the top right corner. The sele
### Configuration
You can modify the configuration of the Geode from the `Interpreter` section. The Geode interpreter expresses the following properties:
<table class="table-configuration">
<tr>
<th>Property Name</th>
<th>Description</th>
<th>Default Value</th>
</tr>
<tr>
<td>geode.locator.host</td>
<td>The Geode Locator Host</td>
<td>localhost</td>
</tr>
<tr>
<td>geode.locator.port</td>
<td>The Geode Locator Port</td>
<td>10334</td>
</tr>
<tr>
<td>geode.max.result</td>
<td>Max number of OQL result to display to prevent the browser overload</td>
<td>1000</td>
</tr>
</table>
<table class="table-configuration">
<tr>
<th>Property Name</th>
<th>Description</th>
<th>Default Value</th>
</tr>
<tr>
<td>geode.locator.host</td>
<td>The Geode Locator Host</td>
<td>localhost</td>
</tr>
<tr>
<td>geode.locator.port</td>
<td>The Geode Locator Port</td>
<td>10334</td>
</tr>
<tr>
<td>geode.max.result</td>
<td>Max number of OQL result to display to prevent the browser overload</td>
<td>1000</td>
</tr>
</table>
### How to use
> *Tip 1: Use (CTRL + .) for OQL auto-completion.*
> *Tip 2: Always start the paragraphs with the full `%geode.oql` prefix tag! The short notation: `%geode` would still be able run the OQL queries but the syntax highlighting and the auto-completions will be disabled.*
#### Create / Destroy Regions
The OQL specification does not support [Geode Regions](https://cwiki.apache.org/confluence/display/GEODE/Index#Index-MainConceptsandComponents) mutation operations. To `create`/`destroy` regions one should use the [GFSH](http://geode-docs.cfapps.io/docs/tools_modules/gfsh/chapter_overview.html) shell tool instead. In the following it is assumed that the GFSH is colocated with Zeppelin server.
```bash
@ -105,9 +97,7 @@ EOF
Above snippet re-creates two regions: `regionEmployee` and `regionCompany`. Note that you have to explicitly specify the locator host and port. The values should match those you have used in the Geode Interpreter configuration. Comprehensive list of [GFSH Commands by Functional Area](http://geode-docs.cfapps.io/docs/tools_modules/gfsh/gfsh_quick_reference.html).
#### Basic OQL
#### Basic OQL
```sql
%geode.oql
SELECT count(*) FROM /regionEmployee
@ -144,12 +134,9 @@ Following query will return the EntrySet value as a Blob:
SELECT e.key, e.value FROM /regionEmployee.entrySet e
```
> Note: You can have multiple queries in the same paragraph but only the result from the first is displayed. [[1](https://issues.apache.org/jira/browse/ZEPPELIN-178)], [[2](https://issues.apache.org/jira/browse/ZEPPELIN-212)].
#### GFSH Commands From The Shell
Use the Shell Interpreter (`%sh`) to run OQL commands form the command line:
```bash
@ -159,7 +146,6 @@ gfsh -e "connect" -e "list members"
```
#### Apply Zeppelin Dynamic Forms
You can leverage [Zeppelin Dynamic Form](../manual/dynamicform.html) inside your OQL queries. You can use both the `text input` and `select form` parameterization features
```sql

View file

@ -6,12 +6,10 @@ group: manual
---
{% include JB/setup %}
## Hive Interpreter for Apache Zeppelin
The [Apache Hive](https://hive.apache.org/) ™ data warehouse software facilitates querying and managing large datasets residing in distributed storage. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL. At the same time this language also allows traditional map/reduce programmers to plug in their custom mappers and reducers when it is inconvenient or inefficient to express this logic in HiveQL.
## 1. Configuration
### Configuration
<table class="table-configuration">
<tr>
<th>Property</th>
@ -71,9 +69,8 @@ The [Apache Hive](https://hive.apache.org/) ™ data warehouse software facilita
</table>
This interpreter provides multiple configuration with `${prefix}`. User can set a multiple connection properties by this prefix. It can be used like `%hive(${prefix})`.
## 2. How to use
## How to use
Basically, you can use
```sql
@ -92,7 +89,6 @@ select * from my_table;
You can also run multiple queries up to 10 by default. Changing these settings is not implemented yet.
### Apply Zeppelin Dynamic Forms
You can leverage [Zeppelin Dynamic Form]({{BASE_PATH}}/manual/dynamicform.html) inside your queries. You can use both the `text input` and `select form` parameterization features.
```sql

View file

@ -18,55 +18,54 @@ You can use Zeppelin to retrieve distributed data from cache using Ignite SQL in
### Installing and Running Ignite example
In order to use Ignite interpreters, you may install Apache Ignite in some simple steps:
1. Download Ignite [source release](https://ignite.apache.org/download.html#sources) or [binary release](https://ignite.apache.org/download.html#binaries) whatever you want. But you must download Ignite as the same version of Zeppelin's. If it is not, you can't use scala code on Zeppelin. You can find ignite version in Zepplin at the pom.xml which is placed under `path/to/your-Zeppelin/ignite/pom.xml` ( Of course, in Zeppelin source release ). Please check `ignite.version` .<br>Currently, Zeppelin provides ignite only in Zeppelin source release. So, if you download Zeppelin binary release( `zeppelin-0.5.0-incubating-bin-spark-xxx-hadoop-xx` ), you can not use ignite interpreter on Zeppelin. We are planning to include ignite in a future binary release.
2. Examples are shipped as a separate Maven project, so to start running you simply need to import provided <dest_dir>/apache-ignite-fabric-1.2.0-incubating-bin/pom.xml file into your favourite IDE, such as Eclipse.
1. Download Ignite [source release](https://ignite.apache.org/download.html#sources) or [binary release](https://ignite.apache.org/download.html#binaries) whatever you want. But you must download Ignite as the same version of Zeppelin's. If it is not, you can't use scala code on Zeppelin. You can find ignite version in Zepplin at the pom.xml which is placed under `path/to/your-Zeppelin/ignite/pom.xml` ( Of course, in Zeppelin source release ). Please check `ignite.version` .<br>Currently, Zeppelin provides ignite only in Zeppelin source release. So, if you download Zeppelin binary release( `zeppelin-0.5.0-incubating-bin-spark-xxx-hadoop-xx` ), you can not use ignite interpreter on Zeppelin. We are planning to include ignite in a future binary release.
2. Examples are shipped as a separate Maven project, so to start running you simply need to import provided <dest_dir>/apache-ignite-fabric-1.2.0-incubating-bin/pom.xml file into your favourite IDE, such as Eclipse.
* In case of Eclipse, Eclipse -> File -> Import -> Existing Maven Projects
* Set examples directory path to Eclipse and select the pom.xml.
* Then start `org.apache.ignite.examples.ExampleNodeStartup` (or whatever you want) to run at least one or more ignite node. When you run example code, you may notice that the number of node is increase one by one.
> **Tip. If you want to run Ignite examples on the cli not IDE, you can export executable Jar file from IDE. Then run it by using below command.**
```
$ nohup java -jar </path/to/your Jar file name>
```
### Configuring Ignite Interpreter
* In case of Eclipse, Eclipse -> File -> Import -> Existing Maven Projects
* Set examples directory path to Eclipse and select the pom.xml.
* Then start `org.apache.ignite.examples.ExampleNodeStartup` (or whatever you want) to run at least one or more ignite node. When you run example code, you may notice that the number of node is increase one by one.
> **Tip. If you want to run Ignite examples on the cli not IDE, you can export executable Jar file from IDE. Then run it by using below command.**
```
$ nohup java -jar </path/to/your Jar file name>
```
### Configuring Ignite Interpreter
At the "Interpreters" menu, you may edit Ignite interpreter or create new one. Zeppelin provides these properties for Ignite.
<table class="table-configuration">
<table class="table-configuration">
<tr>
<th>Property Name</th>
<th>value</th>
<th>Description</th>
<th>Property Name</th>
<th>value</th>
<th>Description</th>
</tr>
<tr>
<td>ignite.addresses</td>
<td>127.0.0.1:47500..47509</td>
<td>Coma separated list of Ignite cluster hosts. See [Ignite Cluster Configuration](https://apacheignite.readme.io/v1.2/docs/cluster-config) section for more details.</td>
<td>ignite.addresses</td>
<td>127.0.0.1:47500..47509</td>
<td>Coma separated list of Ignite cluster hosts. See [Ignite Cluster Configuration](https://apacheignite.readme.io/v1.2/docs/cluster-config) section for more details.</td>
</tr>
<tr>
<td>ignite.clientMode</td>
<td>true</td>
<td>You can connect to the Ignite cluster as client or server node. See [Ignite Clients vs. Servers](https://apacheignite.readme.io/v1.2/docs/clients-vs-servers) section for details. Use true or false values in order to connect in client or server mode respectively.</td>
<td>ignite.clientMode</td>
<td>true</td>
<td>You can connect to the Ignite cluster as client or server node. See [Ignite Clients vs. Servers](https://apacheignite.readme.io/v1.2/docs/clients-vs-servers) section for details. Use true or false values in order to connect in client or server mode respectively.</td>
</tr>
<tr>
<td>ignite.config.url</td>
<td></td>
<td>Configuration URL. Overrides all other settings.</td>
</tr
<tr>
<td>ignite.jdbc.url</td>
<td>jdbc:ignite:cfg://default-ignite-jdbc.xml</td>
<td>Ignite JDBC connection URL.</td>
</tr>
<tr>
<td>ignite.peerClassLoadingEnabled</td>
<td>true</td>
<td>Enables peer-class-loading. See [Zero Deployment](https://apacheignite.readme.io/v1.2/docs/zero-deployment) section for details. Use true or false values in order to enable or disable P2P class loading respectively.</td>
<td>ignite.config.url</td>
<td></td>
<td>Configuration URL. Overrides all other settings.</td>
</tr>
</table>
<tr>
<td>ignite.jdbc.url</td>
<td>jdbc:ignite:cfg://default-ignite-jdbc.xml</td>
<td>Ignite JDBC connection URL.</td>
</tr>
<tr>
<td>ignite.peerClassLoadingEnabled</td>
<td>true</td>
<td>Enables peer-class-loading. See [Zero Deployment](https://apacheignite.readme.io/v1.2/docs/zero-deployment) section for details. Use true or false values in order to enable or disable P2P class loading respectively.</td>
</tr>
</table>
![Configuration of Ignite Interpreter](../assets/themes/zeppelin/img/docs-img/ignite-interpreter-setting.png)
@ -82,35 +81,33 @@ In order to execute SQL query, use ` %ignite.ignitesql ` prefix. <br>
Supposing you are running `org.apache.ignite.examples.streaming.wordcount.StreamWords`, then you can use "words" cache( Of course you have to specify this cache name to the Ignite interpreter setting section `ignite.jdbc.url` of Zeppelin ).
For example, you can select top 10 words in the words cache using the following query
```
%ignite.ignitesql
select _val, count(_val) as cnt from String group by _val order by cnt desc limit 10
```
![IgniteSql on Zeppelin](../assets/themes/zeppelin/img/docs-img/ignite-sql-example.png)
```
%ignite.ignitesql
select _val, count(_val) as cnt from String group by _val order by cnt desc limit 10
```
![IgniteSql on Zeppelin](../assets/themes/zeppelin/img/docs-img/ignite-sql-example.png)
As long as your Ignite version and Zeppelin Ignite version is same, you can also use scala code. Please check the Zeppelin Ignite version before you download your own Ignite.
```
%ignite
import org.apache.ignite._
import org.apache.ignite.cache.affinity._
import org.apache.ignite.cache.query._
import org.apache.ignite.configuration._
```
%ignite
import org.apache.ignite._
import org.apache.ignite.cache.affinity._
import org.apache.ignite.cache.query._
import org.apache.ignite.configuration._
import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
val cache: IgniteCache[AffinityUuid, String] = ignite.cache("words")
val cache: IgniteCache[AffinityUuid, String] = ignite.cache("words")
val qry = new SqlFieldsQuery("select avg(cnt), min(cnt), max(cnt) from (select count(_val) as cnt from String group by _val)", true)
val qry = new SqlFieldsQuery("select avg(cnt), min(cnt), max(cnt) from (select count(_val) as cnt from String group by _val)", true)
val res = cache.query(qry).getAll()
val res = cache.query(qry).getAll()
collectionAsScalaIterable(res).foreach(println _)
```
![Using Scala Code](../assets/themes/zeppelin/img/docs-img/ignite-scala-example.png)
collectionAsScalaIterable(res).foreach(println _)
```
![Using Scala Code](../assets/themes/zeppelin/img/docs-img/ignite-scala-example.png)
Apache Ignite also provides a guide docs for Zeppelin ["Ignite with Apache Zeppelin"](https://apacheignite.readme.io/docs/data-analysis-with-apache-zeppelin)

View file

@ -16,69 +16,70 @@ group: manual
### Installing and Running Lens
In order to use Lens interpreters, you may install Apache Lens in some simple steps:
1. Download Lens for latest version from [the ASF](http://www.apache.org/dyn/closer.lua/lens/2.3-beta). Or the older release can be found [in the Archives](http://archive.apache.org/dist/lens/).
2. Before running Lens, you have to set HIVE_HOME and HADOOP_HOME. If you want to get more information about this, please refer to [here](http://lens.apache.org/lenshome/install-and-run.html#Installation). Lens also provides Pseudo Distributed mode. [Lens pseudo-distributed setup](http://lens.apache.org/lenshome/pseudo-distributed-setup.html) is done by using [docker](https://www.docker.com/). Hive server and hadoop daemons are run as separate processes in lens pseudo-distributed setup.
3. Now, you can start lens server (or stop).
```
./bin/lens-ctl start (or stop)
```
1. Download Lens for latest version from [the ASF](http://www.apache.org/dyn/closer.lua/lens/2.3-beta). Or the older release can be found [in the Archives](http://archive.apache.org/dist/lens/).
2. Before running Lens, you have to set HIVE_HOME and HADOOP_HOME. If you want to get more information about this, please refer to [here](http://lens.apache.org/lenshome/install-and-run.html#Installation). Lens also provides Pseudo Distributed mode. [Lens pseudo-distributed setup](http://lens.apache.org/lenshome/pseudo-distributed-setup.html) is done by using [docker](https://www.docker.com/). Hive server and hadoop daemons are run as separate processes in lens pseudo-distributed setup.
3. Now, you can start lens server (or stop).
```
./bin/lens-ctl start (or stop)
```
### Configuring Lens Interpreter
At the "Interpreters" menu, you can edit Lens interpreter or create new one. Zeppelin provides these properties for Lens.
<table class="table-configuration">
<table class="table-configuration">
<tr>
<th>Property Name</th>
<th>value</th>
<th>Description</th>
<th>Property Name</th>
<th>value</th>
<th>Description</th>
</tr>
<tr>
<td>lens.client.dbname</td>
<td>default</td>
<td>The database schema name</td>
<td>lens.client.dbname</td>
<td>default</td>
<td>The database schema name</td>
</tr>
<tr>
<td>lens.query.enable.persistent.resultset</td>
<td>false</td>
<td>Whether to enable persistent resultset for queries. When enabled, server will fetch results from driver, custom format them if any and store in a configured location. The file name of query output is queryhandle-id, with configured extensions</td>
<td>lens.query.enable.persistent.resultset</td>
<td>false</td>
<td>Whether to enable persistent resultset for queries. When enabled, server will fetch results from driver, custom format them if any and store in a configured location. The file name of query output is queryhandle-id, with configured extensions</td>
</tr>
<tr>
<td>lens.server.base.url</td>
<td>http://hostname:port/lensapi</td>
<td>The base url for the lens server. you have to edit "hostname" and "port" that you may use(ex. http://0.0.0.0:9999/lensapi)</td>
<td>lens.server.base.url</td>
<td>http://hostname:port/lensapi</td>
<td>The base url for the lens server. you have to edit "hostname" and "port" that you may use(ex. http://0.0.0.0:9999/lensapi)</td>
</tr>
<tr>
<td>lens.session.cluster.user </td>
<td>default</td>
<td>Hadoop cluster username</td>
<td>lens.session.cluster.user </td>
<td>default</td>
<td>Hadoop cluster username</td>
</tr>
<tr>
<td>zeppelin.lens.maxResult</td>
<td>1000</td>
<td>Max number of rows to display</td>
<td>zeppelin.lens.maxResult</td>
<td>1000</td>
<td>Max number of rows to display</td>
</tr>
<tr>
<td>zeppelin.lens.maxThreads</td>
<td>10</td>
<td>If concurrency is true then how many threads?</td>
<td>zeppelin.lens.maxThreads</td>
<td>10</td>
<td>If concurrency is true then how many threads?</td>
</tr>
<tr>
<td>zeppelin.lens.run.concurrent</td>
<td>true</td>
<td>Run concurrent Lens Sessions</td>
<td>zeppelin.lens.run.concurrent</td>
<td>true</td>
<td>Run concurrent Lens Sessions</td>
</tr>
<tr>
<td>xxx</td>
<td>yyy</td>
<td>anything else from [Configuring lens server](https://lens.apache.org/admin/config-server.html)</td>
<td>xxx</td>
<td>yyy</td>
<td>anything else from [Configuring lens server](https://lens.apache.org/admin/config-server.html)</td>
</tr>
</table>
</table>
![Apache Lens Interpreter Setting](../assets/themes/zeppelin/img/docs-img/lens-interpreter-setting.png)
### Interpreter Bindging for Zeppelin Notebook
After configuring Lens interpreter, create your own notebook, then you can bind interpreters like below image.
After configuring Lens interpreter, create your own notebook, then you can bind interpreters like below image.
![Zeppelin Notebook Interpreter Biding](../assets/themes/zeppelin/img/docs-img/lens-interpreter-binding.png)
For more interpreter binding information see [here](http://zeppelin.incubator.apache.org/docs/manual/interpreters.html).
@ -90,84 +91,79 @@ As you can see in this video, they are using Lens Client Shell(./bin/lens-cli.sh
<li> Create and Use(Switch) Databases.
```
create database newDb
```
```
use newDb
```
```
create database newDb
```
```
use newDb
```
<li> Create Storage.
```
create storage your/path/to/lens/client/examples/resources/db-storage.xml
```
```
create storage your/path/to/lens/client/examples/resources/db-storage.xml
```
<li> Create Dimensions, Show fields and join-chains of them.
```
create dimension your/path/to/lens/client/examples/resources/customer.xml
```
```
dimension show fields customer
```
```
dimension show joinchains customer
```
```
create dimension your/path/to/lens/client/examples/resources/customer.xml
```
```
dimension show fields customer
```
```
dimension show joinchains customer
```
<li> Create Caches, Show fields and join-chains of them.
```
create cube your/path/to/lens/client/examples/resources/sales-cube.xml
```
```
cube show fields sales
```
```
cube show joinchains sales
```
```
create cube your/path/to/lens/client/examples/resources/sales-cube.xml
```
```
cube show fields sales
```
```
cube show joinchains sales
```
<li> Create Dimtables and Fact.
```
create dimtable your/path/to/lens/client/examples/resources/customer_table.xml
```
```
create fact your/path/to/lens/client/examples/resources/sales-raw-fact.xml
```
```
create dimtable your/path/to/lens/client/examples/resources/customer_table.xml
```
```
create fact your/path/to/lens/client/examples/resources/sales-raw-fact.xml
```
<li> Add partitions to Dimtable and Fact.
```
dimtable add single-partition --dimtable_name customer_table --storage_name local --path your/path/to/lens/client/examples/resources/customer-local-part.xml
```
```
fact add partitions --fact_name sales_raw_fact --storage_name local --path your/path/to/lens/client/examples/resources/sales-raw-local-parts.xml
```
```
dimtable add single-partition --dimtable_name customer_table --storage_name local --path your/path/to/lens/client/examples/resources/customer-local-part.xml
```
```
fact add partitions --fact_name sales_raw_fact --storage_name local --path your/path/to/lens/client/examples/resources/sales-raw-local-parts.xml
```
<li> Now, you can run queries on cubes.
```
query execute cube select customer_city_name, product_details.description, product_details.category, product_details.color, store_sales from sales where time_range_in(delivery_time, '2015-04-11-00', '2015-04-13-00')
```
![Lens Query Result](../assets/themes/zeppelin/img/docs-img/lens-result.png)
```
query execute cube select customer_city_name, product_details.description, product_details.category, product_details.color, store_sales from sales where time_range_in(delivery_time, '2015-04-11-00', '2015-04-13-00')
```
![Lens Query Result](../assets/themes/zeppelin/img/docs-img/lens-result.png)
These are just examples that provided in advance by Lens. If you want to explore whole tutorials of Lens, see the [tutorial video](https://cwiki.apache.org/confluence/display/LENS/2015/07/13/20+Minute+video+demo+of+Apache+Lens+through+examples).
### Lens UI Service
Lens also provides web UI service. Once the server starts up, you can open the service on http://serverhost:19999/index.html and browse. You may also check the structure that you made and use query easily here.
![Lens UI Servive](../assets/themes/zeppelin/img/docs-img/lens-ui-service.png)
![Lens UI Servive](../assets/themes/zeppelin/img/docs-img/lens-ui-service.png)

View file

@ -6,10 +6,7 @@ group: manual
---
{% include JB/setup %}
## PostgreSQL, HAWQ Interpreter for Apache Zeppelin
<br/>
<table class="table-configuration">
<tr>
<th>Name</th>
@ -23,7 +20,6 @@ group: manual
</tr>
</table>
<br/>
[<img align="right" src="http://img.youtube.com/vi/wqXXQhJ5Uk8/0.jpg" alt="zeppelin-view" hspace="10" width="250"></img>](https://www.youtube.com/watch?v=wqXXQhJ5Uk8)
This interpreter seamlessly supports the following SQL data processing engines:
@ -32,11 +28,9 @@ This interpreter seamlessly supports the following SQL data processing engines:
* [Apache HAWQ](http://pivotal.io/big-data/pivotal-hawq) - Powerful [Open Source](https://wiki.apache.org/incubator/HAWQProposal) SQL-On-Hadoop engine.
* [Greenplum](http://pivotal.io/big-data/pivotal-greenplum-database) - MPP database built on open source PostgreSQL.
This [Video Tutorial](https://www.youtube.com/watch?v=wqXXQhJ5Uk8) illustrates some of the features provided by the `Postgresql Interpreter`.
### Create Interpreter
By default Zeppelin creates one `PSQL` instance. You can remove it or create new instances.
Multiple PSQL instances can be created, each configured to the same or different backend databases. But over time a `Notebook` can have only one PSQL interpreter instance `bound`. That means you _cannot_ connect to different databases in the same `Notebook`. This is a known Zeppelin limitation.
@ -51,47 +45,45 @@ In the `Notebook` click on the `settings` icon in the top right corner. The sele
### Configuration
You can modify the configuration of the PSQL from the `Interpreter` section. The PSQL interpreter expenses the following properties:
<table class="table-configuration">
<tr>
<th>Property Name</th>
<th>Description</th>
<th>Default Value</th>
</tr>
<tr>
<td>postgresql.url</td>
<td>JDBC URL to connect to </td>
<td>jdbc:postgresql://localhost:5432</td>
</tr>
<tr>
<td>postgresql.user</td>
<td>JDBC user name</td>
<td>gpadmin</td>
</tr>
<tr>
<td>postgresql.password</td>
<td>JDBC password</td>
<td></td>
</tr>
<tr>
<td>postgresql.driver.name</td>
<td>JDBC driver name. In this version the driver name is fixed and should not be changed</td>
<td>org.postgresql.Driver</td>
</tr>
<tr>
<td>postgresql.max.result</td>
<td>Max number of SQL result to display to prevent the browser overload</td>
<td>1000</td>
</tr>
</table>
<table class="table-configuration">
<tr>
<th>Property Name</th>
<th>Description</th>
<th>Default Value</th>
</tr>
<tr>
<td>postgresql.url</td>
<td>JDBC URL to connect to </td>
<td>jdbc:postgresql://localhost:5432</td>
</tr>
<tr>
<td>postgresql.user</td>
<td>JDBC user name</td>
<td>gpadmin</td>
</tr>
<tr>
<td>postgresql.password</td>
<td>JDBC password</td>
<td></td>
</tr>
<tr>
<td>postgresql.driver.name</td>
<td>JDBC driver name. In this version the driver name is fixed and should not be changed</td>
<td>org.postgresql.Driver</td>
</tr>
<tr>
<td>postgresql.max.result</td>
<td>Max number of SQL result to display to prevent the browser overload</td>
<td>1000</td>
</tr>
</table>
### How to use
```
Tip: Use (CTRL + .) for SQL auto-completion.
```
#### DDL and SQL commands
#### DDL and SQL commands
Start the paragraphs with the full `%psql.sql` prefix tag! The short notation: `%psql` would still be able run the queries but the syntax highlighting and the auto-completions will be disabled.
You can use the standard CREATE / DROP / INSERT commands to create or modify the data model:
@ -121,7 +113,6 @@ select * from mytable;
```
#### PSQL command line tools
Use the Shell Interpreter (`%sh`) to access the command line [PSQL](http://www.postgresql.org/docs/9.4/static/app-psql.html) interactively:
```bash
@ -131,6 +122,7 @@ psql -h phd3.localdomain -U gpadmin -p 5432 <<EOF
\q
EOF
```
This will produce output like this:
```
@ -146,7 +138,6 @@ This will produce output like this:
```
#### Apply Zeppelin Dynamic Forms
You can leverage [Zeppelin Dynamic Form](../manual/dynamicform.html) inside your queries. You can use both the `text input` and `select form` parametrization features
```sql
@ -157,8 +148,8 @@ GROUP BY ${group_by=product_id,product_id|product_name|customer_id|store_id}
ORDER BY count ${order=DESC,DESC|ASC}
LIMIT ${limit=10};
```
#### Example HAWQ PXF/HDFS Tables
#### Example HAWQ PXF/HDFS Tables
Create HAWQ external table that read data from tab-separated-value data in HDFS.
```sql
@ -168,11 +159,13 @@ CREATE EXTERNAL TABLE retail_demo.payment_methods_pxf (
payment_method_code character varying(20)
) LOCATION ('pxf://${NAME_NODE_HOST}:50070/retail_demo/payment_methods.tsv.gz?profile=HdfsTextSimple') FORMAT 'TEXT' (DELIMITER = E'\t');
```
And retrieve content
```sql
%psql.sql
select * from retail_demo.payment_methods_pxf
```
### Auto-completion
The PSQL Interpreter provides a basic auto-completion functionality. On `(Ctrl+.)` it list the most relevant suggestions in a pop-up window. In addition to the SQL keyword the interpreter provides suggestions for the Schema, Table, Column names as well.

View file

@ -6,7 +6,6 @@ group: manual
---
{% include JB/setup %}
## Scalding Interpreter for Apache Zeppelin
[Scalding](https://github.com/twitter/scalding) is an open source Scala library for writing MapReduce jobs.
@ -18,20 +17,20 @@ mvn clean package -Pscalding -DskipTests
```
### Enabling the Scalding Interpreter
In a notebook, to enable the **Scalding** interpreter, click on the **Gear** icon,select **Scalding**, and hit **Save**.
<center>
![Interpreter Binding](../assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png)
![Interpreter Selection](../assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png)
</center>
<center>
![Interpreter Binding](../assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png)
![Interpreter Selection](../assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png)
</center>
### Configuring the Interpreter
Zeppelin comes with a pre-configured Scalding interpreter in local mode, so you do not need to install anything.
### Testing the Interpreter
In example, by using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, we will count words (of course!), and plot a graph of the top 10 words in the book.
```
@ -75,4 +74,4 @@ If you click on the icon for the pie chart, you should be able to see a chart li
### Current Status & Future Work
The current implementation of the Scalding interpreter does not support canceling jobs, or fine-grained progress updates.
The pre-configured Scalding interpreter only supports Scalding in local mode. Hadoop mode for Scalding is currently unsupported, and will be future work (contributions welcome!).
The pre-configured Scalding interpreter only supports Scalding in local mode. Hadoop mode for Scalding is currently unsupported, and will be future work (contributions welcome!).

View file

@ -8,7 +8,6 @@ group: manual
## Spark Interpreter for Apache Zeppelin
[Apache Spark](http://spark.apache.org) is supported in Zeppelin with
Spark Interpreter group, which consisted of 4 interpreters.
@ -40,14 +39,10 @@ Spark Interpreter group, which consisted of 4 interpreters.
</tr>
</table>
<br />
## Configuration
Without any configuration, Spark interpreter works out of box in local mode. But if you want to connect to your Spark cluster, you'll need to follow below two simple steps.
### 1. Export SPARK_HOME
In **conf/zeppelin-env.sh**, export `SPARK_HOME` environment variable with your Spark installation path.
for example
@ -64,7 +59,6 @@ export SPARK_SUBMIT_OPTIONS="--packages com.databricks:spark-csv_2.10:1.2.0"
```
### 2. Set master in Interpreter menu
After start Zeppelin, go to **Interpreter** menu and edit **master** property in your Spark interpreter setting. The value may vary depending on your Spark cluster deployment type.
for example,
@ -74,25 +68,21 @@ for example,
* **yarn-client** in Yarn client mode
* **mesos://host:5050** in Mesos cluster
That's it. Zeppelin will work with any version of Spark and any deployment type without rebuilding Zeppelin in this way. ( Zeppelin 0.5.5-incubating release works up to Spark 1.5.2 )
> Note that without exporting `SPARK_HOME`, it's running in local mode with included version of Spark. The included version may vary depending on the build profile.
<br />
## SparkContext, SQLContext, ZeppelinContext
SparkContext, SQLContext, ZeppelinContext are automatically created and exposed as variable names 'sc', 'sqlContext' and 'z', respectively, both in scala and python environments.
> Note that scala / python environment shares the same SparkContext, SQLContext, ZeppelinContext instance.
<br />
<a name="dependencyloading"> </a>
## Dependency Management
There are two ways to load external library in spark interpreter. First is using Zeppelin's `%dep` interpreter and second is loading Spark properties.
### 1. Dynamic Dependency Loading via %dep interpreter
When your code requires external library, instead of doing download/copy/restart Zeppelin, you can easily do following jobs using `%dep` interpreter.
* Load libraries recursively from Maven repository
@ -182,15 +172,13 @@ Here are few examples:
spark.jars.packages com.databricks:spark-csv_2.10:1.2.0
spark.files /path/mylib1.py,/path/mylib2.egg,/path/mylib3.zip
<br />
## ZeppelinContext
Zeppelin automatically injects ZeppelinContext as variable 'z' in your scala/python environment. ZeppelinContext provides some additional functions and utility.
### Object Exchange
ZeppelinContext extends map and it's shared between scala, python environment.
So you can put some object from scala and read it from python, vise versa.
<div class="codetabs">
<div data-lang="scala" markdown="1">
@ -212,6 +200,7 @@ myObject = z.get("objName")
</div>
</div>
### Form Creation
ZeppelinContext provides functions for creating forms.

View file

@ -19,31 +19,24 @@ limitations under the License.
-->
{% include JB/setup %}
## Interpreters in Zeppelin
In this section, we will explain about the role of interpreters, interpreters group and interpreter settings in Zeppelin.
The concept of Zeppelin interpreter allows any language/data-processing-backend to be plugged into Zeppelin.
Currently, Zeppelin supports many interpreters such as Scala ( with Apache Spark ), Python ( with Apache Spark ), SparkSQL, Hive, Markdown, Shell and so on.
<br/>
## What is Zeppelin interpreter?
Zeppelin Interpreter is a plug-in which enables Zeppelin users to use a specific language/data-processing-backend. For example, to use scala code in Zeppelin, you need `%spark` interpreter.
When you click the ```+Create``` button in the interpreter page, the interpreter drop-down list box will show all the available interpreters on your server.
<img src="/assets/themes/zeppelin/img/screenshots/interpreter_create.png">
<br/>
## What is Zeppelin Interpreter Setting?
Zeppelin interpreter setting is the configuration of a given interpreter on Zeppelin server. For example, the properties are required for hive JDBC interpreter to connect to the Hive server.
<img src="/assets/themes/zeppelin/img/screenshots/interpreter_setting.png">
<br/>
## What is Zeppelin Interpreter Group?
Every Interpreter is belonged to an **Interpreter Group**. Interpreter Group is a unit of start/stop interpreter.
By default, every interpreter is belonged to a single group, but the group might contain more interpreters. For example, spark interpreter group is including Spark support, pySpark,
SparkSQL and the dependency loader.
@ -53,9 +46,7 @@ Technically, Zeppelin interpreters from the same group are running in the same J
Each interpreters is belonged to a single group and registered together. All of their properties are listed in the interpreter setting like below image.
<img src="/assets/themes/zeppelin/img/screenshots/interpreter_setting_spark.png">
<br/>
## Programming Languages for Interpreter
If the interpreter uses a specific programming language ( like Scala, Python, SQL ), it is generally recommended to add a syntax highlighting supported for that to the notebook paragraph editor.
To check out the list of languages supported, see the `mode-*.js` files under `zeppelin-web/bower_components/ace-builds/src-noconflict` or from [github.com/ajaxorg/ace-builds](https://github.com/ajaxorg/ace-builds/tree/master/src-noconflict).
@ -65,5 +56,3 @@ If you want to add a new set of syntax highlighting,
1. Add the `mode-*.js` file to `zeppelin-web/bower.json` ( when built, `zeppelin-web/src/index.html` will be changed automatically. ).
2. Add to the list of `editorMode` in `zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js` - it follows the pattern 'ace/mode/x' where x is the name.
3. Add to the code that checks for `%` prefix and calls `session.setMode(editorMode.x)` in `setParagraphMode` located in `zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js`.

View file

@ -141,6 +141,10 @@ public class ElasticsearchInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String cmd, InterpreterContext interpreterContext) {
logger.info("Run Elasticsearch command '" + cmd + "'");
if (StringUtils.isEmpty(cmd) || StringUtils.isEmpty(cmd.trim())) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
}
int currentResultSize = resultSize;

View file

@ -198,4 +198,14 @@ public class ElasticsearchInterpreterTest {
assertEquals("11", res.message());
}
@Test
public void testMisc() {
InterpreterResult res = interpreter.interpret(null, null);
assertEquals(Code.SUCCESS, res.code());
res = interpreter.interpret(" \n \n ", null);
assertEquals(Code.SUCCESS, res.code());
}
}

View file

@ -40,7 +40,7 @@ public class FlinkInterpreterTest {
Properties p = new Properties();
flink = new FlinkInterpreter(p);
flink.open();
context = new InterpreterContext(null, null, null, null, null, null, null, null);
context = new InterpreterContext(null, null, null, null, null, null, null, null, null);
}
@AfterClass

View file

@ -79,9 +79,9 @@ public class HiveInterpreterTest {
HiveInterpreter t = new HiveInterpreter(properties);
t.open();
assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message().contains("SCHEMA_NAME"));
assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null)).message().contains("SCHEMA_NAME"));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
}
@Test
@ -101,7 +101,7 @@ public class HiveInterpreterTest {
t.open();
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
}
@Test
@ -117,13 +117,13 @@ public class HiveInterpreterTest {
t.open();
InterpreterResult interpreterResult =
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
t.getConnection("default").close();
interpreterResult =
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
}
@ -139,7 +139,7 @@ public class HiveInterpreterTest {
HiveInterpreter t = new HiveInterpreter(properties);
t.open();
InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null);
InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null);
//simple select test
InterpreterResult result = t.interpret("select * from test_table", interpreterContext);

View file

@ -40,7 +40,7 @@ public class IgniteInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";
private static final InterpreterContext INTP_CONTEXT =
new InterpreterContext(null, null, null, null, null, null, null, null);
new InterpreterContext(null, null, null, null, null, null, null, null, null);
private IgniteInterpreter intp;
private Ignite ignite;

View file

@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";
private static final InterpreterContext INTP_CONTEXT =
new InterpreterContext(null, null, null, null, null, null, null, null);
new InterpreterContext(null, null, null, null, null, null, null, null, null);
private Ignite ignite;
private IgniteSqlInterpreter intp;

View file

@ -94,7 +94,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
String sqlQuery = "select * from test_table";
InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null));
InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null));
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());
@ -116,7 +116,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
String sqlQuery = "select * from test_table";
InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null));
InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null));
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());

View file

@ -65,7 +65,7 @@ public class ScaldingInterpreterTest {
context = new InterpreterContext("note", "id", "title", "text",
new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>());
new LinkedList<InterpreterContextRunner>(), null);
}
@After

View file

@ -73,8 +73,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
private GatewayServer gatewayServer;
private DefaultExecutor executor;
private int port;
private ByteArrayOutputStream outputStream;
private ByteArrayOutputStream errStream;
private SparkOutputStream outputStream;
private BufferedWriter ins;
private PipedInputStream in;
private ByteArrayOutputStream input;
@ -173,7 +172,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
cmd.addArgument(Integer.toString(port), false);
cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
executor = new DefaultExecutor();
outputStream = new ByteArrayOutputStream();
outputStream = new SparkOutputStream();
PipedOutputStream ps = new PipedOutputStream();
in = null;
try {
@ -274,7 +273,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
statementError = error;
statementFinishedNotifier.notify();
}
}
boolean pythonScriptInitialized = false;
@ -287,6 +285,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
public void appendOutput(String message) throws IOException {
outputStream.getInterpreterOutput().write(message);
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
@ -300,7 +302,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
+ outputStream.toString());
}
outputStream.reset();
outputStream.setInterpreterOutput(context.out);
synchronized (pythonScriptInitializeNotifier) {
long startTime = System.currentTimeMillis();
@ -314,15 +316,24 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
String errorMessage = "";
try {
context.out.flush();
errorMessage = new String(context.out.toByteArray());
} catch (IOException e) {
throw new InterpreterException(e);
}
if (pythonscriptRunning == false) {
// python script failed to initialize and terminated
return new InterpreterResult(Code.ERROR, "failed to start pyspark"
+ outputStream.toString());
+ errorMessage);
}
if (pythonScriptInitialized == false) {
// timeout. didn't get initialized message
return new InterpreterResult(Code.ERROR, "pyspark is not responding "
+ outputStream.toString());
+ errorMessage);
}
if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
@ -352,7 +363,14 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
if (statementError) {
return new InterpreterResult(Code.ERROR, statementOutput);
} else {
return new InterpreterResult(Code.SUCCESS, statementOutput);
try {
context.out.flush();
} catch (IOException e) {
throw new InterpreterException(e);
}
return new InterpreterResult(Code.SUCCESS);
}
}
@ -389,8 +407,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
return new LinkedList<String>();
}
outputStream.reset();
pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "");
statementOutput = null;

View file

@ -17,9 +17,7 @@
package org.apache.zeppelin.spark;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@ -41,7 +39,6 @@ import org.apache.spark.repl.SparkJLineCompletion;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Pool;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.Interpreter;
@ -115,7 +112,7 @@ public class SparkInterpreter extends Interpreter {
private SparkILoop interpreter;
private SparkIMain intp;
private SparkContext sc;
private ByteArrayOutputStream out;
private SparkOutputStream out;
private SQLContext sqlc;
private SparkDependencyResolver dep;
private SparkJLineCompletion completor;
@ -129,7 +126,7 @@ public class SparkInterpreter extends Interpreter {
public SparkInterpreter(Properties property) {
super(property);
out = new ByteArrayOutputStream();
out = new SparkOutputStream();
}
public SparkInterpreter(Properties property, SparkContext sc) {
@ -452,10 +449,9 @@ public class SparkInterpreter extends Interpreter {
b.v_$eq(true);
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
PrintStream printStream = new PrintStream(out);
/* spark interpreter */
this.interpreter = new SparkILoop(null, new PrintWriter(out));
interpreter.settings_$eq(settings);
interpreter.createInterpreter();
@ -481,7 +477,7 @@ public class SparkInterpreter extends Interpreter {
dep = getDependencyResolver();
z = new ZeppelinContext(sc, sqlc, null, dep, printStream,
z = new ZeppelinContext(sc, sqlc, null, dep,
Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
@ -489,7 +485,6 @@ public class SparkInterpreter extends Interpreter {
binder.put("sc", sc);
binder.put("sqlc", sqlc);
binder.put("z", z);
binder.put("out", printStream);
intp.interpret("@transient val z = "
+ "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]");
@ -675,13 +670,13 @@ public class SparkInterpreter extends Interpreter {
synchronized (this) {
z.setGui(context.getGui());
sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
InterpreterResult r = interpretInput(lines);
InterpreterResult r = interpretInput(lines, context);
sc.clearJobGroup();
return r;
}
}
public InterpreterResult interpretInput(String[] lines) {
public InterpreterResult interpretInput(String[] lines, InterpreterContext context) {
SparkEnv.set(env);
// add print("") to make sure not finishing with comment
@ -692,8 +687,9 @@ public class SparkInterpreter extends Interpreter {
}
linesToRun[lines.length] = "print(\"\")";
Console.setOut((java.io.PrintStream) binder.get("out"));
out.reset();
Console.setOut(context.out);
out.setInterpreterOutput(context.out);
context.out.clear();
Code r = null;
String incomplete = "";
@ -713,6 +709,7 @@ public class SparkInterpreter extends Interpreter {
res = intp.interpret(incomplete + s);
} catch (Exception e) {
sc.clearJobGroup();
out.setInterpreterOutput(null);
logger.info("Interpreter exception", e);
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
}
@ -721,7 +718,8 @@ public class SparkInterpreter extends Interpreter {
if (r == Code.ERROR) {
sc.clearJobGroup();
return new InterpreterResult(r, out.toString());
out.setInterpreterOutput(null);
return new InterpreterResult(r, "");
} else if (r == Code.INCOMPLETE) {
incomplete += s + "\n";
} else {
@ -730,9 +728,13 @@ public class SparkInterpreter extends Interpreter {
}
if (r == Code.INCOMPLETE) {
sc.clearJobGroup();
out.setInterpreterOutput(null);
return new InterpreterResult(r, "Incomplete expression");
} else {
return new InterpreterResult(r, out.toString());
sc.clearJobGroup();
out.setInterpreterOutput(null);
return new InterpreterResult(Code.SUCCESS);
}
}

View file

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import java.io.IOException;
import java.io.OutputStream;
/**
* InterpreterOutput can be attached / detached.
*/
public class SparkOutputStream extends OutputStream {
InterpreterOutput interpreterOutput;
public SparkOutputStream() {
}
public InterpreterOutput getInterpreterOutput() {
return interpreterOutput;
}
public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
this.interpreterOutput = interpreterOutput;
}
@Override
public void write(int b) throws IOException {
if (interpreterOutput != null) {
interpreterOutput.write(b);
}
}
@Override
public void write(byte [] b) throws IOException {
if (interpreterOutput != null) {
interpreterOutput.write(b);
}
}
@Override
public void write(byte [] b, int offset, int len) throws IOException {
if (interpreterOutput != null) {
interpreterOutput.write(b, offset, len);
}
}
@Override
public void close() throws IOException {
if (interpreterOutput != null) {
interpreterOutput.close();
}
}
@Override
public void flush() throws IOException {
if (interpreterOutput != null) {
interpreterOutput.flush();
}
}
}

View file

@ -21,6 +21,7 @@ import static scala.collection.JavaConversions.asJavaCollection;
import static scala.collection.JavaConversions.asJavaIterable;
import static scala.collection.JavaConversions.collectionAsScalaIterable;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@ -54,19 +55,17 @@ import scala.collection.Iterable;
*/
public class ZeppelinContext extends HashMap<String, Object> {
private SparkDependencyResolver dep;
private PrintStream out;
private InterpreterContext interpreterContext;
private int maxResult;
public ZeppelinContext(SparkContext sc, SQLContext sql,
InterpreterContext interpreterContext,
SparkDependencyResolver dep, PrintStream printStream,
SparkDependencyResolver dep,
int maxResult) {
this.sc = sc;
this.sqlContext = sql;
this.interpreterContext = interpreterContext;
this.dep = dep;
this.out = printStream;
this.maxResult = maxResult;
}
@ -273,10 +272,15 @@ public class ZeppelinContext extends HashMap<String, Object> {
throw new InterpreterException("Can not road DataFrame/SchemaRDD class");
}
if (cls.isInstance(o)) {
out.print(showDF(sc, interpreterContext, o, maxResult));
} else {
out.print(o.toString());
try {
if (cls.isInstance(o)) {
interpreterContext.out.write(showDF(sc, interpreterContext, o, maxResult));
} else {
interpreterContext.out.write(o.toString());
}
} catch (IOException e) {
throw new InterpreterException(e);
}
}

View file

@ -36,10 +36,7 @@ class Logger(object):
self.out = ""
def write(self, message):
self.out = self.out + message
def get(self):
return self.out
intp.appendOutput(message)
def reset(self):
self.out = ""
@ -224,7 +221,7 @@ while True :
sc.setJobGroup(jobGroup, "Zeppelin")
eval(compiledCode)
intp.setStatementsFinished(output.get(), False)
intp.setStatementsFinished("", False)
except Py4JJavaError:
excInnerError = traceback.format_exc() # format_tb() does not return the inner exception
innerErrorStart = excInnerError.find("Py4JJavaError:")

View file

@ -60,7 +60,7 @@ public class DepInterpreterTest {
context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>());
new LinkedList<InterpreterContextRunner>(), null);
}
@After

View file

@ -28,10 +28,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.junit.After;
import org.junit.Before;
@ -79,9 +76,21 @@ public class SparkInterpreterTest {
InterpreterGroup intpGroup = new InterpreterGroup();
context = new InterpreterContext("note", "id", "title", "text",
new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>());
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
}
}));
}
@After

View file

@ -25,10 +25,7 @@ import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.junit.After;
import org.junit.Before;
@ -69,7 +66,17 @@ public class SparkSqlInterpreterTest {
}
context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>());
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
}
}));
}
@After

View file

@ -29,6 +29,7 @@ import org.apache.zeppelin.display.GUI;
public class InterpreterContext {
private static final ThreadLocal<InterpreterContext> threadIC =
new ThreadLocal<InterpreterContext>();
public final InterpreterOutput out;
public static InterpreterContext get() {
return threadIC.get();
@ -58,7 +59,8 @@ public class InterpreterContext {
Map<String, Object> config,
GUI gui,
AngularObjectRegistry angularObjectRegistry,
List<InterpreterContextRunner> runners
List<InterpreterContextRunner> runners,
InterpreterOutput out
) {
this.noteId = noteId;
this.paragraphId = paragraphId;
@ -68,6 +70,7 @@ public class InterpreterContext {
this.gui = gui;
this.angularObjectRegistry = angularObjectRegistry;
this.runners = runners;
this.out = out;
}

View file

@ -0,0 +1,249 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
/**
* InterpreterOutput is OutputStream that supposed to print content on notebook
* in addition to InterpreterResult which used to return from Interpreter.interpret().
*/
public class InterpreterOutput extends OutputStream {
Logger logger = LoggerFactory.getLogger(InterpreterOutput.class);
private final int NEW_LINE_CHAR = '\n';
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
private final List<Object> outList = new LinkedList<Object>();
private InterpreterOutputChangeWatcher watcher;
private final InterpreterOutputListener flushListener;
private InterpreterResult.Type type = InterpreterResult.Type.TEXT;
private boolean firstWrite = true;
public InterpreterOutput(InterpreterOutputListener flushListener) {
this.flushListener = flushListener;
clear();
}
public InterpreterOutput(InterpreterOutputListener flushListener,
InterpreterOutputChangeListener listener) throws IOException {
this.flushListener = flushListener;
clear();
watcher = new InterpreterOutputChangeWatcher(listener);
watcher.start();
}
public InterpreterResult.Type getType() {
return type;
}
public void setType(InterpreterResult.Type type) {
if (this.type != type) {
clear();
flushListener.onUpdate(this, new byte[]{});
this.type = type;
}
}
public void clear() {
synchronized (outList) {
type = InterpreterResult.Type.TEXT;
buffer.reset();
outList.clear();
if (watcher != null) {
watcher.clear();
}
}
}
@Override
public void write(int b) throws IOException {
synchronized (outList) {
buffer.write(b);
if (b == NEW_LINE_CHAR) {
// first time use of this outputstream.
if (firstWrite) {
// clear the output on gui
flushListener.onUpdate(this, new byte[]{});
firstWrite = false;
}
flush();
}
}
}
private byte [] detectTypeFromLine(byte [] byteArray) {
// check output type directive
String line = new String(byteArray);
for (InterpreterResult.Type t : InterpreterResult.Type.values()) {
String typeString = '%' + t.name().toLowerCase();
if ((typeString + "\n").equals(line)) {
setType(t);
byteArray = null;
break;
} else if (line.startsWith(typeString + " ")) {
setType(t);
byteArray = line.substring(typeString.length() + 1).getBytes();
break;
}
}
return byteArray;
}
@Override
public void write(byte [] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(byte [] b, int off, int len) throws IOException {
synchronized (outList) {
for (int i = off; i < len; i++) {
write(b[i]);
}
}
}
/**
* In dev mode, it monitors file and update ZeppelinServer
* @param file
* @throws IOException
*/
public void write(File file) throws IOException {
outList.add(file);
if (watcher != null) {
watcher.watch(file);
}
}
public void write(String string) throws IOException {
write(string.getBytes());
}
/**
* write contents in the resource file in the classpath
* @param url
* @throws IOException
*/
public void write(URL url) throws IOException {
if ("file".equals(url.getProtocol())) {
write(new File(url.getPath()));
} else {
outList.add(url);
}
}
public void writeResource(String resourceName) throws IOException {
// search file under resource dir first for dev mode
File mainResource = new File("./src/main/resources/" + resourceName);
File testResource = new File("./src/test/resources/" + resourceName);
if (mainResource.isFile()) {
write(mainResource);
} else if (testResource.isFile()) {
write(testResource);
} else {
// search from classpath
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null) {
cl = this.getClass().getClassLoader();
}
if (cl == null) {
cl = ClassLoader.getSystemClassLoader();
}
write(cl.getResource(resourceName));
}
}
public byte[] toByteArray() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
List<Object> all = new LinkedList<Object>();
synchronized (outList) {
all.addAll(outList);
}
for (Object o : all) {
if (o instanceof File) {
File f = (File) o;
FileInputStream fin = new FileInputStream(f);
copyStream(fin, out);
fin.close();
} else if (o instanceof byte[]) {
out.write((byte[]) o);
} else if (o instanceof Integer) {
out.write((int) o);
} else if (o instanceof URL) {
InputStream fin = ((URL) o).openStream();
copyStream(fin, out);
fin.close();
} else {
// can not handle the object
}
}
out.close();
return out.toByteArray();
}
public void flush() throws IOException {
synchronized (outList) {
buffer.flush();
byte[] bytes = buffer.toByteArray();
bytes = detectTypeFromLine(bytes);
if (bytes != null) {
outList.add(bytes);
if (type == InterpreterResult.Type.TEXT) {
flushListener.onAppend(this, bytes);
}
}
buffer.reset();
}
}
private void copyStream(InputStream in, OutputStream out) throws IOException {
int bufferSize = 8192;
byte[] buffer = new byte[bufferSize];
while (true) {
int bytesRead = in.read(buffer);
if (bytesRead == -1) {
break;
} else {
out.write(buffer, 0, bytesRead);
}
}
}
@Override
public void close() throws IOException {
flush();
if (watcher != null) {
watcher.clear();
watcher.shutdown();
}
}
}

View file

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import java.io.File;
/**
* InterpreterOutputChangeListener
*/
public interface InterpreterOutputChangeListener {
public void fileChanged(File file);
}

View file

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import java.io.File;
import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Watch the change for the development mode support
*/
public class InterpreterOutputChangeWatcher extends Thread {
Logger logger = LoggerFactory.getLogger(InterpreterOutputChangeWatcher.class);
private WatchService watcher;
private final List<File> watchFiles = new LinkedList<File>();
private final Map<WatchKey, File> watchKeys = new HashMap<WatchKey, File>();
private InterpreterOutputChangeListener listener;
private boolean stop;
public InterpreterOutputChangeWatcher(InterpreterOutputChangeListener listener)
throws IOException {
watcher = FileSystems.getDefault().newWatchService();
this.listener = listener;
}
public void watch(File file) throws IOException {
String dirString;
if (file.isFile()) {
dirString = file.getParentFile().getAbsolutePath();
} else {
throw new IOException(file.getName() + " is not a file");
}
if (dirString == null) {
dirString = "/";
}
Path dir = FileSystems.getDefault().getPath(dirString);
logger.info("watch " + dir);
WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
synchronized (watchKeys) {
watchKeys.put(key, new File(dirString));
watchFiles.add(file);
}
}
public void clear() {
synchronized (watchKeys) {
for (WatchKey key : watchKeys.keySet()) {
key.cancel();
}
watchKeys.clear();
watchFiles.clear();
}
}
public void shutdown() throws IOException {
stop = true;
clear();
watcher.close();
}
public void run() {
while (!stop) {
WatchKey key = null;
try {
key = watcher.poll(1, TimeUnit.SECONDS);
} catch (InterruptedException | ClosedWatchServiceException e) {
break;
}
if (key == null) {
continue;
}
for (WatchEvent<?> event : key.pollEvents()) {
WatchEvent.Kind<?> kind = event.kind();
if (kind == OVERFLOW) {
continue;
}
WatchEvent<Path> ev = (WatchEvent<Path>) event;
Path filename = ev.context();
// search for filename
synchronized (watchKeys) {
for (File f : watchFiles) {
if (f.getName().compareTo(filename.toString()) == 0) {
File changedFile;
if (filename.isAbsolute()) {
changedFile = new File(filename.toString());
} else {
changedFile = new File(watchKeys.get(key), filename.toString());
}
logger.info("File change detected " + changedFile.getAbsolutePath());
if (listener != null) {
listener.fileChanged(changedFile);
}
}
}
}
}
boolean valid = key.reset();
if (!valid) {
break;
}
}
}
}

View file

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
/**
* Listen InterpreterOutput buffer flush
*/
public interface InterpreterOutputListener {
/**
* called when newline is detected
* @param line
*/
public void onAppend(InterpreterOutput out, byte[] line);
/**
* when entire output is updated. eg) after detecting new display system
* @param output
*/
public void onUpdate(InterpreterOutput out, byte[] output);
}

View file

@ -146,4 +146,8 @@ public class InterpreterResult implements Serializable {
this.type = type;
return this;
}
public String toString() {
return "%" + type.name().toLowerCase() + " " + msg;
}
}

View file

@ -48,6 +48,7 @@ import com.google.gson.reflect.TypeToken;
*
*/
public class RemoteInterpreter extends Interpreter {
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
Gson gson = new Gson();
private String interpreterRunner;
@ -60,32 +61,35 @@ public class RemoteInterpreter extends Interpreter {
private int connectTimeout;
public RemoteInterpreter(Properties property,
String className,
String interpreterRunner,
String interpreterPath,
int connectTimeout) {
String className,
String interpreterRunner,
String interpreterPath,
int connectTimeout,
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
super(property);
this.className = className;
initialized = false;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
env = new HashMap<String, String>();
this.connectTimeout = connectTimeout;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
}
public RemoteInterpreter(Properties property,
String className,
String interpreterRunner,
String interpreterPath,
Map<String, String> env,
int connectTimeout) {
String className,
String interpreterRunner,
String interpreterPath,
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
super(property);
this.className = className;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
this.env = env;
this.connectTimeout = connectTimeout;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
}
@Override
@ -103,7 +107,8 @@ public class RemoteInterpreter extends Interpreter {
if (intpGroup.getRemoteInterpreterProcess() == null) {
// create new remote process
RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess(
interpreterRunner, interpreterPath, env, connectTimeout);
interpreterRunner, interpreterPath, env, connectTimeout,
remoteInterpreterProcessListener);
intpGroup.setRemoteInterpreterProcess(remoteProcess);
}

View file

@ -18,29 +18,35 @@
package org.apache.zeppelin.interpreter.remote;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
*
*/
public class RemoteInterpreterEventPoller extends Thread {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
private final RemoteInterpreterProcessListener listener;
private volatile boolean shutdown;
private RemoteInterpreterProcess interpreterProcess;
private InterpreterGroup interpreterGroup;
public RemoteInterpreterEventPoller() {
public RemoteInterpreterEventPoller(RemoteInterpreterProcessListener listener) {
this.listener = listener;
shutdown = false;
}
@ -110,6 +116,24 @@ public class RemoteInterpreterEventPoller extends Thread {
interpreterProcess.getInterpreterContextRunnerPool().run(
runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
// on output append
Map<String, String> outputAppend = gson.fromJson(
event.getData(), new TypeToken<Map<String, String>>() {}.getType());
String noteId = outputAppend.get("noteId");
String paragraphId = outputAppend.get("paragraphId");
String outputToAppend = outputAppend.get("data");
listener.onOutputAppend(noteId, paragraphId, outputToAppend);
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
// on output update
Map<String, String> outputAppend = gson.fromJson(
event.getData(), new TypeToken<Map<String, String>>() {}.getType());
String noteId = outputAppend.get("noteId");
String paragraphId = outputAppend.get("paragraphId");
String outputToUpdate = outputAppend.get("data");
listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
}
logger.debug("Event from remoteproceess {}", event.getType());
} catch (Exception e) {

View file

@ -53,10 +53,11 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
private int connectTimeout;
public RemoteInterpreterProcess(String intpRunner,
String intpDir,
Map<String, String> env,
int connectTimeout) {
this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(), connectTimeout);
String intpDir,
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener listener) {
this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(listener), connectTimeout);
}
RemoteInterpreterProcess(String intpRunner,

View file

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.remote;
/**
* Event from remoteInterpreterProcess
*/
public interface RemoteInterpreterProcessListener {
public void onOutputAppend(String noteId, String paragraphId, String output);
public void onOutputUpdated(String noteId, String paragraphId, String output);
}

View file

@ -35,15 +35,8 @@ import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
@ -300,7 +293,26 @@ public class RemoteInterpreterServer
try {
InterpreterContext.set(context);
InterpreterResult result = interpreter.interpret(script, context);
return result;
// data from context.out is prepended to InterpreterResult if both defined
String message = "";
context.out.flush();
InterpreterResult.Type outputType = context.out.getType();
byte[] interpreterOutput = context.out.toByteArray();
context.out.clear();
if (interpreterOutput != null && interpreterOutput.length > 0) {
message = new String(interpreterOutput);
}
String interpreterResultMessage = result.message();
if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
message += interpreterResultMessage;
return new InterpreterResult(result.code(), result.type(), message);
} else {
return new InterpreterResult(result.code(), outputType, message);
}
} finally {
InterpreterContext.remove();
}
@ -351,7 +363,8 @@ public class RemoteInterpreterServer
private InterpreterContext convert(RemoteInterpreterContext ric) {
List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
new TypeToken<List<RemoteInterpreterContextRunner>>(){}.getType());
new TypeToken<List<RemoteInterpreterContextRunner>>() {
}.getType());
for (InterpreterContextRunner r : runners) {
contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
@ -366,7 +379,40 @@ public class RemoteInterpreterServer
new TypeToken<Map<String, Object>>() {}.getType()),
gson.fromJson(ric.getGui(), GUI.class),
interpreterGroup.getAngularObjectRegistry(),
contextRunners);
contextRunners, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId()));
}
private InterpreterOutput createInterpreterOutput(final String noteId, final String paragraphId) {
return new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
Map<String, String> appendOutput = new HashMap<String, String>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("data", new String(line));
Gson gson = new Gson();
sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.OUTPUT_APPEND,
gson.toJson(appendOutput)));
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
Map<String, String> appendOutput = new HashMap<String, String>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("data", new String(output));
Gson gson = new Gson();
sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.OUTPUT_UPDATE,
gson.toJson(appendOutput)));
}
});
}

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");

View file

@ -33,7 +33,9 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
ANGULAR_OBJECT_ADD(2),
ANGULAR_OBJECT_UPDATE(3),
ANGULAR_OBJECT_REMOVE(4),
RUN_INTERPRETER_CONTEXT_RUNNER(5);
RUN_INTERPRETER_CONTEXT_RUNNER(5),
OUTPUT_APPEND(6),
OUTPUT_UPDATE(7);
private final int value;
@ -64,6 +66,10 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
return ANGULAR_OBJECT_REMOVE;
case 5:
return RUN_INTERPRETER_CONTEXT_RUNNER;
case 6:
return OUTPUT_APPEND;
case 7:
return OUTPUT_UPDATE;
default:
return null;
}

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
public class RemoteInterpreterService {
public interface Iface {

View file

@ -42,7 +42,9 @@ enum RemoteInterpreterEventType {
ANGULAR_OBJECT_ADD = 2,
ANGULAR_OBJECT_UPDATE = 3,
ANGULAR_OBJECT_REMOVE = 4,
RUN_INTERPRETER_CONTEXT_RUNNER = 5
RUN_INTERPRETER_CONTEXT_RUNNER = 5,
OUTPUT_APPEND = 6,
OUTPUT_UPDATE = 7
}
struct RemoteInterpreterEvent {

View file

@ -27,7 +27,7 @@ public class InterpreterContextTest {
public void testThreadLocal() {
assertNull(InterpreterContext.get());
InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null));
InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null));
assertNotNull(InterpreterContext.get());
InterpreterContext.remove();

View file

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import static org.junit.Assert.*;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChangeListener {
private File tmpDir;
private File fileChanged;
private int numChanged;
private InterpreterOutputChangeWatcher watcher;
@Before
public void setUp() throws Exception {
watcher = new InterpreterOutputChangeWatcher(this);
watcher.start();
tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
tmpDir.mkdirs();
fileChanged = null;
numChanged = 0;
}
@After
public void tearDown() throws Exception {
watcher.shutdown();
delete(tmpDir);
}
private void delete(File file){
if(file.isFile()) file.delete();
else if(file.isDirectory()){
File [] files = file.listFiles();
if(files!=null && files.length>0){
for(File f : files){
delete(f);
}
}
file.delete();
}
}
@Test
public void test() throws IOException, InterruptedException {
assertNull(fileChanged);
assertEquals(0, numChanged);
Thread.sleep(1000);
// create new file
File file1 = new File(tmpDir, "test1");
file1.createNewFile();
File file2 = new File(tmpDir, "test2");
file2.createNewFile();
watcher.watch(file1);
Thread.sleep(1000);
FileOutputStream out1 = new FileOutputStream(file1);
out1.write(1);
out1.close();
FileOutputStream out2 = new FileOutputStream(file2);
out2.write(1);
out2.close();
synchronized (this) {
wait(30*1000);
}
assertNotNull(fileChanged);
assertEquals(1, numChanged);
}
@Override
public void fileChanged(File file) {
fileChanged = file;
numChanged++;
synchronized(this) {
notify();
}
}
}

View file

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import static org.junit.Assert.*;
import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class InterpreterOutputTest implements InterpreterOutputListener {
private InterpreterOutput out;
int numAppendEvent;
int numUpdateEvent;
@Before
public void setUp() {
out = new InterpreterOutput(this);
numAppendEvent = 0;
numUpdateEvent = 0;
}
@After
public void tearDown() throws IOException {
out.close();
}
@Test
public void testDetectNewline() throws IOException {
out.write("hello\nworld");
assertEquals("hello\n", new String(out.toByteArray()));
assertEquals(1, numAppendEvent);
assertEquals(1, numUpdateEvent);
out.write("\n");
assertEquals("hello\nworld\n", new String(out.toByteArray()));
assertEquals(2, numAppendEvent);
assertEquals(1, numUpdateEvent);
}
@Test
public void testFlush() throws IOException {
out.write("hello\nworld");
assertEquals("hello\n", new String(out.toByteArray()));
assertEquals(1, numAppendEvent);
assertEquals(1, numUpdateEvent);
out.flush();
assertEquals("hello\nworld", new String(out.toByteArray()));
assertEquals(2, numAppendEvent);
assertEquals(1, numUpdateEvent);
out.clear();
out.write("%html div");
assertEquals("", new String(out.toByteArray()));
assertEquals(InterpreterResult.Type.TEXT, out.getType());
out.flush();
out.write("%html div");
assertEquals("div", new String(out.toByteArray()));
assertEquals(InterpreterResult.Type.HTML, out.getType());
}
@Test
public void testType() throws IOException {
// default output stream type is TEXT
out.write("Text\n");
assertEquals(InterpreterResult.Type.TEXT, out.getType());
assertEquals("Text\n", new String(out.toByteArray()));
assertEquals(1, numAppendEvent);
assertEquals(1, numUpdateEvent);
// change type
out.write("%html\n");
assertEquals(InterpreterResult.Type.HTML, out.getType());
assertEquals("", new String(out.toByteArray()));
assertEquals(1, numAppendEvent);
assertEquals(2, numUpdateEvent);
// none TEXT type output stream does not generate append event
out.write("<div>html</div>\n");
assertEquals(InterpreterResult.Type.HTML, out.getType());
assertEquals(1, numAppendEvent);
assertEquals(2, numUpdateEvent);
assertEquals("<div>html</div>\n", new String(out.toByteArray()));
// change type to text again
out.write("%text hello\n");
assertEquals(InterpreterResult.Type.TEXT, out.getType());
assertEquals(2, numAppendEvent);
assertEquals(3, numUpdateEvent);
assertEquals("hello\n", new String(out.toByteArray()));
}
@Test
public void testType2() throws IOException {
out.write("%html\nHello");
assertEquals(InterpreterResult.Type.HTML, out.getType());
}
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
numAppendEvent++;
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
numUpdateEvent++;
}
}

View file

@ -105,4 +105,9 @@ public class InterpreterResultTest {
"123\n", result.message());
}
@Test
public void testToString() {
assertEquals("%html hello", new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html hello").toString());
}
}

View file

@ -64,12 +64,13 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
Properties p = new Properties();
intp = new RemoteInterpreter(
p,
MockInterpreterAngular.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
p,
MockInterpreterAngular.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000,
null
);
intpGroup.add(intp);
@ -83,7 +84,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>());
new LinkedList<InterpreterContextRunner>(), null);
intp.open();
}

View file

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.remote;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
/**
* Test for remote interpreter output stream
*/
public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener {
private InterpreterGroup intpGroup;
private HashMap<String, String> env;
@Before
public void setUp() throws Exception {
intpGroup = new InterpreterGroup();
env = new HashMap<String, String>();
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
}
@After
public void tearDown() throws Exception {
intpGroup.close();
intpGroup.destroy();
}
private RemoteInterpreter createMockInterpreter() {
RemoteInterpreter intp = new RemoteInterpreter(
new Properties(),
MockInterpreterOutputStream.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000,
this);
intpGroup.add(intp);
intp.setInterpreterGroup(intpGroup);
return intp;
}
private InterpreterContext createInterpreterContext() {
return new InterpreterContext(
"noteId",
"id",
"title",
"text",
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>(), null);
}
@Test
public void testInterpreterResultOnly() {
RemoteInterpreter intp = createMockInterpreter();
InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
assertEquals("staticresult", ret.message());
ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
assertEquals("staticresult2", ret.message());
ret = intp.interpret("ERROR::staticresult3", createInterpreterContext());
assertEquals(InterpreterResult.Code.ERROR, ret.code());
assertEquals("staticresult3", ret.message());
}
@Test
public void testInterpreterOutputStreamOnly() {
RemoteInterpreter intp = createMockInterpreter();
InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
assertEquals("streamresult", ret.message());
ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext());
assertEquals(InterpreterResult.Code.ERROR, ret.code());
assertEquals("streamresult2", ret.message());
}
@Test
public void testInterpreterResultOutputStreamMixed() {
RemoteInterpreter intp = createMockInterpreter();
InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
assertEquals("streamstatic", ret.message());
}
@Test
public void testOutputType() {
RemoteInterpreter intp = createMockInterpreter();
InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext());
assertEquals(InterpreterResult.Type.HTML, ret.type());
assertEquals("hello", ret.message());
ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
assertEquals(InterpreterResult.Type.HTML, ret.type());
assertEquals("hello", ret.message());
ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext());
assertEquals(InterpreterResult.Type.ANGULAR, ret.type());
assertEquals("helloworld", ret.message());
}
@Override
public void onOutputAppend(String noteId, String paragraphId, String output) {
}
@Override
public void onOutputUpdated(String noteId, String paragraphId, String output) {
}
}

View file

@ -34,7 +34,7 @@ public class RemoteInterpreterProcessTest {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
"../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
10 * 1000);
10 * 1000, null);
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
assertEquals(1, rip.reference(intpGroup));

View file

@ -63,30 +63,38 @@ public class RemoteInterpreterTest {
intpGroup.destroy();
}
private RemoteInterpreter createMockInterpreterA(Properties p) {
return new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000,
null);
}
private RemoteInterpreter createMockInterpreterB(Properties p) {
return new RemoteInterpreter(
p,
MockInterpreterB.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000,
null);
}
@Test
public void testRemoteInterperterCall() throws TTransportException, IOException {
Properties p = new Properties();
RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
);
RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
RemoteInterpreter intpB = new RemoteInterpreter(
p,
MockInterpreterB.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
);
RemoteInterpreter intpB = createMockInterpreterB(p);
intpGroup.add(intpB);
intpB.setInterpreterGroup(intpGroup);
@ -113,7 +121,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
new LinkedList<InterpreterContextRunner>(), null));
intpB.open();
assertEquals(2, process.referenceCount());
@ -131,14 +139,7 @@ public class RemoteInterpreterTest {
public void testRemoteInterperterErrorStatus() throws TTransportException, IOException {
Properties p = new Properties();
RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
);
RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
@ -153,7 +154,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
new LinkedList<InterpreterContextRunner>(), null));
assertEquals(Code.ERROR, ret.code());
}
@ -163,24 +164,26 @@ public class RemoteInterpreterTest {
Properties p = new Properties();
RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000,
null
);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
RemoteInterpreter intpB = new RemoteInterpreter(
p,
MockInterpreterB.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
p,
MockInterpreterB.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000,
null
);
intpGroup.add(intpB);
@ -199,7 +202,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
new LinkedList<InterpreterContextRunner>(), null));
assertEquals("500", ret.message());
ret = intpB.interpret("500",
@ -211,7 +214,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
new LinkedList<InterpreterContextRunner>(), null));
assertEquals("1000", ret.message());
long end = System.currentTimeMillis();
assertTrue(end - start >= 1000);
@ -225,26 +228,12 @@ public class RemoteInterpreterTest {
public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException {
Properties p = new Properties();
final RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
);
final RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
final RemoteInterpreter intpB = new RemoteInterpreter(
p,
MockInterpreterB.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
);
final RemoteInterpreter intpB = createMockInterpreterB(p);
intpGroup.add(intpB);
intpB.setInterpreterGroup(intpGroup);
@ -276,7 +265,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
new LinkedList<InterpreterContextRunner>(), null));
}
@Override
@ -310,7 +299,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
new LinkedList<InterpreterContextRunner>(), null));
}
@Override
@ -340,14 +329,7 @@ public class RemoteInterpreterTest {
public void testRunOrderPreserved() throws InterruptedException {
Properties p = new Properties();
final RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
);
final RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
@ -382,7 +364,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
new LinkedList<InterpreterContextRunner>(), null));
synchronized (results) {
results.add(ret.message());
@ -421,14 +403,7 @@ public class RemoteInterpreterTest {
Properties p = new Properties();
p.put("parallel", "true");
final RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
);
final RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
@ -466,7 +441,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
new LinkedList<InterpreterContextRunner>(), null));
synchronized (results) {
results.add(ret.message());
@ -501,14 +476,7 @@ public class RemoteInterpreterTest {
public void testInterpreterGroupResetBeforeProcessStarts() {
Properties p = new Properties();
RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
);
RemoteInterpreter intpA = createMockInterpreterA(p);
intpA.setInterpreterGroup(intpGroup);
RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
@ -523,14 +491,7 @@ public class RemoteInterpreterTest {
public void testInterpreterGroupResetAfterProcessFinished() {
Properties p = new Properties();
RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
);
RemoteInterpreter intpA = createMockInterpreterA(p);
intpA.setInterpreterGroup(intpGroup);
RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
@ -548,14 +509,7 @@ public class RemoteInterpreterTest {
public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException {
Properties p = new Properties();
final RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
);
final RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
@ -585,7 +539,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
new LinkedList<InterpreterContextRunner>(), null));
}
@Override
@ -616,26 +570,12 @@ public class RemoteInterpreterTest {
public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
Properties p = new Properties();
RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
);
RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
RemoteInterpreter intpB = new RemoteInterpreter(
p,
MockInterpreterB.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
);
RemoteInterpreter intpB = createMockInterpreterB(p);
intpGroup.add(intpB);
intpB.setInterpreterGroup(intpGroup);

View file

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.remote.mock;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
/**
* MockInterpreter to test outputstream
*/
public class MockInterpreterOutputStream extends Interpreter {
static {
Interpreter.register(
"interpreterOutputStream",
"group1",
MockInterpreterA.class.getName(),
new InterpreterPropertyBuilder().build());
}
private String lastSt;
public MockInterpreterOutputStream(Properties property) {
super(property);
}
@Override
public void open() {
//new RuntimeException().printStackTrace();
}
@Override
public void close() {
}
public String getLastStatement() {
return lastSt;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
String[] ret = st.split(":");
try {
if (ret[1] != null) {
context.out.write(ret[1]);
}
} catch (IOException e) {
throw new InterpreterException(e);
}
return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ?
ret[2] : "");
}
@Override
public void cancel(InterpreterContext context) {
}
@Override
public FormType getFormType() {
return FormType.NATIVE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public List<String> completion(String buf, int cursor) {
return null;
}
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
}
}

View file

@ -64,12 +64,13 @@ public class RemoteSchedulerTest {
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
final RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000,
null
);
intpGroup.add(intpA);
@ -103,7 +104,7 @@ public class RemoteSchedulerTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
new LinkedList<InterpreterContextRunner>(), null));
return "1000";
}
@ -147,12 +148,13 @@ public class RemoteSchedulerTest {
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
final RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000,
null
);
intpGroup.add(intpA);
@ -173,7 +175,7 @@ public class RemoteSchedulerTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>());
new LinkedList<InterpreterContextRunner>(), null);
@Override
public int progress() {
@ -209,7 +211,7 @@ public class RemoteSchedulerTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>());
new LinkedList<InterpreterContextRunner>(), null);
@Override
public int progress() {

View file

@ -81,7 +81,8 @@ public class ZeppelinServer extends Application {
this.depResolver = new DependencyResolver(conf.getString(ConfVars.ZEPPELIN_DEP_LOCALREPO));
this.schedulerFactory = new SchedulerFactory();
this.replFactory = new InterpreterFactory(conf, notebookWsServer, depResolver);
this.replFactory = new InterpreterFactory(conf, notebookWsServer,
notebookWsServer, depResolver);
this.notebookRepo = new NotebookRepoSync(conf);
this.notebookIndex = new LuceneSearch();

View file

@ -93,6 +93,8 @@ public class Message {
PARAGRAPH_REMOVE,
PARAGRAPH_CLEAR_OUTPUT,
PARAGRAPH_APPEND_OUTPUT, // [s-c] append output
PARAGRAPH_UPDATE_OUTPUT, // [s-c] update (replace) output
PING,
ANGULAR_OBJECT_UPDATE, // [s-c] add/update angular object

View file

@ -30,12 +30,11 @@ import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.JobListenerFactory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
@ -57,7 +56,8 @@ import com.google.gson.Gson;
*
*/
public class NotebookServer extends WebSocketServlet implements
NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener {
NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener,
RemoteInterpreterProcessListener {
private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
Gson gson = new Gson();
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
@ -707,15 +707,47 @@ public class NotebookServer extends WebSocketServlet implements
}
}
/**
* This callback is for the paragraph that runs on ZeppelinServer
* @param noteId
* @param paragraphId
* @param output output to append
*/
@Override
public void onOutputAppend(String noteId, String paragraphId, String output) {
Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT)
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("data", output);
Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId);
broadcast(noteId, msg);
}
/**
* This callback is for the paragraph that runs on ZeppelinServer
* @param noteId
* @param paragraphId
* @param output output to update (replace)
*/
@Override
public void onOutputUpdated(String noteId, String paragraphId, String output) {
Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT)
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("data", output);
Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId);
broadcast(noteId, msg);
}
/**
* Need description here.
*
*/
public static class ParagraphJobListener implements JobListener {
public static class ParagraphListenerImpl implements ParagraphJobListener {
private NotebookServer notebookServer;
private Note note;
public ParagraphJobListener(NotebookServer notebookServer, Note note) {
public ParagraphListenerImpl(NotebookServer notebookServer, Note note) {
this.notebookServer = notebookServer;
this.note = note;
}
@ -750,11 +782,43 @@ public class NotebookServer extends WebSocketServlet implements
}
notebookServer.broadcastNote(note);
}
/**
* This callback is for praragraph that runs on RemoteInterpreterProcess
* @param paragraph
* @param out
* @param output
*/
@Override
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT)
.put("noteId", paragraph.getNote().getId())
.put("paragraphId", paragraph.getId())
.put("data", output);
notebookServer.broadcast(paragraph.getNote().getId(), msg);
}
/**
* This callback is for paragraph that runs on RemoteInterpreterProcess
* @param paragraph
* @param out
* @param output
*/
@Override
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT)
.put("noteId", paragraph.getNote().getId())
.put("paragraphId", paragraph.getId())
.put("data", output);
notebookServer.broadcast(paragraph.getNote().getId(), msg);
}
}
@Override
public JobListener getParagraphJobListener(Note note) {
return new ParagraphJobListener(this, note);
public ParagraphJobListener getParagraphJobListener(Note note) {
return new ParagraphListenerImpl(this, note);
}
private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {

View file

@ -23,24 +23,22 @@ limitations under the License.
ng-bind-html="paragraph.result.comment">
</div>
<div id="{{paragraph.id}}_text"
<div id="p{{paragraph.id}}_text"
class="text"
ng-if="paragraph.result.type == 'TEXT'"
ng-bind="paragraph.result.msg">
</div>
ng-if="getResultType() == 'TEXT'"></div>
<div id="p{{paragraph.id}}_html"
class="resultContained"
ng-if="paragraph.result.type == 'HTML'">
ng-if="getResultType() == 'HTML'">
</div>
<div id="p{{paragraph.id}}_angular"
class="resultContained"
ng-if="paragraph.result.type == 'ANGULAR'">
ng-if="getResultType() == 'ANGULAR'">
</div>
<img id="{{paragraph.id}}_img"
ng-if="paragraph.result.type == 'IMG'"
ng-if="getResultType() == 'IMG'"
ng-src="{{getBase64ImageSrc(paragraph.result.msg)}}">
</img>

View file

@ -54,11 +54,13 @@ angular.module('zeppelinWebApp')
$scope.renderHtml();
} else if ($scope.getResultType() === 'ANGULAR') {
$scope.renderAngular();
} else if ($scope.getResultType() === 'TEXT') {
$scope.renderText();
}
};
$scope.renderHtml = function() {
var retryRenderer = function() {
$scope.renderHtml = function() {
var retryRenderer = function() {
if (angular.element('#p' + $scope.paragraph.id + '_html').length) {
try {
angular.element('#p' + $scope.paragraph.id + '_html').html($scope.paragraph.result.msg);
@ -93,6 +95,42 @@ angular.module('zeppelinWebApp')
$timeout(retryRenderer);
};
$scope.renderText = function() {
var retryRenderer = function() {
var textEl = angular.element('#p' + $scope.paragraph.id + '_text');
if (textEl.length) {
// clear all lines before render
$scope.clearTextOutput();
if ($scope.paragraph.result && $scope.paragraph.result.msg) {
$scope.appendTextOutput($scope.paragraph.result.msg);
}
} else {
$timeout(retryRenderer, 10);
}
};
$timeout(retryRenderer);
};
$scope.clearTextOutput = function() {
var textEl = angular.element('#p' + $scope.paragraph.id + '_text');
if (textEl.length) {
textEl.children().remove();
}
};
$scope.appendTextOutput = function(msg) {
var textEl = angular.element('#p' + $scope.paragraph.id + '_text');
if (textEl.length) {
var lines = msg.split('\n');
for (var i=0; i < lines.length; i++) {
textEl.append(angular.element('<div></div>').text(lines[i]));
}
}
};
var initializeDefault = function() {
var config = $scope.paragraph.config;
@ -156,6 +194,10 @@ angular.module('zeppelinWebApp')
}
});
var isEmpty = function (object) {
return !object;
};
// TODO: this may have impact on performance when there are many paragraphs in a note.
$scope.$on('updateParagraph', function(event, data) {
if (data.paragraph.id === $scope.paragraph.id &&
@ -166,6 +208,7 @@ angular.module('zeppelinWebApp')
data.paragraph.status !== $scope.paragraph.status ||
data.paragraph.jobName !== $scope.paragraph.jobName ||
data.paragraph.title !== $scope.paragraph.title ||
isEmpty(data.paragraph.result) !== isEmpty($scope.paragraph.result) ||
data.paragraph.errorMessage !== $scope.paragraph.errorMessage ||
!angular.equals(data.paragraph.settings, $scope.paragraph.settings) ||
!angular.equals(data.paragraph.config, $scope.paragraph.config))
@ -175,7 +218,8 @@ angular.module('zeppelinWebApp')
var newType = $scope.getResultType(data.paragraph);
var oldGraphMode = $scope.getGraphMode();
var newGraphMode = $scope.getGraphMode(data.paragraph);
var resultRefreshed = (data.paragraph.dateFinished !== $scope.paragraph.dateFinished);
var resultRefreshed = (data.paragraph.dateFinished !== $scope.paragraph.dateFinished) || isEmpty(data.paragraph.result) !== isEmpty($scope.paragraph.result);
var statusChanged = (data.paragraph.status !== $scope.paragraph.status);
//console.log("updateParagraph oldData %o, newData %o. type %o -> %o, mode %o -> %o", $scope.paragraph, data, oldType, newType, oldGraphMode, newGraphMode);
@ -234,6 +278,8 @@ angular.module('zeppelinWebApp')
$scope.renderHtml();
} else if (newType === 'ANGULAR' && resultRefreshed) {
$scope.renderAngular();
} else if (newType === 'TEXT' && resultRefreshed) {
$scope.renderText();
}
if (statusChanged || resultRefreshed) {
@ -252,6 +298,19 @@ angular.module('zeppelinWebApp')
});
$scope.$on('appendParagraphOutput', function(event, data) {
if ($scope.paragraph.id === data.paragraphId) {
$scope.appendTextOutput(data.data);
}
});
$scope.$on('updateParagraphOutput', function(event, data) {
if ($scope.paragraph.id === data.paragraphId) {
$scope.clearTextOutput(data.data);
$scope.appendTextOutput(data.data);
}
});
$scope.isRunning = function() {
if ($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING') {
return true;

View file

@ -54,6 +54,10 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope,
$rootScope.$broadcast('setNoteMenu', data.notes);
} else if (op === 'PARAGRAPH') {
$rootScope.$broadcast('updateParagraph', data);
} else if (op === 'PARAGRAPH_APPEND_OUTPUT') {
$rootScope.$broadcast('appendParagraphOutput', data);
} else if (op === 'PARAGRAPH_UPDATE_OUTPUT') {
$rootScope.$broadcast('updateParagraphOutput', data);
} else if (op === 'PROGRESS') {
$rootScope.$broadcast('updateProgress', data);
} else if (op === 'COMPLETION_LIST') {

View file

@ -29,6 +29,7 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.slf4j.Logger;
@ -65,25 +66,30 @@ public class InterpreterFactory {
private InterpreterOption defaultOption;
AngularObjectRegistryListener angularObjectRegistryListener;
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
DependencyResolver depResolver;
public InterpreterFactory(ZeppelinConfiguration conf,
AngularObjectRegistryListener angularObjectRegistryListener,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
DependencyResolver depResolver)
throws InterpreterException, IOException {
this(conf, new InterpreterOption(true), angularObjectRegistryListener, depResolver);
this(conf, new InterpreterOption(true), angularObjectRegistryListener,
remoteInterpreterProcessListener, depResolver);
}
public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption,
AngularObjectRegistryListener angularObjectRegistryListener,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
DependencyResolver depResolver)
throws InterpreterException, IOException {
this.conf = conf;
this.defaultOption = defaultOption;
this.angularObjectRegistryListener = angularObjectRegistryListener;
this.depResolver = depResolver;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
interpreterClassList = replsConf.split(",");
@ -500,7 +506,8 @@ public class InterpreterFactory {
/**
* Change interpreter property and restart
* @param name
* @param id
* @param option
* @param properties
* @throws IOException
*/
@ -659,7 +666,7 @@ public class InterpreterFactory {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
property, className, conf.getInterpreterRemoteRunnerPath(),
interpreterPath, connectTimeout));
interpreterPath, connectTimeout, remoteInterpreterProcessListener));
return intp;
}

View file

@ -17,11 +17,9 @@
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.scheduler.JobListener;
/**
* TODO(moon): provide description.
*/
public interface JobListenerFactory {
public JobListener getParagraphJobListener(Note note);
public ParagraphJobListener getParagraphJobListener(Note note);
}

View file

@ -19,39 +19,43 @@ package org.apache.zeppelin.notebook;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.utility.IdHashes;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.search.SearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Binded interpreters for a note
*/
public class Note implements Serializable, JobListener {
static Logger logger = LoggerFactory.getLogger(Note.class);
private static final long serialVersionUID = 7920699076577612429L;
// threadpool for delayed persist of note
private static final ScheduledThreadPoolExecutor delayedPersistThreadPool =
new ScheduledThreadPoolExecutor(0);
static {
delayedPersistThreadPool.setRemoveOnCancelPolicy(true);
}
final List<Paragraph> paragraphs = new LinkedList<>();
private String name = "";
private String id;
@ -62,6 +66,7 @@ public class Note implements Serializable, JobListener {
private transient JobListenerFactory jobListenerFactory;
private transient NotebookRepo repo;
private transient SearchService index;
private transient ScheduledFuture delayedPersist;
/**
* note configurations.
@ -144,9 +149,8 @@ public class Note implements Serializable, JobListener {
/**
* Add paragraph last.
*
* @param p
*/
public Paragraph addParagraph() {
Paragraph p = new Paragraph(this, this, replLoader);
synchronized (paragraphs) {
@ -187,7 +191,6 @@ public class Note implements Serializable, JobListener {
* Insert paragraph in given index.
*
* @param index
* @param p
*/
public Paragraph insertParagraph(int index) {
Paragraph p = new Paragraph(this, this, replLoader);
@ -339,8 +342,6 @@ public class Note implements Serializable, JobListener {
/**
* Run all paragraphs sequentially.
*
* @param jobListener
*/
public void runAll() {
synchronized (paragraphs) {
@ -400,15 +401,55 @@ public class Note implements Serializable, JobListener {
}
public void persist() throws IOException {
stopDelayedPersistTimer();
snapshotAngularObjectRegistry();
index.updateIndexDoc(this);
repo.save(this);
}
/**
* Persist this note with maximum delay.
* @param maxDelaySec
*/
public void persist(int maxDelaySec) {
startDelayedPersistTimer(maxDelaySec);
}
public void unpersist() throws IOException {
repo.remove(id());
}
private void startDelayedPersistTimer(int maxDelaySec) {
synchronized (this) {
if (delayedPersist != null) {
return;
}
delayedPersist = delayedPersistThreadPool.schedule(new Runnable() {
@Override
public void run() {
try {
persist();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}, maxDelaySec, TimeUnit.SECONDS);
}
}
private void stopDelayedPersistTimer() {
synchronized (this) {
if (delayedPersist == null) {
return;
}
delayedPersist.cancel(false);
}
}
public Map<String, Object> getConfig() {
if (config == null) {
config = new HashMap<>();

View file

@ -28,6 +28,7 @@ import org.apache.zeppelin.scheduler.JobListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
@ -213,7 +214,29 @@ public class Paragraph extends Job implements Serializable, Cloneable {
if (Code.KEEP_PREVIOUS_RESULT == ret.code()) {
return getReturn();
}
return ret;
String message = "";
context.out.flush();
InterpreterResult.Type outputType = context.out.getType();
byte[] interpreterOutput = context.out.toByteArray();
context.out.clear();
if (interpreterOutput != null && interpreterOutput.length > 0) {
message = new String(interpreterOutput);
}
if (message.isEmpty()) {
return ret;
} else {
String interpreterResultMessage = ret.message();
if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
message += interpreterResultMessage;
return new InterpreterResult(ret.code(), ret.type(), message);
} else {
return new InterpreterResult(ret.code(), outputType, message);
}
}
} finally {
InterpreterContext.remove();
}
@ -244,6 +267,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
runners.add(new ParagraphRunner(note, note.id(), p.getId()));
}
final Paragraph self = this;
InterpreterContext interpreterContext = new InterpreterContext(
note.id(),
getId(),
@ -252,7 +276,34 @@ public class Paragraph extends Job implements Serializable, Cloneable {
this.getConfig(),
this.settings,
registry,
runners);
runners,
new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
updateParagraphResult(out);
((ParagraphJobListener) getListener()).onOutputAppend(self, out, new String(line));
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
updateParagraphResult(out);
((ParagraphJobListener) getListener()).onOutputUpdate(self, out,
new String(output));
}
private void updateParagraphResult(InterpreterOutput out) {
// update paragraph result
Throwable t = null;
String message = null;
try {
message = new String(out.toByteArray());
} catch (IOException e) {
logger().error(e.getMessage(), e);
t = e;
}
setReturn(new InterpreterResult(Code.SUCCESS, out.getType(), message), t);
}
}));
return interpreterContext;
}

View file

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.scheduler.JobListener;
/**
* Listen paragraph update
*/
public interface ParagraphJobListener extends JobListener {
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output);
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output);
}

View file

@ -55,8 +55,8 @@ public class InterpreterFactoryTest {
System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
conf = new ZeppelinConfiguration();
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
context = new InterpreterContext("note", "id", "title", "text", null, null, null, null);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null);
}
@ -140,7 +140,7 @@ public class InterpreterFactoryTest {
factory.add("newsetting", "mock1", new InterpreterOption(false), new Properties());
assertEquals(3, factory.get().size());
InterpreterFactory factory2 = new InterpreterFactory(conf, null, null);
InterpreterFactory factory2 = new InterpreterFactory(conf, null, null, null);
assertEquals(3, factory2.get().size());
}
}

View file

@ -58,7 +58,7 @@ public class NoteInterpreterLoaderTest {
MockInterpreter11.register("mock11", "group1", "org.apache.zeppelin.interpreter.mock.MockInterpreter11");
MockInterpreter2.register("mock2", "group2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
}
@After

View file

@ -38,6 +38,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
@ -85,7 +86,7 @@ public class NotebookTest implements JobListenerFactory{
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
@ -172,7 +173,8 @@ public class NotebookTest implements JobListenerFactory{
note.persist();
Notebook notebook2 = new Notebook(
conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null), this, null);
conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null, null), this,
null);
assertEquals(1, notebook2.getAllNotes().size());
}
@ -411,8 +413,16 @@ public class NotebookTest implements JobListenerFactory{
}
@Override
public JobListener getParagraphJobListener(Note note) {
return new JobListener(){
public ParagraphJobListener getParagraphJobListener(Note note) {
return new ParagraphJobListener(){
@Override
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
}
@Override
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
}
@Override
public void onProgressUpdate(Job job, int progress) {

View file

@ -30,12 +30,10 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
import org.apache.zeppelin.notebook.JobListenerFactory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
@ -87,7 +85,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
SearchService search = mock(SearchService.class);
notebookRepoSync = new NotebookRepoSync(conf);
@ -224,8 +222,16 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
}
@Override
public JobListener getParagraphJobListener(Note note) {
return new JobListener(){
public ParagraphJobListener getParagraphJobListener(Note note) {
return new ParagraphJobListener(){
@Override
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
}
@Override
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
}
@Override
public void onProgressUpdate(Job job, int progress) {

View file

@ -30,10 +30,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.notebook.JobListenerFactory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
@ -76,7 +73,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory {
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
this.schedulerFactory = new SchedulerFactory();
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
@ -140,7 +137,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory {
}
@Override
public JobListener getParagraphJobListener(Note note) {
public ParagraphJobListener getParagraphJobListener(Note note) {
return null;
}
}