ZEPPELIN-25 prevent watcher called multiple times

This commit is contained in:
Lee moon soo 2015-04-05 21:36:46 +09:00
parent d4d270e431
commit 4d32d1971b
9 changed files with 133 additions and 37 deletions

View file

@ -22,9 +22,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nflabs.zeppelin.scheduler.ExecutorFactory;
/**
@ -115,7 +112,6 @@ public class AngularObject<T> {
ExecutorService executor = ExecutorFactory.singleton().createOrGet("angularObjectWatcher", 50);
for (final AngularObjectWatcher w : ws) {
Logger logger = LoggerFactory.getLogger(AngularObject.class);
executor.submit(new Runnable() {
@Override
public void run() {

View file

@ -26,16 +26,25 @@ import java.util.Map;
*
*
*/
public class AngularObjectRegistry implements AngularObjectListener {
public class AngularObjectRegistry {
Map<String, AngularObject> registry = new HashMap<String, AngularObject>();
private AngularObjectRegistryListener listener;
private String interpreterId;
public AngularObjectRegistry(String interpreterId,
AngularObjectRegistryListener listener) {
AngularObjectListener angularObjectListener;
public AngularObjectRegistry(final String interpreterId,
final AngularObjectRegistryListener listener) {
this.interpreterId = interpreterId;
this.listener = listener;
angularObjectListener = new AngularObjectListener() {
@Override
public void updated(AngularObject updatedObject) {
if (listener != null) {
listener.onUpdate(interpreterId, updatedObject);
}
}
};
}
public AngularObjectRegistryListener getListener() {
@ -60,7 +69,11 @@ public class AngularObjectRegistry implements AngularObjectListener {
}
protected AngularObject createNewAngularObject(String name, Object o) {
return new AngularObject(name, o, this);
return new AngularObject(name, o, angularObjectListener);
}
protected AngularObjectListener getAngularObjectListener() {
return angularObjectListener;
}
public AngularObject remove(String name) {
@ -87,13 +100,6 @@ public class AngularObjectRegistry implements AngularObjectListener {
return all;
}
@Override
public void updated(AngularObject updatedObject) {
if (listener != null) {
listener.onUpdate(interpreterId, updatedObject);
}
}
public String getInterpreterGroupId() {
return interpreterId;
}

View file

@ -36,9 +36,15 @@ public class RemoteAngularObject extends AngularObject {
@Override
public void set(Object o, boolean emit) {
super.set(o, emit);
set(o, emit, true);
}
// send updated value to remote interpreter
remoteInterpreterProcess.updateRemoteAngularObject(getName(), o);
public void set(Object o, boolean emitWeb, boolean emitRemoteProcess) {
super.set(o, emitWeb);
if (emitRemoteProcess) {
// send updated value to remote interpreter
remoteInterpreterProcess.updateRemoteAngularObject(getName(), o);
}
}
}

View file

@ -60,7 +60,8 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
if (remoteInterpreterProcess == null) {
throw new RuntimeException("Remote Interpreter process not found");
}
return new RemoteAngularObject(name, o, getInterpreterGroupId(), this,
return new RemoteAngularObject(name, o, getInterpreterGroupId(),
getAngularObjectListener(),
getRemoteInterpreterProcess());
}
}

View file

@ -93,7 +93,13 @@ public class RemoteInterpreterEventPoller extends Thread {
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) {
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
AngularObject localAngularObject = angularObjectRegistry.get(angularObject.getName());
localAngularObject.set(angularObject.get());
if (localAngularObject instanceof RemoteAngularObject) {
// to avoid ping-pong loop
((RemoteAngularObject) localAngularObject).set(
angularObject.get(), true, false);
} else {
localAngularObject.set(angularObject.get());
}
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
angularObjectRegistry.remove(angularObject.getName());

View file

@ -17,13 +17,61 @@
package com.nflabs.zeppelin.display;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
public class AngularObjectTest {
@Test
public void test() {
public void testListener() {
final AtomicInteger updated = new AtomicInteger(0);
AngularObject ao = new AngularObject("name", "value", new AngularObjectListener() {
@Override
public void updated(AngularObject updatedObject) {
updated.incrementAndGet();
}
});
assertEquals(0, updated.get());
ao.set("newValue");
assertEquals(1, updated.get());
assertEquals("newValue", ao.get());
ao.set("newValue");
assertEquals(2, updated.get());
ao.set("newnewValue", false);
assertEquals(2, updated.get());
assertEquals("newnewValue", ao.get());
}
@Test
public void testWatcher() throws InterruptedException {
final AtomicInteger updated = new AtomicInteger(0);
final AtomicInteger onWatch = new AtomicInteger(0);
AngularObject ao = new AngularObject("name", "value", new AngularObjectListener() {
@Override
public void updated(AngularObject updatedObject) {
updated.incrementAndGet();
}
});
ao.addWatcher(new AngularObjectWatcher() {
@Override
public void watch(Object oldObject, Object newObject) {
onWatch.incrementAndGet();
}
});
assertEquals(0, onWatch.get());
ao.set("newValue");
Thread.sleep(500);
assertEquals(1, onWatch.get());
}
}

View file

@ -418,7 +418,9 @@ public class NotebookServer extends WebSocketServer implements
String varName = (String) fromMessage.get("name");
Object varValue = fromMessage.get("value");
for (Note note : notebook.getAllNotes()) {
// propagate change to (Remote) AngularObjectRegistry
Note note = notebook.getNote(noteId);
if (note != null) {
List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
for (InterpreterSetting setting : settings) {
if (setting.getInterpreterGroup() == null) {
@ -434,11 +436,29 @@ public class NotebookServer extends WebSocketServer implements
} else {
// path from client -> server
ao.set(varValue, false);
this.broadcast(note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE)
}
break;
}
}
}
// broadcast change to all web session that uses related interpreter.
for (Note n : notebook.getAllNotes()) {
List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
for (InterpreterSetting setting : settings) {
if (setting.getInterpreterGroup() == null) {
continue;
}
if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) {
AngularObjectRegistry angularObjectRegistry = setting
.getInterpreterGroup().getAngularObjectRegistry();
AngularObject ao = angularObjectRegistry.get(varName);
this.broadcast(n.id(), new Message(OP.ANGULAR_OBJECT_UPDATE)
.put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId)
.put("noteId", note.id()));
}
.put("noteId", n.id()));
}
}
}

View file

@ -24,8 +24,7 @@
*/
angular.module('zeppelinWebApp')
.controller('MainCtrl', function($scope, WebSocket, $rootScope, $window) {
$rootScope.compiledScope = $scope.$new(true, $rootScope);
$rootScope.angularObjectRegistry = {};
$rootScope.compiledScope = $scope.$new(true, $rootScope);
$scope.WebSocketWaitingList = [];
$scope.connected = false;
$scope.looknfeel = 'default';

View file

@ -44,6 +44,8 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', function($scope, $ro
$scope.interpreterSettings = [];
$scope.interpreterBindings = [];
var angularObjectRegistry = {};
$scope.getCronOptionNameFromValue = function(value) {
if (!value) {
return '';
@ -448,24 +450,36 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', function($scope, $ro
var scope = $rootScope.compiledScope;
var varName = data.angularObject.name;
if (!$rootScope.angularObjectRegistry[varName]) {
var clearWatcher = scope.$watch(varName, function(newValue, oldValue) {
console.log("value updated to %o %o", varName, newValue);
if (angular.equals(data.angularObject.object, scope[varName])) {
// return when update has no change
return;
}
if (!angularObjectRegistry[varName]) {
angularObjectRegistry[varName] = {
interpreterGroupId : data.interpreterGroupId,
}
}
angularObjectRegistry[varName].skipEmit = true;
if (!angularObjectRegistry[varName].clearWatcher) {
angularObjectRegistry[varName].clearWatcher = scope.$watch(varName, function(newValue, oldValue) {
if (angularObjectRegistry[varName].skipEmit) {
angularObjectRegistry[varName].skipEmit = false;
return;
}
$rootScope.$emit('sendNewEvent', {
op: 'ANGULAR_OBJECT_UPDATED',
data: {
noteId: $routeParams.noteId,
name:varName,
value:newValue,
interpreterGroupId:$rootScope.angularObjectRegistry[varName].interpreterGroupId
interpreterGroupId:angularObjectRegistry[varName].interpreterGroupId
}
});
});
$rootScope.angularObjectRegistry[varName] = {
interpreterGroupId : data.interpreterGroupId,
clearWatcher : clearWatcher
};
}
scope[varName] = data.angularObject.object;
}