mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge remote-tracking branch 'origin/master' into ZEPPELIN-212
This commit is contained in:
commit
d5981d5160
11 changed files with 147 additions and 119 deletions
4
pom.xml
4
pom.xml
|
|
@ -261,7 +261,7 @@
|
|||
</goals>
|
||||
<configuration>
|
||||
<failOnViolation>true</failOnViolation>
|
||||
<excludes>org/apache/zeppelin/interpreter/thrift/*</excludes>
|
||||
<excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/scio/avro/*,org/apache/zeppelin/scio/avro/*</excludes>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
|
|
@ -271,7 +271,7 @@
|
|||
<goal>checkstyle-aggregate</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<excludes>org/apache/zeppelin/interpreter/thrift/*</excludes>
|
||||
<excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/scio/avro/*,org/apache/zeppelin/scio/avro/*</excludes>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
|
|
|
|||
|
|
@ -296,6 +296,10 @@ public class SparkInterpreter extends Interpreter {
|
|||
return (DepInterpreter) p;
|
||||
}
|
||||
|
||||
private boolean isYarnMode() {
|
||||
return getProperty("master").startsWith("yarn");
|
||||
}
|
||||
|
||||
/**
|
||||
* Spark 2.x
|
||||
* Create SparkSession
|
||||
|
|
@ -319,6 +323,10 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
conf.set("spark.scheduler.mode", "FAIR");
|
||||
conf.setMaster(getProperty("master"));
|
||||
if (isYarnMode()) {
|
||||
conf.set("master", "yarn");
|
||||
conf.set("spark.submit.deployMode", "client");
|
||||
}
|
||||
|
||||
Properties intpProperty = getProperty();
|
||||
|
||||
|
|
@ -510,7 +518,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
// Distributes needed libraries to workers
|
||||
// when spark version is greater than or equal to 1.5.0
|
||||
if (getProperty("master").equals("yarn-client")) {
|
||||
if (isYarnMode()) {
|
||||
conf.set("spark.yarn.isPython", "true");
|
||||
}
|
||||
}
|
||||
|
|
@ -559,7 +567,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
@Override
|
||||
public void open() {
|
||||
// set properties and do login before creating any spark stuff for secured cluster
|
||||
if (getProperty("master").equals("yarn-client")) {
|
||||
if (isYarnMode()) {
|
||||
System.setProperty("SPARK_YARN_MODE", "true");
|
||||
}
|
||||
if (getProperty().containsKey("spark.yarn.keytab") &&
|
||||
|
|
|
|||
|
|
@ -32,9 +32,10 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
|
|||
* For example spark, pyspark, sql interpreters are in the same 'spark' group
|
||||
* and InterpreterGroup will have reference to these all interpreters.
|
||||
*
|
||||
* Remember, list of interpreters are dedicated to a note.
|
||||
* (when InterpreterOption.session==true)
|
||||
* So InterpreterGroup internally manages map of [noteId, list of interpreters]
|
||||
* Remember, list of interpreters are dedicated to a session. Session could be shared across user
|
||||
* or notes, so the sessionId could be user or noteId or their combination.
|
||||
* So InterpreterGroup internally manages map of [interpreterSessionKey(noteId, user, or
|
||||
* their combination), list of interpreters]
|
||||
*
|
||||
* A InterpreterGroup runs on interpreter process.
|
||||
* And unit of interpreter instantiate, restart, bind, unbind.
|
||||
|
|
@ -103,15 +104,12 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
public Properties getProperty() {
|
||||
Properties p = new Properties();
|
||||
|
||||
Collection<List<Interpreter>> intpGroupForANote = this.values();
|
||||
if (intpGroupForANote != null && intpGroupForANote.size() > 0) {
|
||||
for (List<Interpreter> intpGroup : intpGroupForANote) {
|
||||
for (Interpreter intp : intpGroup) {
|
||||
p.putAll(intp.getProperty());
|
||||
}
|
||||
// it's okay to break here while every List<Interpreters> will have the same property set
|
||||
break;
|
||||
for (List<Interpreter> intpGroupForASession : this.values()) {
|
||||
for (Interpreter intp : intpGroupForASession) {
|
||||
p.putAll(intp.getProperty());
|
||||
}
|
||||
// it's okay to break here while every List<Interpreters> will have the same property set
|
||||
break;
|
||||
}
|
||||
return p;
|
||||
}
|
||||
|
|
@ -148,20 +146,20 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
public void close() {
|
||||
LOGGER.info("Close interpreter group " + getId());
|
||||
List<Interpreter> intpToClose = new LinkedList<>();
|
||||
for (List<Interpreter> intpGroupForNote : this.values()) {
|
||||
intpToClose.addAll(intpGroupForNote);
|
||||
for (List<Interpreter> intpGroupForSession : this.values()) {
|
||||
intpToClose.addAll(intpGroupForSession);
|
||||
}
|
||||
close(intpToClose);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all interpreter instances in this group for the note
|
||||
* @param noteId
|
||||
* Close all interpreter instances in this group for the session
|
||||
* @param sessionId
|
||||
*/
|
||||
public void close(String noteId) {
|
||||
LOGGER.info("Close interpreter group " + getId() + " for note " + noteId);
|
||||
List<Interpreter> intpForNote = this.get(noteId);
|
||||
close(intpForNote);
|
||||
public void close(String sessionId) {
|
||||
LOGGER.info("Close interpreter group " + getId() + " for session: " + sessionId);
|
||||
List<Interpreter> intpForSession = this.get(sessionId);
|
||||
close(intpForSession);
|
||||
}
|
||||
|
||||
private void close(Collection<Interpreter> intpToClose) {
|
||||
|
|
@ -196,13 +194,13 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
}
|
||||
|
||||
/**
|
||||
* Destroy all interpreter instances in this group for the note
|
||||
* @param noteId
|
||||
* Destroy all interpreter instances in this group for the session
|
||||
* @param sessionId
|
||||
*/
|
||||
public void destroy(String noteId) {
|
||||
LOGGER.info("Destroy interpreter group " + getId() + " for note " + noteId);
|
||||
List<Interpreter> intpForNote = this.get(noteId);
|
||||
destroy(intpForNote);
|
||||
public void destroy(String sessionId) {
|
||||
LOGGER.info("Destroy interpreter group " + getId() + " for session " + sessionId);
|
||||
List<Interpreter> intpForSession = this.get(sessionId);
|
||||
destroy(intpForSession);
|
||||
|
||||
if (remoteInterpreterProcess != null) {
|
||||
remoteInterpreterProcess.dereference();
|
||||
|
|
@ -220,8 +218,8 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
public void destroy() {
|
||||
LOGGER.info("Destroy interpreter group " + getId());
|
||||
List<Interpreter> intpToDestroy = new LinkedList<>();
|
||||
for (List<Interpreter> intpGroupForNote : this.values()) {
|
||||
intpToDestroy.addAll(intpGroupForNote);
|
||||
for (List<Interpreter> intpGroupForSession : this.values()) {
|
||||
intpToDestroy.addAll(intpGroupForSession);
|
||||
}
|
||||
destroy(intpToDestroy);
|
||||
|
||||
|
|
|
|||
|
|
@ -44,24 +44,24 @@ public class ZeppelinDevServer extends
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Interpreter getInterpreter(String noteId, String className) throws TException {
|
||||
protected Interpreter getInterpreter(String sessionKey, String className) throws TException {
|
||||
synchronized (this) {
|
||||
InterpreterGroup interpreterGroup = getInterpreterGroup();
|
||||
if (interpreterGroup == null) {
|
||||
createInterpreter(
|
||||
"dev",
|
||||
noteId,
|
||||
sessionKey,
|
||||
DevInterpreter.class.getName(),
|
||||
new HashMap<String, String>());
|
||||
|
||||
Interpreter intp = super.getInterpreter(noteId, className);
|
||||
Interpreter intp = super.getInterpreter(sessionKey, className);
|
||||
interpreter = (DevInterpreter) (
|
||||
((LazyOpenInterpreter) intp).getInnerInterpreter());
|
||||
interpreter.setInterpreterEvent(this);
|
||||
notify();
|
||||
}
|
||||
}
|
||||
return super.getInterpreter(noteId, className);
|
||||
return super.getInterpreter(sessionKey, className);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
private String interpreterPath;
|
||||
private String localRepoPath;
|
||||
private String className;
|
||||
private String noteId;
|
||||
private String sessionKey;
|
||||
FormType formType;
|
||||
boolean initialized;
|
||||
private Map<String, String> env;
|
||||
|
|
@ -67,7 +67,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
* Remote interpreter and manage interpreter process
|
||||
*/
|
||||
public RemoteInterpreter(Properties property,
|
||||
String noteId,
|
||||
String sessionKey,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
|
|
@ -79,7 +79,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
String userName,
|
||||
Boolean isUserImpersonate) {
|
||||
super(property);
|
||||
this.noteId = noteId;
|
||||
this.sessionKey = sessionKey;
|
||||
this.className = className;
|
||||
initialized = false;
|
||||
this.interpreterRunner = interpreterRunner;
|
||||
|
|
@ -100,7 +100,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
*/
|
||||
public RemoteInterpreter(
|
||||
Properties property,
|
||||
String noteId,
|
||||
String sessionKey,
|
||||
String className,
|
||||
String host,
|
||||
int port,
|
||||
|
|
@ -111,7 +111,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
String userName,
|
||||
Boolean isUserImpersonate) {
|
||||
super(property);
|
||||
this.noteId = noteId;
|
||||
this.sessionKey = sessionKey;
|
||||
this.className = className;
|
||||
initialized = false;
|
||||
this.host = host;
|
||||
|
|
@ -128,7 +128,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
// VisibleForTesting
|
||||
public RemoteInterpreter(
|
||||
Properties property,
|
||||
String noteId,
|
||||
String sessionKey,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
|
|
@ -141,7 +141,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
Boolean isUserImpersonate) {
|
||||
super(property);
|
||||
this.className = className;
|
||||
this.noteId = noteId;
|
||||
this.sessionKey = sessionKey;
|
||||
this.interpreterRunner = interpreterRunner;
|
||||
this.interpreterPath = interpreterPath;
|
||||
this.localRepoPath = localRepoPath;
|
||||
|
|
@ -239,7 +239,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
if (localRepoPath != null) {
|
||||
property.put("zeppelin.interpreter.localRepo", localRepoPath);
|
||||
}
|
||||
client.createInterpreter(groupId, noteId,
|
||||
client.createInterpreter(groupId, sessionKey,
|
||||
getClassName(), (Map) property);
|
||||
|
||||
// Push angular object loaded from JSON file to remote interpreter
|
||||
|
|
@ -267,7 +267,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
synchronized (interpreterGroup) {
|
||||
// initialize all interpreters in this interpreter group
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
|
||||
for (Interpreter intp : new ArrayList<>(interpreters)) {
|
||||
Interpreter p = intp;
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
|
|
@ -293,7 +293,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
try {
|
||||
client = interpreterProcess.getClient();
|
||||
if (client != null) {
|
||||
client.close(noteId, className);
|
||||
client.close(sessionKey, className);
|
||||
}
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
|
|
@ -340,7 +340,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
final GUI currentGUI = context.getGui();
|
||||
RemoteInterpreterResult remoteResult = client.interpret(
|
||||
noteId, className, st, convert(context));
|
||||
sessionKey, className, st, convert(context));
|
||||
|
||||
Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
|
||||
remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
|
||||
|
|
@ -386,7 +386,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
boolean broken = false;
|
||||
try {
|
||||
client.cancel(noteId, className, convert(context));
|
||||
client.cancel(sessionKey, className, convert(context));
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
throw new InterpreterException(e);
|
||||
|
|
@ -414,7 +414,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
boolean broken = false;
|
||||
try {
|
||||
formType = FormType.valueOf(client.getFormType(noteId, className));
|
||||
formType = FormType.valueOf(client.getFormType(sessionKey, className));
|
||||
return formType;
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
|
|
@ -440,7 +440,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
boolean broken = false;
|
||||
try {
|
||||
return client.getProgress(noteId, className, convert(context));
|
||||
return client.getProgress(sessionKey, className, convert(context));
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
throw new InterpreterException(e);
|
||||
|
|
@ -462,7 +462,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
boolean broken = false;
|
||||
try {
|
||||
List completion = client.completion(noteId, className, buf, cursor);
|
||||
List completion = client.completion(sessionKey, className, buf, cursor);
|
||||
return completion;
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
|
|
@ -480,8 +480,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
return null;
|
||||
} else {
|
||||
return SchedulerFactory.singleton().createOrGetRemoteScheduler(
|
||||
RemoteInterpreter.class.getName() + noteId + interpreterProcess.hashCode(),
|
||||
noteId,
|
||||
RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
|
||||
sessionKey,
|
||||
interpreterProcess,
|
||||
maxConcurrency);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -149,7 +149,7 @@ public class RemoteInterpreterServer
|
|||
|
||||
|
||||
@Override
|
||||
public void createInterpreter(String interpreterGroupId, String noteId, String
|
||||
public void createInterpreter(String interpreterGroupId, String sessionKey, String
|
||||
className, Map<String, String> properties) throws TException {
|
||||
if (interpreterGroup == null) {
|
||||
interpreterGroup = new InterpreterGroup(interpreterGroupId);
|
||||
|
|
@ -178,10 +178,10 @@ public class RemoteInterpreterServer
|
|||
repl.setClassloaderUrls(new URL[]{});
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
|
||||
if (interpreters == null) {
|
||||
interpreters = new LinkedList<>();
|
||||
interpreterGroup.put(noteId, interpreters);
|
||||
interpreterGroup.put(sessionKey, interpreters);
|
||||
}
|
||||
|
||||
interpreters.add(new LazyOpenInterpreter(repl));
|
||||
|
|
@ -222,13 +222,13 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
}
|
||||
|
||||
protected Interpreter getInterpreter(String noteId, String className) throws TException {
|
||||
protected Interpreter getInterpreter(String sessionKey, String className) throws TException {
|
||||
if (interpreterGroup == null) {
|
||||
throw new TException(
|
||||
new InterpreterException("Interpreter instance " + className + " not created"));
|
||||
}
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
|
||||
if (interpreters == null) {
|
||||
throw new TException(
|
||||
new InterpreterException("Interpreter " + className + " not initialized"));
|
||||
|
|
@ -250,13 +250,13 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
|
||||
@Override
|
||||
public void close(String noteId, String className) throws TException {
|
||||
public void close(String sessionKey, String className) throws TException {
|
||||
// unload all applications
|
||||
for (String appId : runningApplications.keySet()) {
|
||||
RunningApplication appInfo = runningApplications.get(appId);
|
||||
|
||||
// see NoteInterpreterLoader.SHARED_SESSION
|
||||
if (appInfo.noteId.equals(noteId) || noteId.equals("shared_session")) {
|
||||
if (appInfo.noteId.equals(sessionKey) || sessionKey.equals("shared_session")) {
|
||||
try {
|
||||
logger.info("Unload App {} ", appInfo.pkg.getName());
|
||||
appInfo.app.unload();
|
||||
|
|
@ -270,7 +270,7 @@ public class RemoteInterpreterServer
|
|||
|
||||
// close interpreters
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
|
||||
if (interpreters != null) {
|
||||
Iterator<Interpreter> it = interpreters.iterator();
|
||||
while (it.hasNext()) {
|
||||
|
|
@ -614,14 +614,14 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
|
||||
@Override
|
||||
public String getStatus(String noteId, String jobId)
|
||||
public String getStatus(String sessionKey, String jobId)
|
||||
throws TException {
|
||||
if (interpreterGroup == null) {
|
||||
return "Unknown";
|
||||
}
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
|
||||
if (interpreters == null) {
|
||||
return "Unknown";
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@
|
|||
namespace java org.apache.zeppelin.interpreter.thrift
|
||||
|
||||
struct RemoteInterpreterContext {
|
||||
1: string noteId,
|
||||
1: string sessionKey,
|
||||
2: string paragraphId,
|
||||
3: string replName,
|
||||
4: string paragraphTitle,
|
||||
|
|
@ -79,18 +79,18 @@ struct InterpreterCompletion {
|
|||
}
|
||||
|
||||
service RemoteInterpreterService {
|
||||
void createInterpreter(1: string intpGroupId, 2: string noteId, 3: string className, 4: map<string, string> properties);
|
||||
void createInterpreter(1: string intpGroupId, 2: string sessionKey, 3: string className, 4: map<string, string> properties);
|
||||
|
||||
void open(1: string noteId, 2: string className);
|
||||
void close(1: string noteId, 2: string className);
|
||||
RemoteInterpreterResult interpret(1: string noteId, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
|
||||
void cancel(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
|
||||
i32 getProgress(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
|
||||
string getFormType(1: string noteId, 2: string className);
|
||||
list<InterpreterCompletion> completion(1: string noteId, 2: string className, 3: string buf, 4: i32 cursor);
|
||||
void open(1: string sessionKey, 2: string className);
|
||||
void close(1: string sessionKey, 2: string className);
|
||||
RemoteInterpreterResult interpret(1: string sessionKey, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
|
||||
void cancel(1: string sessionKey, 2: string className, 3: RemoteInterpreterContext interpreterContext);
|
||||
i32 getProgress(1: string sessionKey, 2: string className, 3: RemoteInterpreterContext interpreterContext);
|
||||
string getFormType(1: string sessionKey, 2: string className);
|
||||
list<InterpreterCompletion> completion(1: string sessionKey, 2: string className, 3: string buf, 4: i32 cursor);
|
||||
void shutdown();
|
||||
|
||||
string getStatus(1: string noteId, 2:string jobId);
|
||||
string getStatus(1: string sessionKey, 2:string jobId);
|
||||
|
||||
RemoteInterpreterEvent getEvent();
|
||||
|
||||
|
|
@ -101,17 +101,17 @@ service RemoteInterpreterService {
|
|||
// get all resources in the interpreter process
|
||||
list<string> resourcePoolGetAll();
|
||||
// get value of resource
|
||||
binary resourceGet(1: string noteId, 2: string paragraphId, 3: string resourceName);
|
||||
binary resourceGet(1: string sessionKey, 2: string paragraphId, 3: string resourceName);
|
||||
// remove resource
|
||||
bool resourceRemove(1: string noteId, 2: string paragraphId, 3:string resourceName);
|
||||
bool resourceRemove(1: string sessionKey, 2: string paragraphId, 3:string resourceName);
|
||||
|
||||
void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string
|
||||
void angularObjectUpdate(1: string name, 2: string sessionKey, 3: string paragraphId, 4: string
|
||||
object);
|
||||
void angularObjectAdd(1: string name, 2: string noteId, 3: string paragraphId, 4: string object);
|
||||
void angularObjectRemove(1: string name, 2: string noteId, 3: string paragraphId);
|
||||
void angularObjectAdd(1: string name, 2: string sessionKey, 3: string paragraphId, 4: string object);
|
||||
void angularObjectRemove(1: string name, 2: string sessionKey, 3: string paragraphId);
|
||||
void angularRegistryPush(1: string registry);
|
||||
|
||||
RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string noteId, 4: string paragraphId);
|
||||
RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string sessionKey, 4: string paragraphId);
|
||||
RemoteApplicationResult unloadApplication(1: string applicationInstanceId);
|
||||
RemoteApplicationResult runApplication(1: string applicationInstanceId);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -135,6 +135,12 @@
|
|||
if (!config.results) {
|
||||
config.results = {};
|
||||
}
|
||||
|
||||
if (!config.editorSetting) {
|
||||
config.editorSetting = {};
|
||||
} else if (config.editorSetting.editOnDblClick) {
|
||||
editorSetting.isOutputHidden = config.editorSetting.editOnDblClick;
|
||||
}
|
||||
};
|
||||
|
||||
$scope.$on('updateParagraphOutput', function(event, data) {
|
||||
|
|
@ -205,9 +211,15 @@
|
|||
$scope.originalText = angular.copy(data);
|
||||
$scope.dirtyText = undefined;
|
||||
|
||||
if (editorSetting.editOnDblClick) {
|
||||
if ($scope.paragraph.config.editorSetting.editOnDblClick) {
|
||||
closeEditorAndOpenTable();
|
||||
} else if (editorSetting.isOutputHidden &&
|
||||
!$scope.paragraph.config.editorSetting.editOnDblClick) {
|
||||
// %md/%angular repl make output to be hidden by default after running
|
||||
// so should open output if repl changed from %md/%angular to another
|
||||
openEditorAndOpenTable();
|
||||
}
|
||||
editorSetting.isOutputHidden = $scope.paragraph.config.editorSetting.editOnDblClick;
|
||||
};
|
||||
|
||||
$scope.saveParagraph = function() {
|
||||
|
|
@ -330,11 +342,15 @@
|
|||
manageEditorAndTableState(true, false);
|
||||
};
|
||||
|
||||
var manageEditorAndTableState = function(showEditor, showTable) {
|
||||
var openEditorAndOpenTable = function() {
|
||||
manageEditorAndTableState(false, false);
|
||||
};
|
||||
|
||||
var manageEditorAndTableState = function(hideEditor, hideTable) {
|
||||
var newParams = angular.copy($scope.paragraph.settings.params);
|
||||
var newConfig = angular.copy($scope.paragraph.config);
|
||||
newConfig.editorHide = showEditor;
|
||||
newConfig.tableHide = showTable;
|
||||
newConfig.editorHide = hideEditor;
|
||||
newConfig.tableHide = hideTable;
|
||||
|
||||
commitParagraph($scope.paragraph.title, $scope.paragraph.text, newConfig, newParams);
|
||||
};
|
||||
|
|
@ -645,7 +661,7 @@
|
|||
// or the first 30 characters of the paragraph have been modified
|
||||
// or cursor position is at beginning of second line.(in case user hit enter after typing %magic)
|
||||
if ((typeof pos === 'undefined') || (pos.row === 0 && pos.column < 30) ||
|
||||
(pos.row === 1 && pos.column === 0) || pastePercentSign || $scope.paragraphFocused) {
|
||||
(pos.row === 1 && pos.column === 0) || pastePercentSign) {
|
||||
// If paragraph loading, use config value if exists
|
||||
if ((typeof pos === 'undefined') && $scope.paragraph.config.editorMode) {
|
||||
session.setMode($scope.paragraph.config.editorMode);
|
||||
|
|
@ -656,7 +672,7 @@
|
|||
getEditorSetting(magic)
|
||||
.then(function(setting) {
|
||||
setEditorLanguage(session, setting.editor.language);
|
||||
_.merge(editorSetting, setting.editor);
|
||||
_.merge($scope.paragraph.config.editorSetting, setting.editor);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -665,7 +681,7 @@
|
|||
};
|
||||
|
||||
var getInterpreterName = function(paragraphText) {
|
||||
var intpNameRegexp = /%(.+?)\s/g;
|
||||
var intpNameRegexp = /^\s*%(.+?)\s/g;
|
||||
var match = intpNameRegexp.exec(paragraphText);
|
||||
if (match) {
|
||||
return match[1].trim();
|
||||
|
|
@ -1135,7 +1151,7 @@
|
|||
|
||||
$scope.$on('doubleClickParagraph', function(event, paragraphId) {
|
||||
if ($scope.paragraph.id === paragraphId && $scope.paragraph.config.editorHide &&
|
||||
editorSetting.editOnDblClick) {
|
||||
$scope.paragraph.config.editorSetting.editOnDblClick) {
|
||||
var deferred = $q.defer();
|
||||
openEditorAndCloseTable();
|
||||
$timeout(
|
||||
|
|
|
|||
|
|
@ -50,7 +50,6 @@ import java.util.Set;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.internal.StringMap;
|
||||
|
|
@ -740,7 +739,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
interpreterSetting.closeAndRemoveInterpreterGroup(noteId);
|
||||
} else if (option.isSession()) {
|
||||
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
|
||||
String key = getInterpreterInstanceKey(user, noteId, interpreterSetting);
|
||||
String key = getInterpreterSessionKey(user, noteId, interpreterSetting);
|
||||
interpreterGroup.close(key);
|
||||
interpreterGroup.destroy(key);
|
||||
synchronized (interpreterGroup) {
|
||||
|
|
@ -753,7 +752,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
|
||||
public void createInterpretersForNote(InterpreterSetting interpreterSetting, String user,
|
||||
String noteId, String key) {
|
||||
String noteId, String interpreterSessionKey) {
|
||||
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
|
||||
InterpreterOption option = interpreterSetting.getOption();
|
||||
Properties properties = (Properties) interpreterSetting.getProperties();
|
||||
|
|
@ -770,7 +769,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
long minTimeout = 10L * 1000 * 1000000; // 10 sec
|
||||
long interpreterRemovalWaitTimeout = Math.max(minTimeout,
|
||||
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2);
|
||||
while (interpreterGroup.containsKey(key)) {
|
||||
while (interpreterGroup.containsKey(interpreterSessionKey)) {
|
||||
if (System.nanoTime() - interpreterRemovalWaitStart > interpreterRemovalWaitTimeout) {
|
||||
throw new InterpreterException("Can not create interpreter");
|
||||
}
|
||||
|
|
@ -794,18 +793,18 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
connectToRemoteRepl(noteId, info.getClassName(), option.getHost(), option.getPort(),
|
||||
properties, user, option.isUserImpersonate);
|
||||
} else {
|
||||
interpreter = createRemoteRepl(path, key, info.getClassName(), properties,
|
||||
interpreterSetting.getId(), user, option.isUserImpersonate());
|
||||
interpreter = createRemoteRepl(path, interpreterSessionKey, info.getClassName(),
|
||||
properties, interpreterSetting.getId(), user, option.isUserImpersonate());
|
||||
}
|
||||
} else {
|
||||
interpreter = createRepl(interpreterSetting.getPath(), info.getClassName(), properties);
|
||||
}
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(key);
|
||||
List<Interpreter> interpreters = interpreterGroup.get(interpreterSessionKey);
|
||||
if (null == interpreters) {
|
||||
interpreters = new ArrayList<>();
|
||||
interpreterGroup.put(key, interpreters);
|
||||
interpreterGroup.put(interpreterSessionKey, interpreters);
|
||||
}
|
||||
if (info.isDefaultInterpreter()) {
|
||||
interpreters.add(0, interpreter);
|
||||
|
|
@ -1115,25 +1114,27 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
}
|
||||
|
||||
private Interpreter connectToRemoteRepl(String noteId, String className, String host, int port,
|
||||
Properties property, String userName, Boolean isUserImpersonate) {
|
||||
private Interpreter connectToRemoteRepl(String interpreterSessionKey, String className,
|
||||
String host, int port, Properties property, String userName, Boolean isUserImpersonate) {
|
||||
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(
|
||||
new RemoteInterpreter(property, noteId, className, host, port, connectTimeout, maxPoolSize,
|
||||
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate));
|
||||
new RemoteInterpreter(property, interpreterSessionKey, className, host, port,
|
||||
connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener,
|
||||
userName, isUserImpersonate));
|
||||
return intp;
|
||||
}
|
||||
|
||||
private Interpreter createRemoteRepl(String interpreterPath, String noteId, String className,
|
||||
Properties property, String interpreterSettingId, String userName,
|
||||
Boolean isUserImpersonate) {
|
||||
private Interpreter createRemoteRepl(String interpreterPath, String interpreterSessionKey,
|
||||
String className, Properties property, String interpreterSettingId,
|
||||
String userName, Boolean isUserImpersonate) {
|
||||
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId;
|
||||
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
|
||||
|
||||
RemoteInterpreter remoteInterpreter =
|
||||
new RemoteInterpreter(property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
|
||||
new RemoteInterpreter(property, interpreterSessionKey, className,
|
||||
conf.getInterpreterRemoteRunnerPath(),
|
||||
interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
|
||||
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate);
|
||||
remoteInterpreter.addEnv(env);
|
||||
|
|
@ -1187,21 +1188,23 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
}
|
||||
|
||||
private String getInterpreterInstanceKey(String user, String noteId, InterpreterSetting setting) {
|
||||
private String getInterpreterSessionKey(String user, String noteId, InterpreterSetting setting) {
|
||||
InterpreterOption option = setting.getOption();
|
||||
String key;
|
||||
if (option.isExistingProcess()) {
|
||||
key = Constants.EXISTING_PROCESS;
|
||||
} else if (!option.perNoteShared()) {
|
||||
} else if (option.perNoteScoped() && option.perUserScoped()) {
|
||||
key = user + ":" + noteId;
|
||||
} else if (option.perUserScoped()) {
|
||||
key = user;
|
||||
} else if (option.perNoteScoped()) {
|
||||
key = noteId;
|
||||
if (shiroEnabled && !option.perUserShared()) {
|
||||
key = user + ":" + key;
|
||||
}
|
||||
} else {
|
||||
key = SHARED_SESSION;
|
||||
}
|
||||
|
||||
logger.debug("Interpreter instance key: {}", key);
|
||||
logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " +
|
||||
"{}", key, noteId, user, setting.getName());
|
||||
return key;
|
||||
}
|
||||
|
||||
|
|
@ -1209,11 +1212,11 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
InterpreterSetting setting) {
|
||||
InterpreterGroup interpreterGroup = setting.getInterpreterGroup(user, noteId);
|
||||
synchronized (interpreterGroup) {
|
||||
String key = getInterpreterInstanceKey(user, noteId, setting);
|
||||
if (!interpreterGroup.containsKey(key)) {
|
||||
createInterpretersForNote(setting, user, noteId, key);
|
||||
String interpreterSessionKey = getInterpreterSessionKey(user, noteId, setting);
|
||||
if (!interpreterGroup.containsKey(interpreterSessionKey)) {
|
||||
createInterpretersForNote(setting, user, noteId, interpreterSessionKey);
|
||||
}
|
||||
return interpreterGroup.get(getInterpreterInstanceKey(user, noteId, setting));
|
||||
return interpreterGroup.get(interpreterSessionKey);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -130,7 +130,8 @@ public class InterpreterSetting {
|
|||
key = SHARED_PROCESS;
|
||||
}
|
||||
|
||||
logger.debug("getInterpreterProcessKey: {}", key);
|
||||
logger.debug("getInterpreterProcessKey: {} for InterpreterSetting Id: {}, Name: {}",
|
||||
key, getId(), getName());
|
||||
return key;
|
||||
}
|
||||
|
||||
|
|
@ -142,6 +143,7 @@ public class InterpreterSetting {
|
|||
interpreterGroupFactory.createInterpreterGroup(interpreterGroupId, getOption());
|
||||
|
||||
interpreterGroupWriteLock.lock();
|
||||
logger.debug("create interpreter group with groupId:" + interpreterGroupId);
|
||||
interpreterGroupRef.put(key, intpGroup);
|
||||
interpreterGroupWriteLock.unlock();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import org.apache.zeppelin.dep.DependencyResolver;
|
|||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter11;
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
|
||||
|
|
@ -135,8 +136,8 @@ public class NoteInterpreterLoaderTest {
|
|||
factory.getInterpreterSettings("noteB").get(0).getOption().setPerNote(InterpreterOption.ISOLATED);
|
||||
|
||||
// interpreters are not created before accessing it
|
||||
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
|
||||
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
|
||||
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session"));
|
||||
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
|
||||
|
||||
factory.getInterpreter("user", "noteA", null).open();
|
||||
factory.getInterpreter("user", "noteB", null).open();
|
||||
|
|
@ -147,16 +148,16 @@ public class NoteInterpreterLoaderTest {
|
|||
factory.getInterpreter("user", "noteB", null).getInterpreterGroup().getId()));
|
||||
|
||||
// interpreters are created after accessing it
|
||||
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
|
||||
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
|
||||
assertNotNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session"));
|
||||
assertNotNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
|
||||
|
||||
// when
|
||||
factory.closeNote("user", "noteA");
|
||||
factory.closeNote("user", "noteB");
|
||||
|
||||
// interpreters are destroyed after close
|
||||
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("noteA"));
|
||||
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("noteB"));
|
||||
assertNull(factory.getInterpreterSettings("noteA").get(0).getInterpreterGroup("user", "noteA").get("shared_session"));
|
||||
assertNull(factory.getInterpreterSettings("noteB").get(0).getInterpreterGroup("user", "noteB").get("shared_session"));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue