Merge branch 'master' into ZEPPELIN-732-up

This commit is contained in:
Lee moon soo 2016-07-02 10:47:03 -07:00
commit 0665380469
28 changed files with 576 additions and 579 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

20
docs/interpreter/shell.md Normal file
View file

@ -0,0 +1,20 @@
---
layout: page
title: "Shell Interpreter"
description: "Shell Interpreter"
group: manual
---
{% include JB/setup %}
## Shell interpreter for Apache Zeppelin
### Overview
Shell interpreter uses [Apache Commons Exec](https://commons.apache.org/proper/commons-exec) to execute external processes.
In Zeppelin notebook, you can use ` %sh ` in the beginning of a paragraph to invoke system shell and run commands.
Note: Currently each command runs as Zeppelin user.
### Example
The following example demonstrates the basic usage of Shell in a Zeppelin notebook.
<img src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/shell-example.png" width="70%" />

View file

@ -96,7 +96,7 @@ public class LivyHelper {
}.getType());
if (jsonMap.get("state").equals("idle")) {
break;
} else if (jsonMap.get("state").equals("error")) {
} else if (jsonMap.get("state").equals("error") || jsonMap.get("state").equals("dead")) {
json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" +
sessionId + "/log",
"GET", null,
@ -124,7 +124,7 @@ public class LivyHelper {
protected void initializeSpark(final InterpreterContext context,
final Map<String, Integer> userSessionMap) throws Exception {
interpret("val sqlContext= new org.apache.spark.sql.SQLContext(sc)\n" +
interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n" +
"import sqlContext.implicits._", context, userSessionMap);
}

View file

@ -80,7 +80,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
line.replaceAll("\"", "\\\\\"")
.replaceAll("\\n", " ")
+ "\").show(" +
property.get("livy.spark.sql.maxResult") + ")",
property.get("zeppelin.livy.spark.sql.maxResult") + ")",
interpreterContext, userSessionMap);
if (res.code() == InterpreterResult.Code.SUCCESS) {
@ -123,6 +123,10 @@ public class LivySparkSQLInterpreter extends Interpreter {
}
}
public boolean concurrentSQL() {
return Boolean.parseBoolean(getProperty("zeppelin.livy.concurrentSQL"));
}
@Override
public void cancel(InterpreterContext context) {
livyHelper.cancelHTTP(context.getParagraphId());
@ -140,8 +144,19 @@ public class LivySparkSQLInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
LivySparkInterpreter.class.getName() + this.hashCode());
if (concurrentSQL()) {
int maxConcurrency = 10;
return SchedulerFactory.singleton().createOrGetParallelScheduler(
LivySparkInterpreter.class.getName() + this.hashCode(), maxConcurrency);
} else {
Interpreter intp =
getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());
if (intp != null) {
return intp.getScheduler();
} else {
return null;
}
}
}
@Override

View file

@ -88,6 +88,11 @@
"propertyName": "zeppelin.livy.spark.sql.maxResult",
"defaultValue": "1000",
"description": "Max number of SparkSQL result to display."
},
"zeppelin.livy.concurrentSQL": {
"propertyName": "zeppelin.livy.concurrentSQL",
"defaultValue": "false",
"description": "Execute multiple SQL concurrently if set true."
}
}
},

View file

@ -19,21 +19,23 @@ package org.apache.zeppelin.shell;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.*;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.Executor;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
@ -43,14 +45,11 @@ import org.slf4j.LoggerFactory;
* Shell interpreter for Zeppelin.
*/
public class ShellInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(ShellInterpreter.class);
private static final String EXECUTOR_KEY = "executor";
public static final String SHELL_COMMAND_TIMEOUT = "shell.command.timeout.millisecs";
int commandTimeOut;
private static final boolean isWindows = System
.getProperty("os.name")
.startsWith("Windows");
final String shell = isWindows ? "cmd /c" : "bash -c";
private static final Logger LOGGER = LoggerFactory.getLogger(ShellInterpreter.class);
private static final String TIMEOUT_PROPERTY = "shell.command.timeout.millisecs";
private final boolean isWindows = System.getProperty("os.name").startsWith("Windows");
private final String shell = isWindows ? "cmd /c" : "bash -c";
private Map<String, DefaultExecutor> executors;
public ShellInterpreter(Properties property) {
super(property);
@ -58,9 +57,8 @@ public class ShellInterpreter extends Interpreter {
@Override
public void open() {
logger.info("Command timeout is set as:", SHELL_COMMAND_TIMEOUT);
commandTimeOut = Integer.valueOf(getProperty(SHELL_COMMAND_TIMEOUT));
LOGGER.info("Command timeout property: {}", TIMEOUT_PROPERTY);
executors = new HashMap<String, DefaultExecutor>();
}
@Override
@ -69,7 +67,10 @@ public class ShellInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
logger.debug("Run shell command '" + cmd + "'");
LOGGER.debug("Run shell command '" + cmd + "'");
OutputStream outStream = new ByteArrayOutputStream();
OutputStream errStream = new ByteArrayOutputStream();
CommandLine cmdLine = CommandLine.parse(shell);
// the Windows CMD shell doesn't handle multiline statements,
// they need to be delimited by '&&' instead
@ -78,62 +79,45 @@ public class ShellInterpreter extends Interpreter {
cmd = StringUtils.join(lines, " && ");
}
cmdLine.addArgument(cmd, false);
DefaultExecutor executor = new DefaultExecutor();
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
executor.setStreamHandler(new PumpStreamHandler(contextInterpreter.out, errorStream));
executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
Job runningJob = getRunningJob(contextInterpreter.getParagraphId());
Map<String, Object> info = runningJob.info();
info.put(EXECUTOR_KEY, executor);
try {
DefaultExecutor executor = new DefaultExecutor();
executor.setStreamHandler(new PumpStreamHandler(outStream, errStream));
executor.setWatchdog(new ExecuteWatchdog(Long.valueOf(getProperty(TIMEOUT_PROPERTY))));
executors.put(contextInterpreter.getParagraphId(), executor);
int exitVal = executor.execute(cmdLine);
logger.info("Paragraph " + contextInterpreter.getParagraphId()
+ "return with exit value: " + exitVal);
return new InterpreterResult(InterpreterResult.Code.SUCCESS, null);
LOGGER.info("Paragraph " + contextInterpreter.getParagraphId()
+ " return with exit value: " + exitVal);
return new InterpreterResult(Code.SUCCESS, outStream.toString());
} catch (ExecuteException e) {
int exitValue = e.getExitValue();
logger.error("Can not run " + cmd, e);
LOGGER.error("Can not run " + cmd, e);
Code code = Code.ERROR;
String msg = errorStream.toString();
String message = errStream.toString();
if (exitValue == 143) {
code = Code.INCOMPLETE;
msg = msg + "Paragraph received a SIGTERM.\n";
logger.info("The paragraph " + contextInterpreter.getParagraphId()
+ " stopped executing: " + msg);
message += "Paragraph received a SIGTERM.\n";
LOGGER.info("The paragraph " + contextInterpreter.getParagraphId()
+ " stopped executing: " + message);
}
msg += "ExitValue: " + exitValue;
return new InterpreterResult(code, msg);
message += "ExitValue: " + exitValue;
return new InterpreterResult(code, message);
} catch (IOException e) {
logger.error("Can not run " + cmd, e);
LOGGER.error("Can not run " + cmd, e);
return new InterpreterResult(Code.ERROR, e.getMessage());
}
}
private Job getRunningJob(String paragraphId) {
Job foundJob = null;
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
for (Job job : jobsRunning) {
if (job.getId().equals(paragraphId)) {
foundJob = job;
}
}
return foundJob;
}
@Override
public void cancel(InterpreterContext context) {
Job runningJob = getRunningJob(context.getParagraphId());
if (runningJob != null) {
Map<String, Object> info = runningJob.info();
Object object = info.get(EXECUTOR_KEY);
if (object != null) {
Executor executor = (Executor) object;
ExecuteWatchdog watchdog = executor.getWatchdog();
watchdog.destroyProcess();
for (String paragraphId : executors.keySet()) {
if (paragraphId.equals(context.getParagraphId())) {
DefaultExecutor executor = executors.get(paragraphId);
executor.getWatchdog().destroyProcess();
}
}
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;

View file

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.shell;
import static org.junit.Assert.assertEquals;
import java.util.Properties;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class ShellInterpreterTest {
private ShellInterpreter shell;
@Before
public void setUp() throws Exception {
Properties p = new Properties();
p.setProperty("shell.command.timeout.millisecs", "60000");
shell = new ShellInterpreter(p);
}
@After
public void tearDown() throws Exception {
}
@Test
public void test() {
shell.open();
InterpreterContext context = new InterpreterContext("", "1", "", "", null, null, null, null, null, null, null);
InterpreterResult result = new InterpreterResult(Code.ERROR);
if (System.getProperty("os.name").startsWith("Windows")) {
result = shell.interpret("dir", context);
} else {
result = shell.interpret("ls", context);
}
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}
}

View file

@ -309,7 +309,7 @@ public class NotebookServer extends WebSocketServlet implements
Notebook notebook = notebook();
List<Note> notes = notebook.getAllNotes();
for (Note note : notes) {
List<String> ids = note.getNoteReplLoader().getInterpreters();
List<String> ids = notebook.getInterpreterFactory().getInterpreters(note.getId());
for (String id : ids) {
if (id.equals(interpreterGroupId)) {
broadcast(note.id(), m);
@ -752,10 +752,16 @@ public class NotebookServer extends WebSocketServlet implements
// propagate change to (Remote) AngularObjectRegistry
Note note = notebook.getNote(noteId);
if (note != null) {
Collection<InterpreterGroup> interpreterGroups = InterpreterGroup.getAll();
for (InterpreterGroup interpreterGroup : interpreterGroups) {
if (interpreterGroupId.equals(interpreterGroup.getId())) {
AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
List<InterpreterSetting> settings = notebook.getInterpreterFactory()
.getInterpreterSettings(note.getId());
for (InterpreterSetting setting : settings) {
if (setting.getInterpreterGroup(note.id()) == null) {
continue;
}
if (interpreterGroupId.equals(setting.getInterpreterGroup(note.id()).getId())) {
AngularObjectRegistry angularObjectRegistry = setting
.getInterpreterGroup(note.id()).getAngularObjectRegistry();
// first trying to get local registry
ao = angularObjectRegistry.get(varName, noteId, paragraphId);
if (ao == null) {
@ -788,8 +794,8 @@ public class NotebookServer extends WebSocketServlet implements
if (global) { // broadcast change to all web session that uses related
// interpreter.
for (Note n : notebook.getAllNotes()) {
List<InterpreterSetting> settings = note.getNoteReplLoader()
.getInterpreterSettings();
List<InterpreterSetting> settings = notebook.getInterpreterFactory()
.getInterpreterSettings(note.getId());
for (InterpreterSetting setting : settings) {
if (setting.getInterpreterGroup(n.id()) == null) {
continue;
@ -1288,8 +1294,8 @@ public class NotebookServer extends WebSocketServlet implements
}
private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {
List<InterpreterSetting> settings = note.getNoteReplLoader()
.getInterpreterSettings();
List<InterpreterSetting> settings =
notebook().getInterpreterFactory().getInterpreterSettings(note.getId());
if (settings == null || settings.size() == 0) {
return;
}
@ -1328,8 +1334,8 @@ public class NotebookServer extends WebSocketServlet implements
continue;
}
List<InterpreterSetting> intpSettings = note.getNoteReplLoader()
.getInterpreterSettings();
List<InterpreterSetting> intpSettings = notebook.getInterpreterFactory()
.getInterpreterSettings(note.getId());
if (intpSettings.isEmpty()) {
continue;
}
@ -1353,10 +1359,15 @@ public class NotebookServer extends WebSocketServlet implements
continue;
}
broadcast(
note.id(),
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put(
"noteId", noteId).put("paragraphId", paragraphId));
List<String> ids = notebook.getInterpreterFactory().getInterpreters(note.getId());
for (String id : ids) {
if (id.equals(interpreterGroupId)) {
broadcast(
note.id(),
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put(
"noteId", noteId).put("paragraphId", paragraphId));
}
}
}
}
}

View file

@ -105,20 +105,12 @@ abstract public class AbstractZeppelinIT {
}
protected void createNewNote() {
List<WebElement> notebookLinks = driver.findElements(By
.xpath("//div[contains(@class, \"col-md-4\")]/div/ul/li"));
List<String> notebookTitles = new LinkedList<String>();
for (WebElement el : notebookLinks) {
notebookTitles.add(el.getText());
}
WebElement createNoteLink = driver.findElement(By.xpath("//div[contains(@class, \"col-md-4\")]/div/h5/a[contains(.,'Create new note')]"));
createNoteLink.click();
clickAndWait(By.xpath("//div[contains(@class, \"col-md-4\")]/div/h5/a[contains(.,'Create new" +
" note')]"));
WebDriverWait block = new WebDriverWait(driver, MAX_BROWSER_TIMEOUT_SEC);
WebElement modal = block.until(ExpectedConditions.visibilityOfElementLocated(By.id("noteNameModal")));
WebElement createNoteButton = modal.findElement(By.id("createNoteButton"));
createNoteButton.click();
block.until(ExpectedConditions.visibilityOfElementLocated(By.id("noteNameModal")));
clickAndWait(By.id("createNoteButton"));
try {
Thread.sleep(500); // wait for notebook list updated
@ -136,7 +128,7 @@ abstract public class AbstractZeppelinIT {
}
protected void clickAndWait(final By locator) {
driver.findElement(locator).click();
pollingWait(locator, MAX_IMPLICIT_WAIT).click();
ZeppelinITUtils.sleep(1000, true);
}

View file

@ -28,6 +28,7 @@ import org.junit.Test;
import org.junit.rules.ErrorCollector;
import org.openqa.selenium.By;
import org.openqa.selenium.Keys;
import org.openqa.selenium.TimeoutException;
import org.openqa.selenium.WebElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -210,8 +211,16 @@ public class ZeppelinIT extends AbstractZeppelinIT {
String artifact = "org.apache.commons:commons-csv:1.1";
depArtifact.sendKeys(artifact);
driver.findElement(By.xpath("//div[@id='spark']//form//button[1]")).click();
driver.findElement(By.xpath("//div[@class='modal-dialog'][contains(.,'Do you want to update this interpreter and restart with new settings?')]" +
"//div[@class='modal-footer']//button[contains(.,'OK')]")).click();
clickAndWait(By.xpath("//div[@class='modal-dialog'][contains(.,'Do you want to update this interpreter and restart with new settings?')]" +
"//div[@class='modal-footer']//button[contains(.,'OK')]"));
try {
clickAndWait(By.xpath("//div[@class='modal-dialog'][contains(.,'Do you want to " +
"update this interpreter and restart with new settings?')]//" +
"div[@class='bootstrap-dialog-close-button']/button"));
} catch (TimeoutException e) {
//Modal dialog got closed earlier than expected nothing to worry.
}
driver.navigate().back();
createNewNote();

View file

@ -159,7 +159,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
// restart interpreter
for (InterpreterSetting setting : note.getNoteReplLoader().getInterpreterSettings()) {
for (InterpreterSetting setting : ZeppelinServer.notebook.getInterpreterFactory().getInterpreterSettings(note.getId())) {
if (setting.getName().equals("md")) {
// Call Restart Interpreter REST API
PutMethod put = httpPut("/interpreter/setting/restart/" + setting.id(), "");

View file

@ -93,7 +93,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
// get reference to interpreterGroup
InterpreterGroup interpreterGroup = null;
List<InterpreterSetting> settings = note1.getNoteReplLoader().getInterpreterSettings();
List<InterpreterSetting> settings = notebook.getInterpreterFactory().getInterpreterSettings(note1.getId());
for (InterpreterSetting setting : settings) {
if (setting.getName().equals("md")) {
interpreterGroup = setting.getInterpreterGroup("sharedProcess");

View file

@ -29,7 +29,7 @@ limitations under the License.
</div>
</div>
<div class="box width-full home">
<div class="box width-full">
<div>
<div class="row configuration">
<div class="col-md-12">

View file

@ -28,7 +28,7 @@ limitations under the License.
</div>
</div>
<div class="box width-full home"
<div class="box width-full"
>
<div>
<div class="row interpreter">

View file

@ -102,7 +102,7 @@ angular.module('zeppelinWebApp').controller('InterpreterCtrl', function($scope,
};
$scope.updateInterpreterSetting = function(form, settingId) {
BootstrapDialog.confirm({
var thisConfirm = BootstrapDialog.confirm({
closable: true,
title: '',
message: 'Do you want to update this interpreter and restart with new settings?',
@ -133,16 +133,23 @@ angular.module('zeppelinWebApp').controller('InterpreterCtrl', function($scope,
dependencies: angular.copy(setting.dependencies)
};
$http.put(baseUrlSrv.getRestApiBase() + '/interpreter/setting/' + settingId, request).
success(function (data, status, headers, config) {
thisConfirm.$modalFooter.find('button').addClass('disabled');
thisConfirm.$modalFooter.find('button:contains("OK")')
.html('<i class="fa fa-circle-o-notch fa-spin"></i> Saving Setting');
$http.put(baseUrlSrv.getRestApiBase() + '/interpreter/setting/' + settingId, request)
.success(function(data, status, headers, config) {
$scope.interpreterSettings[index] = data.body;
removeTMPSettings(index);
}).
error(function (data, status, headers, config) {
thisConfirm.close();
})
.error(function(data, status, headers, config) {
console.log('Error %o %o', status, data.message);
ngToast.danger({content: data.message, verticalPosition: 'bottom'});
form.$show();
thisConfirm.close();
});
return false;
}
}
});

View file

@ -86,7 +86,7 @@ limitations under the License.
<div ng-include src="'app/interpreter/interpreter-create/interpreter-create.html'"></div>
</div>
<div class="box width-full home"
<div class="box width-full"
ng-repeat="setting in interpreterSettings | orderBy: 'group' | filter: searchInterpreter">
<div id="{{setting.name | lowercase}}">
<div class="row interpreter">

View file

@ -37,7 +37,6 @@ import org.apache.zeppelin.interpreter.dev.ZeppelinDevServer;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.notebook.NoteInterpreterLoader;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.slf4j.Logger;
@ -63,27 +62,28 @@ import java.util.*;
* Manage interpreters.
*/
public class InterpreterFactory implements InterpreterGroupFactory {
Logger logger = LoggerFactory.getLogger(InterpreterFactory.class);
private static Logger logger = LoggerFactory.getLogger(InterpreterFactory.class);
private Map<String, URLClassLoader> cleanCl = Collections
.synchronizedMap(new HashMap<String, URLClassLoader>());
private static final String SHARED_SESSION = "shared_session";
private Map<String, URLClassLoader> cleanCl =
Collections.synchronizedMap(new HashMap<String, URLClassLoader>());
private ZeppelinConfiguration conf;
@Deprecated
String[] interpreterClassList;
String[] interpreterGroupOrderList;
private String[] interpreterClassList;
private String[] interpreterGroupOrderList;
private Map<String, InterpreterSetting> interpreterSettings =
new HashMap<String, InterpreterSetting>();
private Map<String, InterpreterSetting> interpreterSettings = new HashMap<>();
private Map<String, List<String>> interpreterBindings = new HashMap<String, List<String>>();
private Map<String, List<String>> interpreterBindings = new HashMap<>();
private List<RemoteRepository> interpreterRepositories;
private Gson gson;
private InterpreterOption defaultOption;
AngularObjectRegistryListener angularObjectRegistryListener;
private AngularObjectRegistryListener angularObjectRegistryListener;
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
private final ApplicationEventListener appEventListener;
@ -241,7 +241,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
private void registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
String interpreterJson)
String interpreterJson)
throws MalformedURLException {
URL[] urls = recursiveBuildLibList(new File(interpreterDir));
ClassLoader tempClassLoader = new URLClassLoader(urls, cl);
@ -257,7 +257,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
private void registerInterpreterFromPath(String interpreterDir,
String interpreterJson) throws IOException {
String interpreterJson) throws IOException {
Path interpreterJsonPath = Paths.get(interpreterDir, interpreterJson);
if (Files.exists(interpreterJsonPath)) {
@ -280,11 +280,11 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
private void registerInterpreters(List<RegisteredInterpreter> registeredInterpreters,
String absolutePath) {
String absolutePath) {
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
String className = registeredInterpreter.getClassName();
if (validateRegisterInterpreter(registeredInterpreter) &&
null == Interpreter.findRegisteredInterpreterByClassName(className)) {
null == Interpreter.findRegisteredInterpreterByClassName(className)) {
registeredInterpreter.setPath(absolutePath);
Interpreter.register(registeredInterpreter);
logger.debug("Registered. key: {}, className: {}, path: {}",
@ -296,7 +296,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
private boolean validateRegisterInterpreter(RegisteredInterpreter registeredInterpreter) {
return null != registeredInterpreter.getGroup() && null != registeredInterpreter.getName() &&
null != registeredInterpreter.getClassName();
null != registeredInterpreter.getClassName();
}
private void loadFromFile() throws IOException {
@ -334,14 +334,9 @@ public class InterpreterFactory implements InterpreterGroupFactory {
// previously created setting should turn this feature on here.
setting.getOption().setRemote(true);
InterpreterSetting intpSetting = new InterpreterSetting(
setting.id(),
setting.getName(),
setting.getGroup(),
setting.getInterpreterInfos(),
setting.getProperties(),
setting.getDependencies(),
setting.getOption());
InterpreterSetting intpSetting = new InterpreterSetting(setting.id(), setting.getName(),
setting.getGroup(), setting.getInterpreterInfos(), setting.getProperties(),
setting.getDependencies(), setting.getOption());
intpSetting.setInterpreterGroupFactory(this);
interpreterSettings.put(k, intpSetting);
@ -369,18 +364,14 @@ public class InterpreterFactory implements InterpreterGroupFactory {
// load dependencies
List<Dependency> deps = intSetting.getDependencies();
if (deps != null) {
for (Dependency d: deps) {
for (Dependency d : deps) {
File destDir = new File(conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO));
if (d.getExclusions() != null) {
depResolver.load(
d.getGroupArtifactVersion(),
d.getExclusions(),
depResolver.load(d.getGroupArtifactVersion(), d.getExclusions(),
new File(destDir, intSetting.id()));
} else {
depResolver.load(
d.getGroupArtifactVersion(),
new File(destDir, intSetting.id()));
depResolver.load(d.getGroupArtifactVersion(), new File(destDir, intSetting.id()));
}
}
}
@ -410,29 +401,19 @@ public class InterpreterFactory implements InterpreterGroupFactory {
fos.close();
}
private RegisteredInterpreter getRegisteredReplInfoFromClassName(String clsName) {
Set<String> keys = Interpreter.registeredInterpreters.keySet();
for (String intName : keys) {
RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName);
if (clsName.equals(info.getClassName())) {
return info;
}
}
return null;
}
/**
* Return ordered interpreter setting list.
* The list does not contain more than one setting from the same interpreter class.
* Order by InterpreterClass (order defined by ZEPPELIN_INTERPRETERS), Interpreter setting name
*
* @return
*/
public List<String> getDefaultInterpreterSettingList() {
// this list will contain default interpreter setting list
List<String> defaultSettings = new LinkedList<String>();
List<String> defaultSettings = new LinkedList<>();
// to ignore the same interpreter group
Map<String, Boolean> interpreterGroupCheck = new HashMap<String, Boolean>();
Map<String, Boolean> interpreterGroupCheck = new HashMap<>();
List<InterpreterSetting> sortedSettings = get();
@ -454,15 +435,14 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
/**
* @param name user defined name
* @param groupName interpreter group name to instantiate
* @param name user defined name
* @param groupName interpreter group name to instantiate
* @param properties
* @return
* @throws InterpreterException
* @throws IOException
*/
public InterpreterSetting add(String name, String groupName,
List<Dependency> dependencies,
public InterpreterSetting add(String name, String groupName, List<Dependency> dependencies,
InterpreterOption option, Properties properties)
throws InterpreterException, IOException, RepositoryException {
synchronized (interpreterSettings) {
@ -482,13 +462,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
InterpreterSetting intpSetting = new InterpreterSetting(
name,
groupName,
interpreterInfos,
properties,
dependencies,
option);
InterpreterSetting intpSetting = new InterpreterSetting(name, groupName, interpreterInfos,
properties, dependencies, option);
if (dependencies.size() > 0) {
loadInterpreterDependencies(intpSetting);
@ -513,15 +488,10 @@ public class InterpreterFactory implements InterpreterGroupFactory {
InterpreterGroup interpreterGroup = new InterpreterGroup(id);
if (option.isRemote()) {
angularObjectRegistry = new RemoteAngularObjectRegistry(
id,
angularObjectRegistryListener,
interpreterGroup
);
angularObjectRegistry = new RemoteAngularObjectRegistry(id, angularObjectRegistryListener,
interpreterGroup);
} else {
angularObjectRegistry = new AngularObjectRegistry(
id,
angularObjectRegistryListener);
angularObjectRegistry = new AngularObjectRegistry(id, angularObjectRegistryListener);
// TODO(moon) : create distributed resource pool for local interpreters and set
}
@ -530,8 +500,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
return interpreterGroup;
}
public void removeInterpretersForNote(InterpreterSetting interpreterSetting,
String noteId) {
public void removeInterpretersForNote(InterpreterSetting interpreterSetting, String noteId) {
if (interpreterSetting.getOption().isPerNoteProcess()) {
interpreterSetting.closeAndRemoveInterpreterGroup(noteId);
} else if (interpreterSetting.getOption().isPerNoteSession()) {
@ -549,9 +518,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
public void createInterpretersForNote(
InterpreterSetting interpreterSetting,
String noteId,
public void createInterpretersForNote(InterpreterSetting interpreterSetting, String noteId,
String key) {
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(noteId);
String groupName = interpreterSetting.getGroup();
@ -568,10 +535,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
// in ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT msec. However, if termination of the process and
// removal from interpreter group take too long, throw an error.
long minTimeout = 10L * 1000 * 1000000; // 10 sec
long interpreterRemovalWaitTimeout =
Math.max(
minTimeout,
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2);
long interpreterRemovalWaitTimeout = Math.max(minTimeout,
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2);
while (interpreterGroup.containsKey(key)) {
if (System.nanoTime() - interpreterRemovalWaitStart > interpreterRemovalWaitTimeout) {
throw new InterpreterException("Can not create interpreter");
@ -606,15 +571,13 @@ public class InterpreterFactory implements InterpreterGroupFactory {
interpreterSetting.id());
}
} else {
intp = createRepl(info.getPath(),
info.getClassName(),
properties);
intp = createRepl(info.getPath(), info.getClassName(), properties);
}
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(key);
if (interpreters == null) {
interpreters = new LinkedList<Interpreter>();
interpreters = new LinkedList<>();
interpreterGroup.put(key, interpreters);
}
if (info.isDefaultInterpreter()) {
@ -630,7 +593,6 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
public void remove(String id) throws IOException {
synchronized (interpreterSettings) {
if (interpreterSettings.containsKey(id)) {
@ -657,11 +619,12 @@ public class InterpreterFactory implements InterpreterGroupFactory {
/**
* Get interpreter settings
*
* @return
*/
public List<InterpreterSetting> get() {
synchronized (interpreterSettings) {
List<InterpreterSetting> orderedSettings = new LinkedList<InterpreterSetting>();
List<InterpreterSetting> orderedSettings = new LinkedList<>();
Map<String, List<InterpreterSetting>> groupNameInterpreterSettingMap = new HashMap<>();
for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
@ -710,9 +673,9 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
public void putNoteInterpreterSettingBinding(String noteId,
List<String> settingList) throws IOException {
List<String> unBindedSettings = new LinkedList<String>();
private void putNoteInterpreterSettingBinding(String noteId, List<String> settingList)
throws IOException {
List<String> unBindedSettings = new LinkedList<>();
synchronized (interpreterSettings) {
List<String> oldSettings = interpreterBindings.get(noteId);
@ -743,8 +706,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
public List<String> getNoteInterpreterSettingBinding(String noteId) {
LinkedList<String> bindings = new LinkedList<String>();
private List<String> getNoteInterpreterSettingBinding(String noteId) {
LinkedList<String> bindings = new LinkedList<>();
synchronized (interpreterSettings) {
List<String> settingIds = interpreterBindings.get(noteId);
if (settingIds != null) {
@ -756,14 +719,13 @@ public class InterpreterFactory implements InterpreterGroupFactory {
/**
* Change interpreter property and restart
*
* @param id
* @param option
* @param properties
* @throws IOException
*/
public void setPropertyAndRestart(String id,
InterpreterOption option,
Properties properties,
public void setPropertyAndRestart(String id, InterpreterOption option, Properties properties,
List<Dependency> dependencies) throws IOException, RepositoryException {
synchronized (interpreterSettings) {
InterpreterSetting intpsetting = interpreterSettings.get(id);
@ -824,7 +786,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
public void close() {
List<Thread> closeThreads = new LinkedList<Thread>();
List<Thread> closeThreads = new LinkedList<>();
synchronized (interpreterSettings) {
Collection<InterpreterSetting> intpsettings = interpreterSettings.values();
for (final InterpreterSetting intpsetting : intpsettings) {
@ -860,7 +822,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
URLClassLoader ccl = cleanCl.get(dirName);
if (ccl == null) {
// classloader fallback
ccl = URLClassLoader.newInstance(new URL[] {}, oldcl);
ccl = URLClassLoader.newInstance(new URL[]{}, oldcl);
}
boolean separateCL = true;
@ -870,13 +832,13 @@ public class InterpreterFactory implements InterpreterGroupFactory {
separateCL = false;
}
} catch (Exception e) {
logger.error("exception checking server classloader driver" , e);
logger.error("exception checking server classloader driver", e);
}
URLClassLoader cl;
if (separateCL == true) {
cl = URLClassLoader.newInstance(new URL[] {}, ccl);
cl = URLClassLoader.newInstance(new URL[]{}, ccl);
} else {
cl = ccl;
}
@ -884,7 +846,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className);
Constructor<Interpreter> constructor =
replClass.getConstructor(new Class[] {Properties.class});
replClass.getConstructor(new Class[]{Properties.class});
Interpreter repl = constructor.newInstance(property);
repl.setClassloaderUrls(ccl.getURLs());
LazyOpenInterpreter intp = new LazyOpenInterpreter(
@ -938,6 +900,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
updatePropertiesFromRegisteredInterpreter(property, className);
RemoteInterpreter remoteInterpreter = new RemoteInterpreter(
property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
interpreterPath, localRepoPath, connectTimeout,
@ -948,7 +911,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
private Properties updatePropertiesFromRegisteredInterpreter(Properties properties,
String className) {
String className) {
RegisteredInterpreter registeredInterpreter = Interpreter.findRegisteredInterpreterByClassName(
className);
if (null != registeredInterpreter) {
@ -963,10 +926,173 @@ public class InterpreterFactory implements InterpreterGroupFactory {
return properties;
}
/**
* map interpreter ids into noteId
*
* @param noteId note id
* @param ids InterpreterSetting id list
* @throws IOException
*/
public void setInterpreters(String noteId, List<String> ids) throws IOException {
putNoteInterpreterSettingBinding(noteId, ids);
}
public List<String> getInterpreters(String noteId) {
return getNoteInterpreterSettingBinding(noteId);
}
public List<InterpreterSetting> getInterpreterSettings(String noteId) {
List<String> interpreterSettingIds = getNoteInterpreterSettingBinding(noteId);
LinkedList<InterpreterSetting> settings = new LinkedList<>();
synchronized (interpreterSettingIds) {
for (String id : interpreterSettingIds) {
InterpreterSetting setting = get(id);
if (setting == null) {
// interpreter setting is removed from factory. remove id from here, too
interpreterSettingIds.remove(id);
} else {
settings.add(setting);
}
}
}
return settings;
}
public void closeNote(String noteId) {
// close interpreters in this note session
List<InterpreterSetting> settings = getInterpreterSettings(noteId);
if (settings == null || settings.size() == 0) {
return;
}
logger.info("closeNote: {}", noteId);
for (InterpreterSetting setting : settings) {
removeInterpretersForNote(setting, noteId);
}
}
private String getInterpreterInstanceKey(String noteId, InterpreterSetting setting) {
if (setting.getOption().isExistingProcess()) {
return Constants.EXISTING_PROCESS;
} else if (setting.getOption().isPerNoteSession() || setting.getOption().isPerNoteProcess()) {
return noteId;
} else {
return SHARED_SESSION;
}
}
private List<Interpreter> createOrGetInterpreterList(String noteId, InterpreterSetting setting) {
InterpreterGroup interpreterGroup = setting.getInterpreterGroup(noteId);
synchronized (interpreterGroup) {
String key = getInterpreterInstanceKey(noteId, setting);
if (!interpreterGroup.containsKey(key)) {
createInterpretersForNote(setting, noteId, key);
}
return interpreterGroup.get(getInterpreterInstanceKey(noteId, setting));
}
}
private InterpreterSetting getDefaultInterpreterSetting(List<InterpreterSetting> settings) {
if (settings == null || settings.isEmpty()) {
return null;
}
return settings.get(0);
}
public InterpreterSetting getDefaultInterpreterSetting(String noteId) {
return getDefaultInterpreterSetting(getInterpreterSettings(noteId));
}
public Interpreter getInterpreter(String noteId, String replName) {
List<InterpreterSetting> settings = getInterpreterSettings(noteId);
if (settings == null || settings.size() == 0) {
return null;
}
if (replName == null || replName.trim().length() == 0) {
// get default settings (first available)
// TODO(jl): Fix it in case of returning null
InterpreterSetting defaultSettings = getDefaultInterpreterSetting(settings);
return createOrGetInterpreterList(noteId, defaultSettings).get(0);
}
if (Interpreter.registeredInterpreters == null) {
return null;
}
String[] replNameSplit = replName.split("\\.");
String group = null;
String name = null;
if (replNameSplit.length == 2) {
group = replNameSplit[0];
name = replNameSplit[1];
Interpreter.RegisteredInterpreter registeredInterpreter = Interpreter.registeredInterpreters
.get(group + "." + name);
if (registeredInterpreter == null
|| registeredInterpreter.getClassName() == null) {
throw new InterpreterException(replName + " interpreter not found");
}
String interpreterClassName = registeredInterpreter.getClassName();
for (InterpreterSetting setting : settings) {
if (registeredInterpreter.getGroup().equals(setting.getGroup())) {
List<Interpreter> intpGroup = createOrGetInterpreterList(noteId, setting);
for (Interpreter interpreter : intpGroup) {
if (interpreterClassName.equals(interpreter.getClassName())) {
return interpreter;
}
}
}
}
throw new InterpreterException(replName + " interpreter not found");
} else {
// first assume replName is 'name' of interpreter. ('groupName' is ommitted)
// search 'name' from first (default) interpreter group
InterpreterSetting defaultSetting = getDefaultInterpreterSetting(settings);
Interpreter.RegisteredInterpreter registeredInterpreter =
Interpreter.registeredInterpreters.get(defaultSetting.getGroup() + "." + replName);
if (registeredInterpreter != null) {
List<Interpreter> interpreters = createOrGetInterpreterList(noteId, defaultSetting);
for (Interpreter interpreter : interpreters) {
RegisteredInterpreter intp =
Interpreter.findRegisteredInterpreterByClassName(interpreter.getClassName());
if (intp == null) {
continue;
}
if (intp.getName().equals(replName)) {
return interpreter;
}
}
throw new InterpreterException(
defaultSetting.getGroup() + "." + replName + " interpreter not found");
}
// next, assume replName is 'group' of interpreter ('name' is ommitted)
// search interpreter group and return first interpreter.
for (InterpreterSetting setting : settings) {
if (setting.getGroup().equals(replName)) {
List<Interpreter> interpreters = createOrGetInterpreterList(noteId, setting);
return interpreters.get(0);
}
}
}
// dev interpreter
if (DevInterpreter.isInterpreterName(replName)) {
return getDevInterpreter();
}
return null;
}
private URL[] recursiveBuildLibList(File path) throws MalformedURLException {
URL[] urls = new URL[0];
if (path == null || path.exists() == false) {
if (path == null || !path.exists()) {
return urls;
} else if (path.getName().startsWith(".")) {
return urls;
@ -979,7 +1105,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
return urls;
} else {
return new URL[] {path.toURI().toURL()};
return new URL[]{path.toURI().toURL()};
}
}

View file

@ -74,7 +74,7 @@ public class Note implements Serializable, ParagraphJobListener {
@SuppressWarnings("rawtypes")
Map<String, List<AngularObject>> angularObjects = new HashMap<>();
private transient NoteInterpreterLoader replLoader;
private transient InterpreterFactory factory;
private transient JobListenerFactory jobListenerFactory;
private transient NotebookRepo repo;
private transient SearchService index;
@ -99,11 +99,11 @@ public class Note implements Serializable, ParagraphJobListener {
public Note() {}
public Note(NotebookRepo repo, NoteInterpreterLoader replLoader,
public Note(NotebookRepo repo, InterpreterFactory factory,
JobListenerFactory jlFactory, SearchService noteIndex, Credentials credentials,
NoteEventListener noteEventListener) {
this.repo = repo;
this.replLoader = replLoader;
this.factory = factory;
this.jobListenerFactory = jlFactory;
this.index = noteIndex;
this.noteEventListener = noteEventListener;
@ -116,8 +116,8 @@ public class Note implements Serializable, ParagraphJobListener {
}
private String getDefaultInterpreterName() {
Optional<InterpreterSetting> settingOptional = replLoader.getDefaultInterpreterSetting();
return settingOptional.isPresent() ? settingOptional.get().getGroup() : StringUtils.EMPTY;
InterpreterSetting setting = factory.getDefaultInterpreterSetting(getId());
return null != setting ? setting.getGroup() : StringUtils.EMPTY;
}
void putDefaultReplName() {
@ -158,12 +158,8 @@ public class Note implements Serializable, ParagraphJobListener {
this.name = name;
}
public NoteInterpreterLoader getNoteReplLoader() {
return replLoader;
}
public void setReplLoader(NoteInterpreterLoader replLoader) {
this.replLoader = replLoader;
public void setInterpreterFactory(InterpreterFactory factory) {
this.factory = factory;
}
public JobListenerFactory getJobListenerFactory() {
@ -205,7 +201,7 @@ public class Note implements Serializable, ParagraphJobListener {
*/
public Paragraph addParagraph() {
Paragraph p = new Paragraph(this, this, replLoader);
Paragraph p = new Paragraph(this, this, factory);
addLastReplNameIfEmptyText(p);
synchronized (paragraphs) {
paragraphs.add(p);
@ -224,7 +220,7 @@ public class Note implements Serializable, ParagraphJobListener {
public void addCloneParagraph(Paragraph srcParagraph) {
// Keep paragraph original ID
final Paragraph newParagraph = new Paragraph(srcParagraph.getId(), this, this, replLoader);
final Paragraph newParagraph = new Paragraph(srcParagraph.getId(), this, this, factory);
Map<String, Object> config = new HashMap<>(srcParagraph.getConfig());
Map<String, Object> param = new HashMap<>(srcParagraph.settings.getParams());
@ -261,7 +257,7 @@ public class Note implements Serializable, ParagraphJobListener {
* @param index
*/
public Paragraph insertParagraph(int index) {
Paragraph p = new Paragraph(this, this, replLoader);
Paragraph p = new Paragraph(this, this, factory);
addLastReplNameIfEmptyText(p);
synchronized (paragraphs) {
paragraphs.add(index, p);
@ -450,9 +446,11 @@ public class Note implements Serializable, ParagraphJobListener {
AuthenticationInfo authenticationInfo = new AuthenticationInfo();
authenticationInfo.setUser(cronExecutingUser);
p.setAuthenticationInfo(authenticationInfo);
p.setNoteReplLoader(replLoader);
p.setListener(this);
Interpreter intp = replLoader.get(p.getRequiredReplName());
p.setInterpreterFactory(factory);
p.setListener(jobListenerFactory.getParagraphJobListener(this));
Interpreter intp = factory.getInterpreter(getId(), p.getRequiredReplName());
intp.getScheduler().submit(p);
}
}
@ -465,15 +463,14 @@ public class Note implements Serializable, ParagraphJobListener {
*/
public void run(String paragraphId) {
Paragraph p = getParagraph(paragraphId);
p.setNoteReplLoader(replLoader);
p.setListener(this);
p.setInterpreterFactory(factory);
p.setListener(jobListenerFactory.getParagraphJobListener(this));
String requiredReplName = p.getRequiredReplName();
Interpreter intp = replLoader.get(requiredReplName);
Interpreter intp = factory.getInterpreter(getId(), requiredReplName);
if (intp == null) {
// TODO(jongyoul): Make "%jdbc" configurable from JdbcInterpreter
if (conf.getUseJdbcAlias() && null != (intp = replLoader.get("jdbc"))) {
if (conf.getUseJdbcAlias() && null != (intp = factory.getInterpreter(getId(), "jdbc"))) {
String pText = p.getText().replaceFirst(requiredReplName, "jdbc(" + requiredReplName + ")");
logger.debug("New paragraph: {}", pText);
p.setEffectiveText(pText);
@ -504,7 +501,7 @@ public class Note implements Serializable, ParagraphJobListener {
public List<InterpreterCompletion> completion(String paragraphId, String buffer, int cursor) {
Paragraph p = getParagraph(paragraphId);
p.setNoteReplLoader(replLoader);
p.setInterpreterFactory(factory);
p.setListener(jobListenerFactory.getParagraphJobListener(this));
List completion = p.completion(buffer, cursor);
@ -520,7 +517,7 @@ public class Note implements Serializable, ParagraphJobListener {
private void snapshotAngularObjectRegistry() {
angularObjects = new HashMap<>();
List<InterpreterSetting> settings = replLoader.getInterpreterSettings();
List<InterpreterSetting> settings = factory.getInterpreterSettings(getId());
if (settings == null || settings.size() == 0) {
return;
}
@ -535,7 +532,7 @@ public class Note implements Serializable, ParagraphJobListener {
private void removeAllAngularObjectInParagraph(String paragraphId) {
angularObjects = new HashMap<String, List<AngularObject>>();
List<InterpreterSetting> settings = replLoader.getInterpreterSettings();
List<InterpreterSetting> settings = factory.getInterpreterSettings(getId());
if (settings == null || settings.size() == 0) {
return;
}

View file

@ -1,213 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook;
import com.google.common.base.Optional;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.zeppelin.interpreter.Constants;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.dev.DevInterpreter;
/**
* Interpreter loader per note.
*/
public class NoteInterpreterLoader {
private transient InterpreterFactory factory;
private static String SHARED_SESSION = "shared_session";
String noteId;
public NoteInterpreterLoader(InterpreterFactory factory) {
this.factory = factory;
}
public void setNoteId(String noteId) {
this.noteId = noteId;
}
/**
* set interpreter ids
* @param ids InterpreterSetting id list
* @throws IOException
*/
public void setInterpreters(List<String> ids) throws IOException {
factory.putNoteInterpreterSettingBinding(noteId, ids);
}
public List<String> getInterpreters() {
return factory.getNoteInterpreterSettingBinding(noteId);
}
public List<InterpreterSetting> getInterpreterSettings() {
List<String> interpreterSettingIds = factory.getNoteInterpreterSettingBinding(noteId);
LinkedList<InterpreterSetting> settings = new LinkedList<InterpreterSetting>();
synchronized (interpreterSettingIds) {
for (String id : interpreterSettingIds) {
InterpreterSetting setting = factory.get(id);
if (setting == null) {
// interpreter setting is removed from factory. remove id from here, too
interpreterSettingIds.remove(id);
} else {
settings.add(setting);
}
}
}
return settings;
}
private String getInterpreterInstanceKey(InterpreterSetting setting) {
if (setting.getOption().isExistingProcess()) {
return Constants.EXISTING_PROCESS;
} else if (setting.getOption().isPerNoteSession() || setting.getOption().isPerNoteProcess()) {
return noteId;
} else {
return SHARED_SESSION;
}
}
private List<Interpreter> createOrGetInterpreterList(InterpreterSetting setting) {
InterpreterGroup interpreterGroup =
setting.getInterpreterGroup(noteId);
synchronized (interpreterGroup) {
String key = getInterpreterInstanceKey(setting);
if (!interpreterGroup.containsKey(key)) {
factory.createInterpretersForNote(setting, noteId, key);
}
return interpreterGroup.get(getInterpreterInstanceKey(setting));
}
}
public void close() {
// close interpreters in this note session
List<InterpreterSetting> settings = this.getInterpreterSettings();
if (settings == null || settings.size() == 0) {
return;
}
System.err.println("close");
for (InterpreterSetting setting : settings) {
factory.removeInterpretersForNote(setting, noteId);
}
}
public Interpreter get(String replName) {
List<InterpreterSetting> settings = getInterpreterSettings();
if (settings == null || settings.size() == 0) {
return null;
}
if (replName == null || replName.trim().length() == 0) {
// get default settings (first available)
InterpreterSetting defaultSettings = getDefaultInterpreterSetting(settings).get();
return createOrGetInterpreterList(defaultSettings).get(0);
}
if (Interpreter.registeredInterpreters == null) {
return null;
}
String[] replNameSplit = replName.split("\\.");
String group = null;
String name = null;
if (replNameSplit.length == 2) {
group = replNameSplit[0];
name = replNameSplit[1];
Interpreter.RegisteredInterpreter registeredInterpreter = Interpreter.registeredInterpreters
.get(group + "." + name);
if (registeredInterpreter == null
|| registeredInterpreter.getClassName() == null) {
throw new InterpreterException(replName + " interpreter not found");
}
String interpreterClassName = registeredInterpreter.getClassName();
for (InterpreterSetting setting : settings) {
if (registeredInterpreter.getGroup().equals(setting.getGroup())) {
List<Interpreter> intpGroup = createOrGetInterpreterList(setting);
for (Interpreter interpreter : intpGroup) {
if (interpreterClassName.equals(interpreter.getClassName())) {
return interpreter;
}
}
}
}
throw new InterpreterException(replName + " interpreter not found");
} else {
// first assume replName is 'name' of interpreter. ('groupName' is ommitted)
// search 'name' from first (default) interpreter group
InterpreterSetting defaultSetting = getDefaultInterpreterSetting(settings).get();
Interpreter.RegisteredInterpreter registeredInterpreter =
Interpreter.registeredInterpreters.get(defaultSetting.getGroup() + "." + replName);
if (registeredInterpreter != null) {
List<Interpreter> interpreters = createOrGetInterpreterList(defaultSetting);
for (Interpreter interpreter : interpreters) {
RegisteredInterpreter intp =
Interpreter.findRegisteredInterpreterByClassName(interpreter.getClassName());
if (intp == null) {
continue;
}
if (intp.getName().equals(replName)) {
return interpreter;
}
}
throw new InterpreterException(
defaultSetting.getGroup() + "." + replName + " interpreter not found");
}
// next, assume replName is 'group' of interpreter ('name' is ommitted)
// search interpreter group and return first interpreter.
for (InterpreterSetting setting : settings) {
if (setting.getGroup().equals(replName)) {
List<Interpreter> interpreters = createOrGetInterpreterList(setting);
return interpreters.get(0);
}
}
}
// dev interpreter
if (DevInterpreter.isInterpreterName(replName)) {
return factory.getDevInterpreter();
}
return null;
}
private Optional<InterpreterSetting>
getDefaultInterpreterSetting(List<InterpreterSetting> settings) {
if (settings == null || settings.isEmpty()) {
return Optional.absent();
}
return Optional.of(settings.get(0));
}
Optional<InterpreterSetting> getDefaultInterpreterSetting() {
return getDefaultInterpreterSetting(getInterpreterSettings());
}
}

View file

@ -154,15 +154,13 @@ public class Notebook implements NoteEventListener {
*/
public Note createNote(List<String> interpreterIds, AuthenticationInfo subject)
throws IOException {
NoteInterpreterLoader intpLoader = new NoteInterpreterLoader(replFactory);
Note note = new Note(
notebookRepo,
intpLoader,
replFactory,
jobListenerFactory,
notebookIndex,
credentials,
this);
intpLoader.setNoteId(note.id());
synchronized (notes) {
notes.put(note.id(), note);
}
@ -265,17 +263,17 @@ public class Notebook implements NoteEventListener {
}
public void bindInterpretersToNote(String id,
List<String> newBindings) throws IOException {
List<String> interpreterSettingIds) throws IOException {
Note note = getNote(id);
if (note != null) {
List<InterpreterSetting> currentBindings = note.getNoteReplLoader().getInterpreterSettings();
List<InterpreterSetting> currentBindings = replFactory.getInterpreterSettings(id);
for (InterpreterSetting setting : currentBindings) {
if (!newBindings.contains(setting.id())) {
if (!interpreterSettingIds.contains(setting.id())) {
fireUnbindInterpreter(note, setting);
}
}
note.getNoteReplLoader().setInterpreters(newBindings);
replFactory.setInterpreters(note.getId(), interpreterSettingIds);
// comment out while note.getNoteReplLoader().setInterpreters(...) do the same
// replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
}
@ -284,7 +282,7 @@ public class Notebook implements NoteEventListener {
public List<String> getBindedInterpreterSettingsIds(String id) {
Note note = getNote(id);
if (note != null) {
return note.getNoteReplLoader().getInterpreters();
return getInterpreterFactory().getInterpreters(note.getId());
} else {
return new LinkedList<String>();
}
@ -293,7 +291,7 @@ public class Notebook implements NoteEventListener {
public List<InterpreterSetting> getBindedInterpreterSettings(String id) {
Note note = getNote(id);
if (note != null) {
return note.getNoteReplLoader().getInterpreterSettings();
return replFactory.getInterpreterSettings(note.getId());
} else {
return new LinkedList<InterpreterSetting>();
}
@ -384,9 +382,7 @@ public class Notebook implements NoteEventListener {
note.setIndex(this.notebookIndex);
note.setCredentials(this.credentials);
NoteInterpreterLoader replLoader = new NoteInterpreterLoader(replFactory);
note.setReplLoader(replLoader);
replLoader.setNoteId(note.id());
note.setInterpreterFactory(replFactory);
note.setJobListenerFactory(jobListenerFactory);
note.setNotebookRepo(notebookRepo);
@ -646,9 +642,9 @@ public class Notebook implements NoteEventListener {
// set interpreter bind type
String interpreterGroupName = null;
if (note.getNoteReplLoader().getInterpreterSettings() != null &&
note.getNoteReplLoader().getInterpreterSettings().size() >= 1) {
interpreterGroupName = note.getNoteReplLoader().getInterpreterSettings().get(0).getGroup();
if (replFactory.getInterpreterSettings(note.getId()) != null &&
replFactory.getInterpreterSettings(note.getId()).size() >= 1) {
interpreterGroupName = replFactory.getInterpreterSettings(note.getId()).get(0).getGroup();
}
// not update and not running -> pass
@ -698,7 +694,8 @@ public class Notebook implements NoteEventListener {
logger.error(e.getMessage(), e);
}
if (releaseResource) {
for (InterpreterSetting setting : note.getNoteReplLoader().getInterpreterSettings()) {
for (InterpreterSetting setting :
notebook.getInterpreterFactory().getInterpreterSettings(note.getId())) {
notebook.getInterpreterFactory().restart(setting.id());
}
}

View file

@ -50,7 +50,7 @@ import com.google.common.annotations.VisibleForTesting;
public class Paragraph extends Job implements Serializable, Cloneable {
private static final long serialVersionUID = -6328572073497992016L;
private transient NoteInterpreterLoader replLoader;
private transient InterpreterFactory factory;
private transient Note note;
private transient AuthenticationInfo authenticationInfo;
private transient String effectiveText;
@ -75,10 +75,10 @@ public class Paragraph extends Job implements Serializable, Cloneable {
}
public Paragraph(String paragraphId, Note note, JobListener listener,
NoteInterpreterLoader replLoader) {
InterpreterFactory factory) {
super(paragraphId, generateId(), listener);
this.note = note;
this.replLoader = replLoader;
this.factory = factory;
title = null;
text = null;
authenticationInfo = null;
@ -88,10 +88,10 @@ public class Paragraph extends Job implements Serializable, Cloneable {
config = new HashMap<String, Object>();
}
public Paragraph(Note note, JobListener listener, NoteInterpreterLoader replLoader) {
public Paragraph(Note note, JobListener listener, InterpreterFactory factory) {
super(generateId(), listener);
this.note = note;
this.replLoader = replLoader;
this.factory = factory;
title = null;
text = null;
authenticationInfo = null;
@ -200,15 +200,8 @@ public class Paragraph extends Job implements Serializable, Cloneable {
return text.substring(magic.length() + 1).trim();
}
public NoteInterpreterLoader getNoteReplLoader() {
return replLoader;
}
public Interpreter getRepl(String name) {
if (replLoader == null) {
return null;
}
return replLoader.get(name);
return factory.getInterpreter(note.getId(), name);
}
public Interpreter getCurrentRepl() {
@ -230,8 +223,8 @@ public class Paragraph extends Job implements Serializable, Cloneable {
return completion;
}
public void setNoteReplLoader(NoteInterpreterLoader repls) {
this.replLoader = repls;
public void setInterpreterFactory(InterpreterFactory factory) {
this.factory = factory;
}
public InterpreterResult getResult() {
@ -376,8 +369,8 @@ public class Paragraph extends Job implements Serializable, Cloneable {
AngularObjectRegistry registry = null;
ResourcePool resourcePool = null;
if (!getNoteReplLoader().getInterpreterSettings().isEmpty()) {
InterpreterSetting intpGroup = getNoteReplLoader().getInterpreterSettings().get(0);
if (!factory.getInterpreterSettings(note.getId()).isEmpty()) {
InterpreterSetting intpGroup = factory.getInterpreterSettings(note.getId()).get(0);
registry = intpGroup.getInterpreterGroup(note.id()).getAngularObjectRegistry();
resourcePool = intpGroup.getInterpreterGroup(note.id()).getResourcePool();
}

View file

@ -135,7 +135,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
new String[][]{});
Note note1 = notebook.createNote(null);
note1.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note1.getId(),factory.getDefaultInterpreterSettingList());
Paragraph p1 = note1.addParagraph();
@ -179,7 +179,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
new String[][]{});
Note note1 = notebook.createNote(null);
note1.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note1.id(), factory.getDefaultInterpreterSettingList());
Paragraph p1 = note1.addParagraph();

View file

@ -71,102 +71,92 @@ public class NoteInterpreterLoaderTest {
@Test
public void testGetInterpreter() throws IOException {
NoteInterpreterLoader loader = new NoteInterpreterLoader(factory);
loader.setNoteId("note");
loader.setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters("note", factory.getDefaultInterpreterSettingList());
// when there're no interpreter selection directive
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", loader.get(null).getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", loader.get("").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", loader.get(" ").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("note", null).getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("note", "").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("note", " ").getClassName());
// when group name is omitted
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", loader.get("mock11").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", factory.getInterpreter("note", "mock11").getClassName());
// when 'name' is ommitted
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", loader.get("group1").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", loader.get("group2").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("note", "group1").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", factory.getInterpreter("note", "group2").getClassName());
// when nothing is ommitted
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", loader.get("group1.mock1").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", loader.get("group1.mock11").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", loader.get("group2.mock2").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("note", "group1.mock1").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", factory.getInterpreter("note", "group1.mock11").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", factory.getInterpreter("note", "group2.mock2").getClassName());
loader.close();
factory.closeNote("note");
}
@Test
public void testNoteSession() throws IOException {
NoteInterpreterLoader loaderA = new NoteInterpreterLoader(factory);
loaderA.setNoteId("noteA");
loaderA.setInterpreters(factory.getDefaultInterpreterSettingList());
loaderA.getInterpreterSettings().get(0).getOption().setPerNoteSession(true);
factory.setInterpreters("noteA", factory.getDefaultInterpreterSettingList());
factory.getInterpreterSettings("noteA").get(0).getOption().setPerNoteSession(true);
NoteInterpreterLoader loaderB = new NoteInterpreterLoader(factory);
loaderB.setNoteId("noteB");
loaderB.setInterpreters(factory.getDefaultInterpreterSettingList());
loaderB.getInterpreterSettings().get(0).getOption().setPerNoteSession(true);
factory.setInterpreters("noteB", factory.getDefaultInterpreterSettingList());
factory.getInterpreterSettings("noteB").get(0).getOption().setPerNoteSession(true);
// interpreters are not created before accessing it
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup("shared_process").get("noteA"));
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup("shared_process").get("noteB"));
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("shared_process").get("noteA"));
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("shared_process").get("noteB"));
loaderA.get(null).open();
loaderB.get(null).open();
factory.getInterpreter("noteA", null).open();
factory.getInterpreter("noteB", null).open();
assertTrue(
loaderA.get(null).getInterpreterGroup().getId().equals(
loaderB.get(null).getInterpreterGroup().getId()));
factory.getInterpreter("noteA", null).getInterpreterGroup().getId().equals(
factory.getInterpreter("noteB", null).getInterpreterGroup().getId()));
// interpreters are created after accessing it
assertNotNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup("shared_process").get("noteA"));
assertNotNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup("shared_process").get("noteB"));
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("shared_process").get("noteA"));
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("shared_process").get("noteB"));
// when
loaderA.close();
loaderB.close();
factory.closeNote("noteA");
factory.closeNote("noteB");
// interpreters are destroyed after close
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup("shared_process").get("noteA"));
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup("shared_process").get("noteB"));
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("shared_process").get("noteA"));
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("shared_process").get("noteB"));
}
@Test
public void testNotePerInterpreterProcess() throws IOException {
NoteInterpreterLoader loaderA = new NoteInterpreterLoader(factory);
loaderA.setNoteId("noteA");
loaderA.setInterpreters(factory.getDefaultInterpreterSettingList());
loaderA.getInterpreterSettings().get(0).getOption().setPerNoteProcess(true);
factory.setInterpreters("noteA", factory.getDefaultInterpreterSettingList());
factory.getInterpreterSettings("noteA").get(0).getOption().setPerNoteProcess(true);
NoteInterpreterLoader loaderB = new NoteInterpreterLoader(factory);
loaderB.setNoteId("noteB");
loaderB.setInterpreters(factory.getDefaultInterpreterSettingList());
loaderB.getInterpreterSettings().get(0).getOption().setPerNoteProcess(true);
factory.setInterpreters("noteB", factory.getDefaultInterpreterSettingList());
factory.getInterpreterSettings("noteB").get(0).getOption().setPerNoteProcess(true);
// interpreters are not created before accessing it
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup("noteA").get("noteA"));
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup("noteB").get("noteB"));
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("noteA").get("noteA"));
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("noteB").get("noteB"));
loaderA.get(null).open();
loaderB.get(null).open();
factory.getInterpreter("noteA", null).open();
factory.getInterpreter("noteB", null).open();
// per note interpreter process
assertFalse(
loaderA.get(null).getInterpreterGroup().getId().equals(
loaderB.get(null).getInterpreterGroup().getId()));
factory.getInterpreter("noteA", null).getInterpreterGroup().getId().equals(
factory.getInterpreter("noteB", null).getInterpreterGroup().getId()));
// interpreters are created after accessing it
assertNotNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup("noteA").get("noteA"));
assertNotNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup("noteB").get("noteB"));
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("noteA").get("noteA"));
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("noteB").get("noteB"));
// when
loaderA.close();
loaderB.close();
factory.closeNote("noteA");
factory.closeNote("noteB");
// interpreters are destroyed after close
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup("noteA").get("noteA"));
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup("noteB").get("noteB"));
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("noteA").get("noteA"));
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("noteB").get("noteB"));
}

View file

@ -21,6 +21,7 @@ import com.google.common.base.Optional;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.scheduler.Scheduler;
@ -41,9 +42,6 @@ public class NoteTest {
@Mock
NotebookRepo repo;
@Mock
NoteInterpreterLoader replLoader;
@Mock
JobListenerFactory jobListenerFactory;
@ -62,39 +60,44 @@ public class NoteTest {
@Mock
NoteEventListener noteEventListener;
@Mock
InterpreterFactory interpreterFactory;
@Test
public void runNormalTest() {
when(replLoader.get("spark")).thenReturn(interpreter);
when(interpreterFactory.getInterpreter(anyString(), eq("spark"))).thenReturn(interpreter);
when(interpreter.getScheduler()).thenReturn(scheduler);
String pText = "%spark sc.version";
Note note = new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener);
Note note = new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener);
Paragraph p = note.addParagraph();
p.setText(pText);
note.run(p.getId());
ArgumentCaptor<Paragraph> pCaptor = ArgumentCaptor.forClass(Paragraph.class);
verify(scheduler, only()).submit(pCaptor.capture());
verify(replLoader, only()).get("spark");
verify(interpreterFactory, only()).getInterpreter(anyString(), eq("spark"));
assertEquals("Paragraph text", pText, pCaptor.getValue().getText());
}
@Test
public void runJdbcTest() {
when(replLoader.get("mysql")).thenReturn(null);
when(replLoader.get("jdbc")).thenReturn(interpreter);
when(interpreterFactory.getInterpreter(anyString(), eq("mysql"))).thenReturn(null);
when(interpreterFactory.getInterpreter(anyString(), eq("jdbc"))).thenReturn(interpreter);
when(interpreter.getScheduler()).thenReturn(scheduler);
String pText = "%mysql show databases";
Note note = new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener);
Note note = new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener);
Paragraph p = note.addParagraph();
p.setText(pText);
note.run(p.getId());
ArgumentCaptor<Paragraph> pCaptor = ArgumentCaptor.forClass(Paragraph.class);
verify(scheduler, only()).submit(pCaptor.capture());
verify(replLoader, times(2)).get(anyString());
verify(interpreterFactory, times(2)).getInterpreter(anyString(), anyString());
assertEquals("Change paragraph text", "%jdbc(mysql) show databases", pCaptor.getValue().getEffectiveText());
assertEquals("Change paragraph text", pText, pCaptor.getValue().getText());
@ -102,10 +105,10 @@ public class NoteTest {
@Test
public void putDefaultReplNameIfInterpreterSettingAbsent() {
when(replLoader.getDefaultInterpreterSetting())
.thenReturn(Optional.<InterpreterSetting>absent());
when(interpreterFactory.getDefaultInterpreterSetting(anyString()))
.thenReturn(null);
Note note = new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener);
Note note = new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener);
note.putDefaultReplName();
assertEquals(StringUtils.EMPTY, note.getLastReplName());
@ -116,10 +119,10 @@ public class NoteTest {
public void putDefaultReplNameIfInterpreterSettingPresent() {
InterpreterSetting interpreterSetting = Mockito.mock(InterpreterSetting.class);
when(interpreterSetting.getGroup()).thenReturn("spark");
when(replLoader.getDefaultInterpreterSetting())
.thenReturn(Optional.of(interpreterSetting));
when(interpreterFactory.getDefaultInterpreterSetting(anyString()))
.thenReturn(interpreterSetting);
Note note = new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener);
Note note = new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener);
note.putDefaultReplName();
assertEquals("spark", note.getLastReplName());
@ -130,10 +133,10 @@ public class NoteTest {
public void addParagraphWithLastReplName() {
InterpreterSetting interpreterSetting = Mockito.mock(InterpreterSetting.class);
when(interpreterSetting.getGroup()).thenReturn("spark");
when(replLoader.getDefaultInterpreterSetting())
.thenReturn(Optional.of(interpreterSetting));
when(interpreterFactory.getDefaultInterpreterSetting(anyString()))
.thenReturn(interpreterSetting);
Note note = new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener);
Note note = new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener);
note.putDefaultReplName(); //set lastReplName
Paragraph p = note.addParagraph();
@ -145,10 +148,10 @@ public class NoteTest {
public void insertParagraphWithLastReplName() {
InterpreterSetting interpreterSetting = Mockito.mock(InterpreterSetting.class);
when(interpreterSetting.getGroup()).thenReturn("spark");
when(replLoader.getDefaultInterpreterSetting())
.thenReturn(Optional.of(interpreterSetting));
when(interpreterFactory.getDefaultInterpreterSetting(anyString()))
.thenReturn(interpreterSetting);
Note note = new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener);
Note note = new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener);
note.putDefaultReplName(); //set lastReplName
Paragraph p = note.insertParagraph(note.getParagraphs().size());
@ -159,7 +162,7 @@ public class NoteTest {
@Test
public void setLastReplName() {
String paragraphId = "HelloWorld";
Note note = Mockito.spy(new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener));
Note note = Mockito.spy(new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener));
Paragraph mockParagraph = Mockito.mock(Paragraph.class);
when(note.getParagraph(paragraphId)).thenReturn(mockParagraph);
when(mockParagraph.getRequiredReplName()).thenReturn("spark");

View file

@ -106,7 +106,7 @@ public class NotebookTest implements JobListenerFactory{
@Test
public void testSelectingReplImplementation() throws IOException {
Note note = notebook.createNote(null);
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
// run with defatul repl
Paragraph p1 = note.addParagraph();
@ -204,7 +204,7 @@ public class NotebookTest implements JobListenerFactory{
@Test
public void testRunAll() throws IOException {
Note note = notebook.createNote(null);
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
// p1
Paragraph p1 = note.addParagraph();
@ -243,7 +243,7 @@ public class NotebookTest implements JobListenerFactory{
public void testSchedule() throws InterruptedException, IOException{
// create a note and a paragraph
Note note = notebook.createNote(null);
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
Paragraph p = note.addParagraph();
Map config = new HashMap<String, Object>();
@ -275,7 +275,7 @@ public class NotebookTest implements JobListenerFactory{
public void testAutoRestartInterpreterAfterSchedule() throws InterruptedException, IOException{
// create a note and a paragraph
Note note = notebook.createNote(null);
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
Paragraph p = note.addParagraph();
Map config = new HashMap<String, Object>();
@ -296,11 +296,11 @@ public class NotebookTest implements JobListenerFactory{
MockInterpreter1 mock1 = ((MockInterpreter1) (((ClassloaderInterpreter)
((LazyOpenInterpreter) note.getNoteReplLoader().get("mock1")).getInnerInterpreter())
((LazyOpenInterpreter) factory.getInterpreter(note.getId(), "mock1")).getInnerInterpreter())
.getInnerInterpreter()));
MockInterpreter2 mock2 = ((MockInterpreter2) (((ClassloaderInterpreter)
((LazyOpenInterpreter) note.getNoteReplLoader().get("mock2")).getInnerInterpreter())
((LazyOpenInterpreter) factory.getInterpreter(note.getId(), "mock2")).getInnerInterpreter())
.getInnerInterpreter()));
// wait until interpreters are started
@ -327,7 +327,7 @@ public class NotebookTest implements JobListenerFactory{
public void testExportAndImportNote() throws IOException, CloneNotSupportedException,
InterruptedException {
Note note = notebook.createNote(null);
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
final Paragraph p = note.addParagraph();
String simpleText = "hello world";
@ -354,7 +354,7 @@ public class NotebookTest implements JobListenerFactory{
public void testCloneNote() throws IOException, CloneNotSupportedException,
InterruptedException {
Note note = notebook.createNote(null);
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
final Paragraph p = note.addParagraph();
p.setText("hello world");
@ -376,7 +376,7 @@ public class NotebookTest implements JobListenerFactory{
public void testCloneNoteWithExceptionResult() throws IOException, CloneNotSupportedException,
InterruptedException {
Note note = notebook.createNote(null);
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
final Paragraph p = note.addParagraph();
p.setText("hello world");
@ -399,7 +399,7 @@ public class NotebookTest implements JobListenerFactory{
@Test
public void testResourceRemovealOnParagraphNoteRemove() throws IOException {
Note note = notebook.createNote(null);
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId()));
}
@ -428,10 +428,10 @@ public class NotebookTest implements JobListenerFactory{
IOException {
// create a note and a paragraph
Note note = notebook.createNote(null);
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
AngularObjectRegistry registry = note.getNoteReplLoader()
.getInterpreterSettings().get(0).getInterpreterGroup("sharedProcess")
AngularObjectRegistry registry = factory
.getInterpreterSettings(note.getId()).get(0).getInterpreterGroup("sharedProcess")
.getAngularObjectRegistry();
Paragraph p1 = note.addParagraph();
@ -461,10 +461,10 @@ public class NotebookTest implements JobListenerFactory{
IOException {
// create a note and a paragraph
Note note = notebook.createNote(null);
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
AngularObjectRegistry registry = note.getNoteReplLoader()
.getInterpreterSettings().get(0).getInterpreterGroup("sharedProcess")
AngularObjectRegistry registry = factory
.getInterpreterSettings(note.getId()).get(0).getInterpreterGroup("sharedProcess")
.getAngularObjectRegistry();
Paragraph p1 = note.addParagraph();
@ -494,10 +494,10 @@ public class NotebookTest implements JobListenerFactory{
IOException {
// create a note and a paragraph
Note note = notebook.createNote(null);
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
AngularObjectRegistry registry = note.getNoteReplLoader()
.getInterpreterSettings().get(0).getInterpreterGroup("sharedProcess")
AngularObjectRegistry registry = factory
.getInterpreterSettings(note.getId()).get(0).getInterpreterGroup("sharedProcess")
.getAngularObjectRegistry();
// add local scope object
@ -506,9 +506,8 @@ public class NotebookTest implements JobListenerFactory{
registry.add("o2", "object2", null, null);
// restart interpreter
factory.restart(note.getNoteReplLoader().getInterpreterSettings().get(0).id());
registry = note.getNoteReplLoader()
.getInterpreterSettings().get(0).getInterpreterGroup("sharedProcess")
factory.restart(factory.getInterpreterSettings(note.getId()).get(0).id());
registry = factory.getInterpreterSettings(note.getId()).get(0).getInterpreterGroup("sharedProcess")
.getAngularObjectRegistry();
// local and global scope object should be removed
@ -566,7 +565,7 @@ public class NotebookTest implements JobListenerFactory{
public void testAbortParagraphStatusOnInterpreterRestart() throws InterruptedException,
IOException {
Note note = notebook.createNote(null);
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
ArrayList<Paragraph> paragraphs = new ArrayList<>();
for (int i = 0; i < 100; i++) {
@ -583,7 +582,7 @@ public class NotebookTest implements JobListenerFactory{
while (paragraphs.get(0).getStatus() != Status.FINISHED) Thread.yield();
factory.restart(note.getNoteReplLoader().getInterpreterSettings().get(0).id());
factory.restart(factory.getInterpreterSettings(note.getId()).get(0).id());
boolean isAborted = false;
for (Paragraph p : paragraphs) {
@ -607,7 +606,7 @@ public class NotebookTest implements JobListenerFactory{
p1.setText("getId");
// restart interpreter with per note session enabled
for (InterpreterSetting setting : note1.getNoteReplLoader().getInterpreterSettings()) {
for (InterpreterSetting setting : factory.getInterpreterSettings(note1.getId())) {
setting.getOption().setPerNoteSession(true);
notebook.getInterpreterFactory().restart(setting.id());
}
@ -652,7 +651,7 @@ public class NotebookTest implements JobListenerFactory{
// restart interpreter with per note session enabled
for (InterpreterSetting setting : note1.getNoteReplLoader().getInterpreterSettings()) {
for (InterpreterSetting setting : notebook.getInterpreterFactory().getInterpreterSettings(note1.getId())) {
setting.getOption().setPerNoteSession(true);
notebook.getInterpreterFactory().restart(setting.id());
}
@ -678,7 +677,7 @@ public class NotebookTest implements JobListenerFactory{
p1.setText("getId");
// restart interpreter with per note session enabled
for (InterpreterSetting setting : note1.getNoteReplLoader().getInterpreterSettings()) {
for (InterpreterSetting setting : factory.getInterpreterSettings(note1.getId())) {
setting.getOption().setPerNoteSession(true);
notebook.getInterpreterFactory().restart(setting.id());
}

View file

@ -19,6 +19,8 @@ package org.apache.zeppelin.notebook;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -28,6 +30,7 @@ import org.apache.zeppelin.display.AngularObjectBuilder;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.junit.Test;
import java.util.HashMap;
@ -72,17 +75,19 @@ public class ParagraphTest {
@Test
public void effectiveTextTest() {
NoteInterpreterLoader noteInterpreterLoader = mock(NoteInterpreterLoader.class);
InterpreterFactory interpreterFactory = mock(InterpreterFactory.class);
Interpreter interpreter = mock(Interpreter.class);
Note note = mock(Note.class);
Paragraph p = new Paragraph(null, null, null, noteInterpreterLoader);
Paragraph p = new Paragraph("paragraph", note, null, interpreterFactory);
p.setText("%h2 show databases");
p.setEffectiveText("%jdbc(h2) show databases");
assertEquals("Get right replName", "jdbc", p.getRequiredReplName());
assertEquals("Get right scriptBody", "(h2) show databases", p.getScriptBody());
when(noteInterpreterLoader.get("jdbc")).thenReturn(interpreter);
when(interpreterFactory.getInterpreter(anyString(), eq("jdbc"))).thenReturn(interpreter);
when(interpreter.getFormType()).thenReturn(Interpreter.FormType.NATIVE);
when(note.getId()).thenReturn("noteId");
try {
p.jobRun();

View file

@ -107,7 +107,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory {
@Test
public void testSaveNotebook() throws IOException, InterruptedException {
Note note = notebook.createNote(null);
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
Paragraph p1 = note.addParagraph();
Map<String, Object> config = p1.getConfig();

View file

@ -25,9 +25,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInterpreterLoader;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.junit.After;
@ -36,21 +35,20 @@ import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
public class LuceneSearchTest {
private static NoteInterpreterLoader replLoaderMock;
private static NotebookRepo notebookRepoMock;
private static InterpreterFactory interpreterFactory;
private SearchService notebookIndex;
@BeforeClass
public static void beforeStartUp() {
notebookRepoMock = mock(NotebookRepo.class);
replLoaderMock = mock(NoteInterpreterLoader.class);
interpreterFactory = mock(InterpreterFactory.class);
when(replLoaderMock.getInterpreterSettings())
.thenReturn(ImmutableList.<InterpreterSetting>of());
// when(replLoaderMock.getInterpreterSettings())
// .thenReturn(ImmutableList.<InterpreterSetting>of());
}
@Before
@ -286,7 +284,7 @@ public class LuceneSearchTest {
}
private Note newNote(String name) {
Note note = new Note(notebookRepoMock, replLoaderMock, null, notebookIndex, null, null);
Note note = new Note(notebookRepoMock, interpreterFactory, null, notebookIndex, null, null);
note.setName(name);
return note;
}