add zeppelinhub websocket client

This commit is contained in:
Anthony Corbacho 2016-04-27 17:03:00 +09:00 committed by Khalid Huseynov
parent f8f168ac06
commit 45bec472b6
13 changed files with 528 additions and 6 deletions

View file

@ -132,6 +132,12 @@
<artifactId>jetty-client</artifactId>
<version>9.2.15.v20160210</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
<version>9.2.15.v20160210</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>

View file

@ -11,6 +11,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -26,8 +27,10 @@ public class ZeppelinHubRepo implements NotebookRepo {
private static final String DEFAULT_SERVER = "https://www.zeppelinhub.com";
static final String ZEPPELIN_CONF_PROP_NAME_SERVER = "zeppelinhub.api.address";
static final String ZEPPELIN_CONF_PROP_NAME_TOKEN = "zeppelinhub.api.token";
public static final String TOKEN_HEADER = "X-Zeppelin-Token";
private static final Gson GSON = new Gson();
private static final Note EMPTY_NOTE = new Note();
private final Client websocketClient;
private String token;
private ZeppelinhubRestApiHandler zeppelinhubHandler;
@ -37,8 +40,50 @@ public class ZeppelinHubRepo implements NotebookRepo {
LOG.info("Initializing ZeppelinHub integration module version ?");
token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
zeppelinhubHandler = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
//TODO(khalid): add zeppelin uri?
websocketClient = new Client(StringUtils.EMPTY, getZeppelinhubWebsocketUri(conf), token);
websocketClient.start();
}
String getZeppelinHubWsUri(URI api) throws IOException {
URI apiRoot = api;
String scheme = apiRoot.getScheme();
int port = apiRoot.getPort();
if (port <= 0) {
port = (scheme != null && scheme.equals("https")) ? 443 : 80;
}
if (scheme == null) {
LOG.info("{} is not a valid zeppelinhub server address. proceed with default address {}",
apiRoot, DEFAULT_SERVER);
try {
apiRoot = new URI(DEFAULT_SERVER);
} catch (URISyntaxException e) {
LOG.error("Invalid default zeppelinhub url {}", DEFAULT_SERVER, e);
throw new IOException(e);
}
scheme = apiRoot.getScheme();
port = apiRoot.getPort();
if (port <= 0) {
port = (scheme != null && scheme.equals("https")) ? 443 : 80;
}
}
String ws = scheme.equals("https") ? "wss://" : "ws://";
return ws + apiRoot.getHost() + ":" + port + "/async";
}
private String getZeppelinhubWebsocketUri(ZeppelinConfiguration conf) {
String zeppelinHubUri = StringUtils.EMPTY;
try {
zeppelinHubUri = getZeppelinHubWsUri(new URI(conf.getString("ZEPPELINHUB_API_ADDRESS",
ZEPPELIN_CONF_PROP_NAME_SERVER, DEFAULT_SERVER)));
} catch (URISyntaxException | IOException e) {
LOG.error("Cannot get zeppelinhub URI", e);
}
return zeppelinHubUri;
}
public void setZeppelinhubRestApiHandler(ZeppelinhubRestApiHandler zeppelinhub) {
zeppelinhubHandler = zeppelinhub;
}
@ -118,7 +163,7 @@ public class ZeppelinHubRepo implements NotebookRepo {
@Override
public void close() {
websocketClient.stop();
}
@Override

View file

@ -10,7 +10,22 @@ import org.slf4j.LoggerFactory;
public class Client {
private Logger LOG = LoggerFactory.getLogger(Client.class);
public Client() {
private final ZeppelinhubClient zeppelinhubClient;
public Client(String zeppelinUri, String zeppelinhub, String token) {
LOG.debug("Init Client");
zeppelinhubClient = ZeppelinhubClient.newInstance(zeppelinhub, token);
}
public void start() {
if (zeppelinhubClient != null) {
zeppelinhubClient.start();
}
}
public void stop() {
if (zeppelinhubClient != null) {
zeppelinhubClient.stop();
}
}
}

View file

@ -0,0 +1,122 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
import java.io.IOException;
import java.net.HttpCookie;
import java.net.URI;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinhubWebsocket;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHubHeartbeat;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.session.ZeppelinhubSession;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
/**
* Manage a zeppelinhub websocket connection.
*/
public class ZeppelinhubClient {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubClient.class);
private final WebSocketClient client;
private final URI zeppelinhubWebsocketUrl;
private final ClientUpgradeRequest conectionRequest;
private final String zeppelinhubToken;
private static final int MB = 1048576;
private static final int MAXIMUN_TEXT_SIZE = 64 * MB;
private static final long CONNECTION_IDLE_TIME = TimeUnit.SECONDS.toMillis(30);
private SchedulerService schedulerService;
private ZeppelinhubSession zeppelinhubSession;
public static ZeppelinhubClient newInstance(String url, String token) {
return new ZeppelinhubClient(url, token);
}
public void start() {
try {
client.start();
zeppelinhubSession = connect();
addRoutines();
} catch (Exception e) {
LOG.error("Cannot connect to zeppelinhub via websocket", e);
}
}
public void stop() {
LOG.info("Stopping Zeppelinhub websocket client");
try {
zeppelinhubSession.close();
schedulerService.close();
client.stop();
} catch (Exception e) {
LOG.error("Cannot stop zeppelinhub websocket client", e);
}
}
public String getToken() {
return this.zeppelinhubToken;
}
public void send(String msg) {
if (!isConnectedToZeppelinhub()) {
LOG.info("Zeppelinhub connection is not open, opening it");
zeppelinhubSession = connect();
}
zeppelinhubSession.sendByFuture(msg);
}
private boolean isConnectedToZeppelinhub() {
return (zeppelinhubSession == null || !zeppelinhubSession.isSessionOpen()) ? false : true;
}
private ZeppelinhubClient(String url, String token) {
zeppelinhubWebsocketUrl = URI.create(url);
client = createNewWebsocketClient();
conectionRequest = setConnectionrequest(token);
zeppelinhubToken = token;
schedulerService = SchedulerService.create(10);
}
private ZeppelinhubSession connect() {
ZeppelinhubSession zeppelinSession;
try {
ZeppelinhubWebsocket ws = ZeppelinhubWebsocket.newInstance(zeppelinhubToken);
Future<Session> future = client.connect(ws, zeppelinhubWebsocketUrl, conectionRequest);
Session session = future.get();
zeppelinSession = ZeppelinhubSession.createInstance(session, zeppelinhubToken);
} catch (IOException | InterruptedException | ExecutionException e) {
LOG.info("Couldnt connect to zeppelinhub", e);
zeppelinSession = ZeppelinhubSession.EMPTY;
}
return zeppelinSession;
}
private ClientUpgradeRequest setConnectionrequest(String token) {
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setCookies(Lists.newArrayList(new HttpCookie(ZeppelinHubRepo.TOKEN_HEADER, token)));
return request;
}
private WebSocketClient createNewWebsocketClient() {
WebSocketClient client = new WebSocketClient();
client.setMaxTextMessageBufferSize(MAXIMUN_TEXT_SIZE);
client.setMaxIdleTimeout(CONNECTION_IDLE_TIME);
return client;
}
private void addRoutines() {
schedulerService.add(ZeppelinHubHeartbeat.newInstance(this), 10, 23);
}
}

View file

@ -0,0 +1,65 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Zeppelinhub websocket handler.
*/
public class ZeppelinhubWebsocket implements WebSocketListener {
private Logger LOG = LoggerFactory.getLogger(ZeppelinhubWebsocket.class);
private Session zeppelinHubSession;
private final String token;
private ZeppelinhubWebsocket(String token) {
super();
this.token = token;
}
public static ZeppelinhubWebsocket newInstance(String token) {
return new ZeppelinhubWebsocket(token);
}
@Override
public void onWebSocketBinary(byte[] payload, int offset, int len) {}
@Override
public void onWebSocketClose(int statusCode, String reason) {
LOG.info("Closing websocket connection [{}] : {}", statusCode, reason);
send(ZeppelinhubUtils.deadMessage(token));
this.zeppelinHubSession = null;
}
@Override
public void onWebSocketConnect(Session session) {
LOG.info("Opening a new session to Zeppelinhub {}", session.hashCode());
this.zeppelinHubSession = session;
send(ZeppelinhubUtils.liveMessage(token));
}
@Override
public void onWebSocketError(Throwable cause) {
LOG.info("Got error", cause);
}
@Override
public void onWebSocketText(String message) {
LOG.info("Got msg {}", message);
if (isSessionOpen()) {
// do something.
}
}
private boolean isSessionOpen() {
return ((zeppelinHubSession != null) && (zeppelinHubSession.isOpen())) ? true : false;
}
private void send(String msg) {
if (isSessionOpen()) {
zeppelinHubSession.getRemote().sendStringByFuture(msg);
}
}
}

View file

@ -0,0 +1,14 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol;
/**
* Zeppelinhub Op.
*/
public enum ZeppelinHubOp {
ALIVE,
DEAD,
PING,
PONG,
RUN_NOTEBOOK,
WELCOME,
ZEPPELIN_STATUS
}

View file

@ -21,7 +21,7 @@ public class ZeppelinhubMessage {
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
public static final ZeppelinhubMessage EMPTY = new ZeppelinhubMessage();
public OP op;
public Object op;
public Object data;
public Map<String, String> meta = Maps.newHashMap();
@ -30,13 +30,13 @@ public class ZeppelinhubMessage {
this.data = null;
}
private ZeppelinhubMessage(OP op, Object data, Map<String, String> meta) {
private ZeppelinhubMessage(Object op, Object data, Map<String, String> meta) {
this.op = op;
this.data = data;
this.meta = meta;
}
public static ZeppelinhubMessage newMessage(OP op, Object data, Map<String, String> meta) {
public static ZeppelinhubMessage newMessage(Object op, Object data, Map<String, String> meta) {
return new ZeppelinhubMessage(op, data, meta);
}
@ -51,7 +51,7 @@ public class ZeppelinhubMessage {
ZeppelinhubMessage msg;
try {
msg = gson.fromJson(zeppelinhubMessage, ZeppelinhubMessage.class);
} catch(JsonSyntaxException ex) {
} catch (JsonSyntaxException ex) {
LOG.error("Cannot deserialize zeppelinhub message", ex);
msg = EMPTY;
}

View file

@ -0,0 +1,52 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* todo: add description.
*
* @author anthonyc
*
*/
abstract class Retryable {
private static final Logger LOG = LoggerFactory.getLogger(Retryable.class);
protected abstract void attempt() throws Exception;
/*
* @delay: initial delay (ms)
*
* @interval: interval between re-trials (ms)
*
* @timeout: timeout after which re-trials are aborted (ms)
*/
public void execute(long delay, long interval, long timeout) throws Exception {
long start = System.currentTimeMillis();
if (delay > 0L) {
sleep(delay);
}
while (true) {
try {
attempt();
return;
} catch (Exception e) {
if (System.currentTimeMillis() - start < timeout) {
LOG.info("Retrying in " + interval + " ms");
sleep(interval);
// continue
} else {
throw e;
}
}
}
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException interruptedException) {
// continue
}
}
}

View file

@ -0,0 +1,33 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Creates a thread pool that can schedule zeppelinhub commands.
*
* @author anthonyc
*
*/
public class SchedulerService {
private final ScheduledExecutorService pool;
private SchedulerService(int numberOfThread) {
pool = Executors.newScheduledThreadPool(numberOfThread);
}
public static SchedulerService create(int numberOfThread) {
return new SchedulerService(numberOfThread);
}
public void add(Runnable service, int firstExecution, int period) {
pool.scheduleAtFixedRate(service, firstExecution, period, TimeUnit.SECONDS);
}
public void close() {
pool.shutdown();
}
}

View file

@ -0,0 +1,30 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
import org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinhubRestApiHandler;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Routine that send PING event to zeppelinhub.
*
*/
public class ZeppelinHubHeartbeat implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
private ZeppelinhubClient client;
public static ZeppelinHubHeartbeat newInstance(ZeppelinhubClient client) {
return new ZeppelinHubHeartbeat(client);
}
private ZeppelinHubHeartbeat(ZeppelinhubClient client) {
this.client = client;
}
@Override
public void run() {
LOG.debug("Sending PING to zeppelinhub");
client.send(ZeppelinhubUtils.pingMessage(client.getToken()));
}
}

View file

@ -0,0 +1,40 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
/**
* Check and test if zeppelinhub connection is still open.
*
* @author anthonyc
*
*
public class ZeppelinhubConntection extends Retryable implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubConntection.class);
private long delay;
private long interval;
private long timeout;
private final TimeUnit timeUnit = TimeUnit.MILLISECONDS;
private ZeppelinhubConntection(Session session, long delay, long interval, long timeout) {
ZeppelinhubSession = session;
this.delay = delay;
this.interval = interval;
this.timeout = timeout;
}
@Override
public void run() {
try {
execute(delay, interval, timeout);
} catch (Exception e) {
LOG.error(
"Failed to execute retryable ZeppelinHub connection task with interval {} after {} ms {}",
interval, timeout, e);
}
}
@Override
protected void attempt() throws Exception {
LOG.info("Connecting to ZeppelinHub");
}
}
*/

View file

@ -0,0 +1,47 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.session;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jetty.websocket.api.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Zeppelinhub session.
*/
public class ZeppelinhubSession {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubSession.class);
private Session session;
private final String token;
public static final ZeppelinhubSession EMPTY = new ZeppelinhubSession(null, StringUtils.EMPTY);
public static ZeppelinhubSession createInstance(Session session, String token) {
return new ZeppelinhubSession(session, token);
}
private ZeppelinhubSession(Session session, String token) {
this.session = session;
this.token = token;
}
public boolean isSessionOpen() {
return ((session != null) && (session.isOpen()));
}
public void close() {
if (isSessionOpen()) {
session.close();
}
}
public void sendByFuture(String msg) {
if (StringUtils.isBlank(msg)) {
LOG.error("Cannot send event to Zeppelinhub, msg is empty");
}
if (isSessionOpen()) {
session.getRemote().sendStringByFuture(msg);
} else {
LOG.error("Cannot send event to Zeppelinhub, session is close");
}
}
}

View file

@ -0,0 +1,53 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils;
import java.util.HashMap;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinHubOp;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Helper class.
*
*/
public class ZeppelinhubUtils {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubUtils.class);
public static String liveMessage(String token) {
if (StringUtils.isBlank(token)) {
LOG.error("Cannot create Live message: token is null or empty");
return ZeppelinhubMessage.EMPTY.serialize();
}
HashMap<String, Object> data = new HashMap<String, Object>();
data.put("token", token);
return ZeppelinhubMessage
.newMessage(ZeppelinHubOp.ALIVE, data, new HashMap<String, String>())
.serialize();
}
public static String deadMessage(String token) {
if (StringUtils.isBlank(token)) {
LOG.error("Cannot create Dead message: token is null or empty");
return ZeppelinhubMessage.EMPTY.serialize();
}
HashMap<String, Object> data = new HashMap<String, Object>();
data.put("token", token);
return ZeppelinhubMessage
.newMessage(ZeppelinHubOp.DEAD, data, new HashMap<String, String>())
.serialize();
}
public static String pingMessage(String token) {
if (StringUtils.isBlank(token)) {
LOG.error("Cannot create Ping message: token is null or empty");
return ZeppelinhubMessage.EMPTY.serialize();
}
HashMap<String, Object> data = new HashMap<String, Object>();
data.put("token", token);
return ZeppelinhubMessage
.newMessage(ZeppelinHubOp.PING, data, new HashMap<String, String>())
.serialize();
}
}