mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Update zeppelin-server and zeppelin-zengine
This commit is contained in:
parent
8b13c1e117
commit
f35fe8e9e0
5 changed files with 55 additions and 30 deletions
|
|
@ -602,6 +602,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
private void angularObjectUpdated(NotebookSocket conn, Notebook notebook,
|
||||
Message fromMessage) {
|
||||
String noteId = (String) fromMessage.get("noteId");
|
||||
String paragraphId = (String) fromMessage.get("paragraphId");
|
||||
String interpreterGroupId = (String) fromMessage.get("interpreterGroupId");
|
||||
String varName = (String) fromMessage.get("name");
|
||||
Object varValue = fromMessage.get("value");
|
||||
|
|
@ -620,19 +621,26 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
AngularObjectRegistry angularObjectRegistry = setting
|
||||
.getInterpreterGroup().getAngularObjectRegistry();
|
||||
// first trying to get local registry
|
||||
ao = angularObjectRegistry.get(varName, noteId);
|
||||
ao = angularObjectRegistry.get(varName, noteId, paragraphId);
|
||||
if (ao == null) {
|
||||
// then try global registry
|
||||
ao = angularObjectRegistry.get(varName, null);
|
||||
// then try notebook scope registry
|
||||
ao = angularObjectRegistry.get(varName, noteId, null);
|
||||
if (ao == null) {
|
||||
LOG.warn("Object {} is not binded", varName);
|
||||
// then try global scope registry
|
||||
ao = angularObjectRegistry.get(varName, null, null);
|
||||
if (ao == null) {
|
||||
LOG.warn("Object {} is not binded", varName);
|
||||
} else {
|
||||
// path from client -> server
|
||||
ao.set(varValue, false);
|
||||
global = true;
|
||||
}
|
||||
} else {
|
||||
// path from client -> server
|
||||
ao.set(varValue, false);
|
||||
global = true;
|
||||
global = false;
|
||||
}
|
||||
} else {
|
||||
// path from client -> server
|
||||
ao.set(varValue, false);
|
||||
global = false;
|
||||
}
|
||||
|
|
@ -657,7 +665,8 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
n.id(),
|
||||
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
|
||||
.put("interpreterGroupId", interpreterGroupId)
|
||||
.put("noteId", n.id()),
|
||||
.put("noteId", n.id())
|
||||
.put("paragraphId", ao.getParagraphId()),
|
||||
conn);
|
||||
}
|
||||
}
|
||||
|
|
@ -667,7 +676,8 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
note.id(),
|
||||
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
|
||||
.put("interpreterGroupId", interpreterGroupId)
|
||||
.put("noteId", note.id()),
|
||||
.put("noteId", note.id())
|
||||
.put("paragraphId", ao.getParagraphId()),
|
||||
conn);
|
||||
}
|
||||
}
|
||||
|
|
@ -814,7 +824,9 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
.put("angularObject", object)
|
||||
.put("interpreterGroupId",
|
||||
intpSetting.getInterpreterGroup().getId())
|
||||
.put("noteId", note.id())));
|
||||
.put("noteId", note.id())
|
||||
.put("paragraphId", object.getParagraphId())
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -848,14 +860,15 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
new Message(OP.ANGULAR_OBJECT_UPDATE)
|
||||
.put("angularObject", object)
|
||||
.put("interpreterGroupId", interpreterGroupId)
|
||||
.put("noteId", note.id()));
|
||||
.put("noteId", note.id())
|
||||
.put("paragraphId", object.getParagraphId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemove(String interpreterGroupId, String name, String noteId) {
|
||||
public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
|
||||
Notebook notebook = notebook();
|
||||
List<Note> notes = notebook.getAllNotes();
|
||||
for (Note note : notes) {
|
||||
|
|
@ -869,7 +882,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
broadcast(
|
||||
note.id(),
|
||||
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put(
|
||||
"noteId", noteId));
|
||||
"noteId", noteId).put("paragraphId", paragraphId));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,8 +45,6 @@ import static org.mockito.Mockito.*;
|
|||
* BASIC Zeppelin rest api tests
|
||||
*/
|
||||
public class NotebookServerTest extends AbstractTestRestApi {
|
||||
|
||||
|
||||
private static Notebook notebook;
|
||||
private static NotebookServer notebookServer;
|
||||
private static Gson gson;
|
||||
|
|
@ -97,7 +95,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
|
|||
}
|
||||
|
||||
// add angularObject
|
||||
interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId());
|
||||
interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null);
|
||||
|
||||
// create two sockets and open it
|
||||
NotebookSocket sock1 = createWebSocket();
|
||||
|
|
|
|||
|
|
@ -145,7 +145,6 @@ public class Note implements Serializable, JobListener {
|
|||
/**
|
||||
* Add paragraph last.
|
||||
*
|
||||
* @param p
|
||||
*/
|
||||
public Paragraph addParagraph() {
|
||||
Paragraph p = new Paragraph(this, this, replLoader);
|
||||
|
|
@ -187,7 +186,6 @@ public class Note implements Serializable, JobListener {
|
|||
* Insert paragraph in given index.
|
||||
*
|
||||
* @param index
|
||||
* @param p
|
||||
*/
|
||||
public Paragraph insertParagraph(int index) {
|
||||
Paragraph p = new Paragraph(this, this, replLoader);
|
||||
|
|
@ -215,6 +213,8 @@ public class Note implements Serializable, JobListener {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
removeAllAngularObjectInParagraph(paragraphId);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -339,8 +339,6 @@ public class Note implements Serializable, JobListener {
|
|||
|
||||
/**
|
||||
* Run all paragraphs sequentially.
|
||||
*
|
||||
* @param jobListener
|
||||
*/
|
||||
public void runAll() {
|
||||
synchronized (paragraphs) {
|
||||
|
|
@ -399,6 +397,21 @@ public class Note implements Serializable, JobListener {
|
|||
}
|
||||
}
|
||||
|
||||
private void removeAllAngularObjectInParagraph(String paragraphId) {
|
||||
angularObjects = new HashMap<String, List<AngularObject>>();
|
||||
|
||||
List<InterpreterSetting> settings = replLoader.getInterpreterSettings();
|
||||
if (settings == null || settings.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (InterpreterSetting setting : settings) {
|
||||
InterpreterGroup intpGroup = setting.getInterpreterGroup();
|
||||
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
|
||||
registry.removeAll(id, paragraphId);
|
||||
}
|
||||
}
|
||||
|
||||
public void persist() throws IOException {
|
||||
snapshotAngularObjectRegistry();
|
||||
index.updateIndexDoc(this);
|
||||
|
|
|
|||
|
|
@ -228,9 +228,9 @@ public class Notebook {
|
|||
for (InterpreterSetting settings : replFactory.get()) {
|
||||
AngularObjectRegistry registry = settings.getInterpreterGroup().getAngularObjectRegistry();
|
||||
if (registry instanceof RemoteAngularObjectRegistry) {
|
||||
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id);
|
||||
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null);
|
||||
} else {
|
||||
registry.removeAll(id);
|
||||
registry.removeAll(id, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -304,12 +304,13 @@ public class Notebook {
|
|||
if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
|
||||
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
|
||||
String noteId = snapshot.getAngularObject().getNoteId();
|
||||
String paragraphId = snapshot.getAngularObject().getParagraphId();
|
||||
// at this point, remote interpreter process is not created.
|
||||
// so does not make sense add it to the remote.
|
||||
//
|
||||
// therefore instead of addAndNotifyRemoteProcess(), need to use add()
|
||||
// that results add angularObject only in ZeppelinServer side not remoteProcessSide
|
||||
registry.add(name, snapshot.getAngularObject().get(), noteId);
|
||||
registry.add(name, snapshot.getAngularObject().get(), noteId, paragraphId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -319,17 +319,17 @@ public class NotebookTest implements JobListenerFactory{
|
|||
.getAngularObjectRegistry();
|
||||
|
||||
// add local scope object
|
||||
registry.add("o1", "object1", note.id());
|
||||
registry.add("o1", "object1", note.id(), null);
|
||||
// add global scope object
|
||||
registry.add("o2", "object2", null);
|
||||
registry.add("o2", "object2", null, null);
|
||||
|
||||
// remove notebook
|
||||
notebook.removeNote(note.id());
|
||||
|
||||
// local object should be removed
|
||||
assertNull(registry.get("o1", note.id()));
|
||||
assertNull(registry.get("o1", note.id(), null));
|
||||
// global object sould be remained
|
||||
assertNotNull(registry.get("o2", null));
|
||||
assertNotNull(registry.get("o2", null, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -344,9 +344,9 @@ public class NotebookTest implements JobListenerFactory{
|
|||
.getAngularObjectRegistry();
|
||||
|
||||
// add local scope object
|
||||
registry.add("o1", "object1", note.id());
|
||||
registry.add("o1", "object1", note.id(), null);
|
||||
// add global scope object
|
||||
registry.add("o2", "object2", null);
|
||||
registry.add("o2", "object2", null, null);
|
||||
|
||||
// restart interpreter
|
||||
factory.restart(note.getNoteReplLoader().getInterpreterSettings().get(0).id());
|
||||
|
|
@ -355,8 +355,8 @@ public class NotebookTest implements JobListenerFactory{
|
|||
.getAngularObjectRegistry();
|
||||
|
||||
// local and global scope object should be removed
|
||||
assertNull(registry.get("o1", note.id()));
|
||||
assertNull(registry.get("o2", null));
|
||||
assertNull(registry.get("o1", note.id(), null));
|
||||
assertNull(registry.get("o2", null, null));
|
||||
notebook.removeNote(note.id());
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue