mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
When user run a notebook, check if the websocket session exist, open it if close
This commit is contained in:
parent
36176eaf12
commit
32497dc988
1 changed files with 28 additions and 16 deletions
|
|
@ -135,20 +135,42 @@ public class ZeppelinClient {
|
|||
private boolean isSessionOpen(Session session) {
|
||||
return (session != null) && (session.isOpen());
|
||||
}
|
||||
|
||||
|
||||
/* per notebook based ws connection, returns null if can't connect */
|
||||
public Session getZeppelinConnection(String noteId) {
|
||||
if (StringUtils.isBlank(noteId)) {
|
||||
LOG.warn("Cannot return websocket connection for blank noteId");
|
||||
return null;
|
||||
}
|
||||
// return existing connection
|
||||
|
||||
if (zeppelinConnectionMap.containsKey(noteId)) {
|
||||
LOG.info("Connection for {} exists in map", noteId);
|
||||
return zeppelinConnectionMap.get(noteId);
|
||||
return getNoteSession(noteId);
|
||||
}
|
||||
//TODO(khalid): clean log later
|
||||
LOG.info("Creating Zeppelin websocket connection {} {}", zeppelinWebsocketUrl, noteId);
|
||||
return openNoteSession(noteId);
|
||||
}
|
||||
|
||||
// create connection
|
||||
private Message zeppelinGetNoteMsg(String noteId) {
|
||||
Message getNoteMsg = new Message(Message.OP.GET_NOTE);
|
||||
HashMap<String, Object> data = new HashMap<String, Object>();
|
||||
data.put("id", noteId);
|
||||
getNoteMsg.data = data;
|
||||
return getNoteMsg;
|
||||
}
|
||||
|
||||
private Session getNoteSession(String noteId) {
|
||||
Session session = zeppelinConnectionMap.get(noteId);
|
||||
if (session == null || !session.isOpen()) {
|
||||
LOG.info("Not connection to {}", noteId);
|
||||
zeppelinConnectionMap.remove(noteId);
|
||||
session = openNoteSession(noteId);
|
||||
}
|
||||
return session;
|
||||
}
|
||||
|
||||
private Session openNoteSession(String noteId) {
|
||||
ClientUpgradeRequest request = new ClientUpgradeRequest();
|
||||
ZeppelinWebsocket socket = new ZeppelinWebsocket(noteId);
|
||||
Future<Session> future = null;
|
||||
|
|
@ -158,7 +180,7 @@ public class ZeppelinClient {
|
|||
session = future.get();
|
||||
} catch (IOException | InterruptedException | ExecutionException e) {
|
||||
LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
|
||||
return null;
|
||||
return session;
|
||||
}
|
||||
|
||||
if (zeppelinConnectionMap.containsKey(noteId)) {
|
||||
|
|
@ -166,23 +188,13 @@ public class ZeppelinClient {
|
|||
session = zeppelinConnectionMap.get(noteId);
|
||||
} else {
|
||||
String getNote = serialize(zeppelinGetNoteMsg(noteId));
|
||||
//TODO(khalid): may need to check return whether successful
|
||||
// TODO(khalid): may need to check return whether successful
|
||||
session.getRemote().sendStringByFuture(getNote);
|
||||
zeppelinConnectionMap.put(noteId, session);
|
||||
}
|
||||
//TODO(khalid): clean log later
|
||||
LOG.info("Create Zeppelin websocket connection {} {}", zeppelinWebsocketUrl, noteId);
|
||||
return session;
|
||||
}
|
||||
|
||||
private Message zeppelinGetNoteMsg(String noteId) {
|
||||
Message getNoteMsg = new Message(Message.OP.GET_NOTE);
|
||||
HashMap<String, Object> data = new HashMap<String, Object>();
|
||||
data.put("id", noteId);
|
||||
getNoteMsg.data = data;
|
||||
return getNoteMsg;
|
||||
}
|
||||
|
||||
public void handleMsgFromZeppelin(String message, String noteId) {
|
||||
Map<String, String> meta = new HashMap<String, String>();
|
||||
meta.put("token", zeppelinhubToken);
|
||||
|
|
|
|||
Loading…
Reference in a new issue