mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Light refactoring :: add missing header, add comment, refacto some methods
This commit is contained in:
parent
8f7e1b3134
commit
092791e4ab
3 changed files with 50 additions and 13 deletions
|
|
@ -106,9 +106,12 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
|
||||
final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
|
||||
|
||||
// This is a special endpoint in the notebook websoket, Every connection in this Queue
|
||||
// will be able to watch every websocket event, it doesnt need to be listed into the map of
|
||||
// noteSocketMap.
|
||||
/**
|
||||
* This is a special endpoint in the notebook websoket, Every connection in this Queue
|
||||
* will be able to watch every websocket event, it doesnt need to be listed into the map of
|
||||
* noteSocketMap. This can be used to get information about websocket traffic and watch what
|
||||
* is going on.
|
||||
*/
|
||||
final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue();
|
||||
|
||||
private Notebook notebook() {
|
||||
|
|
@ -1789,19 +1792,16 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
|
||||
private void switchConnectionToWatcher(NotebookSocket conn, Message messagereceived)
|
||||
throws IOException {
|
||||
// TODO(anthony): add header check. for security.
|
||||
if (!isSessionAllowedToSwitchToWatcher(conn)) {
|
||||
LOG.error("Cannot switch this client to watcher, invalid security key");
|
||||
return;
|
||||
}
|
||||
LOG.info("Going to add {} to watcher socket", conn);
|
||||
// add the connection to the watcher.
|
||||
if (watcherSockets.contains(conn)) {
|
||||
LOG.info("connection alrerady present in the watcher");
|
||||
return;
|
||||
}
|
||||
String watcherSecurityKey = conn.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER);
|
||||
if (StringUtils.isBlank(watcherSecurityKey) ||
|
||||
!watcherSecurityKey.equals(WatcherSecurityKey.getKey())) {
|
||||
LOG.error("Cannot switch this client to watcher, invalid security key");
|
||||
return;
|
||||
}
|
||||
watcherSockets.add(conn);
|
||||
|
||||
// remove this connection from regular zeppelin ws usage.
|
||||
|
|
@ -1810,6 +1810,12 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
removeUserConnection(conn.getUser(), conn);
|
||||
}
|
||||
|
||||
private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) {
|
||||
String watcherSecurityKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER);
|
||||
return !(StringUtils.isBlank(watcherSecurityKey)
|
||||
|| !watcherSecurityKey.equals(WatcherSecurityKey.getKey()));
|
||||
}
|
||||
|
||||
private void broadcastToWatchers(String noteId, String subject, Message message) {
|
||||
synchronized (watcherSockets) {
|
||||
if (watcherSockets.isEmpty()) {
|
||||
|
|
|
|||
|
|
@ -127,6 +127,7 @@ public class ZeppelinClient {
|
|||
public void stop() {
|
||||
try {
|
||||
if (wsClient != null) {
|
||||
removeAllConnections();
|
||||
wsClient.stop();
|
||||
} else {
|
||||
LOG.warn("Cannot stop zeppelin websocket client - isn't initialized");
|
||||
|
|
@ -288,10 +289,25 @@ public class ZeppelinClient {
|
|||
}
|
||||
LOG.info("Removed note websocket connection for note {}", noteId);
|
||||
}
|
||||
|
||||
private void removeAllConnections() {
|
||||
if (watcherSession != null && watcherSession.isOpen()) {
|
||||
watcherSession.close();
|
||||
}
|
||||
|
||||
Session noteSession = null;
|
||||
for (Map.Entry<String, Session> note: notesConnection.entrySet()) {
|
||||
noteSession = note.getValue();
|
||||
if(isSessionOpen(noteSession)) {
|
||||
noteSession.close();
|
||||
}
|
||||
}
|
||||
notesConnection.clear();
|
||||
}
|
||||
|
||||
public void ping() {
|
||||
if (watcherSession == null) {
|
||||
LOG.info("Cannot send PING event, watcher is null");
|
||||
LOG.info("Cannot send PING event, no watcher found");
|
||||
return;
|
||||
}
|
||||
watcherSession.getRemote().sendStringByFuture(serialize(new Message(OP.PING)));
|
||||
|
|
|
|||
|
|
@ -1,3 +1,19 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
|
@ -13,7 +29,7 @@ import org.slf4j.LoggerFactory;
|
|||
import com.google.gson.Gson;
|
||||
|
||||
/**
|
||||
*
|
||||
* Zeppelin Watcher that will forward user note to ZeppelinHub.
|
||||
*
|
||||
*/
|
||||
public class WatcherWebsocket implements WebSocketListener {
|
||||
|
|
@ -48,7 +64,6 @@ public class WatcherWebsocket implements WebSocketListener {
|
|||
|
||||
@Override
|
||||
public void onWebSocketText(String message) {
|
||||
LOG.debug("WatcherWebsocket client received Message: " + message);
|
||||
WatcherMessage watcherMsg = GSON.fromJson(message, WatcherMessage.class);
|
||||
if (StringUtils.isBlank(watcherMsg.noteId)) {
|
||||
return;
|
||||
|
|
|
|||
Loading…
Reference in a new issue