add zeppelin websocket client

establish connection between zeppelin and zeppelinhub clients
This commit is contained in:
Khalid Huseynov 2016-04-27 18:25:14 +09:00
parent 1ce01ef638
commit 9f1b8bf298
11 changed files with 485 additions and 35 deletions

View file

@ -139,6 +139,12 @@
<version>9.2.15.v20160210</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
<version>9.2.15.v20160210</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>

View file

@ -42,8 +42,8 @@ public class ZeppelinHubRepo implements NotebookRepo {
token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
zeppelinhubHandler = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
//TODO(khalid): add zeppelin uri?
websocketClient = new Client(StringUtils.EMPTY, getZeppelinhubWebsocketUri(conf), token);
websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
getZeppelinhubWebsocketUri(conf), token);
websocketClient.start();
}
@ -84,7 +84,16 @@ public class ZeppelinHubRepo implements NotebookRepo {
}
return zeppelinHubUri;
}
private String getZeppelinWebsocketUri(ZeppelinConfiguration conf) {
int port = conf.getServerPort();
if (port <= 0) {
port = 80;
}
String ws = conf.useSsl() ? "wss" : "ws";
return ws + "://localhost:" + port + "/ws";
}
public void setZeppelinhubRestApiHandler(ZeppelinhubRestApiHandler zeppelinhub) {
zeppelinhubHandler = zeppelinhub;
}

View file

@ -1,5 +1,6 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
import org.apache.zeppelin.notebook.socket.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -8,24 +9,51 @@ import org.slf4j.LoggerFactory;
*
*/
public class Client {
private Logger LOG = LoggerFactory.getLogger(Client.class);
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
private final ZeppelinhubClient zeppelinhubClient;
public Client(String zeppelinUri, String zeppelinhub, String token) {
LOG.debug("Init Client");
zeppelinhubClient = ZeppelinhubClient.newInstance(zeppelinhub, token);
private final ZeppelinClient zeppelinClient;
private static Client instance = null;
public static Client initialize(String zeppelinUri, String zeppelinhubUri, String token) {
if (instance == null) {
instance = new Client(zeppelinUri, zeppelinhubUri, token);
}
return instance;
}
public static Client getInstance() {
return instance;
}
private Client(String zeppelinUri, String zeppelinhubUri, String token) {
LOG.debug("Init Client");
zeppelinhubClient = ZeppelinhubClient.initialize(zeppelinhubUri, token);
zeppelinClient = ZeppelinClient.initialize(zeppelinUri, token);
}
public void start() {
if (zeppelinhubClient != null) {
zeppelinhubClient.start();
if (zeppelinClient != null) {
zeppelinClient.start();
}
}
}
public void stop() {
if (zeppelinhubClient != null) {
zeppelinhubClient.stop();
}
if (zeppelinClient != null) {
zeppelinClient.stop();
}
}
public void relayToHub(String message) {
zeppelinhubClient.send(message);
}
public void relayToZeppelin(Message message, String noteId) {
zeppelinClient.send(message, noteId);
}
}

View file

@ -0,0 +1,201 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinWebsocket;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
import org.apache.zeppelin.notebook.socket.Message;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
/**
* Zeppelin websocket client.
*
*/
public class ZeppelinClient {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinClient.class);
private final URI zeppelinWebsocketUrl;
private final String zeppelinhubToken;
private final WebSocketClient wsClient;
private static Gson gson;
private ConcurrentHashMap<String, Session> zeppelinConnectionMap;
private static ZeppelinClient instance = null;
public static ZeppelinClient initialize(String zeppelinUrl, String token) {
if (instance == null) {
instance = new ZeppelinClient(zeppelinUrl, token);
}
return instance;
}
public static ZeppelinClient getInstance() {
return instance;
}
private ZeppelinClient(String zeppelinUrl, String token) {
zeppelinWebsocketUrl = URI.create(zeppelinUrl);
zeppelinhubToken = token;
wsClient = createNewWebsocketClient();
gson = new Gson();
zeppelinConnectionMap = new ConcurrentHashMap<>();
LOG.info("Initialized Zeppelin websocket client on {}", zeppelinWebsocketUrl);
}
private WebSocketClient createNewWebsocketClient() {
SslContextFactory sslContextFactory = new SslContextFactory();
WebSocketClient client = new WebSocketClient(sslContextFactory);
//TODO(khalid): other client settings
return client;
}
public void start() {
try {
if (wsClient != null) {
wsClient.start();
} else {
LOG.warn("Cannot start zeppelin websocket client - isn't initialized");
}
} catch (Exception e) {
LOG.error("Cannot start Zeppelin websocket client", e);
}
}
public void stop() {
try {
if (wsClient != null) {
wsClient.stop();
} else {
LOG.warn("Cannot stop zeppelin websocket client - isn't initialized");
}
} catch (Exception e) {
LOG.error("Cannot stop Zeppelin websocket client", e);
}
}
public String serialize(Message zeppelinMsg) {
// TODO(khalid): handle authentication
String msg = gson.toJson(zeppelinMsg);
return msg;
}
public Message deserialize(String zeppelinMessage) {
if (StringUtils.isBlank(zeppelinMessage)) {
return null;
}
Message msg;
try {
msg = gson.fromJson(zeppelinMessage, Message.class);
} catch (JsonSyntaxException ex) {
LOG.error("Cannot deserialize zeppelin message", ex);
msg = null;
}
return msg;
}
public void send(Message msg, String noteId) {
Session noteSession = getZeppelinConnection(noteId);
if (!isSessionOpen(noteSession)) {
LOG.error("Cannot open websocket connection to Zeppelin note {}", noteId);
return;
}
noteSession.getRemote().sendStringByFuture(serialize(msg));
}
private boolean isSessionOpen(Session session) {
return (session != null) && (session.isOpen());
}
/* per notebook based ws connection, returns null if can't connect */
public Session getZeppelinConnection(String noteId) {
if (StringUtils.isBlank(noteId)) {
LOG.warn("Cannot return websocket connection for blank noteId");
return null;
}
// return existing connection
if (zeppelinConnectionMap.containsKey(noteId)) {
LOG.info("Connection for {} exists in map", noteId);
return zeppelinConnectionMap.get(noteId);
}
// create connection
ClientUpgradeRequest request = new ClientUpgradeRequest();
ZeppelinWebsocket socket = new ZeppelinWebsocket(noteId);
Future<Session> future = null;
Session session = null;
try {
future = wsClient.connect(socket, zeppelinWebsocketUrl, request);
session = future.get();
} catch (IOException | InterruptedException | ExecutionException e) {
LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
return null;
}
if (zeppelinConnectionMap.containsKey(noteId)) {
session.close();
session = zeppelinConnectionMap.get(noteId);
} else {
String getNote = serialize(zeppelinGetNoteMsg(noteId));
//TODO(khalid): may need to check return whether successful
session.getRemote().sendStringByFuture(getNote);
zeppelinConnectionMap.put(noteId, session);
}
//TODO(khalid): clean log later
LOG.info("Create Zeppelin websocket connection {} {}", zeppelinWebsocketUrl, noteId);
return session;
}
private Message zeppelinGetNoteMsg(String noteId) {
Message getNoteMsg = new Message(Message.OP.GET_NOTE);
HashMap<String, Object> data = new HashMap<String, Object>();
data.put("id", noteId);
getNoteMsg.data = data;
return getNoteMsg;
}
public void handleMsgFromZeppelin(String message, String noteId) {
Map<String, String> meta = new HashMap<String, String>();
meta.put("token", zeppelinhubToken);
meta.put("noteId", noteId);
Message zeppelinMsg = deserialize(message);
if (zeppelinMsg == null) {
return;
}
ZeppelinhubMessage hubMsg = ZeppelinhubMessage.newMessage(zeppelinMsg, meta);
Client client = Client.getInstance();
if (client == null) {
LOG.warn("Client isn't initialized yet");
return;
}
client.relayToHub(hubMsg.serialize());
}
/**
* Close and remove ZeppelinConnection
*/
public void removeZeppelinConnection(String noteId) {
if (zeppelinConnectionMap.containsKey(noteId)) {
Session conn = zeppelinConnectionMap.get(noteId);
if (conn.isOpen()) {
conn.close();
}
zeppelinConnectionMap.remove(noteId);
}
// TODO(khalid): clean log later
LOG.info("Removed Zeppelin ws connection for the following note {}", noteId);
}
}

View file

@ -4,22 +4,35 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
import java.io.IOException;
import java.net.HttpCookie;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinhubWebsocket;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinHubOp;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHubHeartbeat;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.session.ZeppelinhubSession;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.util.json.JSONArray;
import com.amazonaws.util.json.JSONException;
import com.amazonaws.util.json.JSONObject;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
/**
* Manage a zeppelinhub websocket connection.
@ -35,14 +48,33 @@ public class ZeppelinhubClient {
private static final int MB = 1048576;
private static final int MAXIMUN_TEXT_SIZE = 64 * MB;
private static final long CONNECTION_IDLE_TIME = TimeUnit.SECONDS.toMillis(30);
private static ZeppelinhubClient instance = null;
private static Gson gson;
private SchedulerService schedulerService;
private ZeppelinhubSession zeppelinhubSession;
public static ZeppelinhubClient newInstance(String url, String token) {
return new ZeppelinhubClient(url, token);
public static ZeppelinhubClient initialize(String zeppelinhubUrl, String token) {
if (instance == null) {
instance = new ZeppelinhubClient(zeppelinhubUrl, token);
}
return instance;
}
public static ZeppelinhubClient getInstance() {
return instance;
}
private ZeppelinhubClient(String url, String token) {
zeppelinhubWebsocketUrl = URI.create(url);
client = createNewWebsocketClient();
conectionRequest = setConnectionrequest(token);
zeppelinhubToken = token;
schedulerService = SchedulerService.create(10);
gson = new Gson();
LOG.info("Initialized ZeppelinHub websocket client on {}", zeppelinhubWebsocketUrl);
}
public void start() {
try {
client.start();
@ -79,15 +111,7 @@ public class ZeppelinhubClient {
private boolean isConnectedToZeppelinhub() {
return (zeppelinhubSession == null || !zeppelinhubSession.isSessionOpen()) ? false : true;
}
private ZeppelinhubClient(String url, String token) {
zeppelinhubWebsocketUrl = URI.create(url);
client = createNewWebsocketClient();
conectionRequest = setConnectionrequest(token);
zeppelinhubToken = token;
schedulerService = SchedulerService.create(10);
}
private ZeppelinhubSession connect() {
ZeppelinhubSession zeppelinSession;
try {
@ -109,7 +133,8 @@ public class ZeppelinhubClient {
}
private WebSocketClient createNewWebsocketClient() {
WebSocketClient client = new WebSocketClient();
SslContextFactory sslContextFactory = new SslContextFactory();
WebSocketClient client = new WebSocketClient(sslContextFactory);
client.setMaxTextMessageBufferSize(MAXIMUN_TEXT_SIZE);
client.setMaxIdleTimeout(CONNECTION_IDLE_TIME);
return client;
@ -118,5 +143,90 @@ public class ZeppelinhubClient {
private void addRoutines() {
schedulerService.add(ZeppelinHubHeartbeat.newInstance(this), 10, 23);
}
public void handleMsgFromZeppelinHub(String message) {
ZeppelinhubMessage hubMsg = ZeppelinhubMessage.deserialize(message);
if (hubMsg.equals(ZeppelinhubMessage.EMPTY)) {
LOG.error("Cannot handle ZeppelinHub message is empty");
return;
}
String op = StringUtils.EMPTY;
if (hubMsg.op instanceof String) {
op = (String) hubMsg.op;
} else {
LOG.error("Message OP from ZeppelinHub isn't string {}", hubMsg.op);
return;
}
if (ZeppelinhubUtils.isHubOp(op)) {
handleHubOpMsg(ZeppelinhubUtils.stringToHubOp(op), hubMsg, message);
} else if (ZeppelinhubUtils.isZeppelinOp(op)) {
forwardToZeppelin(ZeppelinhubUtils.stringToZeppelinOp(op), hubMsg);
}
}
private void handleHubOpMsg(ZeppelinHubOp op, ZeppelinhubMessage hubMsg, String msg) {
if (op == null || msg.equals(ZeppelinhubMessage.EMPTY)) {
LOG.error("Cannot handle empty op or msg");
return;
}
switch (op) {
case RUN_NOTEBOOK:
runAllParagraph(hubMsg.meta.get("noteId"), msg);
break;
case PONG:
// do nothing
break;
default:
LOG.warn("Received {} from ZeppelinHub, not handled", op);
break;
}
}
@SuppressWarnings("unchecked")
private void forwardToZeppelin(Message.OP op, ZeppelinhubMessage hubMsg) {
Message zeppelinMsg = new Message(op);
if (!(hubMsg.data instanceof Map)) {
LOG.error("Data field of message from ZeppelinHub isn't in correct Map format");
return;
}
zeppelinMsg.data = (Map<String, Object>) hubMsg.data;
Client client = Client.getInstance();
if (client == null) {
LOG.warn("Base client isn't initialized, returning");
return;
}
client.relayToZeppelin(zeppelinMsg, hubMsg.meta.get("noteId"));
}
private void runAllParagraph(String noteId, String hubMsg) {
LOG.info("Running paragraph with noteId {}", noteId);
try {
JSONObject data = new JSONObject(hubMsg);
if (data.equals(JSONObject.NULL) || !(data.get("data") instanceof JSONArray)) {
LOG.error("Wrong \"data\" format for RUN_NOTEBOOK");
return;
}
Client client = Client.getInstance();
if (client == null) {
LOG.warn("Base client isn't initialized, returning");
return;
}
Message zeppelinMsg = new Message(OP.RUN_PARAGRAPH);
JSONArray paragraphs = data.getJSONArray("data");
for (int i = 0; i < paragraphs.length(); i++) {
if (!(paragraphs.get(i) instanceof JSONObject)) {
LOG.warn("Wrong \"paragraph\" format for RUN_NOTEBOOK");
continue;
}
zeppelinMsg.data = gson.fromJson(paragraphs.getString(i),
new TypeToken<Map<String, Object>>(){}.getType());
client.relayToZeppelin(zeppelinMsg, noteId);
LOG.info("\nSending RUN_PARAGRAPH message to Zeppelin ");
}
} catch (JSONException e) {
LOG.error("Failed to parse RUN_NOTEBOOK message from ZeppelinHub ", e);
}
}
}

View file

@ -0,0 +1,58 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinClient;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Zeppelin websocket listener class.
*
*/
public class ZeppelinWebsocket implements WebSocketListener {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinWebsocket.class);
public Session connection;
public String noteId;
public ZeppelinWebsocket(String noteId) {
this.noteId = noteId;
}
@Override
public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) {
}
@Override
public void onWebSocketClose(int code, String message) {
LOG.info("Zeppelin connection closed with code: {}, message: {}", code, message);
// parentClient.removeConnMap(noteId);
}
@Override
public void onWebSocketConnect(Session session) {
LOG.info("Zeppelin connection opened");
this.connection = session;
}
@Override
public void onWebSocketError(Throwable e) {
LOG.warn("Zeppelin socket connection error ", e);
}
@Override
public void onWebSocketText(String data) {
LOG.debug("Zeppelin client received Message: " + data);
// propagate to ZeppelinHub
try {
ZeppelinClient zeppelinClient = ZeppelinClient.getInstance();
if (zeppelinClient != null) {
zeppelinClient.handleMsgFromZeppelin(data, noteId);
}
} catch (Exception e) {
LOG.error("Failed to send message to ZeppelinHub: ", e);
}
}
}

View file

@ -1,5 +1,6 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
@ -15,7 +16,6 @@ public class ZeppelinhubWebsocket implements WebSocketListener {
private final String token;
private ZeppelinhubWebsocket(String token) {
super();
this.token = token;
}
@ -48,8 +48,10 @@ public class ZeppelinhubWebsocket implements WebSocketListener {
@Override
public void onWebSocketText(String message) {
LOG.info("Got msg {}", message);
if (isSessionOpen()) {
// do something.
// handle message from ZeppelinHub.
ZeppelinhubClient client = ZeppelinhubClient.getInstance();
if (client != null) {
client.handleMsgFromZeppelinHub(message);
}
}

View file

@ -4,7 +4,7 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol;
* Zeppelinhub Op.
*/
public enum ZeppelinHubOp {
ALIVE,
LIVE,
DEAD,
PING,
PONG,

View file

@ -4,6 +4,7 @@ import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,11 +40,18 @@ public class ZeppelinhubMessage {
public static ZeppelinhubMessage newMessage(Object op, Object data, Map<String, String> meta) {
return new ZeppelinhubMessage(op, data, meta);
}
public static ZeppelinhubMessage newMessage(Message zeppelinMsg, Map<String, String> meta) {
if (zeppelinMsg == null) {
return EMPTY;
}
return new ZeppelinhubMessage(zeppelinMsg.op, zeppelinMsg.data, meta);
}
public String serialize() {
return gson.toJson(this, ZeppelinhubMessage.class);
}
public static ZeppelinhubMessage deserialize(String zeppelinhubMessage) {
if (StringUtils.isBlank(zeppelinhubMessage)) {
return EMPTY;

View file

@ -1,6 +1,5 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
import org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinhubRestApiHandler;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
import org.slf4j.Logger;
@ -11,7 +10,7 @@ import org.slf4j.LoggerFactory;
*
*/
public class ZeppelinHubHeartbeat implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinHubHeartbeat.class);
private ZeppelinhubClient client;
public static ZeppelinHubHeartbeat newInstance(ZeppelinhubClient client) {

View file

@ -5,6 +5,7 @@ import java.util.HashMap;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinHubOp;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
import org.apache.zeppelin.notebook.socket.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -23,7 +24,7 @@ public class ZeppelinhubUtils {
HashMap<String, Object> data = new HashMap<String, Object>();
data.put("token", token);
return ZeppelinhubMessage
.newMessage(ZeppelinHubOp.ALIVE, data, new HashMap<String, String>())
.newMessage(ZeppelinHubOp.LIVE, data, new HashMap<String, String>())
.serialize();
}
@ -50,4 +51,32 @@ public class ZeppelinhubUtils {
.newMessage(ZeppelinHubOp.PING, data, new HashMap<String, String>())
.serialize();
}
public static ZeppelinHubOp stringToHubOp(String text) {
ZeppelinHubOp hubOp = null;
try {
hubOp = ZeppelinHubOp.valueOf(text);
} catch (IllegalArgumentException e) {
// in case of non Hub op
}
return hubOp;
}
public static boolean isHubOp(String text) {
return (stringToHubOp(text) != null);
}
public static Message.OP stringToZeppelinOp(String text) {
Message.OP zeppelinOp = null;
try {
zeppelinOp = Message.OP.valueOf(text);
} catch (IllegalArgumentException e) {
// in case of non Hub op
}
return zeppelinOp;
}
public static boolean isZeppelinOp(String text) {
return (stringToZeppelinOp(text) != null);
}
}