mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
multicast fine grained note lists to users instead of broadcast
This commit is contained in:
parent
6614e2bb52
commit
9427e6260a
2 changed files with 63 additions and 6 deletions
|
|
@ -59,6 +59,7 @@ import java.io.IOException;
|
|||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
/**
|
||||
|
|
@ -85,6 +86,8 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create();
|
||||
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
|
||||
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
|
||||
final Map<String, Queue<NotebookSocket>> userConnectedSockets =
|
||||
new ConcurrentHashMap<String, Queue<NotebookSocket>>();
|
||||
|
||||
private Notebook notebook() {
|
||||
return ZeppelinServer.notebook;
|
||||
|
|
@ -160,6 +163,9 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
userAndRoles.addAll(roles);
|
||||
}
|
||||
}
|
||||
if (StringUtils.isEmpty(conn.getUser())) {
|
||||
addUserConnection(messagereceived.principal, conn);
|
||||
}
|
||||
AuthenticationInfo subject = new AuthenticationInfo(messagereceived.principal);
|
||||
|
||||
/** Lets be elegant here */
|
||||
|
|
@ -168,8 +174,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
unicastNoteList(conn, subject);
|
||||
break;
|
||||
case RELOAD_NOTES_FROM_REPO:
|
||||
//broadcastReloadedNoteList(subject);
|
||||
unicastNoteList(conn, subject);
|
||||
broadcastReloadedNoteList(subject);
|
||||
break;
|
||||
case GET_HOME_NOTE:
|
||||
sendHomeNote(conn, userAndRoles, notebook, messagereceived);
|
||||
|
|
@ -265,6 +270,26 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
.getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason);
|
||||
removeConnectionFromAllNote(conn);
|
||||
connectedSockets.remove(conn);
|
||||
removeUserConnection(conn.getUser(), conn);
|
||||
}
|
||||
|
||||
private void removeUserConnection(String user, NotebookSocket conn) {
|
||||
if (userConnectedSockets.containsKey(user)) {
|
||||
userConnectedSockets.get(user).remove(conn);
|
||||
} else {
|
||||
LOG.warn("Closing connection that is absent in user connections");
|
||||
}
|
||||
}
|
||||
|
||||
private void addUserConnection(String user, NotebookSocket conn) {
|
||||
conn.setUser(user);
|
||||
if (userConnectedSockets.containsKey(user)) {
|
||||
userConnectedSockets.get(user).add(conn);
|
||||
} else {
|
||||
Queue<NotebookSocket> socketQueue = new ConcurrentLinkedQueue<>();
|
||||
socketQueue.add(conn);
|
||||
userConnectedSockets.put(user, socketQueue);
|
||||
}
|
||||
}
|
||||
|
||||
protected Message deserializeMessage(String msg) {
|
||||
|
|
@ -380,8 +405,12 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
}
|
||||
}
|
||||
|
||||
private void broadcastAll(Message m) {
|
||||
for (NotebookSocket conn : connectedSockets) {
|
||||
private void multicastToUser(String user, Message m) {
|
||||
if (!userConnectedSockets.containsKey(user)) {
|
||||
LOG.warn("Broadcasting to user that is not in connections map");
|
||||
return;
|
||||
}
|
||||
for (NotebookSocket conn: userConnectedSockets.get(user)) {
|
||||
try {
|
||||
conn.send(serializeMessage(m));
|
||||
} catch (IOException e) {
|
||||
|
|
@ -502,8 +531,17 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
}
|
||||
|
||||
public void broadcastNoteList(AuthenticationInfo subject) {
|
||||
//send first to requesting user
|
||||
List<Map<String, String>> notesInfo = generateNotebooksInfo(false, subject);
|
||||
broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
|
||||
multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo));
|
||||
//to others afterwards
|
||||
for (String user: userConnectedSockets.keySet()) {
|
||||
if (subject.getUser() == user) {
|
||||
continue;
|
||||
}
|
||||
notesInfo = generateNotebooksInfo(false, new AuthenticationInfo(user));
|
||||
multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo));
|
||||
}
|
||||
}
|
||||
|
||||
public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject) {
|
||||
|
|
@ -512,8 +550,17 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
}
|
||||
|
||||
public void broadcastReloadedNoteList(AuthenticationInfo subject) {
|
||||
//send first to requesting user
|
||||
List<Map<String, String>> notesInfo = generateNotebooksInfo(true, subject);
|
||||
broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
|
||||
multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo));
|
||||
//to others afterwards
|
||||
for (String user: userConnectedSockets.keySet()) {
|
||||
if (subject.getUser() == user) {
|
||||
continue;
|
||||
}
|
||||
notesInfo = generateNotebooksInfo(true, new AuthenticationInfo(user));
|
||||
multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo));
|
||||
}
|
||||
}
|
||||
|
||||
void permissionError(NotebookSocket conn, String op,
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import java.io.IOException;
|
|||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
|
||||
|
||||
|
|
@ -32,12 +33,14 @@ public class NotebookSocket extends WebSocketAdapter {
|
|||
private NotebookSocketListener listener;
|
||||
private HttpServletRequest request;
|
||||
private String protocol;
|
||||
private String user;
|
||||
|
||||
public NotebookSocket(HttpServletRequest req, String protocol,
|
||||
NotebookSocketListener listener) {
|
||||
this.listener = listener;
|
||||
this.request = req;
|
||||
this.protocol = protocol;
|
||||
this.user = StringUtils.EMPTY;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -69,4 +72,11 @@ public class NotebookSocket extends WebSocketAdapter {
|
|||
connection.getRemote().sendString(serializeMessage);
|
||||
}
|
||||
|
||||
public String getUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
public void setUser(String user) {
|
||||
this.user = user;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue