Handle scheduler termination correctly when unbind, bind interpreter

This commit is contained in:
Lee moon soo 2016-02-17 20:39:04 -08:00
parent 269c27d77c
commit f2299d6c74
8 changed files with 55 additions and 26 deletions

View file

@ -121,10 +121,6 @@ public abstract class Interpreter {
* Called when interpreter is no longer used.
*/
public void destroy() {
Scheduler scheduler = getScheduler();
if (scheduler != null) {
scheduler.stop();
}
}
public static Logger logger = LoggerFactory.getLogger(Interpreter.class);

View file

@ -25,6 +25,8 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
/**
* InterpreterGroup is list of interpreters in the same interpreter group.
@ -160,7 +162,12 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
for (final Interpreter intp : intpToClose) {
Thread t = new Thread() {
public void run() {
Scheduler scheduler = intp.getScheduler();
intp.close();
if (scheduler != null) {
SchedulerFactory.singleton().removeScheduler(scheduler.getName());
}
}
};

View file

@ -35,7 +35,7 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
/**
*
* Proxy for Interpreter instance that runs on separate process
*/
public class RemoteInterpreter extends Interpreter {
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
@ -51,6 +51,7 @@ public class RemoteInterpreter extends Interpreter {
private Map<String, String> env;
private int connectTimeout;
private int maxPoolSize;
private static String schedulerName;
public RemoteInterpreter(Properties property,
String noteId,
@ -178,11 +179,7 @@ public class RemoteInterpreter extends Interpreter {
public void close() {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
SchedulerFactory.singleton()
.removeScheduler("remoteinterpreter_" + interpreterProcess.hashCode());
Client client = null;
boolean broken = false;
try {
client = interpreterProcess.getClient();
@ -360,7 +357,7 @@ public class RemoteInterpreter extends Interpreter {
return null;
} else {
return SchedulerFactory.singleton().createOrGetRemoteScheduler(
"remoteinterpreter_" + interpreterProcess.hashCode(),
RemoteInterpreter.class.getName() + noteId + interpreterProcess.hashCode(),
noteId,
interpreterProcess,
maxConcurrency);

View file

@ -23,11 +23,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.*;
import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
@ -214,8 +210,20 @@ public class RemoteInterpreterServer
@Override
public void close(String noteId, String className) throws TException {
Interpreter intp = getInterpreter(noteId, className);
intp.close();
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(noteId);
if (interpreters != null) {
Iterator<Interpreter> it = interpreters.iterator();
while (it.hasNext()) {
Interpreter inp = it.next();
if (inp.getClassName().equals(className)) {
inp.close();
it.remove();
break;
}
}
}
}
}

View file

@ -652,7 +652,6 @@ public class RemoteInterpreterTest {
intpBsessionB.open();
assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
assertEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler());
assertEquals(intpBsessionA.getScheduler(), intpBsessionB.getScheduler());
assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler());
}
}

View file

@ -24,6 +24,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.rest.AbstractTestRestApi;
import org.apache.zeppelin.server.ZeppelinServer;
import org.apache.zeppelin.socket.Message.OP;
@ -86,14 +87,17 @@ public class NotebookServerTest extends AbstractTestRestApi {
InterpreterGroup interpreterGroup = null;
List<InterpreterSetting> settings = note1.getNoteReplLoader().getInterpreterSettings();
for (InterpreterSetting setting : settings) {
if (setting.getInterpreterGroup() == null) {
continue;
if (setting.getName().equals("md")) {
interpreterGroup = setting.getInterpreterGroup();
break;
}
interpreterGroup = setting.getInterpreterGroup();
break;
}
// start interpreter process
Paragraph p1 = note1.addParagraph();
p1.setText("%md start remote interpreter process");
note1.run(p1.getId());
// add angularObject
interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null);

View file

@ -450,7 +450,11 @@ public class InterpreterFactory {
interpreterGroup.destroy(noteId);
synchronized (interpreterGroup) {
interpreterGroup.remove(noteId);
interpreterGroup.notifyAll(); // notify createInterpreterForNote()
}
logger.info("Interpreter instance {} for note {} is removed",
interpreterSetting.getName(),
noteId);
}
public void createInterpretersForNote(
@ -461,6 +465,19 @@ public class InterpreterFactory {
InterpreterOption option = interpreterSetting.getOption();
Properties properties = interpreterSetting.getProperties();
// if interpreters are already there, wait until they're being removed
synchronized (interpreterGroup) {
while (interpreterGroup.containsKey(noteId)) {
try {
interpreterGroup.wait(1000);
} catch (InterruptedException e) {
logger.debug(e.getMessage(), e);
}
}
}
logger.info("Create interpreter instance {} for note {}", interpreterSetting.getName(), noteId);
for (String className : interpreterClassList) {
Set<String> keys = Interpreter.registeredInterpreters.keySet();
for (String intName : keys) {
@ -489,7 +506,7 @@ public class InterpreterFactory {
}
interpreters.add(intp);
}
logger.info("Interpreter " + intp.getClassName() + " " + intp.hashCode() + " created");
intp.setInterpreterGroup(interpreterGroup);
break;
}

View file

@ -244,7 +244,8 @@ public class Notebook {
Note note = getNote(id);
if (note != null) {
note.getNoteReplLoader().setInterpreters(interpreterSettingIds);
replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
// comment out while note.getNoteReplLoader().setInterpreters(...) do the same
// replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
}
}