Merge branch 'master' into ZEPPELIN-528

Conflicts:
	zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
This commit is contained in:
Jungtaek Lim 2015-12-24 14:54:40 +09:00
commit 69a58abc80
59 changed files with 3143 additions and 408 deletions

View file

@ -105,7 +105,7 @@
<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>

View file

@ -5,6 +5,8 @@ permalink: /:categories/:year/:month/:day/:title
exclude: [".rvmrc", ".rbenv-version", "README.md", "Rakefile", "changelog.md", "vendor", "node_modules", "scss"]
pygments: true
markdown: redcarpet
redcarpet:
extensions: ["tables"]
encoding: utf-8
# Themes are encouraged to use these universal variables

View file

@ -37,6 +37,7 @@
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Interpreter <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="{{BASE_PATH}}/interpreter/cassandra.html">Cassandra</a></li>
<li><a href="{{BASE_PATH}}/interpreter/elasticsearch.html">Elasticsearch</a></li>
<li><a href="{{BASE_PATH}}/interpreter/flink.html">Flink</a></li>
<li><a href="{{BASE_PATH}}/interpreter/geode.html">Geode</a></li>
<li><a href="{{BASE_PATH}}/interpreter/hive.html">Hive</a></li>

View file

@ -514,5 +514,9 @@ and (max-width: 1024px) {
}
#menu .navbar-brand {
margin-right: 50px;
margin-right: 50px;
}
.row img {
max-width: 100%;
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 67 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 54 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 58 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 74 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 113 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 47 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 257 KiB

View file

@ -44,6 +44,7 @@ limitations under the License.
* [sh](./pleasecontribute.html)
* [spark](./interpreter/spark.html)
* [tajo](./pleasecontribute.html)
* [elasticsearch](./interpreter/elasticsearch.html)
### Storage
* [S3 Storage](./storage/storage.html)

View file

@ -0,0 +1,236 @@
---
layout: page
title: "Elasticsearch Interpreter"
description: ""
group: manual
---
{% include JB/setup %}
## Elasticsearch Interpreter for Apache Zeppelin
### 1. Configuration
<br/>
<table class="table-configuration">
<tr>
<th>Property</th>
<th>Default</th>
<th>Description</th>
</tr>
<tr>
<td>elasticsearch.cluster.name</td>
<td>elasticsearch</td>
<td>Cluster name</td>
</tr>
<tr>
<td>elasticsearch.host</td>
<td>localhost</td>
<td>Host of a node in the cluster</td>
</tr>
<tr>
<td>elasticsearch.port</td>
<td>9300</td>
<td>Connection port <b>(important: this is not the HTTP port, but the transport port)</b></td>
</tr>
<tr>
<td>elasticsearch.result.size</td>
<td>10</td>
<td>The size of the result set of a search query</td>
</tr>
</table>
<center>
![Interpreter configuration](../assets/themes/zeppelin/img/docs-img/elasticsearch-config.png)
</center>
> Note #1: you can add more properties to configure the Elasticsearch client.
> Note #2: if you use Shield, you can add a property named `shield.user` with a value containing the name and the password (format: `username:password`). For more details about Shield configuration, consult the [Shield reference guide](https://www.elastic.co/guide/en/shield/current/_using_elasticsearch_java_clients_with_shield.html). Do not forget, to copy the shield client jar in the interpreter directory (`ZEPPELIN_HOME/interpreters/elasticsearch`).
<hr/>
### 2. Enabling the Elasticsearch Interpreter
In a notebook, to enable the **Elasticsearch** interpreter, click the **Gear** icon and select **Elasticsearch**.
<hr/>
### 3. Using the Elasticsearch Interpreter
In a paragraph, use `%elasticsearch` to select the Elasticsearch interpreter and then input all commands. To get the list of available commands, use `help`.
```bash
| %elasticsearch
| help
Elasticsearch interpreter:
General format: <command> /<indices>/<types>/<id> <option> <JSON>
- indices: list of indices separated by commas (depends on the command)
- types: list of document types separated by commas (depends on the command)
Commands:
- search /indices/types <query>
. indices and types can be omitted (at least, you have to provide '/')
. a query is either a JSON-formatted query, nor a lucene query
- size <value>
. defines the size of the result set (default value is in the config)
. if used, this command must be declared before a search command
- count /indices/types <query>
. same comments as for the search
- get /index/type/id
- delete /index/type/id
- index /ndex/type/id <json-formatted document>
. the id can be omitted, elasticsearch will generate one
```
> Tip: use (CTRL + .) for completion
#### get
With the `get` command, you can find a document by id. The result is a JSON document.
```bash
| %elasticsearch
| get /index/type/id
```
Example:
![Elasticsearch - Get](../assets/themes/zeppelin/img/docs-img/elasticsearch-get.png)
#### search
With the `search` command, you can send a search query to Elasticsearch. There are two formats of query:
* You can provide a JSON-formatted query, that is exactly what you provide when you use the REST API of Elasticsearch.
* See [Elasticsearch search API reference document](https://www.elastic.co/guide/en/elasticsearch/reference/current/search.html) for more details about the content of the search queries.
* You can also provide the content of a `query_string`
* This is a shortcut to a query like that: `{ "query": { "query_string": { "query": "__HERE YOUR QUERY__", "analyze_wildcard": true } } }`
* See [Elasticsearch query string syntax](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#query-string-syntax) for more details about the content of such a query.
```bash
| %elasticsearch
| search /index1,index2,.../type1,type2,... <JSON document containing the query or query_string elements>
```
If you want to modify the size of the result set, you can add a line that is setting the size, before your search command.
```bash
| %elasticsearch
| size 50
| search /index1,index2,.../type1,type2,... <JSON document containing the query or query_string elements>
```
Examples:
* With a JSON query:
```bash
| %elasticsearch
| search / { "query": { "match_all": {} } }
|
| %elasticsearch
| search /logs { "query": { "query_string": { "query": "request.method:GET AND status:200" } } }
```
* With query_string elements:
```bash
| %elasticsearch
| search /logs request.method:GET AND status:200
|
| %elasticsearch
| search /logs (404 AND (POST OR DELETE))
```
> **Important**: a document in Elasticsearch is a JSON document, so it is hierarchical, not flat as a row in a SQL table.
For the Elastic interpreter, the result of a search query is flattened.
Suppose we have a JSON document:
```
{
"date": "2015-12-08T21:03:13.588Z",
"request": {
"method": "GET",
"url": "/zeppelin/4cd001cd-c517-4fa9-b8e5-a06b8f4056c4",
"headers": [ "Accept: *.*", "Host: apache.org"]
},
"status": "403"
}
```
The data will be flattened like this:
date | request.headers[0] | request.headers[1] | request.method | request.url | status
-----|--------------------|--------------------|----------------|-------------|-------
2015-12-08T21:03:13.588Z | Accept: \*.\* | Host: apache.org | GET | /zeppelin/4cd001cd-c517-4fa9-b8e5-a06b8f4056c4 | 403
Examples:
* With a table containing the results:
![Elasticsearch - Search - table](../assets/themes/zeppelin/img/docs-img/elasticsearch-search-table.png)
* You can also use a predefined diagram:
![Elasticsearch - Search - diagram](../assets/themes/zeppelin/img/docs-img/elasticsearch-search-pie.png)
* With a JSON query:
![Elasticsearch - Search with query](../assets/themes/zeppelin/img/docs-img/elasticsearch-search-json-query-table.png)
* With a query string:
![Elasticsearch - Search with query string](../assets/themes/zeppelin/img/docs-img/elasticsearch-query-string.png)
#### count
With the `count` command, you can count documents available in some indices and types. You can also provide a query.
```bash
| %elasticsearch
| count /index1,index2,.../type1,type2,... <JSON document containing the query OR a query string>
```
Examples:
* Without query:
![Elasticsearch - Count](../assets/themes/zeppelin/img/docs-img/elasticsearch-count.png)
* With a query:
![Elasticsearch - Count with query](../assets/themes/zeppelin/img/docs-img/elasticsearch-count-with-query.png)
#### index
With the `index` command, you can insert/update a document in Elasticsearch.
```bash
| %elasticsearch
| index /index/type/id <JSON document>
|
| %elasticsearch
| index /index/type <JSON document>
```
#### delete
With the `delete` command, you can delete a document.
```bash
| %elasticsearch
| delete /index/type/id
```
#### Apply Zeppelin Dynamic Forms
You can leverage [Zeppelin Dynamic Form]({{BASE_PATH}}/manual/dynamicform.html) inside your queries. You can use both the `text input` and `select form` parameterization features
```bash
| %elasticsearch
| size ${limit=10}
| search /index/type { "query": { "match_all": {} } }
```

View file

@ -556,4 +556,33 @@ limitations under the License.
<td><pre>{"status":"OK","body":"* * * * * ?"}</pre></td>
</tr>
</table>
<table class="table-configuration">
<col width="200">
<tr>
<th>Full-text search through the paragraphs in all notebooks</th>
<th></th>
</tr>
<tr>
<td>Description</td>
<td>```GET``` request will return list of matching paragraphs
</td>
</tr>
<tr>
<td>URL</td>
<td>```http://[zeppelin-server]:[zeppelin-port]/api/notebook/search?q=[query]```</td>
</tr>
<tr>
<td>Success code</td>
<td>200</td>
</tr>
<tr>
<td>Fail code</td>
<td> 500 </td>
</tr>
<tr>
<td>Sample JSON response </td>
<td><pre>{"status":"OK", body: [{"id":"<noteId>/paragraph/<paragraphId>", "name":"Notebook Name", "snippet":"", "text":""}]}</pre></td>
</tr>
</table>

147
elasticsearch/pom.xml Normal file
View file

@ -0,0 +1,147 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.6.0-incubating-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-elasticsearch</artifactId>
<packaging>jar</packaging>
<version>0.6.0-incubating-SNAPSHOT</version>
<name>Zeppelin: Elasticsearch interpreter</name>
<url>http://www.apache.org</url>
<properties>
<elasticsearch.version>2.1.0</elasticsearch.version>
<guava.version>18.0</guava.version>
<json-flattener.version>0.1.1</json-flattener.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.github.wnameless</groupId>
<artifactId>json-flattener</artifactId>
<version>${json-flattener.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/elasticsearch</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/elasticsearch</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,465 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.elasticsearch;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.wnameless.json.flattener.JsonFlattener;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParseException;
/**
* Elasticsearch Interpreter for Zeppelin.
*/
public class ElasticsearchInterpreter extends Interpreter {
private static Logger logger = LoggerFactory.getLogger(ElasticsearchInterpreter.class);
private static final String HELP = "Elasticsearch interpreter:\n"
+ "General format: <command> /<indices>/<types>/<id> <option> <JSON>\n"
+ " - indices: list of indices separated by commas (depends on the command)\n"
+ " - types: list of document types separated by commas (depends on the command)\n"
+ "Commands:\n"
+ " - search /indices/types <query>\n"
+ " . indices and types can be omitted (at least, you have to provide '/')\n"
+ " . a query is either a JSON-formatted query, nor a lucene query\n"
+ " - size <value>\n"
+ " . defines the size of the result set (default value is in the config)\n"
+ " . if used, this command must be declared before a search command\n"
+ " - count /indices/types <query>\n"
+ " . same comments as for the search\n"
+ " - get /index/type/id\n"
+ " - delete /index/type/id\n"
+ " - index /ndex/type/id <json-formatted document>\n"
+ " . the id can be omitted, elasticsearch will generate one";
private static final List<String> COMMANDS = Arrays.asList(
"count", "delete", "get", "help", "index", "search");
public static final String ELASTICSEARCH_HOST = "elasticsearch.host";
public static final String ELASTICSEARCH_PORT = "elasticsearch.port";
public static final String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.cluster.name";
public static final String ELASTICSEARCH_RESULT_SIZE = "elasticsearch.result.size";
static {
Interpreter.register(
"elasticsearch",
"elasticsearch",
ElasticsearchInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(ELASTICSEARCH_HOST, "localhost", "The host for Elasticsearch")
.add(ELASTICSEARCH_PORT, "9300", "The port for Elasticsearch")
.add(ELASTICSEARCH_CLUSTER_NAME, "elasticsearch", "The cluster name for Elasticsearch")
.add(ELASTICSEARCH_RESULT_SIZE, "10", "The size of the result set of a search query")
.build());
}
private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
private Client client;
private String host = "localhost";
private int port = 9300;
private String clusterName = "elasticsearch";
private int resultSize = 10;
public ElasticsearchInterpreter(Properties property) {
super(property);
this.host = getProperty(ELASTICSEARCH_HOST);
this.port = Integer.parseInt(getProperty(ELASTICSEARCH_PORT));
this.clusterName = getProperty(ELASTICSEARCH_CLUSTER_NAME);
this.resultSize = Integer.parseInt(getProperty(ELASTICSEARCH_RESULT_SIZE));
}
@Override
public void open() {
try {
logger.info("prop={}", getProperty());
final Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put(getProperty())
.build();
client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
}
catch (IOException e) {
logger.error("Open connection with Elasticsearch", e);
}
}
@Override
public void close() {
if (client != null) {
client.close();
}
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext interpreterContext) {
logger.info("Run Elasticsearch command '" + cmd + "'");
int currentResultSize = resultSize;
if (client == null) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Problem with the Elasticsearch client, please check your configuration (host, port,...)");
}
String[] items = StringUtils.split(cmd.trim(), " ", 3);
// Process some specific commands (help, size, ...)
if ("help".equalsIgnoreCase(items[0])) {
return processHelp(InterpreterResult.Code.SUCCESS, null);
}
if ("size".equalsIgnoreCase(items[0])) {
// In this case, the line with size must be followed by a search,
// so we will continue with the next lines
final String[] lines = StringUtils.split(cmd.trim(), "\n", 2);
if (lines.length < 2) {
return processHelp(InterpreterResult.Code.ERROR,
"Size cmd must be followed by a search");
}
final String[] sizeLine = StringUtils.split(lines[0], " ", 2);
if (sizeLine.length != 2) {
return processHelp(InterpreterResult.Code.ERROR, "Right format is : size <value>");
}
currentResultSize = Integer.parseInt(sizeLine[1]);
items = StringUtils.split(lines[1].trim(), " ", 3);
}
if (items.length < 2) {
return processHelp(InterpreterResult.Code.ERROR, "Arguments missing");
}
final String method = items[0];
final String url = items[1];
final String data = items.length > 2 ? items[2].trim() : null;
final String[] urlItems = StringUtils.split(url.trim(), "/");
try {
if ("get".equalsIgnoreCase(method)) {
return processGet(urlItems);
}
else if ("count".equalsIgnoreCase(method)) {
return processCount(urlItems, data);
}
else if ("search".equalsIgnoreCase(method)) {
return processSearch(urlItems, data, currentResultSize);
}
else if ("index".equalsIgnoreCase(method)) {
return processIndex(urlItems, data);
}
else if ("delete".equalsIgnoreCase(method)) {
return processDelete(urlItems);
}
return processHelp(InterpreterResult.Code.ERROR, "Unknown command");
}
catch (Exception e) {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Error : " + e.getMessage());
}
}
@Override
public void cancel(InterpreterContext interpreterContext) {
// Nothing to do
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext interpreterContext) {
return 0;
}
@Override
public List<String> completion(String s, int i) {
final List<String> suggestions = new ArrayList<>();
if (StringUtils.isEmpty(s)) {
suggestions.addAll(COMMANDS);
}
else {
for (String cmd : COMMANDS) {
if (cmd.toLowerCase().contains(s)) {
suggestions.add(cmd);
}
}
}
return suggestions;
}
private InterpreterResult processHelp(InterpreterResult.Code code, String additionalMessage) {
final StringBuffer buffer = new StringBuffer();
if (additionalMessage != null) {
buffer.append(additionalMessage).append("\n");
}
buffer.append(HELP).append("\n");
return new InterpreterResult(code, InterpreterResult.Type.TEXT, buffer.toString());
}
/**
* Processes a "get" request.
*
* @param urlItems Items of the URL
* @return Result of the get request, it contains a JSON-formatted string
*/
private InterpreterResult processGet(String[] urlItems) {
if (urlItems.length != 3
|| StringUtils.isEmpty(urlItems[0])
|| StringUtils.isEmpty(urlItems[1])
|| StringUtils.isEmpty(urlItems[2])) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Bad URL (it should be /index/type/id)");
}
final GetResponse response = client
.prepareGet(urlItems[0], urlItems[1], urlItems[2])
.get();
if (response.isExists()) {
final String json = gson.toJson(response.getSource());
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
json);
}
return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found");
}
/**
* Processes a "count" request.
*
* @param urlItems Items of the URL
* @param data May contains the JSON of the request
* @return Result of the count request, it contains the total hits
*/
private InterpreterResult processCount(String[] urlItems, String data) {
if (urlItems.length > 2) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Bad URL (it should be /index1,index2,.../type1,type2,...)");
}
final SearchResponse response = searchData(urlItems, data, 0);
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
"" + response.getHits().getTotalHits());
}
/**
* Processes a "search" request.
*
* @param urlItems Items of the URL
* @param data May contains the limit and the JSON of the request
* @return Result of the search request, it contains a tab-formatted string of the matching hits
*/
private InterpreterResult processSearch(String[] urlItems, String data, int size) {
if (urlItems.length > 2) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Bad URL (it should be /index1,index2,.../type1,type2,...)");
}
final SearchResponse response = searchData(urlItems, data, size);
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TABLE,
buildResponseMessage(response.getHits().getHits()));
}
/**
* Processes a "index" request.
*
* @param urlItems Items of the URL
* @param data JSON to be indexed
* @return Result of the index request, it contains the id of the document
*/
private InterpreterResult processIndex(String[] urlItems, String data) {
if (urlItems.length < 2 || urlItems.length > 3) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Bad URL (it should be /index/type or /index/type/id)");
}
final IndexResponse response = client
.prepareIndex(urlItems[0], urlItems[1], urlItems.length == 2 ? null : urlItems[2])
.setSource(data)
.get();
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
response.getId());
}
/**
* Processes a "delete" request.
*
* @param urlItems Items of the URL
* @return Result of the delete request, it contains the id of the deleted document
*/
private InterpreterResult processDelete(String[] urlItems) {
if (urlItems.length != 3
|| StringUtils.isEmpty(urlItems[0])
|| StringUtils.isEmpty(urlItems[1])
|| StringUtils.isEmpty(urlItems[2])) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Bad URL (it should be /index/type/id)");
}
final DeleteResponse response = client
.prepareDelete(urlItems[0], urlItems[1], urlItems[2])
.get();
if (response.isFound()) {
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
response.getId());
}
return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found");
}
private SearchResponse searchData(String[] urlItems, String query, int size) {
final SearchRequestBuilder reqBuilder = new SearchRequestBuilder(
client, SearchAction.INSTANCE);
reqBuilder.setIndices();
if (urlItems.length >= 1) {
reqBuilder.setIndices(StringUtils.split(urlItems[0], ","));
}
if (urlItems.length > 1) {
reqBuilder.setTypes(StringUtils.split(urlItems[1], ","));
}
if (!StringUtils.isEmpty(query)) {
// The query can be either JSON-formatted, nor a Lucene query
// So, try to parse as a JSON => if there is an error, consider the query a Lucene one
try {
final Map source = gson.fromJson(query, Map.class);
reqBuilder.setExtraSource(source);
}
catch (JsonParseException e) {
// This is not a JSON (or maybe not well formatted...)
reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true));
}
}
reqBuilder.setSize(size);
final SearchResponse response = reqBuilder.get();
return response;
}
private String buildResponseMessage(SearchHit[] hits) {
if (hits == null || hits.length == 0) {
return "";
}
//First : get all the keys in order to build an ordered list of the values for each hit
//
final List<Map<String, Object>> flattenHits = new LinkedList<>();
final Set<String> keys = new TreeSet<>();
for (SearchHit hit : hits) {
final String json = hit.getSourceAsString();
final Map<String, Object> flattenMap = JsonFlattener.flattenAsMap(json);
flattenHits.add(flattenMap);
for (String key : flattenMap.keySet()) {
keys.add(key);
}
}
// Next : build the header of the table
//
final StringBuffer buffer = new StringBuffer();
for (String key : keys) {
buffer.append(key).append('\t');
}
buffer.replace(buffer.lastIndexOf("\t"), buffer.lastIndexOf("\t") + 1, "\n");
// Finally : build the result by using the key set
//
for (Map<String, Object> hit : flattenHits) {
for (String key : keys) {
final Object val = hit.get(key);
if (val != null) {
buffer.append(val);
}
buffer.append('\t');
}
buffer.replace(buffer.lastIndexOf("\t"), buffer.lastIndexOf("\t") + 1, "\n");
}
return buffer.toString();
}
}

View file

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.elasticsearch;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class ElasticsearchInterpreterTest {
private static Client elsClient;
private static Node elsNode;
private static ElasticsearchInterpreter interpreter;
private static final String[] METHODS = { "GET", "PUT", "DELETE", "POST" };
private static final String[] STATUS = { "200", "404", "500", "403" };
private static final String ELS_CLUSTER_NAME = "zeppelin-elasticsearch-interpreter-test";
private static final String ELS_HOST = "localhost";
private static final String ELS_TRANSPORT_PORT = "10300";
private static final String ELS_HTTP_PORT = "10200";
private static final String ELS_PATH = "/tmp/els";
@BeforeClass
public static void populate() throws IOException {
final Settings settings = Settings.settingsBuilder()
.put("cluster.name", ELS_CLUSTER_NAME)
.put("network.host", ELS_HOST)
.put("http.port", ELS_HTTP_PORT)
.put("transport.tcp.port", ELS_TRANSPORT_PORT)
.put("path.home", ELS_PATH)
.build();
elsNode = NodeBuilder.nodeBuilder().settings(settings).node();
elsClient = elsNode.client();
for (int i = 0; i < 50; i++) {
elsClient.prepareIndex("logs", "http", "" + i)
.setRefresh(true)
.setSource(jsonBuilder()
.startObject()
.field("date", new Date())
.startObject("request")
.field("method", METHODS[RandomUtils.nextInt(METHODS.length)])
.field("url", "/zeppelin/" + UUID.randomUUID().toString())
.field("headers", Arrays.asList("Accept: *.*", "Host: apache.org"))
.endObject()
.field("status", STATUS[RandomUtils.nextInt(STATUS.length)])
)
.get();
}
final Properties props = new Properties();
props.put(ElasticsearchInterpreter.ELASTICSEARCH_HOST, ELS_HOST);
props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_TRANSPORT_PORT);
props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLUSTER_NAME, ELS_CLUSTER_NAME);
interpreter = new ElasticsearchInterpreter(props);
interpreter.open();
}
@AfterClass
public static void clean() {
if (interpreter != null) {
interpreter.close();
}
if (elsClient != null) {
elsClient.admin().indices().delete(new DeleteIndexRequest("logs")).actionGet();
elsClient.close();
}
if (elsNode != null) {
elsNode.close();
}
}
@Test
public void testCount() {
InterpreterResult res = interpreter.interpret("count /unknown", null);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("count /logs", null);
assertEquals("50", res.message());
}
@Test
public void testGet() {
InterpreterResult res = interpreter.interpret("get /logs/http/unknown", null);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("get /logs/http/10", null);
assertEquals(Code.SUCCESS, res.code());
}
@Test
public void testSearch() {
InterpreterResult res = interpreter.interpret("size 10\nsearch /logs *", null);
assertEquals(Code.SUCCESS, res.code());
res = interpreter.interpret("search /logs {{{hello}}}", null);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("search /logs { \"query\": { \"match\": { \"status\": 500 } } }", null);
assertEquals(Code.SUCCESS, res.code());
res = interpreter.interpret("search /logs status:404", null);
assertEquals(Code.SUCCESS, res.code());
}
@Test
public void testIndex() {
InterpreterResult res = interpreter.interpret("index /logs { \"date\": \"" + new Date() + "\", \"method\": \"PUT\", \"status\": \"500\" }", null);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("index /logs/http { \"date\": \"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null);
assertEquals(Code.SUCCESS, res.code());
}
@Test
public void testDelete() {
InterpreterResult res = interpreter.interpret("delete /logs/http/unknown", null);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("delete /logs/http/11", null);
assertEquals("11", res.message());
}
}

View file

@ -100,6 +100,7 @@
<module>kylin</module>
<module>lens</module>
<module>cassandra</module>
<module>elasticsearch</module>
<module>zeppelin-web</module>
<module>zeppelin-server</module>
<module>zeppelin-distribution</module>

View file

@ -25,7 +25,8 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
# please see the online documentation at vagrantup.com.
# Every Vagrant virtual environment requires a box to build off of.
config.vm.box = "hashicorp/precise64"
# ubuntu/trusty64 Official Ubuntu Server 14.04 LTS (Trusty Tahr) builds
config.vm.box = "ubuntu/trusty64"
# Disable automatic box update checking. If you disable this, then
# boxes will only be checked for updates when the user runs

View file

@ -36,4 +36,6 @@ echo '# or for a specific build'
echo
echo 'mvn clean package -Pspark-1.5 -Ppyspark -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests'
echo './bin/zeppelin-daemon.sh start'
echo
echo 'On your host machine browse to http://localhost:8080/'

View file

@ -44,6 +44,7 @@ The following components are provided under Apache License.
(Apache 2.0) Apache Tajo (http://tajo.apache.org/)
(Apache 2.0) Apache Flink (http://flink.apache.org/)
(Apache 2.0) Apache Thrift (http://thrift.apache.org/)
(Apache 2.0) Apache Lucene (https://lucene.apache.org/)
(Apache 2.0) Apache Zookeeper (org.apache.zookeeper:zookeeper:jar:3.4.5 - http://zookeeper.apache.org/)
(Apache 2.0) Chill (com.twitter:chill-java:jar:0.5.0 - https://github.com/twitter/chill/)
(Apache 2.0) Codehaus Plexus (org.codehaus.plexus:plexus:jar:1.5.6 - https://codehaus-plexus.github.io/)
@ -66,6 +67,31 @@ The following components are provided under Apache License.
(Apache 2.0) lz4-java (net.jpountz.lz4:lz4:jar:1.3.0 - https://github.com/jpountz/lz4-java)
(Apache 2.0) RoaringBitmap (org.roaringbitmap:RoaringBitmap:jar:0.4.5 - https://github.com/lemire/RoaringBitmap)
(Apache 2.0) json4s (org.json4s:json4s-ast_2.10:jar:3.2.10 - https://github.com/json4s/json4s)
(Apache 2.0) HPPC Collections (com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc)
(Apache 2.0) Jackson-dataformat-CBOR (com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.6.2 - http://wiki.fasterxml.com/JacksonForCbor)
(Apache 2.0) Jackson-dataformat-Smile (com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.6.2 - http://wiki.fasterxml.com/JacksonForSmile)
(Apache 2.0) Jackson-dataformat-YAML (com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.6.2 - https://github.com/FasterXML/jackson)
(Apache 2.0) json-flattener (com.github.wnameless:json-flattener:0.1.1 - https://github.com/wnameless/json-flattener)
(Apache 2.0) Spatial4J (com.spatial4j:spatial4j:0.4.1 - https://github.com/spatial4j/spatial4j)
(Apache 2.0) T-Digest (com.tdunning:t-digest:3.0 - https://github.com/tdunning/t-digest)
(Apache 2.0) Netty (io.netty:netty:3.10.5.Final - http://netty.io/)
(Apache 2.0) Lucene Common Analyzers (org.apache.lucene:lucene-analyzers-common:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-analyzers-common)
(Apache 2.0) Lucene Memory (org.apache.lucene:lucene-backward-codecs:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-backward-codecs)
(Apache 2.0) Lucene Core (org.apache.lucene:lucene-core:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-core)
(Apache 2.0) Lucene Grouping (org.apache.lucene:lucene-grouping:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-grouping)
(Apache 2.0) Lucene Highlighter (org.apache.lucene:lucene-highlighter:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-highlighter)
(Apache 2.0) Lucene Join (org.apache.lucene:lucene-join:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-join)
(Apache 2.0) Lucene Memory (org.apache.lucene:lucene-memory:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-memory)
(Apache 2.0) Lucene Miscellaneous (org.apache.lucene:lucene-misc:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-misc)
(Apache 2.0) Lucene Queries (org.apache.lucene:lucene-queries:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-queries)
(Apache 2.0) Lucene QueryParsers (org.apache.lucene:lucene-queryparser:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-queryparser)
(Apache 2.0) Lucene Sandbox (org.apache.lucene:lucene-sandbox:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-sandbox)
(Apache 2.0) Lucene Spatial (org.apache.lucene:lucene-spatial:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-spatial)
(Apache 2.0) Lucene Spatial 3D (org.apache.lucene:lucene-spatial3d:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-spatial3d)
(Apache 2.0) Lucene Suggest (org.apache.lucene:lucene-suggest:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-suggest)
(Apache 2.0) Elasticsearch: Core (org.elasticsearch:elasticsearch:2.1.0 - http://nexus.sonatype.org/oss-repository-hosting.html/parent/elasticsearch)
(Apache 2.0) Joda convert (org.joda:joda-convert:1.2 - http://joda-convert.sourceforge.net)
(Apache 2.0) SnakeYAML (org.yaml:snakeyaml:1.15 - http://www.snakeyaml.org)
@ -104,7 +130,8 @@ The following components are provided under the MIT License.
(The MIT License) Objenesis (org.objenesis:objenesis:2.1 - https://github.com/easymock/objenesis) - Copyright (c) 2006-2015 the original author and authors
(The MIT License) JCL 1.1.1 implemented over SLF4J (org.slf4j:jcl-over-slf4j:1.7.5 - http://www.slf4j.org)
(The MIT License) JUL to SLF4J bridge (org.slf4j:jul-to-slf4j:1.7.5 - http://www.slf4j.org)
(The MIT License) angular-resource (angular-resource - https://github.com/angular/angular.js/tree/master/src/ngResource)
(The MIT License) minimal-json (com.eclipsesource.minimal-json:minimal-json:0.9.4 - https://github.com/ralfstx/minimal-json)
========================================================================
@ -178,3 +205,10 @@ See licenses/LICENSE-postgresql
========================================================================
Creative Commons CC0 (http://creativecommons.org/publicdomain/zero/1.0/)
========================================================================
(CC0 1.0 Universal) JSR166e (com.twitter:jsr166e:1.1.0 - http://github.com/twitter/jsr166e)
(Public Domain, per Creative Commons CC0) HdrHistogram (org.hdrhistogram:HdrHistogram:2.1.6 - http://hdrhistogram.github.io/HdrHistogram/)

View file

@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
* and saving/loading jobs from disk.
* Changing/adding/deleting non transitive field name need consideration of that.
*
* @author Leemoonsoo
*/
public abstract class Job {
/**

View file

@ -197,6 +197,22 @@
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-servlet</artifactId>
<version>1.13</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0-m10</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
@ -240,23 +256,6 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-servlet</artifactId>
<version>1.13</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0-m10</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.10</artifactId>
@ -268,6 +267,7 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.0</version>
<scope>test</scope>
</dependency>
</dependencies>

View file

@ -22,19 +22,29 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.ws.rs.*;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.rest.message.*;
import org.apache.zeppelin.rest.message.CronRequest;
import org.apache.zeppelin.rest.message.InterpreterSettingListForNoteBind;
import org.apache.zeppelin.rest.message.NewNotebookRequest;
import org.apache.zeppelin.rest.message.NewParagraphRequest;
import org.apache.zeppelin.rest.message.RunParagraphWithParametersRequest;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.server.JsonResponse;
import org.apache.zeppelin.server.ZeppelinServer;
import org.apache.zeppelin.socket.NotebookServer;
import org.quartz.CronExpression;
import org.slf4j.Logger;
@ -49,17 +59,18 @@ import com.google.gson.reflect.TypeToken;
@Path("/notebook")
@Produces("application/json")
public class NotebookRestApi {
Logger logger = LoggerFactory.getLogger(NotebookRestApi.class);
private static final Logger LOG = LoggerFactory.getLogger(NotebookRestApi.class);
Gson gson = new Gson();
private Notebook notebook;
private NotebookServer notebookServer;
private SearchService notebookIndex;
public NotebookRestApi() {}
public NotebookRestApi(Notebook notebook, NotebookServer notebookServer) {
public NotebookRestApi(Notebook notebook, NotebookServer notebookServer, SearchService search) {
this.notebook = notebook;
this.notebookServer = notebookServer;
this.notebookIndex = search;
}
/**
@ -71,7 +82,7 @@ public class NotebookRestApi {
public Response bind(@PathParam("noteId") String noteId, String req) throws IOException {
List<String> settingIdList = gson.fromJson(req, new TypeToken<List<String>>(){}.getType());
notebook.bindInterpretersToNote(noteId, settingIdList);
return new JsonResponse(Status.OK).build();
return new JsonResponse<>(Status.OK).build();
}
/**
@ -114,14 +125,14 @@ public class NotebookRestApi {
);
}
}
return new JsonResponse(Status.OK, "", settingList).build();
return new JsonResponse<>(Status.OK, "", settingList).build();
}
@GET
@Path("/")
public Response getNotebookList() throws IOException {
List<Map<String, String>> notesInfo = notebookServer.generateNotebooksInfo();
return new JsonResponse(Status.OK, "", notesInfo ).build();
return new JsonResponse<>(Status.OK, "", notesInfo ).build();
}
@GET
@ -129,10 +140,10 @@ public class NotebookRestApi {
public Response getNotebook(@PathParam("notebookId") String notebookId) throws IOException {
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build();
}
return new JsonResponse(Status.OK, "", note).build();
return new JsonResponse<>(Status.OK, "", note).build();
}
/**
@ -144,7 +155,7 @@ public class NotebookRestApi {
@POST
@Path("/")
public Response createNote(String message) throws IOException {
logger.info("Create new notebook by JSON {}" , message);
LOG.info("Create new notebook by JSON {}" , message);
NewNotebookRequest request = gson.fromJson(message,
NewNotebookRequest.class);
Note note = notebook.createNote();
@ -165,7 +176,7 @@ public class NotebookRestApi {
note.persist();
notebookServer.broadcastNote(note);
notebookServer.broadcastNoteList();
return new JsonResponse(Status.CREATED, "", note.getId() ).build();
return new JsonResponse<>(Status.CREATED, "", note.getId() ).build();
}
/**
@ -177,7 +188,7 @@ public class NotebookRestApi {
@DELETE
@Path("{notebookId}")
public Response deleteNote(@PathParam("notebookId") String notebookId) throws IOException {
logger.info("Delete notebook {} ", notebookId);
LOG.info("Delete notebook {} ", notebookId);
if (!(notebookId.isEmpty())) {
Note note = notebook.getNote(notebookId);
if (note != null) {
@ -185,7 +196,7 @@ public class NotebookRestApi {
}
}
notebookServer.broadcastNoteList();
return new JsonResponse(Status.OK, "").build();
return new JsonResponse<>(Status.OK, "").build();
}
/**
@ -198,14 +209,14 @@ public class NotebookRestApi {
@Path("{notebookId}")
public Response cloneNote(@PathParam("notebookId") String notebookId, String message) throws
IOException, CloneNotSupportedException, IllegalArgumentException {
logger.info("clone notebook by JSON {}" , message);
LOG.info("clone notebook by JSON {}" , message);
NewNotebookRequest request = gson.fromJson(message,
NewNotebookRequest.class);
String newNoteName = request.getName();
Note newNote = notebook.cloneNote(notebookId, newNoteName);
notebookServer.broadcastNote(newNote);
notebookServer.broadcastNoteList();
return new JsonResponse(Status.CREATED, "", newNote.getId()).build();
return new JsonResponse<>(Status.CREATED, "", newNote.getId()).build();
}
/**
@ -218,14 +229,14 @@ public class NotebookRestApi {
@Path("job/{notebookId}")
public Response runNoteJobs(@PathParam("notebookId") String notebookId) throws
IOException, IllegalArgumentException {
logger.info("run notebook jobs {} ", notebookId);
LOG.info("run notebook jobs {} ", notebookId);
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build();
}
note.runAll();
return new JsonResponse(Status.OK).build();
return new JsonResponse<>(Status.OK).build();
}
/**
@ -238,10 +249,10 @@ public class NotebookRestApi {
@Path("job/{notebookId}")
public Response stopNoteJobs(@PathParam("notebookId") String notebookId) throws
IOException, IllegalArgumentException {
logger.info("stop notebook jobs {} ", notebookId);
LOG.info("stop notebook jobs {} ", notebookId);
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build();
}
for (Paragraph p : note.getParagraphs()) {
@ -249,7 +260,7 @@ public class NotebookRestApi {
p.abort();
}
}
return new JsonResponse(Status.OK).build();
return new JsonResponse<>(Status.OK).build();
}
/**
@ -262,19 +273,21 @@ public class NotebookRestApi {
@Path("job/{notebookId}")
public Response getNoteJobStatus(@PathParam("notebookId") String notebookId) throws
IOException, IllegalArgumentException {
logger.info("get notebook job status.");
LOG.info("get notebook job status.");
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build();
}
return new JsonResponse(Status.OK, null, note.generateParagraphsInfo()).build();
return new JsonResponse<>(Status.OK, null, note.generateParagraphsInfo()).build();
}
/**
* Run paragraph job REST API
*
* @param message - JSON with params if user wants to update dynamic form's value
* null, empty string, empty json if user doesn't want to update
*
* @return JSON with status.OK
* @throws IOException, IllegalArgumentException
*/
@ -284,16 +297,16 @@ public class NotebookRestApi {
@PathParam("paragraphId") String paragraphId,
String message) throws
IOException, IllegalArgumentException {
logger.info("run paragraph job {} {} {}", notebookId, paragraphId, message);
LOG.info("run paragraph job {} {} {}", notebookId, paragraphId, message);
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build();
}
Paragraph paragraph = note.getParagraph(paragraphId);
if (paragraph == null) {
return new JsonResponse(Status.NOT_FOUND, "paragraph not found.").build();
return new JsonResponse<>(Status.NOT_FOUND, "paragraph not found.").build();
}
// handle params if presented
@ -308,7 +321,7 @@ public class NotebookRestApi {
}
note.run(paragraph.getId());
return new JsonResponse(Status.OK).build();
return new JsonResponse<>(Status.OK).build();
}
/**
@ -322,18 +335,18 @@ public class NotebookRestApi {
public Response stopParagraph(@PathParam("notebookId") String notebookId,
@PathParam("paragraphId") String paragraphId) throws
IOException, IllegalArgumentException {
logger.info("stop paragraph job {} ", notebookId);
LOG.info("stop paragraph job {} ", notebookId);
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build();
}
Paragraph p = note.getParagraph(paragraphId);
if (p == null) {
return new JsonResponse(Status.NOT_FOUND, "paragraph not found.").build();
return new JsonResponse<>(Status.NOT_FOUND, "paragraph not found.").build();
}
p.abort();
return new JsonResponse(Status.OK).build();
return new JsonResponse<>(Status.OK).build();
}
/**
@ -346,18 +359,18 @@ public class NotebookRestApi {
@Path("cron/{notebookId}")
public Response registerCronJob(@PathParam("notebookId") String notebookId, String message) throws
IOException, IllegalArgumentException {
logger.info("Register cron job note={} request cron msg={}", notebookId, message);
LOG.info("Register cron job note={} request cron msg={}", notebookId, message);
CronRequest request = gson.fromJson(message,
CronRequest.class);
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build();
}
if (!CronExpression.isValidExpression(request.getCronString())) {
return new JsonResponse(Status.BAD_REQUEST, "wrong cron expressions.").build();
return new JsonResponse<>(Status.BAD_REQUEST, "wrong cron expressions.").build();
}
Map<String, Object> config = note.getConfig();
@ -365,7 +378,7 @@ public class NotebookRestApi {
note.setConfig(config);
notebook.refreshCron(note.id());
return new JsonResponse(Status.OK).build();
return new JsonResponse<>(Status.OK).build();
}
/**
@ -378,11 +391,11 @@ public class NotebookRestApi {
@Path("cron/{notebookId}")
public Response removeCronJob(@PathParam("notebookId") String notebookId) throws
IOException, IllegalArgumentException {
logger.info("Remove cron job note {}", notebookId);
LOG.info("Remove cron job note {}", notebookId);
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build();
}
Map<String, Object> config = note.getConfig();
@ -390,7 +403,7 @@ public class NotebookRestApi {
note.setConfig(config);
notebook.refreshCron(note.id());
return new JsonResponse(Status.OK).build();
return new JsonResponse<>(Status.OK).build();
}
/**
@ -403,13 +416,26 @@ public class NotebookRestApi {
@Path("cron/{notebookId}")
public Response getCronJob(@PathParam("notebookId") String notebookId) throws
IOException, IllegalArgumentException {
logger.info("Get cron job note {}", notebookId);
LOG.info("Get cron job note {}", notebookId);
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build();
}
return new JsonResponse(Status.OK, note.getConfig().get("cron")).build();
return new JsonResponse<>(Status.OK, note.getConfig().get("cron")).build();
}
/**
* Search for a Notes
*/
@GET
@Path("search")
public Response search(@QueryParam("q") String queryTerm) {
LOG.info("Searching notebooks for: {}", queryTerm);
List<Map<String, String>> notebooksFound = notebookIndex.query(queryTerm);
LOG.info("{} notbooks found", notebooksFound.size());
return new JsonResponse<>(Status.OK, notebooksFound).build();
}
}

View file

@ -24,7 +24,6 @@ import javax.ws.rs.core.Response;
/**
* Zeppelin root rest api endpoint.
*
* @author anthonycorbacho
* @since 0.3.4
*/
@Path("/")

View file

@ -38,6 +38,8 @@ import org.apache.zeppelin.rest.InterpreterRestApi;
import org.apache.zeppelin.rest.NotebookRestApi;
import org.apache.zeppelin.rest.ZeppelinRestApi;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.search.LuceneSearch;
import org.apache.zeppelin.socket.NotebookServer;
import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.Handler;
@ -59,54 +61,66 @@ import org.slf4j.LoggerFactory;
* Main class of Zeppelin.
*
*/
public class ZeppelinServer extends Application {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinServer.class);
private SchedulerFactory schedulerFactory;
public static Notebook notebook;
public static NotebookServer notebookServer;
public static Server jettyServer;
public static Server jettyWebServer;
public static NotebookServer notebookWsServer;
private SchedulerFactory schedulerFactory;
private InterpreterFactory replFactory;
private NotebookRepo notebookRepo;
private SearchService notebookIndex;
public static void main(String[] args) throws Exception {
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
this.schedulerFactory = new SchedulerFactory();
this.replFactory = new InterpreterFactory(conf, notebookWsServer);
this.notebookRepo = new NotebookRepoSync(conf);
this.notebookIndex = new LuceneSearch();
notebook = new Notebook(conf,
notebookRepo, schedulerFactory, replFactory, notebookWsServer, notebookIndex);
}
public static void main(String[] args) throws InterruptedException {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
conf.setProperty("args", args);
jettyServer = setupJettyServer(conf);
// REST api
final ServletContextHandler restApi = setupRestApiContextHandler(conf);
final ServletContextHandler restApiContext = setupRestApiContextHandler(conf);
// Notebook server
final ServletContextHandler notebook = setupNotebookServer(conf);
final ServletContextHandler notebookContext = setupNotebookServer(conf);
// Web UI
final WebAppContext webApp = setupWebAppContext(conf);
// add all handlers
ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(new Handler[]{restApi, notebook, webApp});
jettyServer.setHandler(contexts);
contexts.setHandlers(new Handler[]{restApiContext, notebookContext, webApp});
LOG.info("Start zeppelin server");
jettyWebServer = setupJettyServer(conf);
jettyWebServer.setHandler(contexts);
LOG.info("Starting zeppelin server");
try {
jettyServer.start();
jettyWebServer.start(); //Instantiates ZeppelinServer
} catch (Exception e) {
LOG.error("Error while running jettyServer", e);
System.exit(-1);
}
LOG.info("Started zeppelin server");
LOG.info("Done, zeppelin server started");
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override public void run() {
LOG.info("Shutting down Zeppelin Server ... ");
try {
jettyServer.stop();
ZeppelinServer.notebook.getInterpreterFactory().close();
ZeppelinServer.notebook.close();
jettyWebServer.stop();
notebook.getInterpreterFactory().close();
notebook.close();
} catch (Exception e) {
LOG.error("Error while stopping servlet container", e);
}
@ -125,18 +139,15 @@ public class ZeppelinServer extends Application {
System.exit(0);
}
jettyServer.join();
jettyWebServer.join();
ZeppelinServer.notebook.getInterpreterFactory().close();
}
private static Server setupJettyServer(ZeppelinConfiguration conf)
throws Exception {
private static Server setupJettyServer(ZeppelinConfiguration conf) {
AbstractConnector connector;
if (conf.useSsl()) {
connector = new SslSelectChannelConnector(getSslContextFactory(conf));
}
else {
} else {
connector = new SelectChannelConnector();
}
@ -153,11 +164,9 @@ public class ZeppelinServer extends Application {
return server;
}
private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf)
throws Exception {
notebookServer = new NotebookServer();
final ServletHolder servletHolder = new ServletHolder(notebookServer);
private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) {
notebookWsServer = new NotebookServer();
final ServletHolder servletHolder = new ServletHolder(notebookWsServer);
servletHolder.setInitParameter("maxTextMessageSize", "1024000");
final ServletContextHandler cxfContext = new ServletContextHandler(
@ -171,9 +180,8 @@ public class ZeppelinServer extends Application {
return cxfContext;
}
private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf)
throws Exception {
@SuppressWarnings("deprecation")
private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) {
// Note that the API for the SslContextFactory is different for
// Jetty version 9
SslContextFactory sslContextFactory = new SslContextFactory();
@ -194,6 +202,7 @@ public class ZeppelinServer extends Application {
return sslContextFactory;
}
@SuppressWarnings("unused") //TODO(bzz) why unused?
private static SSLContext getSslContext(ZeppelinConfiguration conf)
throws Exception {
@ -240,23 +249,10 @@ public class ZeppelinServer extends Application {
webApp.setTempDirectory(warTempDirectory);
}
// Explicit bind to root
webApp.addServlet(
new ServletHolder(new DefaultServlet()),
"/*"
);
webApp.addServlet(new ServletHolder(new DefaultServlet()), "/*");
return webApp;
}
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
this.schedulerFactory = new SchedulerFactory();
this.replFactory = new InterpreterFactory(conf, notebookServer);
this.notebookRepo = new NotebookRepoSync(conf);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookServer);
}
@Override
public Set<Class<?>> getClasses() {
Set<Class<?>> classes = new HashSet<Class<?>>();
@ -264,14 +260,14 @@ public class ZeppelinServer extends Application {
}
@Override
public java.util.Set<java.lang.Object> getSingletons() {
Set<Object> singletons = new HashSet<Object>();
public Set<Object> getSingletons() {
Set<Object> singletons = new HashSet<>();
/** Rest-api root endpoint */
ZeppelinRestApi root = new ZeppelinRestApi();
singletons.add(root);
NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookServer);
NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookWsServer, notebookIndex);
singletons.add(notebookApi);
InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory);

View file

@ -15,10 +15,13 @@
* limitations under the License.
*/
package org.apache.zeppelin.socket;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.http.HttpServletRequest;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@ -44,27 +47,27 @@ import org.eclipse.jetty.websocket.WebSocketServlet;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
import com.google.gson.Gson;
/**
* Zeppelin websocket service.
*
* @author anthonycorbacho
*/
public class NotebookServer extends WebSocketServlet implements
NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener {
private static final Logger LOG = LoggerFactory
.getLogger(NotebookServer.class);
private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
Gson gson = new Gson();
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
final List<NotebookSocket> connectedSockets = new LinkedList<>();
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
private Notebook notebook() {
return ZeppelinServer.notebook;
}
@Override
public boolean checkOrigin(HttpServletRequest request, String origin) {
try {
return SecurityUtils.isValidOrigin(origin, ZeppelinConfiguration.create());
} catch (UnknownHostException e) {
@ -72,7 +75,6 @@ public class NotebookServer extends WebSocketServlet implements
} catch (URISyntaxException e) {
e.printStackTrace();
}
return false;
}
@ -85,9 +87,7 @@ public class NotebookServer extends WebSocketServlet implements
public void onOpen(NotebookSocket conn) {
LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(),
conn.getRequest().getRemotePort());
synchronized (connectedSockets) {
connectedSockets.add(conn);
}
connectedSockets.add(conn);
}
@Override
@ -147,8 +147,7 @@ public class NotebookServer extends WebSocketServlet implements
completion(conn, notebook, messagereceived);
break;
case PING:
pong();
break;
break; //do nothing
case ANGULAR_OBJECT_UPDATED:
angularObjectUpdated(conn, notebook, messagereceived);
break;
@ -166,9 +165,7 @@ public class NotebookServer extends WebSocketServlet implements
LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest()
.getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason);
removeConnectionFromAllNote(conn);
synchronized (connectedSockets) {
connectedSockets.remove(conn);
}
connectedSockets.remove(conn);
}
protected Message deserializeMessage(String msg) {
@ -285,13 +282,11 @@ public class NotebookServer extends WebSocketServlet implements
}
private void broadcastAll(Message m) {
synchronized (connectedSockets) {
for (NotebookSocket conn : connectedSockets) {
try {
conn.send(serializeMessage(m));
} catch (IOException e) {
LOG.error("socket error", e);
}
for (NotebookSocket conn : connectedSockets) {
try {
conn.send(serializeMessage(m));
} catch (IOException e) {
LOG.error("socket error", e);
}
}
}
@ -730,6 +725,7 @@ public class NotebookServer extends WebSocketServlet implements
public static class ParagraphJobListener implements JobListener {
private NotebookServer notebookServer;
private Note note;
public ParagraphJobListener(NotebookServer notebookServer, Note note) {
this.notebookServer = notebookServer;
this.note = note;
@ -771,8 +767,6 @@ public class NotebookServer extends WebSocketServlet implements
public JobListener getParagraphJobListener(Note note) {
return new ParagraphJobListener(this, note);
}
private void pong() {
}
private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {
List<InterpreterSetting> settings = note.getNoteReplLoader()

View file

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class CommandExecutor {
public final static Logger LOG = LoggerFactory.getLogger(CommandExecutor.class);
public enum IGNORE_ERRORS {
TRUE,
FALSE
}
public static int NORMAL_EXIT = 0;
private static IGNORE_ERRORS DEFAULT_BEHAVIOUR_ON_ERRORS = IGNORE_ERRORS.TRUE;
public static Object executeCommandLocalHost(String[] command, boolean printToConsole, ProcessData.Types_Of_Data type, IGNORE_ERRORS ignore_errors) {
List<String> subCommandsAsList = new ArrayList<String>(Arrays.asList(command));
String mergedCommand = StringUtils.join(subCommandsAsList, " ");
LOG.info("Sending command \"" + mergedCommand + "\" to localhost");
ProcessBuilder processBuilder = new ProcessBuilder(command);
Process process = null;
try {
process = processBuilder.start();
} catch (IOException e) {
throw new RuntimeException(e);
}
ProcessData data_of_process = new ProcessData(process, printToConsole);
Object output_of_process = data_of_process.getData(type);
int exit_code = data_of_process.getExitCodeValue();
if (!printToConsole)
LOG.trace(output_of_process.toString());
else
LOG.debug(output_of_process.toString());
if (ignore_errors == IGNORE_ERRORS.FALSE && exit_code != NORMAL_EXIT) {
LOG.error(String.format("*********************Command '%s' failed with exitcode %s *********************", mergedCommand, exit_code));
}
return output_of_process;
}
public static Object executeCommandLocalHost(String command, boolean printToConsole, ProcessData.Types_Of_Data type) {
return executeCommandLocalHost(new String[]{"bash", "-c", command}, printToConsole, type, DEFAULT_BEHAVIOUR_ON_ERRORS);
}
}

View file

@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.concurrent.TimeUnit;
public class ProcessData {
public enum Types_Of_Data {
OUTPUT,
ERROR,
EXIT_CODE,
STREAMS_MERGED,
PROCESS_DATA_OBJECT
}
public final static Logger LOG = LoggerFactory.getLogger(ProcessData.class);
private Process checked_process;
private boolean printToConsole = false;
private boolean removeRedundantOutput = true;
public ProcessData(Process connected_process, boolean printToConsole, int silenceTimeout, TimeUnit timeUnit) {
this.checked_process = connected_process;
this.printToConsole = printToConsole;
this.silenceTimeout = TimeUnit.MILLISECONDS.convert(silenceTimeout, timeUnit);
}
public ProcessData(Process connected_process, boolean printToConsole, int silenceTimeoutSec) {
this.checked_process = connected_process;
this.printToConsole = printToConsole;
this.silenceTimeout = TimeUnit.MILLISECONDS.convert(silenceTimeoutSec, TimeUnit.SECONDS);
}
public ProcessData(Process connected_process, boolean printToConsole) {
this.checked_process = connected_process;
this.printToConsole = printToConsole;
}
public ProcessData(Process connected_process) {
this.checked_process = connected_process;
this.printToConsole = true;
}
boolean returnCodeRetrieved = false;
private String outPutStream = null;
private String errorStream = null;
private int returnCode;
private long silenceTimeout = 10 * 60 * 1000;
private final long unconditionalExitDelayMinutes = 30;
public static boolean isRunning(Process process) {
try {
process.exitValue();
return false;
} catch (IllegalThreadStateException e) {
return true;
}
}
public Object getData(Types_Of_Data type) {
//TODO get rid of Pseudo-terminal will not be allocated because stdin is not a terminal.
switch (type) {
case OUTPUT: {
return this.getOutPutStream();
}
case ERROR: {
return this.getErrorStream();
}
case EXIT_CODE: {
return this.getExitCodeValue();
}
case STREAMS_MERGED: {
return this.getOutPutStream() + "\n" + this.getErrorStream();
}
case PROCESS_DATA_OBJECT: {
this.getErrorStream();
return this;
}
default: {
throw new IllegalArgumentException("Data Type " + type + " not supported yet!");
}
}
}
public int getExitCodeValue() {
try {
if (!returnCodeRetrieved) {
this.checked_process.waitFor();
this.returnCode = this.checked_process.exitValue();
this.returnCodeRetrieved = true;
this.checked_process.destroy();
}
} catch (Exception inter) {
throw new RuntimeException("Couldn't finish waiting for process " + this.checked_process + " termination", inter);
}
return this.returnCode;
}
public String getOutPutStream() {
if (this.outPutStream == null) {
try {
buildOutputAndErrorStreamData();
} catch (Exception e) {
throw new RuntimeException("Couldn't retrieve Output Stream data from process: " + this.checked_process.toString(), e);
}
}
this.outPutStream = this.outPutStream.replace("Pseudo-terminal will not be allocated because stdin is not a terminal.", "");
this.errorStream = this.errorStream.replace("Pseudo-terminal will not be allocated because stdin is not a terminal.", "");
return this.outPutStream;
}
public String getErrorStream() {
if (this.errorStream == null) {
try {
buildOutputAndErrorStreamData();
} catch (Exception e) {
throw new RuntimeException("Couldn't retrieve Error Stream data from process: " + this.checked_process.toString(), e);
}
}
this.outPutStream = this.outPutStream.replace("Pseudo-terminal will not be allocated because stdin is not a terminal.", "");
this.errorStream = this.errorStream.replace("Pseudo-terminal will not be allocated because stdin is not a terminal.", "");
return this.errorStream;
}
public String toString() {
StringBuilder result = new StringBuilder();
result.append(String.format("[OUTPUT STREAM]\n%s\n", this.outPutStream));
result.append(String.format("[ERROR STREAM]\n%s\n", this.errorStream));
result.append(String.format("[EXIT CODE]\n%d", this.returnCode));
return result.toString();
}
private void buildOutputAndErrorStreamData() throws IOException {
StringBuilder sbInStream = new StringBuilder();
StringBuilder sbErrorStream = new StringBuilder();
try {
InputStream in = this.checked_process.getInputStream();
InputStream inErrors = this.checked_process.getErrorStream();
BufferedReader inReader = new BufferedReader(new InputStreamReader(in));
BufferedReader inReaderErrors = new BufferedReader(new InputStreamReader(inErrors));
LOG.trace("Started retrieving data from streams of attached process: " + this.checked_process);
long lastStreamDataTime = System.currentTimeMillis(); //Store start time to be able to finish method if command hangs
long unconditionalExitTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(unconditionalExitDelayMinutes, TimeUnit.MINUTES); // Stop after 'unconditionalExitDelayMinutes' even if process is alive and sending output
final int BUFFER_LEN = 300;
char charBuffer[] = new char[BUFFER_LEN]; //Use char buffer to read output, size can be tuned.
boolean outputProduced = true; //Flag to check if previous iteration produced any output
while (isRunning(this.checked_process) || outputProduced) { //Continue if process is alive or some output was produced on previous iteration and there may be still some data to read.
outputProduced = false;
ZeppelinITUtils.sleep(100, false); //Some local commands can exit fast, but immediate stream reading will give no output and after iteration, 'while' condition will be false so we will not read out any output while it is still there, just need to wait for some time for it to appear in streams.
StringBuilder tempSB = new StringBuilder();
while (inReader.ready()) {
tempSB.setLength(0); // clean temporary StringBuilder
int readCount = inReader.read(charBuffer, 0, BUFFER_LEN); //read up to 'BUFFER_LEN' chars to buffer
if (readCount < 1) { // if nothing read or error occurred
break;
}
tempSB.append(charBuffer, 0, readCount);
sbInStream.append(tempSB);
if (tempSB.length() > 0) {
outputProduced = true; //set flag to know that we read something and there may be moire data, even if process already exited
}
lastStreamDataTime = System.currentTimeMillis(); //remember last time data was read from streams to be sure we are not looping infinitely
}
tempSB = new StringBuilder(); //Same, but for error stream
while (inReaderErrors.ready()) {
tempSB.setLength(0);
int readCount = inReaderErrors.read(charBuffer, 0, BUFFER_LEN);
if (readCount < 1) {
break;
}
tempSB.append(charBuffer, 0, readCount);
sbErrorStream.append(tempSB);
if (tempSB.length() > 0) {
outputProduced = true;
String temp = new String(tempSB);
temp = temp.replaceAll("Pseudo-terminal will not be allocated because stdin is not a terminal.", "");
//TODO : error stream output need to be improved, because it outputs downloading information.
if (printToConsole) {
if (!temp.trim().equals("")) {
if (temp.toLowerCase().contains("error") || temp.toLowerCase().contains("failed")) {
LOG.warn(temp.trim());
} else {
LOG.debug(temp.trim());
}
}
}
}
lastStreamDataTime = System.currentTimeMillis();
}
if ((System.currentTimeMillis() - lastStreamDataTime > silenceTimeout) || //Exit if silenceTimeout ms has passed from last stream read. Means process is alive but not sending any data.
(System.currentTimeMillis() > unconditionalExitTime)) { //Exit unconditionally - guards against alive process continuously sending data.
LOG.info("Conditions: " + (System.currentTimeMillis() - lastStreamDataTime > silenceTimeout) + " " +
(System.currentTimeMillis() > unconditionalExitTime));
this.checked_process.destroy();
try {
if ((System.currentTimeMillis() > unconditionalExitTime))
LOG.error("!@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@Unconditional exit occured@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@!\nsome process hag up for more than " + unconditionalExitDelayMinutes + " minutes.");
LOG.error("!##################################!");
StringWriter sw = new StringWriter();
new Exception("Exited from buildOutputAndErrorStreamData by timeout").printStackTrace(new PrintWriter(sw)); //Get stack trace
String exceptionAsString = sw.toString();
LOG.error(exceptionAsString);
} catch (Exception ignore) {
}
break;
}
}
in.close();
inErrors.close();
} finally {
this.outPutStream = sbInStream.toString();
this.errorStream = sbErrorStream.toString();
}
}
}

View file

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin;
import org.apache.commons.io.FileUtils;
import org.openqa.selenium.By;
import org.openqa.selenium.TimeoutException;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.chrome.ChromeDriver;
import org.openqa.selenium.firefox.FirefoxBinary;
import org.openqa.selenium.firefox.FirefoxDriver;
import org.openqa.selenium.firefox.FirefoxProfile;
import org.openqa.selenium.safari.SafariDriver;
import org.openqa.selenium.support.ui.ExpectedCondition;
import org.openqa.selenium.support.ui.WebDriverWait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import static org.junit.Assert.fail;
public class WebDriverManager {
public final static Logger LOG = LoggerFactory.getLogger(WebDriverManager.class);
private static String downLoadsDir = "";
static WebDriver getWebDriver() {
WebDriver driver = null;
if (driver == null) {
try {
FirefoxBinary ffox = new FirefoxBinary();
if ("true".equals(System.getenv("TRAVIS"))) {
ffox.setEnvironmentProperty("DISPLAY", ":99"); // xvfb is supposed to
// run with DISPLAY 99
}
int firefoxVersion = WebDriverManager.getFirefoxVersion();
LOG.info("Firefox version " + firefoxVersion + " detected");
downLoadsDir = FileUtils.getTempDirectory().toString();
String tempPath = downLoadsDir + "/firebug/";
downloadFireBug(firefoxVersion, tempPath);
final String firebugPath = tempPath + "firebug.xpi";
final String firepathPath = tempPath + "firepath.xpi";
FirefoxProfile profile = new FirefoxProfile();
profile.setPreference("browser.download.folderList", 2);
profile.setPreference("browser.download.dir", downLoadsDir);
profile.setPreference("browser.helperApps.alwaysAsk.force", false);
profile.setPreference("browser.download.manager.showWhenStarting", false);
profile.setPreference("browser.download.manager.showAlertOnComplete", false);
profile.setPreference("browser.download.manager.closeWhenDone", true);
profile.setPreference("app.update.auto", false);
profile.setPreference("app.update.enabled", false);
profile.setPreference("dom.max_script_run_time", 0);
profile.setPreference("dom.max_chrome_script_run_time", 0);
profile.setPreference("browser.helperApps.neverAsk.saveToDisk", "application/x-ustar,application/octet-stream,application/zip,text/csv,text/plain");
profile.setPreference("network.proxy.type", 0);
profile.addExtension(new File(firebugPath));
profile.addExtension(new File(firepathPath));
driver = new FirefoxDriver(ffox, profile);
} catch (Exception e) {
}
}
if (driver == null) {
try {
driver = new ChromeDriver();
} catch (Exception e) {
}
}
if (driver == null) {
try {
driver = new SafariDriver();
} catch (Exception e) {
}
}
String url;
if (System.getProperty("url") != null) {
url = System.getProperty("url");
} else {
url = "http://localhost:8080";
}
long start = System.currentTimeMillis();
boolean loaded = false;
driver.get(url);
while (System.currentTimeMillis() - start < 60 * 1000) {
// wait for page load
try {
(new WebDriverWait(driver, 5)).until(new ExpectedCondition<Boolean>() {
@Override
public Boolean apply(WebDriver d) {
return d.findElement(By.partialLinkText("Create new note"))
.isDisplayed();
}
});
loaded = true;
break;
} catch (TimeoutException e) {
driver.navigate().to(url);
}
}
if (loaded == false) {
fail();
}
return driver;
}
private static void downloadFireBug(int firefoxVersion, String tempPath) {
String firebugUrlString = null;
if (firefoxVersion < 23)
firebugUrlString = "http://getfirebug.com/releases/firebug/1.11/firebug-1.11.4.xpi";
else if (firefoxVersion >= 23 && firefoxVersion < 30)
firebugUrlString = "http://getfirebug.com/releases/firebug/1.12/firebug-1.12.8.xpi";
else if (firefoxVersion >= 30)
firebugUrlString = "http://getfirebug.com/releases/firebug/2.0/firebug-2.0.7.xpi";
LOG.info("firebug version: " + firefoxVersion + ", will be downloaded to " + tempPath);
try {
File firebugFile = new File(tempPath + "firebug.xpi");
URL firebugUrl = new URL(firebugUrlString);
if (!firebugFile.exists()) {
FileUtils.copyURLToFile(firebugUrl, firebugFile);
}
File firepathFile = new File(tempPath + "firepath.xpi");
URL firepathUrl = new URL("https://addons.cdn.mozilla.net/user-media/addons/11900/firepath-0.9.7.1-fx.xpi");
if (!firepathFile.exists()) {
FileUtils.copyURLToFile(firepathUrl, firepathFile);
}
} catch (IOException e) {
LOG.error("Download of firebug version: " + firefoxVersion + ", falied in path " + tempPath);
LOG.error(e.toString());
}
LOG.info("Download of firebug version: " + firefoxVersion + ", successful");
}
public static int getFirefoxVersion() {
try {
String firefoxVersionCmd = "firefox -v";
if (System.getProperty("os.name").startsWith("Mac OS")) {
firefoxVersionCmd = "/Applications/Firefox.app/Contents/MacOS/" + firefoxVersionCmd;
}
String versionString = (String) CommandExecutor.executeCommandLocalHost(firefoxVersionCmd, false, ProcessData.Types_Of_Data.OUTPUT);
return Integer.valueOf(versionString.replaceAll("Mozilla Firefox", "").trim().substring(0, 2));
} catch (Exception e) {
e.printStackTrace();
return -1;
}
}
}

View file

@ -68,70 +68,12 @@ public class ZeppelinIT {
private static final long MAX_PARAGRAPH_TIMEOUT_SEC = 60;
private WebDriver driver;
private void setWebDriver() {
if (driver == null) {
try {
FirefoxBinary ffox = new FirefoxBinary();
if ("true".equals(System.getenv("TRAVIS"))) {
ffox.setEnvironmentProperty("DISPLAY", ":99"); // xvfb is supposed to
// run with DISPLAY 99
}
FirefoxProfile profile = new FirefoxProfile();
driver = new FirefoxDriver(ffox, profile);
} catch (Exception e) {
LOG.error("Starting Firefox failed",e);
}
}
if (driver == null) {
try {
driver = new ChromeDriver();
} catch (Exception e) {
LOG.error("Starting Chrome failed",e);
}
}
if (driver == null) {
try {
driver = new SafariDriver();
} catch (Exception e) {
LOG.error("Starting Safari failed",e);
}
}
String url;
if (System.getProperty("url") != null) {
url = System.getProperty("url");
} else {
url = "http://localhost:8080";
}
long start = System.currentTimeMillis();
boolean loaded = false;
driver.get(url);
while (System.currentTimeMillis() - start < 60 * 1000) {
try { // wait for page load
WebElement element = pollingWait(By.partialLinkText("Create new note"), MAX_BROWSER_TIMEOUT_SEC);
loaded = element.isDisplayed();
break;
} catch (TimeoutException e) {
driver.navigate().to(url);
}
}
if (loaded == false) {
fail();
}
}
@Before
public void startUp() {
if (!endToEndTestEnabled()) {
return;
}
setWebDriver();
driver = WebDriverManager.getWebDriver();
}
@After
@ -186,125 +128,132 @@ public class ZeppelinIT {
return;
}
try {
createNewNote();
createNewNote();
// wait for first paragraph's " READY " status text
waitForParagraph(1, "READY");
// wait for first paragraph's " READY " status text
waitForParagraph(1, "READY");
/*
* print angular template
* %angular <div id='angularTestButton' ng-click='myVar=myVar+1'>BindingTest_{{myVar}}_</div>
*/
WebElement paragraph1Editor = driver.findElement(By.xpath(getParagraphXPath(1) + "//textarea"));
paragraph1Editor.sendKeys("println" + Keys.chord(Keys.SHIFT, "9") + "\""
+ Keys.chord(Keys.SHIFT, "5")
+ "angular <div id='angularTestButton' "
+ "ng" + Keys.chord(Keys.SUBTRACT) + "click='myVar=myVar+1'>"
+ "BindingTest_{{myVar}}_</div>\")");
paragraph1Editor.sendKeys(Keys.chord(Keys.SHIFT, Keys.ENTER));
waitForParagraph(1, "FINISHED");
/*
* print angular template
* %angular <div id='angularTestButton' ng-click='myVar=myVar+1'>BindingTest_{{myVar}}_</div>
*/
WebElement paragraph1Editor = driver.findElement(By.xpath(getParagraphXPath(1) + "//textarea"));
paragraph1Editor.sendKeys("println" + Keys.chord(Keys.SHIFT, "9") + "\""
+ Keys.chord(Keys.SHIFT, "5")
+ "angular <div id='angularTestButton' "
+ "ng" + Keys.chord(Keys.SUBTRACT) + "click='myVar=myVar+1'>"
+ "BindingTest_{{myVar}}_</div>\")");
paragraph1Editor.sendKeys(Keys.chord(Keys.SHIFT, Keys.ENTER));
waitForParagraph(1, "FINISHED");
// check expected text
waitForText("BindingTest__", By.xpath(
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
// check expected text
waitForText("BindingTest__", By.xpath(
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
/*
* Bind variable
* z.angularBind("myVar", 1)
*/
assertEquals(1, driver.findElements(By.xpath(getParagraphXPath(2) + "//textarea")).size());
WebElement paragraph2Editor = driver.findElement(By.xpath(getParagraphXPath(2) + "//textarea"));
paragraph2Editor.sendKeys("z.angularBind" + Keys.chord(Keys.SHIFT, "9") + "\"myVar\", 1)");
paragraph2Editor.sendKeys(Keys.chord(Keys.SHIFT, Keys.ENTER));
waitForParagraph(2, "FINISHED");
/*
* Bind variable
* z.angularBind("myVar", 1)
*/
assertEquals(1, driver.findElements(By.xpath(getParagraphXPath(2) + "//textarea")).size());
WebElement paragraph2Editor = driver.findElement(By.xpath(getParagraphXPath(2) + "//textarea"));
paragraph2Editor.sendKeys("z.angularBind" + Keys.chord(Keys.SHIFT, "9") + "\"myVar\", 1)");
paragraph2Editor.sendKeys(Keys.chord(Keys.SHIFT, Keys.ENTER));
waitForParagraph(2, "FINISHED");
// check expected text
waitForText("BindingTest_1_", By.xpath(
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
// check expected text
waitForText("BindingTest_1_", By.xpath(
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
/*
* print variable
* print("myVar="+z.angular("myVar"))
*/
WebElement paragraph3Editor = driver.findElement(By.xpath(getParagraphXPath(3) + "//textarea"));
paragraph3Editor.sendKeys(
"print" + Keys.chord(Keys.SHIFT, "9") + "\"myVar=\"" + Keys.chord(Keys.ADD)
+ "z.angular" + Keys.chord(Keys.SHIFT, "9") + "\"myVar\"))");
paragraph3Editor.sendKeys(Keys.chord(Keys.SHIFT, Keys.ENTER));
waitForParagraph(3, "FINISHED");
/*
* print variable
* print("myVar="+z.angular("myVar"))
*/
WebElement paragraph3Editor = driver.findElement(By.xpath(getParagraphXPath(3) + "//textarea"));
paragraph3Editor.sendKeys(
"print" + Keys.chord(Keys.SHIFT, "9") + "\"myVar=\"" + Keys.chord(Keys.ADD)
+ "z.angular" + Keys.chord(Keys.SHIFT, "9") + "\"myVar\"))");
paragraph3Editor.sendKeys(Keys.chord(Keys.SHIFT, Keys.ENTER));
waitForParagraph(3, "FINISHED");
// check expected text
waitForText("myVar=1", By.xpath(
getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]"));
// check expected text
waitForText("myVar=1", By.xpath(
getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]"));
/*
* Click element
*/
driver.findElement(By.xpath(
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).click();
/*
* Click element
*/
driver.findElement(By.xpath(
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).click();
// check expected text
waitForText("BindingTest_2_", By.xpath(
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
// check expected text
waitForText("BindingTest_2_", By.xpath(
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
/*
* Register watcher
* z.angularWatch("myVar", (before:Object, after:Object, context:org.apache.zeppelin.interpreter.InterpreterContext) => {
* z.run(2, context)
* }
*/
WebElement paragraph4Editor = driver.findElement(By.xpath(getParagraphXPath(4) + "//textarea"));
paragraph4Editor.sendKeys(
"z.angularWatch" + Keys.chord(Keys.SHIFT, "9") + "\"myVar\", "
+ Keys.chord(Keys.SHIFT, "9")
+ "before:Object, after:Object, context:org.apache.zeppelin.interpreter.InterpreterContext)"
+ Keys.EQUALS + ">{ z.run" +Keys.chord(Keys.SHIFT, "9") + "2, context)}");
paragraph4Editor.sendKeys(Keys.chord(Keys.SHIFT, Keys.ENTER));
waitForParagraph(4, "FINISHED");
/*
* Register watcher
* z.angularWatch("myVar", (before:Object, after:Object, context:org.apache.zeppelin.interpreter.InterpreterContext) => {
* z.run(2, context)
* }
*/
WebElement paragraph4Editor = driver.findElement(By.xpath(getParagraphXPath(4) + "//textarea"));
paragraph4Editor.sendKeys(
"z.angularWatch" + Keys.chord(Keys.SHIFT, "9") + "\"myVar\", "
+ Keys.chord(Keys.SHIFT, "9")
+ "before:Object, after:Object, context:org.apache.zeppelin.interpreter.InterpreterContext)"
+ Keys.EQUALS + ">{ z.run" +Keys.chord(Keys.SHIFT, "9") + "2, context)}");
paragraph4Editor.sendKeys(Keys.chord(Keys.SHIFT, Keys.ENTER));
waitForParagraph(4, "FINISHED");
/*
* Click element, again and see watcher works
*/
driver.findElement(By.xpath(
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).click();
/*
* Click element, again and see watcher works
*/
driver.findElement(By.xpath(
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).click();
// check expected text
waitForText("BindingTest_3_", By.xpath(
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
waitForParagraph(3, "FINISHED");
// check expected text
waitForText("BindingTest_3_", By.xpath(
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
waitForParagraph(3, "FINISHED");
// check expected text by watcher
waitForText("myVar=3", By.xpath(
getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]"));
// check expected text by watcher
waitForText("myVar=3", By.xpath(
getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]"));
/*
* Unbind
* z.angularUnbind("myVar")
*/
WebElement paragraph5Editor = driver.findElement(By.xpath(getParagraphXPath(5) + "//textarea"));
paragraph5Editor.sendKeys(
"z.angularUnbind" + Keys.chord(Keys.SHIFT, "9") + "\"myVar\")");
paragraph5Editor.sendKeys(Keys.chord(Keys.SHIFT, Keys.ENTER));
waitForParagraph(5, "FINISHED");
/*
* Unbind
* z.angularUnbind("myVar")
*/
WebElement paragraph5Editor = driver.findElement(By.xpath(getParagraphXPath(5) + "//textarea"));
paragraph5Editor.sendKeys(
"z.angularUnbind" + Keys.chord(Keys.SHIFT, "9") + "\"myVar\")");
paragraph5Editor.sendKeys(Keys.chord(Keys.SHIFT, Keys.ENTER));
waitForParagraph(5, "FINISHED");
// check expected text
waitForText("BindingTest__",
By.xpath(getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
// check expected text
waitForText("BindingTest__",
By.xpath(getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
/*
* Bind again and see rebind works.
*/
paragraph2Editor = driver.findElement(By.xpath(getParagraphXPath(2) + "//textarea"));
paragraph2Editor.sendKeys(Keys.chord(Keys.SHIFT, Keys.ENTER));
waitForParagraph(2, "FINISHED");
/*
* Bind again and see rebind works.
*/
paragraph2Editor = driver.findElement(By.xpath(getParagraphXPath(2) + "//textarea"));
paragraph2Editor.sendKeys(Keys.chord(Keys.SHIFT, Keys.ENTER));
waitForParagraph(2, "FINISHED");
// check expected text
waitForText("BindingTest_1_",
By.xpath(getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
// check expected text
waitForText("BindingTest_1_",
By.xpath(getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
System.out.println("testCreateNotebook Test executed");
driver.findElement(By.xpath("//*[@id='main']/div//h3/span[1]/button[@tooltip='Remove the notebook']"))
.sendKeys(Keys.ENTER);
ZeppelinITUtils.sleep(1000, true);
driver.findElement(By.xpath("//div[@class='modal-dialog'][contains(.,'delete this notebook')]" +
"//div[@class='modal-footer']//button[contains(.,'OK')]")).click();
ZeppelinITUtils.sleep(100, true);
System.out.println("testCreateNotebook Test executed");
} catch (ElementNotVisibleException e) {
File scrFile = ((TakesScreenshot)driver).getScreenshotAs(OutputType.FILE);

View file

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZeppelinITUtils {
public final static Logger LOG = LoggerFactory.getLogger(ZeppelinITUtils.class);
public static void sleep(long millis, boolean logOutput) {
if (logOutput) {
LOG.info("Starting sleeping for " + (millis / 1000) + " seconds...");
LOG.info("Caller: " + Thread.currentThread().getStackTrace()[2]);
}
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (logOutput) {
LOG.info("Finished.");
}
}
}

View file

@ -29,8 +29,12 @@ import java.util.concurrent.Executors;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethodBase;
import org.apache.commons.httpclient.methods.*;
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.PutMethod;
import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterSetting;
@ -207,7 +211,7 @@ public abstract class AbstractTestRestApi {
}
LOG.info("Terminating test Zeppelin...");
ZeppelinServer.jettyServer.stop();
ZeppelinServer.jettyWebServer.stop();
executor.shutdown();
long s = System.currentTimeMillis();

View file

@ -18,8 +18,6 @@
package org.apache.zeppelin.rest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -28,13 +26,10 @@ import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.PutMethod;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.rest.message.NewParagraphRequest;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.JsonResponse;
import org.apache.zeppelin.server.ZeppelinServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -50,8 +45,6 @@ import static org.junit.Assert.*;
/**
* BASIC Zeppelin rest api tests
*
* @author anthonycorbacho
*
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ZeppelinRestApiTest extends AbstractTestRestApi {
@ -533,6 +526,28 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
assertThat("", deleteCron, isAllowed());
deleteCron.releaseConnection();
ZeppelinServer.notebook.removeNote(note.getId());
}
}
@Test
public void testRegressionZEPPELIN_527() throws IOException {
Note note = ZeppelinServer.notebook.createNote();
note.setName("note for run test");
Paragraph paragraph = note.addParagraph();
paragraph.setText("%spark\nval param = z.input(\"param\").toString\nprintln(param)");
note.persist();
GetMethod getNoteJobs = httpGet("/notebook/job/" + note.getId());
assertThat("test notebook jobs run:", getNoteJobs, isAllowed());
Map<String, Object> resp = gson.fromJson(getNoteJobs.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
}.getType());
List<Map<String, String>> body = (List<Map<String, String>>) resp.get("body");
assertFalse(body.get(0).containsKey("started"));
assertFalse(body.get(0).containsKey("finished"));
getNoteJobs.releaseConnection();
ZeppelinServer.notebook.removeNote(note.getId());
}
}

View file

@ -60,7 +60,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
AbstractTestRestApi.startUp();
gson = new Gson();
notebook = ZeppelinServer.notebook;
notebookServer = ZeppelinServer.notebookServer;
notebookServer = ZeppelinServer.notebookWsServer;
}
@AfterClass

View file

@ -11,6 +11,7 @@
"angular-animate": "1.3.8",
"angular-touch": "1.3.8",
"angular-route": "1.3.8",
"angular-resource": "1.3.8",
"angular-bootstrap": "~0.13.0",
"angular-websocket": "~1.0.13",
"ace-builds": "1.1.9",

View file

@ -32,7 +32,8 @@ angular.module('zeppelinWebApp', [
'puElasticInput',
'xeditable',
'ngToast',
'focus-if'
'focus-if',
'ngResource'
])
.filter('breakFilter', function() {
return function (text) {
@ -50,6 +51,10 @@ angular.module('zeppelinWebApp', [
templateUrl: 'app/notebook/notebook.html',
controller: 'NotebookCtrl'
})
.when('/notebook/:noteId/paragraph?=:paragraphId', {
templateUrl: 'app/notebook/notebook.html',
controller: 'NotebookCtrl'
})
.when('/notebook/:noteId/paragraph/:paragraphId?', {
templateUrl: 'app/notebook/notebook.html',
controller: 'NotebookCtrl'
@ -58,6 +63,10 @@ angular.module('zeppelinWebApp', [
templateUrl: 'app/interpreter/interpreter.html',
controller: 'InterpreterCtrl'
})
.when('/search/:searchTerm', {
templateUrl: 'app/search/result-list.html',
controller: 'SearchResultCtrl'
})
.otherwise({
redirectTo: '/'
});

View file

@ -1,4 +1,5 @@
/* jshint loopfunc: true */
/* global $: false */
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,9 +15,9 @@
*/
'use strict';
angular.module('zeppelinWebApp').controller('NotebookCtrl', function($scope, $route, $routeParams, $location,
$rootScope, $http, websocketMsgSrv, baseUrlSrv,
$timeout, SaveAsService) {
angular.module('zeppelinWebApp').controller('NotebookCtrl',
function($scope, $route, $routeParams, $location, $rootScope, $http,
websocketMsgSrv, baseUrlSrv, $timeout, SaveAsService) {
$scope.note = null;
$scope.showEditor = false;
$scope.editorToggled = false;
@ -66,6 +67,26 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', function($scope, $ro
/** Init the new controller */
var initNotebook = function() {
websocketMsgSrv.getNotebook($routeParams.noteId);
var currentRoute = $route.current;
if (currentRoute) {
setTimeout(
function() {
var routeParams = currentRoute.params;
var $id = $('#' + routeParams.paragraph + '_container');
if ($id.length > 0) {
// adjust for navbar
var top = $id.offset().top - 103;
$('html, body').scrollTo({top: top, left: 0});
}
},
1000
);
}
};
initNotebook();

View file

@ -0,0 +1,119 @@
/* jshint loopfunc: true */
/*
* Licensed under the Apache License, Version 2.0 (the 'License');
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an 'AS IS' BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
angular
.module('zeppelinWebApp')
.controller('SearchResultCtrl', function($scope, $routeParams, searchService) {
var results = searchService.search({'q': $routeParams.searchTerm}).query();
results.$promise.then(function(result) {
$scope.notes = result.body.map(function(note) {
// redirect to notebook when search result is a notebook itself,
// not a paragraph
if (!/\/paragraph\//.test(note.id)) {
return note;
}
note.id = note.id.replace('paragraph/', '?paragraph=') +
'&term=' +
$routeParams.searchTerm;
return note;
});
});
$scope.page = 0;
$scope.allResults = false;
$scope.highlightSearchResults = function(note) {
return function(_editor) {
function getEditorMode(text) {
var editorModes = {
'ace/mode/scala': /^%spark/,
'ace/mode/sql': /^%(\w*\.)?\wql/,
'ace/mode/markdown': /^%md/,
'ace/mode/sh': /^%sh/
};
return Object.keys(editorModes).reduce(function(res, mode) {
return editorModes[mode].test(text)? mode : res;
}, 'ace/mode/scala');
}
var Range = ace.require('ace/range').Range;
_editor.setOption('highlightActiveLine', false);
_editor.$blockScrolling = Infinity;
_editor.setReadOnly(true);
_editor.renderer.setShowGutter(false);
_editor.setTheme('ace/theme/chrome');
_editor.getSession().setMode(getEditorMode(note.text));
function getIndeces(term) {
return function(str) {
var indeces = [];
var i = -1;
while((i = str.indexOf(term, i + 1)) >= 0) {
indeces.push(i);
}
return indeces;
};
}
var lines = note.snippet
.split('\n')
.map(function(line, row) {
var match = line.match(/<B>(.+?)<\/B>/);
// return early if nothing to highlight
if (!match) {
return line;
}
var term = match[1];
var __line = line
.replace(/<B>/g, '')
.replace(/<\/B>/g, '');
var indeces = getIndeces(term)(__line);
indeces.forEach(function(start) {
var end = start + term.length;
_editor
.getSession()
.addMarker(
new Range(row, start, row, end),
'search-results-highlight',
'line'
);
});
return __line;
});
// resize editor based on content length
_editor.setOption(
'maxLines',
lines.reduce(function(len, line) {return len + line.length;}, 0)
);
_editor.getSession().setValue(lines.join('\n'));
};
};
});

View file

@ -0,0 +1,42 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div ng-controller="SearchResultCtrl" class="searchResults">
<div class="row">
<div class="col-sm-8" style="margin: 0 auto; float: none">
<ul class="search-results">
<li class="panel panel-default" ng-repeat="note in notes">
<div class="panel-heading">
<h4>
<i style="font-size: 10px;" class="icon-doc"></i>
<a class="search-results-header"
href="#/notebook/{{note.id}}">
{{note.name || 'Note ' + note.id}}
</a>
</h4>
</div>
<div class="panel-body">
<div
class="search-result"
ui-ace="{
onLoad: highlightSearchResults(note),
require: ['ace/ext/language_tools']
}"
ng-model="_"
>
</div>
</div>
</li>
</div>
</div>
</div>

View file

@ -0,0 +1,37 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
.search-results {
list-style-type: none;
margin: 10% auto 0;
padding: 0;
}
.search-result {
height: 200px;
}
.search-results-header {
text-decoration: none;
}
.search-results-highlight {
background-color: yellow;
position: absolute;
}
/* remove error highlighting */
.search-results .ace_invalid {
background: none !important;
}

View file

@ -15,8 +15,7 @@
'use strict';
angular.module('zeppelinWebApp').controller('NavCtrl', function($scope, $rootScope, $routeParams,
notebookListDataFactory, websocketMsgSrv,
arrayOrderingSrv) {
$location, notebookListDataFactory, websocketMsgSrv, arrayOrderingSrv) {
/** Current list of notes (ids) */
var vm = this;
@ -35,6 +34,19 @@ angular.module('zeppelinWebApp').controller('NavCtrl', function($scope, $rootSco
vm.connected = param;
});
$rootScope.$on('$locationChangeSuccess', function () {
var path = $location.path();
// hacky solution to clear search bar
// TODO(felizbear): figure out how to make ng-click work in navbar
if (path === '/') {
$scope.searchTerm = '';
}
});
$scope.search = function() {
$location.url(/search/ + $scope.searchTerm);
};
function loadNotes() {
websocketMsgSrv.getNotebookList();
}

View file

@ -43,7 +43,34 @@ limitations under the License.
<a href="#/interpreter">Interpreter</a>
</li>
</ul>
<ul class="nav navbar-nav navbar-right" style="margin-top:10px; margin-right:5px;">
<li>
<!--TODO(bzz): move to Typeahead https://angular-ui.github.io/bootstrap -->
<form role="search"
style="width: 300px; display: inline-block; margin: 0 10px"
ng-submit="search()">
<div class="input-group">
<input
type="text"
ng-model="searchTerm"
ng-disabled="!navbar.connected"
class="form-control"
placeholder="Search in your notebooks"
/>
<span class="input-group-btn">
<button
type="submit"
class="btn btn-default"
ng-disabled="!navbar.connected"
>
<i class="glyphicon glyphicon-search"></i>
</button>
</span>
</div>
</form>
</li>
<li class="server-status">
<i class="fa fa-circle" ng-class="{'server-connected':navbar.connected, 'server-disconnected':!navbar.connected}"></i>
<span ng-show="navbar.connected">Connected</span>

View file

@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
angular.module('zeppelinWebApp').service('searchService', function($resource, baseUrlSrv) {
this.search = function(term) {
console.log('Searching for: %o', term.q);
if (!term.q) { //TODO(bzz): empty string check
return;
}
var encQuery = window.encodeURIComponent(term.q);
return $resource(baseUrlSrv.getRestApiBase()+'/notebook/search?q='+encQuery, {}, {
query: {method:'GET'}
});
};
});

View file

@ -49,6 +49,7 @@ limitations under the License.
<!-- endbuild -->
<!-- build:css(.tmp) styles/main.css -->
<link rel="stylesheet" href="app/home/home.css">
<link rel="stylesheet" href="app/search/search.css">
<link rel="stylesheet" href="app/notebook/notebook.css">
<link rel="stylesheet" href="app/notebook/paragraph/paragraph.css">
<link rel="stylesheet" href="app/interpreter/interpreter.css">
@ -95,6 +96,7 @@ limitations under the License.
<script src="bower_components/angular-animate/angular-animate.js"></script>
<script src="bower_components/angular-touch/angular-touch.js"></script>
<script src="bower_components/angular-route/angular-route.js"></script>
<script src="bower_components/angular-resource/angular-resource.js"></script>
<script src="bower_components/angular-bootstrap/ui-bootstrap-tpls.js"></script>
<script src="bower_components/angular-websocket/angular-websocket.min.js"></script>
<script src="bower_components/ace-builds/src-noconflict/ace.js"></script>
@ -131,6 +133,7 @@ limitations under the License.
<script src="app/notebook/notebook.controller.js"></script>
<script src="app/interpreter/interpreter.controller.js"></script>
<script src="app/notebook/paragraph/paragraph.controller.js"></script>
<script src="app/search/result-list.controller.js"></script>
<script src="components/arrayOrderingSrv/arrayOrdering.service.js"></script>
<script src="components/navbar/navbar.controller.js"></script>
<script src="components/ngescape/ngescape.directive.js"></script>
@ -147,6 +150,7 @@ limitations under the License.
<script src="components/baseUrl/baseUrl.service.js"></script>
<script src="components/browser-detect/browserDetect.service.js"></script>
<script src="components/saveAs/saveAs.service.js"></script>
<script src="components/searchService/search.service.js"></script>
<!-- endbuild -->
</body>
</html>

View file

@ -29,6 +29,7 @@ module.exports = function(config) {
'bower_components/angular-animate/angular-animate.js',
'bower_components/angular-touch/angular-touch.js',
'bower_components/angular-route/angular-route.js',
'bower_components/angular-resource/angular-resource.js',
'bower_components/angular-bootstrap/ui-bootstrap-tpls.js',
'bower_components/angular-websocket/angular-websocket.min.js',
'bower_components/ace-builds/src-noconflict/ace.js',

View file

@ -123,6 +123,36 @@
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>5.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
<version>5.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-queryparser</artifactId>
<version>5.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-highlighter</artifactId>
<version>5.3.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>

View file

@ -18,12 +18,12 @@
package org.apache.zeppelin.conf;
import java.net.URL;
import java.util.*;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.configuration.tree.ConfigurationNode;
import org.apache.zeppelin.notebook.repo.S3NotebookRepo;
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory;
/**
* Zeppelin configuration.
*
* @author Leemoonsoo
*
*/
public class ZeppelinConfiguration extends XMLConfiguration {
private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml";
@ -415,7 +413,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.cassandra.CassandraInterpreter,"
+ "org.apache.zeppelin.geode.GeodeOqlInterpreter,"
+ "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
+ "org.apache.zeppelin.kylin.KylinInterpreter"),
+ "org.apache.zeppelin.kylin.KylinInterpreter,"
+ "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),

View file

@ -20,12 +20,12 @@ package org.apache.zeppelin.notebook;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.Input;
@ -39,6 +39,7 @@ import org.apache.zeppelin.notebook.utility.IdHashes;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.search.SearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,40 +50,43 @@ import com.google.gson.Gson;
*/
public class Note implements Serializable, JobListener {
transient Logger logger = LoggerFactory.getLogger(Note.class);
List<Paragraph> paragraphs = new LinkedList<Paragraph>();
private String name;
private static final long serialVersionUID = 7920699076577612429L;
List<Paragraph> paragraphs = new LinkedList<>();
private String name = "";
private String id;
Map<String, List<AngularObject>> angularObjects = new HashMap<String, List<AngularObject>>();
@SuppressWarnings("rawtypes")
Map<String, List<AngularObject>> angularObjects = new HashMap<>();
private transient NoteInterpreterLoader replLoader;
private transient ZeppelinConfiguration conf;
private transient JobListenerFactory jobListenerFactory;
private transient NotebookRepo repo;
private transient SearchService index;
/**
* note configurations.
*
* - looknfeel - cron
*/
private Map<String, Object> config = new HashMap<String, Object>();
private Map<String, Object> config = new HashMap<>();
/**
* note information.
*
* - cron : cron expression validity.
*/
private Map<String, Object> info = new HashMap<String, Object>();
private Map<String, Object> info = new HashMap<>();
public Note() {}
public Note(NotebookRepo repo,
NoteInterpreterLoader replLoader,
JobListenerFactory jobListenerFactory) {
public Note(NotebookRepo repo, NoteInterpreterLoader replLoader,
JobListenerFactory jlFactory, SearchService noteIndex) {
this.repo = repo;
this.replLoader = replLoader;
this.jobListenerFactory = jobListenerFactory;
this.jobListenerFactory = jlFactory;
this.index = noteIndex;
generateId();
}
@ -130,6 +134,11 @@ public class Note implements Serializable, JobListener {
this.repo = repo;
}
public void setIndex(SearchService index) {
this.index = index;
}
@SuppressWarnings("rawtypes")
public Map<String, List<AngularObject>> getAngularObjects() {
return angularObjects;
}
@ -193,14 +202,16 @@ public class Note implements Serializable, JobListener {
* Remove paragraph by id.
*
* @param paragraphId
* @return
* @return a paragraph that was deleted, or <code>null</code> otherwise
*/
public Paragraph removeParagraph(String paragraphId) {
synchronized (paragraphs) {
for (int i = 0; i < paragraphs.size(); i++) {
Paragraph p = paragraphs.get(i);
Iterator<Paragraph> i = paragraphs.iterator();
while (i.hasNext()) {
Paragraph p = i.next();
if (p.getId().equals(paragraphId)) {
paragraphs.remove(i);
index.deleteIndexDoc(this, p);
i.remove();
return p;
}
}
@ -293,7 +304,7 @@ public class Note implements Serializable, JobListener {
return paragraphs.get(paragraphs.size() - 1);
}
}
public List<Map<String, String>> generateParagraphsInfo (){
List<Map<String, String>> paragraphsInfo = new LinkedList<>();
synchronized (paragraphs) {
@ -301,8 +312,12 @@ public class Note implements Serializable, JobListener {
Map<String, String> info = new HashMap<>();
info.put("id", p.getId());
info.put("status", p.getStatus().toString());
info.put("started", p.getDateStarted().toString());
info.put("finished", p.getDateFinished().toString());
if (p.getDateStarted() != null) {
info.put("started", p.getDateStarted().toString());
}
if (p.getDateFinished() != null) {
info.put("finished", p.getDateFinished().toString());
}
if (p.getStatus().isRunning()) {
info.put("progress", String.valueOf(p.progress()));
}
@ -310,7 +325,7 @@ public class Note implements Serializable, JobListener {
}
}
return paragraphsInfo;
}
}
/**
* Run all paragraphs sequentially.
@ -360,7 +375,7 @@ public class Note implements Serializable, JobListener {
}
private void snapshotAngularObjectRegistry() {
angularObjects = new HashMap<String, List<AngularObject>>();
angularObjects = new HashMap<>();
List<InterpreterSetting> settings = replLoader.getInterpreterSettings();
if (settings == null || settings.size() == 0) {
@ -376,6 +391,7 @@ public class Note implements Serializable, JobListener {
public void persist() throws IOException {
snapshotAngularObjectRegistry();
index.updateIndexDoc(this);
repo.save(this);
}
@ -385,7 +401,7 @@ public class Note implements Serializable, JobListener {
public Map<String, Object> getConfig() {
if (config == null) {
config = new HashMap<String, Object>();
config = new HashMap<>();
}
return config;
}
@ -396,7 +412,7 @@ public class Note implements Serializable, JobListener {
public Map<String, Object> getInfo() {
if (info == null) {
info = new HashMap<String, Object>();
info = new HashMap<>();
}
return info;
}
@ -407,17 +423,10 @@ public class Note implements Serializable, JobListener {
@Override
public void beforeStatusChange(Job job, Status before, Status after) {
Paragraph p = (Paragraph) job;
}
@Override
public void afterStatusChange(Job job, Status before, Status after) {
Paragraph p = (Paragraph) job;
}
private static Logger logger() {
Logger logger = LoggerFactory.getLogger(Note.class);
return logger;
}
@Override

View file

@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
@ -38,6 +39,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
@ -56,7 +58,10 @@ import org.slf4j.LoggerFactory;
*/
public class Notebook {
Logger logger = LoggerFactory.getLogger(Notebook.class);
@SuppressWarnings("unused") @Deprecated //TODO(bzz): remove unused
private SchedulerFactory schedulerFactory;
private InterpreterFactory replFactory;
/** Keep the order. */
Map<String, Note> notes = new LinkedHashMap<String, Note>();
@ -65,22 +70,45 @@ public class Notebook {
private org.quartz.Scheduler quartzSched;
private JobListenerFactory jobListenerFactory;
private NotebookRepo notebookRepo;
private SearchService notebookIndex;
/**
* Main constructor \w manual Dependency Injection
*
* @param conf
* @param notebookRepo
* @param schedulerFactory
* @param replFactory
* @param jobListenerFactory
* @param notebookIndex - (nullable) for indexing all notebooks on creating.
*
* @throws IOException
* @throws SchedulerException
*/
public Notebook(ZeppelinConfiguration conf, NotebookRepo notebookRepo,
SchedulerFactory schedulerFactory,
InterpreterFactory replFactory, JobListenerFactory jobListenerFactory) throws IOException,
SchedulerException {
InterpreterFactory replFactory, JobListenerFactory jobListenerFactory,
SearchService notebookIndex) throws IOException, SchedulerException {
this.conf = conf;
this.notebookRepo = notebookRepo;
this.schedulerFactory = schedulerFactory;
this.replFactory = replFactory;
this.jobListenerFactory = jobListenerFactory;
this.notebookIndex = notebookIndex;
quertzSchedFact = new org.quartz.impl.StdSchedulerFactory();
quartzSched = quertzSchedFact.getScheduler();
quartzSched.start();
CronJob.notebook = this;
loadAllNotes();
if (this.notebookIndex != null) {
long start = System.nanoTime();
logger.info("Notebook indexing started...");
notebookIndex.addIndexDocs(notes.values());
logger.info("Notebook indexing finished: {} indexed in {}s", notes.size(),
TimeUnit.NANOSECONDS.toSeconds(start - System.nanoTime()));
}
}
/**
@ -90,11 +118,14 @@ public class Notebook {
* @throws IOException
*/
public Note createNote() throws IOException {
Note note;
if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING)) {
return createNote(replFactory.getDefaultInterpreterSettingList());
note = createNote(replFactory.getDefaultInterpreterSettingList());
} else {
return createNote(null);
note = createNote(null);
}
notebookIndex.addIndexDoc(note);
return note;
}
/**
@ -105,7 +136,7 @@ public class Notebook {
*/
public Note createNote(List<String> interpreterIds) throws IOException {
NoteInterpreterLoader intpLoader = new NoteInterpreterLoader(replFactory);
Note note = new Note(notebookRepo, intpLoader, jobListenerFactory);
Note note = new Note(notebookRepo, intpLoader, jobListenerFactory, notebookIndex);
intpLoader.setNoteId(note.id());
synchronized (notes) {
notes.put(note.id(), note);
@ -114,6 +145,7 @@ public class Notebook {
bindInterpretersToNote(note.id(), interpreterIds);
}
notebookIndex.addIndexDoc(note);
note.persist();
return note;
}
@ -144,6 +176,8 @@ public class Notebook {
for (Paragraph p : paragraphs) {
newNote.addCloneParagraph(p);
}
notebookIndex.addIndexDoc(newNote);
newNote.persist();
return newNote;
}
@ -183,9 +217,11 @@ public class Notebook {
public void removeNote(String id) {
Note note;
synchronized (notes) {
note = notes.remove(id);
}
notebookIndex.deleteIndexDocs(note);
// remove from all interpreter instance's angular object registry
for (InterpreterSetting settings : replFactory.get()) {
@ -204,6 +240,7 @@ public class Notebook {
}
}
@SuppressWarnings("rawtypes")
private Note loadNoteFromRepo(String id) {
Note note = null;
try {
@ -215,20 +252,17 @@ public class Notebook {
return null;
}
// set NoteInterpreterLoader
NoteInterpreterLoader noteInterpreterLoader = new NoteInterpreterLoader(
replFactory);
note.setReplLoader(noteInterpreterLoader);
noteInterpreterLoader.setNoteId(note.id());
//Manually inject ALL dependencies, as DI constructor was NOT used
note.setIndex(this.notebookIndex);
NoteInterpreterLoader replLoader = new NoteInterpreterLoader(replFactory);
note.setReplLoader(replLoader);
replLoader.setNoteId(note.id());
// set JobListenerFactory
note.setJobListenerFactory(jobListenerFactory);
// set notebookRepo
note.setNotebookRepo(notebookRepo);
Map<String, SnapshotAngularObject> angularObjectSnapshot =
new HashMap<String, SnapshotAngularObject>();
Map<String, SnapshotAngularObject> angularObjectSnapshot = new HashMap<>();
// restore angular object --------------
Date lastUpdatedDate = new Date(0);
@ -246,15 +280,11 @@ public class Notebook {
for (String intpGroupName : savedObjects.keySet()) {
List<AngularObject> objectList = savedObjects.get(intpGroupName);
for (AngularObject savedObject : objectList) {
SnapshotAngularObject snapshot = angularObjectSnapshot.get(savedObject.getName());
for (AngularObject object : objectList) {
SnapshotAngularObject snapshot = angularObjectSnapshot.get(object.getName());
if (snapshot == null || snapshot.getLastUpdate().before(lastUpdatedDate)) {
angularObjectSnapshot.put(
savedObject.getName(),
new SnapshotAngularObject(
intpGroupName,
savedObject,
lastUpdatedDate));
angularObjectSnapshot.put(object.getName(),
new SnapshotAngularObject(intpGroupName, object, lastUpdatedDate));
}
}
}
@ -310,6 +340,7 @@ public class Notebook {
}
}
@SuppressWarnings("rawtypes")
class SnapshotAngularObject {
String intpGroupId;
AngularObject angularObject;
@ -344,12 +375,9 @@ public class Notebook {
}
synchronized (notes) {
List<Note> noteList = new ArrayList<Note>(notes.values());
Collections.sort(noteList, new Comparator() {
Collections.sort(noteList, new Comparator<Note>() {
@Override
public int compare(Object one, Object two) {
Note note1 = (Note) one;
Note note2 = (Note) two;
public int compare(Note note1, Note note2) {
String name1 = note1.id();
if (note1.getName() != null) {
name1 = note1.getName();
@ -358,7 +386,6 @@ public class Notebook {
if (note2.getName() != null) {
name2 = note2.getName();
}
((Note) one).getName();
return name1.compareTo(name2);
}
});
@ -459,6 +486,7 @@ public class Notebook {
public void close() {
this.notebookRepo.close();
this.notebookIndex.close();
}
}

View file

@ -23,7 +23,6 @@ import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.Interpreter.FormType;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.JobListener;
import org.slf4j.Logger;
@ -35,10 +34,10 @@ import java.util.*;
/**
* Paragraph is a representation of an execution unit.
*
* @author Leemoonsoo
*/
public class Paragraph extends Job implements Serializable, Cloneable {
private static final transient long serialVersionUID = -6328572073497992016L;
private static final long serialVersionUID = -6328572073497992016L;
private transient NoteInterpreterLoader replLoader;
private transient Note note;

View file

@ -46,11 +46,11 @@ public class NotebookRepoSync implements NotebookRepo {
private List<NotebookRepo> repos = new ArrayList<NotebookRepo>();
/**
* @param noteIndex
* @param (conf)
* @throws - Exception
*/
public NotebookRepoSync(ZeppelinConfiguration conf) throws Exception {
config = conf;
String allStorageClassNames = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE).trim();
@ -134,20 +134,18 @@ public class NotebookRepoSync implements NotebookRepo {
}
/**
* copy new/updated notes from source to destination storage
* Copies new/updated notes from source to destination storage
*
* @throws IOException
*/
void sync(int sourceRepoIndex, int destRepoIndex) throws IOException {
LOG.info("Sync started");
NotebookRepo sourceRepo = getRepo(sourceRepoIndex);
NotebookRepo destRepo = getRepo(destRepoIndex);
List <NoteInfo> sourceNotes = sourceRepo.list();
List <NoteInfo> destNotes = destRepo.list();
NotebookRepo srcRepo = getRepo(sourceRepoIndex);
NotebookRepo dstRepo = getRepo(destRepoIndex);
List <NoteInfo> srcNotes = srcRepo.list();
List <NoteInfo> dstNotes = dstRepo.list();
Map<String, List<String>> noteIDs = notesCheckDiff(sourceNotes,
sourceRepo,
destNotes,
destRepo);
Map<String, List<String>> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo);
List<String> pushNoteIDs = noteIDs.get(pushKey);
List<String> pullNoteIDs = noteIDs.get(pullKey);
if (!pushNoteIDs.isEmpty()) {
@ -155,7 +153,7 @@ public class NotebookRepoSync implements NotebookRepo {
for (String id : pushNoteIDs) {
LOG.info("ID : " + id);
}
pushNotes(pushNoteIDs, sourceRepo, destRepo);
pushNotes(pushNoteIDs, srcRepo, dstRepo);
} else {
LOG.info("Nothing to push");
}
@ -165,7 +163,7 @@ public class NotebookRepoSync implements NotebookRepo {
for (String id : pullNoteIDs) {
LOG.info("ID : " + id);
}
pushNotes(pullNoteIDs, destRepo, sourceRepo);
pushNotes(pullNoteIDs, dstRepo, srcRepo);
} else {
LOG.info("Nothing to pull");
}

View file

@ -0,0 +1,391 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.search;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.WildcardQuery;
import org.apache.lucene.search.highlight.Highlighter;
import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
import org.apache.lucene.search.highlight.QueryScorer;
import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
import org.apache.lucene.search.highlight.TextFragment;
import org.apache.lucene.search.highlight.TokenSources;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Paragraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
* Search (both, indexing and query) the notebooks using Lucene.
*
* Query is thread-safe, as creates new IndexReader every time.
* Index is thread-safe, as re-uses single IndexWriter, which is thread-safe.
*/
public class LuceneSearch implements SearchService {
private static final Logger LOG = LoggerFactory.getLogger(LuceneSearch.class);
private static final String SEARCH_FIELD = "contents";
static final String PARAGRAPH = "paragraph";
static final String ID_FIELD = "id";
Directory ramDirectory;
Analyzer analyzer;
IndexWriterConfig iwc;
IndexWriter writer;
public LuceneSearch() {
ramDirectory = new RAMDirectory();
analyzer = new StandardAnalyzer();
iwc = new IndexWriterConfig(analyzer);
try {
writer = new IndexWriter(ramDirectory, iwc);
} catch (IOException e) {
LOG.error("Failed to reate new IndexWriter", e);
}
}
/* (non-Javadoc)
* @see org.apache.zeppelin.search.Search#query(java.lang.String)
*/
@Override
public List<Map<String, String>> query(String queryStr) {
if (null == ramDirectory) {
throw new IllegalStateException(
"Something went wrong on instance creation time, index dir is null");
}
List<Map<String, String>> result = Collections.emptyList();
try (IndexReader indexReader = DirectoryReader.open(ramDirectory)) {
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
Analyzer analyzer = new StandardAnalyzer();
QueryParser parser = new QueryParser(SEARCH_FIELD, analyzer);
Query query = parser.parse(queryStr);
LOG.debug("Searching for: " + query.toString(SEARCH_FIELD));
SimpleHTMLFormatter htmlFormatter = new SimpleHTMLFormatter();
Highlighter highlighter = new Highlighter(htmlFormatter, new QueryScorer(query));
result = doSearch(indexSearcher, query, analyzer, highlighter);
indexReader.close();
} catch (IOException e) {
LOG.error("Failed to open index dir {}, make sure indexing finished OK", ramDirectory, e);
} catch (ParseException e) {
LOG.error("Failed to parse query " + queryStr, e);
}
return result;
}
private List<Map<String, String>> doSearch(IndexSearcher searcher, Query query,
Analyzer analyzer, Highlighter highlighter) {
List<Map<String, String>> matchingParagraphs = Lists.newArrayList();
ScoreDoc[] hits;
try {
hits = searcher.search(query, 20).scoreDocs;
for (int i = 0; i < hits.length; i++) {
LOG.debug("doc={} score={}", hits[i].doc, hits[i].score);
int id = hits[i].doc;
Document doc = searcher.doc(id);
String path = doc.get(ID_FIELD);
if (path != null) {
LOG.debug((i + 1) + ". " + path);
String title = doc.get("title");
if (title != null) {
LOG.debug(" Title: {}", doc.get("title"));
}
String text = doc.get(SEARCH_FIELD);
TokenStream tokenStream = TokenSources.getTokenStream(searcher.getIndexReader(), id,
SEARCH_FIELD, analyzer);
TextFragment[] frag = highlighter.getBestTextFragments(tokenStream, text, true, 3);
LOG.debug(" {} fragments found for query '{}'", frag.length, query);
for (int j = 0; j < frag.length; j++) {
if ((frag[j] != null) && (frag[j].getScore() > 0)) {
LOG.debug(" Fragment: {}", frag[j].toString());
}
}
String fragment = (frag != null && frag.length > 0) ? frag[0].toString() : "";
matchingParagraphs.add(ImmutableMap.of("id", path, // <noteId>/paragraph/<paragraphId>
"name", title, "snippet", fragment, "text", text));
} else {
LOG.info("{}. No {} for this document", i + 1, ID_FIELD);
}
}
} catch (IOException | InvalidTokenOffsetsException e) {
LOG.error("Exception on searching for {}", query, e);
}
return matchingParagraphs;
}
/* (non-Javadoc)
* @see org.apache.zeppelin.search.Search#updateIndexDoc(org.apache.zeppelin.notebook.Note)
*/
@Override
public void updateIndexDoc(Note note) throws IOException {
updateIndexNoteName(note);
for (Paragraph p: note.getParagraphs()) {
updateIndexParagraph(note, p);
}
}
private void updateIndexNoteName(Note note) throws IOException {
String noteName = note.getName();
String noteId = note.getId();
LOG.debug("Indexing Notebook {}, '{}'", noteId, noteName);
if (null == noteName || noteName.isEmpty()) {
LOG.debug("Skipping empty notebook name");
return;
}
updateDoc(noteId, noteName, null);
}
private void updateIndexParagraph(Note note, Paragraph p) throws IOException {
if (p.getText() == null) {
LOG.debug("Skipping empty paragraph");
return;
}
updateDoc(note.getId(), note.getName(), p);
}
/**
* Updates index for the given note: either note.name or a paragraph If
* paragraph is <code>null</code> - updates only for the note.name
*
* @param noteId
* @param noteName
* @param p
* @throws IOException
*/
private void updateDoc(String noteId, String noteName, Paragraph p) throws IOException {
String id = formatId(noteId, p);
Document doc = newDocument(id, noteName, p);
try {
writer.updateDocument(new Term(ID_FIELD, id), doc);
writer.commit();
} catch (IOException e) {
LOG.error("Failed to updaet index of notebook {}", noteId, e);
}
}
/**
* If paragraph is not null, id is <noteId>/paragraphs/<paragraphId>,
* otherwise it's just <noteId>.
*/
static String formatId(String noteId, Paragraph p) {
String id = noteId;
if (null != p) {
id = Joiner.on('/').join(id, PARAGRAPH, p.getId());
}
return id;
}
static String formatDeleteId(String noteId, Paragraph p) {
String id = noteId;
if (null != p) {
id = Joiner.on('/').join(id, PARAGRAPH, p.getId());
} else {
id = id + "*";
}
return id;
}
/**
* If paragraph is not null, indexes code in the paragraph, otherwise indexes
* the notebook name.
*
* @param id id of the document, different for Note name and paragraph
* @param noteName name of the note
* @param p paragraph
* @return
*/
private Document newDocument(String id, String noteName, Paragraph p) {
Document doc = new Document();
Field pathField = new StringField(ID_FIELD, id, Field.Store.YES);
doc.add(pathField);
doc.add(new StringField("title", noteName, Field.Store.YES));
if (null != p) {
doc.add(new TextField(SEARCH_FIELD, p.getText(), Field.Store.YES));
Date date = p.getDateStarted() != null ? p.getDateStarted() : p.getDateCreated();
doc.add(new LongField("modified", date.getTime(), Field.Store.NO));
} else {
doc.add(new TextField(SEARCH_FIELD, noteName, Field.Store.YES));
}
return doc;
}
/* (non-Javadoc)
* @see org.apache.zeppelin.search.Search#addIndexDocs(java.util.Collection)
*/
@Override
public void addIndexDocs(Collection<Note> collection) {
int docsIndexed = 0;
long start = System.nanoTime();
try {
for (Note note : collection) {
addIndexDocAsync(note);
docsIndexed++;
}
} catch (IOException e) {
LOG.error("Failed to index all Notebooks", e);
} finally {
try { // save what's been indexed, even if not full collection
writer.commit();
} catch (IOException e) {
LOG.error("Failed to save index", e);
}
long end = System.nanoTime();
LOG.info("Indexing {} notebooks took {}ms", docsIndexed,
TimeUnit.NANOSECONDS.toMillis(end - start));
}
}
/* (non-Javadoc)
* @see org.apache.zeppelin.search.Search#addIndexDoc(org.apache.zeppelin.notebook.Note)
*/
@Override
public void addIndexDoc(Note note) {
try {
addIndexDocAsync(note);
writer.commit();
} catch (IOException e) {
LOG.error("Failed to add note {} to index", note, e);
}
}
/**
* Indexes the given notebook, but does not commit changes.
*
* @param note
* @throws IOException
*/
private void addIndexDocAsync(Note note) throws IOException {
indexNoteName(writer, note.getId(), note.getName());
for (Paragraph doc : note.getParagraphs()) {
if (doc.getText() == null) {
LOG.debug("Skipping empty paragraph");
continue;
}
indexDoc(writer, note.getId(), note.getName(), doc);
}
}
/* (non-Javadoc)
* @see org.apache.zeppelin.search.Search#deleteIndexDocs(org.apache.zeppelin.notebook.Note)
*/
@Override
public void deleteIndexDocs(Note note) {
deleteDoc(note, null);
}
/* (non-Javadoc)
* @see org.apache.zeppelin.search.Search
* #deleteIndexDoc(org.apache.zeppelin.notebook.Note, org.apache.zeppelin.notebook.Paragraph)
*/
@Override
public void deleteIndexDoc(Note note, Paragraph p) {
deleteDoc(note, p);
}
private void deleteDoc(Note note, Paragraph p) {
if (null == note) {
LOG.error("Trying to delete note by reference to NULL");
return;
}
String fullNoteOrJustParagraph = formatDeleteId(note.getId(), p);
LOG.debug("Deleting note {}, out of: {}", note.getId(), writer.numDocs());
try {
writer.deleteDocuments(new WildcardQuery(new Term(ID_FIELD, fullNoteOrJustParagraph)));
writer.commit();
} catch (IOException e) {
LOG.error("Failed to delete {} from index by '{}'", note, fullNoteOrJustParagraph, e);
}
LOG.debug("Done, index contains {} docs now" + writer.numDocs());
}
/* (non-Javadoc)
* @see org.apache.zeppelin.search.Search#close()
*/
@Override
public void close() {
try {
writer.close();
} catch (IOException e) {
LOG.error("Failed to .close() the notebook index", e);
}
}
/**
* Indexes a notebook name
*
* @throws IOException
*/
private void indexNoteName(IndexWriter w, String noteId, String noteName) throws IOException {
LOG.debug("Indexing Notebook {}, '{}'", noteId, noteName);
if (null == noteName || noteName.isEmpty()) {
LOG.debug("Skipping empty notebook name");
return;
}
indexDoc(w, noteId, noteName, null);
}
/**
* Indexes a single document:
* - code of the paragraph (if non-null)
* - or just a note name
*/
private void indexDoc(IndexWriter w, String noteId, String noteName, Paragraph p)
throws IOException {
String id = formatId(noteId, p);
Document doc = newDocument(id, noteName, p);
w.addDocument(doc);
}
}

View file

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.search;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Paragraph;
/**
* Search (both, indexing and query) the notebooks.
*
* Intended to have multiple implementation, i.e:
* - local Lucene (in-memory, on-disk)
* - remote Elasticsearch
*/
public interface SearchService {
/**
* Full-text search in all the notebooks
*
* @param queryStr a query
* @return A list of matching paragraphs (id, text, snippet w/ highlight)
*/
public List<Map<String, String>> query(String queryStr);
/**
* Updates all documents in index for the given note:
* - name
* - all paragraphs
*
* @param note a Note to update index for
* @throws IOException
*/
public void updateIndexDoc(Note note) throws IOException;
/**
* Indexes full collection of notes: all the paragraphs + Note names
*
* @param collection of Notes
*/
public void addIndexDocs(Collection<Note> collection);
/**
* Indexes the given notebook.
*
* @throws IOException If there is a low-level I/O error
*/
public void addIndexDoc(Note note);
/**
* Deletes all docs on given Note from index
*/
public void deleteIndexDocs(Note note);
/**
* Deletes doc for a given
*
* @param note
* @param p
* @throws IOException
*/
public void deleteIndexDoc(Note note, Paragraph p);
/**
* Frees the recourses used by index
*/
public void close();
}

View file

@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
@ -45,6 +46,8 @@ import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.search.LuceneSearch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -84,8 +87,9 @@ public class NotebookTest implements JobListenerFactory{
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this, search);
}
@After
@ -170,7 +174,8 @@ public class NotebookTest implements JobListenerFactory{
p1.setText("hello world");
note.persist();
Notebook notebook2 = new Notebook(conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null), this);
Notebook notebook2 = new Notebook(
conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null), this, null);
assertEquals(1, notebook2.getAllNotes().size());
}

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.notebook.repo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
@ -39,6 +40,8 @@ import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.search.LuceneSearch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -86,8 +89,9 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
SearchService search = mock(SearchService.class);
notebookRepoSync = new NotebookRepoSync(conf);
notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this);
notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this, search);
}
@After

View file

@ -18,6 +18,7 @@
package org.apache.zeppelin.notebook.repo;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
@ -32,20 +33,19 @@ import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.notebook.JobListenerFactory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookTest;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.search.LuceneSearch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VFSNotebookRepoTest implements JobListenerFactory{
private static final Logger logger = LoggerFactory.getLogger(NotebookTest.class);
public class VFSNotebookRepoTest implements JobListenerFactory {
private static final Logger LOG = LoggerFactory.getLogger(VFSNotebookRepoTest.class);
private ZeppelinConfiguration conf;
private SchedulerFactory schedulerFactory;
private Notebook notebook;
@ -53,16 +53,15 @@ public class VFSNotebookRepoTest implements JobListenerFactory{
private InterpreterFactory factory;
private File mainZepDir;
private File mainNotebookDir;
@Before
public void setUp() throws Exception {
String zpath = System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis();
String zpath = System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis();
mainZepDir = new File(zpath);
mainZepDir.mkdirs();
new File(mainZepDir, "conf").mkdirs();
String mainNotePath = zpath+"/notebook";
String mainNotePath = zpath + "/notebook";
mainNotebookDir = new File(mainNotePath);
mainNotebookDir.mkdirs();
@ -79,15 +78,15 @@ public class VFSNotebookRepoTest implements JobListenerFactory{
this.schedulerFactory = new SchedulerFactory();
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this, search);
}
@After
public void tearDown() throws Exception {
//FileUtils.deleteDirectory(mainZepDir);
if (!FileUtils.deleteQuietly(mainZepDir)) {
logger.error("Failed to delete {} ", mainZepDir.getName());
LOG.error("Failed to delete {} ", mainZepDir.getName());
}
}
@ -97,7 +96,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory{
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
Paragraph p1 = note.addParagraph();
Map config = p1.getConfig();
Map<String, Object> config = p1.getConfig();
config.put("enabled", true);
p1.setConfig(config);
p1.setText("%mock1 hello world");

View file

@ -0,0 +1,259 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.search;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.*;
import static org.apache.zeppelin.search.LuceneSearch.formatId;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInterpreterLoader;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
public class LuceneSearchTest {
private static NoteInterpreterLoader replLoaderMock;
private static NotebookRepo notebookRepoMock;
private SearchService notebookIndex;
@BeforeClass
public static void beforeStartUp() {
notebookRepoMock = mock(NotebookRepo.class);
replLoaderMock = mock(NoteInterpreterLoader.class);
when(replLoaderMock.getInterpreterSettings())
.thenReturn(ImmutableList.<InterpreterSetting>of());
}
@Before
public void startUp() {
notebookIndex = new LuceneSearch();
}
@After
public void shutDown() {
notebookIndex.close();
}
@Test public void canIndexNotebook() {
//give
Note note1 = newNoteWithParapgraph("Notebook1", "test");
Note note2 = newNoteWithParapgraph("Notebook2", "not test");
List<Note> notebook = Arrays.asList(note1, note2);
//when
notebookIndex.addIndexDocs(notebook);
}
@Test public void canIndexAndQuery() {
//given
Note note1 = newNoteWithParapgraph("Notebook1", "test");
Note note2 = newNoteWithParapgraphs("Notebook2", "not test", "not test at all");
notebookIndex.addIndexDocs(Arrays.asList(note1, note2));
//when
List<Map<String, String>> results = notebookIndex.query("all");
//then
assertThat(results).isNotEmpty();
assertThat(results.size()).isEqualTo(1);
assertThat(results.get(0))
.containsEntry("id", formatId(note2.getId(), note2.getLastParagraph()));
}
@Test public void canIndexAndQueryByNotebookName() {
//given
Note note1 = newNoteWithParapgraph("Notebook1", "test");
Note note2 = newNoteWithParapgraphs("Notebook2", "not test", "not test at all");
notebookIndex.addIndexDocs(Arrays.asList(note1, note2));
//when
List<Map<String, String>> results = notebookIndex.query("Notebook1");
//then
assertThat(results).isNotEmpty();
assertThat(results.size()).isEqualTo(1);
assertThat(results.get(0)).containsEntry("id", note1.getId());
}
@Test public void indexKeyContract() throws IOException {
//give
Note note1 = newNoteWithParapgraph("Notebook1", "test");
//when
notebookIndex.addIndexDoc(note1);
//then
String id = resultForQuery("test").get(0).get(LuceneSearch.ID_FIELD);
assertThat(Splitter.on("/").split(id)) //key structure <noteId>/paragraph/<paragraphId>
.containsAllOf(note1.getId(), LuceneSearch.PARAGRAPH, note1.getLastParagraph().getId());
}
@Test //(expected=IllegalStateException.class)
public void canNotSearchBeforeIndexing() {
//given NO notebookIndex.index() was called
//when
List<Map<String, String>> result = notebookIndex.query("anything");
//then
assertThat(result).isEmpty();
//assert logs were printed
//"ERROR org.apache.zeppelin.search.SearchService:97 - Failed to open index dir RAMDirectory"
}
@Test public void canIndexAndReIndex() throws IOException {
//given
Note note1 = newNoteWithParapgraph("Notebook1", "test");
Note note2 = newNoteWithParapgraphs("Notebook2", "not test", "not test at all");
notebookIndex.addIndexDocs(Arrays.asList(note1, note2));
//when
Paragraph p2 = note2.getLastParagraph();
p2.setText("test indeed");
notebookIndex.updateIndexDoc(note2);
//then
List<Map<String, String>> results = notebookIndex.query("all");
assertThat(results).isEmpty();
results = notebookIndex.query("indeed");
assertThat(results).isNotEmpty();
}
@Test public void canDeleteNull() throws IOException {
//give
// looks like a bug in web UI: it tries to delete a note twice (after it has just been deleted)
//when
notebookIndex.deleteIndexDocs(null);
}
@Test public void canDeleteFromIndex() throws IOException {
//given
Note note1 = newNoteWithParapgraph("Notebook1", "test");
Note note2 = newNoteWithParapgraphs("Notebook2", "not test", "not test at all");
notebookIndex.addIndexDocs(Arrays.asList(note1, note2));
assertThat(resultForQuery("Notebook2")).isNotEmpty();
//when
notebookIndex.deleteIndexDocs(note2);
//then
assertThat(notebookIndex.query("all")).isEmpty();
assertThat(resultForQuery("Notebook2")).isEmpty();
List<Map<String, String>> results = resultForQuery("test");
assertThat(results).isNotEmpty();
assertThat(results.size()).isEqualTo(1);
}
@Test public void indexParagraphUpdatedOnNoteSave() throws IOException {
//given: total 2 notebooks, 3 paragraphs
Note note1 = newNoteWithParapgraph("Notebook1", "test");
Note note2 = newNoteWithParapgraphs("Notebook2", "not test", "not test at all");
notebookIndex.addIndexDocs(Arrays.asList(note1, note2));
assertThat(resultForQuery("test").size()).isEqualTo(3);
//when
Paragraph p1 = note1.getLastParagraph();
p1.setText("no no no");
note1.persist();
//then
assertThat(resultForQuery("Notebook1").size()).isEqualTo(1);
List<Map<String, String>> results = resultForQuery("test");
assertThat(results).isNotEmpty();
assertThat(results.size()).isEqualTo(2);
//does not include Notebook1's paragraph any more
for (Map<String, String> result: results) {
assertThat(result.get("id").startsWith(note1.getId())).isFalse();;
}
}
@Test public void indexNoteNameUpdatedOnNoteSave() throws IOException {
//given: total 2 notebooks, 3 paragraphs
Note note1 = newNoteWithParapgraph("Notebook1", "test");
Note note2 = newNoteWithParapgraphs("Notebook2", "not test", "not test at all");
notebookIndex.addIndexDocs(Arrays.asList(note1, note2));
assertThat(resultForQuery("test").size()).isEqualTo(3);
//when
note1.setName("NotebookN");
note1.persist();
//then
assertThat(resultForQuery("Notebook1")).isEmpty();
assertThat(resultForQuery("NotebookN")).isNotEmpty();
assertThat(resultForQuery("NotebookN").size()).isEqualTo(1);
}
private List<Map<String, String>> resultForQuery(String q) {
return notebookIndex.query(q);
}
/**
* Creates a new Note \w given name,
* adds a new paragraph \w given text
*
* @param noteName name of the note
* @param parText text of the paragraph
* @return Note
*/
private Note newNoteWithParapgraph(String noteName, String parText) {
Note note1 = newNote(noteName);
addParagraphWithText(note1, parText);
return note1;
}
/**
* Creates a new Note \w given name,
* adds N paragraphs \w given texts
*/
private Note newNoteWithParapgraphs(String noteName, String... parTexts) {
Note note1 = newNote(noteName);
for (String parText : parTexts) {
addParagraphWithText(note1, parText);
}
return note1;
}
private Paragraph addParagraphWithText(Note note, String text) {
Paragraph p = note.addParagraph();
p.setText(text);
return p;
}
private Note newNote(String name) {
Note note = new Note(notebookRepoMock, replLoaderMock, null, notebookIndex);
note.setName(name);
return note;
}
}