ZEPPELIN-25 impelemnet JS(angular) -JVM(scala) two-way binding

This commit is contained in:
Lee moon soo 2015-04-04 17:29:57 +09:00
parent bb52d7bf39
commit 67f6926c1f
34 changed files with 2915 additions and 37 deletions

View file

@ -251,4 +251,22 @@ public class ZeppelinContext extends HashMap<String, Object> {
this.interpreterContext = interpreterContext;
}
/*
public void angularBind(String name, sObject o) {
}
public void angularBind(String name, sObject o, sWatcher w) {
}
public void angularBind(String name, Function f) {
}
public void angularUnbind(sString name) {
}
*/
}

View file

@ -23,6 +23,7 @@ import java.io.File;
import java.util.HashMap;
import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterGroup;
@ -57,7 +58,8 @@ public class DepInterpreterTest {
intpGroup.add(dep);
dep.setInterpreterGroup(intpGroup);
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI());
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null));
}
@After

View file

@ -24,8 +24,10 @@ import java.io.File;
import java.util.HashMap;
import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.spark.SparkInterpreter;
@ -55,7 +57,9 @@ public class SparkInterpreterTest {
repl.open();
}
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI());
InterpreterGroup intpGroup = new InterpreterGroup();
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null));
}
@After

View file

@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterGroup;
@ -38,6 +39,7 @@ public class SparkSqlInterpreterTest {
private SparkSqlInterpreter sql;
private SparkInterpreter repl;
private InterpreterContext context;
private InterpreterGroup intpGroup;
@Before
public void setUp() throws Exception {
@ -55,13 +57,14 @@ public class SparkSqlInterpreterTest {
sql = new SparkSqlInterpreter(p);
InterpreterGroup intpGroup = new InterpreterGroup();
intpGroup = new InterpreterGroup();
intpGroup.add(repl);
intpGroup.add(sql);
sql.setInterpreterGroup(intpGroup);
sql.open();
}
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI());
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null));
}
@After

View file

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nflabs.zeppelin.display;
import java.util.Map;
/**
*
*
* @param <T>
*/
public class AngularObject<T> {
/**
*
*/
public static enum AngularObjectType {
STRING,
MAP,
FUNCTION
};
private String name;
private T object;
private transient AngularObjectListener listener;
private AngularObjectType type;
protected AngularObject(String name, T o,
AngularObjectListener listener) {
this.name = name;
this.listener = listener;
object = o;
type = checkType();
}
public AngularObjectType getType() {
return type;
}
public AngularObjectType checkType() {
if (object == null) {
return AngularObjectType.STRING;
} else if (object instanceof Map) {
return AngularObjectType.MAP;
} else if (object instanceof String) {
return AngularObjectType.STRING;
}
return AngularObjectType.STRING;
}
public String getName() {
return name;
}
@Override
public boolean equals(Object o) {
if (o instanceof AngularObject) {
return name.equals(((AngularObject) o).name);
} else {
return false;
}
}
public Object get() {
return object;
}
public void emit(){
if (listener != null) {
listener.updated(this);
}
}
public void set(T o) {
set(o, true);
}
public void set(T o, boolean emit) {
object = o;
if (emit) {
emit();
}
}
public void setListener(AngularObjectListener listener) {
this.listener = listener;
}
public AngularObjectListener getListener() {
return listener;
}
}

View file

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nflabs.zeppelin.display;
/**
*
*/
public interface AngularObjectListener {
public void updated(AngularObject updatedObject);
}

View file

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nflabs.zeppelin.display;
import java.util.HashMap;
import java.util.Map;
/**
*
*
*/
public class AngularObjectRegistry implements AngularObjectListener {
Map<String, AngularObject> registry = new HashMap<String, AngularObject>();
private AngularObjectRegistryListener listener;
private String interpreterId;
public AngularObjectRegistry(String interpreterId,
AngularObjectRegistryListener listener) {
this.interpreterId = interpreterId;
this.listener = listener;
}
public AngularObjectRegistryListener getListener() {
return listener;
}
public AngularObject add(String name, Object o) {
AngularObject ao = createNewAngularObject(name, o);
synchronized (registry) {
registry.put(name, ao);
if (listener != null) {
listener.onAdd(interpreterId, ao);
}
}
return ao;
}
protected AngularObject createNewAngularObject(String name, Object o) {
return new AngularObject(name, o, this);
}
public AngularObject remove(String name) {
synchronized (registry) {
AngularObject o = registry.remove(name);
if (listener != null) {
listener.onRemove(interpreterId, o);;
}
return o;
}
}
public AngularObject get(String name) {
synchronized (registry) {
return registry.get(name);
}
}
@Override
public void updated(AngularObject updatedObject) {
if (listener != null) {
listener.onUpdate(interpreterId, updatedObject);
}
}
public String getInterpreterGroupId() {
return interpreterId;
}
}

View file

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nflabs.zeppelin.display;
/**
*
*
*/
public interface AngularObjectRegistryListener {
public void onAdd(String interpreterGroupId, AngularObject object);
public void onUpdate(String interpreterGroupId, AngularObject object);
public void onRemove(String interpreterGroupId, AngularObject object);
}

View file

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nflabs.zeppelin.interpreter.remote;
import com.nflabs.zeppelin.display.AngularObject;
import com.nflabs.zeppelin.display.AngularObjectListener;
/**
*
*/
public class RemoteAngularObject extends AngularObject {
private transient RemoteInterpreterProcess remoteInterpreterProcess;
RemoteAngularObject(String name, Object o, String interpreterGroupId,
AngularObjectListener listener,
RemoteInterpreterProcess remoteInterpreterProcess) {
super(name, o, listener);
this.remoteInterpreterProcess = remoteInterpreterProcess;
}
@Override
public void set(Object o, boolean emit) {
super.set(o, emit);
// send updated value to remote interpreter
remoteInterpreterProcess.updateRemoteAngularObject(getName(), o);
}
}

View file

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nflabs.zeppelin.interpreter.remote;
import com.nflabs.zeppelin.display.AngularObject;
import com.nflabs.zeppelin.display.AngularObjectRegistry;
import com.nflabs.zeppelin.display.AngularObjectRegistryListener;
import com.nflabs.zeppelin.interpreter.Interpreter;
import com.nflabs.zeppelin.interpreter.InterpreterGroup;
import com.nflabs.zeppelin.interpreter.WrappedInterpreter;
/**
*
*/
public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
private InterpreterGroup interpreterGroup;
public RemoteAngularObjectRegistry(String interpreterId,
AngularObjectRegistryListener listener,
InterpreterGroup interpreterGroup) {
super(interpreterId, listener);
this.interpreterGroup = interpreterGroup;
}
private RemoteInterpreterProcess getRemoteInterpreterProcess() {
if (interpreterGroup.size() == 0) {
throw new RuntimeException("Can't get remoteInterpreterProcess");
}
Interpreter p = interpreterGroup.get(0);
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
if (p instanceof RemoteInterpreter) {
return ((RemoteInterpreter) p).getInterpreterProcess();
} else {
throw new RuntimeException("Can't get remoteInterpreterProcess");
}
}
@Override
protected AngularObject createNewAngularObject(String name, Object o) {
RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
if (remoteInterpreterProcess == null) {
throw new RuntimeException("Remote Interpreter process not found");
}
return new RemoteAngularObject(name, o, getInterpreterGroupId(), this,
getRemoteInterpreterProcess());
}
}

View file

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nflabs.zeppelin.interpreter.remote;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.nflabs.zeppelin.display.AngularObject;
import com.nflabs.zeppelin.display.AngularObjectRegistry;
import com.nflabs.zeppelin.interpreter.InterpreterGroup;
import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
/**
*
*/
public class RemoteInterpreterEventPoller extends Thread {
Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
private RemoteInterpreterProcess interpreterProcess;
boolean shutdown;
private InterpreterGroup interpreterGroup;
public RemoteInterpreterEventPoller(
InterpreterGroup interpreterGroup,
RemoteInterpreterProcess interpreterProcess) {
this.interpreterGroup = interpreterGroup;
this.interpreterProcess = interpreterProcess;
shutdown = false;
}
@Override
public void run() {
Client client = null;
while (shutdown == false) {
try {
client = interpreterProcess.getClient();
} catch (Exception e1) {
logger.error("Can't get RemoteInterpreterEvent", e1);
try {
synchronized (this) {
wait(1000);
}
} catch (InterruptedException e) {
}
continue;
}
RemoteInterpreterEvent event = null;
try {
event = client.getEvent();
} catch (TException e) {
logger.error("Can't get RemoteInterpreterEvent", e);
try {
synchronized (this) {
wait(1000);
}
} catch (InterruptedException e1) {
}
continue;
}
interpreterProcess.releaseClient(client);
Gson gson = new Gson();
AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
try {
if (event.getType() == RemoteInterpreterEventType.NO_OP) {
continue;
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_ADD) {
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
angularObjectRegistry.add(angularObject.getName(), angularObject.get());
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) {
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
AngularObject localAngularObject = angularObjectRegistry.get(angularObject.getName());
localAngularObject.set(angularObject.get());
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
angularObjectRegistry.remove(angularObject.getName());
}
} catch (Exception e) {
logger.error("Can't handle event " + event, e);
}
}
}
public void shutdown() {
shutdown = true;
synchronized (this) {
notify();
}
}
}

View file

@ -0,0 +1,502 @@
/**
* Autogenerated by Thrift Compiler (0.9.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
package com.nflabs.zeppelin.interpreter.thrift;
import org.apache.thrift.scheme.IScheme;
import org.apache.thrift.scheme.SchemeFactory;
import org.apache.thrift.scheme.StandardScheme;
import org.apache.thrift.scheme.TupleScheme;
import org.apache.thrift.protocol.TTupleProtocol;
import org.apache.thrift.protocol.TProtocolException;
import org.apache.thrift.EncodingUtils;
import org.apache.thrift.TException;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.EnumMap;
import java.util.Set;
import java.util.HashSet;
import java.util.EnumSet;
import java.util.Collections;
import java.util.BitSet;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)1);
private static final org.apache.thrift.protocol.TField DATA_FIELD_DESC = new org.apache.thrift.protocol.TField("data", org.apache.thrift.protocol.TType.STRING, (short)2);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
schemes.put(StandardScheme.class, new RemoteInterpreterEventStandardSchemeFactory());
schemes.put(TupleScheme.class, new RemoteInterpreterEventTupleSchemeFactory());
}
/**
*
* @see RemoteInterpreterEventType
*/
public RemoteInterpreterEventType type; // required
public String data; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
/**
*
* @see RemoteInterpreterEventType
*/
TYPE((short)1, "type"),
DATA((short)2, "data");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
static {
for (_Fields field : EnumSet.allOf(_Fields.class)) {
byName.put(field.getFieldName(), field);
}
}
/**
* Find the _Fields constant that matches fieldId, or null if its not found.
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
case 1: // TYPE
return TYPE;
case 2: // DATA
return DATA;
default:
return null;
}
}
/**
* Find the _Fields constant that matches fieldId, throwing an exception
* if it is not found.
*/
public static _Fields findByThriftIdOrThrow(int fieldId) {
_Fields fields = findByThriftId(fieldId);
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
return fields;
}
/**
* Find the _Fields constant that matches name, or null if its not found.
*/
public static _Fields findByName(String name) {
return byName.get(name);
}
private final short _thriftId;
private final String _fieldName;
_Fields(short thriftId, String fieldName) {
_thriftId = thriftId;
_fieldName = fieldName;
}
public short getThriftFieldId() {
return _thriftId;
}
public String getFieldName() {
return _fieldName;
}
}
// isset id assignments
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.TYPE, new org.apache.thrift.meta_data.FieldMetaData("type", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, RemoteInterpreterEventType.class)));
tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(RemoteInterpreterEvent.class, metaDataMap);
}
public RemoteInterpreterEvent() {
}
public RemoteInterpreterEvent(
RemoteInterpreterEventType type,
String data)
{
this();
this.type = type;
this.data = data;
}
/**
* Performs a deep copy on <i>other</i>.
*/
public RemoteInterpreterEvent(RemoteInterpreterEvent other) {
if (other.isSetType()) {
this.type = other.type;
}
if (other.isSetData()) {
this.data = other.data;
}
}
public RemoteInterpreterEvent deepCopy() {
return new RemoteInterpreterEvent(this);
}
@Override
public void clear() {
this.type = null;
this.data = null;
}
/**
*
* @see RemoteInterpreterEventType
*/
public RemoteInterpreterEventType getType() {
return this.type;
}
/**
*
* @see RemoteInterpreterEventType
*/
public RemoteInterpreterEvent setType(RemoteInterpreterEventType type) {
this.type = type;
return this;
}
public void unsetType() {
this.type = null;
}
/** Returns true if field type is set (has been assigned a value) and false otherwise */
public boolean isSetType() {
return this.type != null;
}
public void setTypeIsSet(boolean value) {
if (!value) {
this.type = null;
}
}
public String getData() {
return this.data;
}
public RemoteInterpreterEvent setData(String data) {
this.data = data;
return this;
}
public void unsetData() {
this.data = null;
}
/** Returns true if field data is set (has been assigned a value) and false otherwise */
public boolean isSetData() {
return this.data != null;
}
public void setDataIsSet(boolean value) {
if (!value) {
this.data = null;
}
}
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case TYPE:
if (value == null) {
unsetType();
} else {
setType((RemoteInterpreterEventType)value);
}
break;
case DATA:
if (value == null) {
unsetData();
} else {
setData((String)value);
}
break;
}
}
public Object getFieldValue(_Fields field) {
switch (field) {
case TYPE:
return getType();
case DATA:
return getData();
}
throw new IllegalStateException();
}
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
public boolean isSet(_Fields field) {
if (field == null) {
throw new IllegalArgumentException();
}
switch (field) {
case TYPE:
return isSetType();
case DATA:
return isSetData();
}
throw new IllegalStateException();
}
@Override
public boolean equals(Object that) {
if (that == null)
return false;
if (that instanceof RemoteInterpreterEvent)
return this.equals((RemoteInterpreterEvent)that);
return false;
}
public boolean equals(RemoteInterpreterEvent that) {
if (that == null)
return false;
boolean this_present_type = true && this.isSetType();
boolean that_present_type = true && that.isSetType();
if (this_present_type || that_present_type) {
if (!(this_present_type && that_present_type))
return false;
if (!this.type.equals(that.type))
return false;
}
boolean this_present_data = true && this.isSetData();
boolean that_present_data = true && that.isSetData();
if (this_present_data || that_present_data) {
if (!(this_present_data && that_present_data))
return false;
if (!this.data.equals(that.data))
return false;
}
return true;
}
@Override
public int hashCode() {
return 0;
}
public int compareTo(RemoteInterpreterEvent other) {
if (!getClass().equals(other.getClass())) {
return getClass().getName().compareTo(other.getClass().getName());
}
int lastComparison = 0;
RemoteInterpreterEvent typedOther = (RemoteInterpreterEvent)other;
lastComparison = Boolean.valueOf(isSetType()).compareTo(typedOther.isSetType());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetType()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.type, typedOther.type);
if (lastComparison != 0) {
return lastComparison;
}
}
lastComparison = Boolean.valueOf(isSetData()).compareTo(typedOther.isSetData());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetData()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.data, typedOther.data);
if (lastComparison != 0) {
return lastComparison;
}
}
return 0;
}
public _Fields fieldForId(int fieldId) {
return _Fields.findByThriftId(fieldId);
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("RemoteInterpreterEvent(");
boolean first = true;
sb.append("type:");
if (this.type == null) {
sb.append("null");
} else {
sb.append(this.type);
}
first = false;
if (!first) sb.append(", ");
sb.append("data:");
if (this.data == null) {
sb.append("null");
} else {
sb.append(this.data);
}
first = false;
sb.append(")");
return sb.toString();
}
public void validate() throws org.apache.thrift.TException {
// check for required fields
// check for sub-struct validity
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
try {
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private static class RemoteInterpreterEventStandardSchemeFactory implements SchemeFactory {
public RemoteInterpreterEventStandardScheme getScheme() {
return new RemoteInterpreterEventStandardScheme();
}
}
private static class RemoteInterpreterEventStandardScheme extends StandardScheme<RemoteInterpreterEvent> {
public void read(org.apache.thrift.protocol.TProtocol iprot, RemoteInterpreterEvent struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
case 1: // TYPE
if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
struct.type = RemoteInterpreterEventType.findByValue(iprot.readI32());
struct.setTypeIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 2: // DATA
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.data = iprot.readString();
struct.setDataIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd();
// check for required fields of primitive type, which can't be checked in the validate method
struct.validate();
}
public void write(org.apache.thrift.protocol.TProtocol oprot, RemoteInterpreterEvent struct) throws org.apache.thrift.TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
if (struct.type != null) {
oprot.writeFieldBegin(TYPE_FIELD_DESC);
oprot.writeI32(struct.type.getValue());
oprot.writeFieldEnd();
}
if (struct.data != null) {
oprot.writeFieldBegin(DATA_FIELD_DESC);
oprot.writeString(struct.data);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
}
private static class RemoteInterpreterEventTupleSchemeFactory implements SchemeFactory {
public RemoteInterpreterEventTupleScheme getScheme() {
return new RemoteInterpreterEventTupleScheme();
}
}
private static class RemoteInterpreterEventTupleScheme extends TupleScheme<RemoteInterpreterEvent> {
@Override
public void write(org.apache.thrift.protocol.TProtocol prot, RemoteInterpreterEvent struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
BitSet optionals = new BitSet();
if (struct.isSetType()) {
optionals.set(0);
}
if (struct.isSetData()) {
optionals.set(1);
}
oprot.writeBitSet(optionals, 2);
if (struct.isSetType()) {
oprot.writeI32(struct.type.getValue());
}
if (struct.isSetData()) {
oprot.writeString(struct.data);
}
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, RemoteInterpreterEvent struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
BitSet incoming = iprot.readBitSet(2);
if (incoming.get(0)) {
struct.type = RemoteInterpreterEventType.findByValue(iprot.readI32());
struct.setTypeIsSet(true);
}
if (incoming.get(1)) {
struct.data = iprot.readString();
struct.setDataIsSet(true);
}
}
}
}

View file

@ -0,0 +1,51 @@
/**
* Autogenerated by Thrift Compiler (0.9.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
package com.nflabs.zeppelin.interpreter.thrift;
import java.util.Map;
import java.util.HashMap;
import org.apache.thrift.TEnum;
public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
NO_OP(1),
ANGULAR_OBJECT_ADD(2),
ANGULAR_OBJECT_UPDATE(3),
ANGULAR_OBJECT_REMOVE(4);
private final int value;
private RemoteInterpreterEventType(int value) {
this.value = value;
}
/**
* Get the integer value of this enum value, as defined in the Thrift IDL.
*/
public int getValue() {
return value;
}
/**
* Find a the enum type by its integer value, as defined in the Thrift IDL.
* @return null if the value is not found.
*/
public static RemoteInterpreterEventType findByValue(int value) {
switch (value) {
case 1:
return NO_OP;
case 2:
return ANGULAR_OBJECT_ADD;
case 3:
return ANGULAR_OBJECT_UPDATE;
case 4:
return ANGULAR_OBJECT_REMOVE;
default:
return null;
}
}
}

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter;
import java.util.Map;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
/**
@ -30,19 +31,22 @@ public class InterpreterContext {
private final String paragraphText;
private final Map<String, Object> config;
private GUI gui;
private AngularObjectRegistry angularObjectRegistry;
public InterpreterContext(String paragraphId,
String paragraphTitle,
String paragraphText,
Map<String, Object> config,
GUI gui
GUI gui,
AngularObjectRegistry angularObjectRegistry
) {
this.paragraphId = paragraphId;
this.paragraphTitle = paragraphTitle;
this.paragraphText = paragraphText;
this.config = config;
this.gui = gui;
this.angularObjectRegistry = angularObjectRegistry;
}
public String getParagraphId() {
@ -65,4 +69,8 @@ public class InterpreterContext {
return gui;
}
public AngularObjectRegistry getAngularObjectRegistry() {
return angularObjectRegistry;
}
}

View file

@ -21,6 +21,8 @@ import java.util.LinkedList;
import java.util.Properties;
import java.util.Random;
import com.nflabs.zeppelin.display.AngularObjectRegistry;
/**
* InterpreterGroup is list of interpreters in the same group.
* And unit of interpreter instantiate, restart, bind, unbind.
@ -28,6 +30,12 @@ import java.util.Random;
public class InterpreterGroup extends LinkedList<Interpreter>{
String id;
AngularObjectRegistry angularObjectRegistry;
public InterpreterGroup() {
this.id = getId();
}
private static String generateId() {
return "InterpreterGroup_" + System.currentTimeMillis() + "_"
+ new Random().nextInt();
@ -51,6 +59,14 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
return p;
}
public AngularObjectRegistry getAngularObjectRegistry() {
return angularObjectRegistry;
}
public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) {
this.angularObjectRegistry = angularObjectRegistry;
}
public void close() {
for (Interpreter intp : this) {
intp.close();

View file

@ -119,7 +119,7 @@ public class RemoteInterpreter extends Interpreter {
}
}
int rc = interpreterProcess.reference();
int rc = interpreterProcess.reference(getInterpreterGroup());
synchronized (interpreterProcess) {
// when first process created

View file

@ -29,10 +29,14 @@ import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
/**
*
*/
@ -48,6 +52,7 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
private GenericObjectPool<Client> clientPool;
private Map<String, String> env;
private RemoteInterpreterEventPoller remoteInterpreterEventPoller;
public RemoteInterpreterProcess(String intpRunner, String intpDir, Map<String, String> env) {
this.interpreterRunner = intpRunner;
@ -60,7 +65,7 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
return port;
}
public int reference() {
public int reference(InterpreterGroup interpreterGroup) {
synchronized (referenceCount) {
if (executor == null) {
// start server process
@ -108,6 +113,9 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
}
clientPool = new GenericObjectPool<Client>(new ClientFactory("localhost", port));
remoteInterpreterEventPoller = new RemoteInterpreterEventPoller(interpreterGroup, this);
remoteInterpreterEventPoller.start();
}
return referenceCount.incrementAndGet();
}
@ -126,6 +134,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
int r = referenceCount.decrementAndGet();
if (r == 0) {
logger.info("shutdown interpreter process");
remoteInterpreterEventPoller.shutdown();
// first try shutdown
try {
Client client = getClient();
@ -205,4 +215,28 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
return clientPool.getNumIdle();
}
}
/**
* Called when angular object is updated in client side to propagate
* change to the remote process
* @param name
* @param o
*/
public void updateRemoteAngularObject(String name, Object o) {
Client client = null;
try {
client = getClient();
} catch (Exception e) {
logger.error("Can't update angular object", e);
}
try {
Gson gson = new Gson();
client.angularObjectUpdate(name, gson.toJson(o));
} catch (TException e) {
logger.error("Can't update angular object", e);
} finally {
releaseClient(client);
}
}
}

View file

@ -21,6 +21,7 @@ package org.apache.zeppelin.interpreter.remote;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -29,6 +30,10 @@ import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObject.AngularObjectType;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
import org.apache.zeppelin.interpreter.Interpreter;
@ -39,6 +44,8 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.Interpreter.FormType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.scheduler.Job;
@ -52,16 +59,17 @@ import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
/**
*
*/
public class RemoteInterpreterServer
extends Thread
implements RemoteInterpreterService.Iface {
implements RemoteInterpreterService.Iface, AngularObjectRegistryListener {
Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class);
InterpreterGroup interpreterGroup = new InterpreterGroup();
InterpreterGroup interpreterGroup;
AngularObjectRegistry angularObjectRegistry;
Gson gson = new Gson();
RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
@ -69,8 +77,14 @@ public class RemoteInterpreterServer
private int port;
private TThreadPoolServer server;
List<RemoteInterpreterEvent> eventQueue = new LinkedList<RemoteInterpreterEvent>();
public RemoteInterpreterServer(int port) throws TTransportException {
this.port = port;
interpreterGroup = new InterpreterGroup();
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
processor = new RemoteInterpreterService.Processor<RemoteInterpreterServer>(this);
TServerSocket serverTransport = new TServerSocket(port);
server = new TThreadPoolServer(
@ -306,7 +320,8 @@ public class RemoteInterpreterServer
ric.getParagraphText(),
(Map<String, Object>) gson.fromJson(ric.getConfig(),
new TypeToken<Map<String, Object>>() {}.getType()),
gson.fromJson(ric.getGui(), GUI.class));
gson.fromJson(ric.getGui(), GUI.class),
interpreterGroup.getAngularObjectRegistry());
}
private RemoteInterpreterResult convert(InterpreterResult result,
@ -339,4 +354,79 @@ public class RemoteInterpreterServer
}
return "Unknown";
}
@Override
public void onAdd(String interpreterGroupId, AngularObject object) {
sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.ANGULAR_OBJECT_ADD, gson.toJson(object)));
}
@Override
public void onUpdate(String interpreterGroupId, AngularObject object) {
sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE, gson.toJson(object)));
}
@Override
public void onRemove(String interpreterGroupId, AngularObject object) {
sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE, gson.toJson(object)));
}
private void sendEvent(RemoteInterpreterEvent event) {
synchronized (eventQueue) {
eventQueue.add(event);
eventQueue.notifyAll();
}
}
@Override
public RemoteInterpreterEvent getEvent() throws TException {
synchronized (eventQueue) {
if (eventQueue.isEmpty()) {
try {
eventQueue.wait(1000);
} catch (InterruptedException e) {
}
}
if (eventQueue.isEmpty()) {
return new RemoteInterpreterEvent(RemoteInterpreterEventType.NO_OP, "");
} else {
return eventQueue.remove(0);
}
}
}
/**
* called when object is updated in client (web) side.
* @param className
* @param name
* @param object
* @throws TException
*/
@Override
public void angularObjectUpdate(String name, String object)
throws TException {
AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
AngularObject ao = registry.get(name);
if (ao == null) {
logger.error("Angular object {} not exists", name);
return;
}
if (ao.getType() == AngularObjectType.STRING) {
String value = gson.fromJson(object, String.class);
ao.set(value, false);
} else if (ao.getType() == AngularObjectType.MAP) {
Map<String, Object> value = gson.fromJson(object,
new TypeToken<Map<String, Object>>() {
}.getType());
ao.set(value, false);
} else {
logger.error("Update angular object type {} not supported", ao.getType());
}
}
}

View file

@ -35,6 +35,18 @@ struct RemoteInterpreterResult {
5: string gui // json serialized gui
}
enum RemoteInterpreterEventType {
NO_OP = 1,
ANGULAR_OBJECT_ADD = 2,
ANGULAR_OBJECT_UPDATE = 3,
ANGULAR_OBJECT_REMOVE = 4
}
struct RemoteInterpreterEvent {
1: RemoteInterpreterEventType type,
2: string data // json serialized data
}
service RemoteInterpreterService {
void createInterpreter(1: string className, 2: map<string, string> properties);
@ -48,4 +60,7 @@ service RemoteInterpreterService {
void shutdown();
string getStatus(1:string jobId);
RemoteInterpreterEvent getEvent();
void angularObjectUpdate(1: string name, 2: string object);
}

View file

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nflabs.zeppelin.display;
import org.junit.Test;
public class AngularObjectTest {
@Test
public void test() {
}
}

View file

@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
import java.util.HashMap;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.junit.Test;
@ -30,11 +31,12 @@ public class RemoteInterpreterProcessTest {
@Test
public void testStartStop() {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
assertEquals(1, rip.reference());
assertEquals(2, rip.reference());
assertEquals(1, rip.reference(intpGroup));
assertEquals(2, rip.reference(intpGroup));
assertEquals(true, rip.isRunning());
assertEquals(1, rip.dereference());
assertEquals(true, rip.isRunning());
@ -44,8 +46,9 @@ public class RemoteInterpreterProcessTest {
@Test
public void testClientFactory() throws Exception {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
rip.reference();
rip.reference(intpGroup);
assertEquals(0, rip.getNumActiveClient());
assertEquals(0, rip.getNumIdleClient());

View file

@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Properties;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterGroup;
@ -109,7 +110,8 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
new GUI()));
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
intpB.open();
assertEquals(2, process.referenceCount());
@ -159,7 +161,8 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
new GUI()));
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
assertEquals("500", ret.message());
ret = intpB.interpret("500",
@ -168,7 +171,8 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
new GUI()));
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
assertEquals("1000", ret.message());
long end = System.currentTimeMillis();
assertTrue(end - start >= 1000);
@ -231,7 +235,8 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
new GUI()));
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
}
@Override
@ -262,7 +267,8 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
new GUI()));
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
}
@Override
@ -333,7 +339,8 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
new GUI()));
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
synchronized (results) {
results.add(ret.message());
@ -413,7 +420,8 @@ public class RemoteInterpreterTest {
"title",
"text",
new HashMap<String, Object>(),
new GUI()));
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
synchronized (results) {
results.add(ret.message());

View file

@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterGroup;
@ -53,7 +54,7 @@ public class RemoteSchedulerTest {
@Test
public void test() throws Exception {
Properties p = new Properties();
InterpreterGroup intpGroup = new InterpreterGroup();
final InterpreterGroup intpGroup = new InterpreterGroup();
Map<String, String> env = new HashMap<String, String>();
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
@ -93,7 +94,8 @@ public class RemoteSchedulerTest {
"title",
"text",
new HashMap<String, Object>(),
new GUI()));
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
return "1000";
}

View file

@ -297,7 +297,7 @@ public class ZeppelinServer extends Application {
this.schedulerFactory = new SchedulerFactory();
this.replFactory = new InterpreterFactory(conf);
this.replFactory = new InterpreterFactory(conf, notebookServer);
notebook = new Notebook(conf, schedulerFactory, replFactory, notebookServer);
}

View file

@ -90,6 +90,11 @@ public class Message {
// @param notes serialized List<NoteInfo> object
PARAGRAPH_REMOVE,
ANGULAR_OBJECT_UPDATE, // [s-c] add/update angular object
ANGULAR_OBJECT_REMOVE, // [s-c] add angular object del
ANGULAR_OBJECT_UPDATED // [c-s] angular object value updated
}
public OP op;

View file

@ -25,7 +25,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.JobListenerFactory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
@ -50,7 +54,8 @@ import com.google.gson.Gson;
*
* @author anthonycorbacho
*/
public class NotebookServer extends WebSocketServer implements JobListenerFactory {
public class NotebookServer extends WebSocketServer implements
JobListenerFactory, AngularObjectRegistryListener {
private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
private static final int DEFAULT_PORT = 8282;
@ -130,6 +135,9 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor
case COMPLETION:
completion(conn, notebook, messagereceived);
break;
case ANGULAR_OBJECT_UPDATED:
angularObjectUpdated(conn, notebook, messagereceived);
break;
default:
broadcastNoteList();
break;
@ -380,6 +388,41 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor
conn.send(serializeMessage(resp));
}
/**
* When angular object updated from client
* @param conn
* @param notebook
* @param fromMessage
*/
private void angularObjectUpdated(WebSocket conn, Notebook notebook,
Message fromMessage) {
String noteId = (String) fromMessage.get("noteId");
String interpreterGroupId = (String) fromMessage.get("interpreterGroupId");
String varName = (String) fromMessage.get("name");
Object varValue = fromMessage.get("value");
Note note = notebook.getNote(noteId);
List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
for (InterpreterSetting setting : settings) {
if (setting.getInterpreterGroup() == null) {
continue;
}
if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) {
AngularObjectRegistry angularObjectRegistry = setting
.getInterpreterGroup().getAngularObjectRegistry();
AngularObject ao = angularObjectRegistry.get(varName);
if (ao == null) {
LOG.warn("Object {} is not binded", varName);
} else {
// path from client -> server
ao.set(varValue, false);
}
}
}
}
private void moveParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
throws IOException {
final String paragraphId = (String) fromMessage.get("id");
@ -497,4 +540,48 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor
public JobListener getParagraphJobListener(Note note) {
return new ParagraphJobListener(this, note);
}
@Override
public void onAdd(String interpreterGroupId, AngularObject object) {
onUpdate(interpreterGroupId, object);
}
@Override
public void onUpdate(String interpreterGroupId, AngularObject object) {
Notebook notebook = notebook();
List<Note> notes = notebook.getAllNotes();
for (Note note : notes) {
List<InterpreterSetting> intpSettings = note.getNoteReplLoader()
.getInterpreterSettings();
if (intpSettings.isEmpty()) continue;
for (InterpreterSetting setting : intpSettings) {
if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) {
broadcast(note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE)
.put("angularObject", object)
.put("interpreterGroupId", interpreterGroupId)
.put("noteId", note.id()));
}
}
}
}
@Override
public void onRemove(String interpreterGroupId, AngularObject object) {
Notebook notebook = notebook();
List<Note> notes = notebook.getAllNotes();
for (Note note : notes) {
List<String> ids = note.getNoteReplLoader().getInterpreters();
for (String id : ids) {
if (id.equals(interpreterGroupId)) {
broadcast(
note.id(),
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name",
object.getName()));
}
}
}
}
}

View file

@ -24,7 +24,8 @@
*/
angular.module('zeppelinWebApp')
.controller('MainCtrl', function($scope, WebSocket, $rootScope, $window) {
$rootScope.compiledScope = $scope.$new(true, $rootScope);
$rootScope.angularObjectRegistry = {};
$scope.WebSocketWaitingList = [];
$scope.connected = false;
$scope.looknfeel = 'default';
@ -65,6 +66,8 @@ angular.module('zeppelinWebApp')
$scope.$broadcast('updateProgress', data);
} else if (op === 'COMPLETION_LIST') {
$scope.$broadcast('completionList', data);
} else if (op === 'ANGULAR_OBJECT_UPDATE') {
$scope.$broadcast('angularObjectUpdate', data);
}
});

View file

@ -442,4 +442,35 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', function($scope, $ro
return true;
}
};
$scope.$on('angularObjectUpdate', function(event, data) {
if (data.noteId === $scope.note.id) {
var scope = $rootScope.compiledScope;
var varName = data.angularObject.name;
$rootScope.angularObjectRegistry[varName] = {
interpreterGroupId : data.interpreterGroupId
};
scope[varName] = data.angularObject.object;
scope.$watch(varName, function(newValue, oldValue) {
console.log("value updated to %o %o", varName, newValue);
$rootScope.$emit('sendNewEvent', {
op: 'ANGULAR_OBJECT_UPDATED',
data: {
noteId: $routeParams.noteId,
name:varName,
value:newValue,
interpreterGroupId:$rootScope.angularObjectRegistry[varName].interpreterGroupId
}
});
});
}
});
var isFunction = function(functionToCheck) {
var getType = {};
return functionToCheck && getType.toString.call(functionToCheck) === '[object Function]';
}
});

View file

@ -84,7 +84,8 @@ angular.module('zeppelinWebApp')
if ($('#p'+$scope.paragraph.id+'_angular').length) {
try {
$('#p'+$scope.paragraph.id+'_angular').html($scope.paragraph.result.msg);
$compile($('#p'+$scope.paragraph.id+'_angular').contents())($scope);
$compile($('#p'+$scope.paragraph.id+'_angular').contents())($rootScope.compiledScope);
} catch(err) {
console.log('ANGULAR rendering error %o', err);
}
@ -1615,5 +1616,4 @@ angular.module('zeppelinWebApp')
var redirectToUrl = location.protocol + '//' + location.host + '/#/notebook/' + noteId + '/paragraph/' + $scope.paragraph.id+'?asIframe';
$window.open(redirectToUrl);
};
});

View file

@ -43,7 +43,10 @@ import java.util.Set;
import org.apache.commons.lang.ArrayUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -73,15 +76,21 @@ public class InterpreterFactory {
private InterpreterOption defaultOption;
public InterpreterFactory(ZeppelinConfiguration conf) throws InterpreterException, IOException {
this(conf, new InterpreterOption(true));
AngularObjectRegistryListener angularObjectRegistryListener;
public InterpreterFactory(ZeppelinConfiguration conf,
AngularObjectRegistryListener angularObjectRegistryListener)
throws InterpreterException, IOException {
this(conf, new InterpreterOption(true), angularObjectRegistryListener);
}
public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption)
public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption,
AngularObjectRegistryListener angularObjectRegistryListener)
throws InterpreterException, IOException {
this.conf = conf;
this.defaultOption = defaultOption;
this.angularObjectRegistryListener = angularObjectRegistryListener;
String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
interpreterClassList = replsConf.split(",");
@ -338,7 +347,24 @@ public class InterpreterFactory {
InterpreterOption option,
Properties properties)
throws InterpreterException {
AngularObjectRegistry angularObjectRegistry;
InterpreterGroup interpreterGroup = new InterpreterGroup();
if (option.isRemote()) {
angularObjectRegistry = new RemoteAngularObjectRegistry(
interpreterGroup.getId(),
angularObjectRegistryListener,
interpreterGroup
);
} else {
angularObjectRegistry = new AngularObjectRegistry(
interpreterGroup.getId(),
angularObjectRegistryListener);
}
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
for (String className : interpreterClassList) {
Set<String> keys = Interpreter.registeredInterpreters.keySet();

View file

@ -23,11 +23,13 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.Interpreter.FormType;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.JobListener;
@ -207,11 +209,19 @@ public class Paragraph extends Job implements Serializable {
}
private InterpreterContext getInterpreterContext() {
AngularObjectRegistry registry = null;
if (!getNoteReplLoader().getInterpreterSettings().isEmpty()) {
InterpreterSetting intpGroup = getNoteReplLoader().getInterpreterSettings().get(0);
registry = intpGroup.getInterpreterGroup().getAngularObjectRegistry();
}
InterpreterContext interpreterContext = new InterpreterContext(getId(),
this.getTitle(),
this.getText(),
this.getConfig(),
this.settings);
this.settings,
registry);
return interpreterContext;
}

View file

@ -54,8 +54,8 @@ public class InterpreterFactoryTest {
System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
conf = new ZeppelinConfiguration();
factory = new InterpreterFactory(conf, new InterpreterOption(false));
context = new InterpreterContext("id", "title", "text", null, null);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
context = new InterpreterContext("id", "title", "text", null, null, null);
}
@ -122,7 +122,7 @@ public class InterpreterFactoryTest {
factory.add("newsetting", "mock1", new InterpreterOption(false), new Properties());
assertEquals(3, factory.get().size());
InterpreterFactory factory2 = new InterpreterFactory(conf);
InterpreterFactory factory2 = new InterpreterFactory(conf, null);
assertEquals(3, factory2.get().size());
}
}

View file

@ -69,7 +69,7 @@ public class NotebookTest implements JobListenerFactory{
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
factory = new InterpreterFactory(conf, new InterpreterOption(false));
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
notebook = new Notebook(conf, schedulerFactory, factory, this);
}
@ -108,7 +108,7 @@ public class NotebookTest implements JobListenerFactory{
p1.setText("hello world");
note.persist();
Notebook notebook2 = new Notebook(conf, schedulerFactory, new InterpreterFactory(conf), this);
Notebook notebook2 = new Notebook(conf, schedulerFactory, new InterpreterFactory(conf, null), this);
assertEquals(1, notebook2.getAllNotes().size());
}