mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
add zeppelin websocket client
establish connection between zeppelin and zeppelinhub clients
This commit is contained in:
parent
1ce01ef638
commit
9f1b8bf298
11 changed files with 485 additions and 35 deletions
|
|
@ -139,6 +139,12 @@
|
|||
<version>9.2.15.v20160210</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.websocket</groupId>
|
||||
<artifactId>websocket-client</artifactId>
|
||||
<version>9.2.15.v20160210</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.quartz-scheduler</groupId>
|
||||
<artifactId>quartz</artifactId>
|
||||
|
|
|
|||
|
|
@ -42,8 +42,8 @@ public class ZeppelinHubRepo implements NotebookRepo {
|
|||
token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
|
||||
zeppelinhubHandler = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
|
||||
|
||||
//TODO(khalid): add zeppelin uri?
|
||||
websocketClient = new Client(StringUtils.EMPTY, getZeppelinhubWebsocketUri(conf), token);
|
||||
websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
|
||||
getZeppelinhubWebsocketUri(conf), token);
|
||||
websocketClient.start();
|
||||
}
|
||||
|
||||
|
|
@ -84,7 +84,16 @@ public class ZeppelinHubRepo implements NotebookRepo {
|
|||
}
|
||||
return zeppelinHubUri;
|
||||
}
|
||||
|
||||
|
||||
private String getZeppelinWebsocketUri(ZeppelinConfiguration conf) {
|
||||
int port = conf.getServerPort();
|
||||
if (port <= 0) {
|
||||
port = 80;
|
||||
}
|
||||
String ws = conf.useSsl() ? "wss" : "ws";
|
||||
return ws + "://localhost:" + port + "/ws";
|
||||
}
|
||||
|
||||
public void setZeppelinhubRestApiHandler(ZeppelinhubRestApiHandler zeppelinhub) {
|
||||
zeppelinhubHandler = zeppelinhub;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
|
||||
|
||||
import org.apache.zeppelin.notebook.socket.Message;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -8,24 +9,51 @@ import org.slf4j.LoggerFactory;
|
|||
*
|
||||
*/
|
||||
public class Client {
|
||||
private Logger LOG = LoggerFactory.getLogger(Client.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
|
||||
private final ZeppelinhubClient zeppelinhubClient;
|
||||
|
||||
public Client(String zeppelinUri, String zeppelinhub, String token) {
|
||||
LOG.debug("Init Client");
|
||||
zeppelinhubClient = ZeppelinhubClient.newInstance(zeppelinhub, token);
|
||||
private final ZeppelinClient zeppelinClient;
|
||||
private static Client instance = null;
|
||||
|
||||
public static Client initialize(String zeppelinUri, String zeppelinhubUri, String token) {
|
||||
if (instance == null) {
|
||||
instance = new Client(zeppelinUri, zeppelinhubUri, token);
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
|
||||
public static Client getInstance() {
|
||||
return instance;
|
||||
}
|
||||
|
||||
private Client(String zeppelinUri, String zeppelinhubUri, String token) {
|
||||
LOG.debug("Init Client");
|
||||
zeppelinhubClient = ZeppelinhubClient.initialize(zeppelinhubUri, token);
|
||||
zeppelinClient = ZeppelinClient.initialize(zeppelinUri, token);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
if (zeppelinhubClient != null) {
|
||||
zeppelinhubClient.start();
|
||||
if (zeppelinClient != null) {
|
||||
zeppelinClient.start();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void stop() {
|
||||
if (zeppelinhubClient != null) {
|
||||
zeppelinhubClient.stop();
|
||||
}
|
||||
if (zeppelinClient != null) {
|
||||
zeppelinClient.stop();
|
||||
}
|
||||
}
|
||||
|
||||
public void relayToHub(String message) {
|
||||
zeppelinhubClient.send(message);
|
||||
}
|
||||
|
||||
public void relayToZeppelin(Message message, String noteId) {
|
||||
zeppelinClient.send(message, noteId);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,201 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinWebsocket;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
|
||||
import org.apache.zeppelin.notebook.socket.Message;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.JsonSyntaxException;
|
||||
|
||||
/**
|
||||
* Zeppelin websocket client.
|
||||
*
|
||||
*/
|
||||
public class ZeppelinClient {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinClient.class);
|
||||
private final URI zeppelinWebsocketUrl;
|
||||
private final String zeppelinhubToken;
|
||||
private final WebSocketClient wsClient;
|
||||
private static Gson gson;
|
||||
private ConcurrentHashMap<String, Session> zeppelinConnectionMap;
|
||||
private static ZeppelinClient instance = null;
|
||||
|
||||
public static ZeppelinClient initialize(String zeppelinUrl, String token) {
|
||||
if (instance == null) {
|
||||
instance = new ZeppelinClient(zeppelinUrl, token);
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
public static ZeppelinClient getInstance() {
|
||||
return instance;
|
||||
}
|
||||
|
||||
private ZeppelinClient(String zeppelinUrl, String token) {
|
||||
zeppelinWebsocketUrl = URI.create(zeppelinUrl);
|
||||
zeppelinhubToken = token;
|
||||
wsClient = createNewWebsocketClient();
|
||||
gson = new Gson();
|
||||
zeppelinConnectionMap = new ConcurrentHashMap<>();
|
||||
LOG.info("Initialized Zeppelin websocket client on {}", zeppelinWebsocketUrl);
|
||||
}
|
||||
|
||||
private WebSocketClient createNewWebsocketClient() {
|
||||
SslContextFactory sslContextFactory = new SslContextFactory();
|
||||
WebSocketClient client = new WebSocketClient(sslContextFactory);
|
||||
//TODO(khalid): other client settings
|
||||
return client;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
try {
|
||||
if (wsClient != null) {
|
||||
wsClient.start();
|
||||
} else {
|
||||
LOG.warn("Cannot start zeppelin websocket client - isn't initialized");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot start Zeppelin websocket client", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
try {
|
||||
if (wsClient != null) {
|
||||
wsClient.stop();
|
||||
} else {
|
||||
LOG.warn("Cannot stop zeppelin websocket client - isn't initialized");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot stop Zeppelin websocket client", e);
|
||||
}
|
||||
}
|
||||
|
||||
public String serialize(Message zeppelinMsg) {
|
||||
// TODO(khalid): handle authentication
|
||||
String msg = gson.toJson(zeppelinMsg);
|
||||
return msg;
|
||||
}
|
||||
|
||||
public Message deserialize(String zeppelinMessage) {
|
||||
if (StringUtils.isBlank(zeppelinMessage)) {
|
||||
return null;
|
||||
}
|
||||
Message msg;
|
||||
try {
|
||||
msg = gson.fromJson(zeppelinMessage, Message.class);
|
||||
} catch (JsonSyntaxException ex) {
|
||||
LOG.error("Cannot deserialize zeppelin message", ex);
|
||||
msg = null;
|
||||
}
|
||||
return msg;
|
||||
}
|
||||
|
||||
public void send(Message msg, String noteId) {
|
||||
Session noteSession = getZeppelinConnection(noteId);
|
||||
if (!isSessionOpen(noteSession)) {
|
||||
LOG.error("Cannot open websocket connection to Zeppelin note {}", noteId);
|
||||
return;
|
||||
}
|
||||
noteSession.getRemote().sendStringByFuture(serialize(msg));
|
||||
}
|
||||
|
||||
private boolean isSessionOpen(Session session) {
|
||||
return (session != null) && (session.isOpen());
|
||||
}
|
||||
|
||||
/* per notebook based ws connection, returns null if can't connect */
|
||||
public Session getZeppelinConnection(String noteId) {
|
||||
if (StringUtils.isBlank(noteId)) {
|
||||
LOG.warn("Cannot return websocket connection for blank noteId");
|
||||
return null;
|
||||
}
|
||||
// return existing connection
|
||||
if (zeppelinConnectionMap.containsKey(noteId)) {
|
||||
LOG.info("Connection for {} exists in map", noteId);
|
||||
return zeppelinConnectionMap.get(noteId);
|
||||
}
|
||||
|
||||
// create connection
|
||||
ClientUpgradeRequest request = new ClientUpgradeRequest();
|
||||
ZeppelinWebsocket socket = new ZeppelinWebsocket(noteId);
|
||||
Future<Session> future = null;
|
||||
Session session = null;
|
||||
try {
|
||||
future = wsClient.connect(socket, zeppelinWebsocketUrl, request);
|
||||
session = future.get();
|
||||
} catch (IOException | InterruptedException | ExecutionException e) {
|
||||
LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
|
||||
return null;
|
||||
}
|
||||
|
||||
if (zeppelinConnectionMap.containsKey(noteId)) {
|
||||
session.close();
|
||||
session = zeppelinConnectionMap.get(noteId);
|
||||
} else {
|
||||
String getNote = serialize(zeppelinGetNoteMsg(noteId));
|
||||
//TODO(khalid): may need to check return whether successful
|
||||
session.getRemote().sendStringByFuture(getNote);
|
||||
zeppelinConnectionMap.put(noteId, session);
|
||||
}
|
||||
//TODO(khalid): clean log later
|
||||
LOG.info("Create Zeppelin websocket connection {} {}", zeppelinWebsocketUrl, noteId);
|
||||
return session;
|
||||
}
|
||||
|
||||
private Message zeppelinGetNoteMsg(String noteId) {
|
||||
Message getNoteMsg = new Message(Message.OP.GET_NOTE);
|
||||
HashMap<String, Object> data = new HashMap<String, Object>();
|
||||
data.put("id", noteId);
|
||||
getNoteMsg.data = data;
|
||||
return getNoteMsg;
|
||||
}
|
||||
|
||||
public void handleMsgFromZeppelin(String message, String noteId) {
|
||||
Map<String, String> meta = new HashMap<String, String>();
|
||||
meta.put("token", zeppelinhubToken);
|
||||
meta.put("noteId", noteId);
|
||||
Message zeppelinMsg = deserialize(message);
|
||||
if (zeppelinMsg == null) {
|
||||
return;
|
||||
}
|
||||
ZeppelinhubMessage hubMsg = ZeppelinhubMessage.newMessage(zeppelinMsg, meta);
|
||||
Client client = Client.getInstance();
|
||||
if (client == null) {
|
||||
LOG.warn("Client isn't initialized yet");
|
||||
return;
|
||||
}
|
||||
client.relayToHub(hubMsg.serialize());
|
||||
}
|
||||
|
||||
/**
|
||||
* Close and remove ZeppelinConnection
|
||||
*/
|
||||
public void removeZeppelinConnection(String noteId) {
|
||||
if (zeppelinConnectionMap.containsKey(noteId)) {
|
||||
Session conn = zeppelinConnectionMap.get(noteId);
|
||||
if (conn.isOpen()) {
|
||||
conn.close();
|
||||
}
|
||||
zeppelinConnectionMap.remove(noteId);
|
||||
}
|
||||
// TODO(khalid): clean log later
|
||||
LOG.info("Removed Zeppelin ws connection for the following note {}", noteId);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -4,22 +4,35 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
|
|||
import java.io.IOException;
|
||||
import java.net.HttpCookie;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinhubWebsocket;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinHubOp;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHubHeartbeat;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.session.ZeppelinhubSession;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
|
||||
import org.apache.zeppelin.notebook.socket.Message;
|
||||
import org.apache.zeppelin.notebook.socket.Message.OP;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.amazonaws.util.json.JSONArray;
|
||||
import com.amazonaws.util.json.JSONException;
|
||||
import com.amazonaws.util.json.JSONObject;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
|
||||
/**
|
||||
* Manage a zeppelinhub websocket connection.
|
||||
|
|
@ -35,14 +48,33 @@ public class ZeppelinhubClient {
|
|||
private static final int MB = 1048576;
|
||||
private static final int MAXIMUN_TEXT_SIZE = 64 * MB;
|
||||
private static final long CONNECTION_IDLE_TIME = TimeUnit.SECONDS.toMillis(30);
|
||||
private static ZeppelinhubClient instance = null;
|
||||
private static Gson gson;
|
||||
|
||||
private SchedulerService schedulerService;
|
||||
private ZeppelinhubSession zeppelinhubSession;
|
||||
|
||||
public static ZeppelinhubClient newInstance(String url, String token) {
|
||||
return new ZeppelinhubClient(url, token);
|
||||
|
||||
public static ZeppelinhubClient initialize(String zeppelinhubUrl, String token) {
|
||||
if (instance == null) {
|
||||
instance = new ZeppelinhubClient(zeppelinhubUrl, token);
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
|
||||
public static ZeppelinhubClient getInstance() {
|
||||
return instance;
|
||||
}
|
||||
|
||||
private ZeppelinhubClient(String url, String token) {
|
||||
zeppelinhubWebsocketUrl = URI.create(url);
|
||||
client = createNewWebsocketClient();
|
||||
conectionRequest = setConnectionrequest(token);
|
||||
zeppelinhubToken = token;
|
||||
schedulerService = SchedulerService.create(10);
|
||||
gson = new Gson();
|
||||
LOG.info("Initialized ZeppelinHub websocket client on {}", zeppelinhubWebsocketUrl);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
try {
|
||||
client.start();
|
||||
|
|
@ -79,15 +111,7 @@ public class ZeppelinhubClient {
|
|||
private boolean isConnectedToZeppelinhub() {
|
||||
return (zeppelinhubSession == null || !zeppelinhubSession.isSessionOpen()) ? false : true;
|
||||
}
|
||||
|
||||
private ZeppelinhubClient(String url, String token) {
|
||||
zeppelinhubWebsocketUrl = URI.create(url);
|
||||
client = createNewWebsocketClient();
|
||||
conectionRequest = setConnectionrequest(token);
|
||||
zeppelinhubToken = token;
|
||||
schedulerService = SchedulerService.create(10);
|
||||
}
|
||||
|
||||
|
||||
private ZeppelinhubSession connect() {
|
||||
ZeppelinhubSession zeppelinSession;
|
||||
try {
|
||||
|
|
@ -109,7 +133,8 @@ public class ZeppelinhubClient {
|
|||
}
|
||||
|
||||
private WebSocketClient createNewWebsocketClient() {
|
||||
WebSocketClient client = new WebSocketClient();
|
||||
SslContextFactory sslContextFactory = new SslContextFactory();
|
||||
WebSocketClient client = new WebSocketClient(sslContextFactory);
|
||||
client.setMaxTextMessageBufferSize(MAXIMUN_TEXT_SIZE);
|
||||
client.setMaxIdleTimeout(CONNECTION_IDLE_TIME);
|
||||
return client;
|
||||
|
|
@ -118,5 +143,90 @@ public class ZeppelinhubClient {
|
|||
private void addRoutines() {
|
||||
schedulerService.add(ZeppelinHubHeartbeat.newInstance(this), 10, 23);
|
||||
}
|
||||
|
||||
|
||||
public void handleMsgFromZeppelinHub(String message) {
|
||||
ZeppelinhubMessage hubMsg = ZeppelinhubMessage.deserialize(message);
|
||||
if (hubMsg.equals(ZeppelinhubMessage.EMPTY)) {
|
||||
LOG.error("Cannot handle ZeppelinHub message is empty");
|
||||
return;
|
||||
}
|
||||
String op = StringUtils.EMPTY;
|
||||
if (hubMsg.op instanceof String) {
|
||||
op = (String) hubMsg.op;
|
||||
} else {
|
||||
LOG.error("Message OP from ZeppelinHub isn't string {}", hubMsg.op);
|
||||
return;
|
||||
}
|
||||
if (ZeppelinhubUtils.isHubOp(op)) {
|
||||
handleHubOpMsg(ZeppelinhubUtils.stringToHubOp(op), hubMsg, message);
|
||||
} else if (ZeppelinhubUtils.isZeppelinOp(op)) {
|
||||
forwardToZeppelin(ZeppelinhubUtils.stringToZeppelinOp(op), hubMsg);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleHubOpMsg(ZeppelinHubOp op, ZeppelinhubMessage hubMsg, String msg) {
|
||||
if (op == null || msg.equals(ZeppelinhubMessage.EMPTY)) {
|
||||
LOG.error("Cannot handle empty op or msg");
|
||||
return;
|
||||
}
|
||||
switch (op) {
|
||||
case RUN_NOTEBOOK:
|
||||
runAllParagraph(hubMsg.meta.get("noteId"), msg);
|
||||
break;
|
||||
case PONG:
|
||||
// do nothing
|
||||
break;
|
||||
default:
|
||||
LOG.warn("Received {} from ZeppelinHub, not handled", op);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void forwardToZeppelin(Message.OP op, ZeppelinhubMessage hubMsg) {
|
||||
Message zeppelinMsg = new Message(op);
|
||||
if (!(hubMsg.data instanceof Map)) {
|
||||
LOG.error("Data field of message from ZeppelinHub isn't in correct Map format");
|
||||
return;
|
||||
}
|
||||
zeppelinMsg.data = (Map<String, Object>) hubMsg.data;
|
||||
Client client = Client.getInstance();
|
||||
if (client == null) {
|
||||
LOG.warn("Base client isn't initialized, returning");
|
||||
return;
|
||||
}
|
||||
client.relayToZeppelin(zeppelinMsg, hubMsg.meta.get("noteId"));
|
||||
}
|
||||
|
||||
private void runAllParagraph(String noteId, String hubMsg) {
|
||||
LOG.info("Running paragraph with noteId {}", noteId);
|
||||
try {
|
||||
JSONObject data = new JSONObject(hubMsg);
|
||||
if (data.equals(JSONObject.NULL) || !(data.get("data") instanceof JSONArray)) {
|
||||
LOG.error("Wrong \"data\" format for RUN_NOTEBOOK");
|
||||
return;
|
||||
}
|
||||
Client client = Client.getInstance();
|
||||
if (client == null) {
|
||||
LOG.warn("Base client isn't initialized, returning");
|
||||
return;
|
||||
}
|
||||
Message zeppelinMsg = new Message(OP.RUN_PARAGRAPH);
|
||||
|
||||
JSONArray paragraphs = data.getJSONArray("data");
|
||||
for (int i = 0; i < paragraphs.length(); i++) {
|
||||
if (!(paragraphs.get(i) instanceof JSONObject)) {
|
||||
LOG.warn("Wrong \"paragraph\" format for RUN_NOTEBOOK");
|
||||
continue;
|
||||
}
|
||||
zeppelinMsg.data = gson.fromJson(paragraphs.getString(i),
|
||||
new TypeToken<Map<String, Object>>(){}.getType());
|
||||
client.relayToZeppelin(zeppelinMsg, noteId);
|
||||
LOG.info("\nSending RUN_PARAGRAPH message to Zeppelin ");
|
||||
}
|
||||
} catch (JSONException e) {
|
||||
LOG.error("Failed to parse RUN_NOTEBOOK message from ZeppelinHub ", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,58 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
|
||||
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinClient;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Zeppelin websocket listener class.
|
||||
*
|
||||
*/
|
||||
public class ZeppelinWebsocket implements WebSocketListener {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinWebsocket.class);
|
||||
public Session connection;
|
||||
public String noteId;
|
||||
|
||||
public ZeppelinWebsocket(String noteId) {
|
||||
this.noteId = noteId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketClose(int code, String message) {
|
||||
LOG.info("Zeppelin connection closed with code: {}, message: {}", code, message);
|
||||
// parentClient.removeConnMap(noteId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(Session session) {
|
||||
LOG.info("Zeppelin connection opened");
|
||||
this.connection = session;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketError(Throwable e) {
|
||||
LOG.warn("Zeppelin socket connection error ", e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketText(String data) {
|
||||
LOG.debug("Zeppelin client received Message: " + data);
|
||||
// propagate to ZeppelinHub
|
||||
try {
|
||||
ZeppelinClient zeppelinClient = ZeppelinClient.getInstance();
|
||||
if (zeppelinClient != null) {
|
||||
zeppelinClient.handleMsgFromZeppelin(data, noteId);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to send message to ZeppelinHub: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
|
||||
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketListener;
|
||||
|
|
@ -15,7 +16,6 @@ public class ZeppelinhubWebsocket implements WebSocketListener {
|
|||
private final String token;
|
||||
|
||||
private ZeppelinhubWebsocket(String token) {
|
||||
super();
|
||||
this.token = token;
|
||||
}
|
||||
|
||||
|
|
@ -48,8 +48,10 @@ public class ZeppelinhubWebsocket implements WebSocketListener {
|
|||
@Override
|
||||
public void onWebSocketText(String message) {
|
||||
LOG.info("Got msg {}", message);
|
||||
if (isSessionOpen()) {
|
||||
// do something.
|
||||
// handle message from ZeppelinHub.
|
||||
ZeppelinhubClient client = ZeppelinhubClient.getInstance();
|
||||
if (client != null) {
|
||||
client.handleMsgFromZeppelinHub(message);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol;
|
|||
* Zeppelinhub Op.
|
||||
*/
|
||||
public enum ZeppelinHubOp {
|
||||
ALIVE,
|
||||
LIVE,
|
||||
DEAD,
|
||||
PING,
|
||||
PONG,
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import java.util.Map;
|
|||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
|
||||
import org.apache.zeppelin.notebook.socket.Message;
|
||||
import org.apache.zeppelin.notebook.socket.Message.OP;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -39,11 +40,18 @@ public class ZeppelinhubMessage {
|
|||
public static ZeppelinhubMessage newMessage(Object op, Object data, Map<String, String> meta) {
|
||||
return new ZeppelinhubMessage(op, data, meta);
|
||||
}
|
||||
|
||||
|
||||
public static ZeppelinhubMessage newMessage(Message zeppelinMsg, Map<String, String> meta) {
|
||||
if (zeppelinMsg == null) {
|
||||
return EMPTY;
|
||||
}
|
||||
return new ZeppelinhubMessage(zeppelinMsg.op, zeppelinMsg.data, meta);
|
||||
}
|
||||
|
||||
public String serialize() {
|
||||
return gson.toJson(this, ZeppelinhubMessage.class);
|
||||
}
|
||||
|
||||
|
||||
public static ZeppelinhubMessage deserialize(String zeppelinhubMessage) {
|
||||
if (StringUtils.isBlank(zeppelinhubMessage)) {
|
||||
return EMPTY;
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
|
||||
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinhubRestApiHandler;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -11,7 +10,7 @@ import org.slf4j.LoggerFactory;
|
|||
*
|
||||
*/
|
||||
public class ZeppelinHubHeartbeat implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinHubHeartbeat.class);
|
||||
private ZeppelinhubClient client;
|
||||
|
||||
public static ZeppelinHubHeartbeat newInstance(ZeppelinhubClient client) {
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import java.util.HashMap;
|
|||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinHubOp;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
|
||||
import org.apache.zeppelin.notebook.socket.Message;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -23,7 +24,7 @@ public class ZeppelinhubUtils {
|
|||
HashMap<String, Object> data = new HashMap<String, Object>();
|
||||
data.put("token", token);
|
||||
return ZeppelinhubMessage
|
||||
.newMessage(ZeppelinHubOp.ALIVE, data, new HashMap<String, String>())
|
||||
.newMessage(ZeppelinHubOp.LIVE, data, new HashMap<String, String>())
|
||||
.serialize();
|
||||
}
|
||||
|
||||
|
|
@ -50,4 +51,32 @@ public class ZeppelinhubUtils {
|
|||
.newMessage(ZeppelinHubOp.PING, data, new HashMap<String, String>())
|
||||
.serialize();
|
||||
}
|
||||
|
||||
public static ZeppelinHubOp stringToHubOp(String text) {
|
||||
ZeppelinHubOp hubOp = null;
|
||||
try {
|
||||
hubOp = ZeppelinHubOp.valueOf(text);
|
||||
} catch (IllegalArgumentException e) {
|
||||
// in case of non Hub op
|
||||
}
|
||||
return hubOp;
|
||||
}
|
||||
|
||||
public static boolean isHubOp(String text) {
|
||||
return (stringToHubOp(text) != null);
|
||||
}
|
||||
|
||||
public static Message.OP stringToZeppelinOp(String text) {
|
||||
Message.OP zeppelinOp = null;
|
||||
try {
|
||||
zeppelinOp = Message.OP.valueOf(text);
|
||||
} catch (IllegalArgumentException e) {
|
||||
// in case of non Hub op
|
||||
}
|
||||
return zeppelinOp;
|
||||
}
|
||||
|
||||
public static boolean isZeppelinOp(String text) {
|
||||
return (stringToZeppelinOp(text) != null);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue