Strting to rework ZeppelinClient

This commit is contained in:
Anthony Corbacho 2016-10-27 17:32:59 +09:00
parent e5b3a1d5a6
commit aa55a5a003

View file

@ -20,18 +20,16 @@ import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinWebsocket;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.WatcherWebsocket;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHeartbeat;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHubHeartbeat;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -54,7 +52,8 @@ public class ZeppelinClient {
private final String zeppelinhubToken;
private final WebSocketClient wsClient;
private static Gson gson;
private ConcurrentHashMap<String, Session> zeppelinConnectionMap;
//private ConcurrentHashMap<String, Session> zeppelinConnectionMap;
private static Session watcherSession;
private static ZeppelinClient instance = null;
private SchedulerService schedulerService;
private Authentication authModule;
@ -77,12 +76,13 @@ public class ZeppelinClient {
zeppelinhubToken = token;
wsClient = createNewWebsocketClient();
gson = new Gson();
zeppelinConnectionMap = new ConcurrentHashMap<>();
//zeppelinConnectionMap = new ConcurrentHashMap<>();
schedulerService = SchedulerService.getInstance();
authModule = Authentication.initialize(token, conf);
if (authModule != null) {
SchedulerService.getInstance().addOnce(authModule, 10);
}
//
LOG.info("Initialized Zeppelin websocket client on {}", zeppelinWebsocketUrl);
}
@ -111,12 +111,18 @@ public class ZeppelinClient {
private void addRoutines() {
schedulerService.add(ZeppelinHeartbeat.newInstance(this), 15, 4 * min);
new java.util.Timer().schedule(new java.util.TimerTask() {
@Override
public void run() {
openWatcherSession();
}
}, 5000);
}
public void stop() {
try {
if (wsClient != null) {
removeAllZeppelinConnections();
//removeAllZeppelinConnections();
wsClient.stop();
} else {
LOG.warn("Cannot stop zeppelin websocket client - isn't initialized");
@ -153,21 +159,36 @@ public class ZeppelinClient {
}
return msg;
}
private Session openWatcherSession() {
ClientUpgradeRequest request = new ClientUpgradeRequest();
WatcherWebsocket socket = WatcherWebsocket.createInstace();
Future<Session> future = null;
Session session = null;
try {
future = wsClient.connect(socket, zeppelinWebsocketUrl, request);
session = future.get();
} catch (IOException | InterruptedException | ExecutionException e) {
LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
return session;
}
return session;
}
public void send(Message msg, String noteId) {
Session noteSession = getZeppelinConnection(noteId);
if (!isSessionOpen(noteSession)) {
LOG.error("Cannot open websocket connection to Zeppelin note {}", noteId);
return;
}
noteSession.getRemote().sendStringByFuture(serialize(msg));
//Session noteSession = getZeppelinConnection(noteId);
//if (!isSessionOpen(noteSession)) {
// LOG.error("Cannot open websocket connection to Zeppelin note {}", noteId);
// return;
// }
// noteSession.getRemote().sendStringByFuture(serialize(msg));
}
private boolean isSessionOpen(Session session) {
return (session != null) && (session.isOpen());
}
/* per notebook based ws connection, returns null if can't connect */
/* per notebook based ws connection, returns null if can't connect *
public Session getZeppelinConnection(String noteId) {
if (StringUtils.isBlank(noteId)) {
LOG.warn("Cannot return websocket connection for blank noteId");
@ -182,7 +203,7 @@ public class ZeppelinClient {
LOG.info("Creating Zeppelin websocket connection {} {}", zeppelinWebsocketUrl, noteId);
return openNoteSession(noteId);
}
*/
private Message zeppelinGetNoteMsg(String noteId) {
Message getNoteMsg = new Message(Message.OP.GET_NOTE);
HashMap<String, Object> data = new HashMap<>();
@ -190,7 +211,7 @@ public class ZeppelinClient {
getNoteMsg.data = data;
return getNoteMsg;
}
/*
private Session getNoteSession(String noteId) {
Session session = zeppelinConnectionMap.get(noteId);
if (session == null || !session.isOpen()) {
@ -225,7 +246,7 @@ public class ZeppelinClient {
}
return session;
}
*/
public void handleMsgFromZeppelin(String message, String noteId) {
Map<String, String> meta = new HashMap<>();
meta.put("token", zeppelinhubToken);
@ -245,7 +266,7 @@ public class ZeppelinClient {
/**
* Close and remove ZeppelinConnection
*/
*
public void removeZeppelinConnection(String noteId) {
if (zeppelinConnectionMap.containsKey(noteId)) {
Session conn = zeppelinConnectionMap.get(noteId);
@ -260,7 +281,7 @@ public class ZeppelinClient {
/**
* Close and remove all ZeppelinConnection
*/
*
public void removeAllZeppelinConnections() {
for (Map.Entry<String, Session> entry: zeppelinConnectionMap.entrySet()) {
if (isSessionOpen(entry.getValue())) {
@ -285,4 +306,11 @@ public class ZeppelinClient {
public int countConnectedNotes() {
return zeppelinConnectionMap.size();
}
*/
public void ping() {
if (watcherSession == null) {
return;
}
watcherSession.getRemote().sendStringByFuture(serialize(new Message(OP.PING)));
}
}