Merge branch 'master' into extends-zrun-remote-transaction

This commit is contained in:
CloverHearts 2016-11-26 18:21:46 +09:00
commit 2a2c173b86
9 changed files with 121 additions and 109 deletions

View file

@ -291,6 +291,10 @@ public class SparkInterpreter extends Interpreter {
return (DepInterpreter) p;
}
private boolean isYarnMode() {
return getProperty("master").startsWith("yarn");
}
/**
* Spark 2.x
* Create SparkSession
@ -314,6 +318,10 @@ public class SparkInterpreter extends Interpreter {
conf.set("spark.scheduler.mode", "FAIR");
conf.setMaster(getProperty("master"));
if (isYarnMode()) {
conf.set("master", "yarn");
conf.set("spark.submit.deployMode", "client");
}
Properties intpProperty = getProperty();
@ -505,7 +513,7 @@ public class SparkInterpreter extends Interpreter {
// Distributes needed libraries to workers
// when spark version is greater than or equal to 1.5.0
if (getProperty("master").equals("yarn-client")) {
if (isYarnMode()) {
conf.set("spark.yarn.isPython", "true");
}
}
@ -554,7 +562,7 @@ public class SparkInterpreter extends Interpreter {
@Override
public void open() {
// set properties and do login before creating any spark stuff for secured cluster
if (getProperty("master").equals("yarn-client")) {
if (isYarnMode()) {
System.setProperty("SPARK_YARN_MODE", "true");
}
if (getProperty().containsKey("spark.yarn.keytab") &&

View file

@ -32,9 +32,10 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
* For example spark, pyspark, sql interpreters are in the same 'spark' group
* and InterpreterGroup will have reference to these all interpreters.
*
* Remember, list of interpreters are dedicated to a note.
* (when InterpreterOption.session==true)
* So InterpreterGroup internally manages map of [noteId, list of interpreters]
* Remember, list of interpreters are dedicated to a session. Session could be shared across user
* or notes, so the sessionId could be user or noteId or their combination.
* So InterpreterGroup internally manages map of [interpreterSessionKey(noteId, user, or
* their combination), list of interpreters]
*
* A InterpreterGroup runs on interpreter process.
* And unit of interpreter instantiate, restart, bind, unbind.
@ -103,15 +104,12 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
public Properties getProperty() {
Properties p = new Properties();
Collection<List<Interpreter>> intpGroupForANote = this.values();
if (intpGroupForANote != null && intpGroupForANote.size() > 0) {
for (List<Interpreter> intpGroup : intpGroupForANote) {
for (Interpreter intp : intpGroup) {
p.putAll(intp.getProperty());
}
// it's okay to break here while every List<Interpreters> will have the same property set
break;
for (List<Interpreter> intpGroupForASession : this.values()) {
for (Interpreter intp : intpGroupForASession) {
p.putAll(intp.getProperty());
}
// it's okay to break here while every List<Interpreters> will have the same property set
break;
}
return p;
}
@ -148,20 +146,20 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
public void close() {
LOGGER.info("Close interpreter group " + getId());
List<Interpreter> intpToClose = new LinkedList<>();
for (List<Interpreter> intpGroupForNote : this.values()) {
intpToClose.addAll(intpGroupForNote);
for (List<Interpreter> intpGroupForSession : this.values()) {
intpToClose.addAll(intpGroupForSession);
}
close(intpToClose);
}
/**
* Close all interpreter instances in this group for the note
* @param noteId
* Close all interpreter instances in this group for the session
* @param sessionId
*/
public void close(String noteId) {
LOGGER.info("Close interpreter group " + getId() + " for note " + noteId);
List<Interpreter> intpForNote = this.get(noteId);
close(intpForNote);
public void close(String sessionId) {
LOGGER.info("Close interpreter group " + getId() + " for session: " + sessionId);
List<Interpreter> intpForSession = this.get(sessionId);
close(intpForSession);
}
private void close(Collection<Interpreter> intpToClose) {
@ -196,13 +194,13 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
}
/**
* Destroy all interpreter instances in this group for the note
* @param noteId
* Destroy all interpreter instances in this group for the session
* @param sessionId
*/
public void destroy(String noteId) {
LOGGER.info("Destroy interpreter group " + getId() + " for note " + noteId);
List<Interpreter> intpForNote = this.get(noteId);
destroy(intpForNote);
public void destroy(String sessionId) {
LOGGER.info("Destroy interpreter group " + getId() + " for session " + sessionId);
List<Interpreter> intpForSession = this.get(sessionId);
destroy(intpForSession);
if (remoteInterpreterProcess != null) {
remoteInterpreterProcess.dereference();
@ -220,8 +218,8 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
public void destroy() {
LOGGER.info("Destroy interpreter group " + getId());
List<Interpreter> intpToDestroy = new LinkedList<>();
for (List<Interpreter> intpGroupForNote : this.values()) {
intpToDestroy.addAll(intpGroupForNote);
for (List<Interpreter> intpGroupForSession : this.values()) {
intpToDestroy.addAll(intpGroupForSession);
}
destroy(intpToDestroy);

View file

@ -44,24 +44,24 @@ public class ZeppelinDevServer extends
}
@Override
protected Interpreter getInterpreter(String noteId, String className) throws TException {
protected Interpreter getInterpreter(String sessionKey, String className) throws TException {
synchronized (this) {
InterpreterGroup interpreterGroup = getInterpreterGroup();
if (interpreterGroup == null) {
createInterpreter(
"dev",
noteId,
sessionKey,
DevInterpreter.class.getName(),
new HashMap<String, String>());
Interpreter intp = super.getInterpreter(noteId, className);
Interpreter intp = super.getInterpreter(sessionKey, className);
interpreter = (DevInterpreter) (
((LazyOpenInterpreter) intp).getInnerInterpreter());
interpreter.setInterpreterEvent(this);
notify();
}
}
return super.getInterpreter(noteId, className);
return super.getInterpreter(sessionKey, className);
}
@Override

View file

@ -51,7 +51,7 @@ public class RemoteInterpreter extends Interpreter {
private String interpreterPath;
private String localRepoPath;
private String className;
private String noteId;
private String sessionKey;
FormType formType;
boolean initialized;
private Map<String, String> env;
@ -66,7 +66,7 @@ public class RemoteInterpreter extends Interpreter {
* Remote interpreter and manage interpreter process
*/
public RemoteInterpreter(Properties property,
String noteId,
String sessionKey,
String className,
String interpreterRunner,
String interpreterPath,
@ -78,7 +78,7 @@ public class RemoteInterpreter extends Interpreter {
String userName,
Boolean isUserImpersonate) {
super(property);
this.noteId = noteId;
this.sessionKey = sessionKey;
this.className = className;
initialized = false;
this.interpreterRunner = interpreterRunner;
@ -99,7 +99,7 @@ public class RemoteInterpreter extends Interpreter {
*/
public RemoteInterpreter(
Properties property,
String noteId,
String sessionKey,
String className,
String host,
int port,
@ -110,7 +110,7 @@ public class RemoteInterpreter extends Interpreter {
String userName,
Boolean isUserImpersonate) {
super(property);
this.noteId = noteId;
this.sessionKey = sessionKey;
this.className = className;
initialized = false;
this.host = host;
@ -127,7 +127,7 @@ public class RemoteInterpreter extends Interpreter {
// VisibleForTesting
public RemoteInterpreter(
Properties property,
String noteId,
String sessionKey,
String className,
String interpreterRunner,
String interpreterPath,
@ -140,7 +140,7 @@ public class RemoteInterpreter extends Interpreter {
Boolean isUserImpersonate) {
super(property);
this.className = className;
this.noteId = noteId;
this.sessionKey = sessionKey;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
this.localRepoPath = localRepoPath;
@ -238,7 +238,7 @@ public class RemoteInterpreter extends Interpreter {
if (localRepoPath != null) {
property.put("zeppelin.interpreter.localRepo", localRepoPath);
}
client.createInterpreter(groupId, noteId,
client.createInterpreter(groupId, sessionKey,
getClassName(), (Map) property);
// Push angular object loaded from JSON file to remote interpreter
@ -266,7 +266,7 @@ public class RemoteInterpreter extends Interpreter {
synchronized (interpreterGroup) {
// initialize all interpreters in this interpreter group
List<Interpreter> interpreters = interpreterGroup.get(noteId);
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
for (Interpreter intp : new ArrayList<>(interpreters)) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
@ -292,7 +292,7 @@ public class RemoteInterpreter extends Interpreter {
try {
client = interpreterProcess.getClient();
if (client != null) {
client.close(noteId, className);
client.close(sessionKey, className);
}
} catch (TException e) {
broken = true;
@ -339,7 +339,7 @@ public class RemoteInterpreter extends Interpreter {
final GUI currentGUI = context.getGui();
RemoteInterpreterResult remoteResult = client.interpret(
noteId, className, st, convert(context));
sessionKey, className, st, convert(context));
Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
@ -385,7 +385,7 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
client.cancel(noteId, className, convert(context));
client.cancel(sessionKey, className, convert(context));
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
@ -413,7 +413,7 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
formType = FormType.valueOf(client.getFormType(noteId, className));
formType = FormType.valueOf(client.getFormType(sessionKey, className));
return formType;
} catch (TException e) {
broken = true;
@ -439,7 +439,7 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
return client.getProgress(noteId, className, convert(context));
return client.getProgress(sessionKey, className, convert(context));
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
@ -461,7 +461,7 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
List completion = client.completion(noteId, className, buf, cursor);
List completion = client.completion(sessionKey, className, buf, cursor);
return completion;
} catch (TException e) {
broken = true;
@ -479,8 +479,8 @@ public class RemoteInterpreter extends Interpreter {
return null;
} else {
return SchedulerFactory.singleton().createOrGetRemoteScheduler(
RemoteInterpreter.class.getName() + noteId + interpreterProcess.hashCode(),
noteId,
RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
sessionKey,
interpreterProcess,
maxConcurrency);
}

View file

@ -157,7 +157,7 @@ public class RemoteInterpreterServer
@Override
public void createInterpreter(String interpreterGroupId, String noteId, String
public void createInterpreter(String interpreterGroupId, String sessionKey, String
className, Map<String, String> properties) throws TException {
if (interpreterGroup == null) {
interpreterGroup = new InterpreterGroup(interpreterGroupId);
@ -185,10 +185,10 @@ public class RemoteInterpreterServer
repl.setClassloaderUrls(new URL[]{});
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(noteId);
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
if (interpreters == null) {
interpreters = new LinkedList<>();
interpreterGroup.put(noteId, interpreters);
interpreterGroup.put(sessionKey, interpreters);
}
interpreters.add(new LazyOpenInterpreter(repl));
@ -229,13 +229,13 @@ public class RemoteInterpreterServer
}
}
protected Interpreter getInterpreter(String noteId, String className) throws TException {
protected Interpreter getInterpreter(String sessionKey, String className) throws TException {
if (interpreterGroup == null) {
throw new TException(
new InterpreterException("Interpreter instance " + className + " not created"));
}
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(noteId);
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
if (interpreters == null) {
throw new TException(
new InterpreterException("Interpreter " + className + " not initialized"));
@ -257,13 +257,13 @@ public class RemoteInterpreterServer
}
@Override
public void close(String noteId, String className) throws TException {
public void close(String sessionKey, String className) throws TException {
// unload all applications
for (String appId : runningApplications.keySet()) {
RunningApplication appInfo = runningApplications.get(appId);
// see NoteInterpreterLoader.SHARED_SESSION
if (appInfo.noteId.equals(noteId) || noteId.equals("shared_session")) {
if (appInfo.noteId.equals(sessionKey) || sessionKey.equals("shared_session")) {
try {
logger.info("Unload App {} ", appInfo.pkg.getName());
appInfo.app.unload();
@ -277,7 +277,7 @@ public class RemoteInterpreterServer
// close interpreters
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(noteId);
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
if (interpreters != null) {
Iterator<Interpreter> it = interpreters.iterator();
while (it.hasNext()) {
@ -715,14 +715,14 @@ public class RemoteInterpreterServer
}
@Override
public String getStatus(String noteId, String jobId)
public String getStatus(String sessionKey, String jobId)
throws TException {
if (interpreterGroup == null) {
return "Unknown";
}
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(noteId);
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
if (interpreters == null) {
return "Unknown";
}

View file

@ -19,7 +19,7 @@
namespace java org.apache.zeppelin.interpreter.thrift
struct RemoteInterpreterContext {
1: string noteId,
1: string sessionKey,
2: string paragraphId,
3: string replName,
4: string paragraphTitle,
@ -82,18 +82,18 @@ struct InterpreterCompletion {
}
service RemoteInterpreterService {
void createInterpreter(1: string intpGroupId, 2: string noteId, 3: string className, 4: map<string, string> properties);
void createInterpreter(1: string intpGroupId, 2: string sessionKey, 3: string className, 4: map<string, string> properties);
void open(1: string noteId, 2: string className);
void close(1: string noteId, 2: string className);
RemoteInterpreterResult interpret(1: string noteId, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
void cancel(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
i32 getProgress(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
string getFormType(1: string noteId, 2: string className);
list<InterpreterCompletion> completion(1: string noteId, 2: string className, 3: string buf, 4: i32 cursor);
void open(1: string sessionKey, 2: string className);
void close(1: string sessionKey, 2: string className);
RemoteInterpreterResult interpret(1: string sessionKey, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
void cancel(1: string sessionKey, 2: string className, 3: RemoteInterpreterContext interpreterContext);
i32 getProgress(1: string sessionKey, 2: string className, 3: RemoteInterpreterContext interpreterContext);
string getFormType(1: string sessionKey, 2: string className);
list<InterpreterCompletion> completion(1: string sessionKey, 2: string className, 3: string buf, 4: i32 cursor);
void shutdown();
string getStatus(1: string noteId, 2:string jobId);
string getStatus(1: string sessionKey, 2:string jobId);
RemoteInterpreterEvent getEvent();
@ -104,17 +104,17 @@ service RemoteInterpreterService {
// get all resources in the interpreter process
list<string> resourcePoolGetAll();
// get value of resource
binary resourceGet(1: string noteId, 2: string paragraphId, 3: string resourceName);
binary resourceGet(1: string sessionKey, 2: string paragraphId, 3: string resourceName);
// remove resource
bool resourceRemove(1: string noteId, 2: string paragraphId, 3:string resourceName);
bool resourceRemove(1: string sessionKey, 2: string paragraphId, 3:string resourceName);
void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string
void angularObjectUpdate(1: string name, 2: string sessionKey, 3: string paragraphId, 4: string
object);
void angularObjectAdd(1: string name, 2: string noteId, 3: string paragraphId, 4: string object);
void angularObjectRemove(1: string name, 2: string noteId, 3: string paragraphId);
void angularObjectAdd(1: string name, 2: string sessionKey, 3: string paragraphId, 4: string object);
void angularObjectRemove(1: string name, 2: string sessionKey, 3: string paragraphId);
void angularRegistryPush(1: string registry);
RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string noteId, 4: string paragraphId);
RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string sessionKey, 4: string paragraphId);
RemoteApplicationResult unloadApplication(1: string applicationInstanceId);
RemoteApplicationResult runApplication(1: string applicationInstanceId);

View file

@ -50,7 +50,6 @@ import java.util.Set;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.internal.StringMap;
@ -730,7 +729,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
interpreterSetting.closeAndRemoveInterpreterGroup(noteId);
} else if (option.isSession()) {
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
String key = getInterpreterInstanceKey(user, noteId, interpreterSetting);
String key = getInterpreterSessionKey(user, noteId, interpreterSetting);
interpreterGroup.close(key);
interpreterGroup.destroy(key);
synchronized (interpreterGroup) {
@ -743,7 +742,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
public void createInterpretersForNote(InterpreterSetting interpreterSetting, String user,
String noteId, String key) {
String noteId, String interpreterSessionKey) {
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
InterpreterOption option = interpreterSetting.getOption();
Properties properties = (Properties) interpreterSetting.getProperties();
@ -760,7 +759,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
long minTimeout = 10L * 1000 * 1000000; // 10 sec
long interpreterRemovalWaitTimeout = Math.max(minTimeout,
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2);
while (interpreterGroup.containsKey(key)) {
while (interpreterGroup.containsKey(interpreterSessionKey)) {
if (System.nanoTime() - interpreterRemovalWaitStart > interpreterRemovalWaitTimeout) {
throw new InterpreterException("Can not create interpreter");
}
@ -784,18 +783,18 @@ public class InterpreterFactory implements InterpreterGroupFactory {
connectToRemoteRepl(noteId, info.getClassName(), option.getHost(), option.getPort(),
properties, user, option.isUserImpersonate);
} else {
interpreter = createRemoteRepl(path, key, info.getClassName(), properties,
interpreterSetting.getId(), user, option.isUserImpersonate());
interpreter = createRemoteRepl(path, interpreterSessionKey, info.getClassName(),
properties, interpreterSetting.getId(), user, option.isUserImpersonate());
}
} else {
interpreter = createRepl(interpreterSetting.getPath(), info.getClassName(), properties);
}
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(key);
List<Interpreter> interpreters = interpreterGroup.get(interpreterSessionKey);
if (null == interpreters) {
interpreters = new ArrayList<>();
interpreterGroup.put(key, interpreters);
interpreterGroup.put(interpreterSessionKey, interpreters);
}
if (info.isDefaultInterpreter()) {
interpreters.add(0, interpreter);
@ -1105,25 +1104,27 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
private Interpreter connectToRemoteRepl(String noteId, String className, String host, int port,
Properties property, String userName, Boolean isUserImpersonate) {
private Interpreter connectToRemoteRepl(String interpreterSessionKey, String className,
String host, int port, Properties property, String userName, Boolean isUserImpersonate) {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
LazyOpenInterpreter intp = new LazyOpenInterpreter(
new RemoteInterpreter(property, noteId, className, host, port, connectTimeout, maxPoolSize,
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate));
new RemoteInterpreter(property, interpreterSessionKey, className, host, port,
connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener,
userName, isUserImpersonate));
return intp;
}
private Interpreter createRemoteRepl(String interpreterPath, String noteId, String className,
Properties property, String interpreterSettingId, String userName,
Boolean isUserImpersonate) {
private Interpreter createRemoteRepl(String interpreterPath, String interpreterSessionKey,
String className, Properties property, String interpreterSettingId,
String userName, Boolean isUserImpersonate) {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId;
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
RemoteInterpreter remoteInterpreter =
new RemoteInterpreter(property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
new RemoteInterpreter(property, interpreterSessionKey, className,
conf.getInterpreterRemoteRunnerPath(),
interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate);
remoteInterpreter.addEnv(env);
@ -1177,21 +1178,23 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
private String getInterpreterInstanceKey(String user, String noteId, InterpreterSetting setting) {
private String getInterpreterSessionKey(String user, String noteId, InterpreterSetting setting) {
InterpreterOption option = setting.getOption();
String key;
if (option.isExistingProcess()) {
key = Constants.EXISTING_PROCESS;
} else if (!option.perNoteShared()) {
} else if (option.perNoteScoped() && option.perUserScoped()) {
key = user + ":" + noteId;
} else if (option.perUserScoped()) {
key = user;
} else if (option.perNoteScoped()) {
key = noteId;
if (shiroEnabled && !option.perUserShared()) {
key = user + ":" + key;
}
} else {
key = SHARED_SESSION;
}
logger.debug("Interpreter instance key: {}", key);
logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " +
"{}", key, noteId, user, setting.getName());
return key;
}
@ -1199,11 +1202,11 @@ public class InterpreterFactory implements InterpreterGroupFactory {
InterpreterSetting setting) {
InterpreterGroup interpreterGroup = setting.getInterpreterGroup(user, noteId);
synchronized (interpreterGroup) {
String key = getInterpreterInstanceKey(user, noteId, setting);
if (!interpreterGroup.containsKey(key)) {
createInterpretersForNote(setting, user, noteId, key);
String interpreterSessionKey = getInterpreterSessionKey(user, noteId, setting);
if (!interpreterGroup.containsKey(interpreterSessionKey)) {
createInterpretersForNote(setting, user, noteId, interpreterSessionKey);
}
return interpreterGroup.get(getInterpreterInstanceKey(user, noteId, setting));
return interpreterGroup.get(interpreterSessionKey);
}
}

View file

@ -130,7 +130,8 @@ public class InterpreterSetting {
key = SHARED_PROCESS;
}
logger.debug("getInterpreterProcessKey: {}", key);
logger.debug("getInterpreterProcessKey: {} for InterpreterSetting Id: {}, Name: {}",
key, getId(), getName());
return key;
}
@ -142,6 +143,7 @@ public class InterpreterSetting {
interpreterGroupFactory.createInterpreterGroup(interpreterGroupId, getOption());
interpreterGroupWriteLock.lock();
logger.debug("create interpreter group with groupId:" + interpreterGroupId);
interpreterGroupRef.put(key, intpGroup);
interpreterGroupWriteLock.unlock();
}

View file

@ -27,6 +27,7 @@ import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter11;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
@ -135,8 +136,8 @@ public class NoteInterpreterLoaderTest {
factory.getInterpreterSettings("noteB").get(0).getOption().setPerNote(InterpreterOption.ISOLATED);
// interpreters are not created before accessing it
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session"));
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
factory.getInterpreter("user", "noteA", null).open();
factory.getInterpreter("user", "noteB", null).open();
@ -147,16 +148,16 @@ public class NoteInterpreterLoaderTest {
factory.getInterpreter("user", "noteB", null).getInterpreterGroup().getId()));
// interpreters are created after accessing it
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session"));
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
// when
factory.closeNote("user", "noteA");
factory.closeNote("user", "noteB");
// interpreters are destroyed after close
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session"));
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
}