mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Application output
This commit is contained in:
parent
b239f1b96b
commit
9f5c493e41
30 changed files with 764 additions and 184 deletions
|
|
@ -16,16 +16,20 @@
|
|||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
|
||||
/**
|
||||
* ApplicationContext
|
||||
*/
|
||||
public class ApplicationContext {
|
||||
private final String noteId;
|
||||
private final String paragraphId;
|
||||
public final InterpreterOutput out;
|
||||
|
||||
public ApplicationContext(String noteId, String paragraphId) {
|
||||
public ApplicationContext(String noteId, String paragraphId, InterpreterOutput out) {
|
||||
this.noteId = noteId;
|
||||
this.paragraphId = paragraphId;
|
||||
this.out = out;
|
||||
}
|
||||
|
||||
public String getNoteId() {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
/**
|
||||
* Event from HeliumApplication running on remote interpreter process
|
||||
*/
|
||||
public interface ApplicationEventListener {
|
||||
public void onOutputAppend(String noteId, String paragraphId, String appId, String output);
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output);
|
||||
|
||||
}
|
||||
|
|
@ -22,6 +22,9 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.helium.Application;
|
||||
import org.apache.zeppelin.helium.ApplicationException;
|
||||
import org.apache.zeppelin.helium.HeliumPackage;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import org.apache.thrift.TException;
|
|||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
|
||||
|
|
@ -41,6 +42,7 @@ import com.google.gson.reflect.TypeToken;
|
|||
*/
|
||||
public class RemoteInterpreter extends Interpreter {
|
||||
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
|
||||
private final ApplicationEventListener applicationEventListener;
|
||||
Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
|
||||
Gson gson = new Gson();
|
||||
private String interpreterRunner;
|
||||
|
|
@ -56,14 +58,15 @@ public class RemoteInterpreter extends Interpreter {
|
|||
private static String schedulerName;
|
||||
|
||||
public RemoteInterpreter(Properties property,
|
||||
String noteId,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
String localRepoPath,
|
||||
int connectTimeout,
|
||||
int maxPoolSize,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
|
||||
String noteId,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
String localRepoPath,
|
||||
int connectTimeout,
|
||||
int maxPoolSize,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
ApplicationEventListener appListener) {
|
||||
super(property);
|
||||
this.noteId = noteId;
|
||||
this.className = className;
|
||||
|
|
@ -75,17 +78,19 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.connectTimeout = connectTimeout;
|
||||
this.maxPoolSize = maxPoolSize;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
this.applicationEventListener = appListener;
|
||||
}
|
||||
|
||||
public RemoteInterpreter(Properties property,
|
||||
String noteId,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
String localRepoPath,
|
||||
Map<String, String> env,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
|
||||
String noteId,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
String localRepoPath,
|
||||
Map<String, String> env,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
ApplicationEventListener appListener) {
|
||||
super(property);
|
||||
this.className = className;
|
||||
this.noteId = noteId;
|
||||
|
|
@ -96,6 +101,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.connectTimeout = connectTimeout;
|
||||
this.maxPoolSize = 10;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
this.applicationEventListener = appListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -114,7 +120,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
// create new remote process
|
||||
RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess(
|
||||
interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
|
||||
remoteInterpreterProcessListener);
|
||||
remoteInterpreterProcessListener, applicationEventListener);
|
||||
|
||||
intpGroup.setRemoteInterpreterProcess(remoteProcess);
|
||||
}
|
||||
|
|
@ -423,4 +429,12 @@ public class RemoteInterpreter extends Interpreter {
|
|||
client.angularRegistryPush(gson.toJson(registry, registryType));
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, String> getEnv() {
|
||||
return env;
|
||||
}
|
||||
|
||||
public void setEnv(Map<String, String> env) {
|
||||
this.env = env;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -242,4 +242,28 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
|
|||
}
|
||||
}
|
||||
|
||||
public void onAppOutputAppend(String noteId, String paragraphId, String appId, String output) {
|
||||
Map<String, String> appendOutput = new HashMap<String, String>();
|
||||
appendOutput.put("noteId", noteId);
|
||||
appendOutput.put("paragraphId", paragraphId);
|
||||
appendOutput.put("appId", appId);
|
||||
appendOutput.put("data", output);
|
||||
|
||||
sendEvent(new RemoteInterpreterEvent(
|
||||
RemoteInterpreterEventType.OUTPUT_APPEND,
|
||||
gson.toJson(appendOutput)));
|
||||
}
|
||||
|
||||
|
||||
public void onAppOutputUpdate(String noteId, String paragraphId, String appId, String output) {
|
||||
Map<String, String> appendOutput = new HashMap<String, String>();
|
||||
appendOutput.put("noteId", noteId);
|
||||
appendOutput.put("paragraphId", paragraphId);
|
||||
appendOutput.put("appId", appId);
|
||||
appendOutput.put("data", output);
|
||||
|
||||
sendEvent(new RemoteInterpreterEvent(
|
||||
RemoteInterpreterEventType.OUTPUT_UPDATE,
|
||||
gson.toJson(appendOutput)));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import com.google.gson.reflect.TypeToken;
|
|||
import org.apache.thrift.TException;
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
|
||||
|
|
@ -47,14 +48,18 @@ import java.util.Map;
|
|||
public class RemoteInterpreterEventPoller extends Thread {
|
||||
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
|
||||
private final RemoteInterpreterProcessListener listener;
|
||||
private final ApplicationEventListener appListener;
|
||||
|
||||
private volatile boolean shutdown;
|
||||
|
||||
private RemoteInterpreterProcess interpreterProcess;
|
||||
private InterpreterGroup interpreterGroup;
|
||||
|
||||
public RemoteInterpreterEventPoller(RemoteInterpreterProcessListener listener) {
|
||||
public RemoteInterpreterEventPoller(
|
||||
RemoteInterpreterProcessListener listener,
|
||||
ApplicationEventListener appListener) {
|
||||
this.listener = listener;
|
||||
this.appListener = appListener;
|
||||
shutdown = false;
|
||||
}
|
||||
|
||||
|
|
@ -141,8 +146,13 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
String noteId = outputAppend.get("noteId");
|
||||
String paragraphId = outputAppend.get("paragraphId");
|
||||
String outputToAppend = outputAppend.get("data");
|
||||
String appId = outputAppend.get("appId");
|
||||
|
||||
listener.onOutputAppend(noteId, paragraphId, outputToAppend);
|
||||
if (appId == null) {
|
||||
listener.onOutputAppend(noteId, paragraphId, outputToAppend);
|
||||
} else {
|
||||
appListener.onOutputAppend(noteId, paragraphId, appId, outputToAppend);
|
||||
}
|
||||
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
|
||||
// on output update
|
||||
Map<String, String> outputAppend = gson.fromJson(
|
||||
|
|
@ -150,8 +160,13 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
String noteId = outputAppend.get("noteId");
|
||||
String paragraphId = outputAppend.get("paragraphId");
|
||||
String outputToUpdate = outputAppend.get("data");
|
||||
String appId = outputAppend.get("appId");
|
||||
|
||||
listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
|
||||
if (appId == null) {
|
||||
listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
|
||||
} else {
|
||||
appListener.onOutputUpdated(noteId, paragraphId, appId, outputToUpdate);
|
||||
}
|
||||
}
|
||||
logger.debug("Event from remoteproceess {}", event.getType());
|
||||
} catch (Exception e) {
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import org.apache.commons.exec.*;
|
|||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.commons.pool2.impl.GenericObjectPool;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
|
|
@ -53,17 +54,19 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
private final InterpreterContextRunnerPool interpreterContextRunnerPool;
|
||||
private int connectTimeout;
|
||||
|
||||
public RemoteInterpreterProcess(String intpRunner,
|
||||
public RemoteInterpreterProcess(
|
||||
String intpRunner,
|
||||
String intpDir,
|
||||
String localRepoDir,
|
||||
Map<String, String> env,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener listener) {
|
||||
RemoteInterpreterProcessListener listener,
|
||||
ApplicationEventListener appListener) {
|
||||
this(intpRunner,
|
||||
intpDir,
|
||||
localRepoDir,
|
||||
env,
|
||||
new RemoteInterpreterEventPoller(listener),
|
||||
new RemoteInterpreterEventPoller(listener, appListener),
|
||||
connectTimeout);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -706,9 +706,28 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
}
|
||||
|
||||
private InterpreterOutput createAppOutput(final String noteId,
|
||||
final String paragraphId,
|
||||
final String appId) {
|
||||
return new InterpreterOutput(new InterpreterOutputListener() {
|
||||
@Override
|
||||
public void onAppend(InterpreterOutput out, byte[] line) {
|
||||
logger.info("Append output ----------------" + new String(line));
|
||||
eventClient.onAppOutputAppend(noteId, paragraphId, appId, new String(line));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdate(InterpreterOutput out, byte[] output) {
|
||||
eventClient.onAppOutputUpdate(noteId, paragraphId, appId, new String(output));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private ApplicationContext getApplicationContext(
|
||||
HeliumPackage packageInfo, String noteId, String paragraphId) {
|
||||
return new ApplicationContext(noteId, paragraphId);
|
||||
HeliumPackage packageInfo, String noteId, String paragraphId, String applicationInstanceId) {
|
||||
System.err.println("get app context ************");
|
||||
InterpreterOutput out = createAppOutput(noteId, paragraphId, applicationInstanceId);
|
||||
return new ApplicationContext(noteId, paragraphId, out);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -720,11 +739,12 @@ public class RemoteInterpreterServer
|
|||
return new RemoteApplicationResult(true, "");
|
||||
}
|
||||
HeliumPackage pkgInfo = gson.fromJson(packageInfo, HeliumPackage.class);
|
||||
ApplicationContext context = getApplicationContext(pkgInfo, noteId, paragraphId);
|
||||
ApplicationContext context = getApplicationContext(
|
||||
pkgInfo, noteId, paragraphId, applicationInstanceId);
|
||||
try {
|
||||
Application app = null;
|
||||
logger.info(
|
||||
"Loading application {}({}). artifact={}, className={} into note={}, paragraph={}",
|
||||
"Loading application {}({}), artifact={}, className={} into note={}, paragraph={}",
|
||||
pkgInfo.getName(),
|
||||
applicationInstanceId,
|
||||
pkgInfo.getArtifact(),
|
||||
|
|
|
|||
|
|
@ -19,6 +19,8 @@ package org.apache.zeppelin.helium;
|
|||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.zeppelin.dep.DependencyResolver;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
|
||||
import org.apache.zeppelin.resource.LocalResourcePool;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
|
@ -83,7 +85,18 @@ public class ApplicationLoaderTest {
|
|||
public ApplicationContext createContext(String noteId, String paragraphId) {
|
||||
ApplicationContext context1 = new ApplicationContext(
|
||||
noteId,
|
||||
paragraphId);
|
||||
paragraphId,
|
||||
new InterpreterOutput(new InterpreterOutputListener() {
|
||||
@Override
|
||||
public void onAppend(InterpreterOutput out, byte[] line) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdate(InterpreterOutput out, byte[] output) {
|
||||
|
||||
}
|
||||
}));
|
||||
return context1;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,6 +68,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
|
|||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -66,7 +66,8 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
|
|||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
this);
|
||||
this,
|
||||
null);
|
||||
|
||||
intpGroup.get("note").add(intp);
|
||||
intp.setInterpreterGroup(intpGroup);
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ public class RemoteInterpreterProcessTest {
|
|||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
|
||||
"../bin/interpreter.sh", "nonexists", "fakeRepo", new HashMap<String, String>(),
|
||||
10 * 1000, null);
|
||||
10 * 1000, null, null);
|
||||
assertFalse(rip.isRunning());
|
||||
assertEquals(0, rip.referenceCount());
|
||||
assertEquals(1, rip.reference(intpGroup));
|
||||
|
|
|
|||
|
|
@ -83,6 +83,7 @@ public class RemoteInterpreterTest {
|
|||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null);
|
||||
}
|
||||
|
||||
|
|
@ -100,6 +101,7 @@ public class RemoteInterpreterTest {
|
|||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null);
|
||||
}
|
||||
|
||||
|
|
@ -198,6 +200,7 @@ public class RemoteInterpreterTest {
|
|||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null);
|
||||
|
||||
|
||||
|
|
@ -213,6 +216,7 @@ public class RemoteInterpreterTest {
|
|||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null);
|
||||
|
||||
intpGroup.get("note").add(intpB);
|
||||
|
|
@ -677,7 +681,7 @@ public class RemoteInterpreterTest {
|
|||
//Given
|
||||
final Client client = Mockito.mock(Client.class);
|
||||
final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), "noteId",
|
||||
MockInterpreterA.class.getName(), "runner", "path","localRepo", env, 10 * 1000, null);
|
||||
MockInterpreterA.class.getName(), "runner", "path","localRepo", env, 10 * 1000, null, null);
|
||||
final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null);
|
||||
registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
|
||||
final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId");
|
||||
|
|
|
|||
|
|
@ -65,6 +65,7 @@ public class DistributedResourcePoolTest {
|
|||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
|
|
@ -82,6 +83,7 @@ public class DistributedResourcePoolTest {
|
|||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
|
|
@ -106,11 +108,11 @@ public class DistributedResourcePoolTest {
|
|||
intp1.open();
|
||||
intp2.open();
|
||||
|
||||
eventPoller1 = new RemoteInterpreterEventPoller(null);
|
||||
eventPoller1 = new RemoteInterpreterEventPoller(null, null);
|
||||
eventPoller1.setInterpreterGroup(intpGroup1);
|
||||
eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess());
|
||||
|
||||
eventPoller2 = new RemoteInterpreterEventPoller(null);
|
||||
eventPoller2 = new RemoteInterpreterEventPoller(null, null);
|
||||
eventPoller2.setInterpreterGroup(intpGroup2);
|
||||
eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess());
|
||||
|
||||
|
|
|
|||
|
|
@ -76,7 +76,8 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
|
|||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
this);
|
||||
this,
|
||||
null);
|
||||
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
intpGroup.get("note").add(intpA);
|
||||
|
|
@ -164,7 +165,8 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
|
|||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
this);
|
||||
this,
|
||||
null);
|
||||
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
intpGroup.get("note").add(intpA);
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
|||
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
||||
import org.apache.zeppelin.dep.DependencyResolver;
|
||||
import org.apache.zeppelin.helium.Helium;
|
||||
import org.apache.zeppelin.helium.HeliumApplicationFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.notebook.Notebook;
|
||||
import org.apache.zeppelin.notebook.NotebookAuthorization;
|
||||
|
|
@ -76,6 +77,7 @@ public class ZeppelinServer extends Application {
|
|||
private NotebookAuthorization notebookAuthorization;
|
||||
private DependencyResolver depResolver;
|
||||
private Helium helium;
|
||||
private HeliumApplicationFactory heliumApplicationFactory;
|
||||
|
||||
public ZeppelinServer() throws Exception {
|
||||
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
|
|
@ -84,15 +86,21 @@ public class ZeppelinServer extends Application {
|
|||
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
|
||||
|
||||
this.helium = new Helium(conf.getHeliumConfPath());
|
||||
this.heliumApplicationFactory = new HeliumApplicationFactory();
|
||||
this.schedulerFactory = new SchedulerFactory();
|
||||
this.replFactory = new InterpreterFactory(conf, notebookWsServer,
|
||||
notebookWsServer, depResolver);
|
||||
notebookWsServer, heliumApplicationFactory, depResolver);
|
||||
this.notebookRepo = new NotebookRepoSync(conf);
|
||||
this.notebookIndex = new LuceneSearch();
|
||||
this.notebookAuthorization = new NotebookAuthorization(conf);
|
||||
notebook = new Notebook(conf,
|
||||
notebookRepo, schedulerFactory, replFactory, notebookWsServer,
|
||||
notebookIndex, notebookAuthorization);
|
||||
|
||||
// to update notebook from application event from remote process.
|
||||
heliumApplicationFactory.setNotebook(notebook);
|
||||
// to update fire websocket event on application event.
|
||||
heliumApplicationFactory.setApplicationEventListener(notebookWsServer);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
|
|
|
|||
|
|
@ -109,10 +109,12 @@ public class Message {
|
|||
CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations
|
||||
// @param settings serialized Map<String, String> object
|
||||
|
||||
CHECKPOINT_NOTEBOOK // [c-s] checkpoint notebook to storage repository
|
||||
CHECKPOINT_NOTEBOOK, // [c-s] checkpoint notebook to storage repository
|
||||
// @param noteId
|
||||
// @param checkpointName
|
||||
|
||||
APP_APPEND_OUTPUT, // [s-c] append output
|
||||
APP_UPDATE_OUTPUT // [s-c] update (replace) output
|
||||
}
|
||||
|
||||
public OP op;
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
|||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
|
|
@ -60,7 +61,7 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public class NotebookServer extends WebSocketServlet implements
|
||||
NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener,
|
||||
RemoteInterpreterProcessListener {
|
||||
RemoteInterpreterProcessListener, ApplicationEventListener {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
|
||||
Gson gson = new Gson();
|
||||
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
|
||||
|
|
@ -965,7 +966,6 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
.put("noteId", noteId)
|
||||
.put("paragraphId", paragraphId)
|
||||
.put("data", output);
|
||||
Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId);
|
||||
broadcast(noteId, msg);
|
||||
}
|
||||
|
||||
|
|
@ -981,7 +981,40 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
.put("noteId", noteId)
|
||||
.put("paragraphId", paragraphId)
|
||||
.put("data", output);
|
||||
Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId);
|
||||
broadcast(noteId, msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* When application append output
|
||||
* @param noteId
|
||||
* @param paragraphId
|
||||
* @param appId
|
||||
* @param output
|
||||
*/
|
||||
@Override
|
||||
public void onOutputAppend(String noteId, String paragraphId, String appId, String output) {
|
||||
Message msg = new Message(OP.APP_APPEND_OUTPUT)
|
||||
.put("noteId", noteId)
|
||||
.put("paragraphId", paragraphId)
|
||||
.put("appId", appId)
|
||||
.put("data", output);
|
||||
broadcast(noteId, msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* When application update output
|
||||
* @param noteId
|
||||
* @param paragraphId
|
||||
* @param appId
|
||||
* @param output
|
||||
*/
|
||||
@Override
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output) {
|
||||
Message msg = new Message(OP.APP_UPDATE_OUTPUT)
|
||||
.put("noteId", noteId)
|
||||
.put("paragraphId", paragraphId)
|
||||
.put("appId", appId)
|
||||
.put("data", output);
|
||||
broadcast(noteId, msg);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,74 +19,60 @@ package org.apache.zeppelin.helium;
|
|||
import com.google.gson.Gson;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.notebook.ApplicationState;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.Notebook;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* HeliumApplicationFactory
|
||||
*
|
||||
* 1. sync api -> async api
|
||||
* 2. unload app when paragraph / note / interpreter remove
|
||||
* 3. front-end job
|
||||
* 4. example app
|
||||
* 5. dev mode
|
||||
* 6. app launcher
|
||||
*/
|
||||
public class HeliumApplicationFactory {
|
||||
public class HeliumApplicationFactory implements ApplicationEventListener {
|
||||
Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class);
|
||||
|
||||
private final Gson gson;
|
||||
Map<String, RunningApplicationInfo> apps =
|
||||
Collections.synchronizedMap(new HashMap<String, RunningApplicationInfo>());
|
||||
|
||||
private Notebook notebook;
|
||||
private ApplicationEventListener applicationEventListener;
|
||||
|
||||
public HeliumApplicationFactory() {
|
||||
gson = new Gson();
|
||||
}
|
||||
|
||||
private static class RunningApplicationInfo {
|
||||
HeliumPackage pkg;
|
||||
Paragraph paragraph;
|
||||
RemoteInterpreterProcess process;
|
||||
|
||||
public RunningApplicationInfo(HeliumPackage pkg,
|
||||
RemoteInterpreterProcess process,
|
||||
Paragraph paragraph) {
|
||||
this.pkg = pkg;
|
||||
this.paragraph = paragraph;
|
||||
this.process = process;
|
||||
}
|
||||
|
||||
public HeliumPackage getPkg() {
|
||||
return pkg;
|
||||
}
|
||||
|
||||
public Paragraph getParagraph() {
|
||||
return paragraph;
|
||||
}
|
||||
|
||||
public RemoteInterpreterProcess getProcess() {
|
||||
return process;
|
||||
}
|
||||
private static String generateApplicationId(HeliumPackage pkg, Paragraph paragraph) {
|
||||
return "app_" + paragraph.getNote().getId() + "-" + paragraph.getId() + pkg.getName();
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static String generateApplicationId() {
|
||||
return "app_" + System.currentTimeMillis() + "_"
|
||||
+ new Random(System.currentTimeMillis()).nextInt();
|
||||
private boolean isRemote(InterpreterGroup group) {
|
||||
return group.getAngularObjectRegistry() instanceof RemoteAngularObjectRegistry;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Load pkg
|
||||
*/
|
||||
public void load(HeliumPackage pkg, Paragraph paragraph) throws ApplicationException {
|
||||
public String load(HeliumPackage pkg, Paragraph paragraph) throws ApplicationException {
|
||||
Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName());
|
||||
RemoteInterpreterProcess intpProcess = intp.getInterpreterGroup().getRemoteInterpreterProcess();
|
||||
InterpreterGroup intpGroup = intp.getInterpreterGroup();
|
||||
RemoteInterpreterProcess intpProcess = intpGroup.getRemoteInterpreterProcess();
|
||||
if (intpProcess == null) {
|
||||
throw new ApplicationException("Target interpreter process is not running");
|
||||
}
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
|
|
@ -95,134 +81,247 @@ public class HeliumApplicationFactory {
|
|||
throw new ApplicationException(e);
|
||||
}
|
||||
|
||||
String appId = generateApplicationId();
|
||||
String appId = generateApplicationId(pkg, paragraph);
|
||||
String pkgInfo = gson.toJson(pkg);
|
||||
try {
|
||||
RemoteApplicationResult ret = client.loadApplication(
|
||||
appId,
|
||||
pkgInfo,
|
||||
paragraph.getNote().getId(),
|
||||
paragraph.getId());
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
RunningApplicationInfo appInfo = new RunningApplicationInfo(pkg, intpProcess, paragraph);
|
||||
apps.put(appId, appInfo);
|
||||
} else {
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
ApplicationState appState = null;
|
||||
synchronized (paragraph.apps) {
|
||||
for (ApplicationState as : paragraph.apps) {
|
||||
if (as.getName().equals(pkg.getName())) {
|
||||
appState = as;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (appState == null) {
|
||||
appState = new ApplicationState(appId, pkg.getName());
|
||||
paragraph.apps.add(appState);
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (appState) {
|
||||
if (appState.getStatus() == ApplicationState.ApplicationStatus.LOADED) {
|
||||
logger.info("Application {} already loaded on paragraph {}",
|
||||
pkg.getName(), paragraph.getId());
|
||||
return appId;
|
||||
}
|
||||
appState.setStatus(ApplicationState.ApplicationStatus.LOADING);
|
||||
try {
|
||||
RemoteApplicationResult ret = client.loadApplication(
|
||||
appId,
|
||||
pkgInfo,
|
||||
paragraph.getNote().getId(),
|
||||
paragraph.getId());
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
appState.setStatus(ApplicationState.ApplicationStatus.LOADED);
|
||||
} else {
|
||||
appState.setStatus(ApplicationState.ApplicationStatus.UNLOADED);
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
}
|
||||
return appId;
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
appState.setStatus(ApplicationState.ApplicationStatus.UNLOADED);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Unload pkg
|
||||
*/
|
||||
public void unload(String appId) throws ApplicationException {
|
||||
RunningApplicationInfo appInfo = apps.get(appId);
|
||||
if (appInfo == null) {
|
||||
public void unload(Paragraph paragraph, String appName) throws ApplicationException {
|
||||
|
||||
ApplicationState appsToUnload = null;
|
||||
|
||||
synchronized (paragraph.apps) {
|
||||
for (ApplicationState as : paragraph.apps) {
|
||||
if (as.getName().equals(appName)) {
|
||||
appsToUnload = as;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (appsToUnload == null) {
|
||||
logger.warn("Can not find {} to unload from {}", appName, paragraph.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
RemoteInterpreterProcess intpProcess = appInfo.getProcess();
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
RemoteApplicationResult ret = client.unloadApplication(appId);
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
apps.remove(appId);
|
||||
} else {
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
synchronized (appsToUnload) {
|
||||
if (appsToUnload.getStatus() != ApplicationState.ApplicationStatus.LOADED) {
|
||||
throw new ApplicationException(
|
||||
"Can't unload application status " + appsToUnload.getStatus());
|
||||
}
|
||||
appsToUnload.setStatus(ApplicationState.ApplicationStatus.UNLOADING);
|
||||
RemoteInterpreterProcess intpProcess =
|
||||
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
RemoteApplicationResult ret = client.unloadApplication(appsToUnload.getId());
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
appsToUnload.setStatus(ApplicationState.ApplicationStatus.UNLOADED);
|
||||
} else {
|
||||
appsToUnload.setStatus(ApplicationState.ApplicationStatus.LOADED);
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
appsToUnload.setStatus(ApplicationState.ApplicationStatus.LOADED);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* run pkg
|
||||
*/
|
||||
public void run(String appId) throws ApplicationException {
|
||||
RunningApplicationInfo appInfo = apps.get(appId);
|
||||
if (appInfo == null) {
|
||||
public void run(Paragraph paragraph, String appName) throws ApplicationException {
|
||||
ApplicationState app = null;
|
||||
synchronized (paragraph.apps) {
|
||||
for (ApplicationState as : paragraph.apps) {
|
||||
if (as.getName().equals(appName)) {
|
||||
app = as;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (app == null) {
|
||||
logger.warn("Can not find app {} from {}", appName, paragraph.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
RemoteInterpreterProcess intpProcess = appInfo.getProcess();
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
RemoteApplicationResult ret = client.runApplication(appId);
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
// success
|
||||
} else {
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
synchronized (app) {
|
||||
if (app.getStatus() != ApplicationState.ApplicationStatus.LOADED) {
|
||||
throw new ApplicationException(
|
||||
"Can't run application status " + app.getStatus());
|
||||
}
|
||||
RemoteInterpreterProcess intpProcess =
|
||||
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
RemoteApplicationResult ret = client.runApplication(app.getId());
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
// success
|
||||
} else {
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
public void unloadAllInTheInteprreterProcess(RemoteInterpreterProcess process) {
|
||||
for (String appId : apps.keySet()) {
|
||||
RunningApplicationInfo app = apps.get(appId);
|
||||
if (app.getProcess() == process) {
|
||||
try {
|
||||
unload(appId);
|
||||
} catch (ApplicationException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
public void unloadAllInTheInterpreterProcess(RemoteInterpreterProcess process) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
public void unloadAllInTheNote(Note note) {
|
||||
for (String appId : apps.keySet()) {
|
||||
RunningApplicationInfo app = apps.get(appId);
|
||||
if (app.getParagraph().getNote().getId().equals(note.getId())) {
|
||||
try {
|
||||
unload(appId);
|
||||
} catch (ApplicationException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
// TODO
|
||||
}
|
||||
|
||||
public void unloadAllInTheParagraph(Paragraph paragraph) {
|
||||
for (String appId : apps.keySet()) {
|
||||
RunningApplicationInfo app = apps.get(appId);
|
||||
if (app.getParagraph().getId().equals(paragraph.getId())) {
|
||||
try {
|
||||
unload(appId);
|
||||
} catch (ApplicationException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
// TODO
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onOutputAppend(String noteId, String paragraphId, String appId, String output) {
|
||||
ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId);
|
||||
|
||||
if (appToUpdate != null) {
|
||||
appToUpdate.appendOutput(output);
|
||||
} else {
|
||||
logger.error("Can't find app {}", appId);
|
||||
}
|
||||
|
||||
if (applicationEventListener != null) {
|
||||
applicationEventListener.onOutputAppend(noteId, paragraphId, appId, output);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output) {
|
||||
ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId);
|
||||
|
||||
if (appToUpdate != null) {
|
||||
appToUpdate.setOutput(output);
|
||||
} else {
|
||||
logger.error("Can't find app {}", appId);
|
||||
}
|
||||
|
||||
if (applicationEventListener != null) {
|
||||
applicationEventListener.onOutputUpdated(noteId, paragraphId, appId, output);
|
||||
}
|
||||
}
|
||||
|
||||
private ApplicationState getAppState(String noteId, String paragraphId, String appId) {
|
||||
if (notebook == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Note note = notebook.getNote(noteId);
|
||||
if (note == null) {
|
||||
logger.error("Can't get note {}", noteId);
|
||||
return null;
|
||||
}
|
||||
Paragraph paragraph = note.getParagraph(paragraphId);
|
||||
if (paragraph == null) {
|
||||
logger.error("Can't get paragraph {}", paragraphId);
|
||||
return null;
|
||||
}
|
||||
|
||||
ApplicationState appFound = null;
|
||||
synchronized (paragraph.apps) {
|
||||
for (ApplicationState app : paragraph.apps) {
|
||||
if (app.getId().equals(appId)) {
|
||||
appFound = app;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return appFound;
|
||||
}
|
||||
|
||||
public Notebook getNotebook() {
|
||||
return notebook;
|
||||
}
|
||||
|
||||
public void setNotebook(Notebook notebook) {
|
||||
this.notebook = notebook;
|
||||
}
|
||||
|
||||
public ApplicationEventListener getApplicationEventListener() {
|
||||
return applicationEventListener;
|
||||
}
|
||||
|
||||
public void setApplicationEventListener(ApplicationEventListener applicationEventListener) {
|
||||
this.applicationEventListener = applicationEventListener;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import org.apache.zeppelin.dep.Dependency;
|
|||
import org.apache.zeppelin.dep.DependencyResolver;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
|
|
@ -73,22 +74,27 @@ public class InterpreterFactory {
|
|||
|
||||
AngularObjectRegistryListener angularObjectRegistryListener;
|
||||
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
|
||||
private final ApplicationEventListener appEventListener;
|
||||
|
||||
private DependencyResolver depResolver;
|
||||
|
||||
private Map<String, String> env;
|
||||
|
||||
public InterpreterFactory(ZeppelinConfiguration conf,
|
||||
AngularObjectRegistryListener angularObjectRegistryListener,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
ApplicationEventListener appEventListener,
|
||||
DependencyResolver depResolver)
|
||||
throws InterpreterException, IOException, RepositoryException {
|
||||
this(conf, new InterpreterOption(true), angularObjectRegistryListener,
|
||||
remoteInterpreterProcessListener, depResolver);
|
||||
remoteInterpreterProcessListener, appEventListener, depResolver);
|
||||
}
|
||||
|
||||
|
||||
public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption,
|
||||
AngularObjectRegistryListener angularObjectRegistryListener,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
ApplicationEventListener appEventListener,
|
||||
DependencyResolver depResolver)
|
||||
throws InterpreterException, IOException, RepositoryException {
|
||||
this.conf = conf;
|
||||
|
|
@ -97,6 +103,7 @@ public class InterpreterFactory {
|
|||
this.depResolver = depResolver;
|
||||
this.interpreterRepositories = depResolver.getRepos();
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
this.appEventListener = appEventListener;
|
||||
String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
|
||||
interpreterClassList = replsConf.split(",");
|
||||
|
||||
|
|
@ -812,10 +819,12 @@ public class InterpreterFactory {
|
|||
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterId;
|
||||
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
|
||||
RemoteInterpreter remoteInterpreter = new RemoteInterpreter(
|
||||
property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
|
||||
interpreterPath, localRepoPath, connectTimeout,
|
||||
maxPoolSize, remoteInterpreterProcessListener));
|
||||
maxPoolSize, remoteInterpreterProcessListener, appEventListener);
|
||||
remoteInterpreter.setEnv(env);
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(remoteInterpreter);
|
||||
return intp;
|
||||
}
|
||||
|
||||
|
|
@ -853,4 +862,12 @@ public class InterpreterFactory {
|
|||
depResolver.delRepo(id);
|
||||
saveToFile();
|
||||
}
|
||||
|
||||
public Map<String, String> getEnv() {
|
||||
return env;
|
||||
}
|
||||
|
||||
public void setEnv(Map<String, String> env) {
|
||||
this.env = env;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,8 +16,86 @@
|
|||
*/
|
||||
package org.apache.zeppelin.notebook;
|
||||
|
||||
import org.apache.zeppelin.helium.Application;
|
||||
import org.apache.zeppelin.helium.HeliumPackage;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
|
||||
/**
|
||||
* Running ApplicationState
|
||||
*/
|
||||
public class ApplicationState {
|
||||
|
||||
/**
|
||||
* Status of Application
|
||||
*/
|
||||
public static enum ApplicationStatus {
|
||||
LOADING,
|
||||
LOADED,
|
||||
UNLOADING,
|
||||
UNLOADED
|
||||
};
|
||||
|
||||
|
||||
String id; // unique id for this instance similar to note id or paragraph id
|
||||
String name; // name of app
|
||||
ApplicationStatus status;
|
||||
String output;
|
||||
|
||||
public ApplicationState(String id, String name) {
|
||||
this.id = id;
|
||||
this.name = name;
|
||||
status = ApplicationStatus.UNLOADED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
String compareName;
|
||||
if (o instanceof ApplicationState) {
|
||||
compareName = ((ApplicationState) o).name;
|
||||
} else if (o instanceof String) {
|
||||
compareName = (String) o;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
return name.equals(compareName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return name.hashCode();
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setStatus(ApplicationStatus status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public ApplicationStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public String getOutput() {
|
||||
return output;
|
||||
}
|
||||
|
||||
public void setOutput(String output) {
|
||||
this.output = output;
|
||||
}
|
||||
|
||||
public synchronized void appendOutput(String output) {
|
||||
if (this.output == null) {
|
||||
this.output = output;
|
||||
} else {
|
||||
this.output += output;
|
||||
}
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
|||
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
Date dateUpdated;
|
||||
private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc
|
||||
public final GUI settings; // form and parameter settings
|
||||
private Map<String, ApplicationState> apps; // helium applications running
|
||||
public final List<ApplicationState> apps = new LinkedList<ApplicationState>();
|
||||
|
||||
@VisibleForTesting
|
||||
Paragraph() {
|
||||
|
|
@ -72,7 +72,6 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
dateUpdated = null;
|
||||
settings = new GUI();
|
||||
config = new HashMap<String, Object>();
|
||||
apps = new HashMap<String, ApplicationState>();
|
||||
}
|
||||
|
||||
private static String generateId() {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,168 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.dep.DependencyResolver;
|
||||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
|
||||
import org.apache.zeppelin.notebook.*;
|
||||
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.apache.zeppelin.search.SearchService;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class HeliumApplicationFactoryTest implements JobListenerFactory {
|
||||
private File tmpDir;
|
||||
private File notebookDir;
|
||||
private ZeppelinConfiguration conf;
|
||||
private SchedulerFactory schedulerFactory;
|
||||
private DependencyResolver depResolver;
|
||||
private InterpreterFactory factory;
|
||||
private VFSNotebookRepo notebookRepo;
|
||||
private Notebook notebook;
|
||||
private HeliumApplicationFactory heliumAppFactory;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
|
||||
tmpDir.mkdirs();
|
||||
File confDir = new File(tmpDir, "conf");
|
||||
confDir.mkdirs();
|
||||
notebookDir = new File(tmpDir + "/notebook");
|
||||
notebookDir.mkdirs();
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
|
||||
|
||||
conf = ZeppelinConfiguration.create();
|
||||
|
||||
this.schedulerFactory = new SchedulerFactory();
|
||||
|
||||
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
|
||||
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
|
||||
|
||||
heliumAppFactory = new HeliumApplicationFactory();
|
||||
|
||||
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
|
||||
factory = new InterpreterFactory(conf,
|
||||
new InterpreterOption(true), null, null, heliumAppFactory, depResolver);
|
||||
HashMap<String, String> env = new HashMap<String, String>();
|
||||
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
|
||||
factory.setEnv(env);
|
||||
|
||||
SearchService search = mock(SearchService.class);
|
||||
notebookRepo = new VFSNotebookRepo(conf);
|
||||
NotebookAuthorization notebookAuthorization = new NotebookAuthorization(conf);
|
||||
notebook = new Notebook(
|
||||
conf,
|
||||
notebookRepo,
|
||||
schedulerFactory,
|
||||
factory,
|
||||
this,
|
||||
search,
|
||||
notebookAuthorization);
|
||||
|
||||
heliumAppFactory.setNotebook(notebook);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
FileUtils.deleteDirectory(tmpDir);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testLoadRunUnloadApplication()
|
||||
throws IOException, ApplicationException, InterruptedException {
|
||||
// given
|
||||
HeliumPackage pkg1 = new HeliumPackage(HeliumPackage.Type.APPLICATION,
|
||||
"name1",
|
||||
"desc1",
|
||||
"",
|
||||
HeliumTestApplication.class.getName(),
|
||||
new String[][]{});
|
||||
|
||||
Note note1 = notebook.createNote();
|
||||
note1.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
|
||||
Paragraph p1 = note1.addParagraph();
|
||||
|
||||
// make sure interpreter process running
|
||||
p1.setText("job");
|
||||
note1.run(p1.getId());
|
||||
while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield();
|
||||
|
||||
assertEquals("repl1: job", p1.getResult().message());
|
||||
|
||||
// when
|
||||
String appId = heliumAppFactory.load(pkg1, p1);
|
||||
heliumAppFactory.run(p1, pkg1.getName());
|
||||
|
||||
// then
|
||||
Thread.sleep(1000);
|
||||
assertEquals("Hello world", p1.apps.get(0).getOutput());
|
||||
|
||||
// clean
|
||||
heliumAppFactory.unload(p1, pkg1.getName());
|
||||
notebook.removeNote(note1.getId());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ParagraphJobListener getParagraphJobListener(Note note) {
|
||||
return new ParagraphJobListener() {
|
||||
@Override
|
||||
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProgressUpdate(Job job, int progress) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeStatusChange(Job job, Job.Status before, Job.Status after) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterStatusChange(Job job, Job.Status before, Job.Status after) {
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import org.apache.zeppelin.resource.ResourceSet;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class HeliumTestApplication extends Application {
|
||||
public HeliumTestApplication(ResourceSet args, ApplicationContext context)
|
||||
throws ApplicationException {
|
||||
super(args, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws ApplicationException {
|
||||
try {
|
||||
context().out.write("Hello world");
|
||||
context().out.flush();
|
||||
} catch (IOException e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unload() throws ApplicationException {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -60,7 +60,7 @@ public class InterpreterFactoryTest {
|
|||
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
|
||||
conf = new ZeppelinConfiguration();
|
||||
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, depResolver);
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null, depResolver);
|
||||
context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null, null);
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ public class NoteInterpreterLoaderTest {
|
|||
MockInterpreter2.register("mock2", "group2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
|
||||
|
||||
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, depResolver);
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null, depResolver);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
|
||||
|
||||
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, depResolver);
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null, depResolver);
|
||||
|
||||
SearchService search = mock(SearchService.class);
|
||||
notebookRepo = new VFSNotebookRepo(conf);
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
|
|||
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
|
||||
|
||||
depResolver = new DependencyResolver(mainZepDir.getAbsolutePath() + "/local-repo");
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, depResolver);
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null, depResolver);
|
||||
|
||||
search = mock(SearchService.class);
|
||||
notebookRepoSync = new NotebookRepoSync(conf);
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory {
|
|||
|
||||
this.schedulerFactory = new SchedulerFactory();
|
||||
depResolver = new DependencyResolver(mainZepDir.getAbsolutePath() + "/local-repo");
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, depResolver);
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null, depResolver);
|
||||
|
||||
SearchService search = mock(SearchService.class);
|
||||
notebookRepo = new VFSNotebookRepo(conf);
|
||||
|
|
|
|||
Loading…
Reference in a new issue