mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Handle scheduler termination correctly when unbind, bind interpreter
This commit is contained in:
parent
269c27d77c
commit
f2299d6c74
8 changed files with 55 additions and 26 deletions
|
|
@ -121,10 +121,6 @@ public abstract class Interpreter {
|
|||
* Called when interpreter is no longer used.
|
||||
*/
|
||||
public void destroy() {
|
||||
Scheduler scheduler = getScheduler();
|
||||
if (scheduler != null) {
|
||||
scheduler.stop();
|
||||
}
|
||||
}
|
||||
|
||||
public static Logger logger = LoggerFactory.getLogger(Interpreter.class);
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
|
|||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
||||
/**
|
||||
* InterpreterGroup is list of interpreters in the same interpreter group.
|
||||
|
|
@ -160,7 +162,12 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
for (final Interpreter intp : intpToClose) {
|
||||
Thread t = new Thread() {
|
||||
public void run() {
|
||||
Scheduler scheduler = intp.getScheduler();
|
||||
intp.close();
|
||||
|
||||
if (scheduler != null) {
|
||||
SchedulerFactory.singleton().removeScheduler(scheduler.getName());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ import com.google.gson.Gson;
|
|||
import com.google.gson.reflect.TypeToken;
|
||||
|
||||
/**
|
||||
*
|
||||
* Proxy for Interpreter instance that runs on separate process
|
||||
*/
|
||||
public class RemoteInterpreter extends Interpreter {
|
||||
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
|
||||
|
|
@ -51,6 +51,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
private Map<String, String> env;
|
||||
private int connectTimeout;
|
||||
private int maxPoolSize;
|
||||
private static String schedulerName;
|
||||
|
||||
public RemoteInterpreter(Properties property,
|
||||
String noteId,
|
||||
|
|
@ -178,11 +179,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
public void close() {
|
||||
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
|
||||
|
||||
SchedulerFactory.singleton()
|
||||
.removeScheduler("remoteinterpreter_" + interpreterProcess.hashCode());
|
||||
|
||||
Client client = null;
|
||||
|
||||
boolean broken = false;
|
||||
try {
|
||||
client = interpreterProcess.getClient();
|
||||
|
|
@ -360,7 +357,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
return null;
|
||||
} else {
|
||||
return SchedulerFactory.singleton().createOrGetRemoteScheduler(
|
||||
"remoteinterpreter_" + interpreterProcess.hashCode(),
|
||||
RemoteInterpreter.class.getName() + noteId + interpreterProcess.hashCode(),
|
||||
noteId,
|
||||
interpreterProcess,
|
||||
maxConcurrency);
|
||||
|
|
|
|||
|
|
@ -23,11 +23,7 @@ import java.lang.reflect.Constructor;
|
|||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.server.TThreadPoolServer;
|
||||
|
|
@ -214,8 +210,20 @@ public class RemoteInterpreterServer
|
|||
|
||||
@Override
|
||||
public void close(String noteId, String className) throws TException {
|
||||
Interpreter intp = getInterpreter(noteId, className);
|
||||
intp.close();
|
||||
synchronized (interpreterGroup) {
|
||||
List<Interpreter> interpreters = interpreterGroup.get(noteId);
|
||||
if (interpreters != null) {
|
||||
Iterator<Interpreter> it = interpreters.iterator();
|
||||
while (it.hasNext()) {
|
||||
Interpreter inp = it.next();
|
||||
if (inp.getClassName().equals(className)) {
|
||||
inp.close();
|
||||
it.remove();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -652,7 +652,6 @@ public class RemoteInterpreterTest {
|
|||
intpBsessionB.open();
|
||||
|
||||
assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
|
||||
assertEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler());
|
||||
assertEquals(intpBsessionA.getScheduler(), intpBsessionB.getScheduler());
|
||||
assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
|
|||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.Notebook;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.apache.zeppelin.rest.AbstractTestRestApi;
|
||||
import org.apache.zeppelin.server.ZeppelinServer;
|
||||
import org.apache.zeppelin.socket.Message.OP;
|
||||
|
|
@ -86,14 +87,17 @@ public class NotebookServerTest extends AbstractTestRestApi {
|
|||
InterpreterGroup interpreterGroup = null;
|
||||
List<InterpreterSetting> settings = note1.getNoteReplLoader().getInterpreterSettings();
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getInterpreterGroup() == null) {
|
||||
continue;
|
||||
if (setting.getName().equals("md")) {
|
||||
interpreterGroup = setting.getInterpreterGroup();
|
||||
break;
|
||||
}
|
||||
|
||||
interpreterGroup = setting.getInterpreterGroup();
|
||||
break;
|
||||
}
|
||||
|
||||
// start interpreter process
|
||||
Paragraph p1 = note1.addParagraph();
|
||||
p1.setText("%md start remote interpreter process");
|
||||
note1.run(p1.getId());
|
||||
|
||||
// add angularObject
|
||||
interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null);
|
||||
|
||||
|
|
|
|||
|
|
@ -450,7 +450,11 @@ public class InterpreterFactory {
|
|||
interpreterGroup.destroy(noteId);
|
||||
synchronized (interpreterGroup) {
|
||||
interpreterGroup.remove(noteId);
|
||||
interpreterGroup.notifyAll(); // notify createInterpreterForNote()
|
||||
}
|
||||
logger.info("Interpreter instance {} for note {} is removed",
|
||||
interpreterSetting.getName(),
|
||||
noteId);
|
||||
}
|
||||
|
||||
public void createInterpretersForNote(
|
||||
|
|
@ -461,6 +465,19 @@ public class InterpreterFactory {
|
|||
InterpreterOption option = interpreterSetting.getOption();
|
||||
Properties properties = interpreterSetting.getProperties();
|
||||
|
||||
// if interpreters are already there, wait until they're being removed
|
||||
synchronized (interpreterGroup) {
|
||||
while (interpreterGroup.containsKey(noteId)) {
|
||||
try {
|
||||
interpreterGroup.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
logger.debug(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("Create interpreter instance {} for note {}", interpreterSetting.getName(), noteId);
|
||||
|
||||
for (String className : interpreterClassList) {
|
||||
Set<String> keys = Interpreter.registeredInterpreters.keySet();
|
||||
for (String intName : keys) {
|
||||
|
|
@ -489,7 +506,7 @@ public class InterpreterFactory {
|
|||
}
|
||||
interpreters.add(intp);
|
||||
}
|
||||
|
||||
logger.info("Interpreter " + intp.getClassName() + " " + intp.hashCode() + " created");
|
||||
intp.setInterpreterGroup(interpreterGroup);
|
||||
break;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -244,7 +244,8 @@ public class Notebook {
|
|||
Note note = getNote(id);
|
||||
if (note != null) {
|
||||
note.getNoteReplLoader().setInterpreters(interpreterSettingIds);
|
||||
replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
|
||||
// comment out while note.getNoteReplLoader().setInterpreters(...) do the same
|
||||
// replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue