mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
zeppelin-interpreter note session support
This commit is contained in:
parent
5787984def
commit
ed1ab0d213
19 changed files with 1525 additions and 350 deletions
|
|
@ -23,13 +23,22 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import org.apache.log4j.Logger;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
|
||||
/**
|
||||
* InterpreterGroup is list of interpreters in the same group.
|
||||
* InterpreterGroup is list of interpreters in the same interpreter group.
|
||||
* For example spark, pyspark, sql interpreters are in the same 'spark' group
|
||||
* and InterpreterGroup will have reference to these all interpreters.
|
||||
*
|
||||
* Remember, list of interpreters are dedicated to a note.
|
||||
* (when InterpreterOption.perNoteSession==true)
|
||||
* So InterpreterGroup internally manages map of [noteId, list of interpreters]
|
||||
*
|
||||
* A InterpreterGroup runs on interpreter process.
|
||||
* And unit of interpreter instantiate, restart, bind, unbind.
|
||||
*/
|
||||
public class InterpreterGroup extends LinkedList<Interpreter>{
|
||||
public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter>> {
|
||||
String id;
|
||||
|
||||
Logger LOGGER = Logger.getLogger(InterpreterGroup.class);
|
||||
|
|
@ -38,10 +47,14 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
|
|||
RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process
|
||||
ResourcePool resourcePool;
|
||||
|
||||
// map [notebook session, Interpreters in the group], to support per note session interpreters
|
||||
//Map<String, List<Interpreter>> interpreters = new ConcurrentHashMap<String,
|
||||
// List<Interpreter>>();
|
||||
|
||||
private static final Map<String, InterpreterGroup> allInterpreterGroups =
|
||||
new ConcurrentHashMap<String, InterpreterGroup>();
|
||||
|
||||
public static InterpreterGroup get(String id) {
|
||||
public static InterpreterGroup getByInterpreterGroupId(String id) {
|
||||
return allInterpreterGroups.get(id);
|
||||
}
|
||||
|
||||
|
|
@ -49,11 +62,18 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
|
|||
return new LinkedList(allInterpreterGroups.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create InterpreterGroup with given id
|
||||
* @param id
|
||||
*/
|
||||
public InterpreterGroup(String id) {
|
||||
this.id = id;
|
||||
allInterpreterGroups.put(id, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create InterpreterGroup with autogenerated id
|
||||
*/
|
||||
public InterpreterGroup() {
|
||||
getId();
|
||||
allInterpreterGroups.put(id, this);
|
||||
|
|
@ -73,10 +93,22 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get combined property of all interpreters in this group
|
||||
* @return
|
||||
*/
|
||||
public Properties getProperty() {
|
||||
Properties p = new Properties();
|
||||
for (Interpreter intp : this) {
|
||||
p.putAll(intp.getProperty());
|
||||
|
||||
Collection<List<Interpreter>> intpGroupForANote = this.values();
|
||||
if (intpGroupForANote != null && intpGroupForANote.size() > 0) {
|
||||
for (List<Interpreter> intpGroup : intpGroupForANote) {
|
||||
for (Interpreter intp : intpGroup) {
|
||||
p.putAll(intp.getProperty());
|
||||
}
|
||||
// it's okay to break here while vevery List<Interpreters> will have the same property set
|
||||
break;
|
||||
}
|
||||
}
|
||||
return p;
|
||||
}
|
||||
|
|
@ -97,10 +129,32 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
|
|||
this.remoteInterpreterProcess = remoteInterpreterProcess;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Close all interpreter instances in this group
|
||||
*/
|
||||
public void close() {
|
||||
List<Interpreter> intpToClose = new LinkedList<Interpreter>();
|
||||
for (List<Interpreter> intpGroupForNote : this.values()) {
|
||||
intpToClose.addAll(intpGroupForNote);
|
||||
}
|
||||
close(intpToClose);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all interpreter instances in this group for the note
|
||||
* @param noteId
|
||||
*/
|
||||
public void close(String noteId) {
|
||||
List<Interpreter> intpForNote = this.get(noteId);
|
||||
close(intpForNote);
|
||||
}
|
||||
|
||||
private void close(Collection<Interpreter> intpToClose) {
|
||||
List<Thread> closeThreads = new LinkedList<Thread>();
|
||||
|
||||
for (final Interpreter intp : this) {
|
||||
for (final Interpreter intp : intpToClose) {
|
||||
Thread t = new Thread() {
|
||||
public void run() {
|
||||
intp.close();
|
||||
|
|
@ -120,10 +174,40 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroy all interpreter instances in this group for the note
|
||||
* @param noteId
|
||||
*/
|
||||
public void destroy(String noteId) {
|
||||
List<Interpreter> intpForNote = this.get(noteId);
|
||||
destroy(intpForNote);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Destroy all interpreter instances in this group
|
||||
*/
|
||||
public void destroy() {
|
||||
List<Interpreter> intpToDestroy = new LinkedList<Interpreter>();
|
||||
for (List<Interpreter> intpGroupForNote : this.values()) {
|
||||
intpToDestroy.addAll(intpGroupForNote);
|
||||
}
|
||||
destroy(intpToDestroy);
|
||||
|
||||
// make sure remote interpreter process terminates
|
||||
if (remoteInterpreterProcess != null) {
|
||||
while (remoteInterpreterProcess.referenceCount() > 0) {
|
||||
remoteInterpreterProcess.dereference();
|
||||
}
|
||||
}
|
||||
|
||||
allInterpreterGroups.remove(id);
|
||||
}
|
||||
|
||||
private void destroy(Collection<Interpreter> intpToDestroy) {
|
||||
List<Thread> destroyThreads = new LinkedList<Thread>();
|
||||
|
||||
for (final Interpreter intp : this) {
|
||||
for (final Interpreter intp : intpToDestroy) {
|
||||
Thread t = new Thread() {
|
||||
public void run() {
|
||||
intp.destroy();
|
||||
|
|
@ -141,17 +225,10 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
|
|||
LOGGER.error("Can't close interpreter", e);
|
||||
}
|
||||
}
|
||||
|
||||
// make sure remote interpreter process terminates
|
||||
if (remoteInterpreterProcess != null) {
|
||||
while (remoteInterpreterProcess.referenceCount() > 0) {
|
||||
remoteInterpreterProcess.dereference();
|
||||
}
|
||||
}
|
||||
|
||||
allInterpreterGroups.remove(id);
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setResourcePool(ResourcePool resourcePool) {
|
||||
this.resourcePool = resourcePool;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,19 +47,7 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
|
|||
}
|
||||
|
||||
private RemoteInterpreterProcess getRemoteInterpreterProcess() {
|
||||
if (interpreterGroup.size() == 0) {
|
||||
throw new RuntimeException("Can't get remoteInterpreterProcess");
|
||||
}
|
||||
Interpreter p = interpreterGroup.get(0);
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
|
||||
if (p instanceof RemoteInterpreter) {
|
||||
return ((RemoteInterpreter) p).getInterpreterProcess();
|
||||
} else {
|
||||
throw new RuntimeException("Can't get remoteInterpreterProcess");
|
||||
}
|
||||
return interpreterGroup.getRemoteInterpreterProcess();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -17,10 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
|
|
@ -53,6 +50,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
private String interpreterPath;
|
||||
private String localRepoPath;
|
||||
private String className;
|
||||
private String noteId;
|
||||
FormType formType;
|
||||
boolean initialized;
|
||||
private Map<String, String> env;
|
||||
|
|
@ -60,6 +58,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
private int maxPoolSize;
|
||||
|
||||
public RemoteInterpreter(Properties property,
|
||||
String noteId,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
|
|
@ -68,6 +67,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
int maxPoolSize,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
|
||||
super(property);
|
||||
this.noteId = noteId;
|
||||
this.className = className;
|
||||
initialized = false;
|
||||
this.interpreterRunner = interpreterRunner;
|
||||
|
|
@ -80,6 +80,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
public RemoteInterpreter(Properties property,
|
||||
String noteId,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
|
|
@ -89,6 +90,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
|
||||
super(property);
|
||||
this.className = className;
|
||||
this.noteId = noteId;
|
||||
this.interpreterRunner = interpreterRunner;
|
||||
this.interpreterPath = interpreterPath;
|
||||
this.localRepoPath = localRepoPath;
|
||||
|
|
@ -123,39 +125,32 @@ public class RemoteInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
private synchronized void init() {
|
||||
public synchronized void init() {
|
||||
if (initialized == true) {
|
||||
return;
|
||||
}
|
||||
|
||||
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
|
||||
int rc = interpreterProcess.reference(getInterpreterGroup());
|
||||
interpreterProcess.setMaxPoolSize(this.maxPoolSize);
|
||||
synchronized (interpreterProcess) {
|
||||
// when first process created
|
||||
if (rc == 1) {
|
||||
// create all interpreter class in this interpreter group
|
||||
Client client = null;
|
||||
try {
|
||||
client = interpreterProcess.getClient();
|
||||
} catch (Exception e1) {
|
||||
throw new InterpreterException(e1);
|
||||
}
|
||||
|
||||
boolean broken = false;
|
||||
try {
|
||||
for (Interpreter intp : this.getInterpreterGroup()) {
|
||||
logger.info("Create remote interpreter {}", intp.getClassName());
|
||||
property.put("zeppelin.interpreter.localRepo", localRepoPath);
|
||||
client.createInterpreter(getInterpreterGroup().getId(),
|
||||
intp.getClassName(), (Map) property);
|
||||
}
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
throw new InterpreterException(e);
|
||||
} finally {
|
||||
interpreterProcess.releaseClient(client, broken);
|
||||
}
|
||||
synchronized (interpreterProcess) {
|
||||
Client client = null;
|
||||
try {
|
||||
client = interpreterProcess.getClient();
|
||||
} catch (Exception e1) {
|
||||
throw new InterpreterException(e1);
|
||||
}
|
||||
|
||||
boolean broken = false;
|
||||
try {
|
||||
logger.info("Create remote interpreter {}", getClassName());
|
||||
property.put("zeppelin.interpreter.localRepo", localRepoPath);
|
||||
client.createInterpreter(getInterpreterGroup().getId(), noteId,
|
||||
getClassName(), (Map) property);
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
throw new InterpreterException(e);
|
||||
} finally {
|
||||
interpreterProcess.releaseClient(client, broken);
|
||||
}
|
||||
}
|
||||
initialized = true;
|
||||
|
|
@ -165,19 +160,36 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void open() {
|
||||
init();
|
||||
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
|
||||
|
||||
InterpreterGroup interpreterGroup = getInterpreterGroup();
|
||||
int rc = interpreterProcess.reference(getInterpreterGroup());
|
||||
interpreterProcess.setMaxPoolSize(
|
||||
Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
// initialize all interpreters in this interpreter group
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
for (Interpreter intp : interpreters) {
|
||||
((RemoteInterpreter) intp).init();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
|
||||
|
||||
SchedulerFactory.singleton()
|
||||
.removeScheduler("remoteinterpreter_" + interpreterProcess.hashCode());
|
||||
|
||||
Client client = null;
|
||||
|
||||
boolean broken = false;
|
||||
try {
|
||||
client = interpreterProcess.getClient();
|
||||
if (client != null) {
|
||||
client.close(className);
|
||||
client.close(noteId, className);
|
||||
}
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
|
|
@ -219,7 +231,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
boolean broken = false;
|
||||
try {
|
||||
GUI settings = context.getGui();
|
||||
RemoteInterpreterResult remoteResult = client.interpret(className, st, convert(context));
|
||||
RemoteInterpreterResult remoteResult = client.interpret(
|
||||
noteId, className, st, convert(context));
|
||||
|
||||
Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
|
||||
remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
|
||||
|
|
@ -256,7 +269,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
boolean broken = false;
|
||||
try {
|
||||
client.cancel(className, convert(context));
|
||||
client.cancel(noteId, className, convert(context));
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
throw new InterpreterException(e);
|
||||
|
|
@ -284,7 +297,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
boolean broken = false;
|
||||
try {
|
||||
formType = FormType.valueOf(client.getFormType(className));
|
||||
formType = FormType.valueOf(client.getFormType(noteId, className));
|
||||
return formType;
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
|
|
@ -310,7 +323,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
boolean broken = false;
|
||||
try {
|
||||
return client.getProgress(className, convert(context));
|
||||
return client.getProgress(noteId, className, convert(context));
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
throw new InterpreterException(e);
|
||||
|
|
@ -332,7 +345,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
boolean broken = false;
|
||||
try {
|
||||
return client.completion(className, buf, cursor);
|
||||
return client.completion(noteId, className, buf, cursor);
|
||||
} catch (TException e) {
|
||||
broken = true;
|
||||
throw new InterpreterException(e);
|
||||
|
|
@ -349,7 +362,9 @@ public class RemoteInterpreter extends Interpreter {
|
|||
return null;
|
||||
} else {
|
||||
return SchedulerFactory.singleton().createOrGetRemoteScheduler(
|
||||
"remoteinterpreter_" + interpreterProcess.hashCode(), interpreterProcess,
|
||||
"remoteinterpreter_" + interpreterProcess.hashCode(),
|
||||
noteId,
|
||||
interpreterProcess,
|
||||
maxConcurrency);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -244,7 +244,8 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
}
|
||||
|
||||
private Object getResource(ResourceId resourceId) {
|
||||
InterpreterGroup intpGroup = InterpreterGroup.get(resourceId.getResourcePoolId());
|
||||
InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId(
|
||||
resourceId.getResourcePoolId());
|
||||
if (intpGroup == null) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -281,6 +281,15 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
clientPool.setMaxTotal(size + 2);
|
||||
}
|
||||
}
|
||||
|
||||
public int getMaxPoolSize() {
|
||||
if (clientPool != null) {
|
||||
return clientPool.getMaxTotal();
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when angular object is updated in client side to propagate
|
||||
* change to the remote process
|
||||
|
|
|
|||
|
|
@ -142,10 +142,9 @@ public class RemoteInterpreterServer
|
|||
|
||||
|
||||
@Override
|
||||
public void createInterpreter(String interpreterGroupId, String className, Map<String, String>
|
||||
properties)
|
||||
throws TException {
|
||||
|
||||
public void createInterpreter(String interpreterGroupId, String noteId, String
|
||||
className,
|
||||
Map<String, String> properties) throws TException {
|
||||
if (interpreterGroup == null) {
|
||||
interpreterGroup = new InterpreterGroup(interpreterGroupId);
|
||||
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
|
||||
|
|
@ -167,8 +166,13 @@ public class RemoteInterpreterServer
|
|||
repl.setClassloaderUrls(new URL[]{});
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
interpreterGroup.add(new LazyOpenInterpreter(
|
||||
new ClassloaderInterpreter(repl, cl)));
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
if (interpreters == null) {
|
||||
interpreters = new LinkedList<Interpreter>();
|
||||
interpreterGroup.put(noteId, interpreters);
|
||||
}
|
||||
|
||||
interpreters.add(new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl)));
|
||||
}
|
||||
|
||||
logger.info("Instantiate interpreter {}", className);
|
||||
|
|
@ -181,9 +185,18 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
}
|
||||
|
||||
private Interpreter getInterpreter(String className) throws TException {
|
||||
private Interpreter getInterpreter(String noteId, String className) throws TException {
|
||||
if (interpreterGroup == null) {
|
||||
throw new TException(
|
||||
new InterpreterException("Interpreter instance " + className + " not created"));
|
||||
}
|
||||
synchronized (interpreterGroup) {
|
||||
for (Interpreter inp : interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
if (interpreters == null) {
|
||||
throw new TException(
|
||||
new InterpreterException("Interpreter " + className + " not initialized"));
|
||||
}
|
||||
for (Interpreter inp : interpreters) {
|
||||
if (inp.getClassName().equals(className)) {
|
||||
return inp;
|
||||
}
|
||||
|
|
@ -194,23 +207,23 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
|
||||
@Override
|
||||
public void open(String className) throws TException {
|
||||
Interpreter intp = getInterpreter(className);
|
||||
public void open(String noteId, String className) throws TException {
|
||||
Interpreter intp = getInterpreter(noteId, className);
|
||||
intp.open();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(String className) throws TException {
|
||||
Interpreter intp = getInterpreter(className);
|
||||
public void close(String noteId, String className) throws TException {
|
||||
Interpreter intp = getInterpreter(noteId, className);
|
||||
intp.close();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public RemoteInterpreterResult interpret(String className, String st,
|
||||
public RemoteInterpreterResult interpret(String noteId, String className, String st,
|
||||
RemoteInterpreterContext interpreterContext) throws TException {
|
||||
logger.debug("st: {}", st);
|
||||
Interpreter intp = getInterpreter(className);
|
||||
Interpreter intp = getInterpreter(noteId, className);
|
||||
InterpreterContext context = convert(interpreterContext);
|
||||
|
||||
Scheduler scheduler = intp.getScheduler();
|
||||
|
|
@ -343,10 +356,10 @@ public class RemoteInterpreterServer
|
|||
|
||||
|
||||
@Override
|
||||
public void cancel(String className, RemoteInterpreterContext interpreterContext)
|
||||
public void cancel(String noteId, String className, RemoteInterpreterContext interpreterContext)
|
||||
throws TException {
|
||||
logger.info("cancel {} {}", className, interpreterContext.getParagraphId());
|
||||
Interpreter intp = getInterpreter(className);
|
||||
Interpreter intp = getInterpreter(noteId, className);
|
||||
String jobId = interpreterContext.getParagraphId();
|
||||
Job job = intp.getScheduler().removeFromWaitingQueue(jobId);
|
||||
|
||||
|
|
@ -358,22 +371,24 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(String className, RemoteInterpreterContext interpreterContext)
|
||||
public int getProgress(String noteId, String className,
|
||||
RemoteInterpreterContext interpreterContext)
|
||||
throws TException {
|
||||
Interpreter intp = getInterpreter(className);
|
||||
Interpreter intp = getInterpreter(noteId, className);
|
||||
return intp.getProgress(convert(interpreterContext));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getFormType(String className) throws TException {
|
||||
Interpreter intp = getInterpreter(className);
|
||||
public String getFormType(String noteId, String className) throws TException {
|
||||
Interpreter intp = getInterpreter(noteId, className);
|
||||
return intp.getFormType().toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> completion(String className, String buf, int cursor) throws TException {
|
||||
Interpreter intp = getInterpreter(className);
|
||||
public List<String> completion(String noteId, String className, String buf, int cursor)
|
||||
throws TException {
|
||||
Interpreter intp = getInterpreter(noteId, className);
|
||||
return intp.completion(buf, cursor);
|
||||
}
|
||||
|
||||
|
|
@ -442,14 +457,19 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
|
||||
@Override
|
||||
public String getStatus(String jobId)
|
||||
public String getStatus(String noteId, String jobId)
|
||||
throws TException {
|
||||
if (interpreterGroup == null) {
|
||||
return "Unknown";
|
||||
}
|
||||
|
||||
synchronized (interpreterGroup) {
|
||||
for (Interpreter intp : interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
if (interpreters == null) {
|
||||
return "Unknown";
|
||||
}
|
||||
|
||||
for (Interpreter intp : interpreters) {
|
||||
for (Job job : intp.getScheduler().getJobsRunning()) {
|
||||
if (jobId.equals(job.getId())) {
|
||||
return job.getStatus().name();
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-24")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-2-2")
|
||||
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-24")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-2-2")
|
||||
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-24")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-2-2")
|
||||
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
|
||||
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -45,14 +45,16 @@ public class RemoteScheduler implements Scheduler {
|
|||
boolean terminate = false;
|
||||
private String name;
|
||||
private int maxConcurrency;
|
||||
private final String noteId;
|
||||
private RemoteInterpreterProcess interpreterProcess;
|
||||
|
||||
public RemoteScheduler(String name, ExecutorService executor,
|
||||
public RemoteScheduler(String name, ExecutorService executor, String noteId,
|
||||
RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
|
||||
int maxConcurrency) {
|
||||
this.name = name;
|
||||
this.executor = executor;
|
||||
this.listener = listener;
|
||||
this.noteId = noteId;
|
||||
this.interpreterProcess = interpreterProcess;
|
||||
this.maxConcurrency = maxConcurrency;
|
||||
}
|
||||
|
|
@ -257,7 +259,7 @@ public class RemoteScheduler implements Scheduler {
|
|||
|
||||
boolean broken = false;
|
||||
try {
|
||||
String statusStr = client.getStatus(job.getId());
|
||||
String statusStr = client.getStatus(noteId, job.getId());
|
||||
if ("Unknown".equals(statusStr)) {
|
||||
// not found this job in the remote schedulers.
|
||||
// maybe not submitted, maybe already finished
|
||||
|
|
|
|||
|
|
@ -86,6 +86,7 @@ public class SchedulerFactory implements SchedulerListener {
|
|||
|
||||
public Scheduler createOrGetRemoteScheduler(
|
||||
String name,
|
||||
String noteId,
|
||||
RemoteInterpreterProcess interpreterProcess,
|
||||
int maxConcurrency) {
|
||||
|
||||
|
|
@ -94,6 +95,7 @@ public class SchedulerFactory implements SchedulerListener {
|
|||
Scheduler s = new RemoteScheduler(
|
||||
name,
|
||||
executor,
|
||||
noteId,
|
||||
interpreterProcess,
|
||||
this,
|
||||
maxConcurrency);
|
||||
|
|
|
|||
|
|
@ -55,18 +55,18 @@ struct RemoteInterpreterEvent {
|
|||
}
|
||||
|
||||
service RemoteInterpreterService {
|
||||
void createInterpreter(1: string intpGroupId, 2: string className, 3: map<string, string> properties);
|
||||
void createInterpreter(1: string intpGroupId, 2: string noteId, 3: string className, 4: map<string, string> properties);
|
||||
|
||||
void open(1: string className);
|
||||
void close(1: string className);
|
||||
RemoteInterpreterResult interpret(1: string className, 2: string st, 3: RemoteInterpreterContext interpreterContext);
|
||||
void cancel(1: string className, 2: RemoteInterpreterContext interpreterContext);
|
||||
i32 getProgress(1: string className, 2: RemoteInterpreterContext interpreterContext);
|
||||
string getFormType(1: string className);
|
||||
list<string> completion(1: string className, 2: string buf, 3: i32 cursor);
|
||||
void open(1: string noteId, 2: string className);
|
||||
void close(1: string noteId, 2: string className);
|
||||
RemoteInterpreterResult interpret(1: string noteId, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
|
||||
void cancel(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
|
||||
i32 getProgress(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
|
||||
string getFormType(1: string noteId, 2: string className);
|
||||
list<string> completion(1: string noteId, 2: string className, 3: string buf, 4: i32 cursor);
|
||||
void shutdown();
|
||||
|
||||
string getStatus(1:string jobId);
|
||||
string getStatus(1: string noteId, 2:string jobId);
|
||||
|
||||
RemoteInterpreterEvent getEvent();
|
||||
|
||||
|
|
|
|||
|
|
@ -29,10 +29,7 @@ import org.apache.zeppelin.display.AngularObject;
|
|||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
|
||||
import org.apache.zeppelin.resource.LocalResourcePool;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
|
|
@ -67,6 +64,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
|
|||
|
||||
intp = new RemoteInterpreter(
|
||||
p,
|
||||
"note",
|
||||
MockInterpreterAngular.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
|
|
@ -76,7 +74,8 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
|
|||
null
|
||||
);
|
||||
|
||||
intpGroup.add(intp);
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
intpGroup.get("note").add(intp);
|
||||
intp.setInterpreterGroup(intpGroup);
|
||||
|
||||
context = new InterpreterContext(
|
||||
|
|
|
|||
|
|
@ -43,6 +43,8 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
|
|||
@Before
|
||||
public void setUp() throws Exception {
|
||||
intpGroup = new InterpreterGroup();
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
||||
env = new HashMap<String, String>();
|
||||
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
|
||||
}
|
||||
|
|
@ -56,6 +58,7 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
|
|||
private RemoteInterpreter createMockInterpreter() {
|
||||
RemoteInterpreter intp = new RemoteInterpreter(
|
||||
new Properties(),
|
||||
"note",
|
||||
MockInterpreterOutputStream.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
|
|
@ -64,7 +67,7 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
|
|||
10 * 1000,
|
||||
this);
|
||||
|
||||
intpGroup.add(intp);
|
||||
intpGroup.get("note").add(intp);
|
||||
intp.setInterpreterGroup(intpGroup);
|
||||
return intp;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,10 +30,7 @@ import java.util.Properties;
|
|||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
|
||||
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
|
||||
|
|
@ -65,8 +62,13 @@ public class RemoteInterpreterTest {
|
|||
}
|
||||
|
||||
private RemoteInterpreter createMockInterpreterA(Properties p) {
|
||||
return createMockInterpreterA(p, "note");
|
||||
}
|
||||
|
||||
private RemoteInterpreter createMockInterpreterA(Properties p, String noteId) {
|
||||
return new RemoteInterpreter(
|
||||
p,
|
||||
noteId,
|
||||
MockInterpreterA.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
|
|
@ -77,8 +79,13 @@ public class RemoteInterpreterTest {
|
|||
}
|
||||
|
||||
private RemoteInterpreter createMockInterpreterB(Properties p) {
|
||||
return createMockInterpreterB(p, "note");
|
||||
}
|
||||
|
||||
private RemoteInterpreter createMockInterpreterB(Properties p, String noteId) {
|
||||
return new RemoteInterpreter(
|
||||
p,
|
||||
noteId,
|
||||
MockInterpreterB.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
|
|
@ -91,15 +98,17 @@ public class RemoteInterpreterTest {
|
|||
@Test
|
||||
public void testRemoteInterperterCall() throws TTransportException, IOException {
|
||||
Properties p = new Properties();
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
||||
RemoteInterpreter intpA = createMockInterpreterA(p);
|
||||
|
||||
intpGroup.add(intpA);
|
||||
intpGroup.get("note").add(intpA);
|
||||
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
||||
RemoteInterpreter intpB = createMockInterpreterB(p);
|
||||
|
||||
intpGroup.add(intpB);
|
||||
intpGroup.get("note").add(intpB);
|
||||
intpB.setInterpreterGroup(intpGroup);
|
||||
|
||||
|
||||
|
|
@ -145,7 +154,8 @@ public class RemoteInterpreterTest {
|
|||
|
||||
RemoteInterpreter intpA = createMockInterpreterA(p);
|
||||
|
||||
intpGroup.add(intpA);
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
intpGroup.get("note").add(intpA);
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpA.open();
|
||||
|
|
@ -167,9 +177,11 @@ public class RemoteInterpreterTest {
|
|||
@Test
|
||||
public void testRemoteSchedulerSharing() throws TTransportException, IOException {
|
||||
Properties p = new Properties();
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
||||
RemoteInterpreter intpA = new RemoteInterpreter(
|
||||
p,
|
||||
"note",
|
||||
MockInterpreterA.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
|
|
@ -178,11 +190,13 @@ public class RemoteInterpreterTest {
|
|||
10 * 1000,
|
||||
null);
|
||||
|
||||
intpGroup.add(intpA);
|
||||
|
||||
intpGroup.get("note").add(intpA);
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
||||
RemoteInterpreter intpB = new RemoteInterpreter(
|
||||
p,
|
||||
"note",
|
||||
MockInterpreterB.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
|
|
@ -191,7 +205,7 @@ public class RemoteInterpreterTest {
|
|||
10 * 1000,
|
||||
null);
|
||||
|
||||
intpGroup.add(intpB);
|
||||
intpGroup.get("note").add(intpB);
|
||||
intpB.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpA.open();
|
||||
|
|
@ -234,15 +248,16 @@ public class RemoteInterpreterTest {
|
|||
@Test
|
||||
public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException {
|
||||
Properties p = new Properties();
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
||||
final RemoteInterpreter intpA = createMockInterpreterA(p);
|
||||
|
||||
intpGroup.add(intpA);
|
||||
intpGroup.get("note").add(intpA);
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
||||
final RemoteInterpreter intpB = createMockInterpreterB(p);
|
||||
|
||||
intpGroup.add(intpB);
|
||||
intpGroup.get("note").add(intpB);
|
||||
intpB.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpA.open();
|
||||
|
|
@ -318,13 +333,11 @@ public class RemoteInterpreterTest {
|
|||
|
||||
};
|
||||
intpB.getScheduler().submit(jobB);
|
||||
|
||||
// wait until both job finished
|
||||
while (jobA.getStatus() != Status.FINISHED ||
|
||||
jobB.getStatus() != Status.FINISHED) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
assertTrue(end - start >= 1000);
|
||||
|
||||
|
|
@ -337,10 +350,11 @@ public class RemoteInterpreterTest {
|
|||
@Test
|
||||
public void testRunOrderPreserved() throws InterruptedException {
|
||||
Properties p = new Properties();
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
||||
final RemoteInterpreter intpA = createMockInterpreterA(p);
|
||||
|
||||
intpGroup.add(intpA);
|
||||
intpGroup.get("note").add(intpA);
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpA.open();
|
||||
|
|
@ -412,10 +426,11 @@ public class RemoteInterpreterTest {
|
|||
public void testRunParallel() throws InterruptedException {
|
||||
Properties p = new Properties();
|
||||
p.put("parallel", "true");
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
||||
final RemoteInterpreter intpA = createMockInterpreterA(p);
|
||||
|
||||
intpGroup.add(intpA);
|
||||
intpGroup.get("note").add(intpA);
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpA.open();
|
||||
|
|
@ -501,6 +516,7 @@ public class RemoteInterpreterTest {
|
|||
@Test
|
||||
public void testInterpreterGroupResetAfterProcessFinished() {
|
||||
Properties p = new Properties();
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
||||
RemoteInterpreter intpA = createMockInterpreterA(p);
|
||||
|
||||
|
|
@ -519,10 +535,11 @@ public class RemoteInterpreterTest {
|
|||
@Test
|
||||
public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException {
|
||||
Properties p = new Properties();
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
||||
final RemoteInterpreter intpA = createMockInterpreterA(p);
|
||||
|
||||
intpGroup.add(intpA);
|
||||
intpGroup.get("note").add(intpA);
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpA.open();
|
||||
|
|
@ -570,7 +587,12 @@ public class RemoteInterpreterTest {
|
|||
// restart interpreter
|
||||
RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
|
||||
intpA.close();
|
||||
intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId()));
|
||||
|
||||
InterpreterGroup newInterpreterGroup =
|
||||
new InterpreterGroup(intpA.getInterpreterGroup().getId());
|
||||
newInterpreterGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
||||
intpA.setInterpreterGroup(newInterpreterGroup);
|
||||
intpA.open();
|
||||
RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
|
||||
|
||||
|
|
@ -581,15 +603,16 @@ public class RemoteInterpreterTest {
|
|||
@Test
|
||||
public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
|
||||
Properties p = new Properties();
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
||||
RemoteInterpreter intpA = createMockInterpreterA(p);
|
||||
|
||||
intpGroup.add(intpA);
|
||||
intpGroup.get("note").add(intpA);
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
||||
RemoteInterpreter intpB = createMockInterpreterB(p);
|
||||
|
||||
intpGroup.add(intpB);
|
||||
intpGroup.get("note").add(intpB);
|
||||
intpB.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpA.open();
|
||||
|
|
@ -597,4 +620,39 @@ public class RemoteInterpreterTest {
|
|||
|
||||
assertEquals(intpA.getScheduler(), intpB.getScheduler());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiInterpreterSession() {
|
||||
Properties p = new Properties();
|
||||
intpGroup.put("sessionA", new LinkedList<Interpreter>());
|
||||
intpGroup.put("sessionB", new LinkedList<Interpreter>());
|
||||
|
||||
RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA");
|
||||
intpGroup.get("sessionA").add(intpAsessionA);
|
||||
intpAsessionA.setInterpreterGroup(intpGroup);
|
||||
|
||||
RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA");
|
||||
intpGroup.get("sessionA").add(intpBsessionA);
|
||||
intpBsessionA.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpAsessionA.open();
|
||||
intpBsessionA.open();
|
||||
|
||||
assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler());
|
||||
|
||||
RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB");
|
||||
intpGroup.get("sessionB").add(intpAsessionB);
|
||||
intpAsessionB.setInterpreterGroup(intpGroup);
|
||||
|
||||
RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB");
|
||||
intpGroup.get("sessionB").add(intpBsessionB);
|
||||
intpBsessionB.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpAsessionB.open();
|
||||
intpBsessionB.open();
|
||||
|
||||
assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
|
||||
assertEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler());
|
||||
assertEquals(intpBsessionA.getScheduler(), intpBsessionB.getScheduler());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -91,13 +91,30 @@ public class MockInterpreterB extends Interpreter {
|
|||
|
||||
public MockInterpreterA getInterpreterA() {
|
||||
InterpreterGroup interpreterGroup = getInterpreterGroup();
|
||||
for (Interpreter intp : interpreterGroup) {
|
||||
if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
|
||||
Interpreter p = intp;
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
synchronized (interpreterGroup) {
|
||||
for (List<Interpreter> interpreters : interpreterGroup.values()) {
|
||||
boolean belongsToSameNoteGroup = false;
|
||||
MockInterpreterA a = null;
|
||||
for (Interpreter intp : interpreters) {
|
||||
if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
|
||||
Interpreter p = intp;
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
a = (MockInterpreterA) p;
|
||||
}
|
||||
|
||||
Interpreter p = intp;
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
if (this == p) {
|
||||
belongsToSameNoteGroup = true;
|
||||
}
|
||||
}
|
||||
if (belongsToSameNoteGroup) {
|
||||
return a;
|
||||
}
|
||||
return (MockInterpreterA) p;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
|
@ -105,13 +122,10 @@ public class MockInterpreterB extends Interpreter {
|
|||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
InterpreterGroup interpreterGroup = getInterpreterGroup();
|
||||
for (Interpreter intp : interpreterGroup) {
|
||||
if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
|
||||
return intp.getScheduler();
|
||||
}
|
||||
MockInterpreterA intpA = getInterpreterA();
|
||||
if (intpA != null) {
|
||||
return intpA.getScheduler();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,10 +18,7 @@ package org.apache.zeppelin.resource;
|
|||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
|
||||
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool;
|
||||
|
|
@ -60,6 +57,7 @@ public class DistributedResourcePoolTest {
|
|||
|
||||
intp1 = new RemoteInterpreter(
|
||||
p,
|
||||
"note",
|
||||
MockInterpreterResourcePool.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
|
|
@ -70,11 +68,13 @@ public class DistributedResourcePoolTest {
|
|||
);
|
||||
|
||||
intpGroup1 = new InterpreterGroup("intpGroup1");
|
||||
intpGroup1.add(intp1);
|
||||
intpGroup1.put("note", new LinkedList<Interpreter>());
|
||||
intpGroup1.get("note").add(intp1);
|
||||
intp1.setInterpreterGroup(intpGroup1);
|
||||
|
||||
intp2 = new RemoteInterpreter(
|
||||
p,
|
||||
"note",
|
||||
MockInterpreterResourcePool.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
|
|
@ -85,7 +85,8 @@ public class DistributedResourcePoolTest {
|
|||
);
|
||||
|
||||
intpGroup2 = new InterpreterGroup("intpGroup2");
|
||||
intpGroup2.add(intp2);
|
||||
intpGroup2.put("note", new LinkedList<Interpreter>());
|
||||
intpGroup2.get("note").add(intp2);
|
||||
intp2.setInterpreterGroup(intpGroup2);
|
||||
|
||||
context = new InterpreterContext(
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ import java.util.Properties;
|
|||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
|
|
@ -67,6 +68,7 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
|
|||
|
||||
final RemoteInterpreter intpA = new RemoteInterpreter(
|
||||
p,
|
||||
"note",
|
||||
MockInterpreterA.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
|
|
@ -75,12 +77,13 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
|
|||
10 * 1000,
|
||||
this);
|
||||
|
||||
intpGroup.add(intpA);
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
intpGroup.get("note").add(intpA);
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpA.open();
|
||||
|
||||
Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test",
|
||||
Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note",
|
||||
intpA.getInterpreterProcess(),
|
||||
10);
|
||||
|
||||
|
|
@ -152,6 +155,7 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
|
|||
|
||||
final RemoteInterpreter intpA = new RemoteInterpreter(
|
||||
p,
|
||||
"note",
|
||||
MockInterpreterA.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
|
|
@ -160,12 +164,13 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
|
|||
10 * 1000,
|
||||
this);
|
||||
|
||||
intpGroup.add(intpA);
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
intpGroup.get("note").add(intpA);
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpA.open();
|
||||
|
||||
Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test",
|
||||
Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note",
|
||||
intpA.getInterpreterProcess(),
|
||||
10);
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue