interpreter session aware interpreter factory

This commit is contained in:
Lee moon soo 2016-02-04 01:39:48 +09:00
parent ed1ab0d213
commit fc9cb3f4b5
21 changed files with 374 additions and 200 deletions

View file

@ -299,18 +299,16 @@ public class DepInterpreter extends Interpreter {
if (intpGroup == null) {
return null;
}
synchronized (intpGroup) {
for (Interpreter intp : intpGroup){
if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
return (SparkInterpreter) p;
}
}
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
if (p == null) {
return null;
}
return null;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
return (SparkInterpreter) p;
}
@Override

View file

@ -494,23 +494,18 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
private SparkInterpreter getSparkInterpreter() {
InterpreterGroup intpGroup = getInterpreterGroup();
LazyOpenInterpreter lazy = null;
SparkInterpreter spark = null;
synchronized (intpGroup) {
for (Interpreter intp : getInterpreterGroup()){
if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
spark = (SparkInterpreter) p;
}
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
spark = (SparkInterpreter) p;
if (lazy != null) {
lazy.open();
}
@ -554,20 +549,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
private DepInterpreter getDepInterpreter() {
InterpreterGroup intpGroup = getInterpreterGroup();
if (intpGroup == null) return null;
synchronized (intpGroup) {
for (Interpreter intp : intpGroup) {
if (intp.getClassName().equals(DepInterpreter.class.getName())) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
return (DepInterpreter) p;
}
}
Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
if (p == null) {
return null;
}
return null;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
return (DepInterpreter) p;
}

View file

@ -111,9 +111,9 @@ public class SparkInterpreter extends Interpreter {
private ZeppelinContext z;
private SparkILoop interpreter;
private SparkIMain intp;
private SparkContext sc;
private static SparkContext sc;
private SparkOutputStream out;
private SQLContext sqlc;
private static SQLContext sqlc;
private SparkDependencyResolver dep;
private SparkJLineCompletion completor;
@ -230,20 +230,15 @@ public class SparkInterpreter extends Interpreter {
}
private DepInterpreter getDepInterpreter() {
InterpreterGroup intpGroup = getInterpreterGroup();
if (intpGroup == null) return null;
synchronized (intpGroup) {
for (Interpreter intp : intpGroup) {
if (intp.getClassName().equals(DepInterpreter.class.getName())) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
return (DepInterpreter) p;
}
}
Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
if (p == null) {
return null;
}
return null;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
return (DepInterpreter) p;
}
public SparkContext createSparkContext() {

View file

@ -79,23 +79,18 @@ public class SparkSqlInterpreter extends Interpreter {
}
private SparkInterpreter getSparkInterpreter() {
InterpreterGroup intpGroup = getInterpreterGroup();
LazyOpenInterpreter lazy = null;
SparkInterpreter spark = null;
synchronized (intpGroup) {
for (Interpreter intp : getInterpreterGroup()){
if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
spark = (SparkInterpreter) p;
}
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
spark = (SparkInterpreter) p;
if (lazy != null) {
lazy.open();
}
@ -179,15 +174,14 @@ public class SparkSqlInterpreter extends Interpreter {
// It's because of scheduler is not created yet, and scheduler is created by this function.
// Therefore, we can still use getSparkInterpreter() here, but it's better and safe
// to getSparkInterpreter without opening it.
for (Interpreter intp : getInterpreterGroup()) {
if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
Interpreter p = intp;
return p.getScheduler();
} else {
continue;
}
Interpreter intp =
getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
if (intp != null) {
return intp.getScheduler();
} else {
throw new InterpreterException("Can't find SparkInterpreter");
}
throw new InterpreterException("Can't find SparkInterpreter");
}
}

View file

@ -35,6 +35,7 @@ trait AbstractAngularElemTest
val context = new InterpreterContext("note", "paragraph", "title", "text",
new util.HashMap[String, Object](), new GUI(), new AngularObjectRegistry(
intpGroup.getId(), null),
null,
new util.LinkedList[InterpreterContextRunner](),
new InterpreterOutput(new InterpreterOutputListener() {
override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = {

View file

@ -31,6 +31,7 @@ with BeforeAndAfter with BeforeAndAfterEach with Eventually with Matchers {
val context = new InterpreterContext("note", "id", "title", "text",
new java.util.HashMap[String, Object](), new GUI(), new AngularObjectRegistry(
intpGroup.getId(), null),
null,
new java.util.LinkedList[InterpreterContextRunner](),
new InterpreterOutput(new InterpreterOutputListener() {
override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = {

View file

@ -193,6 +193,33 @@ public abstract class Interpreter {
this.classloaderUrls = classloaderUrls;
}
public Interpreter getInterpreterInTheSameSessionByClassName(String className) {
synchronized (interpreterGroup) {
for (List<Interpreter> interpreters : interpreterGroup.values()) {
boolean belongsToSameNoteGroup = false;
Interpreter interpreterFound = null;
for (Interpreter intp : interpreters) {
if (intp.getClassName().equals(className)) {
interpreterFound = intp;
}
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
if (this == p) {
belongsToSameNoteGroup = true;
}
}
if (belongsToSameNoteGroup) {
return interpreterFound;
}
}
}
return null;
}
/**
* Type of interpreter.

View file

@ -152,6 +152,9 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
}
private void close(Collection<Interpreter> intpToClose) {
if (intpToClose == null) {
return;
}
List<Thread> closeThreads = new LinkedList<Thread>();
for (final Interpreter intp : intpToClose) {
@ -205,6 +208,10 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
}
private void destroy(Collection<Interpreter> intpToDestroy) {
if (intpToDestroy == null) {
return;
}
List<Thread> destroyThreads = new LinkedList<Thread>();
for (final Interpreter intp : intpToDestroy) {

View file

@ -21,12 +21,7 @@ import java.util.*;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
@ -132,6 +127,10 @@ public class RemoteInterpreter extends Interpreter {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
interpreterProcess.reference(getInterpreterGroup());
interpreterProcess.setMaxPoolSize(
Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
synchronized (interpreterProcess) {
Client client = null;
try {
@ -160,18 +159,17 @@ public class RemoteInterpreter extends Interpreter {
@Override
public void open() {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
InterpreterGroup interpreterGroup = getInterpreterGroup();
int rc = interpreterProcess.reference(getInterpreterGroup());
interpreterProcess.setMaxPoolSize(
Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
synchronized (interpreterGroup) {
// initialize all interpreters in this interpreter group
List<Interpreter> interpreters = interpreterGroup.get(noteId);
for (Interpreter intp : interpreters) {
((RemoteInterpreter) intp).init();
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
((RemoteInterpreter) p).init();
}
}
}

View file

@ -102,7 +102,7 @@ public class NotebookRestApi {
setting.id(),
setting.getName(),
setting.getGroup(),
setting.getInterpreterGroup(),
setting.getInterpreterInfos(),
true)
);
}
@ -122,7 +122,7 @@ public class NotebookRestApi {
setting.id(),
setting.getName(),
setting.getGroup(),
setting.getInterpreterGroup(),
setting.getInterpreterInfos(),
false)
);
}

View file

@ -20,6 +20,7 @@ package org.apache.zeppelin.rest.message;
import java.util.List;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterSetting;
/**
* InterpreterSetting information for binding
@ -29,10 +30,12 @@ public class InterpreterSettingListForNoteBind {
String name;
String group;
private boolean selected;
private List<Interpreter> interpreters;
private List<InterpreterSetting.InterpreterInfo> interpreters;
public InterpreterSettingListForNoteBind(String id, String name,
String group, List<Interpreter> interpreters, boolean selected) {
String group,
List<InterpreterSetting.InterpreterInfo> interpreters,
boolean selected) {
super();
this.id = id;
this.name = name;
@ -65,11 +68,11 @@ public class InterpreterSettingListForNoteBind {
this.group = group;
}
public List<Interpreter> getInterpreterNames() {
public List<InterpreterSetting.InterpreterInfo> getInterpreterNames() {
return interpreters;
}
public void setInterpreterNames(List<Interpreter> interpreters) {
public void setInterpreterNames(List<InterpreterSetting.InterpreterInfo> interpreters) {
this.interpreters = interpreters;
}

View file

@ -23,10 +23,11 @@ import javax.ws.rs.core.NewCookie;
import javax.ws.rs.core.Response.ResponseBuilder;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterSerializer;
import org.apache.zeppelin.interpreter.InterpreterInfoSerializer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.zeppelin.interpreter.InterpreterSetting;
/**
* Json response builder.
@ -98,8 +99,9 @@ public class JsonResponse<T> {
@Override
public String toString() {
GsonBuilder gsonBuilder = new GsonBuilder()
.registerTypeAdapter(Interpreter.class, new InterpreterSerializer());
GsonBuilder gsonBuilder = new GsonBuilder().registerTypeAdapter(
InterpreterSetting.InterpreterInfo.class,
new InterpreterInfoSerializer());
if (pretty) {
gsonBuilder.setPrettyPrinting();
}

View file

@ -29,7 +29,6 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
@ -37,7 +36,6 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.server.ZeppelinServer;
import org.apache.zeppelin.socket.Message.OP;
import org.apache.zeppelin.ticket.TicketContainer;

View file

@ -90,7 +90,8 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
// Call Create Setting REST API
String jsonRequest = "{\"name\":\"md2\",\"group\":\"md\",\"properties\":{\"propname\":\"propvalue\"}," +
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," +
"\"dependencies\":[]}";
"\"dependencies\":[]," +
"\"option\": { \"remote\": true, \"perNoteSession\": false }}";
PostMethod post = httpPost("/interpreter/setting/", jsonRequest);
LOG.info("testSettingCRUD create response\n" + post.getResponseBodyAsString());
assertThat("test create method:", post, isCreated());
@ -105,7 +106,8 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
// Call Update Setting REST API
jsonRequest = "{\"name\":\"md2\",\"group\":\"md\",\"properties\":{\"propname\":\"Otherpropvalue\"}," +
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," +
"\"dependencies\":[]}";
"\"dependencies\":[]," +
"\"option\": { \"remote\": true, \"perNoteSession\": false }}";
PutMethod put = httpPut("/interpreter/setting/" + newSettingId, jsonRequest);
LOG.info("testSettingCRUD update response\n" + put.getResponseBodyAsString());
assertThat("test update method:", put, isAllowed());

View file

@ -51,7 +51,6 @@ import java.util.*;
/**
* Manage interpreters.
*
*/
public class InterpreterFactory {
Logger logger = LoggerFactory.getLogger(InterpreterFactory.class);
@ -103,7 +102,8 @@ public class InterpreterFactory {
GsonBuilder builder = new GsonBuilder();
builder.setPrettyPrinting();
builder.registerTypeAdapter(Interpreter.class, new InterpreterSerializer());
builder.registerTypeAdapter(
InterpreterSetting.InterpreterInfo.class, new InterpreterInfoSerializer());
gson = builder.create();
init();
@ -197,16 +197,14 @@ public class InterpreterFactory {
InterpreterSetting setting = interpreterSettings.get(settingId);
logger.info("Interpreter setting group {} : id={}, name={}",
setting.getGroup(), settingId, setting.getName());
for (Interpreter interpreter : setting.getInterpreterGroup()) {
logger.info(" className = {}", interpreter.getClassName());
}
}
}
private void loadFromFile() throws IOException {
GsonBuilder builder = new GsonBuilder();
builder.setPrettyPrinting();
builder.registerTypeAdapter(Interpreter.class, new InterpreterSerializer());
builder.registerTypeAdapter(
InterpreterSetting.InterpreterInfo.class, new InterpreterInfoSerializer());
Gson gson = builder.create();
File settingFile = new File(conf.getInterpreterSettingPath());
@ -241,14 +239,12 @@ public class InterpreterFactory {
setting.id(),
setting.getName(),
setting.getGroup(),
setting.getInterpreterInfos(),
setting.getProperties(),
setting.getDependencies(),
setting.getOption());
InterpreterGroup interpreterGroup = createInterpreterGroup(
setting.id(),
setting.getGroup(),
setting.getOption(),
setting.getProperties());
InterpreterGroup interpreterGroup = createInterpreterGroup(setting.id(), setting.getOption());
intpSetting.setInterpreterGroup(interpreterGroup);
interpreterSettings.put(k, intpSetting);
@ -380,9 +376,27 @@ public class InterpreterFactory {
throws InterpreterException, IOException, RepositoryException {
synchronized (interpreterSettings) {
List<InterpreterSetting.InterpreterInfo> interpreterInfos =
new LinkedList<InterpreterSetting.InterpreterInfo>();
for (RegisteredInterpreter registeredInterpreter :
Interpreter.registeredInterpreters.values()) {
if (registeredInterpreter.getGroup().equals(groupName)) {
for (String className : interpreterClassList) {
if (registeredInterpreter.getClassName().equals(className)) {
interpreterInfos.add(
new InterpreterSetting.InterpreterInfo(
className, registeredInterpreter.getName()));
}
}
}
}
InterpreterSetting intpSetting = new InterpreterSetting(
name,
groupName,
interpreterInfos,
properties,
dependencies,
option);
@ -390,8 +404,7 @@ public class InterpreterFactory {
loadInterpreterDependencies(intpSetting);
}
InterpreterGroup interpreterGroup = createInterpreterGroup(
intpSetting.id(), groupName, option, properties);
InterpreterGroup interpreterGroup = createInterpreterGroup(intpSetting.id(), option);
intpSetting.setInterpreterGroup(interpreterGroup);
@ -401,18 +414,12 @@ public class InterpreterFactory {
}
}
private InterpreterGroup createInterpreterGroup(String id,
String groupName,
InterpreterOption option,
Properties properties)
private InterpreterGroup createInterpreterGroup(String id, InterpreterOption option)
throws InterpreterException, NullArgumentException {
//When called from REST API without option we receive NPE
if (option == null)
throw new NullArgumentException("option");
//When called from REST API without option we receive NPE
if (properties == null)
throw new NullArgumentException("properties");
AngularObjectRegistry angularObjectRegistry;
@ -430,19 +437,38 @@ public class InterpreterFactory {
}
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
return interpreterGroup;
}
public void removeInterpretersForNote(InterpreterSetting interpreterSetting,
String noteId) {
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup();
interpreterGroup.close(noteId);
interpreterGroup.destroy(noteId);
synchronized (interpreterGroup) {
interpreterGroup.remove(noteId);
}
}
public void createInterpretersForNote(
InterpreterSetting interpreterSetting,
String noteId) {
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup();
String groupName = interpreterSetting.getGroup();
InterpreterOption option = interpreterSetting.getOption();
Properties properties = interpreterSetting.getProperties();
for (String className : interpreterClassList) {
Set<String> keys = Interpreter.registeredInterpreters.keySet();
for (String intName : keys) {
RegisteredInterpreter info = Interpreter.registeredInterpreters
.get(intName);
RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName);
if (info.getClassName().equals(className)
&& info.getGroup().equals(groupName)) {
Interpreter intp;
if (option.isRemote()) {
intp = createRemoteRepl(info.getPath(),
noteId,
info.getClassName(),
properties,
interpreterGroup.id);
@ -451,15 +477,25 @@ public class InterpreterFactory {
info.getClassName(),
properties);
}
interpreterGroup.add(intp);
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(noteId);
if (interpreters == null) {
interpreters = new LinkedList<Interpreter>();
interpreterGroup.put(noteId, interpreters);
}
interpreters.add(intp);
}
intp.setInterpreterGroup(interpreterGroup);
break;
}
}
}
return interpreterGroup;
}
public void remove(String id) throws IOException {
synchronized (interpreterSettings) {
if (interpreterSettings.containsKey(id)) {
@ -486,7 +522,7 @@ public class InterpreterFactory {
}
/**
* Get loaded interpreters
* Get interpreter settings
* @return
*/
public List<InterpreterSetting> get() {
@ -508,8 +544,8 @@ public class InterpreterFactory {
continue;
}
}
for (InterpreterSetting.InterpreterInfo intp : setting.getInterpreterInfos()) {
for (Interpreter intp : setting.getInterpreterGroup()) {
if (className.equals(intp.getClassName())) {
boolean alreadyAdded = false;
for (InterpreterSetting st : orderedSettings) {
@ -536,15 +572,33 @@ public class InterpreterFactory {
public void putNoteInterpreterSettingBinding(String noteId,
List<String> settingList) throws IOException {
List<String> unBindedSettings = new LinkedList<String>();
synchronized (interpreterSettings) {
List<String> oldSettings = interpreterBindings.get(noteId);
if (oldSettings != null) {
for (String oldSettingId : oldSettings) {
if (!settingList.contains(oldSettingId)) {
unBindedSettings.add(oldSettingId);
}
}
}
interpreterBindings.put(noteId, settingList);
saveToFile();
for (String settingId : unBindedSettings) {
InterpreterSetting setting = get(settingId);
removeInterpretersForNote(setting, noteId);
}
}
}
public void removeNoteInterpreterSettingBinding(String noteId) {
synchronized (interpreterSettings) {
interpreterBindings.remove(noteId);
List<String> settingIds = interpreterBindings.remove(noteId);
for (String settingId : settingIds) {
this.removeInterpretersForNote(get(settingId), noteId);
}
}
}
@ -582,9 +636,7 @@ public class InterpreterFactory {
intpsetting.setOption(option);
intpsetting.setDependencies(dependencies);
InterpreterGroup interpreterGroup = createInterpreterGroup(
intpsetting.id(),
intpsetting.getGroup(), option, properties);
InterpreterGroup interpreterGroup = createInterpreterGroup(intpsetting.id(), option);
intpsetting.setInterpreterGroup(interpreterGroup);
loadInterpreterDependencies(intpsetting);
@ -608,7 +660,7 @@ public class InterpreterFactory {
InterpreterGroup interpreterGroup = createInterpreterGroup(
intpsetting.id(),
intpsetting.getGroup(), intpsetting.getOption(), intpsetting.getProperties());
intpsetting.getOption());
intpsetting.setInterpreterGroup(interpreterGroup);
} else {
throw new InterpreterException("Interpreter setting id " + id
@ -619,16 +671,18 @@ public class InterpreterFactory {
private void stopJobAllInterpreter(InterpreterSetting intpsetting) {
if (intpsetting != null) {
for (Interpreter intp : intpsetting.getInterpreterGroup()) {
for (Job job : intp.getScheduler().getJobsRunning()) {
job.abort();
job.setStatus(Status.ABORT);
logger.info("Job " + job.getJobName() + " aborted ");
}
for (Job job : intp.getScheduler().getJobsWaiting()) {
job.abort();
job.setStatus(Status.ABORT);
logger.info("Job " + job.getJobName() + " aborted ");
for (List<Interpreter> interpreters : intpsetting.getInterpreterGroup().values()) {
for (Interpreter intp : interpreters) {
for (Job job : intp.getScheduler().getJobsRunning()) {
job.abort();
job.setStatus(Status.ABORT);
logger.info("Job " + job.getJobName() + " aborted ");
}
for (Job job : intp.getScheduler().getJobsWaiting()) {
job.abort();
job.setStatus(Status.ABORT);
logger.info("Job " + job.getJobName() + " aborted ");
}
}
}
}
@ -720,13 +774,13 @@ public class InterpreterFactory {
}
private Interpreter createRemoteRepl(String interpreterPath, String className,
private Interpreter createRemoteRepl(String interpreterPath, String noteId, String className,
Properties property, String interpreterId) {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterId;
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
property, className, conf.getInterpreterRemoteRunnerPath(),
property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
interpreterPath, localRepoPath, connectTimeout,
maxPoolSize, remoteInterpreterProcessListener));
return intp;

View file

@ -29,28 +29,30 @@ import com.google.gson.JsonSerializer;
/**
* Interpreter class serializer for gson
* InterpreterInfo class serializer for gson
*
*/
public class InterpreterSerializer implements JsonSerializer<Interpreter>,
JsonDeserializer<Interpreter> {
public class InterpreterInfoSerializer
implements JsonSerializer<InterpreterSetting.InterpreterInfo>,
JsonDeserializer<InterpreterSetting.InterpreterInfo> {
@Override
public JsonElement serialize(Interpreter interpreter, Type type,
public JsonElement serialize(InterpreterSetting.InterpreterInfo interpreterInfo, Type type,
JsonSerializationContext context) {
JsonObject json = new JsonObject();
json.addProperty("class", interpreter.getClassName());
json.addProperty(
"name",
Interpreter.findRegisteredInterpreterByClassName(
interpreter.getClassName()).getName());
json.addProperty("class", interpreterInfo.getClassName());
json.addProperty("name", interpreterInfo.getName());
return json;
}
@Override
public Interpreter deserialize(JsonElement json, Type typeOfT,
public InterpreterSetting.InterpreterInfo deserialize(JsonElement json, Type typeOfT,
JsonDeserializationContext context) throws JsonParseException {
return null;
JsonObject jsonObject = json.getAsJsonObject();
String className = jsonObject.get("class").getAsString();
String name = jsonObject.get("name").getAsString();
return new InterpreterSetting.InterpreterInfo(className, name);
}
}

View file

@ -34,27 +34,59 @@ public class InterpreterSetting {
private String group;
private String description;
private Properties properties;
private InterpreterGroup interpreterGroup;
// use 'interpreterGroup' as a field name to keep backward compativility of
// conf/interpreter.json file format
private List<InterpreterInfo> interpreterGroup;
private transient InterpreterGroup interpreterGroupRef;
private List<Dependency> dependencies;
private InterpreterOption option;
public InterpreterSetting(String id,
String name,
String group,
List<InterpreterInfo> interpreterInfos,
Properties properties,
List<Dependency> dependencies,
InterpreterOption option) {
this.id = id;
this.name = name;
this.group = group;
this.interpreterGroup = interpreterInfos;
this.properties = properties;
this.dependencies = dependencies;
this.option = option;
}
public InterpreterSetting(String name,
String group,
List<InterpreterInfo> interpreterInfos,
Properties properties,
List<Dependency> dependencies,
InterpreterOption option) {
this(generateId(), name, group, dependencies, option);
this(generateId(), name, group, interpreterInfos, properties, dependencies, option);
}
/**
* Information of interpreters in this interpreter setting.
* this will be serialized for conf/interpreter.json and REST api response.
*/
public static class InterpreterInfo {
private final String name;
private final String className;
public InterpreterInfo(String className, String name) {
this.className = className;
this.name = name;
}
public String getName() {
return name;
}
public String getClassName() {
return className;
}
}
public String id() {
@ -86,12 +118,11 @@ public class InterpreterSetting {
}
public InterpreterGroup getInterpreterGroup() {
return interpreterGroup;
return interpreterGroupRef;
}
public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
this.interpreterGroup = interpreterGroup;
this.properties = interpreterGroup.getProperty();
this.interpreterGroupRef = interpreterGroup;
}
public Properties getProperties() {
@ -120,4 +151,8 @@ public class InterpreterSetting {
public void setOption(InterpreterOption option) {
this.option = option;
}
public List<InterpreterInfo> getInterpreterInfos() {
return interpreterGroup;
}
}

View file

@ -29,10 +29,11 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
/**
* Repl loader per note.
* Interpreter loader per note.
*/
public class NoteInterpreterLoader {
private transient InterpreterFactory factory;
private static String SHARED_SESSION = "shared_session";
String noteId;
public NoteInterpreterLoader(InterpreterFactory factory) {
@ -73,6 +74,37 @@ public class NoteInterpreterLoader {
return settings;
}
private String getInterpreterGroupKey(InterpreterSetting setting) {
if (setting.getOption().isPerNoteSession()) {
return SHARED_SESSION;
} else {
return noteId;
}
}
private List<Interpreter> createOrGetInterpreterList(InterpreterSetting setting) {
InterpreterGroup interpreterGroup = setting.getInterpreterGroup();
synchronized (interpreterGroup) {
String key = getInterpreterGroupKey(setting);
if (!interpreterGroup.containsKey(key)) {
factory.createInterpretersForNote(setting, key);
}
return interpreterGroup.get(getInterpreterGroupKey(setting));
}
}
public void close() {
// close interpreters in this note session
List<InterpreterSetting> settings = this.getInterpreterSettings();
if (settings == null || settings.size() == 0) {
return;
}
for (InterpreterSetting setting : settings) {
factory.removeInterpretersForNote(setting, noteId);
}
}
public Interpreter get(String replName) {
List<InterpreterSetting> settings = getInterpreterSettings();
@ -81,7 +113,9 @@ public class NoteInterpreterLoader {
}
if (replName == null || replName.trim().length() == 0) {
return settings.get(0).getInterpreterGroup().getFirst();
// get default settings (first available)
InterpreterSetting defaultSettings = settings.get(0);
return createOrGetInterpreterList(defaultSettings).get(0);
}
if (Interpreter.registeredInterpreters == null) {
@ -104,43 +138,47 @@ public class NoteInterpreterLoader {
String interpreterClassName = registeredInterpreter.getClassName();
for (InterpreterSetting setting : settings) {
InterpreterGroup intpGroup = setting.getInterpreterGroup();
for (Interpreter interpreter : intpGroup) {
if (interpreterClassName.equals(interpreter.getClassName())) {
return interpreter;
if (registeredInterpreter.getGroup().equals(setting.getGroup())) {
List<Interpreter> intpGroup = createOrGetInterpreterList(setting);
for (Interpreter interpreter : intpGroup) {
if (interpreterClassName.equals(interpreter.getClassName())) {
return interpreter;
}
}
}
}
throw new InterpreterException(replName + " interpreter not found");
} else {
// first assume replName is 'name' of interpreter. ('groupName' is ommitted)
// search 'name' from first (default) interpreter group
InterpreterGroup intpGroup = settings.get(0).getInterpreterGroup();
for (Interpreter interpreter : intpGroup) {
RegisteredInterpreter intp = Interpreter
.findRegisteredInterpreterByClassName(interpreter.getClassName());
if (intp == null) {
continue;
InterpreterSetting defaultSetting = settings.get(0);
Interpreter.RegisteredInterpreter registeredInterpreter =
Interpreter.registeredInterpreters.get(defaultSetting.getGroup() + "." + replName);
if (registeredInterpreter != null) {
List<Interpreter> interpreters = createOrGetInterpreterList(defaultSetting);
for (Interpreter interpreter : interpreters) {
RegisteredInterpreter intp =
Interpreter.findRegisteredInterpreterByClassName(interpreter.getClassName());
if (intp == null) {
continue;
}
if (intp.getName().equals(replName)) {
return interpreter;
}
}
if (intp.getName().equals(replName)) {
return interpreter;
}
throw new InterpreterException(
defaultSetting.getGroup() + "." + replName + " interpreter not found");
}
// next, assume replName is 'group' of interpreter ('name' is ommitted)
// search interpreter group and return first interpreter.
for (InterpreterSetting setting : settings) {
intpGroup = setting.getInterpreterGroup();
Interpreter interpreter = intpGroup.get(0);
RegisteredInterpreter intp = Interpreter
.findRegisteredInterpreterByClassName(interpreter.getClassName());
if (intp == null) {
continue;
}
if (intp.getGroup().equals(replName)) {
return interpreter;
if (setting.getGroup().equals(replName)) {
List<Interpreter> interpreters = createOrGetInterpreterList(setting);
return interpreters.get(0);
}
}
}

View file

@ -278,6 +278,7 @@ public class Notebook {
synchronized (notes) {
note = notes.remove(id);
}
replFactory.removeNoteInterpreterSettingBinding(id);
notebookIndex.deleteIndexDocs(note);
// remove from all interpreter instance's angular object registry
@ -376,7 +377,7 @@ public class Notebook {
//
// therefore instead of addAndNotifyRemoteProcess(), need to use add()
// that results add angularObject only in ZeppelinServer side not remoteProcessSide
registry.add(name, snapshot.getAngularObject().get(), noteId, paragraphId);
//registry.add(name, snapshot.getAngularObject().get(), noteId, paragraphId);
}
}
}

View file

@ -22,8 +22,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.io.*;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
@ -87,9 +86,12 @@ public class InterpreterFactoryTest {
@Test
public void testBasic() {
List<String> all = factory.getDefaultInterpreterSettingList();
InterpreterSetting setting = factory.get(all.get(0));
InterpreterGroup interpreterGroup = setting.getInterpreterGroup();
factory.createInterpretersForNote(setting, "session");
// get interpreter
Interpreter repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst();
Interpreter repl1 = interpreterGroup.get("session").get(0);
assertFalse(((LazyOpenInterpreter) repl1).isOpen());
repl1.interpret("repl1", context);
assertTrue(((LazyOpenInterpreter) repl1).isOpen());
@ -99,23 +101,14 @@ public class InterpreterFactoryTest {
// restart interpreter
factory.restart(all.get(0));
repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst();
assertFalse(((LazyOpenInterpreter) repl1).isOpen());
assertNull(setting.getInterpreterGroup().get("session"));
}
@Test
public void testFactoryDefaultList() throws IOException, RepositoryException {
// get default list from default setting
// get default settings
List<String> all = factory.getDefaultInterpreterSettingList();
assertEquals(2, all.size());
assertEquals(factory.get(all.get(0)).getInterpreterGroup().getFirst().getClassName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
// add setting
factory.add("a mock", "mock2", new LinkedList<Dependency>(), new InterpreterOption(false), new Properties());
all = factory.getDefaultInterpreterSettingList();
assertEquals(2, all.size());
assertEquals("mock1", factory.get(all.get(0)).getName());
assertEquals("a mock", factory.get(all.get(1)).getName());
}
@Test

View file

@ -16,8 +16,6 @@
*/
package org.apache.zeppelin.notebook;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
@ -36,6 +34,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
public class NoteInterpreterLoaderTest {
private File tmpDir;
@ -92,6 +92,41 @@ public class NoteInterpreterLoaderTest {
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", loader.get("group1.mock1").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", loader.get("group1.mock11").getClassName());
assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", loader.get("group2.mock2").getClassName());
loader.close();
}
@Test
public void testNoteSession() throws IOException {
NoteInterpreterLoader loaderA = new NoteInterpreterLoader(factory);
loaderA.setNoteId("noteA");
loaderA.setInterpreters(factory.getDefaultInterpreterSettingList());
NoteInterpreterLoader loaderB = new NoteInterpreterLoader(factory);
loaderB.setNoteId("noteB");
loaderB.setInterpreters(factory.getDefaultInterpreterSettingList());
// interpreters are not created before accessing it
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup().get("noteA"));
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup().get("noteB"));
// per note session interpreter instance in the same interpreter process
assertTrue(
loaderA.get(null).getInterpreterGroup().getRemoteInterpreterProcess() ==
loaderB.get(null).getInterpreterGroup().getRemoteInterpreterProcess());
// interpreters are created after accessing it
assertNotNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup().get("noteA"));
assertNotNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup().get("noteB"));
// when
loaderA.close();
loaderB.close();
// interpreters are destroyed after close
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup().get("noteA"));
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup().get("noteB"));
}
private void delete(File file){