mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
add zeppelinhub websocket client
This commit is contained in:
parent
f8f168ac06
commit
45bec472b6
13 changed files with 528 additions and 6 deletions
|
|
@ -132,6 +132,12 @@
|
|||
<artifactId>jetty-client</artifactId>
|
||||
<version>9.2.15.v20160210</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.websocket</groupId>
|
||||
<artifactId>websocket-client</artifactId>
|
||||
<version>9.2.15.v20160210</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.quartz-scheduler</groupId>
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
|||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.NoteInfo;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepo;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -26,8 +27,10 @@ public class ZeppelinHubRepo implements NotebookRepo {
|
|||
private static final String DEFAULT_SERVER = "https://www.zeppelinhub.com";
|
||||
static final String ZEPPELIN_CONF_PROP_NAME_SERVER = "zeppelinhub.api.address";
|
||||
static final String ZEPPELIN_CONF_PROP_NAME_TOKEN = "zeppelinhub.api.token";
|
||||
public static final String TOKEN_HEADER = "X-Zeppelin-Token";
|
||||
private static final Gson GSON = new Gson();
|
||||
private static final Note EMPTY_NOTE = new Note();
|
||||
private final Client websocketClient;
|
||||
|
||||
private String token;
|
||||
private ZeppelinhubRestApiHandler zeppelinhubHandler;
|
||||
|
|
@ -37,8 +40,50 @@ public class ZeppelinHubRepo implements NotebookRepo {
|
|||
LOG.info("Initializing ZeppelinHub integration module version ?");
|
||||
token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
|
||||
zeppelinhubHandler = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
|
||||
|
||||
//TODO(khalid): add zeppelin uri?
|
||||
websocketClient = new Client(StringUtils.EMPTY, getZeppelinhubWebsocketUri(conf), token);
|
||||
websocketClient.start();
|
||||
}
|
||||
|
||||
String getZeppelinHubWsUri(URI api) throws IOException {
|
||||
URI apiRoot = api;
|
||||
String scheme = apiRoot.getScheme();
|
||||
int port = apiRoot.getPort();
|
||||
if (port <= 0) {
|
||||
port = (scheme != null && scheme.equals("https")) ? 443 : 80;
|
||||
}
|
||||
|
||||
if (scheme == null) {
|
||||
LOG.info("{} is not a valid zeppelinhub server address. proceed with default address {}",
|
||||
apiRoot, DEFAULT_SERVER);
|
||||
try {
|
||||
apiRoot = new URI(DEFAULT_SERVER);
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.error("Invalid default zeppelinhub url {}", DEFAULT_SERVER, e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
scheme = apiRoot.getScheme();
|
||||
port = apiRoot.getPort();
|
||||
if (port <= 0) {
|
||||
port = (scheme != null && scheme.equals("https")) ? 443 : 80;
|
||||
}
|
||||
}
|
||||
String ws = scheme.equals("https") ? "wss://" : "ws://";
|
||||
return ws + apiRoot.getHost() + ":" + port + "/async";
|
||||
}
|
||||
|
||||
private String getZeppelinhubWebsocketUri(ZeppelinConfiguration conf) {
|
||||
String zeppelinHubUri = StringUtils.EMPTY;
|
||||
try {
|
||||
zeppelinHubUri = getZeppelinHubWsUri(new URI(conf.getString("ZEPPELINHUB_API_ADDRESS",
|
||||
ZEPPELIN_CONF_PROP_NAME_SERVER, DEFAULT_SERVER)));
|
||||
} catch (URISyntaxException | IOException e) {
|
||||
LOG.error("Cannot get zeppelinhub URI", e);
|
||||
}
|
||||
return zeppelinHubUri;
|
||||
}
|
||||
|
||||
public void setZeppelinhubRestApiHandler(ZeppelinhubRestApiHandler zeppelinhub) {
|
||||
zeppelinhubHandler = zeppelinhub;
|
||||
}
|
||||
|
|
@ -118,7 +163,7 @@ public class ZeppelinHubRepo implements NotebookRepo {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
websocketClient.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -10,7 +10,22 @@ import org.slf4j.LoggerFactory;
|
|||
public class Client {
|
||||
private Logger LOG = LoggerFactory.getLogger(Client.class);
|
||||
|
||||
public Client() {
|
||||
private final ZeppelinhubClient zeppelinhubClient;
|
||||
|
||||
public Client(String zeppelinUri, String zeppelinhub, String token) {
|
||||
LOG.debug("Init Client");
|
||||
zeppelinhubClient = ZeppelinhubClient.newInstance(zeppelinhub, token);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
if (zeppelinhubClient != null) {
|
||||
zeppelinhubClient.start();
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
if (zeppelinhubClient != null) {
|
||||
zeppelinhubClient.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,122 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.HttpCookie;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinhubWebsocket;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHubHeartbeat;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.session.ZeppelinhubSession;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* Manage a zeppelinhub websocket connection.
|
||||
*/
|
||||
public class ZeppelinhubClient {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubClient.class);
|
||||
|
||||
private final WebSocketClient client;
|
||||
private final URI zeppelinhubWebsocketUrl;
|
||||
private final ClientUpgradeRequest conectionRequest;
|
||||
private final String zeppelinhubToken;
|
||||
|
||||
private static final int MB = 1048576;
|
||||
private static final int MAXIMUN_TEXT_SIZE = 64 * MB;
|
||||
private static final long CONNECTION_IDLE_TIME = TimeUnit.SECONDS.toMillis(30);
|
||||
|
||||
private SchedulerService schedulerService;
|
||||
private ZeppelinhubSession zeppelinhubSession;
|
||||
|
||||
public static ZeppelinhubClient newInstance(String url, String token) {
|
||||
return new ZeppelinhubClient(url, token);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
try {
|
||||
client.start();
|
||||
zeppelinhubSession = connect();
|
||||
addRoutines();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot connect to zeppelinhub via websocket", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
LOG.info("Stopping Zeppelinhub websocket client");
|
||||
try {
|
||||
zeppelinhubSession.close();
|
||||
schedulerService.close();
|
||||
client.stop();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot stop zeppelinhub websocket client", e);
|
||||
}
|
||||
}
|
||||
|
||||
public String getToken() {
|
||||
return this.zeppelinhubToken;
|
||||
}
|
||||
|
||||
public void send(String msg) {
|
||||
if (!isConnectedToZeppelinhub()) {
|
||||
LOG.info("Zeppelinhub connection is not open, opening it");
|
||||
zeppelinhubSession = connect();
|
||||
}
|
||||
zeppelinhubSession.sendByFuture(msg);
|
||||
}
|
||||
|
||||
private boolean isConnectedToZeppelinhub() {
|
||||
return (zeppelinhubSession == null || !zeppelinhubSession.isSessionOpen()) ? false : true;
|
||||
}
|
||||
|
||||
private ZeppelinhubClient(String url, String token) {
|
||||
zeppelinhubWebsocketUrl = URI.create(url);
|
||||
client = createNewWebsocketClient();
|
||||
conectionRequest = setConnectionrequest(token);
|
||||
zeppelinhubToken = token;
|
||||
schedulerService = SchedulerService.create(10);
|
||||
}
|
||||
|
||||
private ZeppelinhubSession connect() {
|
||||
ZeppelinhubSession zeppelinSession;
|
||||
try {
|
||||
ZeppelinhubWebsocket ws = ZeppelinhubWebsocket.newInstance(zeppelinhubToken);
|
||||
Future<Session> future = client.connect(ws, zeppelinhubWebsocketUrl, conectionRequest);
|
||||
Session session = future.get();
|
||||
zeppelinSession = ZeppelinhubSession.createInstance(session, zeppelinhubToken);
|
||||
} catch (IOException | InterruptedException | ExecutionException e) {
|
||||
LOG.info("Couldnt connect to zeppelinhub", e);
|
||||
zeppelinSession = ZeppelinhubSession.EMPTY;
|
||||
}
|
||||
return zeppelinSession;
|
||||
}
|
||||
|
||||
private ClientUpgradeRequest setConnectionrequest(String token) {
|
||||
ClientUpgradeRequest request = new ClientUpgradeRequest();
|
||||
request.setCookies(Lists.newArrayList(new HttpCookie(ZeppelinHubRepo.TOKEN_HEADER, token)));
|
||||
return request;
|
||||
}
|
||||
|
||||
private WebSocketClient createNewWebsocketClient() {
|
||||
WebSocketClient client = new WebSocketClient();
|
||||
client.setMaxTextMessageBufferSize(MAXIMUN_TEXT_SIZE);
|
||||
client.setMaxIdleTimeout(CONNECTION_IDLE_TIME);
|
||||
return client;
|
||||
}
|
||||
|
||||
private void addRoutines() {
|
||||
schedulerService.add(ZeppelinHubHeartbeat.newInstance(this), 10, 23);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,65 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
|
||||
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Zeppelinhub websocket handler.
|
||||
*/
|
||||
public class ZeppelinhubWebsocket implements WebSocketListener {
|
||||
private Logger LOG = LoggerFactory.getLogger(ZeppelinhubWebsocket.class);
|
||||
private Session zeppelinHubSession;
|
||||
private final String token;
|
||||
|
||||
private ZeppelinhubWebsocket(String token) {
|
||||
super();
|
||||
this.token = token;
|
||||
}
|
||||
|
||||
public static ZeppelinhubWebsocket newInstance(String token) {
|
||||
return new ZeppelinhubWebsocket(token);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketBinary(byte[] payload, int offset, int len) {}
|
||||
|
||||
@Override
|
||||
public void onWebSocketClose(int statusCode, String reason) {
|
||||
LOG.info("Closing websocket connection [{}] : {}", statusCode, reason);
|
||||
send(ZeppelinhubUtils.deadMessage(token));
|
||||
this.zeppelinHubSession = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(Session session) {
|
||||
LOG.info("Opening a new session to Zeppelinhub {}", session.hashCode());
|
||||
this.zeppelinHubSession = session;
|
||||
send(ZeppelinhubUtils.liveMessage(token));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketError(Throwable cause) {
|
||||
LOG.info("Got error", cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketText(String message) {
|
||||
LOG.info("Got msg {}", message);
|
||||
if (isSessionOpen()) {
|
||||
// do something.
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isSessionOpen() {
|
||||
return ((zeppelinHubSession != null) && (zeppelinHubSession.isOpen())) ? true : false;
|
||||
}
|
||||
|
||||
private void send(String msg) {
|
||||
if (isSessionOpen()) {
|
||||
zeppelinHubSession.getRemote().sendStringByFuture(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol;
|
||||
|
||||
/**
|
||||
* Zeppelinhub Op.
|
||||
*/
|
||||
public enum ZeppelinHubOp {
|
||||
ALIVE,
|
||||
DEAD,
|
||||
PING,
|
||||
PONG,
|
||||
RUN_NOTEBOOK,
|
||||
WELCOME,
|
||||
ZEPPELIN_STATUS
|
||||
}
|
||||
|
|
@ -21,7 +21,7 @@ public class ZeppelinhubMessage {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
|
||||
public static final ZeppelinhubMessage EMPTY = new ZeppelinhubMessage();
|
||||
|
||||
public OP op;
|
||||
public Object op;
|
||||
public Object data;
|
||||
public Map<String, String> meta = Maps.newHashMap();
|
||||
|
||||
|
|
@ -30,13 +30,13 @@ public class ZeppelinhubMessage {
|
|||
this.data = null;
|
||||
}
|
||||
|
||||
private ZeppelinhubMessage(OP op, Object data, Map<String, String> meta) {
|
||||
private ZeppelinhubMessage(Object op, Object data, Map<String, String> meta) {
|
||||
this.op = op;
|
||||
this.data = data;
|
||||
this.meta = meta;
|
||||
}
|
||||
|
||||
public static ZeppelinhubMessage newMessage(OP op, Object data, Map<String, String> meta) {
|
||||
public static ZeppelinhubMessage newMessage(Object op, Object data, Map<String, String> meta) {
|
||||
return new ZeppelinhubMessage(op, data, meta);
|
||||
}
|
||||
|
||||
|
|
@ -51,7 +51,7 @@ public class ZeppelinhubMessage {
|
|||
ZeppelinhubMessage msg;
|
||||
try {
|
||||
msg = gson.fromJson(zeppelinhubMessage, ZeppelinhubMessage.class);
|
||||
} catch(JsonSyntaxException ex) {
|
||||
} catch (JsonSyntaxException ex) {
|
||||
LOG.error("Cannot deserialize zeppelinhub message", ex);
|
||||
msg = EMPTY;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,52 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* todo: add description.
|
||||
*
|
||||
* @author anthonyc
|
||||
*
|
||||
*/
|
||||
abstract class Retryable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Retryable.class);
|
||||
protected abstract void attempt() throws Exception;
|
||||
|
||||
/*
|
||||
* @delay: initial delay (ms)
|
||||
*
|
||||
* @interval: interval between re-trials (ms)
|
||||
*
|
||||
* @timeout: timeout after which re-trials are aborted (ms)
|
||||
*/
|
||||
public void execute(long delay, long interval, long timeout) throws Exception {
|
||||
long start = System.currentTimeMillis();
|
||||
if (delay > 0L) {
|
||||
sleep(delay);
|
||||
}
|
||||
while (true) {
|
||||
try {
|
||||
attempt();
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
if (System.currentTimeMillis() - start < timeout) {
|
||||
LOG.info("Retrying in " + interval + " ms");
|
||||
sleep(interval);
|
||||
// continue
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void sleep(long millis) {
|
||||
try {
|
||||
Thread.sleep(millis);
|
||||
} catch (InterruptedException interruptedException) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Creates a thread pool that can schedule zeppelinhub commands.
|
||||
*
|
||||
* @author anthonyc
|
||||
*
|
||||
*/
|
||||
public class SchedulerService {
|
||||
|
||||
private final ScheduledExecutorService pool;
|
||||
|
||||
private SchedulerService(int numberOfThread) {
|
||||
pool = Executors.newScheduledThreadPool(numberOfThread);
|
||||
}
|
||||
|
||||
public static SchedulerService create(int numberOfThread) {
|
||||
return new SchedulerService(numberOfThread);
|
||||
}
|
||||
|
||||
public void add(Runnable service, int firstExecution, int period) {
|
||||
pool.scheduleAtFixedRate(service, firstExecution, period, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void close() {
|
||||
pool.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
|
||||
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinhubRestApiHandler;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Routine that send PING event to zeppelinhub.
|
||||
*
|
||||
*/
|
||||
public class ZeppelinHubHeartbeat implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
|
||||
private ZeppelinhubClient client;
|
||||
|
||||
public static ZeppelinHubHeartbeat newInstance(ZeppelinhubClient client) {
|
||||
return new ZeppelinHubHeartbeat(client);
|
||||
}
|
||||
|
||||
private ZeppelinHubHeartbeat(ZeppelinhubClient client) {
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.debug("Sending PING to zeppelinhub");
|
||||
client.send(ZeppelinhubUtils.pingMessage(client.getToken()));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,40 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
|
||||
|
||||
/**
|
||||
* Check and test if zeppelinhub connection is still open.
|
||||
*
|
||||
* @author anthonyc
|
||||
*
|
||||
*
|
||||
public class ZeppelinhubConntection extends Retryable implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubConntection.class);
|
||||
private long delay;
|
||||
private long interval;
|
||||
private long timeout;
|
||||
private final TimeUnit timeUnit = TimeUnit.MILLISECONDS;
|
||||
|
||||
private ZeppelinhubConntection(Session session, long delay, long interval, long timeout) {
|
||||
ZeppelinhubSession = session;
|
||||
this.delay = delay;
|
||||
this.interval = interval;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
execute(delay, interval, timeout);
|
||||
} catch (Exception e) {
|
||||
LOG.error(
|
||||
"Failed to execute retryable ZeppelinHub connection task with interval {} after {} ms {}",
|
||||
interval, timeout, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void attempt() throws Exception {
|
||||
LOG.info("Connecting to ZeppelinHub");
|
||||
}
|
||||
|
||||
}
|
||||
*/
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.session;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Zeppelinhub session.
|
||||
*/
|
||||
public class ZeppelinhubSession {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubSession.class);
|
||||
private Session session;
|
||||
private final String token;
|
||||
|
||||
public static final ZeppelinhubSession EMPTY = new ZeppelinhubSession(null, StringUtils.EMPTY);
|
||||
|
||||
public static ZeppelinhubSession createInstance(Session session, String token) {
|
||||
return new ZeppelinhubSession(session, token);
|
||||
}
|
||||
|
||||
private ZeppelinhubSession(Session session, String token) {
|
||||
this.session = session;
|
||||
this.token = token;
|
||||
}
|
||||
|
||||
public boolean isSessionOpen() {
|
||||
return ((session != null) && (session.isOpen()));
|
||||
}
|
||||
|
||||
public void close() {
|
||||
if (isSessionOpen()) {
|
||||
session.close();
|
||||
}
|
||||
}
|
||||
|
||||
public void sendByFuture(String msg) {
|
||||
if (StringUtils.isBlank(msg)) {
|
||||
LOG.error("Cannot send event to Zeppelinhub, msg is empty");
|
||||
}
|
||||
if (isSessionOpen()) {
|
||||
session.getRemote().sendStringByFuture(msg);
|
||||
} else {
|
||||
LOG.error("Cannot send event to Zeppelinhub, session is close");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinHubOp;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Helper class.
|
||||
*
|
||||
*/
|
||||
public class ZeppelinhubUtils {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubUtils.class);
|
||||
|
||||
public static String liveMessage(String token) {
|
||||
if (StringUtils.isBlank(token)) {
|
||||
LOG.error("Cannot create Live message: token is null or empty");
|
||||
return ZeppelinhubMessage.EMPTY.serialize();
|
||||
}
|
||||
HashMap<String, Object> data = new HashMap<String, Object>();
|
||||
data.put("token", token);
|
||||
return ZeppelinhubMessage
|
||||
.newMessage(ZeppelinHubOp.ALIVE, data, new HashMap<String, String>())
|
||||
.serialize();
|
||||
}
|
||||
|
||||
public static String deadMessage(String token) {
|
||||
if (StringUtils.isBlank(token)) {
|
||||
LOG.error("Cannot create Dead message: token is null or empty");
|
||||
return ZeppelinhubMessage.EMPTY.serialize();
|
||||
}
|
||||
HashMap<String, Object> data = new HashMap<String, Object>();
|
||||
data.put("token", token);
|
||||
return ZeppelinhubMessage
|
||||
.newMessage(ZeppelinHubOp.DEAD, data, new HashMap<String, String>())
|
||||
.serialize();
|
||||
}
|
||||
|
||||
public static String pingMessage(String token) {
|
||||
if (StringUtils.isBlank(token)) {
|
||||
LOG.error("Cannot create Ping message: token is null or empty");
|
||||
return ZeppelinhubMessage.EMPTY.serialize();
|
||||
}
|
||||
HashMap<String, Object> data = new HashMap<String, Object>();
|
||||
data.put("token", token);
|
||||
return ZeppelinhubMessage
|
||||
.newMessage(ZeppelinHubOp.PING, data, new HashMap<String, String>())
|
||||
.serialize();
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue