Application output

This commit is contained in:
Lee moon soo 2016-04-01 11:41:57 -07:00
parent b239f1b96b
commit 9f5c493e41
30 changed files with 764 additions and 184 deletions

View file

@ -16,16 +16,20 @@
*/
package org.apache.zeppelin.helium;
import org.apache.zeppelin.interpreter.InterpreterOutput;
/**
* ApplicationContext
*/
public class ApplicationContext {
private final String noteId;
private final String paragraphId;
public final InterpreterOutput out;
public ApplicationContext(String noteId, String paragraphId) {
public ApplicationContext(String noteId, String paragraphId, InterpreterOutput out) {
this.noteId = noteId;
this.paragraphId = paragraphId;
this.out = out;
}
public String getNoteId() {

View file

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.helium;
/**
* Event from HeliumApplication running on remote interpreter process
*/
public interface ApplicationEventListener {
public void onOutputAppend(String noteId, String paragraphId, String appId, String output);
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output);
}

View file

@ -22,6 +22,9 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.helium.Application;
import org.apache.zeppelin.helium.ApplicationException;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.resource.ResourcePool;

View file

@ -23,6 +23,7 @@ import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
@ -41,6 +42,7 @@ import com.google.gson.reflect.TypeToken;
*/
public class RemoteInterpreter extends Interpreter {
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
private final ApplicationEventListener applicationEventListener;
Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
Gson gson = new Gson();
private String interpreterRunner;
@ -56,14 +58,15 @@ public class RemoteInterpreter extends Interpreter {
private static String schedulerName;
public RemoteInterpreter(Properties property,
String noteId,
String className,
String interpreterRunner,
String interpreterPath,
String localRepoPath,
int connectTimeout,
int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
String noteId,
String className,
String interpreterRunner,
String interpreterPath,
String localRepoPath,
int connectTimeout,
int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener) {
super(property);
this.noteId = noteId;
this.className = className;
@ -75,17 +78,19 @@ public class RemoteInterpreter extends Interpreter {
this.connectTimeout = connectTimeout;
this.maxPoolSize = maxPoolSize;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.applicationEventListener = appListener;
}
public RemoteInterpreter(Properties property,
String noteId,
String className,
String interpreterRunner,
String interpreterPath,
String localRepoPath,
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
String noteId,
String className,
String interpreterRunner,
String interpreterPath,
String localRepoPath,
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener) {
super(property);
this.className = className;
this.noteId = noteId;
@ -96,6 +101,7 @@ public class RemoteInterpreter extends Interpreter {
this.connectTimeout = connectTimeout;
this.maxPoolSize = 10;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.applicationEventListener = appListener;
}
@Override
@ -114,7 +120,7 @@ public class RemoteInterpreter extends Interpreter {
// create new remote process
RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess(
interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
remoteInterpreterProcessListener);
remoteInterpreterProcessListener, applicationEventListener);
intpGroup.setRemoteInterpreterProcess(remoteProcess);
}
@ -423,4 +429,12 @@ public class RemoteInterpreter extends Interpreter {
client.angularRegistryPush(gson.toJson(registry, registryType));
}
}
public Map<String, String> getEnv() {
return env;
}
public void setEnv(Map<String, String> env) {
this.env = env;
}
}

View file

@ -242,4 +242,28 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
}
}
public void onAppOutputAppend(String noteId, String paragraphId, String appId, String output) {
Map<String, String> appendOutput = new HashMap<String, String>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("appId", appId);
appendOutput.put("data", output);
sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.OUTPUT_APPEND,
gson.toJson(appendOutput)));
}
public void onAppOutputUpdate(String noteId, String paragraphId, String appId, String output) {
Map<String, String> appendOutput = new HashMap<String, String>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("appId", appId);
appendOutput.put("data", output);
sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.OUTPUT_UPDATE,
gson.toJson(appendOutput)));
}
}

View file

@ -22,6 +22,7 @@ import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
@ -47,14 +48,18 @@ import java.util.Map;
public class RemoteInterpreterEventPoller extends Thread {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
private final RemoteInterpreterProcessListener listener;
private final ApplicationEventListener appListener;
private volatile boolean shutdown;
private RemoteInterpreterProcess interpreterProcess;
private InterpreterGroup interpreterGroup;
public RemoteInterpreterEventPoller(RemoteInterpreterProcessListener listener) {
public RemoteInterpreterEventPoller(
RemoteInterpreterProcessListener listener,
ApplicationEventListener appListener) {
this.listener = listener;
this.appListener = appListener;
shutdown = false;
}
@ -141,8 +146,13 @@ public class RemoteInterpreterEventPoller extends Thread {
String noteId = outputAppend.get("noteId");
String paragraphId = outputAppend.get("paragraphId");
String outputToAppend = outputAppend.get("data");
String appId = outputAppend.get("appId");
listener.onOutputAppend(noteId, paragraphId, outputToAppend);
if (appId == null) {
listener.onOutputAppend(noteId, paragraphId, outputToAppend);
} else {
appListener.onOutputAppend(noteId, paragraphId, appId, outputToAppend);
}
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
// on output update
Map<String, String> outputAppend = gson.fromJson(
@ -150,8 +160,13 @@ public class RemoteInterpreterEventPoller extends Thread {
String noteId = outputAppend.get("noteId");
String paragraphId = outputAppend.get("paragraphId");
String outputToUpdate = outputAppend.get("data");
String appId = outputAppend.get("appId");
listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
if (appId == null) {
listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
} else {
appListener.onOutputUpdated(noteId, paragraphId, appId, outputToUpdate);
}
}
logger.debug("Event from remoteproceess {}", event.getType());
} catch (Exception e) {

View file

@ -22,6 +22,7 @@ import org.apache.commons.exec.*;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.TException;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
@ -53,17 +54,19 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
private final InterpreterContextRunnerPool interpreterContextRunnerPool;
private int connectTimeout;
public RemoteInterpreterProcess(String intpRunner,
public RemoteInterpreterProcess(
String intpRunner,
String intpDir,
String localRepoDir,
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener listener) {
RemoteInterpreterProcessListener listener,
ApplicationEventListener appListener) {
this(intpRunner,
intpDir,
localRepoDir,
env,
new RemoteInterpreterEventPoller(listener),
new RemoteInterpreterEventPoller(listener, appListener),
connectTimeout);
}

View file

@ -706,9 +706,28 @@ public class RemoteInterpreterServer
}
}
private InterpreterOutput createAppOutput(final String noteId,
final String paragraphId,
final String appId) {
return new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
logger.info("Append output ----------------" + new String(line));
eventClient.onAppOutputAppend(noteId, paragraphId, appId, new String(line));
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
eventClient.onAppOutputUpdate(noteId, paragraphId, appId, new String(output));
}
});
}
private ApplicationContext getApplicationContext(
HeliumPackage packageInfo, String noteId, String paragraphId) {
return new ApplicationContext(noteId, paragraphId);
HeliumPackage packageInfo, String noteId, String paragraphId, String applicationInstanceId) {
System.err.println("get app context ************");
InterpreterOutput out = createAppOutput(noteId, paragraphId, applicationInstanceId);
return new ApplicationContext(noteId, paragraphId, out);
}
@Override
@ -720,11 +739,12 @@ public class RemoteInterpreterServer
return new RemoteApplicationResult(true, "");
}
HeliumPackage pkgInfo = gson.fromJson(packageInfo, HeliumPackage.class);
ApplicationContext context = getApplicationContext(pkgInfo, noteId, paragraphId);
ApplicationContext context = getApplicationContext(
pkgInfo, noteId, paragraphId, applicationInstanceId);
try {
Application app = null;
logger.info(
"Loading application {}({}). artifact={}, className={} into note={}, paragraph={}",
"Loading application {}({}), artifact={}, className={} into note={}, paragraph={}",
pkgInfo.getName(),
applicationInstanceId,
pkgInfo.getArtifact(),

View file

@ -19,6 +19,8 @@ package org.apache.zeppelin.helium;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.junit.After;
import org.junit.Before;
@ -83,7 +85,18 @@ public class ApplicationLoaderTest {
public ApplicationContext createContext(String noteId, String paragraphId) {
ApplicationContext context1 = new ApplicationContext(
noteId,
paragraphId);
paragraphId,
new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
}
}));
return context1;
}
}

View file

@ -68,6 +68,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
"fakeRepo",
env,
10 * 1000,
null,
null
);

View file

@ -66,7 +66,8 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
"fakeRepo",
env,
10 * 1000,
this);
this,
null);
intpGroup.get("note").add(intp);
intp.setInterpreterGroup(intpGroup);

View file

@ -34,7 +34,7 @@ public class RemoteInterpreterProcessTest {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
"../bin/interpreter.sh", "nonexists", "fakeRepo", new HashMap<String, String>(),
10 * 1000, null);
10 * 1000, null, null);
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
assertEquals(1, rip.reference(intpGroup));

View file

@ -83,6 +83,7 @@ public class RemoteInterpreterTest {
"fakeRepo",
env,
10 * 1000,
null,
null);
}
@ -100,6 +101,7 @@ public class RemoteInterpreterTest {
"fakeRepo",
env,
10 * 1000,
null,
null);
}
@ -198,6 +200,7 @@ public class RemoteInterpreterTest {
"fakeRepo",
env,
10 * 1000,
null,
null);
@ -213,6 +216,7 @@ public class RemoteInterpreterTest {
"fakeRepo",
env,
10 * 1000,
null,
null);
intpGroup.get("note").add(intpB);
@ -677,7 +681,7 @@ public class RemoteInterpreterTest {
//Given
final Client client = Mockito.mock(Client.class);
final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), "noteId",
MockInterpreterA.class.getName(), "runner", "path","localRepo", env, 10 * 1000, null);
MockInterpreterA.class.getName(), "runner", "path","localRepo", env, 10 * 1000, null, null);
final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null);
registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId");

View file

@ -65,6 +65,7 @@ public class DistributedResourcePoolTest {
"fakeRepo",
env,
10 * 1000,
null,
null
);
@ -82,6 +83,7 @@ public class DistributedResourcePoolTest {
"fakeRepo",
env,
10 * 1000,
null,
null
);
@ -106,11 +108,11 @@ public class DistributedResourcePoolTest {
intp1.open();
intp2.open();
eventPoller1 = new RemoteInterpreterEventPoller(null);
eventPoller1 = new RemoteInterpreterEventPoller(null, null);
eventPoller1.setInterpreterGroup(intpGroup1);
eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess());
eventPoller2 = new RemoteInterpreterEventPoller(null);
eventPoller2 = new RemoteInterpreterEventPoller(null, null);
eventPoller2.setInterpreterGroup(intpGroup2);
eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess());

View file

@ -76,7 +76,8 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
"fakeRepo",
env,
10 * 1000,
this);
this,
null);
intpGroup.put("note", new LinkedList<Interpreter>());
intpGroup.get("note").add(intpA);
@ -164,7 +165,8 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
"fakeRepo",
env,
10 * 1000,
this);
this,
null);
intpGroup.put("note", new LinkedList<Interpreter>());
intpGroup.get("note").add(intpA);

View file

@ -32,6 +32,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.helium.Helium;
import org.apache.zeppelin.helium.HeliumApplicationFactory;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookAuthorization;
@ -76,6 +77,7 @@ public class ZeppelinServer extends Application {
private NotebookAuthorization notebookAuthorization;
private DependencyResolver depResolver;
private Helium helium;
private HeliumApplicationFactory heliumApplicationFactory;
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
@ -84,15 +86,21 @@ public class ZeppelinServer extends Application {
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
this.helium = new Helium(conf.getHeliumConfPath());
this.heliumApplicationFactory = new HeliumApplicationFactory();
this.schedulerFactory = new SchedulerFactory();
this.replFactory = new InterpreterFactory(conf, notebookWsServer,
notebookWsServer, depResolver);
notebookWsServer, heliumApplicationFactory, depResolver);
this.notebookRepo = new NotebookRepoSync(conf);
this.notebookIndex = new LuceneSearch();
this.notebookAuthorization = new NotebookAuthorization(conf);
notebook = new Notebook(conf,
notebookRepo, schedulerFactory, replFactory, notebookWsServer,
notebookIndex, notebookAuthorization);
// to update notebook from application event from remote process.
heliumApplicationFactory.setNotebook(notebook);
// to update fire websocket event on application event.
heliumApplicationFactory.setApplicationEventListener(notebookWsServer);
}
public static void main(String[] args) throws InterruptedException {

View file

@ -109,10 +109,12 @@ public class Message {
CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations
// @param settings serialized Map<String, String> object
CHECKPOINT_NOTEBOOK // [c-s] checkpoint notebook to storage repository
CHECKPOINT_NOTEBOOK, // [c-s] checkpoint notebook to storage repository
// @param noteId
// @param checkpointName
APP_APPEND_OUTPUT, // [s-c] append output
APP_UPDATE_OUTPUT // [s-c] update (replace) output
}
public OP op;

View file

@ -34,6 +34,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.user.AuthenticationInfo;
@ -60,7 +61,7 @@ import org.slf4j.LoggerFactory;
*/
public class NotebookServer extends WebSocketServlet implements
NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener,
RemoteInterpreterProcessListener {
RemoteInterpreterProcessListener, ApplicationEventListener {
private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
Gson gson = new Gson();
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
@ -965,7 +966,6 @@ public class NotebookServer extends WebSocketServlet implements
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("data", output);
Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId);
broadcast(noteId, msg);
}
@ -981,7 +981,40 @@ public class NotebookServer extends WebSocketServlet implements
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("data", output);
Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId);
broadcast(noteId, msg);
}
/**
* When application append output
* @param noteId
* @param paragraphId
* @param appId
* @param output
*/
@Override
public void onOutputAppend(String noteId, String paragraphId, String appId, String output) {
Message msg = new Message(OP.APP_APPEND_OUTPUT)
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("appId", appId)
.put("data", output);
broadcast(noteId, msg);
}
/**
* When application update output
* @param noteId
* @param paragraphId
* @param appId
* @param output
*/
@Override
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output) {
Message msg = new Message(OP.APP_UPDATE_OUTPUT)
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("appId", appId)
.put("data", output);
broadcast(noteId, msg);
}

View file

@ -19,74 +19,60 @@ package org.apache.zeppelin.helium;
import com.google.gson.Gson;
import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.notebook.ApplicationState;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.*;
/**
* HeliumApplicationFactory
*
* 1. sync api -> async api
* 2. unload app when paragraph / note / interpreter remove
* 3. front-end job
* 4. example app
* 5. dev mode
* 6. app launcher
*/
public class HeliumApplicationFactory {
public class HeliumApplicationFactory implements ApplicationEventListener {
Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class);
private final Gson gson;
Map<String, RunningApplicationInfo> apps =
Collections.synchronizedMap(new HashMap<String, RunningApplicationInfo>());
private Notebook notebook;
private ApplicationEventListener applicationEventListener;
public HeliumApplicationFactory() {
gson = new Gson();
}
private static class RunningApplicationInfo {
HeliumPackage pkg;
Paragraph paragraph;
RemoteInterpreterProcess process;
public RunningApplicationInfo(HeliumPackage pkg,
RemoteInterpreterProcess process,
Paragraph paragraph) {
this.pkg = pkg;
this.paragraph = paragraph;
this.process = process;
}
public HeliumPackage getPkg() {
return pkg;
}
public Paragraph getParagraph() {
return paragraph;
}
public RemoteInterpreterProcess getProcess() {
return process;
}
private static String generateApplicationId(HeliumPackage pkg, Paragraph paragraph) {
return "app_" + paragraph.getNote().getId() + "-" + paragraph.getId() + pkg.getName();
}
private static String generateApplicationId() {
return "app_" + System.currentTimeMillis() + "_"
+ new Random(System.currentTimeMillis()).nextInt();
private boolean isRemote(InterpreterGroup group) {
return group.getAngularObjectRegistry() instanceof RemoteAngularObjectRegistry;
}
/**
* Load pkg
*/
public void load(HeliumPackage pkg, Paragraph paragraph) throws ApplicationException {
public String load(HeliumPackage pkg, Paragraph paragraph) throws ApplicationException {
Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName());
RemoteInterpreterProcess intpProcess = intp.getInterpreterGroup().getRemoteInterpreterProcess();
InterpreterGroup intpGroup = intp.getInterpreterGroup();
RemoteInterpreterProcess intpProcess = intpGroup.getRemoteInterpreterProcess();
if (intpProcess == null) {
throw new ApplicationException("Target interpreter process is not running");
}
RemoteInterpreterService.Client client;
try {
@ -95,134 +81,247 @@ public class HeliumApplicationFactory {
throw new ApplicationException(e);
}
String appId = generateApplicationId();
String appId = generateApplicationId(pkg, paragraph);
String pkgInfo = gson.toJson(pkg);
try {
RemoteApplicationResult ret = client.loadApplication(
appId,
pkgInfo,
paragraph.getNote().getId(),
paragraph.getId());
if (ret.isSuccess()) {
RunningApplicationInfo appInfo = new RunningApplicationInfo(pkg, intpProcess, paragraph);
apps.put(appId, appInfo);
} else {
throw new ApplicationException(ret.getMsg());
ApplicationState appState = null;
synchronized (paragraph.apps) {
for (ApplicationState as : paragraph.apps) {
if (as.getName().equals(pkg.getName())) {
appState = as;
break;
}
}
if (appState == null) {
appState = new ApplicationState(appId, pkg.getName());
paragraph.apps.add(appState);
}
}
synchronized (appState) {
if (appState.getStatus() == ApplicationState.ApplicationStatus.LOADED) {
logger.info("Application {} already loaded on paragraph {}",
pkg.getName(), paragraph.getId());
return appId;
}
appState.setStatus(ApplicationState.ApplicationStatus.LOADING);
try {
RemoteApplicationResult ret = client.loadApplication(
appId,
pkgInfo,
paragraph.getNote().getId(),
paragraph.getId());
if (ret.isSuccess()) {
appState.setStatus(ApplicationState.ApplicationStatus.LOADED);
} else {
appState.setStatus(ApplicationState.ApplicationStatus.UNLOADED);
throw new ApplicationException(ret.getMsg());
}
return appId;
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
appState.setStatus(ApplicationState.ApplicationStatus.UNLOADED);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
}
/**
* Unload pkg
*/
public void unload(String appId) throws ApplicationException {
RunningApplicationInfo appInfo = apps.get(appId);
if (appInfo == null) {
public void unload(Paragraph paragraph, String appName) throws ApplicationException {
ApplicationState appsToUnload = null;
synchronized (paragraph.apps) {
for (ApplicationState as : paragraph.apps) {
if (as.getName().equals(appName)) {
appsToUnload = as;
break;
}
}
}
if (appsToUnload == null) {
logger.warn("Can not find {} to unload from {}", appName, paragraph.getId());
return;
}
RemoteInterpreterProcess intpProcess = appInfo.getProcess();
RemoteInterpreterService.Client client;
try {
client = intpProcess.getClient();
} catch (Exception e) {
throw new ApplicationException(e);
}
try {
RemoteApplicationResult ret = client.unloadApplication(appId);
if (ret.isSuccess()) {
apps.remove(appId);
} else {
throw new ApplicationException(ret.getMsg());
synchronized (appsToUnload) {
if (appsToUnload.getStatus() != ApplicationState.ApplicationStatus.LOADED) {
throw new ApplicationException(
"Can't unload application status " + appsToUnload.getStatus());
}
appsToUnload.setStatus(ApplicationState.ApplicationStatus.UNLOADING);
RemoteInterpreterProcess intpProcess =
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
RemoteInterpreterService.Client client;
try {
client = intpProcess.getClient();
} catch (Exception e) {
throw new ApplicationException(e);
}
try {
RemoteApplicationResult ret = client.unloadApplication(appsToUnload.getId());
if (ret.isSuccess()) {
appsToUnload.setStatus(ApplicationState.ApplicationStatus.UNLOADED);
} else {
appsToUnload.setStatus(ApplicationState.ApplicationStatus.LOADED);
throw new ApplicationException(ret.getMsg());
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
appsToUnload.setStatus(ApplicationState.ApplicationStatus.LOADED);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
}
/**
* run pkg
*/
public void run(String appId) throws ApplicationException {
RunningApplicationInfo appInfo = apps.get(appId);
if (appInfo == null) {
public void run(Paragraph paragraph, String appName) throws ApplicationException {
ApplicationState app = null;
synchronized (paragraph.apps) {
for (ApplicationState as : paragraph.apps) {
if (as.getName().equals(appName)) {
app = as;
break;
}
}
}
if (app == null) {
logger.warn("Can not find app {} from {}", appName, paragraph.getId());
return;
}
RemoteInterpreterProcess intpProcess = appInfo.getProcess();
RemoteInterpreterService.Client client;
try {
client = intpProcess.getClient();
} catch (Exception e) {
throw new ApplicationException(e);
}
try {
RemoteApplicationResult ret = client.runApplication(appId);
if (ret.isSuccess()) {
// success
} else {
throw new ApplicationException(ret.getMsg());
synchronized (app) {
if (app.getStatus() != ApplicationState.ApplicationStatus.LOADED) {
throw new ApplicationException(
"Can't run application status " + app.getStatus());
}
RemoteInterpreterProcess intpProcess =
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
RemoteInterpreterService.Client client;
try {
client = intpProcess.getClient();
} catch (Exception e) {
throw new ApplicationException(e);
}
try {
RemoteApplicationResult ret = client.runApplication(app.getId());
if (ret.isSuccess()) {
// success
} else {
throw new ApplicationException(ret.getMsg());
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
}
}
public void unloadAllInTheInteprreterProcess(RemoteInterpreterProcess process) {
for (String appId : apps.keySet()) {
RunningApplicationInfo app = apps.get(appId);
if (app.getProcess() == process) {
try {
unload(appId);
} catch (ApplicationException e) {
logger.error(e.getMessage(), e);
}
}
}
public void unloadAllInTheInterpreterProcess(RemoteInterpreterProcess process) {
// TODO
}
public void unloadAllInTheNote(Note note) {
for (String appId : apps.keySet()) {
RunningApplicationInfo app = apps.get(appId);
if (app.getParagraph().getNote().getId().equals(note.getId())) {
try {
unload(appId);
} catch (ApplicationException e) {
logger.error(e.getMessage(), e);
}
}
}
// TODO
}
public void unloadAllInTheParagraph(Paragraph paragraph) {
for (String appId : apps.keySet()) {
RunningApplicationInfo app = apps.get(appId);
if (app.getParagraph().getId().equals(paragraph.getId())) {
try {
unload(appId);
} catch (ApplicationException e) {
logger.error(e.getMessage(), e);
// TODO
}
@Override
public void onOutputAppend(String noteId, String paragraphId, String appId, String output) {
ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId);
if (appToUpdate != null) {
appToUpdate.appendOutput(output);
} else {
logger.error("Can't find app {}", appId);
}
if (applicationEventListener != null) {
applicationEventListener.onOutputAppend(noteId, paragraphId, appId, output);
}
}
@Override
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output) {
ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId);
if (appToUpdate != null) {
appToUpdate.setOutput(output);
} else {
logger.error("Can't find app {}", appId);
}
if (applicationEventListener != null) {
applicationEventListener.onOutputUpdated(noteId, paragraphId, appId, output);
}
}
private ApplicationState getAppState(String noteId, String paragraphId, String appId) {
if (notebook == null) {
return null;
}
Note note = notebook.getNote(noteId);
if (note == null) {
logger.error("Can't get note {}", noteId);
return null;
}
Paragraph paragraph = note.getParagraph(paragraphId);
if (paragraph == null) {
logger.error("Can't get paragraph {}", paragraphId);
return null;
}
ApplicationState appFound = null;
synchronized (paragraph.apps) {
for (ApplicationState app : paragraph.apps) {
if (app.getId().equals(appId)) {
appFound = app;
break;
}
}
}
return appFound;
}
public Notebook getNotebook() {
return notebook;
}
public void setNotebook(Notebook notebook) {
this.notebook = notebook;
}
public ApplicationEventListener getApplicationEventListener() {
return applicationEventListener;
}
public void setApplicationEventListener(ApplicationEventListener applicationEventListener) {
this.applicationEventListener = applicationEventListener;
}
}

View file

@ -29,6 +29,7 @@ import org.apache.zeppelin.dep.Dependency;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
@ -73,22 +74,27 @@ public class InterpreterFactory {
AngularObjectRegistryListener angularObjectRegistryListener;
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
private final ApplicationEventListener appEventListener;
private DependencyResolver depResolver;
private Map<String, String> env;
public InterpreterFactory(ZeppelinConfiguration conf,
AngularObjectRegistryListener angularObjectRegistryListener,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appEventListener,
DependencyResolver depResolver)
throws InterpreterException, IOException, RepositoryException {
this(conf, new InterpreterOption(true), angularObjectRegistryListener,
remoteInterpreterProcessListener, depResolver);
remoteInterpreterProcessListener, appEventListener, depResolver);
}
public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption,
AngularObjectRegistryListener angularObjectRegistryListener,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appEventListener,
DependencyResolver depResolver)
throws InterpreterException, IOException, RepositoryException {
this.conf = conf;
@ -97,6 +103,7 @@ public class InterpreterFactory {
this.depResolver = depResolver;
this.interpreterRepositories = depResolver.getRepos();
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.appEventListener = appEventListener;
String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
interpreterClassList = replsConf.split(",");
@ -812,10 +819,12 @@ public class InterpreterFactory {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterId;
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
RemoteInterpreter remoteInterpreter = new RemoteInterpreter(
property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
interpreterPath, localRepoPath, connectTimeout,
maxPoolSize, remoteInterpreterProcessListener));
maxPoolSize, remoteInterpreterProcessListener, appEventListener);
remoteInterpreter.setEnv(env);
LazyOpenInterpreter intp = new LazyOpenInterpreter(remoteInterpreter);
return intp;
}
@ -853,4 +862,12 @@ public class InterpreterFactory {
depResolver.delRepo(id);
saveToFile();
}
public Map<String, String> getEnv() {
return env;
}
public void setEnv(Map<String, String> env) {
this.env = env;
}
}

View file

@ -16,8 +16,86 @@
*/
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.helium.Application;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
/**
* Running ApplicationState
*/
public class ApplicationState {
/**
* Status of Application
*/
public static enum ApplicationStatus {
LOADING,
LOADED,
UNLOADING,
UNLOADED
};
String id; // unique id for this instance similar to note id or paragraph id
String name; // name of app
ApplicationStatus status;
String output;
public ApplicationState(String id, String name) {
this.id = id;
this.name = name;
status = ApplicationStatus.UNLOADED;
}
@Override
public boolean equals(Object o) {
String compareName;
if (o instanceof ApplicationState) {
compareName = ((ApplicationState) o).name;
} else if (o instanceof String) {
compareName = (String) o;
} else {
return false;
}
return name.equals(compareName);
}
@Override
public int hashCode() {
return name.hashCode();
}
public String getId() {
return id;
}
public void setStatus(ApplicationStatus status) {
this.status = status;
}
public ApplicationStatus getStatus() {
return status;
}
public String getOutput() {
return output;
}
public void setOutput(String output) {
this.output = output;
}
public synchronized void appendOutput(String output) {
if (this.output == null) {
this.output = output;
} else {
this.output += output;
}
}
public String getName() {
return name;
}
}

View file

@ -34,6 +34,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;

View file

@ -53,7 +53,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
Date dateUpdated;
private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc
public final GUI settings; // form and parameter settings
private Map<String, ApplicationState> apps; // helium applications running
public final List<ApplicationState> apps = new LinkedList<ApplicationState>();
@VisibleForTesting
Paragraph() {
@ -72,7 +72,6 @@ public class Paragraph extends Job implements Serializable, Cloneable {
dateUpdated = null;
settings = new GUI();
config = new HashMap<String, Object>();
apps = new HashMap<String, ApplicationState>();
}
private static String generateId() {

View file

@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.helium;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
public class HeliumApplicationFactoryTest implements JobListenerFactory {
private File tmpDir;
private File notebookDir;
private ZeppelinConfiguration conf;
private SchedulerFactory schedulerFactory;
private DependencyResolver depResolver;
private InterpreterFactory factory;
private VFSNotebookRepo notebookRepo;
private Notebook notebook;
private HeliumApplicationFactory heliumAppFactory;
@Before
public void setUp() throws Exception {
tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
tmpDir.mkdirs();
File confDir = new File(tmpDir, "conf");
confDir.mkdirs();
notebookDir = new File(tmpDir + "/notebook");
notebookDir.mkdirs();
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
conf = ZeppelinConfiguration.create();
this.schedulerFactory = new SchedulerFactory();
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
heliumAppFactory = new HeliumApplicationFactory();
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
factory = new InterpreterFactory(conf,
new InterpreterOption(true), null, null, heliumAppFactory, depResolver);
HashMap<String, String> env = new HashMap<String, String>();
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
factory.setEnv(env);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
NotebookAuthorization notebookAuthorization = new NotebookAuthorization(conf);
notebook = new Notebook(
conf,
notebookRepo,
schedulerFactory,
factory,
this,
search,
notebookAuthorization);
heliumAppFactory.setNotebook(notebook);
}
@After
public void tearDown() throws Exception {
FileUtils.deleteDirectory(tmpDir);
}
@Test
public void testLoadRunUnloadApplication()
throws IOException, ApplicationException, InterruptedException {
// given
HeliumPackage pkg1 = new HeliumPackage(HeliumPackage.Type.APPLICATION,
"name1",
"desc1",
"",
HeliumTestApplication.class.getName(),
new String[][]{});
Note note1 = notebook.createNote();
note1.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
Paragraph p1 = note1.addParagraph();
// make sure interpreter process running
p1.setText("job");
note1.run(p1.getId());
while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield();
assertEquals("repl1: job", p1.getResult().message());
// when
String appId = heliumAppFactory.load(pkg1, p1);
heliumAppFactory.run(p1, pkg1.getName());
// then
Thread.sleep(1000);
assertEquals("Hello world", p1.apps.get(0).getOutput());
// clean
heliumAppFactory.unload(p1, pkg1.getName());
notebook.removeNote(note1.getId());
}
@Override
public ParagraphJobListener getParagraphJobListener(Note note) {
return new ParagraphJobListener() {
@Override
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
}
@Override
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
}
@Override
public void onProgressUpdate(Job job, int progress) {
}
@Override
public void beforeStatusChange(Job job, Job.Status before, Job.Status after) {
}
@Override
public void afterStatusChange(Job job, Job.Status before, Job.Status after) {
}
};
}
}

View file

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.helium;
import org.apache.zeppelin.resource.ResourceSet;
import java.io.IOException;
public class HeliumTestApplication extends Application {
public HeliumTestApplication(ResourceSet args, ApplicationContext context)
throws ApplicationException {
super(args, context);
}
@Override
public void run() throws ApplicationException {
try {
context().out.write("Hello world");
context().out.flush();
} catch (IOException e) {
throw new ApplicationException(e);
}
}
@Override
public void unload() throws ApplicationException {
}
}

View file

@ -60,7 +60,7 @@ public class InterpreterFactoryTest {
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
conf = new ZeppelinConfiguration();
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, depResolver);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null, depResolver);
context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null, null);
}

View file

@ -61,7 +61,7 @@ public class NoteInterpreterLoaderTest {
MockInterpreter2.register("mock2", "group2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, depResolver);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null, depResolver);
}
@After

View file

@ -85,7 +85,7 @@ public class NotebookTest implements JobListenerFactory{
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, depResolver);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null, depResolver);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);

View file

@ -92,7 +92,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
depResolver = new DependencyResolver(mainZepDir.getAbsolutePath() + "/local-repo");
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, depResolver);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null, depResolver);
search = mock(SearchService.class);
notebookRepoSync = new NotebookRepoSync(conf);

View file

@ -76,7 +76,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory {
this.schedulerFactory = new SchedulerFactory();
depResolver = new DependencyResolver(mainZepDir.getAbsolutePath() + "/local-repo");
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, depResolver);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null, depResolver);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);