add zeppelin client tests

This commit is contained in:
Khalid Huseynov 2016-05-09 16:12:33 +09:00
parent 9f1b8bf298
commit ebcf692b72
6 changed files with 243 additions and 0 deletions

View file

@ -239,6 +239,27 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.2.15.v20160210</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.2.15.v20160210</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
<version>9.2.15.v20160210</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View file

@ -198,4 +198,7 @@ public class ZeppelinClient {
LOG.info("Removed Zeppelin ws connection for the following note {}", noteId);
}
public int countConnectedNotes() {
return zeppelinConnectionMap.size();
}
}

View file

@ -0,0 +1,121 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
import static org.junit.Assert.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock.MockEchoWebsocketServer;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.eclipse.jetty.websocket.api.Session;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
public class ZeppelinClientTest {
private Logger LOG = LoggerFactory.getLogger(ZeppelinClientTest.class);
private final int zeppelinPort = 8080;
private final String validWebsocketUrl = "ws://localhost:" + zeppelinPort + "/ws";
private ExecutorService executor;
private MockEchoWebsocketServer echoServer;
@Before
public void setUp() throws Exception {
startWebsocketServer();
}
@After
public void tearDown() throws Exception {
//tear down routine
echoServer.stop();
executor.shutdown();
}
private void startWebsocketServer() throws InterruptedException {
// mock zeppelin websocket server setup
executor = Executors.newFixedThreadPool(1);
echoServer = new MockEchoWebsocketServer(zeppelinPort);
executor.submit(echoServer);
}
@Test
public void zeppelinConnectionTest() {
try {
// Wait for websocket server to start
Thread.sleep(2000);
} catch (InterruptedException e) {
LOG.warn("Cannot wait for websocket server to start, returning");
return;
}
// Initialize and start Zeppelin client
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "dummy token");
client.start();
LOG.info("Zeppelin websocket client started");
// Connection to note AAAA
Session connectionA = client.getZeppelinConnection("AAAA");
assertNotNull(connectionA);
assertTrue(connectionA.isOpen());
assertEquals(client.countConnectedNotes(), 1);
assertEquals(connectionA, client.getZeppelinConnection("AAAA"));
// Connection to note BBBB
Session connectionB = client.getZeppelinConnection("BBBB");
assertNotNull(connectionB);
assertTrue(connectionB.isOpen());
assertEquals(client.countConnectedNotes(), 2);
assertEquals(connectionB, client.getZeppelinConnection("BBBB"));
// Remove connection to note AAAA
client.removeZeppelinConnection("AAAA");
assertEquals(client.countConnectedNotes(), 1);
assertNotEquals(connectionA, client.getZeppelinConnection("AAAA"));
assertEquals(client.countConnectedNotes(), 2);
client.stop();
}
@Test
public void zeppelinClientSingletonTest() {
ZeppelinClient client = ZeppelinClient.getInstance();
assertNull(client);
client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN");
assertNotNull(client);
client = ZeppelinClient.getInstance();
assertNotNull(client);
}
@Test
public void zeppelinMessageSerializationTest() {
Message msg = new Message(OP.LIST_NOTES);
msg.data = Maps.newHashMap();
msg.data.put("key", "value");
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN");
String serializedMsg = client.serialize(msg);
Message deserializedMsg = client.deserialize(serializedMsg);
assertEquals(msg.op, deserializedMsg.op);
assertEquals(msg.data.get("key"), deserializedMsg.data.get("key"));
String invalidMsg = "random text";
deserializedMsg =client.deserialize(invalidMsg);
assertNull(deserializedMsg);
}
@Test
public void sendToZeppelinTest() {
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN");
client.start();
Message msg = new Message(OP.LIST_NOTES);
msg.data = Maps.newHashMap();
msg.data.put("key", "value");
client.send(msg, "DDDD");
client.removeZeppelinConnection("DDDD");
client.stop();
}
}

View file

@ -0,0 +1,46 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.slf4j.LoggerFactory;
public class MockEchoWebsocketServer implements Runnable {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockEchoWebsocketServer.class);
private Server server;
public MockEchoWebsocketServer(int port) {
server = new Server();
ServerConnector connector = new ServerConnector(server);
connector.setPort(port);
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath("/");
server.setHandler(context);
//ServletHolder holderEvents = new ServletHolder("ws-events", MockEventServlet.class);
context.addServlet(MockEventServlet.class, "/ws/*");
}
public void start() throws Exception {
LOG.info("Starting mock echo websocket server");
server.start();
server.join();
}
public void stop() throws Exception {
LOG.info("Stopping mock echo websocket server");
server.stop();
}
@Override
public void run() {
try {
this.start();
} catch (Exception e) {
LOG.error("Couldn't start mock echo websocket server", e);
}
}
}

View file

@ -0,0 +1,14 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
@SuppressWarnings("serial")
public class MockEventServlet extends WebSocketServlet
{
@Override
public void configure(WebSocketServletFactory factory)
{
factory.register(MockEventSocket.class);
}
}

View file

@ -0,0 +1,38 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MockEventSocket extends WebSocketAdapter {
private static final Logger LOG = LoggerFactory.getLogger(MockEventServlet.class);
private Session session;
@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
this.session = session;
LOG.info("Socket Connected: " + session);
}
@Override
public void onWebSocketText(String message) {
super.onWebSocketText(message);
session.getRemote().sendStringByFuture(message);
LOG.info("Received TEXT message: {}", message);
}
@Override
public void onWebSocketClose(int statusCode, String reason) {
super.onWebSocketClose(statusCode, reason);
LOG.info("Socket Closed: [{}] {}", statusCode, reason);
}
@Override
public void onWebSocketError(Throwable cause) {
super.onWebSocketError(cause);
LOG.error("Websocket error: {}", cause);
}
}