Update zeppelin-server and zeppelin-zengine

This commit is contained in:
Lee moon soo 2016-01-01 00:36:41 -08:00
parent 8b13c1e117
commit f35fe8e9e0
5 changed files with 55 additions and 30 deletions

View file

@ -602,6 +602,7 @@ public class NotebookServer extends WebSocketServlet implements
private void angularObjectUpdated(NotebookSocket conn, Notebook notebook,
Message fromMessage) {
String noteId = (String) fromMessage.get("noteId");
String paragraphId = (String) fromMessage.get("paragraphId");
String interpreterGroupId = (String) fromMessage.get("interpreterGroupId");
String varName = (String) fromMessage.get("name");
Object varValue = fromMessage.get("value");
@ -620,19 +621,26 @@ public class NotebookServer extends WebSocketServlet implements
AngularObjectRegistry angularObjectRegistry = setting
.getInterpreterGroup().getAngularObjectRegistry();
// first trying to get local registry
ao = angularObjectRegistry.get(varName, noteId);
ao = angularObjectRegistry.get(varName, noteId, paragraphId);
if (ao == null) {
// then try global registry
ao = angularObjectRegistry.get(varName, null);
// then try notebook scope registry
ao = angularObjectRegistry.get(varName, noteId, null);
if (ao == null) {
LOG.warn("Object {} is not binded", varName);
// then try global scope registry
ao = angularObjectRegistry.get(varName, null, null);
if (ao == null) {
LOG.warn("Object {} is not binded", varName);
} else {
// path from client -> server
ao.set(varValue, false);
global = true;
}
} else {
// path from client -> server
ao.set(varValue, false);
global = true;
global = false;
}
} else {
// path from client -> server
ao.set(varValue, false);
global = false;
}
@ -657,7 +665,8 @@ public class NotebookServer extends WebSocketServlet implements
n.id(),
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId)
.put("noteId", n.id()),
.put("noteId", n.id())
.put("paragraphId", ao.getParagraphId()),
conn);
}
}
@ -667,7 +676,8 @@ public class NotebookServer extends WebSocketServlet implements
note.id(),
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId)
.put("noteId", note.id()),
.put("noteId", note.id())
.put("paragraphId", ao.getParagraphId()),
conn);
}
}
@ -814,7 +824,9 @@ public class NotebookServer extends WebSocketServlet implements
.put("angularObject", object)
.put("interpreterGroupId",
intpSetting.getInterpreterGroup().getId())
.put("noteId", note.id())));
.put("noteId", note.id())
.put("paragraphId", object.getParagraphId())
));
}
}
}
@ -848,14 +860,15 @@ public class NotebookServer extends WebSocketServlet implements
new Message(OP.ANGULAR_OBJECT_UPDATE)
.put("angularObject", object)
.put("interpreterGroupId", interpreterGroupId)
.put("noteId", note.id()));
.put("noteId", note.id())
.put("paragraphId", object.getParagraphId()));
}
}
}
}
@Override
public void onRemove(String interpreterGroupId, String name, String noteId) {
public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
Notebook notebook = notebook();
List<Note> notes = notebook.getAllNotes();
for (Note note : notes) {
@ -869,7 +882,7 @@ public class NotebookServer extends WebSocketServlet implements
broadcast(
note.id(),
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put(
"noteId", noteId));
"noteId", noteId).put("paragraphId", paragraphId));
}
}
}

View file

@ -45,8 +45,6 @@ import static org.mockito.Mockito.*;
* BASIC Zeppelin rest api tests
*/
public class NotebookServerTest extends AbstractTestRestApi {
private static Notebook notebook;
private static NotebookServer notebookServer;
private static Gson gson;
@ -97,7 +95,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
}
// add angularObject
interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId());
interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null);
// create two sockets and open it
NotebookSocket sock1 = createWebSocket();

View file

@ -145,7 +145,6 @@ public class Note implements Serializable, JobListener {
/**
* Add paragraph last.
*
* @param p
*/
public Paragraph addParagraph() {
Paragraph p = new Paragraph(this, this, replLoader);
@ -187,7 +186,6 @@ public class Note implements Serializable, JobListener {
* Insert paragraph in given index.
*
* @param index
* @param p
*/
public Paragraph insertParagraph(int index) {
Paragraph p = new Paragraph(this, this, replLoader);
@ -215,6 +213,8 @@ public class Note implements Serializable, JobListener {
}
}
}
removeAllAngularObjectInParagraph(paragraphId);
return null;
}
@ -339,8 +339,6 @@ public class Note implements Serializable, JobListener {
/**
* Run all paragraphs sequentially.
*
* @param jobListener
*/
public void runAll() {
synchronized (paragraphs) {
@ -399,6 +397,21 @@ public class Note implements Serializable, JobListener {
}
}
private void removeAllAngularObjectInParagraph(String paragraphId) {
angularObjects = new HashMap<String, List<AngularObject>>();
List<InterpreterSetting> settings = replLoader.getInterpreterSettings();
if (settings == null || settings.size() == 0) {
return;
}
for (InterpreterSetting setting : settings) {
InterpreterGroup intpGroup = setting.getInterpreterGroup();
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
registry.removeAll(id, paragraphId);
}
}
public void persist() throws IOException {
snapshotAngularObjectRegistry();
index.updateIndexDoc(this);

View file

@ -228,9 +228,9 @@ public class Notebook {
for (InterpreterSetting settings : replFactory.get()) {
AngularObjectRegistry registry = settings.getInterpreterGroup().getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id);
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null);
} else {
registry.removeAll(id);
registry.removeAll(id, null);
}
}
@ -304,12 +304,13 @@ public class Notebook {
if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
String noteId = snapshot.getAngularObject().getNoteId();
String paragraphId = snapshot.getAngularObject().getParagraphId();
// at this point, remote interpreter process is not created.
// so does not make sense add it to the remote.
//
// therefore instead of addAndNotifyRemoteProcess(), need to use add()
// that results add angularObject only in ZeppelinServer side not remoteProcessSide
registry.add(name, snapshot.getAngularObject().get(), noteId);
registry.add(name, snapshot.getAngularObject().get(), noteId, paragraphId);
}
}
}

View file

@ -319,17 +319,17 @@ public class NotebookTest implements JobListenerFactory{
.getAngularObjectRegistry();
// add local scope object
registry.add("o1", "object1", note.id());
registry.add("o1", "object1", note.id(), null);
// add global scope object
registry.add("o2", "object2", null);
registry.add("o2", "object2", null, null);
// remove notebook
notebook.removeNote(note.id());
// local object should be removed
assertNull(registry.get("o1", note.id()));
assertNull(registry.get("o1", note.id(), null));
// global object sould be remained
assertNotNull(registry.get("o2", null));
assertNotNull(registry.get("o2", null, null));
}
@Test
@ -344,9 +344,9 @@ public class NotebookTest implements JobListenerFactory{
.getAngularObjectRegistry();
// add local scope object
registry.add("o1", "object1", note.id());
registry.add("o1", "object1", note.id(), null);
// add global scope object
registry.add("o2", "object2", null);
registry.add("o2", "object2", null, null);
// restart interpreter
factory.restart(note.getNoteReplLoader().getInterpreterSettings().get(0).id());
@ -355,8 +355,8 @@ public class NotebookTest implements JobListenerFactory{
.getAngularObjectRegistry();
// local and global scope object should be removed
assertNull(registry.get("o1", note.id()));
assertNull(registry.get("o2", null));
assertNull(registry.get("o1", note.id(), null));
assertNull(registry.get("o2", null, null));
notebook.removeNote(note.id());
}