zeppelin-interpreter note session support

This commit is contained in:
Lee moon soo 2016-02-03 06:33:10 +09:00
parent 5787984def
commit ed1ab0d213
19 changed files with 1525 additions and 350 deletions

View file

@ -23,13 +23,22 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.resource.ResourcePool;
/**
* InterpreterGroup is list of interpreters in the same group.
* InterpreterGroup is list of interpreters in the same interpreter group.
* For example spark, pyspark, sql interpreters are in the same 'spark' group
* and InterpreterGroup will have reference to these all interpreters.
*
* Remember, list of interpreters are dedicated to a note.
* (when InterpreterOption.perNoteSession==true)
* So InterpreterGroup internally manages map of [noteId, list of interpreters]
*
* A InterpreterGroup runs on interpreter process.
* And unit of interpreter instantiate, restart, bind, unbind.
*/
public class InterpreterGroup extends LinkedList<Interpreter>{
public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter>> {
String id;
Logger LOGGER = Logger.getLogger(InterpreterGroup.class);
@ -38,10 +47,14 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process
ResourcePool resourcePool;
// map [notebook session, Interpreters in the group], to support per note session interpreters
//Map<String, List<Interpreter>> interpreters = new ConcurrentHashMap<String,
// List<Interpreter>>();
private static final Map<String, InterpreterGroup> allInterpreterGroups =
new ConcurrentHashMap<String, InterpreterGroup>();
public static InterpreterGroup get(String id) {
public static InterpreterGroup getByInterpreterGroupId(String id) {
return allInterpreterGroups.get(id);
}
@ -49,11 +62,18 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
return new LinkedList(allInterpreterGroups.values());
}
/**
* Create InterpreterGroup with given id
* @param id
*/
public InterpreterGroup(String id) {
this.id = id;
allInterpreterGroups.put(id, this);
}
/**
* Create InterpreterGroup with autogenerated id
*/
public InterpreterGroup() {
getId();
allInterpreterGroups.put(id, this);
@ -73,10 +93,22 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
}
}
/**
* Get combined property of all interpreters in this group
* @return
*/
public Properties getProperty() {
Properties p = new Properties();
for (Interpreter intp : this) {
p.putAll(intp.getProperty());
Collection<List<Interpreter>> intpGroupForANote = this.values();
if (intpGroupForANote != null && intpGroupForANote.size() > 0) {
for (List<Interpreter> intpGroup : intpGroupForANote) {
for (Interpreter intp : intpGroup) {
p.putAll(intp.getProperty());
}
// it's okay to break here while vevery List<Interpreters> will have the same property set
break;
}
}
return p;
}
@ -97,10 +129,32 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
this.remoteInterpreterProcess = remoteInterpreterProcess;
}
/**
* Close all interpreter instances in this group
*/
public void close() {
List<Interpreter> intpToClose = new LinkedList<Interpreter>();
for (List<Interpreter> intpGroupForNote : this.values()) {
intpToClose.addAll(intpGroupForNote);
}
close(intpToClose);
}
/**
* Close all interpreter instances in this group for the note
* @param noteId
*/
public void close(String noteId) {
List<Interpreter> intpForNote = this.get(noteId);
close(intpForNote);
}
private void close(Collection<Interpreter> intpToClose) {
List<Thread> closeThreads = new LinkedList<Thread>();
for (final Interpreter intp : this) {
for (final Interpreter intp : intpToClose) {
Thread t = new Thread() {
public void run() {
intp.close();
@ -120,10 +174,40 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
}
}
/**
* Destroy all interpreter instances in this group for the note
* @param noteId
*/
public void destroy(String noteId) {
List<Interpreter> intpForNote = this.get(noteId);
destroy(intpForNote);
}
/**
* Destroy all interpreter instances in this group
*/
public void destroy() {
List<Interpreter> intpToDestroy = new LinkedList<Interpreter>();
for (List<Interpreter> intpGroupForNote : this.values()) {
intpToDestroy.addAll(intpGroupForNote);
}
destroy(intpToDestroy);
// make sure remote interpreter process terminates
if (remoteInterpreterProcess != null) {
while (remoteInterpreterProcess.referenceCount() > 0) {
remoteInterpreterProcess.dereference();
}
}
allInterpreterGroups.remove(id);
}
private void destroy(Collection<Interpreter> intpToDestroy) {
List<Thread> destroyThreads = new LinkedList<Thread>();
for (final Interpreter intp : this) {
for (final Interpreter intp : intpToDestroy) {
Thread t = new Thread() {
public void run() {
intp.destroy();
@ -141,17 +225,10 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
LOGGER.error("Can't close interpreter", e);
}
}
// make sure remote interpreter process terminates
if (remoteInterpreterProcess != null) {
while (remoteInterpreterProcess.referenceCount() > 0) {
remoteInterpreterProcess.dereference();
}
}
allInterpreterGroups.remove(id);
}
public void setResourcePool(ResourcePool resourcePool) {
this.resourcePool = resourcePool;
}

View file

@ -47,19 +47,7 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
}
private RemoteInterpreterProcess getRemoteInterpreterProcess() {
if (interpreterGroup.size() == 0) {
throw new RuntimeException("Can't get remoteInterpreterProcess");
}
Interpreter p = interpreterGroup.get(0);
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
if (p instanceof RemoteInterpreter) {
return ((RemoteInterpreter) p).getInterpreterProcess();
} else {
throw new RuntimeException("Can't get remoteInterpreterProcess");
}
return interpreterGroup.getRemoteInterpreterProcess();
}
/**

View file

@ -17,10 +17,7 @@
package org.apache.zeppelin.interpreter.remote;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.*;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.GUI;
@ -53,6 +50,7 @@ public class RemoteInterpreter extends Interpreter {
private String interpreterPath;
private String localRepoPath;
private String className;
private String noteId;
FormType formType;
boolean initialized;
private Map<String, String> env;
@ -60,6 +58,7 @@ public class RemoteInterpreter extends Interpreter {
private int maxPoolSize;
public RemoteInterpreter(Properties property,
String noteId,
String className,
String interpreterRunner,
String interpreterPath,
@ -68,6 +67,7 @@ public class RemoteInterpreter extends Interpreter {
int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
super(property);
this.noteId = noteId;
this.className = className;
initialized = false;
this.interpreterRunner = interpreterRunner;
@ -80,6 +80,7 @@ public class RemoteInterpreter extends Interpreter {
}
public RemoteInterpreter(Properties property,
String noteId,
String className,
String interpreterRunner,
String interpreterPath,
@ -89,6 +90,7 @@ public class RemoteInterpreter extends Interpreter {
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
super(property);
this.className = className;
this.noteId = noteId;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
this.localRepoPath = localRepoPath;
@ -123,39 +125,32 @@ public class RemoteInterpreter extends Interpreter {
}
}
private synchronized void init() {
public synchronized void init() {
if (initialized == true) {
return;
}
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
int rc = interpreterProcess.reference(getInterpreterGroup());
interpreterProcess.setMaxPoolSize(this.maxPoolSize);
synchronized (interpreterProcess) {
// when first process created
if (rc == 1) {
// create all interpreter class in this interpreter group
Client client = null;
try {
client = interpreterProcess.getClient();
} catch (Exception e1) {
throw new InterpreterException(e1);
}
boolean broken = false;
try {
for (Interpreter intp : this.getInterpreterGroup()) {
logger.info("Create remote interpreter {}", intp.getClassName());
property.put("zeppelin.interpreter.localRepo", localRepoPath);
client.createInterpreter(getInterpreterGroup().getId(),
intp.getClassName(), (Map) property);
}
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client, broken);
}
synchronized (interpreterProcess) {
Client client = null;
try {
client = interpreterProcess.getClient();
} catch (Exception e1) {
throw new InterpreterException(e1);
}
boolean broken = false;
try {
logger.info("Create remote interpreter {}", getClassName());
property.put("zeppelin.interpreter.localRepo", localRepoPath);
client.createInterpreter(getInterpreterGroup().getId(), noteId,
getClassName(), (Map) property);
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client, broken);
}
}
initialized = true;
@ -165,19 +160,36 @@ public class RemoteInterpreter extends Interpreter {
@Override
public void open() {
init();
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
InterpreterGroup interpreterGroup = getInterpreterGroup();
int rc = interpreterProcess.reference(getInterpreterGroup());
interpreterProcess.setMaxPoolSize(
Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
synchronized (interpreterGroup) {
// initialize all interpreters in this interpreter group
List<Interpreter> interpreters = interpreterGroup.get(noteId);
for (Interpreter intp : interpreters) {
((RemoteInterpreter) intp).init();
}
}
}
@Override
public void close() {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
SchedulerFactory.singleton()
.removeScheduler("remoteinterpreter_" + interpreterProcess.hashCode());
Client client = null;
boolean broken = false;
try {
client = interpreterProcess.getClient();
if (client != null) {
client.close(className);
client.close(noteId, className);
}
} catch (TException e) {
broken = true;
@ -219,7 +231,8 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
GUI settings = context.getGui();
RemoteInterpreterResult remoteResult = client.interpret(className, st, convert(context));
RemoteInterpreterResult remoteResult = client.interpret(
noteId, className, st, convert(context));
Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
@ -256,7 +269,7 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
client.cancel(className, convert(context));
client.cancel(noteId, className, convert(context));
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
@ -284,7 +297,7 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
formType = FormType.valueOf(client.getFormType(className));
formType = FormType.valueOf(client.getFormType(noteId, className));
return formType;
} catch (TException e) {
broken = true;
@ -310,7 +323,7 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
return client.getProgress(className, convert(context));
return client.getProgress(noteId, className, convert(context));
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
@ -332,7 +345,7 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
return client.completion(className, buf, cursor);
return client.completion(noteId, className, buf, cursor);
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
@ -349,7 +362,9 @@ public class RemoteInterpreter extends Interpreter {
return null;
} else {
return SchedulerFactory.singleton().createOrGetRemoteScheduler(
"remoteinterpreter_" + interpreterProcess.hashCode(), interpreterProcess,
"remoteinterpreter_" + interpreterProcess.hashCode(),
noteId,
interpreterProcess,
maxConcurrency);
}
}

View file

@ -244,7 +244,8 @@ public class RemoteInterpreterEventPoller extends Thread {
}
private Object getResource(ResourceId resourceId) {
InterpreterGroup intpGroup = InterpreterGroup.get(resourceId.getResourcePoolId());
InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId(
resourceId.getResourcePoolId());
if (intpGroup == null) {
return null;
}

View file

@ -281,6 +281,15 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
clientPool.setMaxTotal(size + 2);
}
}
public int getMaxPoolSize() {
if (clientPool != null) {
return clientPool.getMaxTotal();
} else {
return 0;
}
}
/**
* Called when angular object is updated in client side to propagate
* change to the remote process

View file

@ -142,10 +142,9 @@ public class RemoteInterpreterServer
@Override
public void createInterpreter(String interpreterGroupId, String className, Map<String, String>
properties)
throws TException {
public void createInterpreter(String interpreterGroupId, String noteId, String
className,
Map<String, String> properties) throws TException {
if (interpreterGroup == null) {
interpreterGroup = new InterpreterGroup(interpreterGroupId);
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
@ -167,8 +166,13 @@ public class RemoteInterpreterServer
repl.setClassloaderUrls(new URL[]{});
synchronized (interpreterGroup) {
interpreterGroup.add(new LazyOpenInterpreter(
new ClassloaderInterpreter(repl, cl)));
List<Interpreter> interpreters = interpreterGroup.get(noteId);
if (interpreters == null) {
interpreters = new LinkedList<Interpreter>();
interpreterGroup.put(noteId, interpreters);
}
interpreters.add(new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl)));
}
logger.info("Instantiate interpreter {}", className);
@ -181,9 +185,18 @@ public class RemoteInterpreterServer
}
}
private Interpreter getInterpreter(String className) throws TException {
private Interpreter getInterpreter(String noteId, String className) throws TException {
if (interpreterGroup == null) {
throw new TException(
new InterpreterException("Interpreter instance " + className + " not created"));
}
synchronized (interpreterGroup) {
for (Interpreter inp : interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(noteId);
if (interpreters == null) {
throw new TException(
new InterpreterException("Interpreter " + className + " not initialized"));
}
for (Interpreter inp : interpreters) {
if (inp.getClassName().equals(className)) {
return inp;
}
@ -194,23 +207,23 @@ public class RemoteInterpreterServer
}
@Override
public void open(String className) throws TException {
Interpreter intp = getInterpreter(className);
public void open(String noteId, String className) throws TException {
Interpreter intp = getInterpreter(noteId, className);
intp.open();
}
@Override
public void close(String className) throws TException {
Interpreter intp = getInterpreter(className);
public void close(String noteId, String className) throws TException {
Interpreter intp = getInterpreter(noteId, className);
intp.close();
}
@Override
public RemoteInterpreterResult interpret(String className, String st,
public RemoteInterpreterResult interpret(String noteId, String className, String st,
RemoteInterpreterContext interpreterContext) throws TException {
logger.debug("st: {}", st);
Interpreter intp = getInterpreter(className);
Interpreter intp = getInterpreter(noteId, className);
InterpreterContext context = convert(interpreterContext);
Scheduler scheduler = intp.getScheduler();
@ -343,10 +356,10 @@ public class RemoteInterpreterServer
@Override
public void cancel(String className, RemoteInterpreterContext interpreterContext)
public void cancel(String noteId, String className, RemoteInterpreterContext interpreterContext)
throws TException {
logger.info("cancel {} {}", className, interpreterContext.getParagraphId());
Interpreter intp = getInterpreter(className);
Interpreter intp = getInterpreter(noteId, className);
String jobId = interpreterContext.getParagraphId();
Job job = intp.getScheduler().removeFromWaitingQueue(jobId);
@ -358,22 +371,24 @@ public class RemoteInterpreterServer
}
@Override
public int getProgress(String className, RemoteInterpreterContext interpreterContext)
public int getProgress(String noteId, String className,
RemoteInterpreterContext interpreterContext)
throws TException {
Interpreter intp = getInterpreter(className);
Interpreter intp = getInterpreter(noteId, className);
return intp.getProgress(convert(interpreterContext));
}
@Override
public String getFormType(String className) throws TException {
Interpreter intp = getInterpreter(className);
public String getFormType(String noteId, String className) throws TException {
Interpreter intp = getInterpreter(noteId, className);
return intp.getFormType().toString();
}
@Override
public List<String> completion(String className, String buf, int cursor) throws TException {
Interpreter intp = getInterpreter(className);
public List<String> completion(String noteId, String className, String buf, int cursor)
throws TException {
Interpreter intp = getInterpreter(noteId, className);
return intp.completion(buf, cursor);
}
@ -442,14 +457,19 @@ public class RemoteInterpreterServer
}
@Override
public String getStatus(String jobId)
public String getStatus(String noteId, String jobId)
throws TException {
if (interpreterGroup == null) {
return "Unknown";
}
synchronized (interpreterGroup) {
for (Interpreter intp : interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(noteId);
if (interpreters == null) {
return "Unknown";
}
for (Interpreter intp : interpreters) {
for (Job job : intp.getScheduler().getJobsRunning()) {
if (jobId.equals(job.getId())) {
return job.getStatus().name();

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-24")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-2-2")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-24")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-2-2")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-24")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-2-2")
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");

View file

@ -45,14 +45,16 @@ public class RemoteScheduler implements Scheduler {
boolean terminate = false;
private String name;
private int maxConcurrency;
private final String noteId;
private RemoteInterpreterProcess interpreterProcess;
public RemoteScheduler(String name, ExecutorService executor,
public RemoteScheduler(String name, ExecutorService executor, String noteId,
RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
int maxConcurrency) {
this.name = name;
this.executor = executor;
this.listener = listener;
this.noteId = noteId;
this.interpreterProcess = interpreterProcess;
this.maxConcurrency = maxConcurrency;
}
@ -257,7 +259,7 @@ public class RemoteScheduler implements Scheduler {
boolean broken = false;
try {
String statusStr = client.getStatus(job.getId());
String statusStr = client.getStatus(noteId, job.getId());
if ("Unknown".equals(statusStr)) {
// not found this job in the remote schedulers.
// maybe not submitted, maybe already finished

View file

@ -86,6 +86,7 @@ public class SchedulerFactory implements SchedulerListener {
public Scheduler createOrGetRemoteScheduler(
String name,
String noteId,
RemoteInterpreterProcess interpreterProcess,
int maxConcurrency) {
@ -94,6 +95,7 @@ public class SchedulerFactory implements SchedulerListener {
Scheduler s = new RemoteScheduler(
name,
executor,
noteId,
interpreterProcess,
this,
maxConcurrency);

View file

@ -55,18 +55,18 @@ struct RemoteInterpreterEvent {
}
service RemoteInterpreterService {
void createInterpreter(1: string intpGroupId, 2: string className, 3: map<string, string> properties);
void createInterpreter(1: string intpGroupId, 2: string noteId, 3: string className, 4: map<string, string> properties);
void open(1: string className);
void close(1: string className);
RemoteInterpreterResult interpret(1: string className, 2: string st, 3: RemoteInterpreterContext interpreterContext);
void cancel(1: string className, 2: RemoteInterpreterContext interpreterContext);
i32 getProgress(1: string className, 2: RemoteInterpreterContext interpreterContext);
string getFormType(1: string className);
list<string> completion(1: string className, 2: string buf, 3: i32 cursor);
void open(1: string noteId, 2: string className);
void close(1: string noteId, 2: string className);
RemoteInterpreterResult interpret(1: string noteId, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
void cancel(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
i32 getProgress(1: string noteId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
string getFormType(1: string noteId, 2: string className);
list<string> completion(1: string noteId, 2: string className, 3: string buf, 4: i32 cursor);
void shutdown();
string getStatus(1:string jobId);
string getStatus(1: string noteId, 2:string jobId);
RemoteInterpreterEvent getEvent();

View file

@ -29,10 +29,7 @@ import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.resource.ResourcePool;
@ -67,6 +64,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
intp = new RemoteInterpreter(
p,
"note",
MockInterpreterAngular.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
@ -76,7 +74,8 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
null
);
intpGroup.add(intp);
intpGroup.put("note", new LinkedList<Interpreter>());
intpGroup.get("note").add(intp);
intp.setInterpreterGroup(intpGroup);
context = new InterpreterContext(

View file

@ -43,6 +43,8 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
@Before
public void setUp() throws Exception {
intpGroup = new InterpreterGroup();
intpGroup.put("note", new LinkedList<Interpreter>());
env = new HashMap<String, String>();
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
}
@ -56,6 +58,7 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
private RemoteInterpreter createMockInterpreter() {
RemoteInterpreter intp = new RemoteInterpreter(
new Properties(),
"note",
MockInterpreterOutputStream.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
@ -64,7 +67,7 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
10 * 1000,
this);
intpGroup.add(intp);
intpGroup.get("note").add(intp);
intp.setInterpreterGroup(intpGroup);
return intp;
}

View file

@ -30,10 +30,7 @@ import java.util.Properties;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
@ -65,8 +62,13 @@ public class RemoteInterpreterTest {
}
private RemoteInterpreter createMockInterpreterA(Properties p) {
return createMockInterpreterA(p, "note");
}
private RemoteInterpreter createMockInterpreterA(Properties p, String noteId) {
return new RemoteInterpreter(
p,
noteId,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
@ -77,8 +79,13 @@ public class RemoteInterpreterTest {
}
private RemoteInterpreter createMockInterpreterB(Properties p) {
return createMockInterpreterB(p, "note");
}
private RemoteInterpreter createMockInterpreterB(Properties p, String noteId) {
return new RemoteInterpreter(
p,
noteId,
MockInterpreterB.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
@ -91,15 +98,17 @@ public class RemoteInterpreterTest {
@Test
public void testRemoteInterperterCall() throws TTransportException, IOException {
Properties p = new Properties();
intpGroup.put("note", new LinkedList<Interpreter>());
RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpGroup.get("note").add(intpA);
intpA.setInterpreterGroup(intpGroup);
RemoteInterpreter intpB = createMockInterpreterB(p);
intpGroup.add(intpB);
intpGroup.get("note").add(intpB);
intpB.setInterpreterGroup(intpGroup);
@ -145,7 +154,8 @@ public class RemoteInterpreterTest {
RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpGroup.put("note", new LinkedList<Interpreter>());
intpGroup.get("note").add(intpA);
intpA.setInterpreterGroup(intpGroup);
intpA.open();
@ -167,9 +177,11 @@ public class RemoteInterpreterTest {
@Test
public void testRemoteSchedulerSharing() throws TTransportException, IOException {
Properties p = new Properties();
intpGroup.put("note", new LinkedList<Interpreter>());
RemoteInterpreter intpA = new RemoteInterpreter(
p,
"note",
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
@ -178,11 +190,13 @@ public class RemoteInterpreterTest {
10 * 1000,
null);
intpGroup.add(intpA);
intpGroup.get("note").add(intpA);
intpA.setInterpreterGroup(intpGroup);
RemoteInterpreter intpB = new RemoteInterpreter(
p,
"note",
MockInterpreterB.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
@ -191,7 +205,7 @@ public class RemoteInterpreterTest {
10 * 1000,
null);
intpGroup.add(intpB);
intpGroup.get("note").add(intpB);
intpB.setInterpreterGroup(intpGroup);
intpA.open();
@ -234,15 +248,16 @@ public class RemoteInterpreterTest {
@Test
public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException {
Properties p = new Properties();
intpGroup.put("note", new LinkedList<Interpreter>());
final RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpGroup.get("note").add(intpA);
intpA.setInterpreterGroup(intpGroup);
final RemoteInterpreter intpB = createMockInterpreterB(p);
intpGroup.add(intpB);
intpGroup.get("note").add(intpB);
intpB.setInterpreterGroup(intpGroup);
intpA.open();
@ -318,13 +333,11 @@ public class RemoteInterpreterTest {
};
intpB.getScheduler().submit(jobB);
// wait until both job finished
while (jobA.getStatus() != Status.FINISHED ||
jobB.getStatus() != Status.FINISHED) {
Thread.sleep(100);
}
long end = System.currentTimeMillis();
assertTrue(end - start >= 1000);
@ -337,10 +350,11 @@ public class RemoteInterpreterTest {
@Test
public void testRunOrderPreserved() throws InterruptedException {
Properties p = new Properties();
intpGroup.put("note", new LinkedList<Interpreter>());
final RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpGroup.get("note").add(intpA);
intpA.setInterpreterGroup(intpGroup);
intpA.open();
@ -412,10 +426,11 @@ public class RemoteInterpreterTest {
public void testRunParallel() throws InterruptedException {
Properties p = new Properties();
p.put("parallel", "true");
intpGroup.put("note", new LinkedList<Interpreter>());
final RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpGroup.get("note").add(intpA);
intpA.setInterpreterGroup(intpGroup);
intpA.open();
@ -501,6 +516,7 @@ public class RemoteInterpreterTest {
@Test
public void testInterpreterGroupResetAfterProcessFinished() {
Properties p = new Properties();
intpGroup.put("note", new LinkedList<Interpreter>());
RemoteInterpreter intpA = createMockInterpreterA(p);
@ -519,10 +535,11 @@ public class RemoteInterpreterTest {
@Test
public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException {
Properties p = new Properties();
intpGroup.put("note", new LinkedList<Interpreter>());
final RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpGroup.get("note").add(intpA);
intpA.setInterpreterGroup(intpGroup);
intpA.open();
@ -570,7 +587,12 @@ public class RemoteInterpreterTest {
// restart interpreter
RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
intpA.close();
intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId()));
InterpreterGroup newInterpreterGroup =
new InterpreterGroup(intpA.getInterpreterGroup().getId());
newInterpreterGroup.put("note", new LinkedList<Interpreter>());
intpA.setInterpreterGroup(newInterpreterGroup);
intpA.open();
RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
@ -581,15 +603,16 @@ public class RemoteInterpreterTest {
@Test
public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
Properties p = new Properties();
intpGroup.put("note", new LinkedList<Interpreter>());
RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpGroup.get("note").add(intpA);
intpA.setInterpreterGroup(intpGroup);
RemoteInterpreter intpB = createMockInterpreterB(p);
intpGroup.add(intpB);
intpGroup.get("note").add(intpB);
intpB.setInterpreterGroup(intpGroup);
intpA.open();
@ -597,4 +620,39 @@ public class RemoteInterpreterTest {
assertEquals(intpA.getScheduler(), intpB.getScheduler());
}
@Test
public void testMultiInterpreterSession() {
Properties p = new Properties();
intpGroup.put("sessionA", new LinkedList<Interpreter>());
intpGroup.put("sessionB", new LinkedList<Interpreter>());
RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA");
intpGroup.get("sessionA").add(intpAsessionA);
intpAsessionA.setInterpreterGroup(intpGroup);
RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA");
intpGroup.get("sessionA").add(intpBsessionA);
intpBsessionA.setInterpreterGroup(intpGroup);
intpAsessionA.open();
intpBsessionA.open();
assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler());
RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB");
intpGroup.get("sessionB").add(intpAsessionB);
intpAsessionB.setInterpreterGroup(intpGroup);
RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB");
intpGroup.get("sessionB").add(intpBsessionB);
intpBsessionB.setInterpreterGroup(intpGroup);
intpAsessionB.open();
intpBsessionB.open();
assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
assertEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler());
assertEquals(intpBsessionA.getScheduler(), intpBsessionB.getScheduler());
}
}

View file

@ -91,13 +91,30 @@ public class MockInterpreterB extends Interpreter {
public MockInterpreterA getInterpreterA() {
InterpreterGroup interpreterGroup = getInterpreterGroup();
for (Interpreter intp : interpreterGroup) {
if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
synchronized (interpreterGroup) {
for (List<Interpreter> interpreters : interpreterGroup.values()) {
boolean belongsToSameNoteGroup = false;
MockInterpreterA a = null;
for (Interpreter intp : interpreters) {
if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
a = (MockInterpreterA) p;
}
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
if (this == p) {
belongsToSameNoteGroup = true;
}
}
if (belongsToSameNoteGroup) {
return a;
}
return (MockInterpreterA) p;
}
}
return null;
@ -105,13 +122,10 @@ public class MockInterpreterB extends Interpreter {
@Override
public Scheduler getScheduler() {
InterpreterGroup interpreterGroup = getInterpreterGroup();
for (Interpreter intp : interpreterGroup) {
if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
return intp.getScheduler();
}
MockInterpreterA intpA = getInterpreterA();
if (intpA != null) {
return intpA.getScheduler();
}
return null;
}

View file

@ -18,10 +18,7 @@ package org.apache.zeppelin.resource;
import com.google.gson.Gson;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool;
@ -60,6 +57,7 @@ public class DistributedResourcePoolTest {
intp1 = new RemoteInterpreter(
p,
"note",
MockInterpreterResourcePool.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
@ -70,11 +68,13 @@ public class DistributedResourcePoolTest {
);
intpGroup1 = new InterpreterGroup("intpGroup1");
intpGroup1.add(intp1);
intpGroup1.put("note", new LinkedList<Interpreter>());
intpGroup1.get("note").add(intp1);
intp1.setInterpreterGroup(intpGroup1);
intp2 = new RemoteInterpreter(
p,
"note",
MockInterpreterResourcePool.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
@ -85,7 +85,8 @@ public class DistributedResourcePoolTest {
);
intpGroup2 = new InterpreterGroup("intpGroup2");
intpGroup2.add(intp2);
intpGroup2.put("note", new LinkedList<Interpreter>());
intpGroup2.get("note").add(intp2);
intp2.setInterpreterGroup(intpGroup2);
context = new InterpreterContext(

View file

@ -30,6 +30,7 @@ import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
@ -67,6 +68,7 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
final RemoteInterpreter intpA = new RemoteInterpreter(
p,
"note",
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
@ -75,12 +77,13 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
10 * 1000,
this);
intpGroup.add(intpA);
intpGroup.put("note", new LinkedList<Interpreter>());
intpGroup.get("note").add(intpA);
intpA.setInterpreterGroup(intpGroup);
intpA.open();
Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test",
Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note",
intpA.getInterpreterProcess(),
10);
@ -152,6 +155,7 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
final RemoteInterpreter intpA = new RemoteInterpreter(
p,
"note",
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
@ -160,12 +164,13 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
10 * 1000,
this);
intpGroup.add(intpA);
intpGroup.put("note", new LinkedList<Interpreter>());
intpGroup.get("note").add(intpA);
intpA.setInterpreterGroup(intpGroup);
intpA.open();
Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test",
Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note",
intpA.getInterpreterProcess(),
10);