mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
interpreter session aware interpreter factory
This commit is contained in:
parent
ed1ab0d213
commit
fc9cb3f4b5
21 changed files with 374 additions and 200 deletions
|
|
@ -299,18 +299,16 @@ public class DepInterpreter extends Interpreter {
|
|||
if (intpGroup == null) {
|
||||
return null;
|
||||
}
|
||||
synchronized (intpGroup) {
|
||||
for (Interpreter intp : intpGroup){
|
||||
if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
|
||||
Interpreter p = intp;
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
return (SparkInterpreter) p;
|
||||
}
|
||||
}
|
||||
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
|
||||
if (p == null) {
|
||||
return null;
|
||||
}
|
||||
return null;
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
return (SparkInterpreter) p;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -494,23 +494,18 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
|
||||
|
||||
private SparkInterpreter getSparkInterpreter() {
|
||||
InterpreterGroup intpGroup = getInterpreterGroup();
|
||||
LazyOpenInterpreter lazy = null;
|
||||
SparkInterpreter spark = null;
|
||||
synchronized (intpGroup) {
|
||||
for (Interpreter intp : getInterpreterGroup()){
|
||||
if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
|
||||
Interpreter p = intp;
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
spark = (SparkInterpreter) p;
|
||||
}
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
spark = (SparkInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
|
|
@ -554,20 +549,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
}
|
||||
|
||||
private DepInterpreter getDepInterpreter() {
|
||||
InterpreterGroup intpGroup = getInterpreterGroup();
|
||||
if (intpGroup == null) return null;
|
||||
synchronized (intpGroup) {
|
||||
for (Interpreter intp : intpGroup) {
|
||||
if (intp.getClassName().equals(DepInterpreter.class.getName())) {
|
||||
Interpreter p = intp;
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
return (DepInterpreter) p;
|
||||
}
|
||||
}
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
|
||||
if (p == null) {
|
||||
return null;
|
||||
}
|
||||
return null;
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
return (DepInterpreter) p;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -111,9 +111,9 @@ public class SparkInterpreter extends Interpreter {
|
|||
private ZeppelinContext z;
|
||||
private SparkILoop interpreter;
|
||||
private SparkIMain intp;
|
||||
private SparkContext sc;
|
||||
private static SparkContext sc;
|
||||
private SparkOutputStream out;
|
||||
private SQLContext sqlc;
|
||||
private static SQLContext sqlc;
|
||||
private SparkDependencyResolver dep;
|
||||
private SparkJLineCompletion completor;
|
||||
|
||||
|
|
@ -230,20 +230,15 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
private DepInterpreter getDepInterpreter() {
|
||||
InterpreterGroup intpGroup = getInterpreterGroup();
|
||||
if (intpGroup == null) return null;
|
||||
synchronized (intpGroup) {
|
||||
for (Interpreter intp : intpGroup) {
|
||||
if (intp.getClassName().equals(DepInterpreter.class.getName())) {
|
||||
Interpreter p = intp;
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
return (DepInterpreter) p;
|
||||
}
|
||||
}
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
|
||||
if (p == null) {
|
||||
return null;
|
||||
}
|
||||
return null;
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
return (DepInterpreter) p;
|
||||
}
|
||||
|
||||
public SparkContext createSparkContext() {
|
||||
|
|
|
|||
|
|
@ -79,23 +79,18 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
private SparkInterpreter getSparkInterpreter() {
|
||||
InterpreterGroup intpGroup = getInterpreterGroup();
|
||||
LazyOpenInterpreter lazy = null;
|
||||
SparkInterpreter spark = null;
|
||||
synchronized (intpGroup) {
|
||||
for (Interpreter intp : getInterpreterGroup()){
|
||||
if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
|
||||
Interpreter p = intp;
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
spark = (SparkInterpreter) p;
|
||||
}
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
spark = (SparkInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
|
|
@ -179,15 +174,14 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
// It's because of scheduler is not created yet, and scheduler is created by this function.
|
||||
// Therefore, we can still use getSparkInterpreter() here, but it's better and safe
|
||||
// to getSparkInterpreter without opening it.
|
||||
for (Interpreter intp : getInterpreterGroup()) {
|
||||
if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
|
||||
Interpreter p = intp;
|
||||
return p.getScheduler();
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
|
||||
Interpreter intp =
|
||||
getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
|
||||
if (intp != null) {
|
||||
return intp.getScheduler();
|
||||
} else {
|
||||
throw new InterpreterException("Can't find SparkInterpreter");
|
||||
}
|
||||
throw new InterpreterException("Can't find SparkInterpreter");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ trait AbstractAngularElemTest
|
|||
val context = new InterpreterContext("note", "paragraph", "title", "text",
|
||||
new util.HashMap[String, Object](), new GUI(), new AngularObjectRegistry(
|
||||
intpGroup.getId(), null),
|
||||
null,
|
||||
new util.LinkedList[InterpreterContextRunner](),
|
||||
new InterpreterOutput(new InterpreterOutputListener() {
|
||||
override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = {
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ with BeforeAndAfter with BeforeAndAfterEach with Eventually with Matchers {
|
|||
val context = new InterpreterContext("note", "id", "title", "text",
|
||||
new java.util.HashMap[String, Object](), new GUI(), new AngularObjectRegistry(
|
||||
intpGroup.getId(), null),
|
||||
null,
|
||||
new java.util.LinkedList[InterpreterContextRunner](),
|
||||
new InterpreterOutput(new InterpreterOutputListener() {
|
||||
override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = {
|
||||
|
|
|
|||
|
|
@ -193,6 +193,33 @@ public abstract class Interpreter {
|
|||
this.classloaderUrls = classloaderUrls;
|
||||
}
|
||||
|
||||
public Interpreter getInterpreterInTheSameSessionByClassName(String className) {
|
||||
synchronized (interpreterGroup) {
|
||||
for (List<Interpreter> interpreters : interpreterGroup.values()) {
|
||||
boolean belongsToSameNoteGroup = false;
|
||||
Interpreter interpreterFound = null;
|
||||
for (Interpreter intp : interpreters) {
|
||||
if (intp.getClassName().equals(className)) {
|
||||
interpreterFound = intp;
|
||||
}
|
||||
|
||||
Interpreter p = intp;
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
if (this == p) {
|
||||
belongsToSameNoteGroup = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (belongsToSameNoteGroup) {
|
||||
return interpreterFound;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Type of interpreter.
|
||||
|
|
|
|||
|
|
@ -152,6 +152,9 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
}
|
||||
|
||||
private void close(Collection<Interpreter> intpToClose) {
|
||||
if (intpToClose == null) {
|
||||
return;
|
||||
}
|
||||
List<Thread> closeThreads = new LinkedList<Thread>();
|
||||
|
||||
for (final Interpreter intp : intpToClose) {
|
||||
|
|
@ -205,6 +208,10 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
}
|
||||
|
||||
private void destroy(Collection<Interpreter> intpToDestroy) {
|
||||
if (intpToDestroy == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<Thread> destroyThreads = new LinkedList<Thread>();
|
||||
|
||||
for (final Interpreter intp : intpToDestroy) {
|
||||
|
|
|
|||
|
|
@ -21,12 +21,7 @@ import java.util.*;
|
|||
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
|
||||
|
|
@ -132,6 +127,10 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
|
||||
|
||||
interpreterProcess.reference(getInterpreterGroup());
|
||||
interpreterProcess.setMaxPoolSize(
|
||||
Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
|
||||
|
||||
synchronized (interpreterProcess) {
|
||||
Client client = null;
|
||||
try {
|
||||
|
|
@ -160,18 +159,17 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void open() {
|
||||
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
|
||||
|
||||
InterpreterGroup interpreterGroup = getInterpreterGroup();
|
||||
int rc = interpreterProcess.reference(getInterpreterGroup());
|
||||
interpreterProcess.setMaxPoolSize(
|
||||
Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
// initialize all interpreters in this interpreter group
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
for (Interpreter intp : interpreters) {
|
||||
((RemoteInterpreter) intp).init();
|
||||
Interpreter p = intp;
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
((RemoteInterpreter) p).init();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ public class NotebookRestApi {
|
|||
setting.id(),
|
||||
setting.getName(),
|
||||
setting.getGroup(),
|
||||
setting.getInterpreterGroup(),
|
||||
setting.getInterpreterInfos(),
|
||||
true)
|
||||
);
|
||||
}
|
||||
|
|
@ -122,7 +122,7 @@ public class NotebookRestApi {
|
|||
setting.id(),
|
||||
setting.getName(),
|
||||
setting.getGroup(),
|
||||
setting.getInterpreterGroup(),
|
||||
setting.getInterpreterInfos(),
|
||||
false)
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ package org.apache.zeppelin.rest.message;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
|
||||
/**
|
||||
* InterpreterSetting information for binding
|
||||
|
|
@ -29,10 +30,12 @@ public class InterpreterSettingListForNoteBind {
|
|||
String name;
|
||||
String group;
|
||||
private boolean selected;
|
||||
private List<Interpreter> interpreters;
|
||||
private List<InterpreterSetting.InterpreterInfo> interpreters;
|
||||
|
||||
public InterpreterSettingListForNoteBind(String id, String name,
|
||||
String group, List<Interpreter> interpreters, boolean selected) {
|
||||
String group,
|
||||
List<InterpreterSetting.InterpreterInfo> interpreters,
|
||||
boolean selected) {
|
||||
super();
|
||||
this.id = id;
|
||||
this.name = name;
|
||||
|
|
@ -65,11 +68,11 @@ public class InterpreterSettingListForNoteBind {
|
|||
this.group = group;
|
||||
}
|
||||
|
||||
public List<Interpreter> getInterpreterNames() {
|
||||
public List<InterpreterSetting.InterpreterInfo> getInterpreterNames() {
|
||||
return interpreters;
|
||||
}
|
||||
|
||||
public void setInterpreterNames(List<Interpreter> interpreters) {
|
||||
public void setInterpreterNames(List<InterpreterSetting.InterpreterInfo> interpreters) {
|
||||
this.interpreters = interpreters;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,10 +23,11 @@ import javax.ws.rs.core.NewCookie;
|
|||
import javax.ws.rs.core.Response.ResponseBuilder;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSerializer;
|
||||
import org.apache.zeppelin.interpreter.InterpreterInfoSerializer;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
|
||||
/**
|
||||
* Json response builder.
|
||||
|
|
@ -98,8 +99,9 @@ public class JsonResponse<T> {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
GsonBuilder gsonBuilder = new GsonBuilder()
|
||||
.registerTypeAdapter(Interpreter.class, new InterpreterSerializer());
|
||||
GsonBuilder gsonBuilder = new GsonBuilder().registerTypeAdapter(
|
||||
InterpreterSetting.InterpreterInfo.class,
|
||||
new InterpreterInfoSerializer());
|
||||
if (pretty) {
|
||||
gsonBuilder.setPrettyPrinting();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
|||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.display.Input;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
|
|
@ -37,7 +36,6 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
|
|||
import org.apache.zeppelin.notebook.*;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.apache.zeppelin.scheduler.JobListener;
|
||||
import org.apache.zeppelin.server.ZeppelinServer;
|
||||
import org.apache.zeppelin.socket.Message.OP;
|
||||
import org.apache.zeppelin.ticket.TicketContainer;
|
||||
|
|
|
|||
|
|
@ -90,7 +90,8 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
|
|||
// Call Create Setting REST API
|
||||
String jsonRequest = "{\"name\":\"md2\",\"group\":\"md\",\"properties\":{\"propname\":\"propvalue\"}," +
|
||||
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," +
|
||||
"\"dependencies\":[]}";
|
||||
"\"dependencies\":[]," +
|
||||
"\"option\": { \"remote\": true, \"perNoteSession\": false }}";
|
||||
PostMethod post = httpPost("/interpreter/setting/", jsonRequest);
|
||||
LOG.info("testSettingCRUD create response\n" + post.getResponseBodyAsString());
|
||||
assertThat("test create method:", post, isCreated());
|
||||
|
|
@ -105,7 +106,8 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
|
|||
// Call Update Setting REST API
|
||||
jsonRequest = "{\"name\":\"md2\",\"group\":\"md\",\"properties\":{\"propname\":\"Otherpropvalue\"}," +
|
||||
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," +
|
||||
"\"dependencies\":[]}";
|
||||
"\"dependencies\":[]," +
|
||||
"\"option\": { \"remote\": true, \"perNoteSession\": false }}";
|
||||
PutMethod put = httpPut("/interpreter/setting/" + newSettingId, jsonRequest);
|
||||
LOG.info("testSettingCRUD update response\n" + put.getResponseBodyAsString());
|
||||
assertThat("test update method:", put, isAllowed());
|
||||
|
|
|
|||
|
|
@ -51,7 +51,6 @@ import java.util.*;
|
|||
|
||||
/**
|
||||
* Manage interpreters.
|
||||
*
|
||||
*/
|
||||
public class InterpreterFactory {
|
||||
Logger logger = LoggerFactory.getLogger(InterpreterFactory.class);
|
||||
|
|
@ -103,7 +102,8 @@ public class InterpreterFactory {
|
|||
|
||||
GsonBuilder builder = new GsonBuilder();
|
||||
builder.setPrettyPrinting();
|
||||
builder.registerTypeAdapter(Interpreter.class, new InterpreterSerializer());
|
||||
builder.registerTypeAdapter(
|
||||
InterpreterSetting.InterpreterInfo.class, new InterpreterInfoSerializer());
|
||||
gson = builder.create();
|
||||
|
||||
init();
|
||||
|
|
@ -197,16 +197,14 @@ public class InterpreterFactory {
|
|||
InterpreterSetting setting = interpreterSettings.get(settingId);
|
||||
logger.info("Interpreter setting group {} : id={}, name={}",
|
||||
setting.getGroup(), settingId, setting.getName());
|
||||
for (Interpreter interpreter : setting.getInterpreterGroup()) {
|
||||
logger.info(" className = {}", interpreter.getClassName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void loadFromFile() throws IOException {
|
||||
GsonBuilder builder = new GsonBuilder();
|
||||
builder.setPrettyPrinting();
|
||||
builder.registerTypeAdapter(Interpreter.class, new InterpreterSerializer());
|
||||
builder.registerTypeAdapter(
|
||||
InterpreterSetting.InterpreterInfo.class, new InterpreterInfoSerializer());
|
||||
Gson gson = builder.create();
|
||||
|
||||
File settingFile = new File(conf.getInterpreterSettingPath());
|
||||
|
|
@ -241,14 +239,12 @@ public class InterpreterFactory {
|
|||
setting.id(),
|
||||
setting.getName(),
|
||||
setting.getGroup(),
|
||||
setting.getInterpreterInfos(),
|
||||
setting.getProperties(),
|
||||
setting.getDependencies(),
|
||||
setting.getOption());
|
||||
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup(
|
||||
setting.id(),
|
||||
setting.getGroup(),
|
||||
setting.getOption(),
|
||||
setting.getProperties());
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup(setting.id(), setting.getOption());
|
||||
intpSetting.setInterpreterGroup(interpreterGroup);
|
||||
|
||||
interpreterSettings.put(k, intpSetting);
|
||||
|
|
@ -380,9 +376,27 @@ public class InterpreterFactory {
|
|||
throws InterpreterException, IOException, RepositoryException {
|
||||
synchronized (interpreterSettings) {
|
||||
|
||||
List<InterpreterSetting.InterpreterInfo> interpreterInfos =
|
||||
new LinkedList<InterpreterSetting.InterpreterInfo>();
|
||||
|
||||
for (RegisteredInterpreter registeredInterpreter :
|
||||
Interpreter.registeredInterpreters.values()) {
|
||||
if (registeredInterpreter.getGroup().equals(groupName)) {
|
||||
for (String className : interpreterClassList) {
|
||||
if (registeredInterpreter.getClassName().equals(className)) {
|
||||
interpreterInfos.add(
|
||||
new InterpreterSetting.InterpreterInfo(
|
||||
className, registeredInterpreter.getName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
InterpreterSetting intpSetting = new InterpreterSetting(
|
||||
name,
|
||||
groupName,
|
||||
interpreterInfos,
|
||||
properties,
|
||||
dependencies,
|
||||
option);
|
||||
|
||||
|
|
@ -390,8 +404,7 @@ public class InterpreterFactory {
|
|||
loadInterpreterDependencies(intpSetting);
|
||||
}
|
||||
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup(
|
||||
intpSetting.id(), groupName, option, properties);
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup(intpSetting.id(), option);
|
||||
|
||||
intpSetting.setInterpreterGroup(interpreterGroup);
|
||||
|
||||
|
|
@ -401,18 +414,12 @@ public class InterpreterFactory {
|
|||
}
|
||||
}
|
||||
|
||||
private InterpreterGroup createInterpreterGroup(String id,
|
||||
String groupName,
|
||||
InterpreterOption option,
|
||||
Properties properties)
|
||||
private InterpreterGroup createInterpreterGroup(String id, InterpreterOption option)
|
||||
throws InterpreterException, NullArgumentException {
|
||||
|
||||
//When called from REST API without option we receive NPE
|
||||
if (option == null)
|
||||
throw new NullArgumentException("option");
|
||||
//When called from REST API without option we receive NPE
|
||||
if (properties == null)
|
||||
throw new NullArgumentException("properties");
|
||||
|
||||
AngularObjectRegistry angularObjectRegistry;
|
||||
|
||||
|
|
@ -430,19 +437,38 @@ public class InterpreterFactory {
|
|||
}
|
||||
|
||||
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
|
||||
return interpreterGroup;
|
||||
}
|
||||
|
||||
public void removeInterpretersForNote(InterpreterSetting interpreterSetting,
|
||||
String noteId) {
|
||||
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup();
|
||||
interpreterGroup.close(noteId);
|
||||
interpreterGroup.destroy(noteId);
|
||||
synchronized (interpreterGroup) {
|
||||
interpreterGroup.remove(noteId);
|
||||
}
|
||||
}
|
||||
|
||||
public void createInterpretersForNote(
|
||||
InterpreterSetting interpreterSetting,
|
||||
String noteId) {
|
||||
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup();
|
||||
String groupName = interpreterSetting.getGroup();
|
||||
InterpreterOption option = interpreterSetting.getOption();
|
||||
Properties properties = interpreterSetting.getProperties();
|
||||
|
||||
for (String className : interpreterClassList) {
|
||||
Set<String> keys = Interpreter.registeredInterpreters.keySet();
|
||||
for (String intName : keys) {
|
||||
RegisteredInterpreter info = Interpreter.registeredInterpreters
|
||||
.get(intName);
|
||||
RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName);
|
||||
if (info.getClassName().equals(className)
|
||||
&& info.getGroup().equals(groupName)) {
|
||||
Interpreter intp;
|
||||
|
||||
if (option.isRemote()) {
|
||||
intp = createRemoteRepl(info.getPath(),
|
||||
noteId,
|
||||
info.getClassName(),
|
||||
properties,
|
||||
interpreterGroup.id);
|
||||
|
|
@ -451,15 +477,25 @@ public class InterpreterFactory {
|
|||
info.getClassName(),
|
||||
properties);
|
||||
}
|
||||
interpreterGroup.add(intp);
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
if (interpreters == null) {
|
||||
interpreters = new LinkedList<Interpreter>();
|
||||
interpreterGroup.put(noteId, interpreters);
|
||||
}
|
||||
interpreters.add(intp);
|
||||
}
|
||||
|
||||
intp.setInterpreterGroup(interpreterGroup);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return interpreterGroup;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void remove(String id) throws IOException {
|
||||
synchronized (interpreterSettings) {
|
||||
if (interpreterSettings.containsKey(id)) {
|
||||
|
|
@ -486,7 +522,7 @@ public class InterpreterFactory {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get loaded interpreters
|
||||
* Get interpreter settings
|
||||
* @return
|
||||
*/
|
||||
public List<InterpreterSetting> get() {
|
||||
|
|
@ -508,8 +544,8 @@ public class InterpreterFactory {
|
|||
continue;
|
||||
}
|
||||
}
|
||||
for (InterpreterSetting.InterpreterInfo intp : setting.getInterpreterInfos()) {
|
||||
|
||||
for (Interpreter intp : setting.getInterpreterGroup()) {
|
||||
if (className.equals(intp.getClassName())) {
|
||||
boolean alreadyAdded = false;
|
||||
for (InterpreterSetting st : orderedSettings) {
|
||||
|
|
@ -536,15 +572,33 @@ public class InterpreterFactory {
|
|||
|
||||
public void putNoteInterpreterSettingBinding(String noteId,
|
||||
List<String> settingList) throws IOException {
|
||||
List<String> unBindedSettings = new LinkedList<String>();
|
||||
|
||||
synchronized (interpreterSettings) {
|
||||
List<String> oldSettings = interpreterBindings.get(noteId);
|
||||
if (oldSettings != null) {
|
||||
for (String oldSettingId : oldSettings) {
|
||||
if (!settingList.contains(oldSettingId)) {
|
||||
unBindedSettings.add(oldSettingId);
|
||||
}
|
||||
}
|
||||
}
|
||||
interpreterBindings.put(noteId, settingList);
|
||||
saveToFile();
|
||||
|
||||
for (String settingId : unBindedSettings) {
|
||||
InterpreterSetting setting = get(settingId);
|
||||
removeInterpretersForNote(setting, noteId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void removeNoteInterpreterSettingBinding(String noteId) {
|
||||
synchronized (interpreterSettings) {
|
||||
interpreterBindings.remove(noteId);
|
||||
List<String> settingIds = interpreterBindings.remove(noteId);
|
||||
for (String settingId : settingIds) {
|
||||
this.removeInterpretersForNote(get(settingId), noteId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -582,9 +636,7 @@ public class InterpreterFactory {
|
|||
intpsetting.setOption(option);
|
||||
intpsetting.setDependencies(dependencies);
|
||||
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup(
|
||||
intpsetting.id(),
|
||||
intpsetting.getGroup(), option, properties);
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup(intpsetting.id(), option);
|
||||
intpsetting.setInterpreterGroup(interpreterGroup);
|
||||
|
||||
loadInterpreterDependencies(intpsetting);
|
||||
|
|
@ -608,7 +660,7 @@ public class InterpreterFactory {
|
|||
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup(
|
||||
intpsetting.id(),
|
||||
intpsetting.getGroup(), intpsetting.getOption(), intpsetting.getProperties());
|
||||
intpsetting.getOption());
|
||||
intpsetting.setInterpreterGroup(interpreterGroup);
|
||||
} else {
|
||||
throw new InterpreterException("Interpreter setting id " + id
|
||||
|
|
@ -619,16 +671,18 @@ public class InterpreterFactory {
|
|||
|
||||
private void stopJobAllInterpreter(InterpreterSetting intpsetting) {
|
||||
if (intpsetting != null) {
|
||||
for (Interpreter intp : intpsetting.getInterpreterGroup()) {
|
||||
for (Job job : intp.getScheduler().getJobsRunning()) {
|
||||
job.abort();
|
||||
job.setStatus(Status.ABORT);
|
||||
logger.info("Job " + job.getJobName() + " aborted ");
|
||||
}
|
||||
for (Job job : intp.getScheduler().getJobsWaiting()) {
|
||||
job.abort();
|
||||
job.setStatus(Status.ABORT);
|
||||
logger.info("Job " + job.getJobName() + " aborted ");
|
||||
for (List<Interpreter> interpreters : intpsetting.getInterpreterGroup().values()) {
|
||||
for (Interpreter intp : interpreters) {
|
||||
for (Job job : intp.getScheduler().getJobsRunning()) {
|
||||
job.abort();
|
||||
job.setStatus(Status.ABORT);
|
||||
logger.info("Job " + job.getJobName() + " aborted ");
|
||||
}
|
||||
for (Job job : intp.getScheduler().getJobsWaiting()) {
|
||||
job.abort();
|
||||
job.setStatus(Status.ABORT);
|
||||
logger.info("Job " + job.getJobName() + " aborted ");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -720,13 +774,13 @@ public class InterpreterFactory {
|
|||
}
|
||||
|
||||
|
||||
private Interpreter createRemoteRepl(String interpreterPath, String className,
|
||||
private Interpreter createRemoteRepl(String interpreterPath, String noteId, String className,
|
||||
Properties property, String interpreterId) {
|
||||
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterId;
|
||||
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
|
||||
property, className, conf.getInterpreterRemoteRunnerPath(),
|
||||
property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
|
||||
interpreterPath, localRepoPath, connectTimeout,
|
||||
maxPoolSize, remoteInterpreterProcessListener));
|
||||
return intp;
|
||||
|
|
|
|||
|
|
@ -29,28 +29,30 @@ import com.google.gson.JsonSerializer;
|
|||
|
||||
|
||||
/**
|
||||
* Interpreter class serializer for gson
|
||||
* InterpreterInfo class serializer for gson
|
||||
*
|
||||
*/
|
||||
public class InterpreterSerializer implements JsonSerializer<Interpreter>,
|
||||
JsonDeserializer<Interpreter> {
|
||||
public class InterpreterInfoSerializer
|
||||
implements JsonSerializer<InterpreterSetting.InterpreterInfo>,
|
||||
JsonDeserializer<InterpreterSetting.InterpreterInfo> {
|
||||
|
||||
@Override
|
||||
public JsonElement serialize(Interpreter interpreter, Type type,
|
||||
public JsonElement serialize(InterpreterSetting.InterpreterInfo interpreterInfo, Type type,
|
||||
JsonSerializationContext context) {
|
||||
JsonObject json = new JsonObject();
|
||||
json.addProperty("class", interpreter.getClassName());
|
||||
json.addProperty(
|
||||
"name",
|
||||
Interpreter.findRegisteredInterpreterByClassName(
|
||||
interpreter.getClassName()).getName());
|
||||
json.addProperty("class", interpreterInfo.getClassName());
|
||||
json.addProperty("name", interpreterInfo.getName());
|
||||
return json;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Interpreter deserialize(JsonElement json, Type typeOfT,
|
||||
public InterpreterSetting.InterpreterInfo deserialize(JsonElement json, Type typeOfT,
|
||||
JsonDeserializationContext context) throws JsonParseException {
|
||||
return null;
|
||||
JsonObject jsonObject = json.getAsJsonObject();
|
||||
String className = jsonObject.get("class").getAsString();
|
||||
String name = jsonObject.get("name").getAsString();
|
||||
|
||||
return new InterpreterSetting.InterpreterInfo(className, name);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -34,27 +34,59 @@ public class InterpreterSetting {
|
|||
private String group;
|
||||
private String description;
|
||||
private Properties properties;
|
||||
private InterpreterGroup interpreterGroup;
|
||||
|
||||
// use 'interpreterGroup' as a field name to keep backward compativility of
|
||||
// conf/interpreter.json file format
|
||||
private List<InterpreterInfo> interpreterGroup;
|
||||
private transient InterpreterGroup interpreterGroupRef;
|
||||
private List<Dependency> dependencies;
|
||||
private InterpreterOption option;
|
||||
|
||||
public InterpreterSetting(String id,
|
||||
String name,
|
||||
String group,
|
||||
List<InterpreterInfo> interpreterInfos,
|
||||
Properties properties,
|
||||
List<Dependency> dependencies,
|
||||
InterpreterOption option) {
|
||||
this.id = id;
|
||||
this.name = name;
|
||||
this.group = group;
|
||||
this.interpreterGroup = interpreterInfos;
|
||||
this.properties = properties;
|
||||
this.dependencies = dependencies;
|
||||
this.option = option;
|
||||
}
|
||||
|
||||
public InterpreterSetting(String name,
|
||||
String group,
|
||||
List<InterpreterInfo> interpreterInfos,
|
||||
Properties properties,
|
||||
List<Dependency> dependencies,
|
||||
InterpreterOption option) {
|
||||
this(generateId(), name, group, dependencies, option);
|
||||
this(generateId(), name, group, interpreterInfos, properties, dependencies, option);
|
||||
}
|
||||
|
||||
/**
|
||||
* Information of interpreters in this interpreter setting.
|
||||
* this will be serialized for conf/interpreter.json and REST api response.
|
||||
*/
|
||||
public static class InterpreterInfo {
|
||||
private final String name;
|
||||
private final String className;
|
||||
|
||||
public InterpreterInfo(String className, String name) {
|
||||
this.className = className;
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public String getClassName() {
|
||||
return className;
|
||||
}
|
||||
}
|
||||
|
||||
public String id() {
|
||||
|
|
@ -86,12 +118,11 @@ public class InterpreterSetting {
|
|||
}
|
||||
|
||||
public InterpreterGroup getInterpreterGroup() {
|
||||
return interpreterGroup;
|
||||
return interpreterGroupRef;
|
||||
}
|
||||
|
||||
public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
|
||||
this.interpreterGroup = interpreterGroup;
|
||||
this.properties = interpreterGroup.getProperty();
|
||||
this.interpreterGroupRef = interpreterGroup;
|
||||
}
|
||||
|
||||
public Properties getProperties() {
|
||||
|
|
@ -120,4 +151,8 @@ public class InterpreterSetting {
|
|||
public void setOption(InterpreterOption option) {
|
||||
this.option = option;
|
||||
}
|
||||
|
||||
public List<InterpreterInfo> getInterpreterInfos() {
|
||||
return interpreterGroup;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,10 +29,11 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
|
|||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
|
||||
/**
|
||||
* Repl loader per note.
|
||||
* Interpreter loader per note.
|
||||
*/
|
||||
public class NoteInterpreterLoader {
|
||||
private transient InterpreterFactory factory;
|
||||
private static String SHARED_SESSION = "shared_session";
|
||||
String noteId;
|
||||
|
||||
public NoteInterpreterLoader(InterpreterFactory factory) {
|
||||
|
|
@ -73,6 +74,37 @@ public class NoteInterpreterLoader {
|
|||
return settings;
|
||||
}
|
||||
|
||||
private String getInterpreterGroupKey(InterpreterSetting setting) {
|
||||
if (setting.getOption().isPerNoteSession()) {
|
||||
return SHARED_SESSION;
|
||||
} else {
|
||||
return noteId;
|
||||
}
|
||||
}
|
||||
|
||||
private List<Interpreter> createOrGetInterpreterList(InterpreterSetting setting) {
|
||||
InterpreterGroup interpreterGroup = setting.getInterpreterGroup();
|
||||
synchronized (interpreterGroup) {
|
||||
String key = getInterpreterGroupKey(setting);
|
||||
if (!interpreterGroup.containsKey(key)) {
|
||||
factory.createInterpretersForNote(setting, key);
|
||||
}
|
||||
return interpreterGroup.get(getInterpreterGroupKey(setting));
|
||||
}
|
||||
}
|
||||
|
||||
public void close() {
|
||||
// close interpreters in this note session
|
||||
List<InterpreterSetting> settings = this.getInterpreterSettings();
|
||||
if (settings == null || settings.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (InterpreterSetting setting : settings) {
|
||||
factory.removeInterpretersForNote(setting, noteId);
|
||||
}
|
||||
}
|
||||
|
||||
public Interpreter get(String replName) {
|
||||
List<InterpreterSetting> settings = getInterpreterSettings();
|
||||
|
||||
|
|
@ -81,7 +113,9 @@ public class NoteInterpreterLoader {
|
|||
}
|
||||
|
||||
if (replName == null || replName.trim().length() == 0) {
|
||||
return settings.get(0).getInterpreterGroup().getFirst();
|
||||
// get default settings (first available)
|
||||
InterpreterSetting defaultSettings = settings.get(0);
|
||||
return createOrGetInterpreterList(defaultSettings).get(0);
|
||||
}
|
||||
|
||||
if (Interpreter.registeredInterpreters == null) {
|
||||
|
|
@ -104,43 +138,47 @@ public class NoteInterpreterLoader {
|
|||
String interpreterClassName = registeredInterpreter.getClassName();
|
||||
|
||||
for (InterpreterSetting setting : settings) {
|
||||
InterpreterGroup intpGroup = setting.getInterpreterGroup();
|
||||
for (Interpreter interpreter : intpGroup) {
|
||||
if (interpreterClassName.equals(interpreter.getClassName())) {
|
||||
return interpreter;
|
||||
if (registeredInterpreter.getGroup().equals(setting.getGroup())) {
|
||||
List<Interpreter> intpGroup = createOrGetInterpreterList(setting);
|
||||
for (Interpreter interpreter : intpGroup) {
|
||||
if (interpreterClassName.equals(interpreter.getClassName())) {
|
||||
return interpreter;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
throw new InterpreterException(replName + " interpreter not found");
|
||||
} else {
|
||||
// first assume replName is 'name' of interpreter. ('groupName' is ommitted)
|
||||
// search 'name' from first (default) interpreter group
|
||||
InterpreterGroup intpGroup = settings.get(0).getInterpreterGroup();
|
||||
for (Interpreter interpreter : intpGroup) {
|
||||
RegisteredInterpreter intp = Interpreter
|
||||
.findRegisteredInterpreterByClassName(interpreter.getClassName());
|
||||
if (intp == null) {
|
||||
continue;
|
||||
InterpreterSetting defaultSetting = settings.get(0);
|
||||
Interpreter.RegisteredInterpreter registeredInterpreter =
|
||||
Interpreter.registeredInterpreters.get(defaultSetting.getGroup() + "." + replName);
|
||||
if (registeredInterpreter != null) {
|
||||
List<Interpreter> interpreters = createOrGetInterpreterList(defaultSetting);
|
||||
for (Interpreter interpreter : interpreters) {
|
||||
|
||||
RegisteredInterpreter intp =
|
||||
Interpreter.findRegisteredInterpreterByClassName(interpreter.getClassName());
|
||||
if (intp == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (intp.getName().equals(replName)) {
|
||||
return interpreter;
|
||||
}
|
||||
}
|
||||
|
||||
if (intp.getName().equals(replName)) {
|
||||
return interpreter;
|
||||
}
|
||||
throw new InterpreterException(
|
||||
defaultSetting.getGroup() + "." + replName + " interpreter not found");
|
||||
}
|
||||
|
||||
|
||||
// next, assume replName is 'group' of interpreter ('name' is ommitted)
|
||||
// search interpreter group and return first interpreter.
|
||||
for (InterpreterSetting setting : settings) {
|
||||
intpGroup = setting.getInterpreterGroup();
|
||||
Interpreter interpreter = intpGroup.get(0);
|
||||
RegisteredInterpreter intp = Interpreter
|
||||
.findRegisteredInterpreterByClassName(interpreter.getClassName());
|
||||
if (intp == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (intp.getGroup().equals(replName)) {
|
||||
return interpreter;
|
||||
if (setting.getGroup().equals(replName)) {
|
||||
List<Interpreter> interpreters = createOrGetInterpreterList(setting);
|
||||
return interpreters.get(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -278,6 +278,7 @@ public class Notebook {
|
|||
synchronized (notes) {
|
||||
note = notes.remove(id);
|
||||
}
|
||||
replFactory.removeNoteInterpreterSettingBinding(id);
|
||||
notebookIndex.deleteIndexDocs(note);
|
||||
|
||||
// remove from all interpreter instance's angular object registry
|
||||
|
|
@ -376,7 +377,7 @@ public class Notebook {
|
|||
//
|
||||
// therefore instead of addAndNotifyRemoteProcess(), need to use add()
|
||||
// that results add angularObject only in ZeppelinServer side not remoteProcessSide
|
||||
registry.add(name, snapshot.getAngularObject().get(), noteId, paragraphId);
|
||||
//registry.add(name, snapshot.getAngularObject().get(), noteId, paragraphId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,8 +22,7 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.*;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
|
@ -87,9 +86,12 @@ public class InterpreterFactoryTest {
|
|||
@Test
|
||||
public void testBasic() {
|
||||
List<String> all = factory.getDefaultInterpreterSettingList();
|
||||
InterpreterSetting setting = factory.get(all.get(0));
|
||||
InterpreterGroup interpreterGroup = setting.getInterpreterGroup();
|
||||
factory.createInterpretersForNote(setting, "session");
|
||||
|
||||
// get interpreter
|
||||
Interpreter repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst();
|
||||
Interpreter repl1 = interpreterGroup.get("session").get(0);
|
||||
assertFalse(((LazyOpenInterpreter) repl1).isOpen());
|
||||
repl1.interpret("repl1", context);
|
||||
assertTrue(((LazyOpenInterpreter) repl1).isOpen());
|
||||
|
|
@ -99,23 +101,14 @@ public class InterpreterFactoryTest {
|
|||
|
||||
// restart interpreter
|
||||
factory.restart(all.get(0));
|
||||
repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst();
|
||||
assertFalse(((LazyOpenInterpreter) repl1).isOpen());
|
||||
assertNull(setting.getInterpreterGroup().get("session"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFactoryDefaultList() throws IOException, RepositoryException {
|
||||
// get default list from default setting
|
||||
// get default settings
|
||||
List<String> all = factory.getDefaultInterpreterSettingList();
|
||||
assertEquals(2, all.size());
|
||||
assertEquals(factory.get(all.get(0)).getInterpreterGroup().getFirst().getClassName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
|
||||
|
||||
// add setting
|
||||
factory.add("a mock", "mock2", new LinkedList<Dependency>(), new InterpreterOption(false), new Properties());
|
||||
all = factory.getDefaultInterpreterSettingList();
|
||||
assertEquals(2, all.size());
|
||||
assertEquals("mock1", factory.get(all.get(0)).getName());
|
||||
assertEquals("a mock", factory.get(all.get(1)).getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -16,8 +16,6 @@
|
|||
*/
|
||||
package org.apache.zeppelin.notebook;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
|
@ -36,6 +34,8 @@ import org.junit.After;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class NoteInterpreterLoaderTest {
|
||||
|
||||
private File tmpDir;
|
||||
|
|
@ -92,6 +92,41 @@ public class NoteInterpreterLoaderTest {
|
|||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", loader.get("group1.mock1").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", loader.get("group1.mock11").getClassName());
|
||||
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", loader.get("group2.mock2").getClassName());
|
||||
|
||||
loader.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoteSession() throws IOException {
|
||||
NoteInterpreterLoader loaderA = new NoteInterpreterLoader(factory);
|
||||
loaderA.setNoteId("noteA");
|
||||
loaderA.setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
|
||||
NoteInterpreterLoader loaderB = new NoteInterpreterLoader(factory);
|
||||
loaderB.setNoteId("noteB");
|
||||
loaderB.setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
|
||||
// interpreters are not created before accessing it
|
||||
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup().get("noteA"));
|
||||
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup().get("noteB"));
|
||||
|
||||
// per note session interpreter instance in the same interpreter process
|
||||
assertTrue(
|
||||
loaderA.get(null).getInterpreterGroup().getRemoteInterpreterProcess() ==
|
||||
loaderB.get(null).getInterpreterGroup().getRemoteInterpreterProcess());
|
||||
|
||||
// interpreters are created after accessing it
|
||||
assertNotNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup().get("noteA"));
|
||||
assertNotNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup().get("noteB"));
|
||||
|
||||
// when
|
||||
loaderA.close();
|
||||
loaderB.close();
|
||||
|
||||
// interpreters are destroyed after close
|
||||
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup().get("noteA"));
|
||||
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup().get("noteB"));
|
||||
|
||||
}
|
||||
|
||||
private void delete(File file){
|
||||
|
|
|
|||
Loading…
Reference in a new issue