When user run a notebook, check if the websocket session exist, open it if close

This commit is contained in:
Anthony Corbacho 2016-05-12 18:45:57 +09:00 committed by Khalid Huseynov
parent 36176eaf12
commit 32497dc988

View file

@ -135,20 +135,42 @@ public class ZeppelinClient {
private boolean isSessionOpen(Session session) {
return (session != null) && (session.isOpen());
}
/* per notebook based ws connection, returns null if can't connect */
public Session getZeppelinConnection(String noteId) {
if (StringUtils.isBlank(noteId)) {
LOG.warn("Cannot return websocket connection for blank noteId");
return null;
}
// return existing connection
if (zeppelinConnectionMap.containsKey(noteId)) {
LOG.info("Connection for {} exists in map", noteId);
return zeppelinConnectionMap.get(noteId);
return getNoteSession(noteId);
}
//TODO(khalid): clean log later
LOG.info("Creating Zeppelin websocket connection {} {}", zeppelinWebsocketUrl, noteId);
return openNoteSession(noteId);
}
// create connection
private Message zeppelinGetNoteMsg(String noteId) {
Message getNoteMsg = new Message(Message.OP.GET_NOTE);
HashMap<String, Object> data = new HashMap<String, Object>();
data.put("id", noteId);
getNoteMsg.data = data;
return getNoteMsg;
}
private Session getNoteSession(String noteId) {
Session session = zeppelinConnectionMap.get(noteId);
if (session == null || !session.isOpen()) {
LOG.info("Not connection to {}", noteId);
zeppelinConnectionMap.remove(noteId);
session = openNoteSession(noteId);
}
return session;
}
private Session openNoteSession(String noteId) {
ClientUpgradeRequest request = new ClientUpgradeRequest();
ZeppelinWebsocket socket = new ZeppelinWebsocket(noteId);
Future<Session> future = null;
@ -158,7 +180,7 @@ public class ZeppelinClient {
session = future.get();
} catch (IOException | InterruptedException | ExecutionException e) {
LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
return null;
return session;
}
if (zeppelinConnectionMap.containsKey(noteId)) {
@ -166,23 +188,13 @@ public class ZeppelinClient {
session = zeppelinConnectionMap.get(noteId);
} else {
String getNote = serialize(zeppelinGetNoteMsg(noteId));
//TODO(khalid): may need to check return whether successful
// TODO(khalid): may need to check return whether successful
session.getRemote().sendStringByFuture(getNote);
zeppelinConnectionMap.put(noteId, session);
}
//TODO(khalid): clean log later
LOG.info("Create Zeppelin websocket connection {} {}", zeppelinWebsocketUrl, noteId);
return session;
}
private Message zeppelinGetNoteMsg(String noteId) {
Message getNoteMsg = new Message(Message.OP.GET_NOTE);
HashMap<String, Object> data = new HashMap<String, Object>();
data.put("id", noteId);
getNoteMsg.data = data;
return getNoteMsg;
}
public void handleMsgFromZeppelin(String message, String noteId) {
Map<String, String> meta = new HashMap<String, String>();
meta.put("token", zeppelinhubToken);