Merge branch 'master' into notebook-search

Conflicts:
	zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
This commit is contained in:
Alexander Bezzubov 2015-12-22 16:39:11 +09:00
commit 6a3906f6d2
5 changed files with 62 additions and 75 deletions

View file

@ -60,55 +60,65 @@ import org.slf4j.LoggerFactory;
* Main class of Zeppelin.
*
*/
public class ZeppelinServer extends Application {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinServer.class);
public static Notebook notebook;
public static NotebookServer notebookServer;
public static Server jettyServer;
public static Server jettyWebServer;
public static NotebookServer notebookWsServer;
private SchedulerFactory schedulerFactory;
private InterpreterFactory replFactory;
private NotebookRepo notebookRepo;
private SearchService notebookIndex;
public static void main(String[] args) throws Exception {
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
this.schedulerFactory = new SchedulerFactory();
this.replFactory = new InterpreterFactory(conf, notebookWsServer);
this.notebookRepo = new NotebookRepoSync(conf);
notebook = new Notebook(conf,
notebookRepo, schedulerFactory, replFactory, notebookWsServer, notebookIndex);
}
public static void main(String[] args) throws InterruptedException {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
conf.setProperty("args", args);
jettyServer = setupJettyServer(conf);
// REST api
final ServletContextHandler restApi = setupRestApiContextHandler(conf);
final ServletContextHandler restApiContext = setupRestApiContextHandler(conf);
// Notebook server
final ServletContextHandler notebook = setupNotebookServer(conf);
final ServletContextHandler notebookContext = setupNotebookServer(conf);
// Web UI
final WebAppContext webApp = setupWebAppContext(conf);
// add all handlers
ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(new Handler[]{restApi, notebook, webApp});
jettyServer.setHandler(contexts);
contexts.setHandlers(new Handler[]{restApiContext, notebookContext, webApp});
LOG.info("Start zeppelin server");
jettyWebServer = setupJettyServer(conf);
jettyWebServer.setHandler(contexts);
LOG.info("Starting zeppelin server");
try {
jettyServer.start();
jettyWebServer.start(); //Instantiates ZeppelinServer
} catch (Exception e) {
LOG.error("Error while running jettyServer", e);
System.exit(-1);
}
LOG.info("Started zeppelin server");
LOG.info("Done, zeppelin server started");
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override public void run() {
LOG.info("Shutting down Zeppelin Server ... ");
try {
jettyServer.stop();
ZeppelinServer.notebook.getInterpreterFactory().close();
ZeppelinServer.notebook.close();
jettyWebServer.stop();
notebook.getInterpreterFactory().close();
notebook.close();
} catch (Exception e) {
LOG.error("Error while stopping servlet container", e);
}
@ -127,18 +137,15 @@ public class ZeppelinServer extends Application {
System.exit(0);
}
jettyServer.join();
jettyWebServer.join();
ZeppelinServer.notebook.getInterpreterFactory().close();
}
private static Server setupJettyServer(ZeppelinConfiguration conf)
throws Exception {
private static Server setupJettyServer(ZeppelinConfiguration conf) {
AbstractConnector connector;
if (conf.useSsl()) {
connector = new SslSelectChannelConnector(getSslContextFactory(conf));
}
else {
} else {
connector = new SelectChannelConnector();
}
@ -155,11 +162,9 @@ public class ZeppelinServer extends Application {
return server;
}
private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf)
throws Exception {
notebookServer = new NotebookServer();
final ServletHolder servletHolder = new ServletHolder(notebookServer);
private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) {
notebookWsServer = new NotebookServer();
final ServletHolder servletHolder = new ServletHolder(notebookWsServer);
servletHolder.setInitParameter("maxTextMessageSize", "1024000");
final ServletContextHandler cxfContext = new ServletContextHandler(
@ -173,9 +178,8 @@ public class ZeppelinServer extends Application {
return cxfContext;
}
private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf)
throws Exception {
@SuppressWarnings("deprecation")
private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) {
// Note that the API for the SslContextFactory is different for
// Jetty version 9
SslContextFactory sslContextFactory = new SslContextFactory();
@ -196,6 +200,7 @@ public class ZeppelinServer extends Application {
return sslContextFactory;
}
@SuppressWarnings("unused") //TODO(bzz) why unused?
private static SSLContext getSslContext(ZeppelinConfiguration conf)
throws Exception {
@ -242,25 +247,10 @@ public class ZeppelinServer extends Application {
webApp.setTempDirectory(warTempDirectory);
}
// Explicit bind to root
webApp.addServlet(
new ServletHolder(new DefaultServlet()),
"/*"
);
webApp.addServlet(new ServletHolder(new DefaultServlet()), "/*");
return webApp;
}
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
this.schedulerFactory = new SchedulerFactory();
this.replFactory = new InterpreterFactory(conf, notebookServer);
this.notebookIndex = new SearchService();
this.notebookRepo = new NotebookRepoSync(conf);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory,
notebookServer, notebookIndex);
}
@Override
public Set<Class<?>> getClasses() {
Set<Class<?>> classes = new HashSet<Class<?>>();
@ -268,14 +258,14 @@ public class ZeppelinServer extends Application {
}
@Override
public java.util.Set<java.lang.Object> getSingletons() {
Set<Object> singletons = new HashSet<Object>();
public Set<Object> getSingletons() {
Set<Object> singletons = new HashSet<>();
/** Rest-api root endpoint */
ZeppelinRestApi root = new ZeppelinRestApi();
singletons.add(root);
NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookServer, notebookIndex);
NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookWsServer, notebookIndex);
singletons.add(notebookApi);
InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory);

View file

@ -20,6 +20,8 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.http.HttpServletRequest;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@ -45,6 +47,7 @@ import org.eclipse.jetty.websocket.WebSocketServlet;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
import com.google.gson.Gson;
@ -57,7 +60,7 @@ public class NotebookServer extends WebSocketServlet implements
private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
Gson gson = new Gson();
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
final List<NotebookSocket> connectedSockets = new LinkedList<>();
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
private Notebook notebook() {
return ZeppelinServer.notebook;
@ -84,9 +87,7 @@ public class NotebookServer extends WebSocketServlet implements
public void onOpen(NotebookSocket conn) {
LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(),
conn.getRequest().getRemotePort());
synchronized (connectedSockets) {
connectedSockets.add(conn);
}
connectedSockets.add(conn);
}
@Override
@ -146,8 +147,7 @@ public class NotebookServer extends WebSocketServlet implements
completion(conn, notebook, messagereceived);
break;
case PING:
pong();
break;
break; //do nothing
case ANGULAR_OBJECT_UPDATED:
angularObjectUpdated(conn, notebook, messagereceived);
break;
@ -165,9 +165,7 @@ public class NotebookServer extends WebSocketServlet implements
LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest()
.getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason);
removeConnectionFromAllNote(conn);
synchronized (connectedSockets) {
connectedSockets.remove(conn);
}
connectedSockets.remove(conn);
}
protected Message deserializeMessage(String msg) {
@ -284,13 +282,11 @@ public class NotebookServer extends WebSocketServlet implements
}
private void broadcastAll(Message m) {
synchronized (connectedSockets) {
for (NotebookSocket conn : connectedSockets) {
try {
conn.send(serializeMessage(m));
} catch (IOException e) {
LOG.error("socket error", e);
}
for (NotebookSocket conn : connectedSockets) {
try {
conn.send(serializeMessage(m));
} catch (IOException e) {
LOG.error("socket error", e);
}
}
}
@ -729,6 +725,7 @@ public class NotebookServer extends WebSocketServlet implements
public static class ParagraphJobListener implements JobListener {
private NotebookServer notebookServer;
private Note note;
public ParagraphJobListener(NotebookServer notebookServer, Note note) {
this.notebookServer = notebookServer;
this.note = note;
@ -770,8 +767,6 @@ public class NotebookServer extends WebSocketServlet implements
public JobListener getParagraphJobListener(Note note) {
return new ParagraphJobListener(this, note);
}
private void pong() {
}
private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {
List<InterpreterSetting> settings = note.getNoteReplLoader()

View file

@ -29,8 +29,12 @@ import java.util.concurrent.Executors;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethodBase;
import org.apache.commons.httpclient.methods.*;
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.PutMethod;
import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterSetting;
@ -207,7 +211,7 @@ public abstract class AbstractTestRestApi {
}
LOG.info("Terminating test Zeppelin...");
ZeppelinServer.jettyServer.stop();
ZeppelinServer.jettyWebServer.stop();
executor.shutdown();
long s = System.currentTimeMillis();

View file

@ -60,7 +60,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
AbstractTestRestApi.startUp();
gson = new Gson();
notebook = ZeppelinServer.notebook;
notebookServer = ZeppelinServer.notebookServer;
notebookServer = ZeppelinServer.notebookWsServer;
}
@AfterClass

View file

@ -18,12 +18,12 @@
package org.apache.zeppelin.conf;
import java.net.URL;
import java.util.*;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.configuration.tree.ConfigurationNode;
import org.apache.zeppelin.notebook.repo.S3NotebookRepo;
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory;
/**
* Zeppelin configuration.
*
* @author Leemoonsoo
*
*/
public class ZeppelinConfiguration extends XMLConfiguration {
private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml";