mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge branch 'master' into ZEPPELIN-732-up
This commit is contained in:
commit
0665380469
28 changed files with 576 additions and 579 deletions
BIN
docs/assets/themes/zeppelin/img/docs-img/shell-example.png
Normal file
BIN
docs/assets/themes/zeppelin/img/docs-img/shell-example.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 41 KiB |
20
docs/interpreter/shell.md
Normal file
20
docs/interpreter/shell.md
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
---
|
||||
layout: page
|
||||
title: "Shell Interpreter"
|
||||
description: "Shell Interpreter"
|
||||
group: manual
|
||||
---
|
||||
{% include JB/setup %}
|
||||
|
||||
## Shell interpreter for Apache Zeppelin
|
||||
|
||||
### Overview
|
||||
Shell interpreter uses [Apache Commons Exec](https://commons.apache.org/proper/commons-exec) to execute external processes.
|
||||
|
||||
In Zeppelin notebook, you can use ` %sh ` in the beginning of a paragraph to invoke system shell and run commands.
|
||||
Note: Currently each command runs as Zeppelin user.
|
||||
|
||||
### Example
|
||||
The following example demonstrates the basic usage of Shell in a Zeppelin notebook.
|
||||
|
||||
<img src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/shell-example.png" width="70%" />
|
||||
|
|
@ -96,7 +96,7 @@ public class LivyHelper {
|
|||
}.getType());
|
||||
if (jsonMap.get("state").equals("idle")) {
|
||||
break;
|
||||
} else if (jsonMap.get("state").equals("error")) {
|
||||
} else if (jsonMap.get("state").equals("error") || jsonMap.get("state").equals("dead")) {
|
||||
json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" +
|
||||
sessionId + "/log",
|
||||
"GET", null,
|
||||
|
|
@ -124,7 +124,7 @@ public class LivyHelper {
|
|||
|
||||
protected void initializeSpark(final InterpreterContext context,
|
||||
final Map<String, Integer> userSessionMap) throws Exception {
|
||||
interpret("val sqlContext= new org.apache.spark.sql.SQLContext(sc)\n" +
|
||||
interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n" +
|
||||
"import sqlContext.implicits._", context, userSessionMap);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
line.replaceAll("\"", "\\\\\"")
|
||||
.replaceAll("\\n", " ")
|
||||
+ "\").show(" +
|
||||
property.get("livy.spark.sql.maxResult") + ")",
|
||||
property.get("zeppelin.livy.spark.sql.maxResult") + ")",
|
||||
interpreterContext, userSessionMap);
|
||||
|
||||
if (res.code() == InterpreterResult.Code.SUCCESS) {
|
||||
|
|
@ -123,6 +123,10 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean concurrentSQL() {
|
||||
return Boolean.parseBoolean(getProperty("zeppelin.livy.concurrentSQL"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
livyHelper.cancelHTTP(context.getParagraphId());
|
||||
|
|
@ -140,8 +144,19 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
LivySparkInterpreter.class.getName() + this.hashCode());
|
||||
if (concurrentSQL()) {
|
||||
int maxConcurrency = 10;
|
||||
return SchedulerFactory.singleton().createOrGetParallelScheduler(
|
||||
LivySparkInterpreter.class.getName() + this.hashCode(), maxConcurrency);
|
||||
} else {
|
||||
Interpreter intp =
|
||||
getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());
|
||||
if (intp != null) {
|
||||
return intp.getScheduler();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -88,6 +88,11 @@
|
|||
"propertyName": "zeppelin.livy.spark.sql.maxResult",
|
||||
"defaultValue": "1000",
|
||||
"description": "Max number of SparkSQL result to display."
|
||||
},
|
||||
"zeppelin.livy.concurrentSQL": {
|
||||
"propertyName": "zeppelin.livy.concurrentSQL",
|
||||
"defaultValue": "false",
|
||||
"description": "Execute multiple SQL concurrently if set true."
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
|
|||
|
|
@ -19,21 +19,23 @@ package org.apache.zeppelin.shell;
|
|||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.io.OutputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.Executor;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -43,14 +45,11 @@ import org.slf4j.LoggerFactory;
|
|||
* Shell interpreter for Zeppelin.
|
||||
*/
|
||||
public class ShellInterpreter extends Interpreter {
|
||||
Logger logger = LoggerFactory.getLogger(ShellInterpreter.class);
|
||||
private static final String EXECUTOR_KEY = "executor";
|
||||
public static final String SHELL_COMMAND_TIMEOUT = "shell.command.timeout.millisecs";
|
||||
int commandTimeOut;
|
||||
private static final boolean isWindows = System
|
||||
.getProperty("os.name")
|
||||
.startsWith("Windows");
|
||||
final String shell = isWindows ? "cmd /c" : "bash -c";
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ShellInterpreter.class);
|
||||
private static final String TIMEOUT_PROPERTY = "shell.command.timeout.millisecs";
|
||||
private final boolean isWindows = System.getProperty("os.name").startsWith("Windows");
|
||||
private final String shell = isWindows ? "cmd /c" : "bash -c";
|
||||
private Map<String, DefaultExecutor> executors;
|
||||
|
||||
public ShellInterpreter(Properties property) {
|
||||
super(property);
|
||||
|
|
@ -58,9 +57,8 @@ public class ShellInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void open() {
|
||||
logger.info("Command timeout is set as:", SHELL_COMMAND_TIMEOUT);
|
||||
|
||||
commandTimeOut = Integer.valueOf(getProperty(SHELL_COMMAND_TIMEOUT));
|
||||
LOGGER.info("Command timeout property: {}", TIMEOUT_PROPERTY);
|
||||
executors = new HashMap<String, DefaultExecutor>();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -69,7 +67,10 @@ public class ShellInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.debug("Run shell command '" + cmd + "'");
|
||||
LOGGER.debug("Run shell command '" + cmd + "'");
|
||||
OutputStream outStream = new ByteArrayOutputStream();
|
||||
OutputStream errStream = new ByteArrayOutputStream();
|
||||
|
||||
CommandLine cmdLine = CommandLine.parse(shell);
|
||||
// the Windows CMD shell doesn't handle multiline statements,
|
||||
// they need to be delimited by '&&' instead
|
||||
|
|
@ -78,62 +79,45 @@ public class ShellInterpreter extends Interpreter {
|
|||
cmd = StringUtils.join(lines, " && ");
|
||||
}
|
||||
cmdLine.addArgument(cmd, false);
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
|
||||
executor.setStreamHandler(new PumpStreamHandler(contextInterpreter.out, errorStream));
|
||||
executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
|
||||
|
||||
Job runningJob = getRunningJob(contextInterpreter.getParagraphId());
|
||||
Map<String, Object> info = runningJob.info();
|
||||
info.put(EXECUTOR_KEY, executor);
|
||||
try {
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
executor.setStreamHandler(new PumpStreamHandler(outStream, errStream));
|
||||
executor.setWatchdog(new ExecuteWatchdog(Long.valueOf(getProperty(TIMEOUT_PROPERTY))));
|
||||
executors.put(contextInterpreter.getParagraphId(), executor);
|
||||
int exitVal = executor.execute(cmdLine);
|
||||
logger.info("Paragraph " + contextInterpreter.getParagraphId()
|
||||
+ "return with exit value: " + exitVal);
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, null);
|
||||
LOGGER.info("Paragraph " + contextInterpreter.getParagraphId()
|
||||
+ " return with exit value: " + exitVal);
|
||||
return new InterpreterResult(Code.SUCCESS, outStream.toString());
|
||||
} catch (ExecuteException e) {
|
||||
int exitValue = e.getExitValue();
|
||||
logger.error("Can not run " + cmd, e);
|
||||
LOGGER.error("Can not run " + cmd, e);
|
||||
Code code = Code.ERROR;
|
||||
String msg = errorStream.toString();
|
||||
String message = errStream.toString();
|
||||
if (exitValue == 143) {
|
||||
code = Code.INCOMPLETE;
|
||||
msg = msg + "Paragraph received a SIGTERM.\n";
|
||||
logger.info("The paragraph " + contextInterpreter.getParagraphId()
|
||||
+ " stopped executing: " + msg);
|
||||
message += "Paragraph received a SIGTERM.\n";
|
||||
LOGGER.info("The paragraph " + contextInterpreter.getParagraphId()
|
||||
+ " stopped executing: " + message);
|
||||
}
|
||||
msg += "ExitValue: " + exitValue;
|
||||
return new InterpreterResult(code, msg);
|
||||
message += "ExitValue: " + exitValue;
|
||||
return new InterpreterResult(code, message);
|
||||
} catch (IOException e) {
|
||||
logger.error("Can not run " + cmd, e);
|
||||
LOGGER.error("Can not run " + cmd, e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private Job getRunningJob(String paragraphId) {
|
||||
Job foundJob = null;
|
||||
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
|
||||
for (Job job : jobsRunning) {
|
||||
if (job.getId().equals(paragraphId)) {
|
||||
foundJob = job;
|
||||
}
|
||||
}
|
||||
return foundJob;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
Job runningJob = getRunningJob(context.getParagraphId());
|
||||
if (runningJob != null) {
|
||||
Map<String, Object> info = runningJob.info();
|
||||
Object object = info.get(EXECUTOR_KEY);
|
||||
if (object != null) {
|
||||
Executor executor = (Executor) object;
|
||||
ExecuteWatchdog watchdog = executor.getWatchdog();
|
||||
watchdog.destroyProcess();
|
||||
for (String paragraphId : executors.keySet()) {
|
||||
if (paragraphId.equals(context.getParagraphId())) {
|
||||
DefaultExecutor executor = executors.get(paragraphId);
|
||||
executor.getWatchdog().destroyProcess();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.shell;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ShellInterpreterTest {
|
||||
|
||||
private ShellInterpreter shell;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Properties p = new Properties();
|
||||
p.setProperty("shell.command.timeout.millisecs", "60000");
|
||||
shell = new ShellInterpreter(p);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
shell.open();
|
||||
InterpreterContext context = new InterpreterContext("", "1", "", "", null, null, null, null, null, null, null);
|
||||
InterpreterResult result = new InterpreterResult(Code.ERROR);
|
||||
if (System.getProperty("os.name").startsWith("Windows")) {
|
||||
result = shell.interpret("dir", context);
|
||||
} else {
|
||||
result = shell.interpret("ls", context);
|
||||
}
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -309,7 +309,7 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
Notebook notebook = notebook();
|
||||
List<Note> notes = notebook.getAllNotes();
|
||||
for (Note note : notes) {
|
||||
List<String> ids = note.getNoteReplLoader().getInterpreters();
|
||||
List<String> ids = notebook.getInterpreterFactory().getInterpreters(note.getId());
|
||||
for (String id : ids) {
|
||||
if (id.equals(interpreterGroupId)) {
|
||||
broadcast(note.id(), m);
|
||||
|
|
@ -752,10 +752,16 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
// propagate change to (Remote) AngularObjectRegistry
|
||||
Note note = notebook.getNote(noteId);
|
||||
if (note != null) {
|
||||
Collection<InterpreterGroup> interpreterGroups = InterpreterGroup.getAll();
|
||||
for (InterpreterGroup interpreterGroup : interpreterGroups) {
|
||||
if (interpreterGroupId.equals(interpreterGroup.getId())) {
|
||||
AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
|
||||
List<InterpreterSetting> settings = notebook.getInterpreterFactory()
|
||||
.getInterpreterSettings(note.getId());
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getInterpreterGroup(note.id()) == null) {
|
||||
continue;
|
||||
}
|
||||
if (interpreterGroupId.equals(setting.getInterpreterGroup(note.id()).getId())) {
|
||||
AngularObjectRegistry angularObjectRegistry = setting
|
||||
.getInterpreterGroup(note.id()).getAngularObjectRegistry();
|
||||
|
||||
// first trying to get local registry
|
||||
ao = angularObjectRegistry.get(varName, noteId, paragraphId);
|
||||
if (ao == null) {
|
||||
|
|
@ -788,8 +794,8 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
if (global) { // broadcast change to all web session that uses related
|
||||
// interpreter.
|
||||
for (Note n : notebook.getAllNotes()) {
|
||||
List<InterpreterSetting> settings = note.getNoteReplLoader()
|
||||
.getInterpreterSettings();
|
||||
List<InterpreterSetting> settings = notebook.getInterpreterFactory()
|
||||
.getInterpreterSettings(note.getId());
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getInterpreterGroup(n.id()) == null) {
|
||||
continue;
|
||||
|
|
@ -1288,8 +1294,8 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
}
|
||||
|
||||
private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {
|
||||
List<InterpreterSetting> settings = note.getNoteReplLoader()
|
||||
.getInterpreterSettings();
|
||||
List<InterpreterSetting> settings =
|
||||
notebook().getInterpreterFactory().getInterpreterSettings(note.getId());
|
||||
if (settings == null || settings.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -1328,8 +1334,8 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
continue;
|
||||
}
|
||||
|
||||
List<InterpreterSetting> intpSettings = note.getNoteReplLoader()
|
||||
.getInterpreterSettings();
|
||||
List<InterpreterSetting> intpSettings = notebook.getInterpreterFactory()
|
||||
.getInterpreterSettings(note.getId());
|
||||
if (intpSettings.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
|
@ -1353,10 +1359,15 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
continue;
|
||||
}
|
||||
|
||||
broadcast(
|
||||
note.id(),
|
||||
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put(
|
||||
"noteId", noteId).put("paragraphId", paragraphId));
|
||||
List<String> ids = notebook.getInterpreterFactory().getInterpreters(note.getId());
|
||||
for (String id : ids) {
|
||||
if (id.equals(interpreterGroupId)) {
|
||||
broadcast(
|
||||
note.id(),
|
||||
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put(
|
||||
"noteId", noteId).put("paragraphId", paragraphId));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -105,20 +105,12 @@ abstract public class AbstractZeppelinIT {
|
|||
}
|
||||
|
||||
protected void createNewNote() {
|
||||
List<WebElement> notebookLinks = driver.findElements(By
|
||||
.xpath("//div[contains(@class, \"col-md-4\")]/div/ul/li"));
|
||||
List<String> notebookTitles = new LinkedList<String>();
|
||||
for (WebElement el : notebookLinks) {
|
||||
notebookTitles.add(el.getText());
|
||||
}
|
||||
|
||||
WebElement createNoteLink = driver.findElement(By.xpath("//div[contains(@class, \"col-md-4\")]/div/h5/a[contains(.,'Create new note')]"));
|
||||
createNoteLink.click();
|
||||
clickAndWait(By.xpath("//div[contains(@class, \"col-md-4\")]/div/h5/a[contains(.,'Create new" +
|
||||
" note')]"));
|
||||
|
||||
WebDriverWait block = new WebDriverWait(driver, MAX_BROWSER_TIMEOUT_SEC);
|
||||
WebElement modal = block.until(ExpectedConditions.visibilityOfElementLocated(By.id("noteNameModal")));
|
||||
WebElement createNoteButton = modal.findElement(By.id("createNoteButton"));
|
||||
createNoteButton.click();
|
||||
block.until(ExpectedConditions.visibilityOfElementLocated(By.id("noteNameModal")));
|
||||
clickAndWait(By.id("createNoteButton"));
|
||||
|
||||
try {
|
||||
Thread.sleep(500); // wait for notebook list updated
|
||||
|
|
@ -136,7 +128,7 @@ abstract public class AbstractZeppelinIT {
|
|||
}
|
||||
|
||||
protected void clickAndWait(final By locator) {
|
||||
driver.findElement(locator).click();
|
||||
pollingWait(locator, MAX_IMPLICIT_WAIT).click();
|
||||
ZeppelinITUtils.sleep(1000, true);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import org.junit.Test;
|
|||
import org.junit.rules.ErrorCollector;
|
||||
import org.openqa.selenium.By;
|
||||
import org.openqa.selenium.Keys;
|
||||
import org.openqa.selenium.TimeoutException;
|
||||
import org.openqa.selenium.WebElement;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -210,8 +211,16 @@ public class ZeppelinIT extends AbstractZeppelinIT {
|
|||
String artifact = "org.apache.commons:commons-csv:1.1";
|
||||
depArtifact.sendKeys(artifact);
|
||||
driver.findElement(By.xpath("//div[@id='spark']//form//button[1]")).click();
|
||||
driver.findElement(By.xpath("//div[@class='modal-dialog'][contains(.,'Do you want to update this interpreter and restart with new settings?')]" +
|
||||
"//div[@class='modal-footer']//button[contains(.,'OK')]")).click();
|
||||
clickAndWait(By.xpath("//div[@class='modal-dialog'][contains(.,'Do you want to update this interpreter and restart with new settings?')]" +
|
||||
"//div[@class='modal-footer']//button[contains(.,'OK')]"));
|
||||
|
||||
try {
|
||||
clickAndWait(By.xpath("//div[@class='modal-dialog'][contains(.,'Do you want to " +
|
||||
"update this interpreter and restart with new settings?')]//" +
|
||||
"div[@class='bootstrap-dialog-close-button']/button"));
|
||||
} catch (TimeoutException e) {
|
||||
//Modal dialog got closed earlier than expected nothing to worry.
|
||||
}
|
||||
|
||||
driver.navigate().back();
|
||||
createNewNote();
|
||||
|
|
|
|||
|
|
@ -159,7 +159,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
|
|||
|
||||
|
||||
// restart interpreter
|
||||
for (InterpreterSetting setting : note.getNoteReplLoader().getInterpreterSettings()) {
|
||||
for (InterpreterSetting setting : ZeppelinServer.notebook.getInterpreterFactory().getInterpreterSettings(note.getId())) {
|
||||
if (setting.getName().equals("md")) {
|
||||
// Call Restart Interpreter REST API
|
||||
PutMethod put = httpPut("/interpreter/setting/restart/" + setting.id(), "");
|
||||
|
|
|
|||
|
|
@ -93,7 +93,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
|
|||
|
||||
// get reference to interpreterGroup
|
||||
InterpreterGroup interpreterGroup = null;
|
||||
List<InterpreterSetting> settings = note1.getNoteReplLoader().getInterpreterSettings();
|
||||
List<InterpreterSetting> settings = notebook.getInterpreterFactory().getInterpreterSettings(note1.getId());
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getName().equals("md")) {
|
||||
interpreterGroup = setting.getInterpreterGroup("sharedProcess");
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ limitations under the License.
|
|||
</div>
|
||||
</div>
|
||||
|
||||
<div class="box width-full home">
|
||||
<div class="box width-full">
|
||||
<div>
|
||||
<div class="row configuration">
|
||||
<div class="col-md-12">
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ limitations under the License.
|
|||
</div>
|
||||
</div>
|
||||
|
||||
<div class="box width-full home"
|
||||
<div class="box width-full"
|
||||
>
|
||||
<div>
|
||||
<div class="row interpreter">
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ angular.module('zeppelinWebApp').controller('InterpreterCtrl', function($scope,
|
|||
};
|
||||
|
||||
$scope.updateInterpreterSetting = function(form, settingId) {
|
||||
BootstrapDialog.confirm({
|
||||
var thisConfirm = BootstrapDialog.confirm({
|
||||
closable: true,
|
||||
title: '',
|
||||
message: 'Do you want to update this interpreter and restart with new settings?',
|
||||
|
|
@ -133,16 +133,23 @@ angular.module('zeppelinWebApp').controller('InterpreterCtrl', function($scope,
|
|||
dependencies: angular.copy(setting.dependencies)
|
||||
};
|
||||
|
||||
$http.put(baseUrlSrv.getRestApiBase() + '/interpreter/setting/' + settingId, request).
|
||||
success(function (data, status, headers, config) {
|
||||
thisConfirm.$modalFooter.find('button').addClass('disabled');
|
||||
thisConfirm.$modalFooter.find('button:contains("OK")')
|
||||
.html('<i class="fa fa-circle-o-notch fa-spin"></i> Saving Setting');
|
||||
|
||||
$http.put(baseUrlSrv.getRestApiBase() + '/interpreter/setting/' + settingId, request)
|
||||
.success(function(data, status, headers, config) {
|
||||
$scope.interpreterSettings[index] = data.body;
|
||||
removeTMPSettings(index);
|
||||
}).
|
||||
error(function (data, status, headers, config) {
|
||||
thisConfirm.close();
|
||||
})
|
||||
.error(function(data, status, headers, config) {
|
||||
console.log('Error %o %o', status, data.message);
|
||||
ngToast.danger({content: data.message, verticalPosition: 'bottom'});
|
||||
form.$show();
|
||||
thisConfirm.close();
|
||||
});
|
||||
return false;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
|||
|
|
@ -86,7 +86,7 @@ limitations under the License.
|
|||
<div ng-include src="'app/interpreter/interpreter-create/interpreter-create.html'"></div>
|
||||
</div>
|
||||
|
||||
<div class="box width-full home"
|
||||
<div class="box width-full"
|
||||
ng-repeat="setting in interpreterSettings | orderBy: 'group' | filter: searchInterpreter">
|
||||
<div id="{{setting.name | lowercase}}">
|
||||
<div class="row interpreter">
|
||||
|
|
|
|||
|
|
@ -37,7 +37,6 @@ import org.apache.zeppelin.interpreter.dev.ZeppelinDevServer;
|
|||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
|
||||
import org.apache.zeppelin.notebook.NoteInterpreterLoader;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -63,27 +62,28 @@ import java.util.*;
|
|||
* Manage interpreters.
|
||||
*/
|
||||
public class InterpreterFactory implements InterpreterGroupFactory {
|
||||
Logger logger = LoggerFactory.getLogger(InterpreterFactory.class);
|
||||
private static Logger logger = LoggerFactory.getLogger(InterpreterFactory.class);
|
||||
|
||||
private Map<String, URLClassLoader> cleanCl = Collections
|
||||
.synchronizedMap(new HashMap<String, URLClassLoader>());
|
||||
private static final String SHARED_SESSION = "shared_session";
|
||||
|
||||
private Map<String, URLClassLoader> cleanCl =
|
||||
Collections.synchronizedMap(new HashMap<String, URLClassLoader>());
|
||||
|
||||
private ZeppelinConfiguration conf;
|
||||
@Deprecated
|
||||
String[] interpreterClassList;
|
||||
String[] interpreterGroupOrderList;
|
||||
private String[] interpreterClassList;
|
||||
private String[] interpreterGroupOrderList;
|
||||
|
||||
private Map<String, InterpreterSetting> interpreterSettings =
|
||||
new HashMap<String, InterpreterSetting>();
|
||||
private Map<String, InterpreterSetting> interpreterSettings = new HashMap<>();
|
||||
|
||||
private Map<String, List<String>> interpreterBindings = new HashMap<String, List<String>>();
|
||||
private Map<String, List<String>> interpreterBindings = new HashMap<>();
|
||||
private List<RemoteRepository> interpreterRepositories;
|
||||
|
||||
private Gson gson;
|
||||
|
||||
private InterpreterOption defaultOption;
|
||||
|
||||
AngularObjectRegistryListener angularObjectRegistryListener;
|
||||
private AngularObjectRegistryListener angularObjectRegistryListener;
|
||||
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
|
||||
private final ApplicationEventListener appEventListener;
|
||||
|
||||
|
|
@ -241,7 +241,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
|
||||
private void registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
|
||||
String interpreterJson)
|
||||
String interpreterJson)
|
||||
throws MalformedURLException {
|
||||
URL[] urls = recursiveBuildLibList(new File(interpreterDir));
|
||||
ClassLoader tempClassLoader = new URLClassLoader(urls, cl);
|
||||
|
|
@ -257,7 +257,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
|
||||
private void registerInterpreterFromPath(String interpreterDir,
|
||||
String interpreterJson) throws IOException {
|
||||
String interpreterJson) throws IOException {
|
||||
|
||||
Path interpreterJsonPath = Paths.get(interpreterDir, interpreterJson);
|
||||
if (Files.exists(interpreterJsonPath)) {
|
||||
|
|
@ -280,11 +280,11 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
|
||||
private void registerInterpreters(List<RegisteredInterpreter> registeredInterpreters,
|
||||
String absolutePath) {
|
||||
String absolutePath) {
|
||||
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
|
||||
String className = registeredInterpreter.getClassName();
|
||||
if (validateRegisterInterpreter(registeredInterpreter) &&
|
||||
null == Interpreter.findRegisteredInterpreterByClassName(className)) {
|
||||
null == Interpreter.findRegisteredInterpreterByClassName(className)) {
|
||||
registeredInterpreter.setPath(absolutePath);
|
||||
Interpreter.register(registeredInterpreter);
|
||||
logger.debug("Registered. key: {}, className: {}, path: {}",
|
||||
|
|
@ -296,7 +296,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
|
||||
private boolean validateRegisterInterpreter(RegisteredInterpreter registeredInterpreter) {
|
||||
return null != registeredInterpreter.getGroup() && null != registeredInterpreter.getName() &&
|
||||
null != registeredInterpreter.getClassName();
|
||||
null != registeredInterpreter.getClassName();
|
||||
}
|
||||
|
||||
private void loadFromFile() throws IOException {
|
||||
|
|
@ -334,14 +334,9 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
// previously created setting should turn this feature on here.
|
||||
setting.getOption().setRemote(true);
|
||||
|
||||
InterpreterSetting intpSetting = new InterpreterSetting(
|
||||
setting.id(),
|
||||
setting.getName(),
|
||||
setting.getGroup(),
|
||||
setting.getInterpreterInfos(),
|
||||
setting.getProperties(),
|
||||
setting.getDependencies(),
|
||||
setting.getOption());
|
||||
InterpreterSetting intpSetting = new InterpreterSetting(setting.id(), setting.getName(),
|
||||
setting.getGroup(), setting.getInterpreterInfos(), setting.getProperties(),
|
||||
setting.getDependencies(), setting.getOption());
|
||||
|
||||
intpSetting.setInterpreterGroupFactory(this);
|
||||
interpreterSettings.put(k, intpSetting);
|
||||
|
|
@ -369,18 +364,14 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
// load dependencies
|
||||
List<Dependency> deps = intSetting.getDependencies();
|
||||
if (deps != null) {
|
||||
for (Dependency d: deps) {
|
||||
for (Dependency d : deps) {
|
||||
File destDir = new File(conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO));
|
||||
|
||||
if (d.getExclusions() != null) {
|
||||
depResolver.load(
|
||||
d.getGroupArtifactVersion(),
|
||||
d.getExclusions(),
|
||||
depResolver.load(d.getGroupArtifactVersion(), d.getExclusions(),
|
||||
new File(destDir, intSetting.id()));
|
||||
} else {
|
||||
depResolver.load(
|
||||
d.getGroupArtifactVersion(),
|
||||
new File(destDir, intSetting.id()));
|
||||
depResolver.load(d.getGroupArtifactVersion(), new File(destDir, intSetting.id()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -410,29 +401,19 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
fos.close();
|
||||
}
|
||||
|
||||
private RegisteredInterpreter getRegisteredReplInfoFromClassName(String clsName) {
|
||||
Set<String> keys = Interpreter.registeredInterpreters.keySet();
|
||||
for (String intName : keys) {
|
||||
RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName);
|
||||
if (clsName.equals(info.getClassName())) {
|
||||
return info;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return ordered interpreter setting list.
|
||||
* The list does not contain more than one setting from the same interpreter class.
|
||||
* Order by InterpreterClass (order defined by ZEPPELIN_INTERPRETERS), Interpreter setting name
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public List<String> getDefaultInterpreterSettingList() {
|
||||
// this list will contain default interpreter setting list
|
||||
List<String> defaultSettings = new LinkedList<String>();
|
||||
List<String> defaultSettings = new LinkedList<>();
|
||||
|
||||
// to ignore the same interpreter group
|
||||
Map<String, Boolean> interpreterGroupCheck = new HashMap<String, Boolean>();
|
||||
Map<String, Boolean> interpreterGroupCheck = new HashMap<>();
|
||||
|
||||
List<InterpreterSetting> sortedSettings = get();
|
||||
|
||||
|
|
@ -454,15 +435,14 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param name user defined name
|
||||
* @param groupName interpreter group name to instantiate
|
||||
* @param name user defined name
|
||||
* @param groupName interpreter group name to instantiate
|
||||
* @param properties
|
||||
* @return
|
||||
* @throws InterpreterException
|
||||
* @throws IOException
|
||||
*/
|
||||
public InterpreterSetting add(String name, String groupName,
|
||||
List<Dependency> dependencies,
|
||||
public InterpreterSetting add(String name, String groupName, List<Dependency> dependencies,
|
||||
InterpreterOption option, Properties properties)
|
||||
throws InterpreterException, IOException, RepositoryException {
|
||||
synchronized (interpreterSettings) {
|
||||
|
|
@ -482,13 +462,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
}
|
||||
|
||||
InterpreterSetting intpSetting = new InterpreterSetting(
|
||||
name,
|
||||
groupName,
|
||||
interpreterInfos,
|
||||
properties,
|
||||
dependencies,
|
||||
option);
|
||||
InterpreterSetting intpSetting = new InterpreterSetting(name, groupName, interpreterInfos,
|
||||
properties, dependencies, option);
|
||||
|
||||
if (dependencies.size() > 0) {
|
||||
loadInterpreterDependencies(intpSetting);
|
||||
|
|
@ -513,15 +488,10 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup(id);
|
||||
if (option.isRemote()) {
|
||||
angularObjectRegistry = new RemoteAngularObjectRegistry(
|
||||
id,
|
||||
angularObjectRegistryListener,
|
||||
interpreterGroup
|
||||
);
|
||||
angularObjectRegistry = new RemoteAngularObjectRegistry(id, angularObjectRegistryListener,
|
||||
interpreterGroup);
|
||||
} else {
|
||||
angularObjectRegistry = new AngularObjectRegistry(
|
||||
id,
|
||||
angularObjectRegistryListener);
|
||||
angularObjectRegistry = new AngularObjectRegistry(id, angularObjectRegistryListener);
|
||||
|
||||
// TODO(moon) : create distributed resource pool for local interpreters and set
|
||||
}
|
||||
|
|
@ -530,8 +500,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
return interpreterGroup;
|
||||
}
|
||||
|
||||
public void removeInterpretersForNote(InterpreterSetting interpreterSetting,
|
||||
String noteId) {
|
||||
public void removeInterpretersForNote(InterpreterSetting interpreterSetting, String noteId) {
|
||||
if (interpreterSetting.getOption().isPerNoteProcess()) {
|
||||
interpreterSetting.closeAndRemoveInterpreterGroup(noteId);
|
||||
} else if (interpreterSetting.getOption().isPerNoteSession()) {
|
||||
|
|
@ -549,9 +518,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
}
|
||||
|
||||
public void createInterpretersForNote(
|
||||
InterpreterSetting interpreterSetting,
|
||||
String noteId,
|
||||
public void createInterpretersForNote(InterpreterSetting interpreterSetting, String noteId,
|
||||
String key) {
|
||||
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(noteId);
|
||||
String groupName = interpreterSetting.getGroup();
|
||||
|
|
@ -568,10 +535,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
// in ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT msec. However, if termination of the process and
|
||||
// removal from interpreter group take too long, throw an error.
|
||||
long minTimeout = 10L * 1000 * 1000000; // 10 sec
|
||||
long interpreterRemovalWaitTimeout =
|
||||
Math.max(
|
||||
minTimeout,
|
||||
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2);
|
||||
long interpreterRemovalWaitTimeout = Math.max(minTimeout,
|
||||
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2);
|
||||
while (interpreterGroup.containsKey(key)) {
|
||||
if (System.nanoTime() - interpreterRemovalWaitStart > interpreterRemovalWaitTimeout) {
|
||||
throw new InterpreterException("Can not create interpreter");
|
||||
|
|
@ -606,15 +571,13 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
interpreterSetting.id());
|
||||
}
|
||||
} else {
|
||||
intp = createRepl(info.getPath(),
|
||||
info.getClassName(),
|
||||
properties);
|
||||
intp = createRepl(info.getPath(), info.getClassName(), properties);
|
||||
}
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(key);
|
||||
if (interpreters == null) {
|
||||
interpreters = new LinkedList<Interpreter>();
|
||||
interpreters = new LinkedList<>();
|
||||
interpreterGroup.put(key, interpreters);
|
||||
}
|
||||
if (info.isDefaultInterpreter()) {
|
||||
|
|
@ -630,7 +593,6 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
|
||||
|
||||
|
||||
public void remove(String id) throws IOException {
|
||||
synchronized (interpreterSettings) {
|
||||
if (interpreterSettings.containsKey(id)) {
|
||||
|
|
@ -657,11 +619,12 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
|
||||
/**
|
||||
* Get interpreter settings
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public List<InterpreterSetting> get() {
|
||||
synchronized (interpreterSettings) {
|
||||
List<InterpreterSetting> orderedSettings = new LinkedList<InterpreterSetting>();
|
||||
List<InterpreterSetting> orderedSettings = new LinkedList<>();
|
||||
|
||||
Map<String, List<InterpreterSetting>> groupNameInterpreterSettingMap = new HashMap<>();
|
||||
for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
|
||||
|
|
@ -710,9 +673,9 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
}
|
||||
|
||||
public void putNoteInterpreterSettingBinding(String noteId,
|
||||
List<String> settingList) throws IOException {
|
||||
List<String> unBindedSettings = new LinkedList<String>();
|
||||
private void putNoteInterpreterSettingBinding(String noteId, List<String> settingList)
|
||||
throws IOException {
|
||||
List<String> unBindedSettings = new LinkedList<>();
|
||||
|
||||
synchronized (interpreterSettings) {
|
||||
List<String> oldSettings = interpreterBindings.get(noteId);
|
||||
|
|
@ -743,8 +706,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
}
|
||||
|
||||
public List<String> getNoteInterpreterSettingBinding(String noteId) {
|
||||
LinkedList<String> bindings = new LinkedList<String>();
|
||||
private List<String> getNoteInterpreterSettingBinding(String noteId) {
|
||||
LinkedList<String> bindings = new LinkedList<>();
|
||||
synchronized (interpreterSettings) {
|
||||
List<String> settingIds = interpreterBindings.get(noteId);
|
||||
if (settingIds != null) {
|
||||
|
|
@ -756,14 +719,13 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
|
||||
/**
|
||||
* Change interpreter property and restart
|
||||
*
|
||||
* @param id
|
||||
* @param option
|
||||
* @param properties
|
||||
* @throws IOException
|
||||
*/
|
||||
public void setPropertyAndRestart(String id,
|
||||
InterpreterOption option,
|
||||
Properties properties,
|
||||
public void setPropertyAndRestart(String id, InterpreterOption option, Properties properties,
|
||||
List<Dependency> dependencies) throws IOException, RepositoryException {
|
||||
synchronized (interpreterSettings) {
|
||||
InterpreterSetting intpsetting = interpreterSettings.get(id);
|
||||
|
|
@ -824,7 +786,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
|
||||
public void close() {
|
||||
List<Thread> closeThreads = new LinkedList<Thread>();
|
||||
List<Thread> closeThreads = new LinkedList<>();
|
||||
synchronized (interpreterSettings) {
|
||||
Collection<InterpreterSetting> intpsettings = interpreterSettings.values();
|
||||
for (final InterpreterSetting intpsetting : intpsettings) {
|
||||
|
|
@ -860,7 +822,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
URLClassLoader ccl = cleanCl.get(dirName);
|
||||
if (ccl == null) {
|
||||
// classloader fallback
|
||||
ccl = URLClassLoader.newInstance(new URL[] {}, oldcl);
|
||||
ccl = URLClassLoader.newInstance(new URL[]{}, oldcl);
|
||||
}
|
||||
|
||||
boolean separateCL = true;
|
||||
|
|
@ -870,13 +832,13 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
separateCL = false;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("exception checking server classloader driver" , e);
|
||||
logger.error("exception checking server classloader driver", e);
|
||||
}
|
||||
|
||||
URLClassLoader cl;
|
||||
|
||||
if (separateCL == true) {
|
||||
cl = URLClassLoader.newInstance(new URL[] {}, ccl);
|
||||
cl = URLClassLoader.newInstance(new URL[]{}, ccl);
|
||||
} else {
|
||||
cl = ccl;
|
||||
}
|
||||
|
|
@ -884,7 +846,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
|
||||
Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className);
|
||||
Constructor<Interpreter> constructor =
|
||||
replClass.getConstructor(new Class[] {Properties.class});
|
||||
replClass.getConstructor(new Class[]{Properties.class});
|
||||
Interpreter repl = constructor.newInstance(property);
|
||||
repl.setClassloaderUrls(ccl.getURLs());
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(
|
||||
|
|
@ -938,6 +900,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
|
||||
updatePropertiesFromRegisteredInterpreter(property, className);
|
||||
|
||||
|
||||
RemoteInterpreter remoteInterpreter = new RemoteInterpreter(
|
||||
property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
|
||||
interpreterPath, localRepoPath, connectTimeout,
|
||||
|
|
@ -948,7 +911,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
|
||||
private Properties updatePropertiesFromRegisteredInterpreter(Properties properties,
|
||||
String className) {
|
||||
String className) {
|
||||
RegisteredInterpreter registeredInterpreter = Interpreter.findRegisteredInterpreterByClassName(
|
||||
className);
|
||||
if (null != registeredInterpreter) {
|
||||
|
|
@ -963,10 +926,173 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
return properties;
|
||||
}
|
||||
|
||||
/**
|
||||
* map interpreter ids into noteId
|
||||
*
|
||||
* @param noteId note id
|
||||
* @param ids InterpreterSetting id list
|
||||
* @throws IOException
|
||||
*/
|
||||
public void setInterpreters(String noteId, List<String> ids) throws IOException {
|
||||
putNoteInterpreterSettingBinding(noteId, ids);
|
||||
}
|
||||
|
||||
public List<String> getInterpreters(String noteId) {
|
||||
return getNoteInterpreterSettingBinding(noteId);
|
||||
}
|
||||
|
||||
public List<InterpreterSetting> getInterpreterSettings(String noteId) {
|
||||
List<String> interpreterSettingIds = getNoteInterpreterSettingBinding(noteId);
|
||||
LinkedList<InterpreterSetting> settings = new LinkedList<>();
|
||||
synchronized (interpreterSettingIds) {
|
||||
for (String id : interpreterSettingIds) {
|
||||
InterpreterSetting setting = get(id);
|
||||
if (setting == null) {
|
||||
// interpreter setting is removed from factory. remove id from here, too
|
||||
interpreterSettingIds.remove(id);
|
||||
} else {
|
||||
settings.add(setting);
|
||||
}
|
||||
}
|
||||
}
|
||||
return settings;
|
||||
}
|
||||
|
||||
public void closeNote(String noteId) {
|
||||
// close interpreters in this note session
|
||||
List<InterpreterSetting> settings = getInterpreterSettings(noteId);
|
||||
if (settings == null || settings.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info("closeNote: {}", noteId);
|
||||
for (InterpreterSetting setting : settings) {
|
||||
removeInterpretersForNote(setting, noteId);
|
||||
}
|
||||
}
|
||||
|
||||
private String getInterpreterInstanceKey(String noteId, InterpreterSetting setting) {
|
||||
if (setting.getOption().isExistingProcess()) {
|
||||
return Constants.EXISTING_PROCESS;
|
||||
} else if (setting.getOption().isPerNoteSession() || setting.getOption().isPerNoteProcess()) {
|
||||
return noteId;
|
||||
} else {
|
||||
return SHARED_SESSION;
|
||||
}
|
||||
}
|
||||
|
||||
private List<Interpreter> createOrGetInterpreterList(String noteId, InterpreterSetting setting) {
|
||||
InterpreterGroup interpreterGroup = setting.getInterpreterGroup(noteId);
|
||||
synchronized (interpreterGroup) {
|
||||
String key = getInterpreterInstanceKey(noteId, setting);
|
||||
if (!interpreterGroup.containsKey(key)) {
|
||||
createInterpretersForNote(setting, noteId, key);
|
||||
}
|
||||
return interpreterGroup.get(getInterpreterInstanceKey(noteId, setting));
|
||||
}
|
||||
}
|
||||
|
||||
private InterpreterSetting getDefaultInterpreterSetting(List<InterpreterSetting> settings) {
|
||||
if (settings == null || settings.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return settings.get(0);
|
||||
}
|
||||
|
||||
public InterpreterSetting getDefaultInterpreterSetting(String noteId) {
|
||||
return getDefaultInterpreterSetting(getInterpreterSettings(noteId));
|
||||
}
|
||||
|
||||
public Interpreter getInterpreter(String noteId, String replName) {
|
||||
List<InterpreterSetting> settings = getInterpreterSettings(noteId);
|
||||
|
||||
if (settings == null || settings.size() == 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (replName == null || replName.trim().length() == 0) {
|
||||
// get default settings (first available)
|
||||
// TODO(jl): Fix it in case of returning null
|
||||
InterpreterSetting defaultSettings = getDefaultInterpreterSetting(settings);
|
||||
return createOrGetInterpreterList(noteId, defaultSettings).get(0);
|
||||
}
|
||||
|
||||
if (Interpreter.registeredInterpreters == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String[] replNameSplit = replName.split("\\.");
|
||||
String group = null;
|
||||
String name = null;
|
||||
if (replNameSplit.length == 2) {
|
||||
group = replNameSplit[0];
|
||||
name = replNameSplit[1];
|
||||
|
||||
Interpreter.RegisteredInterpreter registeredInterpreter = Interpreter.registeredInterpreters
|
||||
.get(group + "." + name);
|
||||
if (registeredInterpreter == null
|
||||
|| registeredInterpreter.getClassName() == null) {
|
||||
throw new InterpreterException(replName + " interpreter not found");
|
||||
}
|
||||
String interpreterClassName = registeredInterpreter.getClassName();
|
||||
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (registeredInterpreter.getGroup().equals(setting.getGroup())) {
|
||||
List<Interpreter> intpGroup = createOrGetInterpreterList(noteId, setting);
|
||||
for (Interpreter interpreter : intpGroup) {
|
||||
if (interpreterClassName.equals(interpreter.getClassName())) {
|
||||
return interpreter;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
throw new InterpreterException(replName + " interpreter not found");
|
||||
} else {
|
||||
// first assume replName is 'name' of interpreter. ('groupName' is ommitted)
|
||||
// search 'name' from first (default) interpreter group
|
||||
InterpreterSetting defaultSetting = getDefaultInterpreterSetting(settings);
|
||||
Interpreter.RegisteredInterpreter registeredInterpreter =
|
||||
Interpreter.registeredInterpreters.get(defaultSetting.getGroup() + "." + replName);
|
||||
if (registeredInterpreter != null) {
|
||||
List<Interpreter> interpreters = createOrGetInterpreterList(noteId, defaultSetting);
|
||||
for (Interpreter interpreter : interpreters) {
|
||||
|
||||
RegisteredInterpreter intp =
|
||||
Interpreter.findRegisteredInterpreterByClassName(interpreter.getClassName());
|
||||
if (intp == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (intp.getName().equals(replName)) {
|
||||
return interpreter;
|
||||
}
|
||||
}
|
||||
|
||||
throw new InterpreterException(
|
||||
defaultSetting.getGroup() + "." + replName + " interpreter not found");
|
||||
}
|
||||
|
||||
// next, assume replName is 'group' of interpreter ('name' is ommitted)
|
||||
// search interpreter group and return first interpreter.
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getGroup().equals(replName)) {
|
||||
List<Interpreter> interpreters = createOrGetInterpreterList(noteId, setting);
|
||||
return interpreters.get(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dev interpreter
|
||||
if (DevInterpreter.isInterpreterName(replName)) {
|
||||
return getDevInterpreter();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private URL[] recursiveBuildLibList(File path) throws MalformedURLException {
|
||||
URL[] urls = new URL[0];
|
||||
if (path == null || path.exists() == false) {
|
||||
if (path == null || !path.exists()) {
|
||||
return urls;
|
||||
} else if (path.getName().startsWith(".")) {
|
||||
return urls;
|
||||
|
|
@ -979,7 +1105,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
return urls;
|
||||
} else {
|
||||
return new URL[] {path.toURI().toURL()};
|
||||
return new URL[]{path.toURI().toURL()};
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
@SuppressWarnings("rawtypes")
|
||||
Map<String, List<AngularObject>> angularObjects = new HashMap<>();
|
||||
|
||||
private transient NoteInterpreterLoader replLoader;
|
||||
private transient InterpreterFactory factory;
|
||||
private transient JobListenerFactory jobListenerFactory;
|
||||
private transient NotebookRepo repo;
|
||||
private transient SearchService index;
|
||||
|
|
@ -99,11 +99,11 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
|
||||
public Note() {}
|
||||
|
||||
public Note(NotebookRepo repo, NoteInterpreterLoader replLoader,
|
||||
public Note(NotebookRepo repo, InterpreterFactory factory,
|
||||
JobListenerFactory jlFactory, SearchService noteIndex, Credentials credentials,
|
||||
NoteEventListener noteEventListener) {
|
||||
this.repo = repo;
|
||||
this.replLoader = replLoader;
|
||||
this.factory = factory;
|
||||
this.jobListenerFactory = jlFactory;
|
||||
this.index = noteIndex;
|
||||
this.noteEventListener = noteEventListener;
|
||||
|
|
@ -116,8 +116,8 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
}
|
||||
|
||||
private String getDefaultInterpreterName() {
|
||||
Optional<InterpreterSetting> settingOptional = replLoader.getDefaultInterpreterSetting();
|
||||
return settingOptional.isPresent() ? settingOptional.get().getGroup() : StringUtils.EMPTY;
|
||||
InterpreterSetting setting = factory.getDefaultInterpreterSetting(getId());
|
||||
return null != setting ? setting.getGroup() : StringUtils.EMPTY;
|
||||
}
|
||||
|
||||
void putDefaultReplName() {
|
||||
|
|
@ -158,12 +158,8 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
this.name = name;
|
||||
}
|
||||
|
||||
public NoteInterpreterLoader getNoteReplLoader() {
|
||||
return replLoader;
|
||||
}
|
||||
|
||||
public void setReplLoader(NoteInterpreterLoader replLoader) {
|
||||
this.replLoader = replLoader;
|
||||
public void setInterpreterFactory(InterpreterFactory factory) {
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
public JobListenerFactory getJobListenerFactory() {
|
||||
|
|
@ -205,7 +201,7 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
*/
|
||||
|
||||
public Paragraph addParagraph() {
|
||||
Paragraph p = new Paragraph(this, this, replLoader);
|
||||
Paragraph p = new Paragraph(this, this, factory);
|
||||
addLastReplNameIfEmptyText(p);
|
||||
synchronized (paragraphs) {
|
||||
paragraphs.add(p);
|
||||
|
|
@ -224,7 +220,7 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
public void addCloneParagraph(Paragraph srcParagraph) {
|
||||
|
||||
// Keep paragraph original ID
|
||||
final Paragraph newParagraph = new Paragraph(srcParagraph.getId(), this, this, replLoader);
|
||||
final Paragraph newParagraph = new Paragraph(srcParagraph.getId(), this, this, factory);
|
||||
|
||||
Map<String, Object> config = new HashMap<>(srcParagraph.getConfig());
|
||||
Map<String, Object> param = new HashMap<>(srcParagraph.settings.getParams());
|
||||
|
|
@ -261,7 +257,7 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
* @param index
|
||||
*/
|
||||
public Paragraph insertParagraph(int index) {
|
||||
Paragraph p = new Paragraph(this, this, replLoader);
|
||||
Paragraph p = new Paragraph(this, this, factory);
|
||||
addLastReplNameIfEmptyText(p);
|
||||
synchronized (paragraphs) {
|
||||
paragraphs.add(index, p);
|
||||
|
|
@ -450,9 +446,11 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
AuthenticationInfo authenticationInfo = new AuthenticationInfo();
|
||||
authenticationInfo.setUser(cronExecutingUser);
|
||||
p.setAuthenticationInfo(authenticationInfo);
|
||||
p.setNoteReplLoader(replLoader);
|
||||
p.setListener(this);
|
||||
Interpreter intp = replLoader.get(p.getRequiredReplName());
|
||||
|
||||
p.setInterpreterFactory(factory);
|
||||
p.setListener(jobListenerFactory.getParagraphJobListener(this));
|
||||
Interpreter intp = factory.getInterpreter(getId(), p.getRequiredReplName());
|
||||
|
||||
intp.getScheduler().submit(p);
|
||||
}
|
||||
}
|
||||
|
|
@ -465,15 +463,14 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
*/
|
||||
public void run(String paragraphId) {
|
||||
Paragraph p = getParagraph(paragraphId);
|
||||
p.setNoteReplLoader(replLoader);
|
||||
|
||||
p.setListener(this);
|
||||
p.setInterpreterFactory(factory);
|
||||
p.setListener(jobListenerFactory.getParagraphJobListener(this));
|
||||
String requiredReplName = p.getRequiredReplName();
|
||||
Interpreter intp = replLoader.get(requiredReplName);
|
||||
Interpreter intp = factory.getInterpreter(getId(), requiredReplName);
|
||||
|
||||
if (intp == null) {
|
||||
// TODO(jongyoul): Make "%jdbc" configurable from JdbcInterpreter
|
||||
if (conf.getUseJdbcAlias() && null != (intp = replLoader.get("jdbc"))) {
|
||||
if (conf.getUseJdbcAlias() && null != (intp = factory.getInterpreter(getId(), "jdbc"))) {
|
||||
String pText = p.getText().replaceFirst(requiredReplName, "jdbc(" + requiredReplName + ")");
|
||||
logger.debug("New paragraph: {}", pText);
|
||||
p.setEffectiveText(pText);
|
||||
|
|
@ -504,7 +501,7 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
|
||||
public List<InterpreterCompletion> completion(String paragraphId, String buffer, int cursor) {
|
||||
Paragraph p = getParagraph(paragraphId);
|
||||
p.setNoteReplLoader(replLoader);
|
||||
p.setInterpreterFactory(factory);
|
||||
p.setListener(jobListenerFactory.getParagraphJobListener(this));
|
||||
List completion = p.completion(buffer, cursor);
|
||||
|
||||
|
|
@ -520,7 +517,7 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
private void snapshotAngularObjectRegistry() {
|
||||
angularObjects = new HashMap<>();
|
||||
|
||||
List<InterpreterSetting> settings = replLoader.getInterpreterSettings();
|
||||
List<InterpreterSetting> settings = factory.getInterpreterSettings(getId());
|
||||
if (settings == null || settings.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -535,7 +532,7 @@ public class Note implements Serializable, ParagraphJobListener {
|
|||
private void removeAllAngularObjectInParagraph(String paragraphId) {
|
||||
angularObjects = new HashMap<String, List<AngularObject>>();
|
||||
|
||||
List<InterpreterSetting> settings = replLoader.getInterpreterSettings();
|
||||
List<InterpreterSetting> settings = factory.getInterpreterSettings(getId());
|
||||
if (settings == null || settings.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,213 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.notebook;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Constants;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.dev.DevInterpreter;
|
||||
|
||||
/**
|
||||
* Interpreter loader per note.
|
||||
*/
|
||||
public class NoteInterpreterLoader {
|
||||
private transient InterpreterFactory factory;
|
||||
private static String SHARED_SESSION = "shared_session";
|
||||
String noteId;
|
||||
|
||||
public NoteInterpreterLoader(InterpreterFactory factory) {
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
public void setNoteId(String noteId) {
|
||||
this.noteId = noteId;
|
||||
}
|
||||
|
||||
/**
|
||||
* set interpreter ids
|
||||
* @param ids InterpreterSetting id list
|
||||
* @throws IOException
|
||||
*/
|
||||
public void setInterpreters(List<String> ids) throws IOException {
|
||||
factory.putNoteInterpreterSettingBinding(noteId, ids);
|
||||
}
|
||||
|
||||
public List<String> getInterpreters() {
|
||||
return factory.getNoteInterpreterSettingBinding(noteId);
|
||||
}
|
||||
|
||||
public List<InterpreterSetting> getInterpreterSettings() {
|
||||
List<String> interpreterSettingIds = factory.getNoteInterpreterSettingBinding(noteId);
|
||||
LinkedList<InterpreterSetting> settings = new LinkedList<InterpreterSetting>();
|
||||
synchronized (interpreterSettingIds) {
|
||||
for (String id : interpreterSettingIds) {
|
||||
InterpreterSetting setting = factory.get(id);
|
||||
if (setting == null) {
|
||||
// interpreter setting is removed from factory. remove id from here, too
|
||||
interpreterSettingIds.remove(id);
|
||||
} else {
|
||||
settings.add(setting);
|
||||
}
|
||||
}
|
||||
}
|
||||
return settings;
|
||||
}
|
||||
|
||||
private String getInterpreterInstanceKey(InterpreterSetting setting) {
|
||||
if (setting.getOption().isExistingProcess()) {
|
||||
return Constants.EXISTING_PROCESS;
|
||||
} else if (setting.getOption().isPerNoteSession() || setting.getOption().isPerNoteProcess()) {
|
||||
return noteId;
|
||||
} else {
|
||||
return SHARED_SESSION;
|
||||
}
|
||||
}
|
||||
|
||||
private List<Interpreter> createOrGetInterpreterList(InterpreterSetting setting) {
|
||||
InterpreterGroup interpreterGroup =
|
||||
setting.getInterpreterGroup(noteId);
|
||||
synchronized (interpreterGroup) {
|
||||
String key = getInterpreterInstanceKey(setting);
|
||||
if (!interpreterGroup.containsKey(key)) {
|
||||
factory.createInterpretersForNote(setting, noteId, key);
|
||||
}
|
||||
return interpreterGroup.get(getInterpreterInstanceKey(setting));
|
||||
}
|
||||
}
|
||||
|
||||
public void close() {
|
||||
// close interpreters in this note session
|
||||
List<InterpreterSetting> settings = this.getInterpreterSettings();
|
||||
if (settings == null || settings.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
System.err.println("close");
|
||||
for (InterpreterSetting setting : settings) {
|
||||
factory.removeInterpretersForNote(setting, noteId);
|
||||
}
|
||||
}
|
||||
|
||||
public Interpreter get(String replName) {
|
||||
List<InterpreterSetting> settings = getInterpreterSettings();
|
||||
|
||||
if (settings == null || settings.size() == 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (replName == null || replName.trim().length() == 0) {
|
||||
// get default settings (first available)
|
||||
InterpreterSetting defaultSettings = getDefaultInterpreterSetting(settings).get();
|
||||
return createOrGetInterpreterList(defaultSettings).get(0);
|
||||
}
|
||||
|
||||
if (Interpreter.registeredInterpreters == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String[] replNameSplit = replName.split("\\.");
|
||||
String group = null;
|
||||
String name = null;
|
||||
if (replNameSplit.length == 2) {
|
||||
group = replNameSplit[0];
|
||||
name = replNameSplit[1];
|
||||
|
||||
Interpreter.RegisteredInterpreter registeredInterpreter = Interpreter.registeredInterpreters
|
||||
.get(group + "." + name);
|
||||
if (registeredInterpreter == null
|
||||
|| registeredInterpreter.getClassName() == null) {
|
||||
throw new InterpreterException(replName + " interpreter not found");
|
||||
}
|
||||
String interpreterClassName = registeredInterpreter.getClassName();
|
||||
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (registeredInterpreter.getGroup().equals(setting.getGroup())) {
|
||||
List<Interpreter> intpGroup = createOrGetInterpreterList(setting);
|
||||
for (Interpreter interpreter : intpGroup) {
|
||||
if (interpreterClassName.equals(interpreter.getClassName())) {
|
||||
return interpreter;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
throw new InterpreterException(replName + " interpreter not found");
|
||||
} else {
|
||||
// first assume replName is 'name' of interpreter. ('groupName' is ommitted)
|
||||
// search 'name' from first (default) interpreter group
|
||||
InterpreterSetting defaultSetting = getDefaultInterpreterSetting(settings).get();
|
||||
Interpreter.RegisteredInterpreter registeredInterpreter =
|
||||
Interpreter.registeredInterpreters.get(defaultSetting.getGroup() + "." + replName);
|
||||
if (registeredInterpreter != null) {
|
||||
List<Interpreter> interpreters = createOrGetInterpreterList(defaultSetting);
|
||||
for (Interpreter interpreter : interpreters) {
|
||||
|
||||
RegisteredInterpreter intp =
|
||||
Interpreter.findRegisteredInterpreterByClassName(interpreter.getClassName());
|
||||
if (intp == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (intp.getName().equals(replName)) {
|
||||
return interpreter;
|
||||
}
|
||||
}
|
||||
|
||||
throw new InterpreterException(
|
||||
defaultSetting.getGroup() + "." + replName + " interpreter not found");
|
||||
}
|
||||
|
||||
// next, assume replName is 'group' of interpreter ('name' is ommitted)
|
||||
// search interpreter group and return first interpreter.
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getGroup().equals(replName)) {
|
||||
List<Interpreter> interpreters = createOrGetInterpreterList(setting);
|
||||
return interpreters.get(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dev interpreter
|
||||
if (DevInterpreter.isInterpreterName(replName)) {
|
||||
return factory.getDevInterpreter();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private Optional<InterpreterSetting>
|
||||
getDefaultInterpreterSetting(List<InterpreterSetting> settings) {
|
||||
if (settings == null || settings.isEmpty()) {
|
||||
return Optional.absent();
|
||||
}
|
||||
return Optional.of(settings.get(0));
|
||||
}
|
||||
|
||||
Optional<InterpreterSetting> getDefaultInterpreterSetting() {
|
||||
return getDefaultInterpreterSetting(getInterpreterSettings());
|
||||
}
|
||||
}
|
||||
|
|
@ -154,15 +154,13 @@ public class Notebook implements NoteEventListener {
|
|||
*/
|
||||
public Note createNote(List<String> interpreterIds, AuthenticationInfo subject)
|
||||
throws IOException {
|
||||
NoteInterpreterLoader intpLoader = new NoteInterpreterLoader(replFactory);
|
||||
Note note = new Note(
|
||||
notebookRepo,
|
||||
intpLoader,
|
||||
replFactory,
|
||||
jobListenerFactory,
|
||||
notebookIndex,
|
||||
credentials,
|
||||
this);
|
||||
intpLoader.setNoteId(note.id());
|
||||
synchronized (notes) {
|
||||
notes.put(note.id(), note);
|
||||
}
|
||||
|
|
@ -265,17 +263,17 @@ public class Notebook implements NoteEventListener {
|
|||
}
|
||||
|
||||
public void bindInterpretersToNote(String id,
|
||||
List<String> newBindings) throws IOException {
|
||||
List<String> interpreterSettingIds) throws IOException {
|
||||
Note note = getNote(id);
|
||||
if (note != null) {
|
||||
List<InterpreterSetting> currentBindings = note.getNoteReplLoader().getInterpreterSettings();
|
||||
List<InterpreterSetting> currentBindings = replFactory.getInterpreterSettings(id);
|
||||
for (InterpreterSetting setting : currentBindings) {
|
||||
if (!newBindings.contains(setting.id())) {
|
||||
if (!interpreterSettingIds.contains(setting.id())) {
|
||||
fireUnbindInterpreter(note, setting);
|
||||
}
|
||||
}
|
||||
|
||||
note.getNoteReplLoader().setInterpreters(newBindings);
|
||||
replFactory.setInterpreters(note.getId(), interpreterSettingIds);
|
||||
// comment out while note.getNoteReplLoader().setInterpreters(...) do the same
|
||||
// replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
|
||||
}
|
||||
|
|
@ -284,7 +282,7 @@ public class Notebook implements NoteEventListener {
|
|||
public List<String> getBindedInterpreterSettingsIds(String id) {
|
||||
Note note = getNote(id);
|
||||
if (note != null) {
|
||||
return note.getNoteReplLoader().getInterpreters();
|
||||
return getInterpreterFactory().getInterpreters(note.getId());
|
||||
} else {
|
||||
return new LinkedList<String>();
|
||||
}
|
||||
|
|
@ -293,7 +291,7 @@ public class Notebook implements NoteEventListener {
|
|||
public List<InterpreterSetting> getBindedInterpreterSettings(String id) {
|
||||
Note note = getNote(id);
|
||||
if (note != null) {
|
||||
return note.getNoteReplLoader().getInterpreterSettings();
|
||||
return replFactory.getInterpreterSettings(note.getId());
|
||||
} else {
|
||||
return new LinkedList<InterpreterSetting>();
|
||||
}
|
||||
|
|
@ -384,9 +382,7 @@ public class Notebook implements NoteEventListener {
|
|||
note.setIndex(this.notebookIndex);
|
||||
note.setCredentials(this.credentials);
|
||||
|
||||
NoteInterpreterLoader replLoader = new NoteInterpreterLoader(replFactory);
|
||||
note.setReplLoader(replLoader);
|
||||
replLoader.setNoteId(note.id());
|
||||
note.setInterpreterFactory(replFactory);
|
||||
|
||||
note.setJobListenerFactory(jobListenerFactory);
|
||||
note.setNotebookRepo(notebookRepo);
|
||||
|
|
@ -646,9 +642,9 @@ public class Notebook implements NoteEventListener {
|
|||
|
||||
// set interpreter bind type
|
||||
String interpreterGroupName = null;
|
||||
if (note.getNoteReplLoader().getInterpreterSettings() != null &&
|
||||
note.getNoteReplLoader().getInterpreterSettings().size() >= 1) {
|
||||
interpreterGroupName = note.getNoteReplLoader().getInterpreterSettings().get(0).getGroup();
|
||||
if (replFactory.getInterpreterSettings(note.getId()) != null &&
|
||||
replFactory.getInterpreterSettings(note.getId()).size() >= 1) {
|
||||
interpreterGroupName = replFactory.getInterpreterSettings(note.getId()).get(0).getGroup();
|
||||
}
|
||||
|
||||
// not update and not running -> pass
|
||||
|
|
@ -698,7 +694,8 @@ public class Notebook implements NoteEventListener {
|
|||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
if (releaseResource) {
|
||||
for (InterpreterSetting setting : note.getNoteReplLoader().getInterpreterSettings()) {
|
||||
for (InterpreterSetting setting :
|
||||
notebook.getInterpreterFactory().getInterpreterSettings(note.getId())) {
|
||||
notebook.getInterpreterFactory().restart(setting.id());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
public class Paragraph extends Job implements Serializable, Cloneable {
|
||||
private static final long serialVersionUID = -6328572073497992016L;
|
||||
|
||||
private transient NoteInterpreterLoader replLoader;
|
||||
private transient InterpreterFactory factory;
|
||||
private transient Note note;
|
||||
private transient AuthenticationInfo authenticationInfo;
|
||||
private transient String effectiveText;
|
||||
|
|
@ -75,10 +75,10 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
}
|
||||
|
||||
public Paragraph(String paragraphId, Note note, JobListener listener,
|
||||
NoteInterpreterLoader replLoader) {
|
||||
InterpreterFactory factory) {
|
||||
super(paragraphId, generateId(), listener);
|
||||
this.note = note;
|
||||
this.replLoader = replLoader;
|
||||
this.factory = factory;
|
||||
title = null;
|
||||
text = null;
|
||||
authenticationInfo = null;
|
||||
|
|
@ -88,10 +88,10 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
config = new HashMap<String, Object>();
|
||||
}
|
||||
|
||||
public Paragraph(Note note, JobListener listener, NoteInterpreterLoader replLoader) {
|
||||
public Paragraph(Note note, JobListener listener, InterpreterFactory factory) {
|
||||
super(generateId(), listener);
|
||||
this.note = note;
|
||||
this.replLoader = replLoader;
|
||||
this.factory = factory;
|
||||
title = null;
|
||||
text = null;
|
||||
authenticationInfo = null;
|
||||
|
|
@ -200,15 +200,8 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
return text.substring(magic.length() + 1).trim();
|
||||
}
|
||||
|
||||
public NoteInterpreterLoader getNoteReplLoader() {
|
||||
return replLoader;
|
||||
}
|
||||
|
||||
public Interpreter getRepl(String name) {
|
||||
if (replLoader == null) {
|
||||
return null;
|
||||
}
|
||||
return replLoader.get(name);
|
||||
return factory.getInterpreter(note.getId(), name);
|
||||
}
|
||||
|
||||
public Interpreter getCurrentRepl() {
|
||||
|
|
@ -230,8 +223,8 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
return completion;
|
||||
}
|
||||
|
||||
public void setNoteReplLoader(NoteInterpreterLoader repls) {
|
||||
this.replLoader = repls;
|
||||
public void setInterpreterFactory(InterpreterFactory factory) {
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
public InterpreterResult getResult() {
|
||||
|
|
@ -376,8 +369,8 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
AngularObjectRegistry registry = null;
|
||||
ResourcePool resourcePool = null;
|
||||
|
||||
if (!getNoteReplLoader().getInterpreterSettings().isEmpty()) {
|
||||
InterpreterSetting intpGroup = getNoteReplLoader().getInterpreterSettings().get(0);
|
||||
if (!factory.getInterpreterSettings(note.getId()).isEmpty()) {
|
||||
InterpreterSetting intpGroup = factory.getInterpreterSettings(note.getId()).get(0);
|
||||
registry = intpGroup.getInterpreterGroup(note.id()).getAngularObjectRegistry();
|
||||
resourcePool = intpGroup.getInterpreterGroup(note.id()).getResourcePool();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -135,7 +135,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
|
|||
new String[][]{});
|
||||
|
||||
Note note1 = notebook.createNote(null);
|
||||
note1.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note1.getId(),factory.getDefaultInterpreterSettingList());
|
||||
|
||||
Paragraph p1 = note1.addParagraph();
|
||||
|
||||
|
|
@ -179,7 +179,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
|
|||
new String[][]{});
|
||||
|
||||
Note note1 = notebook.createNote(null);
|
||||
note1.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note1.id(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
Paragraph p1 = note1.addParagraph();
|
||||
|
||||
|
|
|
|||
|
|
@ -71,102 +71,92 @@ public class NoteInterpreterLoaderTest {
|
|||
|
||||
@Test
|
||||
public void testGetInterpreter() throws IOException {
|
||||
NoteInterpreterLoader loader = new NoteInterpreterLoader(factory);
|
||||
loader.setNoteId("note");
|
||||
loader.setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters("note", factory.getDefaultInterpreterSettingList());
|
||||
|
||||
// when there're no interpreter selection directive
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", loader.get(null).getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", loader.get("").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", loader.get(" ").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("note", null).getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("note", "").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("note", " ").getClassName());
|
||||
|
||||
// when group name is omitted
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", loader.get("mock11").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", factory.getInterpreter("note", "mock11").getClassName());
|
||||
|
||||
// when 'name' is ommitted
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", loader.get("group1").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", loader.get("group2").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("note", "group1").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", factory.getInterpreter("note", "group2").getClassName());
|
||||
|
||||
// when nothing is ommitted
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", loader.get("group1.mock1").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", loader.get("group1.mock11").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", loader.get("group2.mock2").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", factory.getInterpreter("note", "group1.mock1").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", factory.getInterpreter("note", "group1.mock11").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", factory.getInterpreter("note", "group2.mock2").getClassName());
|
||||
|
||||
loader.close();
|
||||
factory.closeNote("note");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoteSession() throws IOException {
|
||||
NoteInterpreterLoader loaderA = new NoteInterpreterLoader(factory);
|
||||
loaderA.setNoteId("noteA");
|
||||
loaderA.setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
loaderA.getInterpreterSettings().get(0).getOption().setPerNoteSession(true);
|
||||
factory.setInterpreters("noteA", factory.getDefaultInterpreterSettingList());
|
||||
factory.getInterpreterSettings("noteA").get(0).getOption().setPerNoteSession(true);
|
||||
|
||||
NoteInterpreterLoader loaderB = new NoteInterpreterLoader(factory);
|
||||
loaderB.setNoteId("noteB");
|
||||
loaderB.setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
loaderB.getInterpreterSettings().get(0).getOption().setPerNoteSession(true);
|
||||
factory.setInterpreters("noteB", factory.getDefaultInterpreterSettingList());
|
||||
factory.getInterpreterSettings("noteB").get(0).getOption().setPerNoteSession(true);
|
||||
|
||||
// interpreters are not created before accessing it
|
||||
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup("shared_process").get("noteA"));
|
||||
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup("shared_process").get("noteB"));
|
||||
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("shared_process").get("noteA"));
|
||||
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("shared_process").get("noteB"));
|
||||
|
||||
loaderA.get(null).open();
|
||||
loaderB.get(null).open();
|
||||
factory.getInterpreter("noteA", null).open();
|
||||
factory.getInterpreter("noteB", null).open();
|
||||
|
||||
assertTrue(
|
||||
loaderA.get(null).getInterpreterGroup().getId().equals(
|
||||
loaderB.get(null).getInterpreterGroup().getId()));
|
||||
factory.getInterpreter("noteA", null).getInterpreterGroup().getId().equals(
|
||||
factory.getInterpreter("noteB", null).getInterpreterGroup().getId()));
|
||||
|
||||
// interpreters are created after accessing it
|
||||
assertNotNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup("shared_process").get("noteA"));
|
||||
assertNotNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup("shared_process").get("noteB"));
|
||||
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("shared_process").get("noteA"));
|
||||
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("shared_process").get("noteB"));
|
||||
|
||||
// when
|
||||
loaderA.close();
|
||||
loaderB.close();
|
||||
factory.closeNote("noteA");
|
||||
factory.closeNote("noteB");
|
||||
|
||||
// interpreters are destroyed after close
|
||||
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup("shared_process").get("noteA"));
|
||||
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup("shared_process").get("noteB"));
|
||||
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("shared_process").get("noteA"));
|
||||
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("shared_process").get("noteB"));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotePerInterpreterProcess() throws IOException {
|
||||
NoteInterpreterLoader loaderA = new NoteInterpreterLoader(factory);
|
||||
loaderA.setNoteId("noteA");
|
||||
loaderA.setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
loaderA.getInterpreterSettings().get(0).getOption().setPerNoteProcess(true);
|
||||
factory.setInterpreters("noteA", factory.getDefaultInterpreterSettingList());
|
||||
factory.getInterpreterSettings("noteA").get(0).getOption().setPerNoteProcess(true);
|
||||
|
||||
NoteInterpreterLoader loaderB = new NoteInterpreterLoader(factory);
|
||||
loaderB.setNoteId("noteB");
|
||||
loaderB.setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
loaderB.getInterpreterSettings().get(0).getOption().setPerNoteProcess(true);
|
||||
factory.setInterpreters("noteB", factory.getDefaultInterpreterSettingList());
|
||||
factory.getInterpreterSettings("noteB").get(0).getOption().setPerNoteProcess(true);
|
||||
|
||||
// interpreters are not created before accessing it
|
||||
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup("noteA").get("noteA"));
|
||||
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup("noteB").get("noteB"));
|
||||
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("noteA").get("noteA"));
|
||||
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("noteB").get("noteB"));
|
||||
|
||||
loaderA.get(null).open();
|
||||
loaderB.get(null).open();
|
||||
factory.getInterpreter("noteA", null).open();
|
||||
factory.getInterpreter("noteB", null).open();
|
||||
|
||||
// per note interpreter process
|
||||
assertFalse(
|
||||
loaderA.get(null).getInterpreterGroup().getId().equals(
|
||||
loaderB.get(null).getInterpreterGroup().getId()));
|
||||
factory.getInterpreter("noteA", null).getInterpreterGroup().getId().equals(
|
||||
factory.getInterpreter("noteB", null).getInterpreterGroup().getId()));
|
||||
|
||||
// interpreters are created after accessing it
|
||||
assertNotNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup("noteA").get("noteA"));
|
||||
assertNotNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup("noteB").get("noteB"));
|
||||
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("noteA").get("noteA"));
|
||||
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("noteB").get("noteB"));
|
||||
|
||||
// when
|
||||
loaderA.close();
|
||||
loaderB.close();
|
||||
factory.closeNote("noteA");
|
||||
factory.closeNote("noteB");
|
||||
|
||||
// interpreters are destroyed after close
|
||||
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup("noteA").get("noteA"));
|
||||
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup("noteB").get("noteB"));
|
||||
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("noteA").get("noteA"));
|
||||
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("noteB").get("noteB"));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import com.google.common.base.Optional;
|
|||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepo;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
|
|
@ -41,9 +42,6 @@ public class NoteTest {
|
|||
@Mock
|
||||
NotebookRepo repo;
|
||||
|
||||
@Mock
|
||||
NoteInterpreterLoader replLoader;
|
||||
|
||||
@Mock
|
||||
JobListenerFactory jobListenerFactory;
|
||||
|
||||
|
|
@ -62,39 +60,44 @@ public class NoteTest {
|
|||
@Mock
|
||||
NoteEventListener noteEventListener;
|
||||
|
||||
@Mock
|
||||
InterpreterFactory interpreterFactory;
|
||||
|
||||
@Test
|
||||
public void runNormalTest() {
|
||||
when(replLoader.get("spark")).thenReturn(interpreter);
|
||||
when(interpreterFactory.getInterpreter(anyString(), eq("spark"))).thenReturn(interpreter);
|
||||
when(interpreter.getScheduler()).thenReturn(scheduler);
|
||||
|
||||
String pText = "%spark sc.version";
|
||||
Note note = new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener);
|
||||
Note note = new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener);
|
||||
|
||||
Paragraph p = note.addParagraph();
|
||||
p.setText(pText);
|
||||
note.run(p.getId());
|
||||
|
||||
ArgumentCaptor<Paragraph> pCaptor = ArgumentCaptor.forClass(Paragraph.class);
|
||||
verify(scheduler, only()).submit(pCaptor.capture());
|
||||
verify(replLoader, only()).get("spark");
|
||||
verify(interpreterFactory, only()).getInterpreter(anyString(), eq("spark"));
|
||||
|
||||
assertEquals("Paragraph text", pText, pCaptor.getValue().getText());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void runJdbcTest() {
|
||||
when(replLoader.get("mysql")).thenReturn(null);
|
||||
when(replLoader.get("jdbc")).thenReturn(interpreter);
|
||||
when(interpreterFactory.getInterpreter(anyString(), eq("mysql"))).thenReturn(null);
|
||||
when(interpreterFactory.getInterpreter(anyString(), eq("jdbc"))).thenReturn(interpreter);
|
||||
when(interpreter.getScheduler()).thenReturn(scheduler);
|
||||
|
||||
String pText = "%mysql show databases";
|
||||
Note note = new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener);
|
||||
|
||||
Note note = new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener);
|
||||
Paragraph p = note.addParagraph();
|
||||
p.setText(pText);
|
||||
note.run(p.getId());
|
||||
|
||||
ArgumentCaptor<Paragraph> pCaptor = ArgumentCaptor.forClass(Paragraph.class);
|
||||
verify(scheduler, only()).submit(pCaptor.capture());
|
||||
verify(replLoader, times(2)).get(anyString());
|
||||
verify(interpreterFactory, times(2)).getInterpreter(anyString(), anyString());
|
||||
|
||||
assertEquals("Change paragraph text", "%jdbc(mysql) show databases", pCaptor.getValue().getEffectiveText());
|
||||
assertEquals("Change paragraph text", pText, pCaptor.getValue().getText());
|
||||
|
|
@ -102,10 +105,10 @@ public class NoteTest {
|
|||
|
||||
@Test
|
||||
public void putDefaultReplNameIfInterpreterSettingAbsent() {
|
||||
when(replLoader.getDefaultInterpreterSetting())
|
||||
.thenReturn(Optional.<InterpreterSetting>absent());
|
||||
when(interpreterFactory.getDefaultInterpreterSetting(anyString()))
|
||||
.thenReturn(null);
|
||||
|
||||
Note note = new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener);
|
||||
Note note = new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener);
|
||||
note.putDefaultReplName();
|
||||
|
||||
assertEquals(StringUtils.EMPTY, note.getLastReplName());
|
||||
|
|
@ -116,10 +119,10 @@ public class NoteTest {
|
|||
public void putDefaultReplNameIfInterpreterSettingPresent() {
|
||||
InterpreterSetting interpreterSetting = Mockito.mock(InterpreterSetting.class);
|
||||
when(interpreterSetting.getGroup()).thenReturn("spark");
|
||||
when(replLoader.getDefaultInterpreterSetting())
|
||||
.thenReturn(Optional.of(interpreterSetting));
|
||||
when(interpreterFactory.getDefaultInterpreterSetting(anyString()))
|
||||
.thenReturn(interpreterSetting);
|
||||
|
||||
Note note = new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener);
|
||||
Note note = new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener);
|
||||
note.putDefaultReplName();
|
||||
|
||||
assertEquals("spark", note.getLastReplName());
|
||||
|
|
@ -130,10 +133,10 @@ public class NoteTest {
|
|||
public void addParagraphWithLastReplName() {
|
||||
InterpreterSetting interpreterSetting = Mockito.mock(InterpreterSetting.class);
|
||||
when(interpreterSetting.getGroup()).thenReturn("spark");
|
||||
when(replLoader.getDefaultInterpreterSetting())
|
||||
.thenReturn(Optional.of(interpreterSetting));
|
||||
when(interpreterFactory.getDefaultInterpreterSetting(anyString()))
|
||||
.thenReturn(interpreterSetting);
|
||||
|
||||
Note note = new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener);
|
||||
Note note = new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener);
|
||||
note.putDefaultReplName(); //set lastReplName
|
||||
|
||||
Paragraph p = note.addParagraph();
|
||||
|
|
@ -145,10 +148,10 @@ public class NoteTest {
|
|||
public void insertParagraphWithLastReplName() {
|
||||
InterpreterSetting interpreterSetting = Mockito.mock(InterpreterSetting.class);
|
||||
when(interpreterSetting.getGroup()).thenReturn("spark");
|
||||
when(replLoader.getDefaultInterpreterSetting())
|
||||
.thenReturn(Optional.of(interpreterSetting));
|
||||
when(interpreterFactory.getDefaultInterpreterSetting(anyString()))
|
||||
.thenReturn(interpreterSetting);
|
||||
|
||||
Note note = new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener);
|
||||
Note note = new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener);
|
||||
note.putDefaultReplName(); //set lastReplName
|
||||
|
||||
Paragraph p = note.insertParagraph(note.getParagraphs().size());
|
||||
|
|
@ -159,7 +162,7 @@ public class NoteTest {
|
|||
@Test
|
||||
public void setLastReplName() {
|
||||
String paragraphId = "HelloWorld";
|
||||
Note note = Mockito.spy(new Note(repo, replLoader, jobListenerFactory, index, credentials, noteEventListener));
|
||||
Note note = Mockito.spy(new Note(repo, interpreterFactory, jobListenerFactory, index, credentials, noteEventListener));
|
||||
Paragraph mockParagraph = Mockito.mock(Paragraph.class);
|
||||
when(note.getParagraph(paragraphId)).thenReturn(mockParagraph);
|
||||
when(mockParagraph.getRequiredReplName()).thenReturn("spark");
|
||||
|
|
|
|||
|
|
@ -106,7 +106,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
@Test
|
||||
public void testSelectingReplImplementation() throws IOException {
|
||||
Note note = notebook.createNote(null);
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
// run with defatul repl
|
||||
Paragraph p1 = note.addParagraph();
|
||||
|
|
@ -204,7 +204,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
@Test
|
||||
public void testRunAll() throws IOException {
|
||||
Note note = notebook.createNote(null);
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
// p1
|
||||
Paragraph p1 = note.addParagraph();
|
||||
|
|
@ -243,7 +243,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
public void testSchedule() throws InterruptedException, IOException{
|
||||
// create a note and a paragraph
|
||||
Note note = notebook.createNote(null);
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
Paragraph p = note.addParagraph();
|
||||
Map config = new HashMap<String, Object>();
|
||||
|
|
@ -275,7 +275,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
public void testAutoRestartInterpreterAfterSchedule() throws InterruptedException, IOException{
|
||||
// create a note and a paragraph
|
||||
Note note = notebook.createNote(null);
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
Paragraph p = note.addParagraph();
|
||||
Map config = new HashMap<String, Object>();
|
||||
|
|
@ -296,11 +296,11 @@ public class NotebookTest implements JobListenerFactory{
|
|||
|
||||
|
||||
MockInterpreter1 mock1 = ((MockInterpreter1) (((ClassloaderInterpreter)
|
||||
((LazyOpenInterpreter) note.getNoteReplLoader().get("mock1")).getInnerInterpreter())
|
||||
((LazyOpenInterpreter) factory.getInterpreter(note.getId(), "mock1")).getInnerInterpreter())
|
||||
.getInnerInterpreter()));
|
||||
|
||||
MockInterpreter2 mock2 = ((MockInterpreter2) (((ClassloaderInterpreter)
|
||||
((LazyOpenInterpreter) note.getNoteReplLoader().get("mock2")).getInnerInterpreter())
|
||||
((LazyOpenInterpreter) factory.getInterpreter(note.getId(), "mock2")).getInnerInterpreter())
|
||||
.getInnerInterpreter()));
|
||||
|
||||
// wait until interpreters are started
|
||||
|
|
@ -327,7 +327,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
public void testExportAndImportNote() throws IOException, CloneNotSupportedException,
|
||||
InterruptedException {
|
||||
Note note = notebook.createNote(null);
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
final Paragraph p = note.addParagraph();
|
||||
String simpleText = "hello world";
|
||||
|
|
@ -354,7 +354,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
public void testCloneNote() throws IOException, CloneNotSupportedException,
|
||||
InterruptedException {
|
||||
Note note = notebook.createNote(null);
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
final Paragraph p = note.addParagraph();
|
||||
p.setText("hello world");
|
||||
|
|
@ -376,7 +376,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
public void testCloneNoteWithExceptionResult() throws IOException, CloneNotSupportedException,
|
||||
InterruptedException {
|
||||
Note note = notebook.createNote(null);
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
final Paragraph p = note.addParagraph();
|
||||
p.setText("hello world");
|
||||
|
|
@ -399,7 +399,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
@Test
|
||||
public void testResourceRemovealOnParagraphNoteRemove() throws IOException {
|
||||
Note note = notebook.createNote(null);
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
|
||||
intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId()));
|
||||
}
|
||||
|
|
@ -428,10 +428,10 @@ public class NotebookTest implements JobListenerFactory{
|
|||
IOException {
|
||||
// create a note and a paragraph
|
||||
Note note = notebook.createNote(null);
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
AngularObjectRegistry registry = note.getNoteReplLoader()
|
||||
.getInterpreterSettings().get(0).getInterpreterGroup("sharedProcess")
|
||||
AngularObjectRegistry registry = factory
|
||||
.getInterpreterSettings(note.getId()).get(0).getInterpreterGroup("sharedProcess")
|
||||
.getAngularObjectRegistry();
|
||||
|
||||
Paragraph p1 = note.addParagraph();
|
||||
|
|
@ -461,10 +461,10 @@ public class NotebookTest implements JobListenerFactory{
|
|||
IOException {
|
||||
// create a note and a paragraph
|
||||
Note note = notebook.createNote(null);
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
AngularObjectRegistry registry = note.getNoteReplLoader()
|
||||
.getInterpreterSettings().get(0).getInterpreterGroup("sharedProcess")
|
||||
AngularObjectRegistry registry = factory
|
||||
.getInterpreterSettings(note.getId()).get(0).getInterpreterGroup("sharedProcess")
|
||||
.getAngularObjectRegistry();
|
||||
|
||||
Paragraph p1 = note.addParagraph();
|
||||
|
|
@ -494,10 +494,10 @@ public class NotebookTest implements JobListenerFactory{
|
|||
IOException {
|
||||
// create a note and a paragraph
|
||||
Note note = notebook.createNote(null);
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
AngularObjectRegistry registry = note.getNoteReplLoader()
|
||||
.getInterpreterSettings().get(0).getInterpreterGroup("sharedProcess")
|
||||
AngularObjectRegistry registry = factory
|
||||
.getInterpreterSettings(note.getId()).get(0).getInterpreterGroup("sharedProcess")
|
||||
.getAngularObjectRegistry();
|
||||
|
||||
// add local scope object
|
||||
|
|
@ -506,9 +506,8 @@ public class NotebookTest implements JobListenerFactory{
|
|||
registry.add("o2", "object2", null, null);
|
||||
|
||||
// restart interpreter
|
||||
factory.restart(note.getNoteReplLoader().getInterpreterSettings().get(0).id());
|
||||
registry = note.getNoteReplLoader()
|
||||
.getInterpreterSettings().get(0).getInterpreterGroup("sharedProcess")
|
||||
factory.restart(factory.getInterpreterSettings(note.getId()).get(0).id());
|
||||
registry = factory.getInterpreterSettings(note.getId()).get(0).getInterpreterGroup("sharedProcess")
|
||||
.getAngularObjectRegistry();
|
||||
|
||||
// local and global scope object should be removed
|
||||
|
|
@ -566,7 +565,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
public void testAbortParagraphStatusOnInterpreterRestart() throws InterruptedException,
|
||||
IOException {
|
||||
Note note = notebook.createNote(null);
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
ArrayList<Paragraph> paragraphs = new ArrayList<>();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
|
|
@ -583,7 +582,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
|
||||
while (paragraphs.get(0).getStatus() != Status.FINISHED) Thread.yield();
|
||||
|
||||
factory.restart(note.getNoteReplLoader().getInterpreterSettings().get(0).id());
|
||||
factory.restart(factory.getInterpreterSettings(note.getId()).get(0).id());
|
||||
|
||||
boolean isAborted = false;
|
||||
for (Paragraph p : paragraphs) {
|
||||
|
|
@ -607,7 +606,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
p1.setText("getId");
|
||||
|
||||
// restart interpreter with per note session enabled
|
||||
for (InterpreterSetting setting : note1.getNoteReplLoader().getInterpreterSettings()) {
|
||||
for (InterpreterSetting setting : factory.getInterpreterSettings(note1.getId())) {
|
||||
setting.getOption().setPerNoteSession(true);
|
||||
notebook.getInterpreterFactory().restart(setting.id());
|
||||
}
|
||||
|
|
@ -652,7 +651,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
|
||||
|
||||
// restart interpreter with per note session enabled
|
||||
for (InterpreterSetting setting : note1.getNoteReplLoader().getInterpreterSettings()) {
|
||||
for (InterpreterSetting setting : notebook.getInterpreterFactory().getInterpreterSettings(note1.getId())) {
|
||||
setting.getOption().setPerNoteSession(true);
|
||||
notebook.getInterpreterFactory().restart(setting.id());
|
||||
}
|
||||
|
|
@ -678,7 +677,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
p1.setText("getId");
|
||||
|
||||
// restart interpreter with per note session enabled
|
||||
for (InterpreterSetting setting : note1.getNoteReplLoader().getInterpreterSettings()) {
|
||||
for (InterpreterSetting setting : factory.getInterpreterSettings(note1.getId())) {
|
||||
setting.getOption().setPerNoteSession(true);
|
||||
notebook.getInterpreterFactory().restart(setting.id());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,8 @@ package org.apache.zeppelin.notebook;
|
|||
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
|
@ -28,6 +30,7 @@ import org.apache.zeppelin.display.AngularObjectBuilder;
|
|||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.Input;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
|
@ -72,17 +75,19 @@ public class ParagraphTest {
|
|||
|
||||
@Test
|
||||
public void effectiveTextTest() {
|
||||
NoteInterpreterLoader noteInterpreterLoader = mock(NoteInterpreterLoader.class);
|
||||
InterpreterFactory interpreterFactory = mock(InterpreterFactory.class);
|
||||
Interpreter interpreter = mock(Interpreter.class);
|
||||
Note note = mock(Note.class);
|
||||
|
||||
Paragraph p = new Paragraph(null, null, null, noteInterpreterLoader);
|
||||
Paragraph p = new Paragraph("paragraph", note, null, interpreterFactory);
|
||||
p.setText("%h2 show databases");
|
||||
p.setEffectiveText("%jdbc(h2) show databases");
|
||||
assertEquals("Get right replName", "jdbc", p.getRequiredReplName());
|
||||
assertEquals("Get right scriptBody", "(h2) show databases", p.getScriptBody());
|
||||
|
||||
when(noteInterpreterLoader.get("jdbc")).thenReturn(interpreter);
|
||||
when(interpreterFactory.getInterpreter(anyString(), eq("jdbc"))).thenReturn(interpreter);
|
||||
when(interpreter.getFormType()).thenReturn(Interpreter.FormType.NATIVE);
|
||||
when(note.getId()).thenReturn("noteId");
|
||||
|
||||
try {
|
||||
p.jobRun();
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory {
|
|||
@Test
|
||||
public void testSaveNotebook() throws IOException, InterruptedException {
|
||||
Note note = notebook.createNote(null);
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
Paragraph p1 = note.addParagraph();
|
||||
Map<String, Object> config = p1.getConfig();
|
||||
|
|
|
|||
|
|
@ -25,9 +25,8 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.NoteInterpreterLoader;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepo;
|
||||
import org.junit.After;
|
||||
|
|
@ -36,21 +35,20 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
public class LuceneSearchTest {
|
||||
|
||||
private static NoteInterpreterLoader replLoaderMock;
|
||||
private static NotebookRepo notebookRepoMock;
|
||||
private static InterpreterFactory interpreterFactory;
|
||||
private SearchService notebookIndex;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeStartUp() {
|
||||
notebookRepoMock = mock(NotebookRepo.class);
|
||||
replLoaderMock = mock(NoteInterpreterLoader.class);
|
||||
interpreterFactory = mock(InterpreterFactory.class);
|
||||
|
||||
when(replLoaderMock.getInterpreterSettings())
|
||||
.thenReturn(ImmutableList.<InterpreterSetting>of());
|
||||
// when(replLoaderMock.getInterpreterSettings())
|
||||
// .thenReturn(ImmutableList.<InterpreterSetting>of());
|
||||
}
|
||||
|
||||
@Before
|
||||
|
|
@ -286,7 +284,7 @@ public class LuceneSearchTest {
|
|||
}
|
||||
|
||||
private Note newNote(String name) {
|
||||
Note note = new Note(notebookRepoMock, replLoaderMock, null, notebookIndex, null, null);
|
||||
Note note = new Note(notebookRepoMock, interpreterFactory, null, notebookIndex, null, null);
|
||||
note.setName(name);
|
||||
return note;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue