Light refactoring :: add missing header, add comment, refacto some methods

This commit is contained in:
Anthony Corbacho 2016-11-03 17:17:12 +09:00
parent 8f7e1b3134
commit 092791e4ab
3 changed files with 50 additions and 13 deletions

View file

@ -106,9 +106,12 @@ public class NotebookServer extends WebSocketServlet implements
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
// This is a special endpoint in the notebook websoket, Every connection in this Queue
// will be able to watch every websocket event, it doesnt need to be listed into the map of
// noteSocketMap.
/**
* This is a special endpoint in the notebook websoket, Every connection in this Queue
* will be able to watch every websocket event, it doesnt need to be listed into the map of
* noteSocketMap. This can be used to get information about websocket traffic and watch what
* is going on.
*/
final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue();
private Notebook notebook() {
@ -1789,19 +1792,16 @@ public class NotebookServer extends WebSocketServlet implements
private void switchConnectionToWatcher(NotebookSocket conn, Message messagereceived)
throws IOException {
// TODO(anthony): add header check. for security.
if (!isSessionAllowedToSwitchToWatcher(conn)) {
LOG.error("Cannot switch this client to watcher, invalid security key");
return;
}
LOG.info("Going to add {} to watcher socket", conn);
// add the connection to the watcher.
if (watcherSockets.contains(conn)) {
LOG.info("connection alrerady present in the watcher");
return;
}
String watcherSecurityKey = conn.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER);
if (StringUtils.isBlank(watcherSecurityKey) ||
!watcherSecurityKey.equals(WatcherSecurityKey.getKey())) {
LOG.error("Cannot switch this client to watcher, invalid security key");
return;
}
watcherSockets.add(conn);
// remove this connection from regular zeppelin ws usage.
@ -1810,6 +1810,12 @@ public class NotebookServer extends WebSocketServlet implements
removeUserConnection(conn.getUser(), conn);
}
private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) {
String watcherSecurityKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER);
return !(StringUtils.isBlank(watcherSecurityKey)
|| !watcherSecurityKey.equals(WatcherSecurityKey.getKey()));
}
private void broadcastToWatchers(String noteId, String subject, Message message) {
synchronized (watcherSockets) {
if (watcherSockets.isEmpty()) {

View file

@ -127,6 +127,7 @@ public class ZeppelinClient {
public void stop() {
try {
if (wsClient != null) {
removeAllConnections();
wsClient.stop();
} else {
LOG.warn("Cannot stop zeppelin websocket client - isn't initialized");
@ -288,10 +289,25 @@ public class ZeppelinClient {
}
LOG.info("Removed note websocket connection for note {}", noteId);
}
private void removeAllConnections() {
if (watcherSession != null && watcherSession.isOpen()) {
watcherSession.close();
}
Session noteSession = null;
for (Map.Entry<String, Session> note: notesConnection.entrySet()) {
noteSession = note.getValue();
if(isSessionOpen(noteSession)) {
noteSession.close();
}
}
notesConnection.clear();
}
public void ping() {
if (watcherSession == null) {
LOG.info("Cannot send PING event, watcher is null");
LOG.info("Cannot send PING event, no watcher found");
return;
}
watcherSession.getRemote().sendStringByFuture(serialize(new Message(OP.PING)));

View file

@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
import org.apache.commons.lang.StringUtils;
@ -13,7 +29,7 @@ import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
/**
*
* Zeppelin Watcher that will forward user note to ZeppelinHub.
*
*/
public class WatcherWebsocket implements WebSocketListener {
@ -48,7 +64,6 @@ public class WatcherWebsocket implements WebSocketListener {
@Override
public void onWebSocketText(String message) {
LOG.debug("WatcherWebsocket client received Message: " + message);
WatcherMessage watcherMsg = GSON.fromJson(message, WatcherMessage.class);
if (StringUtils.isBlank(watcherMsg.noteId)) {
return;