Merge remote-tracking branch 'origin/master' into ZEPPELIN-212

This commit is contained in:
Lee moon soo 2016-11-26 08:05:42 -08:00
commit d5981d5160
11 changed files with 147 additions and 119 deletions

View file

@ -261,7 +261,7 @@
</goals>
<configuration>
<failOnViolation>true</failOnViolation>
<excludes>org/apache/zeppelin/interpreter/thrift/*</excludes>
<excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/scio/avro/*,org/apache/zeppelin/scio/avro/*</excludes>
</configuration>
</execution>
<execution>
@ -271,7 +271,7 @@
<goal>checkstyle-aggregate</goal>
</goals>
<configuration>
<excludes>org/apache/zeppelin/interpreter/thrift/*</excludes>
<excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/scio/avro/*,org/apache/zeppelin/scio/avro/*</excludes>
</configuration>
</execution>
</executions>

View file

@ -296,6 +296,10 @@ public class SparkInterpreter extends Interpreter {
return (DepInterpreter) p;
}
private boolean isYarnMode() {
return getProperty("master").startsWith("yarn");
}
/**
* Spark 2.x
* Create SparkSession
@ -319,6 +323,10 @@ public class SparkInterpreter extends Interpreter {
conf.set("spark.scheduler.mode", "FAIR");
conf.setMaster(getProperty("master"));
if (isYarnMode()) {
conf.set("master", "yarn");
conf.set("spark.submit.deployMode", "client");
}
Properties intpProperty = getProperty();
@ -510,7 +518,7 @@ public class SparkInterpreter extends Interpreter {
// Distributes needed libraries to workers
// when spark version is greater than or equal to 1.5.0
if (getProperty("master").equals("yarn-client")) {
if (isYarnMode()) {
conf.set("spark.yarn.isPython", "true");
}
}
@ -559,7 +567,7 @@ public class SparkInterpreter extends Interpreter {
@Override
public void open() {
// set properties and do login before creating any spark stuff for secured cluster
if (getProperty("master").equals("yarn-client")) {
if (isYarnMode()) {
System.setProperty("SPARK_YARN_MODE", "true");
}
if (getProperty().containsKey("spark.yarn.keytab") &&

View file

@ -32,9 +32,10 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
* For example spark, pyspark, sql interpreters are in the same 'spark' group
* and InterpreterGroup will have reference to these all interpreters.
*
* Remember, list of interpreters are dedicated to a note.
* (when InterpreterOption.session==true)
* So InterpreterGroup internally manages map of [noteId, list of interpreters]
* Remember, list of interpreters are dedicated to a session. Session could be shared across user
* or notes, so the sessionId could be user or noteId or their combination.
* So InterpreterGroup internally manages map of [interpreterSessionKey(noteId, user, or
* their combination), list of interpreters]
*
* A InterpreterGroup runs on interpreter process.
* And unit of interpreter instantiate, restart, bind, unbind.
@ -103,15 +104,12 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
public Properties getProperty() {
Properties p = new Properties();
Collection<List<Interpreter>> intpGroupForANote = this.values();
if (intpGroupForANote != null && intpGroupForANote.size() > 0) {
for (List<Interpreter> intpGroup : intpGroupForANote) {
for (Interpreter intp : intpGroup) {
p.putAll(intp.getProperty());
}
// it's okay to break here while every List<Interpreters> will have the same property set
break;
for (List<Interpreter> intpGroupForASession : this.values()) {
for (Interpreter intp : intpGroupForASession) {
p.putAll(intp.getProperty());
}
// it's okay to break here while every List<Interpreters> will have the same property set
break;
}
return p;
}
@ -148,20 +146,20 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
public void close() {
LOGGER.info("Close interpreter group " + getId());
List<Interpreter> intpToClose = new LinkedList<>();
for (List<Interpreter> intpGroupForNote : this.values()) {
intpToClose.addAll(intpGroupForNote);
for (List<Interpreter> intpGroupForSession : this.values()) {
intpToClose.addAll(intpGroupForSession);
}
close(intpToClose);
}
/**
* Close all interpreter instances in this group for the note
* @param noteId
* Close all interpreter instances in this group for the session
* @param sessionId
*/
public void close(String noteId) {
LOGGER.info("Close interpreter group " + getId() + " for note " + noteId);
List<Interpreter> intpForNote = this.get(noteId);
close(intpForNote);
public void close(String sessionId) {
LOGGER.info("Close interpreter group " + getId() + " for session: " + sessionId);
List<Interpreter> intpForSession = this.get(sessionId);
close(intpForSession);
}
private void close(Collection<Interpreter> intpToClose) {
@ -196,13 +194,13 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
}
/**
* Destroy all interpreter instances in this group for the note
* @param noteId
* Destroy all interpreter instances in this group for the session
* @param sessionId
*/
public void destroy(String noteId) {
LOGGER.info("Destroy interpreter group " + getId() + " for note " + noteId);
List<Interpreter> intpForNote = this.get(noteId);
destroy(intpForNote);
public void destroy(String sessionId) {
LOGGER.info("Destroy interpreter group " + getId() + " for session " + sessionId);
List<Interpreter> intpForSession = this.get(sessionId);
destroy(intpForSession);
if (remoteInterpreterProcess != null) {
remoteInterpreterProcess.dereference();
@ -220,8 +218,8 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
public void destroy() {
LOGGER.info("Destroy interpreter group " + getId());
List<Interpreter> intpToDestroy = new LinkedList<>();
for (List<Interpreter> intpGroupForNote : this.values()) {
intpToDestroy.addAll(intpGroupForNote);
for (List<Interpreter> intpGroupForSession : this.values()) {
intpToDestroy.addAll(intpGroupForSession);
}
destroy(intpToDestroy);

View file

@ -44,24 +44,24 @@ public class ZeppelinDevServer extends
}
@Override
protected Interpreter getInterpreter(String noteId, String className) throws TException {
protected Interpreter getInterpreter(String sessionKey, String className) throws TException {
synchronized (this) {
InterpreterGroup interpreterGroup = getInterpreterGroup();
if (interpreterGroup == null) {
createInterpreter(
"dev",
noteId,
sessionKey,
DevInterpreter.class.getName(),
new HashMap<String, String>());
Interpreter intp = super.getInterpreter(noteId, className);
Interpreter intp = super.getInterpreter(sessionKey, className);
interpreter = (DevInterpreter) (
((LazyOpenInterpreter) intp).getInnerInterpreter());
interpreter.setInterpreterEvent(this);
notify();
}
}
return super.getInterpreter(noteId, className);
return super.getInterpreter(sessionKey, className);
}
@Override

View file

@ -52,7 +52,7 @@ public class RemoteInterpreter extends Interpreter {
private String interpreterPath;
private String localRepoPath;
private String className;
private String noteId;
private String sessionKey;
FormType formType;
boolean initialized;
private Map<String, String> env;
@ -67,7 +67,7 @@ public class RemoteInterpreter extends Interpreter {
* Remote interpreter and manage interpreter process
*/
public RemoteInterpreter(Properties property,
String noteId,
String sessionKey,
String className,
String interpreterRunner,
String interpreterPath,
@ -79,7 +79,7 @@ public class RemoteInterpreter extends Interpreter {
String userName,
Boolean isUserImpersonate) {
super(property);
this.noteId = noteId;
this.sessionKey = sessionKey;
this.className = className;
initialized = false;
this.interpreterRunner = interpreterRunner;
@ -100,7 +100,7 @@ public class RemoteInterpreter extends Interpreter {
*/
public RemoteInterpreter(
Properties property,
String noteId,
String sessionKey,
String className,
String host,
int port,
@ -111,7 +111,7 @@ public class RemoteInterpreter extends Interpreter {
String userName,
Boolean isUserImpersonate) {
super(property);
this.noteId = noteId;
this.sessionKey = sessionKey;
this.className = className;
initialized = false;
this.host = host;
@ -128,7 +128,7 @@ public class RemoteInterpreter extends Interpreter {
// VisibleForTesting
public RemoteInterpreter(
Properties property,
String noteId,
String sessionKey,
String className,
String interpreterRunner,
String interpreterPath,
@ -141,7 +141,7 @@ public class RemoteInterpreter extends Interpreter {
Boolean isUserImpersonate) {
super(property);
this.className = className;
this.noteId = noteId;
this.sessionKey = sessionKey;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
this.localRepoPath = localRepoPath;
@ -239,7 +239,7 @@ public class RemoteInterpreter extends Interpreter {
if (localRepoPath != null) {
property.put("zeppelin.interpreter.localRepo", localRepoPath);
}
client.createInterpreter(groupId, noteId,
client.createInterpreter(groupId, sessionKey,
getClassName(), (Map) property);
// Push angular object loaded from JSON file to remote interpreter
@ -267,7 +267,7 @@ public class RemoteInterpreter extends Interpreter {
synchronized (interpreterGroup) {
// initialize all interpreters in this interpreter group
List<Interpreter> interpreters = interpreterGroup.get(noteId);
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
for (Interpreter intp : new ArrayList<>(interpreters)) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
@ -293,7 +293,7 @@ public class RemoteInterpreter extends Interpreter {
try {
client = interpreterProcess.getClient();
if (client != null) {
client.close(noteId, className);
client.close(sessionKey, className);
}
} catch (TException e) {
broken = true;
@ -340,7 +340,7 @@ public class RemoteInterpreter extends Interpreter {
final GUI currentGUI = context.getGui();
RemoteInterpreterResult remoteResult = client.interpret(
noteId, className, st, convert(context));
sessionKey, className, st, convert(context));
Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
@ -386,7 +386,7 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
client.cancel(noteId, className, convert(context));
client.cancel(sessionKey, className, convert(context));
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
@ -414,7 +414,7 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
formType = FormType.valueOf(client.getFormType(noteId, className));
formType = FormType.valueOf(client.getFormType(sessionKey, className));
return formType;
} catch (TException e) {
broken = true;
@ -440,7 +440,7 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
return client.getProgress(noteId, className, convert(context));
return client.getProgress(sessionKey, className, convert(context));
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
@ -462,7 +462,7 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
List completion = client.completion(noteId, className, buf, cursor);
List completion = client.completion(sessionKey, className, buf, cursor);
return completion;
} catch (TException e) {
broken = true;
@ -480,8 +480,8 @@ public class RemoteInterpreter extends Interpreter {
return null;
} else {
return SchedulerFactory.singleton().createOrGetRemoteScheduler(
RemoteInterpreter.class.getName() + noteId + interpreterProcess.hashCode(),
noteId,
RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
sessionKey,
interpreterProcess,
maxConcurrency);
}

View file

@ -149,7 +149,7 @@ public class RemoteInterpreterServer
@Override
public void createInterpreter(String interpreterGroupId, String noteId, String
public void createInterpreter(String interpreterGroupId, String sessionKey, String
className, Map<String, String> properties) throws TException {
if (interpreterGroup == null) {
interpreterGroup = new InterpreterGroup(interpreterGroupId);
@ -178,10 +178,10 @@ public class RemoteInterpreterServer
repl.setClassloaderUrls(new URL[]{});
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(noteId);
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
if (interpreters == null) {
interpreters = new LinkedList<>();
interpreterGroup.put(noteId, interpreters);
interpreterGroup.put(sessionKey, interpreters);
}
interpreters.add(new LazyOpenInterpreter(repl));
@ -222,13 +222,13 @@ public class RemoteInterpreterServer
}
}
protected Interpreter getInterpreter(String noteId, String className) throws TException {
protected Interpreter getInterpreter(String sessionKey, String className) throws TException {
if (interpreterGroup == null) {
throw new TException(
new InterpreterException("Interpreter instance " + className + " not created"));
}
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(noteId);
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
if (interpreters == null) {
throw new TException(
new InterpreterException("Interpreter " + className + " not initialized"));
@ -250,13 +250,13 @@ public class RemoteInterpreterServer
}
@Override
public void close(String noteId, String className) throws TException {
public void close(String sessionKey, String className) throws TException {
// unload all applications
for (String appId : runningApplications.keySet()) {
RunningApplication appInfo = runningApplications.get(appId);
// see NoteInterpreterLoader.SHARED_SESSION
if (appInfo.noteId.equals(noteId) || noteId.equals("shared_session")) {
if (appInfo.noteId.equals(sessionKey) || sessionKey.equals("shared_session")) {
try {
logger.info("Unload App {} ", appInfo.pkg.getName());
appInfo.app.unload();
@ -270,7 +270,7 @@ public class RemoteInterpreterServer
// close interpreters
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(noteId);
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
if (interpreters != null) {
Iterator<Interpreter> it = interpreters.iterator();
while (it.hasNext()) {
@ -614,14 +614,14 @@ public class RemoteInterpreterServer
}
@Override
public String getStatus(String noteId, String jobId)
public String getStatus(String sessionKey, String jobId)
throws TException {
if (interpreterGroup == null) {
return "Unknown";
}
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(noteId);
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
if (interpreters == null) {
return "Unknown";
}

View file

@ -19,7 +19,7 @@
namespace java org.apache.zeppelin.interpreter.thrift
struct RemoteInterpreterContext {
1: string noteId,
1: string sessionKey,
2: string paragraphId,
3: string replName,
4: string paragraphTitle,
@ -79,18 +79,18 @@ struct InterpreterCompletion {
}
service RemoteInterpreterService {
void createInterpreter(1: string intpGroupId, 2: string noteId, 3: string className, 4: map<string, string> properties);
void createInterpreter(1: string intpGroupId, 2: string sessionKey, 3: string className, 4: map<string, string> properties);
void open(1: string noteId, 2: string className);
void close(1: string noteId, 2: string className);
RemoteInterpreterResult interpret(1: string noteId, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
void cancel(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
i32 getProgress(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
string getFormType(1: string noteId, 2: string className);
list<InterpreterCompletion> completion(1: string noteId, 2: string className, 3: string buf, 4: i32 cursor);
void open(1: string sessionKey, 2: string className);
void close(1: string sessionKey, 2: string className);
RemoteInterpreterResult interpret(1: string sessionKey, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
void cancel(1: string sessionKey, 2: string className, 3: RemoteInterpreterContext interpreterContext);
i32 getProgress(1: string sessionKey, 2: string className, 3: RemoteInterpreterContext interpreterContext);
string getFormType(1: string sessionKey, 2: string className);
list<InterpreterCompletion> completion(1: string sessionKey, 2: string className, 3: string buf, 4: i32 cursor);
void shutdown();
string getStatus(1: string noteId, 2:string jobId);
string getStatus(1: string sessionKey, 2:string jobId);
RemoteInterpreterEvent getEvent();
@ -101,17 +101,17 @@ service RemoteInterpreterService {
// get all resources in the interpreter process
list<string> resourcePoolGetAll();
// get value of resource
binary resourceGet(1: string noteId, 2: string paragraphId, 3: string resourceName);
binary resourceGet(1: string sessionKey, 2: string paragraphId, 3: string resourceName);
// remove resource
bool resourceRemove(1: string noteId, 2: string paragraphId, 3:string resourceName);
bool resourceRemove(1: string sessionKey, 2: string paragraphId, 3:string resourceName);
void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string
void angularObjectUpdate(1: string name, 2: string sessionKey, 3: string paragraphId, 4: string
object);
void angularObjectAdd(1: string name, 2: string noteId, 3: string paragraphId, 4: string object);
void angularObjectRemove(1: string name, 2: string noteId, 3: string paragraphId);
void angularObjectAdd(1: string name, 2: string sessionKey, 3: string paragraphId, 4: string object);
void angularObjectRemove(1: string name, 2: string sessionKey, 3: string paragraphId);
void angularRegistryPush(1: string registry);
RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string noteId, 4: string paragraphId);
RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string sessionKey, 4: string paragraphId);
RemoteApplicationResult unloadApplication(1: string applicationInstanceId);
RemoteApplicationResult runApplication(1: string applicationInstanceId);
}

View file

@ -135,6 +135,12 @@
if (!config.results) {
config.results = {};
}
if (!config.editorSetting) {
config.editorSetting = {};
} else if (config.editorSetting.editOnDblClick) {
editorSetting.isOutputHidden = config.editorSetting.editOnDblClick;
}
};
$scope.$on('updateParagraphOutput', function(event, data) {
@ -205,9 +211,15 @@
$scope.originalText = angular.copy(data);
$scope.dirtyText = undefined;
if (editorSetting.editOnDblClick) {
if ($scope.paragraph.config.editorSetting.editOnDblClick) {
closeEditorAndOpenTable();
} else if (editorSetting.isOutputHidden &&
!$scope.paragraph.config.editorSetting.editOnDblClick) {
// %md/%angular repl make output to be hidden by default after running
// so should open output if repl changed from %md/%angular to another
openEditorAndOpenTable();
}
editorSetting.isOutputHidden = $scope.paragraph.config.editorSetting.editOnDblClick;
};
$scope.saveParagraph = function() {
@ -330,11 +342,15 @@
manageEditorAndTableState(true, false);
};
var manageEditorAndTableState = function(showEditor, showTable) {
var openEditorAndOpenTable = function() {
manageEditorAndTableState(false, false);
};
var manageEditorAndTableState = function(hideEditor, hideTable) {
var newParams = angular.copy($scope.paragraph.settings.params);
var newConfig = angular.copy($scope.paragraph.config);
newConfig.editorHide = showEditor;
newConfig.tableHide = showTable;
newConfig.editorHide = hideEditor;
newConfig.tableHide = hideTable;
commitParagraph($scope.paragraph.title, $scope.paragraph.text, newConfig, newParams);
};
@ -645,7 +661,7 @@
// or the first 30 characters of the paragraph have been modified
// or cursor position is at beginning of second line.(in case user hit enter after typing %magic)
if ((typeof pos === 'undefined') || (pos.row === 0 && pos.column < 30) ||
(pos.row === 1 && pos.column === 0) || pastePercentSign || $scope.paragraphFocused) {
(pos.row === 1 && pos.column === 0) || pastePercentSign) {
// If paragraph loading, use config value if exists
if ((typeof pos === 'undefined') && $scope.paragraph.config.editorMode) {
session.setMode($scope.paragraph.config.editorMode);
@ -656,7 +672,7 @@
getEditorSetting(magic)
.then(function(setting) {
setEditorLanguage(session, setting.editor.language);
_.merge(editorSetting, setting.editor);
_.merge($scope.paragraph.config.editorSetting, setting.editor);
});
}
}
@ -665,7 +681,7 @@
};
var getInterpreterName = function(paragraphText) {
var intpNameRegexp = /%(.+?)\s/g;
var intpNameRegexp = /^\s*%(.+?)\s/g;
var match = intpNameRegexp.exec(paragraphText);
if (match) {
return match[1].trim();
@ -1135,7 +1151,7 @@
$scope.$on('doubleClickParagraph', function(event, paragraphId) {
if ($scope.paragraph.id === paragraphId && $scope.paragraph.config.editorHide &&
editorSetting.editOnDblClick) {
$scope.paragraph.config.editorSetting.editOnDblClick) {
var deferred = $q.defer();
openEditorAndCloseTable();
$timeout(

View file

@ -50,7 +50,6 @@ import java.util.Set;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.internal.StringMap;
@ -740,7 +739,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
interpreterSetting.closeAndRemoveInterpreterGroup(noteId);
} else if (option.isSession()) {
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
String key = getInterpreterInstanceKey(user, noteId, interpreterSetting);
String key = getInterpreterSessionKey(user, noteId, interpreterSetting);
interpreterGroup.close(key);
interpreterGroup.destroy(key);
synchronized (interpreterGroup) {
@ -753,7 +752,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
public void createInterpretersForNote(InterpreterSetting interpreterSetting, String user,
String noteId, String key) {
String noteId, String interpreterSessionKey) {
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
InterpreterOption option = interpreterSetting.getOption();
Properties properties = (Properties) interpreterSetting.getProperties();
@ -770,7 +769,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
long minTimeout = 10L * 1000 * 1000000; // 10 sec
long interpreterRemovalWaitTimeout = Math.max(minTimeout,
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2);
while (interpreterGroup.containsKey(key)) {
while (interpreterGroup.containsKey(interpreterSessionKey)) {
if (System.nanoTime() - interpreterRemovalWaitStart > interpreterRemovalWaitTimeout) {
throw new InterpreterException("Can not create interpreter");
}
@ -794,18 +793,18 @@ public class InterpreterFactory implements InterpreterGroupFactory {
connectToRemoteRepl(noteId, info.getClassName(), option.getHost(), option.getPort(),
properties, user, option.isUserImpersonate);
} else {
interpreter = createRemoteRepl(path, key, info.getClassName(), properties,
interpreterSetting.getId(), user, option.isUserImpersonate());
interpreter = createRemoteRepl(path, interpreterSessionKey, info.getClassName(),
properties, interpreterSetting.getId(), user, option.isUserImpersonate());
}
} else {
interpreter = createRepl(interpreterSetting.getPath(), info.getClassName(), properties);
}
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(key);
List<Interpreter> interpreters = interpreterGroup.get(interpreterSessionKey);
if (null == interpreters) {
interpreters = new ArrayList<>();
interpreterGroup.put(key, interpreters);
interpreterGroup.put(interpreterSessionKey, interpreters);
}
if (info.isDefaultInterpreter()) {
interpreters.add(0, interpreter);
@ -1115,25 +1114,27 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
private Interpreter connectToRemoteRepl(String noteId, String className, String host, int port,
Properties property, String userName, Boolean isUserImpersonate) {
private Interpreter connectToRemoteRepl(String interpreterSessionKey, String className,
String host, int port, Properties property, String userName, Boolean isUserImpersonate) {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
LazyOpenInterpreter intp = new LazyOpenInterpreter(
new RemoteInterpreter(property, noteId, className, host, port, connectTimeout, maxPoolSize,
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate));
new RemoteInterpreter(property, interpreterSessionKey, className, host, port,
connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener,
userName, isUserImpersonate));
return intp;
}
private Interpreter createRemoteRepl(String interpreterPath, String noteId, String className,
Properties property, String interpreterSettingId, String userName,
Boolean isUserImpersonate) {
private Interpreter createRemoteRepl(String interpreterPath, String interpreterSessionKey,
String className, Properties property, String interpreterSettingId,
String userName, Boolean isUserImpersonate) {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId;
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
RemoteInterpreter remoteInterpreter =
new RemoteInterpreter(property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
new RemoteInterpreter(property, interpreterSessionKey, className,
conf.getInterpreterRemoteRunnerPath(),
interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate);
remoteInterpreter.addEnv(env);
@ -1187,21 +1188,23 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
private String getInterpreterInstanceKey(String user, String noteId, InterpreterSetting setting) {
private String getInterpreterSessionKey(String user, String noteId, InterpreterSetting setting) {
InterpreterOption option = setting.getOption();
String key;
if (option.isExistingProcess()) {
key = Constants.EXISTING_PROCESS;
} else if (!option.perNoteShared()) {
} else if (option.perNoteScoped() && option.perUserScoped()) {
key = user + ":" + noteId;
} else if (option.perUserScoped()) {
key = user;
} else if (option.perNoteScoped()) {
key = noteId;
if (shiroEnabled && !option.perUserShared()) {
key = user + ":" + key;
}
} else {
key = SHARED_SESSION;
}
logger.debug("Interpreter instance key: {}", key);
logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " +
"{}", key, noteId, user, setting.getName());
return key;
}
@ -1209,11 +1212,11 @@ public class InterpreterFactory implements InterpreterGroupFactory {
InterpreterSetting setting) {
InterpreterGroup interpreterGroup = setting.getInterpreterGroup(user, noteId);
synchronized (interpreterGroup) {
String key = getInterpreterInstanceKey(user, noteId, setting);
if (!interpreterGroup.containsKey(key)) {
createInterpretersForNote(setting, user, noteId, key);
String interpreterSessionKey = getInterpreterSessionKey(user, noteId, setting);
if (!interpreterGroup.containsKey(interpreterSessionKey)) {
createInterpretersForNote(setting, user, noteId, interpreterSessionKey);
}
return interpreterGroup.get(getInterpreterInstanceKey(user, noteId, setting));
return interpreterGroup.get(interpreterSessionKey);
}
}

View file

@ -130,7 +130,8 @@ public class InterpreterSetting {
key = SHARED_PROCESS;
}
logger.debug("getInterpreterProcessKey: {}", key);
logger.debug("getInterpreterProcessKey: {} for InterpreterSetting Id: {}, Name: {}",
key, getId(), getName());
return key;
}
@ -142,6 +143,7 @@ public class InterpreterSetting {
interpreterGroupFactory.createInterpreterGroup(interpreterGroupId, getOption());
interpreterGroupWriteLock.lock();
logger.debug("create interpreter group with groupId:" + interpreterGroupId);
interpreterGroupRef.put(key, intpGroup);
interpreterGroupWriteLock.unlock();
}

View file

@ -27,6 +27,7 @@ import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter11;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
@ -135,8 +136,8 @@ public class NoteInterpreterLoaderTest {
factory.getInterpreterSettings("noteB").get(0).getOption().setPerNote(InterpreterOption.ISOLATED);
// interpreters are not created before accessing it
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session"));
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
factory.getInterpreter("user", "noteA", null).open();
factory.getInterpreter("user", "noteB", null).open();
@ -147,16 +148,16 @@ public class NoteInterpreterLoaderTest {
factory.getInterpreter("user", "noteB", null).getInterpreterGroup().getId()));
// interpreters are created after accessing it
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session"));
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
// when
factory.closeNote("user", "noteA");
factory.closeNote("user", "noteB");
// interpreters are destroyed after close
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session"));
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
}