This commit is contained in:
astroshim 2016-11-18 07:51:55 +09:00
commit 649ff6e856
59 changed files with 1098 additions and 185 deletions

View file

@ -7,7 +7,7 @@ app/styles/looknfeel
Overall look and theme of the Zeppelin notebook page can be customized here.
### Code Syntax Highlighting
There are two parts to code highlighting. First, Zeppelin uses the Ace Editor for its note paragraphs. Color style for this can be changed by setting theme on the editor instance. Second, Zeppelin's Markdown interpreter calls markdown4j to emit HTML, and such content may contain <pre><code> tags that can be consumed by Highlight.js.
There are two parts to code highlighting. First, Zeppelin uses the Ace Editor for its note paragraphs. Color style for this can be changed by setting theme on the editor instance. Second, Zeppelin's Markdown interpreter calls pegdown parser to emit HTML, and such content may contain <pre><code> tags that can be consumed by Highlight.js.
#### Theme on Ace Editor
app/scripts/controllers/paragraph.js
@ -16,7 +16,7 @@ Call setTheme on the editor with the theme path/name.
[List of themes on GitHub](https://github.com/ajaxorg/ace/tree/master/lib/ace/theme)
#### Style for Markdown Code Blocks
Highlight.js parses and converts <pre><code> blocks from markdown4j into keywords and language syntax with proper styles. It also attempts to infer the best fitting language if it is not provided. The visual style can be changed by simply including the desired [stylesheet](https://github.com/components/highlightjs/tree/master/styles) into app/index.html. See the next section on build.
Highlight.js parses and converts <pre><code> blocks from pegdown parser into keywords and language syntax with proper styles. It also attempts to infer the best fitting language if it is not provided. The visual style can be changed by simply including the desired [stylesheet](https://github.com/components/highlightjs/tree/master/styles) into app/index.html. See the next section on build.
Note that code block background color is overriden in app/styles/notebook.css (look for .paragraph .tableDisplay .hljs).

View file

@ -131,6 +131,13 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>info.archinnov</groupId>
<artifactId>achilles-embedded</artifactId>
@ -145,6 +152,10 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</exclusion>
</exclusions>
</dependency>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 66 KiB

After

Width:  |  Height:  |  Size: 77 KiB

View file

@ -23,17 +23,18 @@ limitations under the License.
<div id="toc"></div>
## Overview
## Overview
[Apache Spark](http://spark.apache.org/) has supported three cluster manager types([Standalone](http://spark.apache.org/docs/latest/spark-standalone.html), [Apache Mesos](http://spark.apache.org/docs/latest/running-on-mesos.html) and [Hadoop YARN](http://spark.apache.org/docs/latest/running-on-yarn.html)) so far.
This document will guide you how you can build and configure the environment on 3 types of Spark cluster manager with Apache Zeppelin using [Docker](https://www.docker.com/) scripts.
So [install docker](https://docs.docker.com/engine/installation/) on the machine first.
## Spark standalone mode
[Spark standalone](http://spark.apache.org/docs/latest/spark-standalone.html) is a simple cluster manager included with Spark that makes it easy to set up a cluster.
You can simply set up Spark standalone environment with below steps.
You can simply set up Spark standalone environment with below steps.
> **Note :** Since Apache Zeppelin and Spark use same `8080` port for their web UI, you might need to change `zeppelin.server.port` in `conf/zeppelin-site.xml`.
### 1. Build Docker file
You can find docker script files under `scripts/docker/spark-cluster-managers`.
@ -52,9 +53,11 @@ docker run -it \
-p 8081:8081 \
-h sparkmaster \
--name spark_standalone \
spark_standalone bash;
spark_standalone bash;
```
Note that `sparkmaster` hostname used here to run docker container should be defined in your `/etc/hosts`.
### 3. Configure Spark interpreter in Zeppelin
Set Spark master as `spark://<hostname>:7077` in Zeppelin **Interpreters** setting page.
@ -81,7 +84,7 @@ You can simply set up [Spark on YARN](http://spark.apache.org/docs/latest/runnin
You can find docker script files under `scripts/docker/spark-cluster-managers`.
```
cd $ZEPPELIN_HOME/scripts/docker/spark-cluster-managers/spark_yarn
cd $ZEPPELIN_HOME/scripts/docker/spark-cluster-managers/spark_yarn_cluster
docker build -t "spark_yarn" .
```
@ -111,6 +114,8 @@ docker run -it \
spark_yarn bash;
```
Note that `sparkmaster` hostname used here to run docker container should be defined in your `/etc/hosts`.
### 3. Verify running Spark on YARN.
You can simply verify the processes of Spark and YARN are running well in Docker with below command.
@ -172,6 +177,8 @@ docker run --net=host -it \
spark_mesos bash;
```
Note that `sparkmaster` hostname used here to run docker container should be defined in your `/etc/hosts`.
### 3. Verify running Spark on Mesos.
You can simply verify the processes of Spark and Mesos are running well in Docker with below command.
@ -201,4 +208,3 @@ Don't forget to set Spark `master` as `mesos://127.0.1.1:5050` in Zeppelin **Int
After running a single paragraph with Spark interpreter in Zeppelin, browse `http://<hostname>:5050/#/frameworks` and check Zeppelin application is running well or not.
<img src="../assets/themes/zeppelin/img/docs-img/mesos_frameworks.png" />

View file

@ -53,3 +53,4 @@ So, copying `notebook` and `conf` directory should be enough.
- Mapping from `%jdbc(prefix)` to `%prefix` is no longer available. Instead, you can use %[interpreter alias] with multiple interpreter setttings on GUI.
- Usage of `ZEPPELIN_PORT` is not supported in ssl mode. Instead use `ZEPPELIN_SSL_PORT` to configure the ssl port. Value from `ZEPPELIN_PORT` is used only when `ZEPPELIN_SSL` is set to `false`.
- The support on Spark 1.1.x to 1.3.x is deprecated.
- From 0.7, we uses `pegdown` as the `markdown.parser.type` option for the `%md` interpreter. Rendered markdown might be different from what you expected

View file

@ -25,28 +25,14 @@ limitations under the License.
## Overview
[Markdown](http://daringfireball.net/projects/markdown/) is a plain text formatting syntax designed so that it can be converted to HTML.
Apache Zeppelin uses [markdown4j](https://github.com/jdcasey/markdown4j) and [pegdown](https://github.com/sirthias/pegdown) as markdown parsers.
Apache Zeppelin uses [pegdown](https://github.com/sirthias/pegdown) and [markdown4j](https://github.com/jdcasey/markdown4j) as markdown parsers.
In Zeppelin notebook, you can use ` %md ` in the beginning of a paragraph to invoke the Markdown interpreter and generate static html from Markdown plain text.
In Zeppelin, Markdown interpreter is enabled by default and uses the [markdown4j](https://github.com/jdcasey/markdown4j) parser.
In Zeppelin, Markdown interpreter is enabled by default and uses the [pegdown](https://github.com/sirthias/pegdown) parser.
<img src="../assets/themes/zeppelin/img/docs-img/markdown-interpreter-setting.png" width="60%" />
## Configuration
<table class="table-configuration">
<tr>
<th>Name</th>
<th>Default Value</th>
<th>Description</th>
</tr>
<tr>
<td>markdown.parser.type</td>
<td>markdown4j</td>
<td>Markdown Parser Type. <br/> Available values: markdown4j, pegdown.</td>
</tr>
</table>
## Example
The following example demonstrates the basic usage of Markdown in a Zeppelin notebook.
@ -57,16 +43,34 @@ The following example demonstrates the basic usage of Markdown in a Zeppelin not
Markdown interpreter leverages %html display system internally. That means you can mix mathematical expressions with markdown syntax. For more information, please see [Mathematical Expression](../displaysystem/basicdisplaysystem.html#mathematical-expressions) section.
## Configuration
<table class="table-configuration">
<tr>
<th>Name</th>
<th>Default Value</th>
<th>Description</th>
</tr>
<tr>
<td>markdown.parser.type</td>
<td>pegdown</td>
<td>Markdown Parser Type. <br/> Available values: pegdown, markdown4j.</td>
</tr>
</table>
### Markdown4j Parser
`markdown4j` parser provides [YUML](http://yuml.me/) and [Websequence](https://www.websequencediagrams.com/) extensions
<img src="../assets/themes/zeppelin/img/docs-img/markdown-example-markdown4j-parser.png" width="70%" />
### Pegdown Parser
`pegdown` parser provides github flavored markdown.
<img src="../assets/themes/zeppelin/img/docs-img/markdown-example-pegdown-parser.png" width="70%" />
`pegdown` parser provides [YUML](http://yuml.me/) and [Websequence](https://www.websequencediagrams.com/) plugins also.
<img src="../assets/themes/zeppelin/img/docs-img/markdown-example-pegdown-parser-plugins.png" width="70%" />
### Markdown4j Parser
Since pegdown parser is more accurate and provides much more markdown syntax
`markdown4j` option might be removed later. But keep this parser for the backward compatibility.

View file

@ -59,7 +59,7 @@ $beam.scio
val (sc, args) = ContextAndArgs(argz)
```
Use `sc` context the way you would in regular pipeline/REPL.
Use `sc` context the way you would in a regular pipeline/REPL.
Example:
@ -73,7 +73,7 @@ If you close Scio context, go ahead an create a new one using `ContextAndArgs`.
### Progress
There can be only one paragraph running at a time. There is no notion of overall progress, thus progress bar will be `0`.
There can be only one paragraph running at once. There is no notion of overall progress, thus progress bar will show `0`.
### SCollection display helpers
@ -93,11 +93,11 @@ There are different helper methods for different objects. You can easily display
##### `SCollection` helper
`SCollection` has `closeAndDisplay` Zeppelin helper method for types listed above. Use it to synchronously close Scio context, once available pull and display results.
`SCollection` has `closeAndDisplay` Zeppelin helper method for types listed above. Use it to synchronously close Scio context, and once available pull and display results.
##### `Future[Tap]` helper
`Future[Tap]` has `waitAndDisplay` Zeppelin helper method for types listed above. Use it synchronously wait for results, once available pull and display results.
`Future[Tap]` has `waitAndDisplay` Zeppelin helper method for types listed above. Use it to synchronously wait for results, and once available pull and display results.
##### `Tap` helper
@ -159,6 +159,7 @@ sc.avroFile[EndSongCleaned]("gs://<bucket>/tmp/my.avro").take(10).closeAndDispla
### Google credentials
Scio Interpreter will try to infer your Google Cloud credentials from its environment, it will take into the account:
* `argz` interpreter settings ([doc](https://github.com/spotify/scio/wiki#options))
* environment variable (`GOOGLE_APPLICATION_CREDENTIALS`)
* gcloud configuration

View file

@ -33,13 +33,17 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** MarkdownInterpreter interpreter for Zeppelin. */
/**
* MarkdownInterpreter interpreter for Zeppelin.
*/
public class Markdown extends Interpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(Markdown.class);
private MarkdownParser parser;
/** Markdown Parser Type. */
/**
* Markdown Parser Type.
*/
public enum MarkdownParserType {
PEGDOWN {
@Override
@ -82,7 +86,8 @@ public class Markdown extends Interpreter {
}
@Override
public void close() {}
public void close() {
}
@Override
public InterpreterResult interpret(String markdownText, InterpreterContext interpreterContext) {
@ -99,7 +104,8 @@ public class Markdown extends Interpreter {
}
@Override
public void cancel(InterpreterContext context) {}
public void cancel(InterpreterContext context) {
}
@Override
public FormType getFormType() {

View file

@ -23,7 +23,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
/** Markdown Parser using markdown4j processor . */
/**
* Markdown Parser using markdown4j processor.
*/
public class Markdown4jParser implements MarkdownParser {
private Markdown4jProcessor processor;

View file

@ -17,7 +17,9 @@
package org.apache.zeppelin.markdown;
/** Abstract Markdown Parser. */
/**
* Abstract Markdown Parser.
*/
public interface MarkdownParser {
String render(String markdownText);
}

View file

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.markdown;
import org.parboiled.support.Var;
import java.util.HashMap;
import java.util.Map;
/**
* Implementation of Var to support parameter parsing.
*
* @param <K> Key
* @param <V> Value
*/
public class ParamVar<K, V> extends Var<Map<K, V>> {
public ParamVar() {
super(new HashMap<K, V>());
}
public boolean put(K key, V value) {
get().put(key, value);
return true;
}
}

View file

@ -19,17 +19,23 @@ package org.apache.zeppelin.markdown;
import org.pegdown.Extensions;
import org.pegdown.PegDownProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.pegdown.plugins.PegDownPlugins;
/** Markdown Parser using pegdown processor. */
/**
* Markdown Parser using pegdown processor.
*/
public class PegdownParser implements MarkdownParser {
private PegDownProcessor processor;
public static final long PARSING_TIMEOUT_AS_MILLIS = 5000;
public static final int OPTIONS = Extensions.ALL_WITH_OPTIONALS - Extensions.ANCHORLINKS;
public PegdownParser() {
int pegdownOptions = Extensions.ALL_WITH_OPTIONALS - Extensions.ANCHORLINKS;
int parsingTimeoutAsMillis = 5000;
processor = new PegDownProcessor(pegdownOptions, parsingTimeoutAsMillis);
PegDownPlugins plugins = new PegDownPlugins.Builder()
.withPlugin(PegdownYumlPlugin.class)
.withPlugin(PegdownWebSequencelPlugin.class)
.build();
processor = new PegDownProcessor(OPTIONS, PARSING_TIMEOUT_AS_MILLIS, plugins);
}
@Override
@ -45,12 +51,14 @@ public class PegdownParser implements MarkdownParser {
return html;
}
/** wrap with markdown class div to styling DOM using css. */
/**
* wrap with markdown class div to styling DOM using css.
*/
public static String wrapWithMarkdownClassDiv(String html) {
return new StringBuilder()
.append("<div class=\"markdown-body\">\n")
.append(html)
.append("\n</div>")
.toString();
.append("<div class=\"markdown-body\">\n")
.append(html)
.append("\n</div>")
.toString();
}
}

View file

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.markdown;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.parboiled.BaseParser;
import org.parboiled.Rule;
import org.parboiled.support.StringBuilderVar;
import org.pegdown.Parser;
import org.pegdown.ast.ExpImageNode;
import org.pegdown.ast.TextNode;
import org.pegdown.plugins.BlockPluginParser;
import org.pegdown.plugins.PegDownPlugins;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
/**
* Pegdown plugin for Websequence diagram
*/
public class PegdownWebSequencelPlugin extends Parser implements BlockPluginParser {
private static final String WEBSEQ_URL = "http://www.websequencediagrams.com";
public PegdownWebSequencelPlugin() {
super(PegdownParser.OPTIONS,
PegdownParser.PARSING_TIMEOUT_AS_MILLIS,
DefaultParseRunnerProvider);
}
public PegdownWebSequencelPlugin(Integer opts, Long millis, ParseRunnerProvider provider,
PegDownPlugins plugins) {
super(opts, millis, provider, plugins);
}
public static final String TAG = "%%%";
Rule StartMarker() {
return Sequence(Spn1(), TAG, Sp(), "sequence", Sp());
}
String EndMarker() {
return TAG;
}
Rule Body() {
return OneOrMore(TestNot(TAG), BaseParser.ANY);
}
Rule BlockRule() {
StringBuilderVar style = new StringBuilderVar();
StringBuilderVar body = new StringBuilderVar();
return NodeSequence(
StartMarker(),
Optional(
String("style="),
Sequence(OneOrMore(Letter()), style.append(match()), Spn1())),
Sequence(Body(), body.append(match())),
EndMarker(),
push(
new ExpImageNode("title",
createWebsequenceUrl(style.getString(), body.getString()),
new TextNode("")))
);
}
public static String createWebsequenceUrl(String style,
String content) {
style = StringUtils.defaultString(style, "default");
OutputStreamWriter writer = null;
BufferedReader reader = null;
String webSeqUrl = "";
try {
String query = new StringBuilder()
.append("style=")
.append(style)
.append("&message=")
.append(URLEncoder.encode(content, "UTF-8"))
.append("&apiVersion=1")
.toString();
URL url = new URL(WEBSEQ_URL);
URLConnection conn = url.openConnection();
conn.setDoOutput(true);
writer = new OutputStreamWriter(conn.getOutputStream(), StandardCharsets.UTF_8);
writer.write(query);
writer.flush();
StringBuilder response = new StringBuilder();
reader = new BufferedReader(
new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));
String line;
while ((line = reader.readLine()) != null) {
response.append(line);
}
writer.close();
reader.close();
String json = response.toString();
int start = json.indexOf("?png=");
int end = json.indexOf("\"", start);
if (start != -1 && end != -1) {
webSeqUrl = WEBSEQ_URL + "/" + json.substring(start, end);
}
} catch (IOException e) {
throw new RuntimeException("Failed to get proper response from websequencediagrams.com", e);
} finally {
IOUtils.closeQuietly(writer);
IOUtils.closeQuietly(reader);
}
return webSeqUrl;
}
@Override
public Rule[] blockPluginRules() {
return new Rule[]{BlockRule()};
}
}

View file

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.markdown;
import static org.apache.commons.lang3.StringUtils.defaultString;
import org.parboiled.BaseParser;
import org.parboiled.Rule;
import org.parboiled.support.StringBuilderVar;
import org.pegdown.Parser;
import org.pegdown.ast.ExpImageNode;
import org.pegdown.ast.TextNode;
import org.pegdown.plugins.BlockPluginParser;
import org.pegdown.plugins.PegDownPlugins;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
/**
* Pegdown plugin for YUML
*/
public class PegdownYumlPlugin extends Parser implements BlockPluginParser {
public PegdownYumlPlugin() {
super(PegdownParser.OPTIONS,
PegdownParser.PARSING_TIMEOUT_AS_MILLIS,
DefaultParseRunnerProvider);
}
public PegdownYumlPlugin(Integer options,
Long maxParsingTimeInMillis,
ParseRunnerProvider parseRunnerProvider,
PegDownPlugins plugins) {
super(options, maxParsingTimeInMillis, parseRunnerProvider, plugins);
}
public static final String TAG = "%%%";
Rule StartMarker() {
return Sequence(Spn1(), TAG, Sp(), "yuml", Sp());
}
String EndMarker() {
return TAG;
}
Rule ParameterName() {
return FirstOf("type", "style", "scale", "format", "dir");
}
Rule Body() {
return OneOrMore(TestNot(TAG), BaseParser.ANY);
}
Rule BlockRule() {
ParamVar<String, String> params = new ParamVar<String, String>();
StringBuilderVar name = new StringBuilderVar();
StringBuilderVar value = new StringBuilderVar();
StringBuilderVar body = new StringBuilderVar();
return NodeSequence(
StartMarker(),
ZeroOrMore(
Sequence(
ParameterName(), name.append(match()),
String("="),
OneOrMore(Alphanumeric()), value.append(match())),
Sp(),
params.put(name.getString(), value.getString()),
name.clear(), value.clear()),
Body(),
body.append(match()),
EndMarker(),
push(
new ExpImageNode(
"title", createYumlUrl(params.get(), body.getString()), new TextNode("")))
);
}
public static String createYumlUrl(Map<String, String> params, String body) {
StringBuilder inlined = new StringBuilder();
for (String line : body.split("\\r?\\n")) {
line = line.trim();
if (line.length() > 0) {
if (inlined.length() > 0) {
inlined.append(", ");
}
inlined.append(line);
}
}
String encodedBody = null;
try {
encodedBody = URLEncoder.encode(inlined.toString(), "UTF-8");
} catch (UnsupportedEncodingException e) {
new RuntimeException("Failed to encode YUML markdown body", e);
}
StringBuilder mergedStyle = new StringBuilder();
String style = defaultString(params.get("style"), "scruffy");
String type = defaultString(params.get("type"), "class");
String format = defaultString(params.get("format"), "svg");
mergedStyle.append(style);
if (null != params.get("dir")) {
mergedStyle.append(";dir:" + params.get("dir"));
}
if (null != params.get("scale")) {
mergedStyle.append(";scale:" + params.get("scale"));
}
return new StringBuilder()
.append("http://yuml.me/diagram/")
.append(mergedStyle.toString() + "/")
.append(type + "/")
.append(encodedBody)
.append("." + format)
.toString();
}
@Override
public Rule[] blockPluginRules() {
return new Rule[]{BlockRule()};
}
}

View file

@ -7,8 +7,8 @@
"markdown.parser.type": {
"envName": "MARKDOWN_PARSER_TYPE",
"propertyName": "markdown.parser.type",
"defaultValue": "markdown4j",
"description": "Markdown Parser Type. Available values: markdown4j, pegdown. Default = markdown4j"
"defaultValue": "pegdown",
"description": "Markdown Parser Type. Available values: pegdown, markdown4j. Default = pegdown"
}
},
"editor": {

View file

@ -22,8 +22,13 @@ import static org.junit.Assert.assertEquals;
import java.util.Properties;
import org.apache.zeppelin.interpreter.InterpreterResult;
import static org.apache.zeppelin.markdown.PegdownParser.wrapWithMarkdownClassDiv;
import static org.junit.Assert.assertThat;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -262,7 +267,6 @@ public class PegdownParserTest {
@Test
public void testAlignedTable() {
String input =
new StringBuilder()
.append("| First Header | Second Header | Third Header |\n")
@ -299,4 +303,34 @@ public class PegdownParserTest {
InterpreterResult result = md.interpret(input, null);
assertEquals(wrapWithMarkdownClassDiv(expected), result.message());
}
@Test
public void testWebsequencePlugin() {
String input =
new StringBuilder()
.append("\n \n %%% sequence style=modern-blue\n")
.append("title Authentication Sequence\n")
.append("Alice->Bob: Authentication Request\n")
.append("note right of Bob: Bob thinks about it\n")
.append("Bob->Alice: Authentication Response\n")
.append(" %%% ")
.toString();
InterpreterResult result = md.interpret(input, null);
assertThat(result.message(), CoreMatchers.containsString("<img src=\"http://www.websequencediagrams.com/?png="));
}
@Test
public void testYumlPlugin() {
String input = new StringBuilder()
.append("\n \n %%% yuml style=nofunky scale=120 format=svg\n")
.append("[Customer]<>-orders>[Order]\n")
.append("[Order]++-0..>[LineItem]\n")
.append("[Order]-[note:Aggregate root.]\n")
.append(" %%% ")
.toString();
InterpreterResult result = md.interpret(input, null);
assertThat(result.message(), CoreMatchers.containsString("<img src=\"http://yuml.me/diagram/"));
}
}

View file

@ -316,6 +316,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
sparkInterpreter.populateSparkWebUrl(context);
if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) {
return new InterpreterResult(Code.ERROR, "Spark "
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");

View file

@ -27,8 +27,6 @@ import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Joiner;
@ -45,6 +43,7 @@ import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Pool;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.SparkUI;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@ -113,6 +112,7 @@ public class SparkInterpreter extends Interpreter {
private InterpreterOutputStream out;
private SparkDependencyResolver dep;
private String sparkUrl;
/**
* completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
@ -939,6 +939,13 @@ public class SparkInterpreter extends Interpreter {
numReferenceOfSparkContext.incrementAndGet();
}
private String getSparkUIUrl() {
Option<SparkUI> sparkUiOption = (Option<SparkUI>) Utils.invokeMethod(sc, "ui");
SparkUI sparkUi = sparkUiOption.get();
String sparkWebUrl = sparkUi.appUIAddress();
return sparkWebUrl;
}
private Results.Result interpret(String line) {
return (Results.Result) Utils.invokeMethod(
intp,
@ -947,6 +954,20 @@ public class SparkInterpreter extends Interpreter {
new Object[] {line});
}
public void populateSparkWebUrl(InterpreterContext ctx) {
if (sparkUrl == null) {
sparkUrl = getSparkUIUrl();
Map<String, String> infos = new java.util.HashMap<>();
if (sparkUrl != null) {
infos.put("url", sparkUrl);
logger.info("Sending metainfos to Zeppelin server: {}", infos.toString());
if (ctx != null && ctx.getClient() != null) {
ctx.getClient().onMetaInfosReceived(infos);
}
}
}
}
private List<File> currentClassPath() {
List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
@ -1086,7 +1107,7 @@ public class SparkInterpreter extends Interpreter {
return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString()
+ " is not supported");
}
populateSparkWebUrl(context);
z.setInterpreterContext(context);
if (line == null || line.trim().length() == 0) {
return new InterpreterResult(Code.SUCCESS);

View file

@ -97,6 +97,7 @@ public class SparkRInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) {
getSparkInterpreter().populateSparkWebUrl(interpreterContext);
String imageWidth = getProperty("zeppelin.R.image.width");
String[] sl = lines.split("\n");

View file

@ -96,6 +96,7 @@ public class SparkSqlInterpreter extends Interpreter {
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
}
sparkInterpreter.populateSparkWebUrl(context);
sqlc = getSparkInterpreter().getSQLContext();
SparkContext sc = sqlc.sparkContext();
if (concurrentSQL()) {

View file

@ -316,7 +316,6 @@ The following components are provided under the BSD-style License.
(BSD-like) ASM asm-tree (org.ow2.asm:asm-tree:5.0.3 - http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
(BSD-like) ASM asm-analysis (org.ow2.asm:asm-analysis:5.0.3 - http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
(BSD-like) ASM asm-utils (org.ow2.asm:asm-utils:5.0.3 - http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
(New BSD License) Markdown4j (org.commonjava.googlecode.markdown4j:markdown4j:jar:2.2-cj-1.0 - https://code.google.com/p/markdown4j/)
(New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
(New BSD License) Py4J (net.sf.py4j:py4j:0.10.3 - http://py4j.sourceforge.net/) - https://github.com/bartdag/py4j/blob/0.10.3/LICENSE.txt
(New BSD License) Markdown4j (org.commonjava.googlecode.markdown4j:markdown4j:jar:2.2-cj-1.0 - https://code.google.com/p/markdown4j/)

View file

@ -20,10 +20,12 @@ package org.apache.zeppelin.interpreter;
import java.util.List;
import java.util.Map;
import org.apache.zeppelin.annotation.ZeppelinApi;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.resource.ResourcePool;
/**
@ -58,6 +60,7 @@ public class InterpreterContext {
private ResourcePool resourcePool;
private List<InterpreterContextRunner> runners;
private String className;
private RemoteEventClientWrapper client;
public InterpreterContext(String noteId,
String paragraphId,
@ -86,6 +89,22 @@ public class InterpreterContext {
this.out = out;
}
public InterpreterContext(String noteId,
String paragraphId,
String paragraphTitle,
String paragraphText,
AuthenticationInfo authenticationInfo,
Map<String, Object> config,
GUI gui,
AngularObjectRegistry angularObjectRegistry,
ResourcePool resourcePool,
List<InterpreterContextRunner> contextRunners,
InterpreterOutput output,
RemoteInterpreterEventClient eventClient) {
this(noteId, paragraphId, paragraphTitle, paragraphText, authenticationInfo, config, gui,
angularObjectRegistry, resourcePool, contextRunners, output);
this.client = new RemoteEventClient(eventClient);
}
public String getNoteId() {
return noteId;
@ -138,4 +157,8 @@ public class InterpreterContext {
public void setClassName(String className) {
this.className = className;
}
public RemoteEventClientWrapper getClient() {
return client;
}
}

View file

@ -0,0 +1,24 @@
package org.apache.zeppelin.interpreter.remote;
import java.util.Map;
/**
*
* Wrapper arnd RemoteInterpreterEventClient
* to expose methods in the client
*
*/
public class RemoteEventClient implements RemoteEventClientWrapper {
private RemoteInterpreterEventClient client;
public RemoteEventClient(RemoteInterpreterEventClient client) {
this.client = client;
}
@Override
public void onMetaInfosReceived(Map<String, String> infos) {
client.onMetaInfosReceived(infos);
}
}

View file

@ -0,0 +1,15 @@
package org.apache.zeppelin.interpreter.remote;
import java.util.Map;
/**
*
* Wrapper interface for RemoterInterpreterEventClient
* to expose only required methods from EventClient
*
*/
public interface RemoteEventClientWrapper {
public void onMetaInfosReceived(Map<String, String> infos);
}

View file

@ -279,6 +279,11 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
gson.toJson(appendOutput)));
}
public void onMetaInfosReceived(Map<String, String> infos) {
sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.META_INFOS,
gson.toJson(infos)));
}
/**
* Wait for eventQueue becomes empty
*/

View file

@ -195,6 +195,14 @@ public class RemoteInterpreterEventPoller extends Thread {
String status = appStatusUpdate.get("status");
appListener.onStatusChange(noteId, paragraphId, appId, status);
} else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
Map<String, String> metaInfos = gson.fromJson(event.getData(),
new TypeToken<Map<String, String>>() {
}.getType());
String id = interpreterGroup.getId();
int indexOfColon = id.indexOf(":");
String settingId = id.substring(0, indexOfColon);
listener.onMetaInfosReceived(settingId, metaInfos);
}
logger.debug("Event from remoteproceess {}", event.getType());
} catch (Exception e) {

View file

@ -16,10 +16,13 @@
*/
package org.apache.zeppelin.interpreter.remote;
import java.util.Map;
/**
* Event from remoteInterpreterProcess
*/
public interface RemoteInterpreterProcessListener {
public void onOutputAppend(String noteId, String paragraphId, String output);
public void onOutputUpdated(String noteId, String paragraphId, String output);
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
}

View file

@ -553,7 +553,7 @@ public class RemoteInterpreterServer
gson.fromJson(ric.getGui(), GUI.class),
interpreterGroup.getAngularObjectRegistry(),
interpreterGroup.getResourcePool(),
contextRunners, output);
contextRunners, output, eventClient);
}

View file

@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-10-19")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-18")
public class InterpreterCompletion implements org.apache.thrift.TBase<InterpreterCompletion, InterpreterCompletion._Fields>, java.io.Serializable, Cloneable, Comparable<InterpreterCompletion> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion");

View file

@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-10-19")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-18")
public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult");

View file

@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-10-19")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-18")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

View file

@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-10-19")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-18")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");

View file

@ -22,7 +22,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
OUTPUT_APPEND(8),
OUTPUT_UPDATE(9),
ANGULAR_REGISTRY_PUSH(10),
APP_STATUS_UPDATE(11);
APP_STATUS_UPDATE(11),
META_INFOS(12);
private final int value;
@ -65,6 +66,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
return ANGULAR_REGISTRY_PUSH;
case 11:
return APP_STATUS_UPDATE;
case 12:
return META_INFOS;
default:
return null;
}

View file

@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-10-19")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-18")
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");

View file

@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-10-19")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-18")
public class RemoteInterpreterService {
public interface Iface {

View file

@ -50,6 +50,7 @@ enum RemoteInterpreterEventType {
OUTPUT_UPDATE = 9,
ANGULAR_REGISTRY_PUSH = 10,
APP_STATUS_UPDATE = 11,
META_INFOS = 12
}
struct RemoteInterpreterEvent {

View file

@ -29,6 +29,7 @@ import org.junit.Test;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
@ -155,4 +156,9 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
public void onOutputUpdated(String noteId, String paragraphId, String output) {
}
@Override
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
}
}

View file

@ -302,4 +302,9 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
public void onOutputUpdated(String noteId, String paragraphId, String output) {
}
@Override
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
}
}

View file

@ -18,9 +18,12 @@
package org.apache.zeppelin.rest;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@ -28,6 +31,7 @@ import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
@ -215,6 +219,31 @@ public class InterpreterRestApi {
return new JsonResponse(Status.CREATED).build();
}
/**
* get the metainfo property value
*/
@GET
@Path("getmetainfos/{settingId}")
public Response getMetaInfo(@Context HttpServletRequest req,
@PathParam("settingId") String settingId) {
String propName = req.getParameter("propName");
if (propName == null) {
return new JsonResponse<>(Status.BAD_REQUEST).build();
}
String propValue = null;
InterpreterSetting interpreterSetting = interpreterFactory.get(settingId);
Map<String, String> infos = interpreterSetting.getInfos();
if (infos != null) {
propValue = infos.get(propName);
}
Map<String, String> respMap = new HashMap<>();
respMap.put(propName, propValue);
logger.debug("Get meta info");
logger.debug("Interpretersetting Id: {}, property Name:{}, property value: {}", settingId,
propName, propValue);
return new JsonResponse<>(Status.OK, respMap).build();
}
/**
* Delete repository
*

View file

@ -48,16 +48,24 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.notebook.JobListenerFactory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookAuthorization;
import org.apache.zeppelin.notebook.NotebookEventListener;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.ParagraphJobListener;
import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.apache.zeppelin.notebook.socket.WatcherMessage;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
import org.apache.zeppelin.ticket.TicketContainer;
import org.apache.zeppelin.types.InterpreterSettingsList;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.util.WatcherSecurityKey;
import org.apache.zeppelin.utils.InterpreterBindingUtils;
import org.apache.zeppelin.utils.SecurityUtils;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
@ -67,6 +75,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
import com.google.common.collect.Queues;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
@ -97,6 +106,14 @@ public class NotebookServer extends WebSocketServlet implements
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
/**
* This is a special endpoint in the notebook websoket, Every connection in this Queue
* will be able to watch every websocket event, it doesnt need to be listed into the map of
* noteSocketMap. This can be used to get information about websocket traffic and watch what
* is going on.
*/
final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue();
private Notebook notebook() {
return ZeppelinServer.notebook;
}
@ -275,6 +292,9 @@ public class NotebookServer extends WebSocketServlet implements
case GET_INTERPRETER_SETTINGS:
getInterpreterSettings(conn, subject);
break;
case WATCHER:
switchConnectionToWatcher(conn, messagereceived);
break;
default:
break;
}
@ -389,6 +409,7 @@ public class NotebookServer extends WebSocketServlet implements
private void broadcast(String noteId, Message m) {
synchronized (noteSocketMap) {
broadcastToWatchers(noteId, StringUtils.EMPTY, m);
List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
if (socketLists == null || socketLists.size() == 0) {
return;
@ -406,6 +427,7 @@ public class NotebookServer extends WebSocketServlet implements
private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
synchronized (noteSocketMap) {
broadcastToWatchers(noteId, StringUtils.EMPTY, m);
List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
if (socketLists == null || socketLists.size() == 0) {
return;
@ -431,11 +453,7 @@ public class NotebookServer extends WebSocketServlet implements
}
for (NotebookSocket conn: userConnectedSockets.get(user)) {
try {
conn.send(serializeMessage(m));
} catch (IOException e) {
LOG.error("socket error", e);
}
unicast(m, conn);
}
}
@ -445,6 +463,7 @@ public class NotebookServer extends WebSocketServlet implements
} catch (IOException e) {
LOG.error("socket error", e);
}
broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
}
public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException {
@ -545,10 +564,8 @@ public class NotebookServer extends WebSocketServlet implements
broadcast(note.getId(), new Message(OP.NOTE).put("note", note));
}
public void broadcastInterpreterBindings(String noteId,
List settingList) {
broadcast(noteId, new Message(OP.INTERPRETER_BINDINGS)
.put("interpreterBindings", settingList));
public void broadcastInterpreterBindings(String noteId, List settingList) {
broadcast(noteId, new Message(OP.INTERPRETER_BINDINGS).put("interpreterBindings", settingList));
}
public void broadcastNoteList(AuthenticationInfo subject, HashSet userAndRoles) {
@ -1764,5 +1781,56 @@ public class NotebookServer extends WebSocketServlet implements
.put("interpreterSettings", availableSettings)));
}
@Override
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
InterpreterSetting interpreterSetting = notebook().getInterpreterFactory()
.get(settingId);
interpreterSetting.setInfos(metaInfos);
}
private void switchConnectionToWatcher(NotebookSocket conn, Message messagereceived)
throws IOException {
if (!isSessionAllowedToSwitchToWatcher(conn)) {
LOG.error("Cannot switch this client to watcher, invalid security key");
return;
}
LOG.info("Going to add {} to watcher socket", conn);
// add the connection to the watcher.
if (watcherSockets.contains(conn)) {
LOG.info("connection alrerady present in the watcher");
return;
}
watcherSockets.add(conn);
// remove this connection from regular zeppelin ws usage.
removeConnectionFromAllNote(conn);
connectedSockets.remove(conn);
removeUserConnection(conn.getUser(), conn);
}
private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) {
String watcherSecurityKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER);
return !(StringUtils.isBlank(watcherSecurityKey)
|| !watcherSecurityKey.equals(WatcherSecurityKey.getKey()));
}
private void broadcastToWatchers(String noteId, String subject, Message message) {
synchronized (watcherSockets) {
if (watcherSockets.isEmpty()) {
return;
}
for (NotebookSocket watcher : watcherSockets) {
try {
watcher.send(WatcherMessage
.builder(noteId)
.subject(subject)
.message(serializeMessage(message))
.build()
.serialize());
} catch (IOException e) {
LOG.error("Cannot broadcast message to watcher", e);
}
}
}
}
}

View file

@ -173,8 +173,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
while (p.getStatus() != Status.FINISHED) {
Thread.sleep(100);
}
assertEquals("<p>markdown</p>\n", p.getResult().message());
assertEquals(p.getResult().message(), getSimulatedMarkdownResult("markdown"));
// restart interpreter
for (InterpreterSetting setting : ZeppelinServer.notebook.getInterpreterFactory().getInterpreterSettings(note.getId())) {
@ -196,7 +195,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
while (p.getStatus() != Status.FINISHED) {
Thread.sleep(100);
}
assertEquals("<p>markdown restarted</p>\n", p.getResult().message());
assertEquals(p.getResult().message(), getSimulatedMarkdownResult("markdown restarted"));
//cleanup
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
}
@ -218,7 +217,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
while (p.getStatus() != Status.FINISHED) {
Thread.sleep(100);
}
assertEquals("<p>markdown</p>\n", p.getResult().message());
assertEquals(p.getResult().message(), getSimulatedMarkdownResult("markdown"));
// get md interpreter
InterpreterSetting mdIntpSetting = null;
@ -275,4 +274,8 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
assertThat("Test delete method:", delete, isAllowed());
delete.releaseConnection();
}
public static String getSimulatedMarkdownResult(String markdown) {
return String.format("<div class=\"markdown-body\">\n<p>%s</p>\n</div>", markdown);
}
}

View file

@ -24,6 +24,7 @@
$scope.availableInterpreters = {};
$scope.showAddNewSetting = false;
$scope.showRepositoryInfo = false;
$scope.searchInterpreter = '';
$scope._ = _;
ngToast.dismiss();
@ -346,8 +347,8 @@
.success(function(data, status, headers, config) {
$scope.interpreterSettings[index] = data.body;
removeTMPSettings(index);
checkDownloadingDependencies();
thisConfirm.close();
$route.reload();
})
.error(function(data, status, headers, config) {
console.log('Error %o %o', status, data.message);
@ -691,6 +692,22 @@
getRepositories();
};
$scope.showSparkUI = function(settingId) {
$http.get(baseUrlSrv.getRestApiBase() + '/interpreter/getmetainfos/' + settingId + '?propName=url')
.success(function(data, status, headers, config) {
var url = data.body.url;
if (!url) {
BootstrapDialog.alert({
message: 'No spark application running'
});
return;
}
window.open(url, '_blank');
}).error(function(data, status, headers, config) {
console.log('Error %o %o', status, data.message);
});
};
init();
}

View file

@ -128,6 +128,10 @@ limitations under the License.
</h3>
<span style="float:right" ng-show="!valueform.$visible" >
<button class="btn btn-default btn-xs"
ng-click="showSparkUI(setting.id)"
ng-show="setting.group == 'spark'">
<span class="fa fa-external-link"></span> spark ui</button>
<button class="btn btn-default btn-xs"
ng-click="valueform.$show();
copyOriginInterpreterSettingProperties(setting.id)">

View file

@ -21,8 +21,8 @@ limitations under the License.
<div id="{{paragraph.id}}_comment"
class="text"
ng-if="getResultType()=='TABLE' && paragraph.result.comment"
ng-bind-html="paragraph.result.comment">
ng-if="getResultType()=='TABLE' && tableDataComment"
ng-bind-html="tableDataComment">
</div>
<div id="{{paragraph.id}}_text"

View file

@ -200,6 +200,7 @@
tableData = new TableData();
tableData.loadParagraphResult($scope.paragraph.result);
$scope.tableDataColumns = tableData.columns;
$scope.tableDataComment = tableData.comment;
$scope.setGraphMode($scope.getGraphMode(), false, false);
} else if ($scope.getResultType() === 'HTML') {
$scope.renderHtml();
@ -1054,11 +1055,30 @@
};
$timeout(retryRenderer);
} else if (refresh) {
console.log('Refresh data');
// when graph options or data are changed
builtInViz.instance.setConfig($scope.paragraph.config.graph);
builtInViz.instance.render(tableData);
var retryRenderer = function() {
var targetEl = angular.element('#p' + $scope.paragraph.id + '_' + type);
if (targetEl.length) {
targetEl.height(height);
builtInViz.instance.setConfig($scope.paragraph.config.graph);
builtInViz.instance.render(tableData);
} else {
$timeout(retryRenderer, 10);
}
};
$timeout(retryRenderer);
} else {
builtInViz.instance.activate();
var retryRenderer = function() {
var targetEl = angular.element('#p' + $scope.paragraph.id + '_' + type);
if (targetEl.length) {
targetEl.height(height);
builtInViz.instance.activate();
} else {
$timeout(retryRenderer, 10);
}
};
$timeout(retryRenderer);
}
}
}
@ -1297,15 +1317,15 @@
dsv += tableData.columns[titleIndex].name + delimiter;
}
dsv = dsv.substring(0, dsv.length - 1) + '\n';
for (var r in $scope.paragraph.result.msgTable) {
var row = $scope.paragraph.result.msgTable[r];
for (var r in tableData.rows) {
var row = tableData.rows[r];
var dsvRow = '';
for (var index in row) {
var stringValue = (row[index].value).toString();
var stringValue = (row[index]).toString();
if (stringValue.contains(delimiter)) {
dsvRow += '"' + stringValue + '"' + delimiter;
} else {
dsvRow += row[index].value + delimiter;
dsvRow += row[index] + delimiter;
}
}
dsv += dsvRow.substring(0, dsvRow.length - 1) + '\n';
@ -1691,6 +1711,7 @@
tableData = new TableData();
tableData.loadParagraphResult($scope.paragraph.result);
$scope.tableDataColumns = tableData.columns;
$scope.tableDataComment = tableData.comment;
clearUnknownColsFromGraphOption();
selectDefaultColsForGraphOption();
}

View file

@ -51,11 +51,8 @@ zeppelin.Visualization.prototype.refresh = function() {
* Activate. invoked when visualization is selected
*/
zeppelin.Visualization.prototype.activate = function() {
console.log('active');
if (!this._active && this._resized) {
var self = this;
// give some time for element ready
setTimeout(function() {self.refresh();}, 300);
if (!this._active || this._resized) {
this.refresh();
this._resized = false;
}
this._active = true;
@ -65,7 +62,6 @@ zeppelin.Visualization.prototype.activate = function() {
* Activate. invoked when visualization is de selected
*/
zeppelin.Visualization.prototype.deactivate = function() {
console.log('deactive');
this._active = false;
};

View file

@ -83,12 +83,20 @@
vm.importNote = function() {
$scope.note.errorText = '';
if ($scope.note.importUrl) {
jQuery.getJSON($scope.note.importUrl, function(result) {
vm.processImportJson(result);
}).fail(function() {
$scope.note.errorText = 'Unable to Fetch URL';
$scope.$apply();
});
jQuery.ajax({
url: $scope.note.importUrl,
type: 'GET',
dataType: 'json',
jsonp: false,
xhrFields: {
withCredentials: false
},
error: function(xhr, ajaxOptions, thrownError) {
$scope.note.errorText = 'Unable to Fetch URL';
$scope.$apply();
}}).done(function(data) {
vm.processImportJson(data);
});
} else {
$scope.note.errorText = 'Enter URL';
$scope.$apply();

View file

@ -982,6 +982,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
// Check if dependency in specified path is changed
// If it did, overwrite old dependency jar with new one
if (intpSetting != null) {
//clean up metaInfos
intpSetting.setInfos(null);
copyDependenciesFromLocalPath(intpSetting);
stopJobAllInterpreter(intpSetting);

View file

@ -44,6 +44,8 @@ public class InterpreterSetting {
private String name;
// always be null in case of InterpreterSettingRef
private String group;
private transient Map<String, String> infos;
/**
* properties can be either Properties or Map<String, InterpreterProperty>
* properties should be:
@ -276,4 +278,12 @@ public class InterpreterSetting {
public void setErrorReason(String errorReason) {
this.errorReason = errorReason;
}
public void setInfos(Map<String, String> infos) {
this.infos = infos;
}
public Map<String, String> getInfos() {
return infos;
}
}

View file

@ -30,12 +30,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
@ -57,6 +52,11 @@ import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.user.Credentials;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
/**
* Binded interpreters for a note

View file

@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -27,13 +28,14 @@ import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.WatcherWebsocket;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinWebsocket;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHeartbeat;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHubHeartbeat;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.apache.zeppelin.util.WatcherSecurityKey;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
@ -54,11 +56,14 @@ public class ZeppelinClient {
private final String zeppelinhubToken;
private final WebSocketClient wsClient;
private static Gson gson;
private ConcurrentHashMap<String, Session> zeppelinConnectionMap;
// Keep track of current open connection per notebook.
private ConcurrentHashMap<String, Session> notesConnection;
// Listen to every note actions.
private static Session watcherSession;
private static ZeppelinClient instance = null;
private SchedulerService schedulerService;
private Authentication authModule;
private static final int min = 60;
private static final int MIN = 60;
public static ZeppelinClient initialize(String zeppelinUrl, String token,
ZeppelinConfiguration conf) {
@ -77,7 +82,7 @@ public class ZeppelinClient {
zeppelinhubToken = token;
wsClient = createNewWebsocketClient();
gson = new Gson();
zeppelinConnectionMap = new ConcurrentHashMap<>();
notesConnection = new ConcurrentHashMap<>();
schedulerService = SchedulerService.getInstance();
authModule = Authentication.initialize(token, conf);
if (authModule != null) {
@ -89,7 +94,7 @@ public class ZeppelinClient {
private WebSocketClient createNewWebsocketClient() {
SslContextFactory sslContextFactory = new SslContextFactory();
WebSocketClient client = new WebSocketClient(sslContextFactory);
client.setMaxIdleTimeout(5 * min * 1000);
client.setMaxIdleTimeout(5 * MIN * 1000);
client.setMaxTextMessageBufferSize(Client.getMaxNoteSize());
client.getPolicy().setMaxTextMessageSize(Client.getMaxNoteSize());
//TODO(khalid): other client settings
@ -110,17 +115,26 @@ public class ZeppelinClient {
}
private void addRoutines() {
schedulerService.add(ZeppelinHeartbeat.newInstance(this), 15, 4 * min);
schedulerService.add(ZeppelinHeartbeat.newInstance(this), 10, 1 * MIN);
new Timer().schedule(new java.util.TimerTask() {
@Override
public void run() {
watcherSession = openWatcherSession();
}
}, 5000);
}
public void stop() {
try {
if (wsClient != null) {
removeAllZeppelinConnections();
removeAllConnections();
wsClient.stop();
} else {
LOG.warn("Cannot stop zeppelin websocket client - isn't initialized");
}
if (watcherSession != null) {
watcherSession.close();
}
} catch (Exception e) {
LOG.error("Cannot stop Zeppelin websocket client", e);
}
@ -153,6 +167,22 @@ public class ZeppelinClient {
}
return msg;
}
private Session openWatcherSession() {
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setHeader(WatcherSecurityKey.HTTP_HEADER, WatcherSecurityKey.getKey());
WatcherWebsocket socket = WatcherWebsocket.createInstace();
Future<Session> future = null;
Session session = null;
try {
future = wsClient.connect(socket, zeppelinWebsocketUrl, request);
session = future.get();
} catch (IOException | InterruptedException | ExecutionException e) {
LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
return session;
}
return session;
}
public void send(Message msg, String noteId) {
Session noteSession = getZeppelinConnection(noteId);
@ -162,27 +192,16 @@ public class ZeppelinClient {
}
noteSession.getRemote().sendStringByFuture(serialize(msg));
}
private boolean isSessionOpen(Session session) {
return (session != null) && (session.isOpen());
}
/* per notebook based ws connection, returns null if can't connect */
public Session getZeppelinConnection(String noteId) {
if (StringUtils.isBlank(noteId)) {
LOG.warn("Cannot return websocket connection for blank noteId");
LOG.warn("Cannot get Websocket session with blanck noteId");
return null;
}
if (zeppelinConnectionMap.containsKey(noteId)) {
LOG.info("Connection for {} exists in map", noteId);
return getNoteSession(noteId);
}
//TODO(khalid): clean log later
LOG.info("Creating Zeppelin websocket connection {} {}", zeppelinWebsocketUrl, noteId);
return openNoteSession(noteId);
return getNoteSession(noteId);
}
/*
private Message zeppelinGetNoteMsg(String noteId) {
Message getNoteMsg = new Message(Message.OP.GET_NOTE);
HashMap<String, Object> data = new HashMap<>();
@ -190,12 +209,14 @@ public class ZeppelinClient {
getNoteMsg.data = data;
return getNoteMsg;
}
*/
private Session getNoteSession(String noteId) {
Session session = zeppelinConnectionMap.get(noteId);
if (session == null || !session.isOpen()) {
LOG.info("Not connection to {}", noteId);
zeppelinConnectionMap.remove(noteId);
LOG.info("Getting Note websocket connection for note {}", noteId);
Session session = notesConnection.get(noteId);
if (!isSessionOpen(session)) {
LOG.info("No open connection for note {}, opening one", noteId);
notesConnection.remove(noteId);
session = openNoteSession(noteId);
}
return session;
@ -214,17 +235,28 @@ public class ZeppelinClient {
return session;
}
if (zeppelinConnectionMap.containsKey(noteId)) {
if (notesConnection.containsKey(noteId)) {
session.close();
session = zeppelinConnectionMap.get(noteId);
session = notesConnection.get(noteId);
} else {
String getNote = serialize(zeppelinGetNoteMsg(noteId));
// TODO(khalid): may need to check return whether successful
session.getRemote().sendStringByFuture(getNote);
zeppelinConnectionMap.put(noteId, session);
notesConnection.put(noteId, session);
}
return session;
}
private boolean isSessionOpen(Session session) {
return (session != null) && (session.isOpen());
}
private Message zeppelinGetNoteMsg(String noteId) {
Message getNoteMsg = new Message(Message.OP.GET_NOTE);
HashMap<String, Object> data = new HashMap<String, Object>();
data.put("id", noteId);
getNoteMsg.data = data;
return getNoteMsg;
}
public void handleMsgFromZeppelin(String message, String noteId) {
Map<String, String> meta = new HashMap<>();
@ -243,46 +275,48 @@ public class ZeppelinClient {
client.relayToZeppelinHub(hubMsg.serialize());
}
public void removeNoteConnection(String noteId) {
if (StringUtils.isBlank(noteId)) {
LOG.error("Cannot remove session for empty noteId");
return;
}
if (notesConnection.containsKey(noteId)) {
Session connection = notesConnection.get(noteId);
if (connection.isOpen()) {
connection.close();
}
notesConnection.remove(noteId);
}
LOG.info("Removed note websocket connection for note {}", noteId);
}
private void removeAllConnections() {
if (watcherSession != null && watcherSession.isOpen()) {
watcherSession.close();
}
Session noteSession = null;
for (Map.Entry<String, Session> note: notesConnection.entrySet()) {
noteSession = note.getValue();
if (isSessionOpen(noteSession)) {
noteSession.close();
}
}
notesConnection.clear();
}
public void ping() {
if (watcherSession == null) {
LOG.info("Cannot send PING event, no watcher found");
return;
}
watcherSession.getRemote().sendStringByFuture(serialize(new Message(OP.PING)));
}
/**
* Close and remove ZeppelinConnection
* Only used in test.
*/
public void removeZeppelinConnection(String noteId) {
if (zeppelinConnectionMap.containsKey(noteId)) {
Session conn = zeppelinConnectionMap.get(noteId);
if (conn.isOpen()) {
conn.close();
}
zeppelinConnectionMap.remove(noteId);
}
// TODO(khalid): clean log later
LOG.info("Removed Zeppelin ws connection for the following note {}", noteId);
}
/**
* Close and remove all ZeppelinConnection
*/
public void removeAllZeppelinConnections() {
for (Map.Entry<String, Session> entry: zeppelinConnectionMap.entrySet()) {
if (isSessionOpen(entry.getValue())) {
entry.getValue().close();
}
zeppelinConnectionMap.remove(entry.getKey());
}
LOG.info("Removed all Zeppelin ws connections");
}
public void pingAllNotes() {
for (Map.Entry<String, Session> entry: zeppelinConnectionMap.entrySet()) {
if (isSessionOpen(entry.getValue())) {
send(new Message(OP.PING), entry.getKey());
} else {
// for cleanup
zeppelinConnectionMap.remove(entry.getKey());
}
}
}
public int countConnectedNotes() {
return zeppelinConnectionMap.size();
return notesConnection.size();
}
}

View file

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinClient;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.apache.zeppelin.notebook.socket.WatcherMessage;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
/**
* Zeppelin Watcher that will forward user note to ZeppelinHub.
*
*/
public class WatcherWebsocket implements WebSocketListener {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinWebsocket.class);
private static final Gson GSON = new Gson();
public Session connection;
public static WatcherWebsocket createInstace() {
return new WatcherWebsocket();
}
@Override
public void onWebSocketBinary(byte[] payload, int offset, int len) {
}
@Override
public void onWebSocketClose(int code, String reason) {
LOG.info("WatcherWebsocket connection closed with code: {}, message: {}", code, reason);
}
@Override
public void onWebSocketConnect(Session session) {
LOG.info("WatcherWebsocket connection opened");
this.connection = session;
session.getRemote().sendStringByFuture(GSON.toJson(new Message(OP.WATCHER)));
}
@Override
public void onWebSocketError(Throwable cause) {
LOG.warn("WatcherWebsocket socket connection error ", cause);
}
@Override
public void onWebSocketText(String message) {
WatcherMessage watcherMsg = GSON.fromJson(message, WatcherMessage.class);
if (StringUtils.isBlank(watcherMsg.noteId)) {
return;
}
try {
ZeppelinClient zeppelinClient = ZeppelinClient.getInstance();
if (zeppelinClient != null) {
zeppelinClient.handleMsgFromZeppelin(watcherMsg.message, watcherMsg.noteId);
}
} catch (Exception e) {
LOG.error("Failed to send message to ZeppelinHub: ", e);
}
}
}

View file

@ -43,7 +43,7 @@ public class ZeppelinWebsocket implements WebSocketListener {
@Override
public void onWebSocketClose(int code, String message) {
LOG.info("Zeppelin connection closed with code: {}, message: {}", code, message);
// parentClient.removeConnMap(noteId);
ZeppelinClient.getInstance().removeNoteConnection(noteId);
}
@Override
@ -54,7 +54,8 @@ public class ZeppelinWebsocket implements WebSocketListener {
@Override
public void onWebSocketError(Throwable e) {
LOG.warn("Zeppelin socket connection error: {}", e.toString());
LOG.warn("Zeppelin socket connection error ", e);
ZeppelinClient.getInstance().removeNoteConnection(noteId);
}
@Override

View file

@ -38,7 +38,7 @@ public class ZeppelinHeartbeat implements Runnable {
@Override
public void run() {
LOG.debug("Sending PING to all connected Zeppelin notes");
client.pingAllNotes();
LOG.debug("Sending PING to Zeppelin Websocket Server");
client.ping();
}
}

View file

@ -145,9 +145,12 @@ public class Message {
INTERPRETER_BINDINGS, // [s-c] interpreter bindings
GET_INTERPRETER_SETTINGS, // [c-s] get interpreter settings
INTERPRETER_SETTINGS, // [s-c] interpreter settings
ERROR_INFO // [s-c] error information to be sent
ERROR_INFO, // [s-c] error information to be sent
WATCHER, // [s-c] Change websocket to watcher mode.
}
public static final Message EMPTY = new Message(null);
public OP op;
public Map<String, Object> data = new HashMap<>();
public String ticket = "anonymous";

View file

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook.socket;
import com.google.gson.Gson;
/**
* Zeppelin websocket massage template class for watcher socket.
*/
public class WatcherMessage {
public String message;
public String noteId;
public String subject;
private static final Gson gson = new Gson();
public static Builder builder(String noteId) {
return new Builder(noteId);
}
private WatcherMessage(Builder builder) {
this.noteId = builder.noteId;
this.message = builder.message;
this.subject = builder.subject;
}
public String serialize() {
return gson.toJson(this);
}
/**
* Simple builder.
*/
public static class Builder {
private final String noteId;
private String subject;
private String message;
public Builder(String noteId) {
this.noteId = noteId;
}
public Builder subject(String subject) {
this.subject = subject;
return this;
}
public Builder message(String message) {
this.message = message;
return this;
}
public WatcherMessage build() {
return new WatcherMessage(this);
}
}
}

View file

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.util;
import java.util.UUID;
/**
* Simple implementation of a auto-generated key for websocket watcher.
* This is a LAZY implementation, we might want to update this later on :)
*/
public class WatcherSecurityKey {
public static final String HTTP_HEADER = "X-Watcher-Key";
private static final String KEY = UUID.randomUUID().toString();
protected WatcherSecurityKey() {}
public static String getKey() {
return KEY;
}
}

View file

@ -1,6 +1,10 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -74,7 +78,7 @@ public class ZeppelinClientTest {
assertEquals(connectionB, client.getZeppelinConnection("BBBB"));
// Remove connection to note AAAA
client.removeZeppelinConnection("AAAA");
client.removeNoteConnection("AAAA");
assertEquals(client.countConnectedNotes(), 1);
assertNotEquals(connectionA, client.getZeppelinConnection("AAAA"));
assertEquals(client.countConnectedNotes(), 2);
@ -117,7 +121,7 @@ public class ZeppelinClientTest {
msg.data = Maps.newHashMap();
msg.data.put("key", "value");
client.send(msg, "DDDD");
client.removeZeppelinConnection("DDDD");
client.removeNoteConnection("DDDD");
client.stop();
}
}