mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-25 send scope variables when loading note
This commit is contained in:
parent
67f6926c1f
commit
c2881982ed
3 changed files with 78 additions and 16 deletions
|
|
@ -18,6 +18,8 @@
|
|||
package com.nflabs.zeppelin.display;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
|
@ -73,6 +75,14 @@ public class AngularObjectRegistry implements AngularObjectListener {
|
|||
}
|
||||
}
|
||||
|
||||
public List<AngularObject> getAll() {
|
||||
List<AngularObject> all = new LinkedList<AngularObject>();
|
||||
synchronized (registry) {
|
||||
all.addAll(registry.values());
|
||||
}
|
||||
return all;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updated(AngularObject updatedObject) {
|
||||
if (listener != null) {
|
||||
|
|
|
|||
|
|
@ -227,6 +227,19 @@ public class NotebookServer extends WebSocketServer implements
|
|||
return id;
|
||||
}
|
||||
|
||||
private void broadcastToNoteBindedInterpreter(String interpreterGroupId, Message m) {
|
||||
Notebook notebook = notebook();
|
||||
List<Note> notes = notebook.getAllNotes();
|
||||
for (Note note : notes) {
|
||||
List<String> ids = note.getNoteReplLoader().getInterpreters();
|
||||
for (String id : ids) {
|
||||
if (id.equals(interpreterGroupId)) {
|
||||
broadcast(note.id(), m);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void broadcast(String noteId, Message m) {
|
||||
LOG.info("SEND >> " + m.op);
|
||||
synchronized (noteSocketMap) {
|
||||
|
|
@ -271,9 +284,11 @@ public class NotebookServer extends WebSocketServer implements
|
|||
return;
|
||||
}
|
||||
Note note = notebook.getNote(noteId);
|
||||
|
||||
if (note != null) {
|
||||
addConnectionToNote(note.id(), conn);
|
||||
conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
|
||||
sendAllAngularObjects(note, conn);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -417,6 +432,21 @@ public class NotebookServer extends WebSocketServer implements
|
|||
} else {
|
||||
// path from client -> server
|
||||
ao.set(varValue, false);
|
||||
|
||||
synchronized (noteSocketMap) {
|
||||
List<WebSocket> socketLists = noteSocketMap.get(noteId);
|
||||
if (socketLists == null || socketLists.size() == 0) {
|
||||
return;
|
||||
}
|
||||
for (WebSocket c : socketLists) {
|
||||
if (c.equals(conn)) continue;
|
||||
|
||||
c.send(serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE)
|
||||
.put("angularObject", ao)
|
||||
.put("interpreterGroupId", interpreterGroupId)
|
||||
.put("noteId", noteId)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -541,6 +571,24 @@ public class NotebookServer extends WebSocketServer implements
|
|||
return new ParagraphJobListener(this, note);
|
||||
}
|
||||
|
||||
private void sendAllAngularObjects(Note note, WebSocket conn) {
|
||||
List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
|
||||
if (settings == null || settings.size() ==0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (InterpreterSetting intpSetting : settings) {
|
||||
AngularObjectRegistry registry = intpSetting.getInterpreterGroup().getAngularObjectRegistry();
|
||||
List<AngularObject> objects = registry.getAll();
|
||||
for (AngularObject object : objects) {
|
||||
conn.send(serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE)
|
||||
.put("angularObject", object)
|
||||
.put("interpreterGroupId", intpSetting.getInterpreterGroup().getId())
|
||||
.put("noteId", note.id())));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAdd(String interpreterGroupId, AngularObject object) {
|
||||
onUpdate(interpreterGroupId, object);
|
||||
|
|
|
|||
|
|
@ -448,24 +448,28 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', function($scope, $ro
|
|||
var scope = $rootScope.compiledScope;
|
||||
var varName = data.angularObject.name;
|
||||
|
||||
$rootScope.angularObjectRegistry[varName] = {
|
||||
interpreterGroupId : data.interpreterGroupId
|
||||
};
|
||||
scope[varName] = data.angularObject.object;
|
||||
|
||||
scope.$watch(varName, function(newValue, oldValue) {
|
||||
console.log("value updated to %o %o", varName, newValue);
|
||||
$rootScope.$emit('sendNewEvent', {
|
||||
op: 'ANGULAR_OBJECT_UPDATED',
|
||||
data: {
|
||||
noteId: $routeParams.noteId,
|
||||
name:varName,
|
||||
value:newValue,
|
||||
interpreterGroupId:$rootScope.angularObjectRegistry[varName].interpreterGroupId
|
||||
}
|
||||
if (!$rootScope.angularObjectRegistry[varName]) {
|
||||
var clearWatcher = scope.$watch(varName, function(newValue, oldValue) {
|
||||
console.log("value updated to %o %o", varName, newValue);
|
||||
$rootScope.$emit('sendNewEvent', {
|
||||
op: 'ANGULAR_OBJECT_UPDATED',
|
||||
data: {
|
||||
noteId: $routeParams.noteId,
|
||||
name:varName,
|
||||
value:newValue,
|
||||
interpreterGroupId:$rootScope.angularObjectRegistry[varName].interpreterGroupId
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
$rootScope.angularObjectRegistry[varName] = {
|
||||
interpreterGroupId : data.interpreterGroupId,
|
||||
clearWatcher : clearWatcher
|
||||
};
|
||||
}
|
||||
scope[varName] = data.angularObject.object;
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
var isFunction = function(functionToCheck) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue