mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Strting to rework ZeppelinClient
This commit is contained in:
parent
e5b3a1d5a6
commit
aa55a5a003
1 changed files with 46 additions and 18 deletions
|
|
@ -20,18 +20,16 @@ import java.io.IOException;
|
|||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinWebsocket;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.WatcherWebsocket;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHeartbeat;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHubHeartbeat;
|
||||
import org.apache.zeppelin.notebook.socket.Message;
|
||||
import org.apache.zeppelin.notebook.socket.Message.OP;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
|
|
@ -54,7 +52,8 @@ public class ZeppelinClient {
|
|||
private final String zeppelinhubToken;
|
||||
private final WebSocketClient wsClient;
|
||||
private static Gson gson;
|
||||
private ConcurrentHashMap<String, Session> zeppelinConnectionMap;
|
||||
//private ConcurrentHashMap<String, Session> zeppelinConnectionMap;
|
||||
private static Session watcherSession;
|
||||
private static ZeppelinClient instance = null;
|
||||
private SchedulerService schedulerService;
|
||||
private Authentication authModule;
|
||||
|
|
@ -77,12 +76,13 @@ public class ZeppelinClient {
|
|||
zeppelinhubToken = token;
|
||||
wsClient = createNewWebsocketClient();
|
||||
gson = new Gson();
|
||||
zeppelinConnectionMap = new ConcurrentHashMap<>();
|
||||
//zeppelinConnectionMap = new ConcurrentHashMap<>();
|
||||
schedulerService = SchedulerService.getInstance();
|
||||
authModule = Authentication.initialize(token, conf);
|
||||
if (authModule != null) {
|
||||
SchedulerService.getInstance().addOnce(authModule, 10);
|
||||
}
|
||||
//
|
||||
LOG.info("Initialized Zeppelin websocket client on {}", zeppelinWebsocketUrl);
|
||||
}
|
||||
|
||||
|
|
@ -111,12 +111,18 @@ public class ZeppelinClient {
|
|||
|
||||
private void addRoutines() {
|
||||
schedulerService.add(ZeppelinHeartbeat.newInstance(this), 15, 4 * min);
|
||||
new java.util.Timer().schedule(new java.util.TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
openWatcherSession();
|
||||
}
|
||||
}, 5000);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
try {
|
||||
if (wsClient != null) {
|
||||
removeAllZeppelinConnections();
|
||||
//removeAllZeppelinConnections();
|
||||
wsClient.stop();
|
||||
} else {
|
||||
LOG.warn("Cannot stop zeppelin websocket client - isn't initialized");
|
||||
|
|
@ -153,21 +159,36 @@ public class ZeppelinClient {
|
|||
}
|
||||
return msg;
|
||||
}
|
||||
|
||||
private Session openWatcherSession() {
|
||||
ClientUpgradeRequest request = new ClientUpgradeRequest();
|
||||
WatcherWebsocket socket = WatcherWebsocket.createInstace();
|
||||
Future<Session> future = null;
|
||||
Session session = null;
|
||||
try {
|
||||
future = wsClient.connect(socket, zeppelinWebsocketUrl, request);
|
||||
session = future.get();
|
||||
} catch (IOException | InterruptedException | ExecutionException e) {
|
||||
LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
|
||||
return session;
|
||||
}
|
||||
return session;
|
||||
}
|
||||
|
||||
public void send(Message msg, String noteId) {
|
||||
Session noteSession = getZeppelinConnection(noteId);
|
||||
if (!isSessionOpen(noteSession)) {
|
||||
LOG.error("Cannot open websocket connection to Zeppelin note {}", noteId);
|
||||
return;
|
||||
}
|
||||
noteSession.getRemote().sendStringByFuture(serialize(msg));
|
||||
//Session noteSession = getZeppelinConnection(noteId);
|
||||
//if (!isSessionOpen(noteSession)) {
|
||||
// LOG.error("Cannot open websocket connection to Zeppelin note {}", noteId);
|
||||
// return;
|
||||
// }
|
||||
// noteSession.getRemote().sendStringByFuture(serialize(msg));
|
||||
}
|
||||
|
||||
private boolean isSessionOpen(Session session) {
|
||||
return (session != null) && (session.isOpen());
|
||||
}
|
||||
|
||||
/* per notebook based ws connection, returns null if can't connect */
|
||||
/* per notebook based ws connection, returns null if can't connect *
|
||||
public Session getZeppelinConnection(String noteId) {
|
||||
if (StringUtils.isBlank(noteId)) {
|
||||
LOG.warn("Cannot return websocket connection for blank noteId");
|
||||
|
|
@ -182,7 +203,7 @@ public class ZeppelinClient {
|
|||
LOG.info("Creating Zeppelin websocket connection {} {}", zeppelinWebsocketUrl, noteId);
|
||||
return openNoteSession(noteId);
|
||||
}
|
||||
|
||||
*/
|
||||
private Message zeppelinGetNoteMsg(String noteId) {
|
||||
Message getNoteMsg = new Message(Message.OP.GET_NOTE);
|
||||
HashMap<String, Object> data = new HashMap<>();
|
||||
|
|
@ -190,7 +211,7 @@ public class ZeppelinClient {
|
|||
getNoteMsg.data = data;
|
||||
return getNoteMsg;
|
||||
}
|
||||
|
||||
/*
|
||||
private Session getNoteSession(String noteId) {
|
||||
Session session = zeppelinConnectionMap.get(noteId);
|
||||
if (session == null || !session.isOpen()) {
|
||||
|
|
@ -225,7 +246,7 @@ public class ZeppelinClient {
|
|||
}
|
||||
return session;
|
||||
}
|
||||
|
||||
*/
|
||||
public void handleMsgFromZeppelin(String message, String noteId) {
|
||||
Map<String, String> meta = new HashMap<>();
|
||||
meta.put("token", zeppelinhubToken);
|
||||
|
|
@ -245,7 +266,7 @@ public class ZeppelinClient {
|
|||
|
||||
/**
|
||||
* Close and remove ZeppelinConnection
|
||||
*/
|
||||
*
|
||||
public void removeZeppelinConnection(String noteId) {
|
||||
if (zeppelinConnectionMap.containsKey(noteId)) {
|
||||
Session conn = zeppelinConnectionMap.get(noteId);
|
||||
|
|
@ -260,7 +281,7 @@ public class ZeppelinClient {
|
|||
|
||||
/**
|
||||
* Close and remove all ZeppelinConnection
|
||||
*/
|
||||
*
|
||||
public void removeAllZeppelinConnections() {
|
||||
for (Map.Entry<String, Session> entry: zeppelinConnectionMap.entrySet()) {
|
||||
if (isSessionOpen(entry.getValue())) {
|
||||
|
|
@ -285,4 +306,11 @@ public class ZeppelinClient {
|
|||
public int countConnectedNotes() {
|
||||
return zeppelinConnectionMap.size();
|
||||
}
|
||||
*/
|
||||
public void ping() {
|
||||
if (watcherSession == null) {
|
||||
return;
|
||||
}
|
||||
watcherSession.getRemote().sendStringByFuture(serialize(new Message(OP.PING)));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue