Add paragraph scope of angular object

This commit is contained in:
Lee moon soo 2015-12-31 23:10:08 -08:00
parent 0c42f4332e
commit 8b13c1e117
17 changed files with 499 additions and 141 deletions

View file

@ -39,11 +39,12 @@ public class AngularObject<T> {
= new LinkedList<AngularObjectWatcher>();
private String noteId; // noteId belonging to. null for global scope
protected AngularObject(String name, T o, String noteId,
private String paragraphId; // paragraphId belongs to. null for notebook scope
protected AngularObject(String name, T o, String noteId, String paragraphId,
AngularObjectListener listener) {
this.name = name;
this.noteId = noteId;
this.paragraphId = paragraphId;
this.listener = listener;
object = o;
}
@ -59,7 +60,15 @@ public class AngularObject<T> {
public String getNoteId() {
return noteId;
}
public String getParagraphId() {
return paragraphId;
}
public void setParagraphId(String paragraphId) {
this.paragraphId = paragraphId;
}
public boolean isGlobal() {
return noteId == null;
}
@ -70,7 +79,10 @@ public class AngularObject<T> {
AngularObject ao = (AngularObject) o;
if (noteId == null && ao.noteId == null ||
(noteId != null && ao.noteId != null && noteId.equals(ao.noteId))) {
return name.equals(ao.name);
if (paragraphId == null && ao.paragraphId == null ||
(paragraphId != null && ao.paragraphId != null && paragraphId.equals(ao.paragraphId))) {
return name.equals(ao.name);
}
}
}
return false;

View file

@ -65,21 +65,25 @@ public class AngularObjectRegistry {
* @param noteId noteId belonging to. null for global object.
* @return
*/
public AngularObject add(String name, Object o, String noteId) {
return add(name, o, noteId, true);
public AngularObject add(String name, Object o, String noteId, String paragraphId) {
return add(name, o, noteId, paragraphId, true);
}
private String getRegistryKey(String noteId) {
private String getRegistryKey(String noteId, String paragraphId) {
if (noteId == null) {
return GLOBAL_KEY;
} else {
return noteId;
if (paragraphId == null) {
return noteId;
} else {
return noteId + "_" + paragraphId;
}
}
}
private Map<String, AngularObject> getRegistryForKey(String noteId) {
private Map<String, AngularObject> getRegistryForKey(String noteId, String paragraphId) {
synchronized (registry) {
String key = getRegistryKey(noteId);
String key = getRegistryKey(noteId, paragraphId);
if (!registry.containsKey(key)) {
registry.put(key, new HashMap<String, AngularObject>());
}
@ -88,11 +92,12 @@ public class AngularObjectRegistry {
}
}
public AngularObject add(String name, Object o, String noteId, boolean emit) {
AngularObject ao = createNewAngularObject(name, o, noteId);
public AngularObject add(String name, Object o, String noteId, String paragraphId,
boolean emit) {
AngularObject ao = createNewAngularObject(name, o, noteId, paragraphId);
synchronized (registry) {
Map<String, AngularObject> noteLocalRegistry = getRegistryForKey(noteId);
Map<String, AngularObject> noteLocalRegistry = getRegistryForKey(noteId, paragraphId);
noteLocalRegistry.put(name, ao);
if (listener != null && emit) {
listener.onAdd(interpreterId, ao);
@ -102,49 +107,50 @@ public class AngularObjectRegistry {
return ao;
}
protected AngularObject createNewAngularObject(String name, Object o, String noteId) {
return new AngularObject(name, o, noteId, angularObjectListener);
protected AngularObject createNewAngularObject(String name, Object o, String noteId,
String paragraphId) {
return new AngularObject(name, o, noteId, paragraphId, angularObjectListener);
}
protected AngularObjectListener getAngularObjectListener() {
return angularObjectListener;
}
public AngularObject remove(String name, String noteId) {
return remove(name, noteId, true);
public AngularObject remove(String name, String noteId, String paragraphId) {
return remove(name, noteId, paragraphId, true);
}
public AngularObject remove(String name, String noteId, boolean emit) {
public AngularObject remove(String name, String noteId, String paragraphId, boolean emit) {
synchronized (registry) {
Map<String, AngularObject> r = getRegistryForKey(noteId);
Map<String, AngularObject> r = getRegistryForKey(noteId, paragraphId);
AngularObject o = r.remove(name);
if (listener != null && emit) {
listener.onRemove(interpreterId, name, noteId);;
listener.onRemove(interpreterId, name, noteId, paragraphId);;
}
return o;
}
}
public void removeAll(String noteId) {
public void removeAll(String noteId, String paragraphId) {
synchronized (registry) {
List<AngularObject> all = getAll(noteId);
List<AngularObject> all = getAll(noteId, paragraphId);
for (AngularObject ao : all) {
remove(ao.getName(), noteId);
remove(ao.getName(), noteId, paragraphId);
}
}
}
public AngularObject get(String name, String noteId) {
public AngularObject get(String name, String noteId, String paragraphId) {
synchronized (registry) {
Map<String, AngularObject> r = getRegistryForKey(noteId);
Map<String, AngularObject> r = getRegistryForKey(noteId, paragraphId);
return r.get(name);
}
}
public List<AngularObject> getAll(String noteId) {
public List<AngularObject> getAll(String noteId, String paragraphId) {
List<AngularObject> all = new LinkedList<AngularObject>();
synchronized (registry) {
Map<String, AngularObject> r = getRegistryForKey(noteId);
Map<String, AngularObject> r = getRegistryForKey(noteId, paragraphId);
if (r != null) {
all.addAll(r.values());
}
@ -160,13 +166,14 @@ public class AngularObjectRegistry {
public List<AngularObject> getAllWithGlobal(String noteId) {
List<AngularObject> all = new LinkedList<AngularObject>();
synchronized (registry) {
Map<String, AngularObject> global = getRegistryForKey(null);
Map<String, AngularObject> global = getRegistryForKey(null, null);
if (global != null) {
all.addAll(global.values());
}
Map<String, AngularObject> local = getRegistryForKey(noteId);
if (local != null) {
all.addAll(local.values());
for (String key : registry.keySet()) {
if (key.startsWith(noteId)) {
all.addAll(registry.get(key).values());
}
}
}
return all;

View file

@ -24,5 +24,5 @@ package org.apache.zeppelin.display;
public interface AngularObjectRegistryListener {
public void onAdd(String interpreterGroupId, AngularObject object);
public void onUpdate(String interpreterGroupId, AngularObject object);
public void onRemove(String interpreterGroupId, String name, String noteId);
public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId);
}

View file

@ -27,10 +27,11 @@ public class RemoteAngularObject extends AngularObject {
private transient RemoteInterpreterProcess remoteInterpreterProcess;
RemoteAngularObject(String name, Object o, String noteId, String interpreterGroupId,
RemoteAngularObject(String name, Object o, String noteId, String paragraphId, String
interpreterGroupId,
AngularObjectListener listener,
RemoteInterpreterProcess remoteInterpreterProcess) {
super(name, o, noteId, listener);
super(name, o, noteId, paragraphId, listener);
this.remoteInterpreterProcess = remoteInterpreterProcess;
}
@ -44,7 +45,8 @@ public class RemoteAngularObject extends AngularObject {
if (emitRemoteProcess) {
// send updated value to remote interpreter
remoteInterpreterProcess.updateRemoteAngularObject(getName(), getNoteId(), o);
remoteInterpreterProcess.updateRemoteAngularObject(getName(), getNoteId(), getParagraphId()
, o);
}
}
}

View file

@ -70,7 +70,8 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
* @param noteId
* @return
*/
public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId) {
public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String
paragraphId) {
Gson gson = new Gson();
RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
if (!remoteInterpreterProcess.isRunning()) {
@ -81,8 +82,8 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
client.angularObjectAdd(name, noteId, gson.toJson(o));
return super.add(name, o, noteId, true);
client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
return super.add(name, o, noteId, paragraphId, true);
} catch (TException e) {
broken = true;
logger.error("Error", e);
@ -101,9 +102,11 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
* this method should be used instead of remove()
* @param name
* @param noteId
* @param paragraphId
* @return
*/
public AngularObject removeAndNotifyRemoteProcess(String name, String noteId) {
public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String
paragraphId) {
RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
if (!remoteInterpreterProcess.isRunning()) {
return null;
@ -113,8 +116,8 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
client.angularObjectRemove(name, noteId);
return super.remove(name, noteId);
client.angularObjectRemove(name, noteId, paragraphId);
return super.remove(name, noteId, paragraphId);
} catch (TException e) {
broken = true;
logger.error("Error", e);
@ -128,20 +131,21 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
return null;
}
public void removeAllAndNotifyRemoteProcess(String noteId) {
List<AngularObject> all = getAll(noteId);
public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) {
List<AngularObject> all = getAll(noteId, paragraphId);
for (AngularObject ao : all) {
removeAndNotifyRemoteProcess(ao.getName(), noteId);
removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId);
}
}
@Override
protected AngularObject createNewAngularObject(String name, Object o, String noteId) {
protected AngularObject createNewAngularObject(String name, Object o, String noteId, String
paragraphId) {
RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
if (remoteInterpreterProcess == null) {
throw new RuntimeException("Remote Interpreter process not found");
}
return new RemoteAngularObject(name, o, noteId, getInterpreterGroupId(),
return new RemoteAngularObject(name, o, noteId, paragraphId, getInterpreterGroupId(),
getAngularObjectListener(),
getRemoteInterpreterProcess());
}

View file

@ -88,12 +88,12 @@ public class RemoteInterpreterEventPoller extends Thread {
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_ADD) {
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
angularObjectRegistry.add(angularObject.getName(),
angularObject.get(), angularObject.getNoteId());
angularObject.get(), angularObject.getNoteId(), angularObject.getParagraphId());
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) {
AngularObject angularObject = gson.fromJson(event.getData(),
AngularObject.class);
AngularObject localAngularObject = angularObjectRegistry.get(
angularObject.getName(), angularObject.getNoteId());
angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId());
if (localAngularObject instanceof RemoteAngularObject) {
// to avoid ping-pong loop
((RemoteAngularObject) localAngularObject).set(
@ -103,7 +103,8 @@ public class RemoteInterpreterEventPoller extends Thread {
}
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
angularObjectRegistry.remove(angularObject.getName(), angularObject.getNoteId());
angularObjectRegistry.remove(angularObject.getName(), angularObject.getNoteId(),
angularObject.getParagraphId());
} else if (event.getType() == RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) {
InterpreterContextRunner runnerFromRemote = gson.fromJson(
event.getData(), RemoteInterpreterContextRunner.class);

View file

@ -266,7 +266,7 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
* @param name
* @param o
*/
public void updateRemoteAngularObject(String name, String noteId, Object o) {
public void updateRemoteAngularObject(String name, String noteId, String paragraphId, Object o) {
Client client = null;
try {
client = getClient();
@ -282,7 +282,7 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
boolean broken = false;
try {
Gson gson = new Gson();
client.angularObjectUpdate(name, noteId, gson.toJson(o));
client.angularObjectUpdate(name, noteId, paragraphId, gson.toJson(o));
} catch (TException e) {
broken = true;
logger.error("Can't update angular object", e);

View file

@ -434,7 +434,7 @@ public class RemoteInterpreterServer
}
@Override
public void onRemove(String interpreterGroupId, String name, String noteId) {
public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
Map<String, String> removeObject = new HashMap<String, String>();
removeObject.put("name", name);
removeObject.put("noteId", noteId);
@ -473,15 +473,16 @@ public class RemoteInterpreterServer
* called when object is updated in client (web) side.
* @param name
* @param noteId noteId where the update issues
* @param paragraphId paragraphId where the update issues
* @param object
* @throws TException
*/
@Override
public void angularObjectUpdate(String name, String noteId, String object)
public void angularObjectUpdate(String name, String noteId, String paragraphId, String object)
throws TException {
AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
// first try local objects
AngularObject ao = registry.get(name, noteId);
AngularObject ao = registry.get(name, noteId, paragraphId);
if (ao == null) {
logger.error("Angular object {} not exists", name);
return;
@ -530,13 +531,13 @@ public class RemoteInterpreterServer
* Dont't need to emit event to zeppelin server
*/
@Override
public void angularObjectAdd(String name, String noteId, String object)
public void angularObjectAdd(String name, String noteId, String paragraphId, String object)
throws TException {
AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
// first try local objects
AngularObject ao = registry.get(name, noteId);
AngularObject ao = registry.get(name, noteId, paragraphId);
if (ao != null) {
angularObjectUpdate(name, noteId, object);
angularObjectUpdate(name, noteId, paragraphId, object);
return;
}
@ -556,12 +557,13 @@ public class RemoteInterpreterServer
value = gson.fromJson(object, String.class);
}
registry.add(name, value, noteId, false);
registry.add(name, value, noteId, paragraphId, false);
}
@Override
public void angularObjectRemove(String name, String noteId) throws TException {
public void angularObjectRemove(String name, String noteId, String paragraphId) throws
TException {
AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
registry.remove(name, noteId, false);
registry.remove(name, noteId, paragraphId, false);
}
}

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-31")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-31")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-12-31")
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");

View file

@ -65,7 +65,8 @@ service RemoteInterpreterService {
string getStatus(1:string jobId);
RemoteInterpreterEvent getEvent();
void angularObjectUpdate(1: string name, 2: string noteId, 3: string object);
void angularObjectAdd(1: string name, 2: string noteId, 3: string object);
void angularObjectRemove(1: string name, 2: string noteId);
void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string
object);
void angularObjectAdd(1: string name, 2: string noteId, 3: string paragraphId, 4: string object);
void angularObjectRemove(1: string name, 2: string noteId, 3: string paragraphId);
}

View file

@ -45,32 +45,32 @@ public class AngularObjectRegistryTest {
}
@Override
public void onRemove(String interpreterGroupId, String name, String noteId) {
public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
onRemove.incrementAndGet();
}
});
registry.add("name1", "value1", "note1");
assertEquals(1, registry.getAll("note1").size());
registry.add("name1", "value1", "note1", null);
assertEquals(1, registry.getAll("note1", null).size());
assertEquals(1, onAdd.get());
assertEquals(0, onUpdate.get());
registry.get("name1", "note1").set("newValue");
registry.get("name1", "note1", null).set("newValue");
assertEquals(1, onUpdate.get());
registry.remove("name1", "note1");
assertEquals(0, registry.getAll("note1").size());
registry.remove("name1", "note1", null);
assertEquals(0, registry.getAll("note1", null).size());
assertEquals(1, onRemove.get());
assertEquals(null, registry.get("name1", "note1"));
assertEquals(null, registry.get("name1", "note1", null));
// namespace
registry.add("name1", "value11", "note2");
assertEquals("value11", registry.get("name1", "note2").get());
assertEquals(null, registry.get("name1", "note1"));
registry.add("name1", "value11", "note2", null);
assertEquals("value11", registry.get("name1", "note2", null).get());
assertEquals(null, registry.get("name1", "note1", null));
// null namespace
registry.add("name1", "global1", null);
assertEquals("global1", registry.get("name1", null).get());
registry.add("name1", "global1", null, null);
assertEquals("global1", registry.get("name1", null, null).get());
}
}

View file

@ -29,7 +29,7 @@ public class AngularObjectTest {
@Test
public void testListener() {
final AtomicInteger updated = new AtomicInteger(0);
AngularObject ao = new AngularObject("name", "value", "note1", new AngularObjectListener() {
AngularObject ao = new AngularObject("name", "value", "note1", null, new AngularObjectListener() {
@Override
public void updated(AngularObject updatedObject) {
@ -55,7 +55,7 @@ public class AngularObjectTest {
public void testWatcher() throws InterruptedException {
final AtomicInteger updated = new AtomicInteger(0);
final AtomicInteger onWatch = new AtomicInteger(0);
AngularObject ao = new AngularObject("name", "value", "note1", new AngularObjectListener() {
AngularObject ao = new AngularObject("name", "value", "note1", null, new AngularObjectListener() {
@Override
public void updated(AngularObject updatedObject) {
updated.incrementAndGet();

View file

@ -109,7 +109,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
result = ret.message().split(" ");
assertEquals("1", result[0]); // size of registry
assertEquals("0", result[1]); // num watcher called
assertEquals("v1", localRegistry.get("n1", "note").get());
assertEquals("v1", localRegistry.get("n1", "note", null).get());
// update object
ret = intp.interpret("update n1 v11", context);
@ -117,7 +117,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
Thread.sleep(500);
assertEquals("1", result[0]); // size of registry
assertEquals("1", result[1]); // num watcher called
assertEquals("v11", localRegistry.get("n1", "note").get());
assertEquals("v11", localRegistry.get("n1", "note", null).get());
// remove object
ret = intp.interpret("remove n1", context);
@ -125,7 +125,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
Thread.sleep(500);
assertEquals("0", result[0]); // size of registry
assertEquals("1", result[1]); // num watcher called
assertEquals(null, localRegistry.get("n1", "note"));
assertEquals(null, localRegistry.get("n1", "note", null));
}
@Test
@ -143,10 +143,10 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
Thread.sleep(500);
result = ret.message().split(" ");
assertEquals("1", result[0]); // size of registry
assertEquals("v1", localRegistry.get("n1", "note").get());
assertEquals("v1", localRegistry.get("n1", "note", null).get());
// remove object in local registry.
localRegistry.removeAndNotifyRemoteProcess("n1", "note");
localRegistry.removeAndNotifyRemoteProcess("n1", "note", null);
ret = intp.interpret("get", context);
Thread.sleep(500); // waitFor eventpoller pool event
result = ret.message().split(" ");
@ -164,7 +164,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
assertEquals("0", result[0]); // size of registry
// create object
localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note");
localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null);
// get from remote registry
ret = intp.interpret("get", context);
@ -184,7 +184,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
}
@Override
public void onRemove(String interpreterGroupId, String name, String noteId) {
public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
onRemove.incrementAndGet();
}

View file

@ -71,8 +71,9 @@ public class MockInterpreterAngular extends Interpreter {
AngularObjectRegistry registry = context.getAngularObjectRegistry();
if (cmd.equals("add")) {
registry.add(name, value, context.getNoteId());
registry.get(name, context.getNoteId()).addWatcher(new AngularObjectWatcher(null) {
registry.add(name, value, context.getNoteId(), null);
registry.get(name, context.getNoteId(), null).addWatcher(new AngularObjectWatcher
(null) {
@Override
public void watch(Object oldObject, Object newObject,
@ -82,9 +83,9 @@ public class MockInterpreterAngular extends Interpreter {
});
} else if (cmd.equalsIgnoreCase("update")) {
registry.get(name, context.getNoteId()).set(value);
registry.get(name, context.getNoteId(), null).set(value);
} else if (cmd.equals("remove")) {
registry.remove(name, context.getNoteId());
registry.remove(name, context.getNoteId(), null);
}
try {
@ -93,7 +94,8 @@ public class MockInterpreterAngular extends Interpreter {
logger.error("Exception in MockInterpreterAngular while interpret Thread.sleep", e);
}
String msg = registry.getAll(context.getNoteId()).size() + " " + Integer.toString(numWatch.get());
String msg = registry.getAll(context.getNoteId(), null).size() + " " + Integer.toString(numWatch
.get());
return new InterpreterResult(Code.SUCCESS, msg);
}