mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge branch 'master' into extends-zrun-remote-transaction
This commit is contained in:
commit
2a2c173b86
9 changed files with 121 additions and 109 deletions
|
|
@ -291,6 +291,10 @@ public class SparkInterpreter extends Interpreter {
|
|||
return (DepInterpreter) p;
|
||||
}
|
||||
|
||||
private boolean isYarnMode() {
|
||||
return getProperty("master").startsWith("yarn");
|
||||
}
|
||||
|
||||
/**
|
||||
* Spark 2.x
|
||||
* Create SparkSession
|
||||
|
|
@ -314,6 +318,10 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
conf.set("spark.scheduler.mode", "FAIR");
|
||||
conf.setMaster(getProperty("master"));
|
||||
if (isYarnMode()) {
|
||||
conf.set("master", "yarn");
|
||||
conf.set("spark.submit.deployMode", "client");
|
||||
}
|
||||
|
||||
Properties intpProperty = getProperty();
|
||||
|
||||
|
|
@ -505,7 +513,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
// Distributes needed libraries to workers
|
||||
// when spark version is greater than or equal to 1.5.0
|
||||
if (getProperty("master").equals("yarn-client")) {
|
||||
if (isYarnMode()) {
|
||||
conf.set("spark.yarn.isPython", "true");
|
||||
}
|
||||
}
|
||||
|
|
@ -554,7 +562,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
@Override
|
||||
public void open() {
|
||||
// set properties and do login before creating any spark stuff for secured cluster
|
||||
if (getProperty("master").equals("yarn-client")) {
|
||||
if (isYarnMode()) {
|
||||
System.setProperty("SPARK_YARN_MODE", "true");
|
||||
}
|
||||
if (getProperty().containsKey("spark.yarn.keytab") &&
|
||||
|
|
|
|||
|
|
@ -32,9 +32,10 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
|
|||
* For example spark, pyspark, sql interpreters are in the same 'spark' group
|
||||
* and InterpreterGroup will have reference to these all interpreters.
|
||||
*
|
||||
* Remember, list of interpreters are dedicated to a note.
|
||||
* (when InterpreterOption.session==true)
|
||||
* So InterpreterGroup internally manages map of [noteId, list of interpreters]
|
||||
* Remember, list of interpreters are dedicated to a session. Session could be shared across user
|
||||
* or notes, so the sessionId could be user or noteId or their combination.
|
||||
* So InterpreterGroup internally manages map of [interpreterSessionKey(noteId, user, or
|
||||
* their combination), list of interpreters]
|
||||
*
|
||||
* A InterpreterGroup runs on interpreter process.
|
||||
* And unit of interpreter instantiate, restart, bind, unbind.
|
||||
|
|
@ -103,15 +104,12 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
public Properties getProperty() {
|
||||
Properties p = new Properties();
|
||||
|
||||
Collection<List<Interpreter>> intpGroupForANote = this.values();
|
||||
if (intpGroupForANote != null && intpGroupForANote.size() > 0) {
|
||||
for (List<Interpreter> intpGroup : intpGroupForANote) {
|
||||
for (Interpreter intp : intpGroup) {
|
||||
p.putAll(intp.getProperty());
|
||||
}
|
||||
// it's okay to break here while every List<Interpreters> will have the same property set
|
||||
break;
|
||||
for (List<Interpreter> intpGroupForASession : this.values()) {
|
||||
for (Interpreter intp : intpGroupForASession) {
|
||||
p.putAll(intp.getProperty());
|
||||
}
|
||||
// it's okay to break here while every List<Interpreters> will have the same property set
|
||||
break;
|
||||
}
|
||||
return p;
|
||||
}
|
||||
|
|
@ -148,20 +146,20 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
public void close() {
|
||||
LOGGER.info("Close interpreter group " + getId());
|
||||
List<Interpreter> intpToClose = new LinkedList<>();
|
||||
for (List<Interpreter> intpGroupForNote : this.values()) {
|
||||
intpToClose.addAll(intpGroupForNote);
|
||||
for (List<Interpreter> intpGroupForSession : this.values()) {
|
||||
intpToClose.addAll(intpGroupForSession);
|
||||
}
|
||||
close(intpToClose);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all interpreter instances in this group for the note
|
||||
* @param noteId
|
||||
* Close all interpreter instances in this group for the session
|
||||
* @param sessionId
|
||||
*/
|
||||
public void close(String noteId) {
|
||||
LOGGER.info("Close interpreter group " + getId() + " for note " + noteId);
|
||||
List<Interpreter> intpForNote = this.get(noteId);
|
||||
close(intpForNote);
|
||||
public void close(String sessionId) {
|
||||
LOGGER.info("Close interpreter group " + getId() + " for session: " + sessionId);
|
||||
List<Interpreter> intpForSession = this.get(sessionId);
|
||||
close(intpForSession);
|
||||
}
|
||||
|
||||
private void close(Collection<Interpreter> intpToClose) {
|
||||
|
|
@ -196,13 +194,13 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
}
|
||||
|
||||
/**
|
||||
* Destroy all interpreter instances in this group for the note
|
||||
* @param noteId
|
||||
* Destroy all interpreter instances in this group for the session
|
||||
* @param sessionId
|
||||
*/
|
||||
public void destroy(String noteId) {
|
||||
LOGGER.info("Destroy interpreter group " + getId() + " for note " + noteId);
|
||||
List<Interpreter> intpForNote = this.get(noteId);
|
||||
destroy(intpForNote);
|
||||
public void destroy(String sessionId) {
|
||||
LOGGER.info("Destroy interpreter group " + getId() + " for session " + sessionId);
|
||||
List<Interpreter> intpForSession = this.get(sessionId);
|
||||
destroy(intpForSession);
|
||||
|
||||
if (remoteInterpreterProcess != null) {
|
||||
remoteInterpreterProcess.dereference();
|
||||
|
|
@ -220,8 +218,8 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
public void destroy() {
|
||||
LOGGER.info("Destroy interpreter group " + getId());
|
||||
List<Interpreter> intpToDestroy = new LinkedList<>();
|
||||
for (List<Interpreter> intpGroupForNote : this.values()) {
|
||||
intpToDestroy.addAll(intpGroupForNote);
|
||||
for (List<Interpreter> intpGroupForSession : this.values()) {
|
||||
intpToDestroy.addAll(intpGroupForSession);
|
||||
}
|
||||
destroy(intpToDestroy);
|
||||
|
||||
|
|
|
|||
|
|
@ -44,24 +44,24 @@ public class ZeppelinDevServer extends
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Interpreter getInterpreter(String noteId, String className) throws TException {
|
||||
protected Interpreter getInterpreter(String sessionKey, String className) throws TException {
|
||||
synchronized (this) {
|
||||
InterpreterGroup interpreterGroup = getInterpreterGroup();
|
||||
if (interpreterGroup == null) {
|
||||
createInterpreter(
|
||||
"dev",
|
||||
noteId,
|
||||
sessionKey,
|
||||
DevInterpreter.class.getName(),
|
||||
new HashMap<String, String>());
|
||||
|
||||
Interpreter intp = super.getInterpreter(noteId, className);
|
||||
Interpreter intp = super.getInterpreter(sessionKey, className);
|
||||
interpreter = (DevInterpreter) (
|
||||
((LazyOpenInterpreter) intp).getInnerInterpreter());
|
||||
interpreter.setInterpreterEvent(this);
|
||||
notify();
|
||||
}
|
||||
}
|
||||
return super.getInterpreter(noteId, className);
|
||||
return super.getInterpreter(sessionKey, className);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
private String interpreterPath;
|
||||
private String localRepoPath;
|
||||
private String className;
|
||||
private String noteId;
|
||||
private String sessionKey;
|
||||
FormType formType;
|
||||
boolean initialized;
|
||||
private Map<String, String> env;
|
||||
|
|
@ -66,7 +66,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
* Remote interpreter and manage interpreter process
|
||||
*/
|
||||
public RemoteInterpreter(Properties property,
|
||||
String noteId,
|
||||
String sessionKey,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
|
|
@ -78,7 +78,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
String userName,
|
||||
Boolean isUserImpersonate) {
|
||||
super(property);
|
||||
this.noteId = noteId;
|
||||
this.sessionKey = sessionKey;
|
||||
this.className = className;
|
||||
initialized = false;
|
||||
this.interpreterRunner = interpreterRunner;
|
||||
|
|
@ -99,7 +99,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
*/
|
||||
public RemoteInterpreter(
|
||||
Properties property,
|
||||
String noteId,
|
||||
String sessionKey,
|
||||
String className,
|
||||
String host,
|
||||
int port,
|
||||
|
|
@ -110,7 +110,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
String userName,
|
||||
Boolean isUserImpersonate) {
|
||||
super(property);
|
||||
this.noteId = noteId;
|
||||
this.sessionKey = sessionKey;
|
||||
this.className = className;
|
||||
initialized = false;
|
||||
this.host = host;
|
||||
|
|
@ -127,7 +127,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
// VisibleForTesting
|
||||
public RemoteInterpreter(
|
||||
Properties property,
|
||||
String noteId,
|
||||
String sessionKey,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
|
|
@ -140,7 +140,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
Boolean isUserImpersonate) {
|
||||
super(property);
|
||||
this.className = className;
|
||||
this.noteId = noteId;
|
||||
this.sessionKey = sessionKey;
|
||||
this.interpreterRunner = interpreterRunner;
|
||||
this.interpreterPath = interpreterPath;
|
||||
this.localRepoPath = localRepoPath;
|
||||
|
|
@ -238,7 +238,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
if (localRepoPath != null) {
|
||||
property.put("zeppelin.interpreter.localRepo", localRepoPath);
|
||||
}
|
||||
client.createInterpreter(groupId, noteId,
|
||||
client.createInterpreter(groupId, sessionKey,
|
||||
getClassName(), (Map) property);
|
||||
|
||||
// Push angular object loaded from JSON file to remote interpreter
|
||||
|
|
@ -266,7 +266,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
synchronized (interpreterGroup) {
|
||||
// initialize all interpreters in this interpreter group
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
|
||||
for (Interpreter intp : new ArrayList<>(interpreters)) {
|
||||
Interpreter p = intp;
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
|
|
@ -292,7 +292,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
try {
|
||||
client = interpreterProcess.getClient();
|
||||
if (client != null) {
|
||||
client.close(noteId, className);
|
||||
client.close(sessionKey, className);
|
||||
}
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
|
|
@ -339,7 +339,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
final GUI currentGUI = context.getGui();
|
||||
RemoteInterpreterResult remoteResult = client.interpret(
|
||||
noteId, className, st, convert(context));
|
||||
sessionKey, className, st, convert(context));
|
||||
|
||||
Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
|
||||
remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
|
||||
|
|
@ -385,7 +385,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
boolean broken = false;
|
||||
try {
|
||||
client.cancel(noteId, className, convert(context));
|
||||
client.cancel(sessionKey, className, convert(context));
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
throw new InterpreterException(e);
|
||||
|
|
@ -413,7 +413,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
boolean broken = false;
|
||||
try {
|
||||
formType = FormType.valueOf(client.getFormType(noteId, className));
|
||||
formType = FormType.valueOf(client.getFormType(sessionKey, className));
|
||||
return formType;
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
|
|
@ -439,7 +439,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
boolean broken = false;
|
||||
try {
|
||||
return client.getProgress(noteId, className, convert(context));
|
||||
return client.getProgress(sessionKey, className, convert(context));
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
throw new InterpreterException(e);
|
||||
|
|
@ -461,7 +461,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
boolean broken = false;
|
||||
try {
|
||||
List completion = client.completion(noteId, className, buf, cursor);
|
||||
List completion = client.completion(sessionKey, className, buf, cursor);
|
||||
return completion;
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
|
|
@ -479,8 +479,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
return null;
|
||||
} else {
|
||||
return SchedulerFactory.singleton().createOrGetRemoteScheduler(
|
||||
RemoteInterpreter.class.getName() + noteId + interpreterProcess.hashCode(),
|
||||
noteId,
|
||||
RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
|
||||
sessionKey,
|
||||
interpreterProcess,
|
||||
maxConcurrency);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -157,7 +157,7 @@ public class RemoteInterpreterServer
|
|||
|
||||
|
||||
@Override
|
||||
public void createInterpreter(String interpreterGroupId, String noteId, String
|
||||
public void createInterpreter(String interpreterGroupId, String sessionKey, String
|
||||
className, Map<String, String> properties) throws TException {
|
||||
if (interpreterGroup == null) {
|
||||
interpreterGroup = new InterpreterGroup(interpreterGroupId);
|
||||
|
|
@ -185,10 +185,10 @@ public class RemoteInterpreterServer
|
|||
repl.setClassloaderUrls(new URL[]{});
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
|
||||
if (interpreters == null) {
|
||||
interpreters = new LinkedList<>();
|
||||
interpreterGroup.put(noteId, interpreters);
|
||||
interpreterGroup.put(sessionKey, interpreters);
|
||||
}
|
||||
|
||||
interpreters.add(new LazyOpenInterpreter(repl));
|
||||
|
|
@ -229,13 +229,13 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
}
|
||||
|
||||
protected Interpreter getInterpreter(String noteId, String className) throws TException {
|
||||
protected Interpreter getInterpreter(String sessionKey, String className) throws TException {
|
||||
if (interpreterGroup == null) {
|
||||
throw new TException(
|
||||
new InterpreterException("Interpreter instance " + className + " not created"));
|
||||
}
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
|
||||
if (interpreters == null) {
|
||||
throw new TException(
|
||||
new InterpreterException("Interpreter " + className + " not initialized"));
|
||||
|
|
@ -257,13 +257,13 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
|
||||
@Override
|
||||
public void close(String noteId, String className) throws TException {
|
||||
public void close(String sessionKey, String className) throws TException {
|
||||
// unload all applications
|
||||
for (String appId : runningApplications.keySet()) {
|
||||
RunningApplication appInfo = runningApplications.get(appId);
|
||||
|
||||
// see NoteInterpreterLoader.SHARED_SESSION
|
||||
if (appInfo.noteId.equals(noteId) || noteId.equals("shared_session")) {
|
||||
if (appInfo.noteId.equals(sessionKey) || sessionKey.equals("shared_session")) {
|
||||
try {
|
||||
logger.info("Unload App {} ", appInfo.pkg.getName());
|
||||
appInfo.app.unload();
|
||||
|
|
@ -277,7 +277,7 @@ public class RemoteInterpreterServer
|
|||
|
||||
// close interpreters
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
|
||||
if (interpreters != null) {
|
||||
Iterator<Interpreter> it = interpreters.iterator();
|
||||
while (it.hasNext()) {
|
||||
|
|
@ -715,14 +715,14 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
|
||||
@Override
|
||||
public String getStatus(String noteId, String jobId)
|
||||
public String getStatus(String sessionKey, String jobId)
|
||||
throws TException {
|
||||
if (interpreterGroup == null) {
|
||||
return "Unknown";
|
||||
}
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
|
||||
if (interpreters == null) {
|
||||
return "Unknown";
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@
|
|||
namespace java org.apache.zeppelin.interpreter.thrift
|
||||
|
||||
struct RemoteInterpreterContext {
|
||||
1: string noteId,
|
||||
1: string sessionKey,
|
||||
2: string paragraphId,
|
||||
3: string replName,
|
||||
4: string paragraphTitle,
|
||||
|
|
@ -82,18 +82,18 @@ struct InterpreterCompletion {
|
|||
}
|
||||
|
||||
service RemoteInterpreterService {
|
||||
void createInterpreter(1: string intpGroupId, 2: string noteId, 3: string className, 4: map<string, string> properties);
|
||||
void createInterpreter(1: string intpGroupId, 2: string sessionKey, 3: string className, 4: map<string, string> properties);
|
||||
|
||||
void open(1: string noteId, 2: string className);
|
||||
void close(1: string noteId, 2: string className);
|
||||
RemoteInterpreterResult interpret(1: string noteId, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
|
||||
void cancel(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
|
||||
i32 getProgress(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
|
||||
string getFormType(1: string noteId, 2: string className);
|
||||
list<InterpreterCompletion> completion(1: string noteId, 2: string className, 3: string buf, 4: i32 cursor);
|
||||
void open(1: string sessionKey, 2: string className);
|
||||
void close(1: string sessionKey, 2: string className);
|
||||
RemoteInterpreterResult interpret(1: string sessionKey, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
|
||||
void cancel(1: string sessionKey, 2: string className, 3: RemoteInterpreterContext interpreterContext);
|
||||
i32 getProgress(1: string sessionKey, 2: string className, 3: RemoteInterpreterContext interpreterContext);
|
||||
string getFormType(1: string sessionKey, 2: string className);
|
||||
list<InterpreterCompletion> completion(1: string sessionKey, 2: string className, 3: string buf, 4: i32 cursor);
|
||||
void shutdown();
|
||||
|
||||
string getStatus(1: string noteId, 2:string jobId);
|
||||
string getStatus(1: string sessionKey, 2:string jobId);
|
||||
|
||||
RemoteInterpreterEvent getEvent();
|
||||
|
||||
|
|
@ -104,17 +104,17 @@ service RemoteInterpreterService {
|
|||
// get all resources in the interpreter process
|
||||
list<string> resourcePoolGetAll();
|
||||
// get value of resource
|
||||
binary resourceGet(1: string noteId, 2: string paragraphId, 3: string resourceName);
|
||||
binary resourceGet(1: string sessionKey, 2: string paragraphId, 3: string resourceName);
|
||||
// remove resource
|
||||
bool resourceRemove(1: string noteId, 2: string paragraphId, 3:string resourceName);
|
||||
bool resourceRemove(1: string sessionKey, 2: string paragraphId, 3:string resourceName);
|
||||
|
||||
void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string
|
||||
void angularObjectUpdate(1: string name, 2: string sessionKey, 3: string paragraphId, 4: string
|
||||
object);
|
||||
void angularObjectAdd(1: string name, 2: string noteId, 3: string paragraphId, 4: string object);
|
||||
void angularObjectRemove(1: string name, 2: string noteId, 3: string paragraphId);
|
||||
void angularObjectAdd(1: string name, 2: string sessionKey, 3: string paragraphId, 4: string object);
|
||||
void angularObjectRemove(1: string name, 2: string sessionKey, 3: string paragraphId);
|
||||
void angularRegistryPush(1: string registry);
|
||||
|
||||
RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string noteId, 4: string paragraphId);
|
||||
RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string sessionKey, 4: string paragraphId);
|
||||
RemoteApplicationResult unloadApplication(1: string applicationInstanceId);
|
||||
RemoteApplicationResult runApplication(1: string applicationInstanceId);
|
||||
|
||||
|
|
|
|||
|
|
@ -50,7 +50,6 @@ import java.util.Set;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.internal.StringMap;
|
||||
|
|
@ -730,7 +729,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
interpreterSetting.closeAndRemoveInterpreterGroup(noteId);
|
||||
} else if (option.isSession()) {
|
||||
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
|
||||
String key = getInterpreterInstanceKey(user, noteId, interpreterSetting);
|
||||
String key = getInterpreterSessionKey(user, noteId, interpreterSetting);
|
||||
interpreterGroup.close(key);
|
||||
interpreterGroup.destroy(key);
|
||||
synchronized (interpreterGroup) {
|
||||
|
|
@ -743,7 +742,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
|
||||
public void createInterpretersForNote(InterpreterSetting interpreterSetting, String user,
|
||||
String noteId, String key) {
|
||||
String noteId, String interpreterSessionKey) {
|
||||
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
|
||||
InterpreterOption option = interpreterSetting.getOption();
|
||||
Properties properties = (Properties) interpreterSetting.getProperties();
|
||||
|
|
@ -760,7 +759,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
long minTimeout = 10L * 1000 * 1000000; // 10 sec
|
||||
long interpreterRemovalWaitTimeout = Math.max(minTimeout,
|
||||
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2);
|
||||
while (interpreterGroup.containsKey(key)) {
|
||||
while (interpreterGroup.containsKey(interpreterSessionKey)) {
|
||||
if (System.nanoTime() - interpreterRemovalWaitStart > interpreterRemovalWaitTimeout) {
|
||||
throw new InterpreterException("Can not create interpreter");
|
||||
}
|
||||
|
|
@ -784,18 +783,18 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
connectToRemoteRepl(noteId, info.getClassName(), option.getHost(), option.getPort(),
|
||||
properties, user, option.isUserImpersonate);
|
||||
} else {
|
||||
interpreter = createRemoteRepl(path, key, info.getClassName(), properties,
|
||||
interpreterSetting.getId(), user, option.isUserImpersonate());
|
||||
interpreter = createRemoteRepl(path, interpreterSessionKey, info.getClassName(),
|
||||
properties, interpreterSetting.getId(), user, option.isUserImpersonate());
|
||||
}
|
||||
} else {
|
||||
interpreter = createRepl(interpreterSetting.getPath(), info.getClassName(), properties);
|
||||
}
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(key);
|
||||
List<Interpreter> interpreters = interpreterGroup.get(interpreterSessionKey);
|
||||
if (null == interpreters) {
|
||||
interpreters = new ArrayList<>();
|
||||
interpreterGroup.put(key, interpreters);
|
||||
interpreterGroup.put(interpreterSessionKey, interpreters);
|
||||
}
|
||||
if (info.isDefaultInterpreter()) {
|
||||
interpreters.add(0, interpreter);
|
||||
|
|
@ -1105,25 +1104,27 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
}
|
||||
|
||||
private Interpreter connectToRemoteRepl(String noteId, String className, String host, int port,
|
||||
Properties property, String userName, Boolean isUserImpersonate) {
|
||||
private Interpreter connectToRemoteRepl(String interpreterSessionKey, String className,
|
||||
String host, int port, Properties property, String userName, Boolean isUserImpersonate) {
|
||||
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(
|
||||
new RemoteInterpreter(property, noteId, className, host, port, connectTimeout, maxPoolSize,
|
||||
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate));
|
||||
new RemoteInterpreter(property, interpreterSessionKey, className, host, port,
|
||||
connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener,
|
||||
userName, isUserImpersonate));
|
||||
return intp;
|
||||
}
|
||||
|
||||
private Interpreter createRemoteRepl(String interpreterPath, String noteId, String className,
|
||||
Properties property, String interpreterSettingId, String userName,
|
||||
Boolean isUserImpersonate) {
|
||||
private Interpreter createRemoteRepl(String interpreterPath, String interpreterSessionKey,
|
||||
String className, Properties property, String interpreterSettingId,
|
||||
String userName, Boolean isUserImpersonate) {
|
||||
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId;
|
||||
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
|
||||
|
||||
RemoteInterpreter remoteInterpreter =
|
||||
new RemoteInterpreter(property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
|
||||
new RemoteInterpreter(property, interpreterSessionKey, className,
|
||||
conf.getInterpreterRemoteRunnerPath(),
|
||||
interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
|
||||
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate);
|
||||
remoteInterpreter.addEnv(env);
|
||||
|
|
@ -1177,21 +1178,23 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
}
|
||||
|
||||
private String getInterpreterInstanceKey(String user, String noteId, InterpreterSetting setting) {
|
||||
private String getInterpreterSessionKey(String user, String noteId, InterpreterSetting setting) {
|
||||
InterpreterOption option = setting.getOption();
|
||||
String key;
|
||||
if (option.isExistingProcess()) {
|
||||
key = Constants.EXISTING_PROCESS;
|
||||
} else if (!option.perNoteShared()) {
|
||||
} else if (option.perNoteScoped() && option.perUserScoped()) {
|
||||
key = user + ":" + noteId;
|
||||
} else if (option.perUserScoped()) {
|
||||
key = user;
|
||||
} else if (option.perNoteScoped()) {
|
||||
key = noteId;
|
||||
if (shiroEnabled && !option.perUserShared()) {
|
||||
key = user + ":" + key;
|
||||
}
|
||||
} else {
|
||||
key = SHARED_SESSION;
|
||||
}
|
||||
|
||||
logger.debug("Interpreter instance key: {}", key);
|
||||
logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " +
|
||||
"{}", key, noteId, user, setting.getName());
|
||||
return key;
|
||||
}
|
||||
|
||||
|
|
@ -1199,11 +1202,11 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
InterpreterSetting setting) {
|
||||
InterpreterGroup interpreterGroup = setting.getInterpreterGroup(user, noteId);
|
||||
synchronized (interpreterGroup) {
|
||||
String key = getInterpreterInstanceKey(user, noteId, setting);
|
||||
if (!interpreterGroup.containsKey(key)) {
|
||||
createInterpretersForNote(setting, user, noteId, key);
|
||||
String interpreterSessionKey = getInterpreterSessionKey(user, noteId, setting);
|
||||
if (!interpreterGroup.containsKey(interpreterSessionKey)) {
|
||||
createInterpretersForNote(setting, user, noteId, interpreterSessionKey);
|
||||
}
|
||||
return interpreterGroup.get(getInterpreterInstanceKey(user, noteId, setting));
|
||||
return interpreterGroup.get(interpreterSessionKey);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -130,7 +130,8 @@ public class InterpreterSetting {
|
|||
key = SHARED_PROCESS;
|
||||
}
|
||||
|
||||
logger.debug("getInterpreterProcessKey: {}", key);
|
||||
logger.debug("getInterpreterProcessKey: {} for InterpreterSetting Id: {}, Name: {}",
|
||||
key, getId(), getName());
|
||||
return key;
|
||||
}
|
||||
|
||||
|
|
@ -142,6 +143,7 @@ public class InterpreterSetting {
|
|||
interpreterGroupFactory.createInterpreterGroup(interpreterGroupId, getOption());
|
||||
|
||||
interpreterGroupWriteLock.lock();
|
||||
logger.debug("create interpreter group with groupId:" + interpreterGroupId);
|
||||
interpreterGroupRef.put(key, intpGroup);
|
||||
interpreterGroupWriteLock.unlock();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import org.apache.zeppelin.dep.DependencyResolver;
|
|||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter11;
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
|
||||
|
|
@ -135,8 +136,8 @@ public class NoteInterpreterLoaderTest {
|
|||
factory.getInterpreterSettings("noteB").get(0).getOption().setPerNote(InterpreterOption.ISOLATED);
|
||||
|
||||
// interpreters are not created before accessing it
|
||||
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
|
||||
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
|
||||
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session"));
|
||||
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
|
||||
|
||||
factory.getInterpreter("user", "noteA", null).open();
|
||||
factory.getInterpreter("user", "noteB", null).open();
|
||||
|
|
@ -147,16 +148,16 @@ public class NoteInterpreterLoaderTest {
|
|||
factory.getInterpreter("user", "noteB", null).getInterpreterGroup().getId()));
|
||||
|
||||
// interpreters are created after accessing it
|
||||
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
|
||||
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
|
||||
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session"));
|
||||
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
|
||||
|
||||
// when
|
||||
factory.closeNote("user", "noteA");
|
||||
factory.closeNote("user", "noteB");
|
||||
|
||||
// interpreters are destroyed after close
|
||||
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
|
||||
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
|
||||
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session"));
|
||||
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue