diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index ee5b0f9e4c..8ae5e7c713 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -60,55 +60,65 @@ import org.slf4j.LoggerFactory; * Main class of Zeppelin. * */ - public class ZeppelinServer extends Application { private static final Logger LOG = LoggerFactory.getLogger(ZeppelinServer.class); public static Notebook notebook; - public static NotebookServer notebookServer; - public static Server jettyServer; + public static Server jettyWebServer; + public static NotebookServer notebookWsServer; private SchedulerFactory schedulerFactory; private InterpreterFactory replFactory; private NotebookRepo notebookRepo; private SearchService notebookIndex; - public static void main(String[] args) throws Exception { + public ZeppelinServer() throws Exception { + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + + this.schedulerFactory = new SchedulerFactory(); + this.replFactory = new InterpreterFactory(conf, notebookWsServer); + this.notebookRepo = new NotebookRepoSync(conf); + + notebook = new Notebook(conf, + notebookRepo, schedulerFactory, replFactory, notebookWsServer, notebookIndex); + } + + public static void main(String[] args) throws InterruptedException { ZeppelinConfiguration conf = ZeppelinConfiguration.create(); conf.setProperty("args", args); - jettyServer = setupJettyServer(conf); - // REST api - final ServletContextHandler restApi = setupRestApiContextHandler(conf); + final ServletContextHandler restApiContext = setupRestApiContextHandler(conf); // Notebook server - final ServletContextHandler notebook = setupNotebookServer(conf); + final ServletContextHandler notebookContext = setupNotebookServer(conf); // Web UI final WebAppContext webApp = setupWebAppContext(conf); // add all handlers ContextHandlerCollection contexts = new ContextHandlerCollection(); - contexts.setHandlers(new Handler[]{restApi, notebook, webApp}); - jettyServer.setHandler(contexts); + contexts.setHandlers(new Handler[]{restApiContext, notebookContext, webApp}); - LOG.info("Start zeppelin server"); + jettyWebServer = setupJettyServer(conf); + jettyWebServer.setHandler(contexts); + + LOG.info("Starting zeppelin server"); try { - jettyServer.start(); + jettyWebServer.start(); //Instantiates ZeppelinServer } catch (Exception e) { LOG.error("Error while running jettyServer", e); System.exit(-1); } - LOG.info("Started zeppelin server"); + LOG.info("Done, zeppelin server started"); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { LOG.info("Shutting down Zeppelin Server ... "); try { - jettyServer.stop(); - ZeppelinServer.notebook.getInterpreterFactory().close(); - ZeppelinServer.notebook.close(); + jettyWebServer.stop(); + notebook.getInterpreterFactory().close(); + notebook.close(); } catch (Exception e) { LOG.error("Error while stopping servlet container", e); } @@ -127,18 +137,15 @@ public class ZeppelinServer extends Application { System.exit(0); } - jettyServer.join(); + jettyWebServer.join(); ZeppelinServer.notebook.getInterpreterFactory().close(); } - private static Server setupJettyServer(ZeppelinConfiguration conf) - throws Exception { - + private static Server setupJettyServer(ZeppelinConfiguration conf) { AbstractConnector connector; if (conf.useSsl()) { connector = new SslSelectChannelConnector(getSslContextFactory(conf)); - } - else { + } else { connector = new SelectChannelConnector(); } @@ -155,11 +162,9 @@ public class ZeppelinServer extends Application { return server; } - private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) - throws Exception { - - notebookServer = new NotebookServer(); - final ServletHolder servletHolder = new ServletHolder(notebookServer); + private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) { + notebookWsServer = new NotebookServer(); + final ServletHolder servletHolder = new ServletHolder(notebookWsServer); servletHolder.setInitParameter("maxTextMessageSize", "1024000"); final ServletContextHandler cxfContext = new ServletContextHandler( @@ -173,9 +178,8 @@ public class ZeppelinServer extends Application { return cxfContext; } - private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) - throws Exception { - + @SuppressWarnings("deprecation") + private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) { // Note that the API for the SslContextFactory is different for // Jetty version 9 SslContextFactory sslContextFactory = new SslContextFactory(); @@ -196,6 +200,7 @@ public class ZeppelinServer extends Application { return sslContextFactory; } + @SuppressWarnings("unused") //TODO(bzz) why unused? private static SSLContext getSslContext(ZeppelinConfiguration conf) throws Exception { @@ -242,25 +247,10 @@ public class ZeppelinServer extends Application { webApp.setTempDirectory(warTempDirectory); } // Explicit bind to root - webApp.addServlet( - new ServletHolder(new DefaultServlet()), - "/*" - ); + webApp.addServlet(new ServletHolder(new DefaultServlet()), "/*"); return webApp; } - public ZeppelinServer() throws Exception { - ZeppelinConfiguration conf = ZeppelinConfiguration.create(); - - this.schedulerFactory = new SchedulerFactory(); - this.replFactory = new InterpreterFactory(conf, notebookServer); - this.notebookIndex = new SearchService(); - this.notebookRepo = new NotebookRepoSync(conf); - - notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, - notebookServer, notebookIndex); - } - @Override public Set> getClasses() { Set> classes = new HashSet>(); @@ -268,14 +258,14 @@ public class ZeppelinServer extends Application { } @Override - public java.util.Set getSingletons() { - Set singletons = new HashSet(); + public Set getSingletons() { + Set singletons = new HashSet<>(); /** Rest-api root endpoint */ ZeppelinRestApi root = new ZeppelinRestApi(); singletons.add(root); - NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookServer, notebookIndex); + NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookWsServer, notebookIndex); singletons.add(notebookApi); InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index a0d8354ee7..554f68cf07 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; + import javax.servlet.http.HttpServletRequest; import org.apache.zeppelin.conf.ZeppelinConfiguration; @@ -45,6 +47,7 @@ import org.eclipse.jetty.websocket.WebSocketServlet; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.google.common.base.Strings; import com.google.gson.Gson; @@ -57,7 +60,7 @@ public class NotebookServer extends WebSocketServlet implements private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); Gson gson = new Gson(); final Map> noteSocketMap = new HashMap<>(); - final List connectedSockets = new LinkedList<>(); + final Queue connectedSockets = new ConcurrentLinkedQueue<>(); private Notebook notebook() { return ZeppelinServer.notebook; @@ -84,9 +87,7 @@ public class NotebookServer extends WebSocketServlet implements public void onOpen(NotebookSocket conn) { LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(), conn.getRequest().getRemotePort()); - synchronized (connectedSockets) { - connectedSockets.add(conn); - } + connectedSockets.add(conn); } @Override @@ -146,8 +147,7 @@ public class NotebookServer extends WebSocketServlet implements completion(conn, notebook, messagereceived); break; case PING: - pong(); - break; + break; //do nothing case ANGULAR_OBJECT_UPDATED: angularObjectUpdated(conn, notebook, messagereceived); break; @@ -165,9 +165,7 @@ public class NotebookServer extends WebSocketServlet implements LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest() .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); removeConnectionFromAllNote(conn); - synchronized (connectedSockets) { - connectedSockets.remove(conn); - } + connectedSockets.remove(conn); } protected Message deserializeMessage(String msg) { @@ -284,13 +282,11 @@ public class NotebookServer extends WebSocketServlet implements } private void broadcastAll(Message m) { - synchronized (connectedSockets) { - for (NotebookSocket conn : connectedSockets) { - try { - conn.send(serializeMessage(m)); - } catch (IOException e) { - LOG.error("socket error", e); - } + for (NotebookSocket conn : connectedSockets) { + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); } } } @@ -729,6 +725,7 @@ public class NotebookServer extends WebSocketServlet implements public static class ParagraphJobListener implements JobListener { private NotebookServer notebookServer; private Note note; + public ParagraphJobListener(NotebookServer notebookServer, Note note) { this.notebookServer = notebookServer; this.note = note; @@ -770,8 +767,6 @@ public class NotebookServer extends WebSocketServlet implements public JobListener getParagraphJobListener(Note note) { return new ParagraphJobListener(this, note); } - private void pong() { - } private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException { List settings = note.getNoteReplLoader() diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index db7affe7c4..69d10228b4 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -29,8 +29,12 @@ import java.util.concurrent.Executors; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpMethodBase; -import org.apache.commons.httpclient.methods.*; -import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; +import org.apache.commons.httpclient.methods.ByteArrayRequestEntity; +import org.apache.commons.httpclient.methods.DeleteMethod; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.methods.PutMethod; +import org.apache.commons.httpclient.methods.RequestEntity; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.InterpreterSetting; @@ -207,7 +211,7 @@ public abstract class AbstractTestRestApi { } LOG.info("Terminating test Zeppelin..."); - ZeppelinServer.jettyServer.stop(); + ZeppelinServer.jettyWebServer.stop(); executor.shutdown(); long s = System.currentTimeMillis(); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index faef28709e..67d12b7edf 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -60,7 +60,7 @@ public class NotebookServerTest extends AbstractTestRestApi { AbstractTestRestApi.startUp(); gson = new Gson(); notebook = ZeppelinServer.notebook; - notebookServer = ZeppelinServer.notebookServer; + notebookServer = ZeppelinServer.notebookWsServer; } @AfterClass diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 909345a1c4..72b6a3ce4a 100755 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -18,12 +18,12 @@ package org.apache.zeppelin.conf; import java.net.URL; -import java.util.*; +import java.util.Arrays; +import java.util.List; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.configuration.tree.ConfigurationNode; -import org.apache.zeppelin.notebook.repo.S3NotebookRepo; import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory; /** * Zeppelin configuration. * - * @author Leemoonsoo - * */ public class ZeppelinConfiguration extends XMLConfiguration { private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml";