mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge branch 'master' into notebook-search
Conflicts: zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
This commit is contained in:
commit
6a3906f6d2
5 changed files with 62 additions and 75 deletions
|
|
@ -60,55 +60,65 @@ import org.slf4j.LoggerFactory;
|
|||
* Main class of Zeppelin.
|
||||
*
|
||||
*/
|
||||
|
||||
public class ZeppelinServer extends Application {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinServer.class);
|
||||
|
||||
public static Notebook notebook;
|
||||
public static NotebookServer notebookServer;
|
||||
public static Server jettyServer;
|
||||
public static Server jettyWebServer;
|
||||
public static NotebookServer notebookWsServer;
|
||||
|
||||
private SchedulerFactory schedulerFactory;
|
||||
private InterpreterFactory replFactory;
|
||||
private NotebookRepo notebookRepo;
|
||||
private SearchService notebookIndex;
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
public ZeppelinServer() throws Exception {
|
||||
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
|
||||
this.schedulerFactory = new SchedulerFactory();
|
||||
this.replFactory = new InterpreterFactory(conf, notebookWsServer);
|
||||
this.notebookRepo = new NotebookRepoSync(conf);
|
||||
|
||||
notebook = new Notebook(conf,
|
||||
notebookRepo, schedulerFactory, replFactory, notebookWsServer, notebookIndex);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
conf.setProperty("args", args);
|
||||
|
||||
jettyServer = setupJettyServer(conf);
|
||||
|
||||
// REST api
|
||||
final ServletContextHandler restApi = setupRestApiContextHandler(conf);
|
||||
final ServletContextHandler restApiContext = setupRestApiContextHandler(conf);
|
||||
|
||||
// Notebook server
|
||||
final ServletContextHandler notebook = setupNotebookServer(conf);
|
||||
final ServletContextHandler notebookContext = setupNotebookServer(conf);
|
||||
|
||||
// Web UI
|
||||
final WebAppContext webApp = setupWebAppContext(conf);
|
||||
|
||||
// add all handlers
|
||||
ContextHandlerCollection contexts = new ContextHandlerCollection();
|
||||
contexts.setHandlers(new Handler[]{restApi, notebook, webApp});
|
||||
jettyServer.setHandler(contexts);
|
||||
contexts.setHandlers(new Handler[]{restApiContext, notebookContext, webApp});
|
||||
|
||||
LOG.info("Start zeppelin server");
|
||||
jettyWebServer = setupJettyServer(conf);
|
||||
jettyWebServer.setHandler(contexts);
|
||||
|
||||
LOG.info("Starting zeppelin server");
|
||||
try {
|
||||
jettyServer.start();
|
||||
jettyWebServer.start(); //Instantiates ZeppelinServer
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error while running jettyServer", e);
|
||||
System.exit(-1);
|
||||
}
|
||||
LOG.info("Started zeppelin server");
|
||||
LOG.info("Done, zeppelin server started");
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(){
|
||||
@Override public void run() {
|
||||
LOG.info("Shutting down Zeppelin Server ... ");
|
||||
try {
|
||||
jettyServer.stop();
|
||||
ZeppelinServer.notebook.getInterpreterFactory().close();
|
||||
ZeppelinServer.notebook.close();
|
||||
jettyWebServer.stop();
|
||||
notebook.getInterpreterFactory().close();
|
||||
notebook.close();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error while stopping servlet container", e);
|
||||
}
|
||||
|
|
@ -127,18 +137,15 @@ public class ZeppelinServer extends Application {
|
|||
System.exit(0);
|
||||
}
|
||||
|
||||
jettyServer.join();
|
||||
jettyWebServer.join();
|
||||
ZeppelinServer.notebook.getInterpreterFactory().close();
|
||||
}
|
||||
|
||||
private static Server setupJettyServer(ZeppelinConfiguration conf)
|
||||
throws Exception {
|
||||
|
||||
private static Server setupJettyServer(ZeppelinConfiguration conf) {
|
||||
AbstractConnector connector;
|
||||
if (conf.useSsl()) {
|
||||
connector = new SslSelectChannelConnector(getSslContextFactory(conf));
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
connector = new SelectChannelConnector();
|
||||
}
|
||||
|
||||
|
|
@ -155,11 +162,9 @@ public class ZeppelinServer extends Application {
|
|||
return server;
|
||||
}
|
||||
|
||||
private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf)
|
||||
throws Exception {
|
||||
|
||||
notebookServer = new NotebookServer();
|
||||
final ServletHolder servletHolder = new ServletHolder(notebookServer);
|
||||
private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) {
|
||||
notebookWsServer = new NotebookServer();
|
||||
final ServletHolder servletHolder = new ServletHolder(notebookWsServer);
|
||||
servletHolder.setInitParameter("maxTextMessageSize", "1024000");
|
||||
|
||||
final ServletContextHandler cxfContext = new ServletContextHandler(
|
||||
|
|
@ -173,9 +178,8 @@ public class ZeppelinServer extends Application {
|
|||
return cxfContext;
|
||||
}
|
||||
|
||||
private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf)
|
||||
throws Exception {
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) {
|
||||
// Note that the API for the SslContextFactory is different for
|
||||
// Jetty version 9
|
||||
SslContextFactory sslContextFactory = new SslContextFactory();
|
||||
|
|
@ -196,6 +200,7 @@ public class ZeppelinServer extends Application {
|
|||
return sslContextFactory;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused") //TODO(bzz) why unused?
|
||||
private static SSLContext getSslContext(ZeppelinConfiguration conf)
|
||||
throws Exception {
|
||||
|
||||
|
|
@ -242,25 +247,10 @@ public class ZeppelinServer extends Application {
|
|||
webApp.setTempDirectory(warTempDirectory);
|
||||
}
|
||||
// Explicit bind to root
|
||||
webApp.addServlet(
|
||||
new ServletHolder(new DefaultServlet()),
|
||||
"/*"
|
||||
);
|
||||
webApp.addServlet(new ServletHolder(new DefaultServlet()), "/*");
|
||||
return webApp;
|
||||
}
|
||||
|
||||
public ZeppelinServer() throws Exception {
|
||||
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
|
||||
this.schedulerFactory = new SchedulerFactory();
|
||||
this.replFactory = new InterpreterFactory(conf, notebookServer);
|
||||
this.notebookIndex = new SearchService();
|
||||
this.notebookRepo = new NotebookRepoSync(conf);
|
||||
|
||||
notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory,
|
||||
notebookServer, notebookIndex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Class<?>> getClasses() {
|
||||
Set<Class<?>> classes = new HashSet<Class<?>>();
|
||||
|
|
@ -268,14 +258,14 @@ public class ZeppelinServer extends Application {
|
|||
}
|
||||
|
||||
@Override
|
||||
public java.util.Set<java.lang.Object> getSingletons() {
|
||||
Set<Object> singletons = new HashSet<Object>();
|
||||
public Set<Object> getSingletons() {
|
||||
Set<Object> singletons = new HashSet<>();
|
||||
|
||||
/** Rest-api root endpoint */
|
||||
ZeppelinRestApi root = new ZeppelinRestApi();
|
||||
singletons.add(root);
|
||||
|
||||
NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookServer, notebookIndex);
|
||||
NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookWsServer, notebookIndex);
|
||||
singletons.add(notebookApi);
|
||||
|
||||
InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory);
|
||||
|
|
|
|||
|
|
@ -20,6 +20,8 @@ import java.io.IOException;
|
|||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
|
|
@ -45,6 +47,7 @@ import org.eclipse.jetty.websocket.WebSocketServlet;
|
|||
import org.quartz.SchedulerException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.gson.Gson;
|
||||
|
||||
|
|
@ -57,7 +60,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
|
||||
Gson gson = new Gson();
|
||||
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
|
||||
final List<NotebookSocket> connectedSockets = new LinkedList<>();
|
||||
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
|
||||
|
||||
private Notebook notebook() {
|
||||
return ZeppelinServer.notebook;
|
||||
|
|
@ -84,9 +87,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
public void onOpen(NotebookSocket conn) {
|
||||
LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(),
|
||||
conn.getRequest().getRemotePort());
|
||||
synchronized (connectedSockets) {
|
||||
connectedSockets.add(conn);
|
||||
}
|
||||
connectedSockets.add(conn);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -146,8 +147,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
completion(conn, notebook, messagereceived);
|
||||
break;
|
||||
case PING:
|
||||
pong();
|
||||
break;
|
||||
break; //do nothing
|
||||
case ANGULAR_OBJECT_UPDATED:
|
||||
angularObjectUpdated(conn, notebook, messagereceived);
|
||||
break;
|
||||
|
|
@ -165,9 +165,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest()
|
||||
.getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason);
|
||||
removeConnectionFromAllNote(conn);
|
||||
synchronized (connectedSockets) {
|
||||
connectedSockets.remove(conn);
|
||||
}
|
||||
connectedSockets.remove(conn);
|
||||
}
|
||||
|
||||
protected Message deserializeMessage(String msg) {
|
||||
|
|
@ -284,13 +282,11 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
}
|
||||
|
||||
private void broadcastAll(Message m) {
|
||||
synchronized (connectedSockets) {
|
||||
for (NotebookSocket conn : connectedSockets) {
|
||||
try {
|
||||
conn.send(serializeMessage(m));
|
||||
} catch (IOException e) {
|
||||
LOG.error("socket error", e);
|
||||
}
|
||||
for (NotebookSocket conn : connectedSockets) {
|
||||
try {
|
||||
conn.send(serializeMessage(m));
|
||||
} catch (IOException e) {
|
||||
LOG.error("socket error", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -729,6 +725,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
public static class ParagraphJobListener implements JobListener {
|
||||
private NotebookServer notebookServer;
|
||||
private Note note;
|
||||
|
||||
public ParagraphJobListener(NotebookServer notebookServer, Note note) {
|
||||
this.notebookServer = notebookServer;
|
||||
this.note = note;
|
||||
|
|
@ -770,8 +767,6 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
public JobListener getParagraphJobListener(Note note) {
|
||||
return new ParagraphJobListener(this, note);
|
||||
}
|
||||
private void pong() {
|
||||
}
|
||||
|
||||
private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {
|
||||
List<InterpreterSetting> settings = note.getNoteReplLoader()
|
||||
|
|
|
|||
|
|
@ -29,8 +29,12 @@ import java.util.concurrent.Executors;
|
|||
|
||||
import org.apache.commons.httpclient.HttpClient;
|
||||
import org.apache.commons.httpclient.HttpMethodBase;
|
||||
import org.apache.commons.httpclient.methods.*;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
|
||||
import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
|
||||
import org.apache.commons.httpclient.methods.DeleteMethod;
|
||||
import org.apache.commons.httpclient.methods.GetMethod;
|
||||
import org.apache.commons.httpclient.methods.PostMethod;
|
||||
import org.apache.commons.httpclient.methods.PutMethod;
|
||||
import org.apache.commons.httpclient.methods.RequestEntity;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
|
|
@ -207,7 +211,7 @@ public abstract class AbstractTestRestApi {
|
|||
}
|
||||
|
||||
LOG.info("Terminating test Zeppelin...");
|
||||
ZeppelinServer.jettyServer.stop();
|
||||
ZeppelinServer.jettyWebServer.stop();
|
||||
executor.shutdown();
|
||||
|
||||
long s = System.currentTimeMillis();
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
|
|||
AbstractTestRestApi.startUp();
|
||||
gson = new Gson();
|
||||
notebook = ZeppelinServer.notebook;
|
||||
notebookServer = ZeppelinServer.notebookServer;
|
||||
notebookServer = ZeppelinServer.notebookWsServer;
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
|||
|
|
@ -18,12 +18,12 @@
|
|||
package org.apache.zeppelin.conf;
|
||||
|
||||
import java.net.URL;
|
||||
import java.util.*;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.configuration.ConfigurationException;
|
||||
import org.apache.commons.configuration.XMLConfiguration;
|
||||
import org.apache.commons.configuration.tree.ConfigurationNode;
|
||||
import org.apache.zeppelin.notebook.repo.S3NotebookRepo;
|
||||
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory;
|
|||
/**
|
||||
* Zeppelin configuration.
|
||||
*
|
||||
* @author Leemoonsoo
|
||||
*
|
||||
*/
|
||||
public class ZeppelinConfiguration extends XMLConfiguration {
|
||||
private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml";
|
||||
|
|
|
|||
Loading…
Reference in a new issue