Merge remote-tracking branch 'origin/master' into livyInterperter

This commit is contained in:
Prabhjyot Singh 2016-03-22 11:50:27 +05:30
commit 32fbc1a082
31 changed files with 1364 additions and 143 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 202 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 155 KiB

View file

@ -23,6 +23,122 @@ Authentication is company-specific.
One option is to use [Basic Access Authentication](https://en.wikipedia.org/wiki/Basic_access_authentication)
### HTTP Basic Authentication using NGINX
> **Quote from Wikipedia:** NGINX is a web server. It can act as a reverse proxy server for HTTP, HTTPS, SMTP, POP3, and IMAP protocols, as well as a load balancer and an HTTP cache.
So you can use NGINX server as proxy server to serve HTTP Basic Authentication as a separate process along with Zeppelin server.
Here are instructions how to accomplish the setup NGINX as a front-end authentication server and connect Zeppelin at behind.
This instruction based on Ubuntu 14.04 LTS but may work with other OS with few configuration changes.
1. Install NGINX server on your server instance
You can install NGINX server with same machine where zeppelin installed or separate machine where it is dedicated to serve as proxy server.
```
$ apt-get install nginx
```
1. Setup init script in NGINX
In most cases, NGINX configuration located under `/etc/nginx/sites-available`. Create your own configuration or add your existing configuration at `/etc/nginx/sites-available`.
```
$ cd /etc/nginx/sites-available
$ touch my-basic-auth
```
Now add this script into `my-basic-auth` file. You can comment out `optional` lines If you want serve Zeppelin under regular HTTP 80 Port.
```
upstream zeppelin {
server [YOUR-ZEPPELIN-SERVER-IP]:8090;
}
upstream zeppelin-wss {
server [YOUR-ZEPPELIN-SERVER-IP]:8091;
}
# Zeppelin Website
server {
listen [YOUR-ZEPPELIN-WEB-SERVER-PORT];
listen 443 ssl; # optional, to serve HTTPS connection
server_name [YOUR-ZEPPELIN-SERVER-HOST]; # for example: zeppelin.mycompany.com
ssl_certificate [PATH-TO-YOUR-CERT-FILE]; # optional, to serve HTTPS connection
ssl_certificate_key [PATH-TO-YOUR-CERT-KEY-FILE]; # optional, to serve HTTPS connection
if ($ssl_protocol = "") {
rewrite ^ https://$host$request_uri? permanent; # optional, force to use HTTPS
}
location / {
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $http_host;
proxy_set_header X-NginX-Proxy true;
proxy_pass http://zeppelin;
proxy_redirect off;
auth_basic "Restricted";
auth_basic_user_file /etc/nginx/.htpasswd;
}
}
# Zeppelin Websocket
server {
listen [YOUR-ZEPPELIN-WEBSOCKET-PORT] ssl; # add ssl is optional, to serve HTTPS connection
server_name [YOUR-ZEPPELIN-SERVER-HOST]; # for example: zeppelin.mycompany.com
ssl_certificate [PATH-TO-YOUR-CERT-FILE]; # optional, to serve HTTPS connection
ssl_certificate_key [PATH-TO-YOUR-CERT-KEY-FILE]; # optional, to serve HTTPS connection
location / {
proxy_pass http://zeppelin-wss;
proxy_http_version 1.1;
proxy_set_header Upgrade websocket;
proxy_set_header Connection upgrade;
proxy_read_timeout 86400;
}
}
```
Then make a symbolic link to this file from `/etc/nginx/sites-enabled/` to enable configuration above when NGINX reloads.
```
$ ln -s /etc/nginx/sites-enabled/my-basic-auth /etc/nginx/sites-available/my-basic-auth
```
1. Setup user credential into `.htpasswd` file and restart server
Now you need to setup `.htpasswd` file to serve list of authenticated user credentials for NGINX server.
```
$ cd /etc/nginx
$ htpasswd -c htpasswd [YOUR_ID]
$ NEW passwd: [YOUR_PASSWORD]
$ RE-type new passwd: [YOUR_PASSWORD_AGAIN]
```
Or you can use your own apache `.htpasswd` files in other location by setup property `auth_basic_user_file`
Restart NGINX server.
```
$ service nginx restart
```
Then check HTTP Basic Authentication works in browser. If you can see regular basic auth popup and then able to login with credential you entered into `.htpasswd` you are good to go.
<img src="/assets/themes/zeppelin/img/screenshots/authentication-basic-auth-nginx-request.png" />
<img src="/assets/themes/zeppelin/img/screenshots/authentication-basic-auth-nginx-https.png" />
1. More security consideration
* Using HTTPS connection with Basic Authentication is highly recommended since basic auth without encryption may expose your important credential information over the network.
* Using [Shiro Security feature built-into Zeppelin](https://github.com/apache/incubator-zeppelin/blob/master/SECURITY-README.md) is recommended if you prefer all-in-one solution for authentication but NGINX may provides ad-hoc solution for re-use authentication served by your system's NGINX server or in case of you need to separate authentication from zeppelin server.
* It is recommended to isolate direct connection to Zeppelin server from public internet or external services to secure your zeppelin instance from unexpected attack or problems caused by public zone.
### Another option
Another option is to have an authentication server that can verify user credentials in an LDAP server.
If an incoming request to the Zeppelin server does not have a cookie with user information encrypted with the authentication server public key, the user
is redirected to the authentication server. Once the user is verified, the authentication server redirects the browser to a specific

View file

@ -195,6 +195,13 @@
<version>2.4</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>

View file

@ -774,13 +774,34 @@ public class SparkInterpreter extends Interpreter {
context.out.clear();
Code r = null;
String incomplete = "";
boolean inComment = false;
for (int l = 0; l < linesToRun.length; l++) {
String s = linesToRun[l];
// check if next line starts with "." (but not ".." or "./") it is treated as an invocation
if (l + 1 < linesToRun.length) {
String nextLine = linesToRun[l + 1].trim();
if (nextLine.startsWith(".") && !nextLine.startsWith("..") && !nextLine.startsWith("./")) {
boolean continuation = false;
if (nextLine.isEmpty()
|| nextLine.startsWith("//") // skip empty line or comment
|| nextLine.startsWith("}")
|| nextLine.startsWith("object")) { // include "} object" for Scala companion object
continuation = true;
} else if (!inComment && nextLine.startsWith("/*")) {
inComment = true;
continuation = true;
} else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
inComment = false;
continuation = true;
} else if (nextLine.length() > 1
&& nextLine.charAt(0) == '.'
&& nextLine.charAt(1) != '.' // ".."
&& nextLine.charAt(1) != '/') { // "./"
continuation = true;
} else if (inComment) {
continuation = true;
}
if (continuation) {
incomplete += s + "\n";
continue;
}

View file

@ -371,7 +371,11 @@ public class ZeppelinContext {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
String noteId = interpreterContext.getNoteId();
// try get local object
AngularObject ao = registry.get(name, interpreterContext.getNoteId(), null);
AngularObject paragraphAo = registry.get(name, noteId, interpreterContext.getParagraphId());
AngularObject noteAo = registry.get(name, noteId, null);
AngularObject ao = paragraphAo != null ? paragraphAo : noteAo;
if (ao == null) {
// then global object
ao = registry.get(name, null, null);

View file

@ -41,6 +41,9 @@ class Logger(object):
def reset(self):
self.out = ""
def flush(self):
pass
class PyZeppelinContext(dict):
def __init__(self, zc):

View file

@ -141,6 +141,17 @@ public class SparkInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code());
}
@Test
public void testNextLineComments() {
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n/*comment here\n*/.toInt", context).code());
}
@Test
public void testNextLineCompanionObject() {
String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter {\n def apply(x: Long) = new Counter()\n}";
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret(code, context).code());
}
@Test
public void testEndWithComment() {
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.display;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import org.apache.zeppelin.scheduler.ExecutorFactory;
@ -43,6 +44,15 @@ public class AngularObject<T> {
private String noteId; // noteId belonging to. null for global scope
private String paragraphId; // paragraphId belongs to. null for notebook scope
/**
* Public constructor, neccessary for the deserialization when using Thrift angularRegistryPush()
* Without public constructor, GSON library will instantiate the AngularObject using
* serialization so the <strong>watchers</strong> list won't be initialized and will throw
* NullPointerException the first time it is accessed
*/
public AngularObject() {
}
/**
* To create new AngularObject, use AngularObjectRegistry.add()
*
@ -111,17 +121,17 @@ public class AngularObject<T> {
@Override
public boolean equals(Object o) {
if (o instanceof AngularObject) {
AngularObject ao = (AngularObject) o;
if (noteId == null && ao.noteId == null ||
(noteId != null && ao.noteId != null && noteId.equals(ao.noteId))) {
if (paragraphId == null && ao.paragraphId == null ||
(paragraphId != null && ao.paragraphId != null && paragraphId.equals(ao.paragraphId))) {
return name.equals(ao.name);
}
}
}
return false;
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AngularObject<?> that = (AngularObject<?>) o;
return Objects.equals(name, that.name) &&
Objects.equals(noteId, that.noteId) &&
Objects.equals(paragraphId, that.paragraphId);
}
@Override
public int hashCode() {
return Objects.hash(name, noteId, paragraphId);
}
/**
@ -232,4 +242,14 @@ public class AngularObject<T> {
}
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("AngularObject{");
sb.append("noteId='").append(noteId).append('\'');
sb.append(", paragraphId='").append(paragraphId).append('\'');
sb.append(", object=").append(object);
sb.append(", name='").append(name).append('\'');
sb.append('}');
return sb.toString();
}
}

View file

@ -246,4 +246,12 @@ public class AngularObjectRegistry {
public String getInterpreterGroupId() {
return interpreterId;
}
public Map<String, Map<String, AngularObject>> getRegistry() {
return registry;
}
public void setRegistry(Map<String, Map<String, AngularObject>> registry) {
this.registry = registry;
}
}

View file

@ -48,6 +48,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
AngularObjectRegistry angularObjectRegistry;
RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process
ResourcePool resourcePool;
boolean angularRegistryPushed = false;
// map [notebook session, Interpreters in the group], to support per note session interpreters
//Map<String, List<Interpreter>> interpreters = new ConcurrentHashMap<String,
@ -254,4 +255,12 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
public ResourcePool getResourcePool() {
return resourcePool;
}
public boolean isAngularRegistryPushed() {
return angularRegistryPushed;
}
public void setAngularRegistryPushed(boolean angularRegistryPushed) {
this.angularRegistryPushed = angularRegistryPushed;
}
}

View file

@ -63,7 +63,7 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
Gson gson = new Gson();
RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
if (!remoteInterpreterProcess.isRunning()) {
return null;
return super.add(name, o, noteId, paragraphId, true);
}
Client client = null;
@ -97,7 +97,7 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
paragraphId) {
RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
if (!remoteInterpreterProcess.isRunning()) {
return null;
return super.remove(name, noteId, paragraphId);
}
Client client = null;

View file

@ -20,6 +20,8 @@ package org.apache.zeppelin.interpreter.remote;
import java.util.*;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
@ -128,10 +130,11 @@ public class RemoteInterpreter extends Interpreter {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
interpreterProcess.reference(getInterpreterGroup());
final InterpreterGroup interpreterGroup = getInterpreterGroup();
interpreterProcess.reference(interpreterGroup);
interpreterProcess.setMaxPoolSize(
Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
String groupId = getInterpreterGroup().getId();
String groupId = interpreterGroup.getId();
synchronized (interpreterProcess) {
Client client = null;
@ -146,7 +149,14 @@ public class RemoteInterpreter extends Interpreter {
logger.info("Create remote interpreter {}", getClassName());
property.put("zeppelin.interpreter.localRepo", localRepoPath);
client.createInterpreter(groupId, noteId,
getClassName(), (Map) property);
getClassName(), (Map) property);
// Push angular object loaded from JSON file to remote interpreter
if (!interpreterGroup.isAngularRegistryPushed()) {
pushAngularObjectRegistryToRemote(client);
interpreterGroup.setAngularRegistryPushed(true);
}
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
@ -387,4 +397,30 @@ public class RemoteInterpreter extends Interpreter {
Type.valueOf(result.getType()),
result.getMsg());
}
/**
* Push local angular object registry to
* remote interpreter. This method should be
* call ONLY inside the init() method
* @param client
* @throws TException
*/
void pushAngularObjectRegistryToRemote(Client client) throws TException {
final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
.getAngularObjectRegistry();
if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
.getRegistry();
logger.info("Push local angular object registry from ZeppelinServer to" +
" remote interpreter group {}", this.getInterpreterGroup().getId());
final java.lang.reflect.Type registryType = new TypeToken<Map<String,
Map<String, AngularObject>>>() {}.getType();
Gson gson = new Gson();
client.angularRegistryPush(gson.toJson(registry, registryType));
}
}
}

View file

@ -684,4 +684,16 @@ public class RemoteInterpreterServer
}
}
}
@Override
public void angularRegistryPush(String registryAsString) throws TException {
try {
Map<String, Map<String, AngularObject>> deserializedRegistry = gson
.fromJson(registryAsString,
new TypeToken<Map<String, Map<String, AngularObject>>>() { }.getType());
interpreterGroup.getAngularObjectRegistry().setRegistry(deserializedRegistry);
} catch (Exception e) {
logger.info("Exception in RemoteInterpreterServer while angularRegistryPush, nolock", e);
}
}
}

View file

@ -1,20 +1,3 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.2)
*
@ -51,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-4")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-17")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

View file

@ -1,20 +1,3 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.2)
*
@ -51,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-4")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-17")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");

View file

@ -1,20 +1,3 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.2)
*
@ -37,7 +20,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
RESOURCE_POOL_GET_ALL(6),
RESOURCE_GET(7),
OUTPUT_APPEND(8),
OUTPUT_UPDATE(9);
OUTPUT_UPDATE(9),
ANGULAR_REGISTRY_PUSH(10);
private final int value;
@ -76,6 +60,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
return OUTPUT_APPEND;
case 9:
return OUTPUT_UPDATE;
case 10:
return ANGULAR_REGISTRY_PUSH;
default:
return null;
}

View file

@ -1,20 +1,3 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.2)
*
@ -51,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-4")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-17")
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");

View file

@ -1,20 +1,3 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.2)
*
@ -51,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-4")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-17")
public class RemoteInterpreterService {
public interface Iface {
@ -94,6 +77,8 @@ public class RemoteInterpreterService {
public void angularObjectRemove(String name, String noteId, String paragraphId) throws org.apache.thrift.TException;
public void angularRegistryPush(String registry) throws org.apache.thrift.TException;
}
public interface AsyncIface {
@ -136,6 +121,8 @@ public class RemoteInterpreterService {
public void angularObjectRemove(String name, String noteId, String paragraphId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
public void angularRegistryPush(String registry, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
}
public static class Client extends org.apache.thrift.TServiceClient implements Iface {
@ -592,6 +579,26 @@ public class RemoteInterpreterService {
return;
}
public void angularRegistryPush(String registry) throws org.apache.thrift.TException
{
send_angularRegistryPush(registry);
recv_angularRegistryPush();
}
public void send_angularRegistryPush(String registry) throws org.apache.thrift.TException
{
angularRegistryPush_args args = new angularRegistryPush_args();
args.setRegistry(registry);
sendBase("angularRegistryPush", args);
}
public void recv_angularRegistryPush() throws org.apache.thrift.TException
{
angularRegistryPush_result result = new angularRegistryPush_result();
receiveBase(result, "angularRegistryPush");
return;
}
}
public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
@ -1299,6 +1306,38 @@ public class RemoteInterpreterService {
}
}
public void angularRegistryPush(String registry, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
checkReady();
angularRegistryPush_call method_call = new angularRegistryPush_call(registry, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
public static class angularRegistryPush_call extends org.apache.thrift.async.TAsyncMethodCall {
private String registry;
public angularRegistryPush_call(String registry, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.registry = registry;
}
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("angularRegistryPush", org.apache.thrift.protocol.TMessageType.CALL, 0));
angularRegistryPush_args args = new angularRegistryPush_args();
args.setRegistry(registry);
args.write(prot);
prot.writeMessageEnd();
}
public void getResult() throws org.apache.thrift.TException {
if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
throw new IllegalStateException("Method call not finished!");
}
org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
(new Client(prot)).recv_angularRegistryPush();
}
}
}
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
@ -1331,6 +1370,7 @@ public class RemoteInterpreterService {
processMap.put("angularObjectUpdate", new angularObjectUpdate());
processMap.put("angularObjectAdd", new angularObjectAdd());
processMap.put("angularObjectRemove", new angularObjectRemove());
processMap.put("angularRegistryPush", new angularRegistryPush());
return processMap;
}
@ -1716,6 +1756,26 @@ public class RemoteInterpreterService {
}
}
public static class angularRegistryPush<I extends Iface> extends org.apache.thrift.ProcessFunction<I, angularRegistryPush_args> {
public angularRegistryPush() {
super("angularRegistryPush");
}
public angularRegistryPush_args getEmptyArgsInstance() {
return new angularRegistryPush_args();
}
protected boolean isOneway() {
return false;
}
public angularRegistryPush_result getResult(I iface, angularRegistryPush_args args) throws org.apache.thrift.TException {
angularRegistryPush_result result = new angularRegistryPush_result();
iface.angularRegistryPush(args.registry);
return result;
}
}
}
public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> {
@ -1748,6 +1808,7 @@ public class RemoteInterpreterService {
processMap.put("angularObjectUpdate", new angularObjectUpdate());
processMap.put("angularObjectAdd", new angularObjectAdd());
processMap.put("angularObjectRemove", new angularObjectRemove());
processMap.put("angularRegistryPush", new angularRegistryPush());
return processMap;
}
@ -2712,6 +2773,56 @@ public class RemoteInterpreterService {
}
}
public static class angularRegistryPush<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, angularRegistryPush_args, Void> {
public angularRegistryPush() {
super("angularRegistryPush");
}
public angularRegistryPush_args getEmptyArgsInstance() {
return new angularRegistryPush_args();
}
public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
final org.apache.thrift.AsyncProcessFunction fcall = this;
return new AsyncMethodCallback<Void>() {
public void onComplete(Void o) {
angularRegistryPush_result result = new angularRegistryPush_result();
try {
fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
return;
} catch (Exception e) {
LOGGER.error("Exception writing to internal frame buffer", e);
}
fb.close();
}
public void onError(Exception e) {
byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
org.apache.thrift.TBase msg;
angularRegistryPush_result result = new angularRegistryPush_result();
{
msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
}
try {
fcall.sendResponse(fb,msg,msgType,seqid);
return;
} catch (Exception ex) {
LOGGER.error("Exception writing to internal frame buffer", ex);
}
fb.close();
}
};
}
protected boolean isOneway() {
return false;
}
public void start(I iface, angularRegistryPush_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
iface.angularRegistryPush(args.registry,resultHandler);
}
}
}
public static class createInterpreter_args implements org.apache.thrift.TBase<createInterpreter_args, createInterpreter_args._Fields>, java.io.Serializable, Cloneable, Comparable<createInterpreter_args> {
@ -18355,4 +18466,613 @@ public class RemoteInterpreterService {
}
public static class angularRegistryPush_args implements org.apache.thrift.TBase<angularRegistryPush_args, angularRegistryPush_args._Fields>, java.io.Serializable, Cloneable, Comparable<angularRegistryPush_args> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("angularRegistryPush_args");
private static final org.apache.thrift.protocol.TField REGISTRY_FIELD_DESC = new org.apache.thrift.protocol.TField("registry", org.apache.thrift.protocol.TType.STRING, (short)1);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
schemes.put(StandardScheme.class, new angularRegistryPush_argsStandardSchemeFactory());
schemes.put(TupleScheme.class, new angularRegistryPush_argsTupleSchemeFactory());
}
public String registry; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
REGISTRY((short)1, "registry");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
static {
for (_Fields field : EnumSet.allOf(_Fields.class)) {
byName.put(field.getFieldName(), field);
}
}
/**
* Find the _Fields constant that matches fieldId, or null if its not found.
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
case 1: // REGISTRY
return REGISTRY;
default:
return null;
}
}
/**
* Find the _Fields constant that matches fieldId, throwing an exception
* if it is not found.
*/
public static _Fields findByThriftIdOrThrow(int fieldId) {
_Fields fields = findByThriftId(fieldId);
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
return fields;
}
/**
* Find the _Fields constant that matches name, or null if its not found.
*/
public static _Fields findByName(String name) {
return byName.get(name);
}
private final short _thriftId;
private final String _fieldName;
_Fields(short thriftId, String fieldName) {
_thriftId = thriftId;
_fieldName = fieldName;
}
public short getThriftFieldId() {
return _thriftId;
}
public String getFieldName() {
return _fieldName;
}
}
// isset id assignments
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.REGISTRY, new org.apache.thrift.meta_data.FieldMetaData("registry", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(angularRegistryPush_args.class, metaDataMap);
}
public angularRegistryPush_args() {
}
public angularRegistryPush_args(
String registry)
{
this();
this.registry = registry;
}
/**
* Performs a deep copy on <i>other</i>.
*/
public angularRegistryPush_args(angularRegistryPush_args other) {
if (other.isSetRegistry()) {
this.registry = other.registry;
}
}
public angularRegistryPush_args deepCopy() {
return new angularRegistryPush_args(this);
}
@Override
public void clear() {
this.registry = null;
}
public String getRegistry() {
return this.registry;
}
public angularRegistryPush_args setRegistry(String registry) {
this.registry = registry;
return this;
}
public void unsetRegistry() {
this.registry = null;
}
/** Returns true if field registry is set (has been assigned a value) and false otherwise */
public boolean isSetRegistry() {
return this.registry != null;
}
public void setRegistryIsSet(boolean value) {
if (!value) {
this.registry = null;
}
}
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case REGISTRY:
if (value == null) {
unsetRegistry();
} else {
setRegistry((String)value);
}
break;
}
}
public Object getFieldValue(_Fields field) {
switch (field) {
case REGISTRY:
return getRegistry();
}
throw new IllegalStateException();
}
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
public boolean isSet(_Fields field) {
if (field == null) {
throw new IllegalArgumentException();
}
switch (field) {
case REGISTRY:
return isSetRegistry();
}
throw new IllegalStateException();
}
@Override
public boolean equals(Object that) {
if (that == null)
return false;
if (that instanceof angularRegistryPush_args)
return this.equals((angularRegistryPush_args)that);
return false;
}
public boolean equals(angularRegistryPush_args that) {
if (that == null)
return false;
boolean this_present_registry = true && this.isSetRegistry();
boolean that_present_registry = true && that.isSetRegistry();
if (this_present_registry || that_present_registry) {
if (!(this_present_registry && that_present_registry))
return false;
if (!this.registry.equals(that.registry))
return false;
}
return true;
}
@Override
public int hashCode() {
List<Object> list = new ArrayList<Object>();
boolean present_registry = true && (isSetRegistry());
list.add(present_registry);
if (present_registry)
list.add(registry);
return list.hashCode();
}
@Override
public int compareTo(angularRegistryPush_args other) {
if (!getClass().equals(other.getClass())) {
return getClass().getName().compareTo(other.getClass().getName());
}
int lastComparison = 0;
lastComparison = Boolean.valueOf(isSetRegistry()).compareTo(other.isSetRegistry());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetRegistry()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.registry, other.registry);
if (lastComparison != 0) {
return lastComparison;
}
}
return 0;
}
public _Fields fieldForId(int fieldId) {
return _Fields.findByThriftId(fieldId);
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("angularRegistryPush_args(");
boolean first = true;
sb.append("registry:");
if (this.registry == null) {
sb.append("null");
} else {
sb.append(this.registry);
}
first = false;
sb.append(")");
return sb.toString();
}
public void validate() throws org.apache.thrift.TException {
// check for required fields
// check for sub-struct validity
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
try {
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private static class angularRegistryPush_argsStandardSchemeFactory implements SchemeFactory {
public angularRegistryPush_argsStandardScheme getScheme() {
return new angularRegistryPush_argsStandardScheme();
}
}
private static class angularRegistryPush_argsStandardScheme extends StandardScheme<angularRegistryPush_args> {
public void read(org.apache.thrift.protocol.TProtocol iprot, angularRegistryPush_args struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
case 1: // REGISTRY
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.registry = iprot.readString();
struct.setRegistryIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd();
// check for required fields of primitive type, which can't be checked in the validate method
struct.validate();
}
public void write(org.apache.thrift.protocol.TProtocol oprot, angularRegistryPush_args struct) throws org.apache.thrift.TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
if (struct.registry != null) {
oprot.writeFieldBegin(REGISTRY_FIELD_DESC);
oprot.writeString(struct.registry);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
}
private static class angularRegistryPush_argsTupleSchemeFactory implements SchemeFactory {
public angularRegistryPush_argsTupleScheme getScheme() {
return new angularRegistryPush_argsTupleScheme();
}
}
private static class angularRegistryPush_argsTupleScheme extends TupleScheme<angularRegistryPush_args> {
@Override
public void write(org.apache.thrift.protocol.TProtocol prot, angularRegistryPush_args struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
BitSet optionals = new BitSet();
if (struct.isSetRegistry()) {
optionals.set(0);
}
oprot.writeBitSet(optionals, 1);
if (struct.isSetRegistry()) {
oprot.writeString(struct.registry);
}
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, angularRegistryPush_args struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
BitSet incoming = iprot.readBitSet(1);
if (incoming.get(0)) {
struct.registry = iprot.readString();
struct.setRegistryIsSet(true);
}
}
}
}
public static class angularRegistryPush_result implements org.apache.thrift.TBase<angularRegistryPush_result, angularRegistryPush_result._Fields>, java.io.Serializable, Cloneable, Comparable<angularRegistryPush_result> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("angularRegistryPush_result");
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
schemes.put(StandardScheme.class, new angularRegistryPush_resultStandardSchemeFactory());
schemes.put(TupleScheme.class, new angularRegistryPush_resultTupleSchemeFactory());
}
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
;
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
static {
for (_Fields field : EnumSet.allOf(_Fields.class)) {
byName.put(field.getFieldName(), field);
}
}
/**
* Find the _Fields constant that matches fieldId, or null if its not found.
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
default:
return null;
}
}
/**
* Find the _Fields constant that matches fieldId, throwing an exception
* if it is not found.
*/
public static _Fields findByThriftIdOrThrow(int fieldId) {
_Fields fields = findByThriftId(fieldId);
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
return fields;
}
/**
* Find the _Fields constant that matches name, or null if its not found.
*/
public static _Fields findByName(String name) {
return byName.get(name);
}
private final short _thriftId;
private final String _fieldName;
_Fields(short thriftId, String fieldName) {
_thriftId = thriftId;
_fieldName = fieldName;
}
public short getThriftFieldId() {
return _thriftId;
}
public String getFieldName() {
return _fieldName;
}
}
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(angularRegistryPush_result.class, metaDataMap);
}
public angularRegistryPush_result() {
}
/**
* Performs a deep copy on <i>other</i>.
*/
public angularRegistryPush_result(angularRegistryPush_result other) {
}
public angularRegistryPush_result deepCopy() {
return new angularRegistryPush_result(this);
}
@Override
public void clear() {
}
public void setFieldValue(_Fields field, Object value) {
switch (field) {
}
}
public Object getFieldValue(_Fields field) {
switch (field) {
}
throw new IllegalStateException();
}
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
public boolean isSet(_Fields field) {
if (field == null) {
throw new IllegalArgumentException();
}
switch (field) {
}
throw new IllegalStateException();
}
@Override
public boolean equals(Object that) {
if (that == null)
return false;
if (that instanceof angularRegistryPush_result)
return this.equals((angularRegistryPush_result)that);
return false;
}
public boolean equals(angularRegistryPush_result that) {
if (that == null)
return false;
return true;
}
@Override
public int hashCode() {
List<Object> list = new ArrayList<Object>();
return list.hashCode();
}
@Override
public int compareTo(angularRegistryPush_result other) {
if (!getClass().equals(other.getClass())) {
return getClass().getName().compareTo(other.getClass().getName());
}
int lastComparison = 0;
return 0;
}
public _Fields fieldForId(int fieldId) {
return _Fields.findByThriftId(fieldId);
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("angularRegistryPush_result(");
boolean first = true;
sb.append(")");
return sb.toString();
}
public void validate() throws org.apache.thrift.TException {
// check for required fields
// check for sub-struct validity
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
try {
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private static class angularRegistryPush_resultStandardSchemeFactory implements SchemeFactory {
public angularRegistryPush_resultStandardScheme getScheme() {
return new angularRegistryPush_resultStandardScheme();
}
}
private static class angularRegistryPush_resultStandardScheme extends StandardScheme<angularRegistryPush_result> {
public void read(org.apache.thrift.protocol.TProtocol iprot, angularRegistryPush_result struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd();
// check for required fields of primitive type, which can't be checked in the validate method
struct.validate();
}
public void write(org.apache.thrift.protocol.TProtocol oprot, angularRegistryPush_result struct) throws org.apache.thrift.TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
oprot.writeFieldStop();
oprot.writeStructEnd();
}
}
private static class angularRegistryPush_resultTupleSchemeFactory implements SchemeFactory {
public angularRegistryPush_resultTupleScheme getScheme() {
return new angularRegistryPush_resultTupleScheme();
}
}
private static class angularRegistryPush_resultTupleScheme extends TupleScheme<angularRegistryPush_result> {
@Override
public void write(org.apache.thrift.protocol.TProtocol prot, angularRegistryPush_result struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, angularRegistryPush_result struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
}
}
}
}

View file

@ -47,7 +47,8 @@ enum RemoteInterpreterEventType {
RESOURCE_POOL_GET_ALL = 6,
RESOURCE_GET = 7
OUTPUT_APPEND = 8,
OUTPUT_UPDATE = 9
OUTPUT_UPDATE = 9,
ANGULAR_REGISTRY_PUSH=10
}
struct RemoteInterpreterEvent {
@ -86,4 +87,5 @@ service RemoteInterpreterService {
object);
void angularObjectAdd(1: string name, 2: string noteId, 3: string paragraphId, 4: string object);
void angularObjectRemove(1: string name, 2: string noteId, 3: string paragraphId);
void angularRegistryPush(1: string registry);
}

View file

@ -28,7 +28,10 @@ import java.util.Map;
import java.util.Properties;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
@ -42,6 +45,10 @@ import org.apache.zeppelin.scheduler.Scheduler;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
public class RemoteInterpreterTest {
@ -664,4 +671,29 @@ public class RemoteInterpreterTest {
assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler());
}
@Test
public void should_push_local_angular_repo_to_remote() throws Exception {
//Given
final Client client = Mockito.mock(Client.class);
final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), "noteId",
MockInterpreterA.class.getName(), "runner", "path","localRepo", env, 10 * 1000, null);
final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null);
registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId");
interpreterGroup.setAngularObjectRegistry(registry);
intr.setInterpreterGroup(interpreterGroup);
final java.lang.reflect.Type registryType = new TypeToken<Map<String,
Map<String, AngularObject>>>() {}.getType();
final Gson gson = new Gson();
final String expected = gson.toJson(registry.getRegistry(), registryType);
//When
intr.pushAngularObjectRegistryToRemote(client);
//Then
Mockito.verify(client).angularRegistryPush(expected);
}
}

View file

@ -191,6 +191,11 @@
</exclusions>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>

View file

@ -103,6 +103,8 @@ public class Message {
ANGULAR_OBJECT_UPDATED, // [c-s] angular object value updated,
ANGULAR_OBJECT_CLIENT_BIND, // [c-s] angular object updated from AngularJS z object
LIST_CONFIGURATIONS, // [c-s] ask all key/value pairs of configurations
CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations
// @param settings serialized Map<String, String> object
@ -131,4 +133,17 @@ public class Message {
public Object get(String k) {
return data.get(k);
}
public <T> T getType(String key) {
return (T) data.get(key);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Message{");
sb.append("data=").append(data);
sb.append(", op=").append(op);
sb.append('}');
return sb.toString();
}
}

View file

@ -16,6 +16,15 @@
*/
package org.apache.zeppelin.socket;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.http.HttpServletRequest;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
@ -25,6 +34,8 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
@ -43,13 +54,6 @@ import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Zeppelin websocket service.
*
@ -99,6 +103,11 @@ public class NotebookServer extends WebSocketServlet implements
LOG.debug("RECEIVE PRINCIPAL << " + messagereceived.principal);
LOG.debug("RECEIVE TICKET << " + messagereceived.ticket);
LOG.debug("RECEIVE ROLES << " + messagereceived.roles);
if (LOG.isTraceEnabled()) {
LOG.trace("RECEIVE MSG = " + messagereceived);
}
String ticket = TicketContainer.instance.getTicket(messagereceived.principal);
if (ticket != null && !ticket.equals(messagereceived.ticket))
throw new Exception("Invalid ticket " + messagereceived.ticket + " != " + ticket);
@ -178,6 +187,9 @@ public class NotebookServer extends WebSocketServlet implements
case ANGULAR_OBJECT_UPDATED:
angularObjectUpdated(conn, userAndRoles, notebook, messagereceived);
break;
case ANGULAR_OBJECT_CLIENT_BIND:
angularObjectClientBind(conn, userAndRoles, notebook, messagereceived);
break;
case LIST_CONFIGURATIONS:
sendAllConfigurations(conn, userAndRoles, notebook);
break;
@ -205,7 +217,7 @@ public class NotebookServer extends WebSocketServlet implements
return gson.fromJson(msg, Message.class);
}
private String serializeMessage(Message m) {
protected String serializeMessage(Message m) {
return gson.toJson(m);
}
@ -716,6 +728,91 @@ public class NotebookServer extends WebSocketServlet implements
}
}
/**
* Push the given Angular variable to the target
* interpreter angular registry given a noteId
* and a paragraph id
* @param conn
* @param notebook
* @param fromMessage
* @throws Exception
*/
protected void angularObjectClientBind(NotebookSocket conn, HashSet<String> userAndRoles,
Notebook notebook, Message fromMessage)
throws Exception {
String noteId = fromMessage.getType("noteId");
String varName = fromMessage.getType("name");
Object varValue = fromMessage.get("value");
String paragraphId = fromMessage.getType("paragraphId");
Note note = notebook.getNote(noteId);
if (paragraphId == null) {
throw new IllegalArgumentException("target paragraph not specified for " +
"angular value bind");
}
if (note != null) {
final InterpreterGroup interpreterGroup = findInterpreterGroupForParagraph(note,
paragraphId);
final AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {
RemoteAngularObjectRegistry remoteRegistry = (RemoteAngularObjectRegistry) registry;
pushAngularObjectToRemoteRegistry(noteId, paragraphId, varName, varValue, remoteRegistry,
interpreterGroup.getId(), conn);
} else {
pushAngularObjectToLocalRepo(noteId, paragraphId, varName, varValue, registry,
interpreterGroup.getId(), conn);
}
}
}
private InterpreterGroup findInterpreterGroupForParagraph(Note note, String paragraphId)
throws Exception {
final Paragraph paragraph = note.getParagraph(paragraphId);
if (paragraph == null) {
throw new IllegalArgumentException("Unknown paragraph with id : " + paragraphId);
}
return paragraph.getCurrentRepl().getInterpreterGroup();
}
private void pushAngularObjectToRemoteRegistry(String noteId, String paragraphId,
String varName, Object varValue, RemoteAngularObjectRegistry remoteRegistry,
String interpreterGroupId, NotebookSocket conn) {
final AngularObject ao = remoteRegistry.addAndNotifyRemoteProcess(varName, varValue,
noteId, paragraphId);
this.broadcastExcept(
noteId,
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId)
.put("noteId", noteId)
.put("paragraphId", paragraphId),
conn);
}
private void pushAngularObjectToLocalRepo(String noteId, String paragraphId, String varName,
Object varValue, AngularObjectRegistry registry,
String interpreterGroupId, NotebookSocket conn) {
AngularObject angularObject = registry.get(varName, noteId, paragraphId);
if (angularObject == null) {
angularObject = registry.add(varName, varValue, noteId, paragraphId);
} else {
angularObject.set(varValue, true);
}
this.broadcastExcept(
noteId,
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", angularObject)
.put("interpreterGroupId", interpreterGroupId)
.put("noteId", noteId)
.put("paragraphId", paragraphId),
conn);
}
private void moveParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");

View file

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.display;
public class AngularObjectBuilder {
public static <T> AngularObject<T> build(String varName, T value, String noteId,
String paragraphId) {
return new AngularObject<>(varName, value, noteId, paragraphId, null);
}
}

View file

@ -20,8 +20,13 @@
package org.apache.zeppelin.socket;
import com.google.gson.Gson;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectBuilder;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
@ -36,8 +41,10 @@ import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.List;
import static java.util.Arrays.asList;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@ -157,6 +164,104 @@ public class NotebookServerTest extends AbstractTestRestApi {
notebook.removeNote(note.getId());
}
@Test
public void should_bind_angular_object_to_remote_for_paragraphs() throws Exception {
//Given
final String varName = "name";
final String value = "DuyHai DOAN";
final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_BIND)
.put("noteId", "noteId")
.put("name", varName)
.put("value", value)
.put("paragraphId", "paragraphId");
final NotebookServer server = new NotebookServer();
final Notebook notebook = mock(Notebook.class);
final Note note = mock(Note.class, RETURNS_DEEP_STUBS);
when(notebook.getNote("noteId")).thenReturn(note);
final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS);
when(note.getParagraph("paragraphId")).thenReturn(paragraph);
final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class);
final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup");
mdGroup.setAngularObjectRegistry(mdRegistry);
when(paragraph.getCurrentRepl().getInterpreterGroup()).thenReturn(mdGroup);
final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraphId");
when(mdRegistry.addAndNotifyRemoteProcess(varName, value, "noteId", "paragraphId")).thenReturn(ao1);
NotebookSocket conn = mock(NotebookSocket.class);
NotebookSocket otherConn = mock(NotebookSocket.class);
final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE)
.put("angularObject", ao1)
.put("interpreterGroupId", "mdGroup")
.put("noteId", "noteId")
.put("paragraphId", "paragraphId"));
server.noteSocketMap.put("noteId", asList(conn, otherConn));
// When
server.angularObjectClientBind(conn, new HashSet<String>(), notebook, messageReceived);
// Then
verify(mdRegistry, never()).addAndNotifyRemoteProcess(varName, value, "noteId", null);
verify(otherConn).send(mdMsg1);
}
@Test
public void should_bind_angular_object_to_local_for_paragraphs() throws Exception {
//Given
final String varName = "name";
final String value = "DuyHai DOAN";
final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_BIND)
.put("noteId", "noteId")
.put("name", varName)
.put("value", value)
.put("paragraphId", "paragraphId");
final NotebookServer server = new NotebookServer();
final Notebook notebook = mock(Notebook.class);
final Note note = mock(Note.class, RETURNS_DEEP_STUBS);
when(notebook.getNote("noteId")).thenReturn(note);
final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS);
when(note.getParagraph("paragraphId")).thenReturn(paragraph);
final AngularObjectRegistry mdRegistry = mock(AngularObjectRegistry.class);
final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup");
mdGroup.setAngularObjectRegistry(mdRegistry);
when(paragraph.getCurrentRepl().getInterpreterGroup()).thenReturn(mdGroup);
final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraphId");
when(mdRegistry.add(varName, value, "noteId", "paragraphId")).thenReturn(ao1);
NotebookSocket conn = mock(NotebookSocket.class);
NotebookSocket otherConn = mock(NotebookSocket.class);
final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE)
.put("angularObject", ao1)
.put("interpreterGroupId", "mdGroup")
.put("noteId", "noteId")
.put("paragraphId", "paragraphId"));
server.noteSocketMap.put("noteId", asList(conn, otherConn));
// When
server.angularObjectClientBind(conn, new HashSet<String>(), notebook, messageReceived);
// Then
verify(otherConn).send(mdMsg1);
}
private NotebookSocket createWebSocket() {
NotebookSocket sock = mock(NotebookSocket.class);
when(sock.getRequest()).thenReturn(createHttpServletRequest());

View file

@ -158,7 +158,7 @@ limitations under the License.
<span style="position:relative; top:2px; margin-right:4px; cursor:pointer;"
ng-click="togglePermissions()"
tooltip-placement="bottom" tooltip="Note permissions">
<i class="fa fa-lock" ng-style="{color: showSetting ? '#3071A9' : 'black' }"></i>
<i class="fa fa-lock" ng-style="{color: showPermissions ? '#3071A9' : 'black' }"></i>
</span>
<span class="btn-group">

View file

@ -14,30 +14,6 @@ limitations under the License.
<!-- Here the controller <NotebookCtrl> is not needed because explicitly set in the app.js (route) -->
<div ng-include src="'app/notebook/notebook-actionBar.html'"></div>
<div style="padding-top: 36px;">
<!-- permissions -->
<div ng-if="showPermissions" class="permissions">
<div>
<h4>Note Permissions (Only note owners can change)</h4>
</div>
<hr />
<div>
<p>
Enter comma separated users and groups in the fields. <br />
Empty field (*) implies anyone can do the operation.
</p>
<div class="permissionsForm"
data-ng-model="permissions">
<p>Owners : <input ng-list ng-model="permissions.owners" placeholder="*"> Owners can change permissions, read and write the note. </p>
<p>Readers : <input ng-list ng-model="permissions.readers" placeholder="*"> Readers can only read the note.</p>
<p>Writers : <input ng-list ng-model="permissions.writers" placeholder="*"> Writers can read and write the note.</p>
</div>
</div>
<br />
<div>
<button class="btn btn-primary" ng-click="savePermissions()">Save</button>
<button class="btn btn-default" ng-click="closePermissions()">Cancel</button>
</div>
</div>
<!-- settings -->
<div ng-if="showSetting" class="setting">
<div>
@ -81,6 +57,31 @@ limitations under the License.
</div>
</div>
<!-- permissions -->
<div ng-if="showPermissions" class="permissions">
<div>
<h4>Note Permissions (Only note owners can change)</h4>
</div>
<hr />
<div>
<p>
Enter comma separated users and groups in the fields. <br />
Empty field (*) implies anyone can do the operation.
</p>
<div class="permissionsForm"
data-ng-model="permissions">
<p>Owners : <input ng-list ng-model="permissions.owners" placeholder="*"> Owners can change permissions, read and write the note. </p>
<p>Readers : <input ng-list ng-model="permissions.readers" placeholder="*"> Readers can only read the note.</p>
<p>Writers : <input ng-list ng-model="permissions.writers" placeholder="*"> Writers can read and write the note.</p>
</div>
</div>
<br />
<div>
<button class="btn btn-primary" ng-click="savePermissions()">Save</button>
<button class="btn btn-default" ng-click="closePermissions()">Cancel</button>
</div>
</div>
<div class="note-jump"></div>
<!-- Include the paragraphs according to the note -->

View file

@ -23,11 +23,22 @@ angular.module('zeppelinWebApp')
$scope.editor = null;
var paragraphScope = $rootScope.$new(true, $rootScope);
// to keep backward compatibility
$scope.compiledScope = paragraphScope;
var angularObjectRegistry = {};
paragraphScope.z = {
// Example: z.angularBind('my_var', 'Test Value', '20150213-231621_168813393')
angularBind: function(varName, value, paragraphId) {
// Only push to server if there paragraphId is defined
if (paragraphId) {
websocketMsgSrv.clientBindAngularObject($routeParams.noteId, varName, value, paragraphId);
}
}
};
var angularObjectRegistry = {};
var editorModes = {
'ace/mode/scala': /^%spark/,

View file

@ -70,6 +70,18 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope,
});
},
clientBindAngularObject: function(noteId, name, value, paragraphId) {
websocketEvents.sendNewEvent({
op: 'ANGULAR_OBJECT_CLIENT_BIND',
data: {
noteId: noteId,
name: name,
value: value,
paragraphId: paragraphId
}
});
},
cancelParagraphRun: function(paragraphId) {
websocketEvents.sendNewEvent({op: 'CANCEL_PARAGRAPH', data: {id: paragraphId}});
},

View file

@ -35,6 +35,8 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
/**
* Paragraph is a representation of an execution unit.
*
@ -52,6 +54,13 @@ public class Paragraph extends Job implements Serializable, Cloneable {
private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc
public final GUI settings; // form and parameter settings
@VisibleForTesting
Paragraph() {
super(generateId(), null);
config = new HashMap<>();
settings = new GUI();
}
public Paragraph(Note note, JobListener listener, NoteInterpreterLoader replLoader) {
super(generateId(), listener);
this.note = note;
@ -163,6 +172,10 @@ public class Paragraph extends Job implements Serializable, Cloneable {
return replLoader.get(name);
}
public Interpreter getCurrentRepl() {
return getRepl(getRequiredReplName());
}
public List<String> completion(String buffer, int cursor) {
String replName = getRequiredReplName(buffer);
if (replName != null) {