mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-25 prevent watcher called multiple times
This commit is contained in:
parent
d4d270e431
commit
4d32d1971b
9 changed files with 133 additions and 37 deletions
|
|
@ -22,9 +22,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.nflabs.zeppelin.scheduler.ExecutorFactory;
|
||||
|
||||
/**
|
||||
|
|
@ -115,7 +112,6 @@ public class AngularObject<T> {
|
|||
|
||||
ExecutorService executor = ExecutorFactory.singleton().createOrGet("angularObjectWatcher", 50);
|
||||
for (final AngularObjectWatcher w : ws) {
|
||||
Logger logger = LoggerFactory.getLogger(AngularObject.class);
|
||||
executor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
|
|
|||
|
|
@ -26,16 +26,25 @@ import java.util.Map;
|
|||
*
|
||||
*
|
||||
*/
|
||||
public class AngularObjectRegistry implements AngularObjectListener {
|
||||
public class AngularObjectRegistry {
|
||||
Map<String, AngularObject> registry = new HashMap<String, AngularObject>();
|
||||
private AngularObjectRegistryListener listener;
|
||||
private String interpreterId;
|
||||
|
||||
public AngularObjectRegistry(String interpreterId,
|
||||
AngularObjectRegistryListener listener) {
|
||||
AngularObjectListener angularObjectListener;
|
||||
|
||||
public AngularObjectRegistry(final String interpreterId,
|
||||
final AngularObjectRegistryListener listener) {
|
||||
this.interpreterId = interpreterId;
|
||||
this.listener = listener;
|
||||
|
||||
angularObjectListener = new AngularObjectListener() {
|
||||
@Override
|
||||
public void updated(AngularObject updatedObject) {
|
||||
if (listener != null) {
|
||||
listener.onUpdate(interpreterId, updatedObject);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public AngularObjectRegistryListener getListener() {
|
||||
|
|
@ -60,7 +69,11 @@ public class AngularObjectRegistry implements AngularObjectListener {
|
|||
}
|
||||
|
||||
protected AngularObject createNewAngularObject(String name, Object o) {
|
||||
return new AngularObject(name, o, this);
|
||||
return new AngularObject(name, o, angularObjectListener);
|
||||
}
|
||||
|
||||
protected AngularObjectListener getAngularObjectListener() {
|
||||
return angularObjectListener;
|
||||
}
|
||||
|
||||
public AngularObject remove(String name) {
|
||||
|
|
@ -87,13 +100,6 @@ public class AngularObjectRegistry implements AngularObjectListener {
|
|||
return all;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updated(AngularObject updatedObject) {
|
||||
if (listener != null) {
|
||||
listener.onUpdate(interpreterId, updatedObject);
|
||||
}
|
||||
}
|
||||
|
||||
public String getInterpreterGroupId() {
|
||||
return interpreterId;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,9 +36,15 @@ public class RemoteAngularObject extends AngularObject {
|
|||
|
||||
@Override
|
||||
public void set(Object o, boolean emit) {
|
||||
super.set(o, emit);
|
||||
set(o, emit, true);
|
||||
}
|
||||
|
||||
// send updated value to remote interpreter
|
||||
remoteInterpreterProcess.updateRemoteAngularObject(getName(), o);
|
||||
public void set(Object o, boolean emitWeb, boolean emitRemoteProcess) {
|
||||
super.set(o, emitWeb);
|
||||
|
||||
if (emitRemoteProcess) {
|
||||
// send updated value to remote interpreter
|
||||
remoteInterpreterProcess.updateRemoteAngularObject(getName(), o);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,7 +60,8 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
|
|||
if (remoteInterpreterProcess == null) {
|
||||
throw new RuntimeException("Remote Interpreter process not found");
|
||||
}
|
||||
return new RemoteAngularObject(name, o, getInterpreterGroupId(), this,
|
||||
return new RemoteAngularObject(name, o, getInterpreterGroupId(),
|
||||
getAngularObjectListener(),
|
||||
getRemoteInterpreterProcess());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -93,7 +93,13 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) {
|
||||
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
|
||||
AngularObject localAngularObject = angularObjectRegistry.get(angularObject.getName());
|
||||
localAngularObject.set(angularObject.get());
|
||||
if (localAngularObject instanceof RemoteAngularObject) {
|
||||
// to avoid ping-pong loop
|
||||
((RemoteAngularObject) localAngularObject).set(
|
||||
angularObject.get(), true, false);
|
||||
} else {
|
||||
localAngularObject.set(angularObject.get());
|
||||
}
|
||||
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
|
||||
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
|
||||
angularObjectRegistry.remove(angularObject.getName());
|
||||
|
|
|
|||
|
|
@ -17,13 +17,61 @@
|
|||
|
||||
package com.nflabs.zeppelin.display;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class AngularObjectTest {
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
public void testListener() {
|
||||
final AtomicInteger updated = new AtomicInteger(0);
|
||||
AngularObject ao = new AngularObject("name", "value", new AngularObjectListener() {
|
||||
|
||||
@Override
|
||||
public void updated(AngularObject updatedObject) {
|
||||
updated.incrementAndGet();
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
assertEquals(0, updated.get());
|
||||
ao.set("newValue");
|
||||
assertEquals(1, updated.get());
|
||||
assertEquals("newValue", ao.get());
|
||||
|
||||
ao.set("newValue");
|
||||
assertEquals(2, updated.get());
|
||||
|
||||
ao.set("newnewValue", false);
|
||||
assertEquals(2, updated.get());
|
||||
assertEquals("newnewValue", ao.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWatcher() throws InterruptedException {
|
||||
final AtomicInteger updated = new AtomicInteger(0);
|
||||
final AtomicInteger onWatch = new AtomicInteger(0);
|
||||
AngularObject ao = new AngularObject("name", "value", new AngularObjectListener() {
|
||||
@Override
|
||||
public void updated(AngularObject updatedObject) {
|
||||
updated.incrementAndGet();
|
||||
}
|
||||
});
|
||||
|
||||
ao.addWatcher(new AngularObjectWatcher() {
|
||||
@Override
|
||||
public void watch(Object oldObject, Object newObject) {
|
||||
onWatch.incrementAndGet();
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals(0, onWatch.get());
|
||||
ao.set("newValue");
|
||||
|
||||
Thread.sleep(500);
|
||||
assertEquals(1, onWatch.get());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -418,7 +418,9 @@ public class NotebookServer extends WebSocketServer implements
|
|||
String varName = (String) fromMessage.get("name");
|
||||
Object varValue = fromMessage.get("value");
|
||||
|
||||
for (Note note : notebook.getAllNotes()) {
|
||||
// propagate change to (Remote) AngularObjectRegistry
|
||||
Note note = notebook.getNote(noteId);
|
||||
if (note != null) {
|
||||
List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getInterpreterGroup() == null) {
|
||||
|
|
@ -434,11 +436,29 @@ public class NotebookServer extends WebSocketServer implements
|
|||
} else {
|
||||
// path from client -> server
|
||||
ao.set(varValue, false);
|
||||
this.broadcast(note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE)
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// broadcast change to all web session that uses related interpreter.
|
||||
for (Note n : notebook.getAllNotes()) {
|
||||
List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getInterpreterGroup() == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) {
|
||||
AngularObjectRegistry angularObjectRegistry = setting
|
||||
.getInterpreterGroup().getAngularObjectRegistry();
|
||||
AngularObject ao = angularObjectRegistry.get(varName);
|
||||
this.broadcast(n.id(), new Message(OP.ANGULAR_OBJECT_UPDATE)
|
||||
.put("angularObject", ao)
|
||||
.put("interpreterGroupId", interpreterGroupId)
|
||||
.put("noteId", note.id()));
|
||||
}
|
||||
.put("noteId", n.id()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,8 +24,7 @@
|
|||
*/
|
||||
angular.module('zeppelinWebApp')
|
||||
.controller('MainCtrl', function($scope, WebSocket, $rootScope, $window) {
|
||||
$rootScope.compiledScope = $scope.$new(true, $rootScope);
|
||||
$rootScope.angularObjectRegistry = {};
|
||||
$rootScope.compiledScope = $scope.$new(true, $rootScope);
|
||||
$scope.WebSocketWaitingList = [];
|
||||
$scope.connected = false;
|
||||
$scope.looknfeel = 'default';
|
||||
|
|
|
|||
|
|
@ -44,6 +44,8 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', function($scope, $ro
|
|||
$scope.interpreterSettings = [];
|
||||
$scope.interpreterBindings = [];
|
||||
|
||||
var angularObjectRegistry = {};
|
||||
|
||||
$scope.getCronOptionNameFromValue = function(value) {
|
||||
if (!value) {
|
||||
return '';
|
||||
|
|
@ -448,24 +450,36 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', function($scope, $ro
|
|||
var scope = $rootScope.compiledScope;
|
||||
var varName = data.angularObject.name;
|
||||
|
||||
if (!$rootScope.angularObjectRegistry[varName]) {
|
||||
var clearWatcher = scope.$watch(varName, function(newValue, oldValue) {
|
||||
console.log("value updated to %o %o", varName, newValue);
|
||||
if (angular.equals(data.angularObject.object, scope[varName])) {
|
||||
// return when update has no change
|
||||
return;
|
||||
}
|
||||
|
||||
if (!angularObjectRegistry[varName]) {
|
||||
angularObjectRegistry[varName] = {
|
||||
interpreterGroupId : data.interpreterGroupId,
|
||||
}
|
||||
}
|
||||
|
||||
angularObjectRegistry[varName].skipEmit = true;
|
||||
|
||||
if (!angularObjectRegistry[varName].clearWatcher) {
|
||||
angularObjectRegistry[varName].clearWatcher = scope.$watch(varName, function(newValue, oldValue) {
|
||||
if (angularObjectRegistry[varName].skipEmit) {
|
||||
angularObjectRegistry[varName].skipEmit = false;
|
||||
return;
|
||||
}
|
||||
|
||||
$rootScope.$emit('sendNewEvent', {
|
||||
op: 'ANGULAR_OBJECT_UPDATED',
|
||||
data: {
|
||||
noteId: $routeParams.noteId,
|
||||
name:varName,
|
||||
value:newValue,
|
||||
interpreterGroupId:$rootScope.angularObjectRegistry[varName].interpreterGroupId
|
||||
interpreterGroupId:angularObjectRegistry[varName].interpreterGroupId
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
$rootScope.angularObjectRegistry[varName] = {
|
||||
interpreterGroupId : data.interpreterGroupId,
|
||||
clearWatcher : clearWatcher
|
||||
};
|
||||
}
|
||||
scope[varName] = data.angularObject.object;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue