Added new WS queue called watcher, watcher will be abler to listen to almost every note action performed in zeppelin notebook websocket server

This commit is contained in:
Anthony Corbacho 2016-10-27 14:21:09 +09:00
parent 45849cec62
commit 0d7f493657

View file

@ -48,10 +48,17 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.notebook.JobListenerFactory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookAuthorization;
import org.apache.zeppelin.notebook.NotebookEventListener;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.ParagraphJobListener;
import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.apache.zeppelin.notebook.socket.WatcherMessage;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
@ -67,6 +74,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
import com.google.common.collect.Queues;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
@ -97,6 +105,11 @@ public class NotebookServer extends WebSocketServlet implements
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
// This is a special endpoint in the notebook websoket, Every connection in this Queue
// will be able to watch every websocket event, it doesnt need to be listed into the map of
// noteSocketMap.
final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue();
private Notebook notebook() {
return ZeppelinServer.notebook;
}
@ -275,6 +288,9 @@ public class NotebookServer extends WebSocketServlet implements
case GET_INTERPRETER_SETTINGS:
getInterpreterSettings(conn, subject);
break;
case WATCHER:
switchConnectionToWatcher(conn, messagereceived);
break;
default:
break;
}
@ -382,6 +398,7 @@ public class NotebookServer extends WebSocketServlet implements
for (String id : ids) {
if (id.equals(interpreterGroupId)) {
broadcast(note.getId(), m);
broadcastToWatchers(note.getId(), StringUtils.EMPTY, m);
}
}
}
@ -394,6 +411,7 @@ public class NotebookServer extends WebSocketServlet implements
return;
}
LOG.debug("SEND >> " + m.op);
broadcastToWatchers(noteId, StringUtils.EMPTY, m);
for (NotebookSocket conn : socketLists) {
try {
conn.send(serializeMessage(m));
@ -411,6 +429,7 @@ public class NotebookServer extends WebSocketServlet implements
return;
}
LOG.debug("SEND >> " + m.op);
broadcastToWatchers(noteId, StringUtils.EMPTY, m);
for (NotebookSocket conn : socketLists) {
if (exclude.equals(conn)) {
continue;
@ -431,11 +450,7 @@ public class NotebookServer extends WebSocketServlet implements
}
for (NotebookSocket conn: userConnectedSockets.get(user)) {
try {
conn.send(serializeMessage(m));
} catch (IOException e) {
LOG.error("socket error", e);
}
unicast(m, conn);
}
}
@ -445,6 +460,7 @@ public class NotebookServer extends WebSocketServlet implements
} catch (IOException e) {
LOG.error("socket error", e);
}
broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
}
public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException {
@ -543,12 +559,11 @@ public class NotebookServer extends WebSocketServlet implements
public void broadcastNote(Note note) {
broadcast(note.getId(), new Message(OP.NOTE).put("note", note));
broadcastToWatchers(note.getId(), "", new Message(OP.NOTE).put("note", note));
}
public void broadcastInterpreterBindings(String noteId,
List settingList) {
broadcast(noteId, new Message(OP.INTERPRETER_BINDINGS)
.put("interpreterBindings", settingList));
public void broadcastInterpreterBindings(String noteId, List settingList) {
broadcast(noteId, new Message(OP.INTERPRETER_BINDINGS).put("interpreterBindings", settingList));
}
public void broadcastNoteList(AuthenticationInfo subject, HashSet userAndRoles) {
@ -1770,6 +1785,41 @@ public class NotebookServer extends WebSocketServlet implements
.get(settingId);
interpreterSetting.setInfos(metaInfos);
}
private void switchConnectionToWatcher(NotebookSocket conn, Message messagereceived)
throws IOException {
// TODO(anthony): add header check. for security.
LOG.info("Going to add {} to watcher socket", conn);
// add the connection to the watcher.
if (watcherSockets.contains(conn)) {
LOG.info("connection alrerady present in the watcher");
return;
}
watcherSockets.add(conn);
// remove this connection from regular zeppelin ws usage.
removeConnectionFromAllNote(conn);
connectedSockets.remove(conn);
removeUserConnection(conn.getUser(), conn);
}
private void broadcastToWatchers(String noteId, String subject, Message message) {
synchronized (watcherSockets) {
if (watcherSockets.isEmpty()) {
return;
}
for (NotebookSocket watcher : watcherSockets) {
try {
watcher.send(WatcherMessage
.builder(noteId)
.subject(subject)
.message(serializeMessage(message))
.build()
.serialize());
} catch (IOException e) {
LOG.error("Cannot broadcast message to watcher", e);
}
}
}
}
}