mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Add one more test
This commit is contained in:
parent
00d0183577
commit
8b4c6ced34
9 changed files with 126 additions and 55 deletions
|
|
@ -707,7 +707,11 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"),
|
||||
|
||||
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS("zeppelin.interpreter.lifecyclemanager.class",
|
||||
"org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager");
|
||||
"org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager"),
|
||||
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL(
|
||||
"zeppelin.interpreter.lifecyclemanager.timeout.checkinterval", 6000L),
|
||||
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD(
|
||||
"zeppelin.interpreter.lifecyclemanager.timeout.threshold", 360000L);
|
||||
|
||||
private String varName;
|
||||
@SuppressWarnings("rawtypes")
|
||||
|
|
|
|||
|
|
@ -1376,13 +1376,13 @@ public class NotebookServer extends WebSocketServlet
|
|||
List<InterpreterSetting> settings =
|
||||
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getOrCreateInterpreterGroup(user, note.getId()) == null) {
|
||||
if (setting.getInterpreterGroup(user, note.getId()) == null) {
|
||||
continue;
|
||||
}
|
||||
if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, note.getId())
|
||||
if (interpreterGroupId.equals(setting.getInterpreterGroup(user, note.getId())
|
||||
.getId())) {
|
||||
AngularObjectRegistry angularObjectRegistry =
|
||||
setting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
|
||||
setting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
|
||||
|
||||
// first trying to get local registry
|
||||
ao = angularObjectRegistry.get(varName, noteId, paragraphId);
|
||||
|
|
@ -1419,13 +1419,13 @@ public class NotebookServer extends WebSocketServlet
|
|||
List<InterpreterSetting> settings =
|
||||
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getOrCreateInterpreterGroup(user, n.getId()) == null) {
|
||||
if (setting.getInterpreterGroup(user, n.getId()) == null) {
|
||||
continue;
|
||||
}
|
||||
if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, n.getId())
|
||||
if (interpreterGroupId.equals(setting.getInterpreterGroup(user, n.getId())
|
||||
.getId())) {
|
||||
AngularObjectRegistry angularObjectRegistry =
|
||||
setting.getOrCreateInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
|
||||
setting.getInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
|
||||
this.broadcastExcept(n.getId(),
|
||||
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
|
||||
.put("interpreterGroupId", interpreterGroupId).put("noteId", n.getId())
|
||||
|
|
@ -2297,14 +2297,17 @@ public class NotebookServer extends WebSocketServlet
|
|||
}
|
||||
|
||||
for (InterpreterSetting intpSetting : settings) {
|
||||
if (intpSetting.getInterpreterGroup(user, note.getId()) == null) {
|
||||
continue;
|
||||
}
|
||||
AngularObjectRegistry registry =
|
||||
intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
|
||||
intpSetting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
|
||||
List<AngularObject> objects = registry.getAllWithGlobal(note.getId());
|
||||
for (AngularObject object : objects) {
|
||||
conn.send(serializeMessage(
|
||||
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object)
|
||||
.put("interpreterGroupId",
|
||||
intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getId())
|
||||
intpSetting.getInterpreterGroup(user, note.getId()).getId())
|
||||
.put("noteId", note.getId()).put("paragraphId", object.getParagraphId())));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -402,7 +402,7 @@ public class InterpreterSetting {
|
|||
this.interpreterGroups.remove(groupId);
|
||||
}
|
||||
|
||||
ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
|
||||
public ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
|
||||
String groupId = getInterpreterGroupId(user, noteId);
|
||||
try {
|
||||
interpreterGroupReadLock.lock();
|
||||
|
|
|
|||
|
|
@ -159,7 +159,7 @@ public class InterpreterSettingManager {
|
|||
Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class)
|
||||
.newInstance(conf);
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Fail to create LifecyleManager", e);
|
||||
throw new IOException("Fail to create LifecycleManager", e);
|
||||
}
|
||||
|
||||
init();
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ public class TimeoutLifecycleManager implements LifecycleManager {
|
|||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutLifecycleManager.class);
|
||||
|
||||
// ManagerInterpreter -> LastTimeUsing timestamp
|
||||
// ManagerInterpreterGroup -> LastTimeUsing timestamp
|
||||
private Map<ManagedInterpreterGroup, Long> interpreterGroups = new ConcurrentHashMap<>();
|
||||
|
||||
private long checkInterval;
|
||||
|
|
@ -33,10 +33,10 @@ public class TimeoutLifecycleManager implements LifecycleManager {
|
|||
private Timer checkTimer;
|
||||
|
||||
public TimeoutLifecycleManager(ZeppelinConfiguration zConf) {
|
||||
this.checkInterval = zConf.getLong("zeppelin.interpreter.lifecyclemanager." +
|
||||
"timeout.checkinterval", 60000);
|
||||
this.timeoutThreshold = zConf.getLong("zeppelin.interpreter.lifecyclemanager.timeout.threshold",
|
||||
1000 * 60 * 60);
|
||||
this.checkInterval = zConf.getLong(ZeppelinConfiguration.ConfVars
|
||||
.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL);
|
||||
this.timeoutThreshold = zConf.getLong(
|
||||
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD);
|
||||
this.checkTimer = new Timer(true);
|
||||
this.checkTimer.scheduleAtFixedRate(new TimerTask() {
|
||||
@Override
|
||||
|
|
@ -46,13 +46,15 @@ public class TimeoutLifecycleManager implements LifecycleManager {
|
|||
ManagedInterpreterGroup interpreterGroup = entry.getKey();
|
||||
Long lastTimeUsing = entry.getValue();
|
||||
if ((now - lastTimeUsing) > timeoutThreshold ) {
|
||||
LOGGER.info("Interpreter {} is timeout.", interpreterGroup.getId());
|
||||
LOGGER.info("InterpreterGroup {} is timeout.", interpreterGroup.getId());
|
||||
interpreterGroup.close();
|
||||
interpreterGroups.remove(entry.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
}, checkInterval, checkInterval);
|
||||
LOGGER.info("TimeoutLifecycleManager is started with checkinterval: " + checkInterval
|
||||
+ ", timeoutThreshold: " + timeoutThreshold);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -70,6 +72,4 @@ public class TimeoutLifecycleManager implements LifecycleManager {
|
|||
public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) {
|
||||
interpreterGroups.put(interpreterGroup, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -685,9 +685,11 @@ public class Note implements ParagraphJobListener, JsonSerializable {
|
|||
}
|
||||
|
||||
for (InterpreterSetting setting : settings) {
|
||||
InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
|
||||
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
|
||||
angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
|
||||
InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
|
||||
if (intpGroup != null) {
|
||||
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
|
||||
angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -700,7 +702,10 @@ public class Note implements ParagraphJobListener, JsonSerializable {
|
|||
}
|
||||
|
||||
for (InterpreterSetting setting : settings) {
|
||||
InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
|
||||
if (setting.getInterpreterGroup(user, id) == null) {
|
||||
continue;
|
||||
}
|
||||
InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
|
||||
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
|
||||
|
||||
if (registry instanceof RemoteAngularObjectRegistry) {
|
||||
|
|
|
|||
|
|
@ -334,39 +334,41 @@ public class Notebook implements NoteEventListener {
|
|||
|
||||
// remove from all interpreter instance's angular object registry
|
||||
for (InterpreterSetting settings : interpreterSettingManager.get()) {
|
||||
AngularObjectRegistry registry =
|
||||
settings.getOrCreateInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
|
||||
if (registry instanceof RemoteAngularObjectRegistry) {
|
||||
// remove paragraph scope object
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, p.getId());
|
||||
InterpreterGroup interpreterGroup = settings.getInterpreterGroup(subject.getUser(), id);
|
||||
if (interpreterGroup != null) {
|
||||
AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
|
||||
if (registry instanceof RemoteAngularObjectRegistry) {
|
||||
// remove paragraph scope object
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, p.getId());
|
||||
|
||||
// remove app scope object
|
||||
List<ApplicationState> appStates = p.getAllApplicationStates();
|
||||
if (appStates != null) {
|
||||
for (ApplicationState app : appStates) {
|
||||
((RemoteAngularObjectRegistry) registry)
|
||||
.removeAllAndNotifyRemoteProcess(id, app.getId());
|
||||
// remove app scope object
|
||||
List<ApplicationState> appStates = p.getAllApplicationStates();
|
||||
if (appStates != null) {
|
||||
for (ApplicationState app : appStates) {
|
||||
((RemoteAngularObjectRegistry) registry)
|
||||
.removeAllAndNotifyRemoteProcess(id, app.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// remove note scope object
|
||||
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null);
|
||||
} else {
|
||||
// remove paragraph scope object
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
registry.removeAll(id, p.getId());
|
||||
// remove note scope object
|
||||
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null);
|
||||
} else {
|
||||
// remove paragraph scope object
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
registry.removeAll(id, p.getId());
|
||||
|
||||
// remove app scope object
|
||||
List<ApplicationState> appStates = p.getAllApplicationStates();
|
||||
if (appStates != null) {
|
||||
for (ApplicationState app : appStates) {
|
||||
registry.removeAll(id, app.getId());
|
||||
// remove app scope object
|
||||
List<ApplicationState> appStates = p.getAllApplicationStates();
|
||||
if (appStates != null) {
|
||||
for (ApplicationState app : appStates) {
|
||||
registry.removeAll(id, app.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
// remove note scope object
|
||||
registry.removeAll(id, null);
|
||||
}
|
||||
// remove note scope object
|
||||
registry.removeAll(id, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -517,9 +519,8 @@ public class Notebook implements NoteEventListener {
|
|||
SnapshotAngularObject snapshot = angularObjectSnapshot.get(name);
|
||||
List<InterpreterSetting> settings = interpreterSettingManager.get();
|
||||
for (InterpreterSetting setting : settings) {
|
||||
InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(subject.getUser(),
|
||||
note.getId());
|
||||
if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
|
||||
InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId());
|
||||
if (intpGroup != null && intpGroup.getId().equals(snapshot.getIntpGroupId())) {
|
||||
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
|
||||
String noteId = snapshot.getAngularObject().getNoteId();
|
||||
String paragraphId = snapshot.getAngularObject().getParagraphId();
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
|
|||
assertEquals("test", interpreterSetting.getGroup());
|
||||
assertEquals(3, interpreterSetting.getInterpreterInfos().size());
|
||||
// 3 other builtin properties:
|
||||
// * zeppelin.interpeter.output.limit
|
||||
// * zeppelin.interpreter.output.limit
|
||||
// * zeppelin.interpreter.localRepo
|
||||
// * zeppelin.interpreter.max.poolsize
|
||||
assertEquals(6, interpreterSetting.getJavaProperties().size());
|
||||
|
|
|
|||
|
|
@ -25,12 +25,14 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
|||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
|
@ -42,8 +44,8 @@ public class TimeoutLifecycleManagerTest extends AbstractInterpreterTest {
|
|||
public void setUp() throws Exception {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS.getVarName(),
|
||||
TimeoutLifecycleManager.class.getName());
|
||||
conf.setProperty("zeppelin.interpreter.lifecyclemanager.timeout.checkinterval", 5000);
|
||||
conf.setProperty("zeppelin.interpreter.lifecyclemanager.timeout.threshold", 10000);
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL.getVarName(), "1000");
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD.getVarName(), "10000");
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
|
|
@ -65,4 +67,60 @@ public class TimeoutLifecycleManagerTest extends AbstractInterpreterTest {
|
|||
assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
|
||||
assertFalse(remoteInterpreter.isOpened());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeout_2() throws InterpreterException, InterruptedException, IOException {
|
||||
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.sleep") instanceof RemoteInterpreter);
|
||||
final RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.sleep");
|
||||
|
||||
// simulate how zeppelin submit paragraph
|
||||
remoteInterpreter.getScheduler().submit(new Job("test-job", null) {
|
||||
@Override
|
||||
public Object getReturn() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int progress() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> info() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object jobRun() throws Throwable {
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
|
||||
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
|
||||
null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
return remoteInterpreter.interpret("100000", context);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean jobAbort() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResult(Object results) {
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
while(!remoteInterpreter.isOpened()) {
|
||||
Thread.sleep(1000);
|
||||
LOGGER.info("Wait for interpreter to be started");
|
||||
}
|
||||
|
||||
InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test");
|
||||
assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
|
||||
|
||||
Thread.sleep(15 * 1000);
|
||||
// interpreterGroup is not timeout because getStatus is called periodically.
|
||||
assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
|
||||
assertTrue(remoteInterpreter.isOpened());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue