multicast fine grained note lists to users instead of broadcast

This commit is contained in:
Khalid Huseynov 2016-09-20 19:04:25 +09:00
parent 6614e2bb52
commit 9427e6260a
2 changed files with 63 additions and 6 deletions

View file

@ -59,6 +59,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@ -85,6 +86,8 @@ public class NotebookServer extends WebSocketServlet implements
Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create();
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
final Map<String, Queue<NotebookSocket>> userConnectedSockets =
new ConcurrentHashMap<String, Queue<NotebookSocket>>();
private Notebook notebook() {
return ZeppelinServer.notebook;
@ -160,6 +163,9 @@ public class NotebookServer extends WebSocketServlet implements
userAndRoles.addAll(roles);
}
}
if (StringUtils.isEmpty(conn.getUser())) {
addUserConnection(messagereceived.principal, conn);
}
AuthenticationInfo subject = new AuthenticationInfo(messagereceived.principal);
/** Lets be elegant here */
@ -168,8 +174,7 @@ public class NotebookServer extends WebSocketServlet implements
unicastNoteList(conn, subject);
break;
case RELOAD_NOTES_FROM_REPO:
//broadcastReloadedNoteList(subject);
unicastNoteList(conn, subject);
broadcastReloadedNoteList(subject);
break;
case GET_HOME_NOTE:
sendHomeNote(conn, userAndRoles, notebook, messagereceived);
@ -265,6 +270,26 @@ public class NotebookServer extends WebSocketServlet implements
.getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason);
removeConnectionFromAllNote(conn);
connectedSockets.remove(conn);
removeUserConnection(conn.getUser(), conn);
}
private void removeUserConnection(String user, NotebookSocket conn) {
if (userConnectedSockets.containsKey(user)) {
userConnectedSockets.get(user).remove(conn);
} else {
LOG.warn("Closing connection that is absent in user connections");
}
}
private void addUserConnection(String user, NotebookSocket conn) {
conn.setUser(user);
if (userConnectedSockets.containsKey(user)) {
userConnectedSockets.get(user).add(conn);
} else {
Queue<NotebookSocket> socketQueue = new ConcurrentLinkedQueue<>();
socketQueue.add(conn);
userConnectedSockets.put(user, socketQueue);
}
}
protected Message deserializeMessage(String msg) {
@ -380,8 +405,12 @@ public class NotebookServer extends WebSocketServlet implements
}
}
private void broadcastAll(Message m) {
for (NotebookSocket conn : connectedSockets) {
private void multicastToUser(String user, Message m) {
if (!userConnectedSockets.containsKey(user)) {
LOG.warn("Broadcasting to user that is not in connections map");
return;
}
for (NotebookSocket conn: userConnectedSockets.get(user)) {
try {
conn.send(serializeMessage(m));
} catch (IOException e) {
@ -502,8 +531,17 @@ public class NotebookServer extends WebSocketServlet implements
}
public void broadcastNoteList(AuthenticationInfo subject) {
//send first to requesting user
List<Map<String, String>> notesInfo = generateNotebooksInfo(false, subject);
broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo));
//to others afterwards
for (String user: userConnectedSockets.keySet()) {
if (subject.getUser() == user) {
continue;
}
notesInfo = generateNotebooksInfo(false, new AuthenticationInfo(user));
multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo));
}
}
public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject) {
@ -512,8 +550,17 @@ public class NotebookServer extends WebSocketServlet implements
}
public void broadcastReloadedNoteList(AuthenticationInfo subject) {
//send first to requesting user
List<Map<String, String>> notesInfo = generateNotebooksInfo(true, subject);
broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo));
//to others afterwards
for (String user: userConnectedSockets.keySet()) {
if (subject.getUser() == user) {
continue;
}
notesInfo = generateNotebooksInfo(true, new AuthenticationInfo(user));
multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo));
}
}
void permissionError(NotebookSocket conn, String op,

View file

@ -20,6 +20,7 @@ import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@ -32,12 +33,14 @@ public class NotebookSocket extends WebSocketAdapter {
private NotebookSocketListener listener;
private HttpServletRequest request;
private String protocol;
private String user;
public NotebookSocket(HttpServletRequest req, String protocol,
NotebookSocketListener listener) {
this.listener = listener;
this.request = req;
this.protocol = protocol;
this.user = StringUtils.EMPTY;
}
@Override
@ -69,4 +72,11 @@ public class NotebookSocket extends WebSocketAdapter {
connection.getRemote().sendString(serializeMessage);
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
}