mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Added new WS queue called watcher, watcher will be abler to listen to almost every note action performed in zeppelin notebook websocket server
This commit is contained in:
parent
45849cec62
commit
0d7f493657
1 changed files with 62 additions and 12 deletions
|
|
@ -48,10 +48,17 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
|
|||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.notebook.*;
|
||||
import org.apache.zeppelin.notebook.JobListenerFactory;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.Notebook;
|
||||
import org.apache.zeppelin.notebook.NotebookAuthorization;
|
||||
import org.apache.zeppelin.notebook.NotebookEventListener;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.apache.zeppelin.notebook.ParagraphJobListener;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
|
||||
import org.apache.zeppelin.notebook.socket.Message;
|
||||
import org.apache.zeppelin.notebook.socket.Message.OP;
|
||||
import org.apache.zeppelin.notebook.socket.WatcherMessage;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.apache.zeppelin.server.ZeppelinServer;
|
||||
|
|
@ -67,6 +74,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Queues;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
|
|
@ -97,6 +105,11 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
|
||||
final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
|
||||
|
||||
// This is a special endpoint in the notebook websoket, Every connection in this Queue
|
||||
// will be able to watch every websocket event, it doesnt need to be listed into the map of
|
||||
// noteSocketMap.
|
||||
final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue();
|
||||
|
||||
private Notebook notebook() {
|
||||
return ZeppelinServer.notebook;
|
||||
}
|
||||
|
|
@ -275,6 +288,9 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
case GET_INTERPRETER_SETTINGS:
|
||||
getInterpreterSettings(conn, subject);
|
||||
break;
|
||||
case WATCHER:
|
||||
switchConnectionToWatcher(conn, messagereceived);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
@ -382,6 +398,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
for (String id : ids) {
|
||||
if (id.equals(interpreterGroupId)) {
|
||||
broadcast(note.getId(), m);
|
||||
broadcastToWatchers(note.getId(), StringUtils.EMPTY, m);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -394,6 +411,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
return;
|
||||
}
|
||||
LOG.debug("SEND >> " + m.op);
|
||||
broadcastToWatchers(noteId, StringUtils.EMPTY, m);
|
||||
for (NotebookSocket conn : socketLists) {
|
||||
try {
|
||||
conn.send(serializeMessage(m));
|
||||
|
|
@ -411,6 +429,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
return;
|
||||
}
|
||||
LOG.debug("SEND >> " + m.op);
|
||||
broadcastToWatchers(noteId, StringUtils.EMPTY, m);
|
||||
for (NotebookSocket conn : socketLists) {
|
||||
if (exclude.equals(conn)) {
|
||||
continue;
|
||||
|
|
@ -431,11 +450,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
}
|
||||
|
||||
for (NotebookSocket conn: userConnectedSockets.get(user)) {
|
||||
try {
|
||||
conn.send(serializeMessage(m));
|
||||
} catch (IOException e) {
|
||||
LOG.error("socket error", e);
|
||||
}
|
||||
unicast(m, conn);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -445,6 +460,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
} catch (IOException e) {
|
||||
LOG.error("socket error", e);
|
||||
}
|
||||
broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
|
||||
}
|
||||
|
||||
public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException {
|
||||
|
|
@ -543,12 +559,11 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
|
||||
public void broadcastNote(Note note) {
|
||||
broadcast(note.getId(), new Message(OP.NOTE).put("note", note));
|
||||
broadcastToWatchers(note.getId(), "", new Message(OP.NOTE).put("note", note));
|
||||
}
|
||||
|
||||
public void broadcastInterpreterBindings(String noteId,
|
||||
List settingList) {
|
||||
broadcast(noteId, new Message(OP.INTERPRETER_BINDINGS)
|
||||
.put("interpreterBindings", settingList));
|
||||
public void broadcastInterpreterBindings(String noteId, List settingList) {
|
||||
broadcast(noteId, new Message(OP.INTERPRETER_BINDINGS).put("interpreterBindings", settingList));
|
||||
}
|
||||
|
||||
public void broadcastNoteList(AuthenticationInfo subject, HashSet userAndRoles) {
|
||||
|
|
@ -1770,6 +1785,41 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
.get(settingId);
|
||||
interpreterSetting.setInfos(metaInfos);
|
||||
}
|
||||
|
||||
|
||||
private void switchConnectionToWatcher(NotebookSocket conn, Message messagereceived)
|
||||
throws IOException {
|
||||
// TODO(anthony): add header check. for security.
|
||||
LOG.info("Going to add {} to watcher socket", conn);
|
||||
// add the connection to the watcher.
|
||||
if (watcherSockets.contains(conn)) {
|
||||
LOG.info("connection alrerady present in the watcher");
|
||||
return;
|
||||
}
|
||||
watcherSockets.add(conn);
|
||||
|
||||
// remove this connection from regular zeppelin ws usage.
|
||||
removeConnectionFromAllNote(conn);
|
||||
connectedSockets.remove(conn);
|
||||
removeUserConnection(conn.getUser(), conn);
|
||||
}
|
||||
|
||||
private void broadcastToWatchers(String noteId, String subject, Message message) {
|
||||
synchronized (watcherSockets) {
|
||||
if (watcherSockets.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (NotebookSocket watcher : watcherSockets) {
|
||||
try {
|
||||
watcher.send(WatcherMessage
|
||||
.builder(noteId)
|
||||
.subject(subject)
|
||||
.message(serializeMessage(message))
|
||||
.build()
|
||||
.serialize());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot broadcast message to watcher", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue