mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
add zeppelin client tests
This commit is contained in:
parent
9f1b8bf298
commit
ebcf692b72
6 changed files with 243 additions and 0 deletions
|
|
@ -239,6 +239,27 @@
|
|||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-server</artifactId>
|
||||
<version>9.2.15.v20160210</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-servlet</artifactId>
|
||||
<version>9.2.15.v20160210</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.websocket</groupId>
|
||||
<artifactId>websocket-server</artifactId>
|
||||
<version>9.2.15.v20160210</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
|||
|
|
@ -198,4 +198,7 @@ public class ZeppelinClient {
|
|||
LOG.info("Removed Zeppelin ws connection for the following note {}", noteId);
|
||||
}
|
||||
|
||||
public int countConnectedNotes() {
|
||||
return zeppelinConnectionMap.size();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,121 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock.MockEchoWebsocketServer;
|
||||
import org.apache.zeppelin.notebook.socket.Message;
|
||||
import org.apache.zeppelin.notebook.socket.Message.OP;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
public class ZeppelinClientTest {
|
||||
private Logger LOG = LoggerFactory.getLogger(ZeppelinClientTest.class);
|
||||
private final int zeppelinPort = 8080;
|
||||
private final String validWebsocketUrl = "ws://localhost:" + zeppelinPort + "/ws";
|
||||
private ExecutorService executor;
|
||||
private MockEchoWebsocketServer echoServer;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
startWebsocketServer();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
//tear down routine
|
||||
echoServer.stop();
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
private void startWebsocketServer() throws InterruptedException {
|
||||
// mock zeppelin websocket server setup
|
||||
executor = Executors.newFixedThreadPool(1);
|
||||
echoServer = new MockEchoWebsocketServer(zeppelinPort);
|
||||
executor.submit(echoServer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void zeppelinConnectionTest() {
|
||||
try {
|
||||
// Wait for websocket server to start
|
||||
Thread.sleep(2000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Cannot wait for websocket server to start, returning");
|
||||
return;
|
||||
}
|
||||
// Initialize and start Zeppelin client
|
||||
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "dummy token");
|
||||
client.start();
|
||||
LOG.info("Zeppelin websocket client started");
|
||||
|
||||
// Connection to note AAAA
|
||||
Session connectionA = client.getZeppelinConnection("AAAA");
|
||||
assertNotNull(connectionA);
|
||||
assertTrue(connectionA.isOpen());
|
||||
|
||||
assertEquals(client.countConnectedNotes(), 1);
|
||||
assertEquals(connectionA, client.getZeppelinConnection("AAAA"));
|
||||
|
||||
// Connection to note BBBB
|
||||
Session connectionB = client.getZeppelinConnection("BBBB");
|
||||
assertNotNull(connectionB);
|
||||
assertTrue(connectionB.isOpen());
|
||||
|
||||
assertEquals(client.countConnectedNotes(), 2);
|
||||
assertEquals(connectionB, client.getZeppelinConnection("BBBB"));
|
||||
|
||||
// Remove connection to note AAAA
|
||||
client.removeZeppelinConnection("AAAA");
|
||||
assertEquals(client.countConnectedNotes(), 1);
|
||||
assertNotEquals(connectionA, client.getZeppelinConnection("AAAA"));
|
||||
assertEquals(client.countConnectedNotes(), 2);
|
||||
client.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void zeppelinClientSingletonTest() {
|
||||
ZeppelinClient client = ZeppelinClient.getInstance();
|
||||
assertNull(client);
|
||||
client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN");
|
||||
assertNotNull(client);
|
||||
client = ZeppelinClient.getInstance();
|
||||
assertNotNull(client);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void zeppelinMessageSerializationTest() {
|
||||
Message msg = new Message(OP.LIST_NOTES);
|
||||
msg.data = Maps.newHashMap();
|
||||
msg.data.put("key", "value");
|
||||
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN");
|
||||
String serializedMsg = client.serialize(msg);
|
||||
Message deserializedMsg = client.deserialize(serializedMsg);
|
||||
assertEquals(msg.op, deserializedMsg.op);
|
||||
assertEquals(msg.data.get("key"), deserializedMsg.data.get("key"));
|
||||
|
||||
String invalidMsg = "random text";
|
||||
deserializedMsg =client.deserialize(invalidMsg);
|
||||
assertNull(deserializedMsg);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sendToZeppelinTest() {
|
||||
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN");
|
||||
client.start();
|
||||
Message msg = new Message(OP.LIST_NOTES);
|
||||
msg.data = Maps.newHashMap();
|
||||
msg.data.put("key", "value");
|
||||
client.send(msg, "DDDD");
|
||||
client.removeZeppelinConnection("DDDD");
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,46 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock;
|
||||
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class MockEchoWebsocketServer implements Runnable {
|
||||
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockEchoWebsocketServer.class);
|
||||
private Server server;
|
||||
|
||||
public MockEchoWebsocketServer(int port) {
|
||||
server = new Server();
|
||||
ServerConnector connector = new ServerConnector(server);
|
||||
connector.setPort(port);
|
||||
server.addConnector(connector);
|
||||
|
||||
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
||||
context.setContextPath("/");
|
||||
server.setHandler(context);
|
||||
|
||||
//ServletHolder holderEvents = new ServletHolder("ws-events", MockEventServlet.class);
|
||||
context.addServlet(MockEventServlet.class, "/ws/*");
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
LOG.info("Starting mock echo websocket server");
|
||||
server.start();
|
||||
server.join();
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
LOG.info("Stopping mock echo websocket server");
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
this.start();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Couldn't start mock echo websocket server", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock;
|
||||
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
public class MockEventServlet extends WebSocketServlet
|
||||
{
|
||||
@Override
|
||||
public void configure(WebSocketServletFactory factory)
|
||||
{
|
||||
factory.register(MockEventSocket.class);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class MockEventSocket extends WebSocketAdapter {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MockEventServlet.class);
|
||||
private Session session;
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(Session session) {
|
||||
super.onWebSocketConnect(session);
|
||||
this.session = session;
|
||||
LOG.info("Socket Connected: " + session);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketText(String message) {
|
||||
super.onWebSocketText(message);
|
||||
session.getRemote().sendStringByFuture(message);
|
||||
LOG.info("Received TEXT message: {}", message);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketClose(int statusCode, String reason) {
|
||||
super.onWebSocketClose(statusCode, reason);
|
||||
LOG.info("Socket Closed: [{}] {}", statusCode, reason);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketError(Throwable cause) {
|
||||
super.onWebSocketError(cause);
|
||||
LOG.error("Websocket error: {}", cause);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue