Add one more test

This commit is contained in:
Jeff Zhang 2017-10-26 10:01:27 +08:00
parent 00d0183577
commit 8b4c6ced34
9 changed files with 126 additions and 55 deletions

View file

@ -707,7 +707,11 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"),
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS("zeppelin.interpreter.lifecyclemanager.class",
"org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager");
"org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager"),
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL(
"zeppelin.interpreter.lifecyclemanager.timeout.checkinterval", 6000L),
ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD(
"zeppelin.interpreter.lifecyclemanager.timeout.threshold", 360000L);
private String varName;
@SuppressWarnings("rawtypes")

View file

@ -1376,13 +1376,13 @@ public class NotebookServer extends WebSocketServlet
List<InterpreterSetting> settings =
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
for (InterpreterSetting setting : settings) {
if (setting.getOrCreateInterpreterGroup(user, note.getId()) == null) {
if (setting.getInterpreterGroup(user, note.getId()) == null) {
continue;
}
if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, note.getId())
if (interpreterGroupId.equals(setting.getInterpreterGroup(user, note.getId())
.getId())) {
AngularObjectRegistry angularObjectRegistry =
setting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
setting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
// first trying to get local registry
ao = angularObjectRegistry.get(varName, noteId, paragraphId);
@ -1419,13 +1419,13 @@ public class NotebookServer extends WebSocketServlet
List<InterpreterSetting> settings =
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
for (InterpreterSetting setting : settings) {
if (setting.getOrCreateInterpreterGroup(user, n.getId()) == null) {
if (setting.getInterpreterGroup(user, n.getId()) == null) {
continue;
}
if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, n.getId())
if (interpreterGroupId.equals(setting.getInterpreterGroup(user, n.getId())
.getId())) {
AngularObjectRegistry angularObjectRegistry =
setting.getOrCreateInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
setting.getInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
this.broadcastExcept(n.getId(),
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId).put("noteId", n.getId())
@ -2297,14 +2297,17 @@ public class NotebookServer extends WebSocketServlet
}
for (InterpreterSetting intpSetting : settings) {
if (intpSetting.getInterpreterGroup(user, note.getId()) == null) {
continue;
}
AngularObjectRegistry registry =
intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
intpSetting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
List<AngularObject> objects = registry.getAllWithGlobal(note.getId());
for (AngularObject object : objects) {
conn.send(serializeMessage(
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object)
.put("interpreterGroupId",
intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getId())
intpSetting.getInterpreterGroup(user, note.getId()).getId())
.put("noteId", note.getId()).put("paragraphId", object.getParagraphId())));
}
}

View file

@ -402,7 +402,7 @@ public class InterpreterSetting {
this.interpreterGroups.remove(groupId);
}
ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
public ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
String groupId = getInterpreterGroupId(user, noteId);
try {
interpreterGroupReadLock.lock();

View file

@ -159,7 +159,7 @@ public class InterpreterSettingManager {
Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class)
.newInstance(conf);
} catch (Exception e) {
throw new IOException("Fail to create LifecyleManager", e);
throw new IOException("Fail to create LifecycleManager", e);
}
init();

View file

@ -24,7 +24,7 @@ public class TimeoutLifecycleManager implements LifecycleManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutLifecycleManager.class);
// ManagerInterpreter -> LastTimeUsing timestamp
// ManagerInterpreterGroup -> LastTimeUsing timestamp
private Map<ManagedInterpreterGroup, Long> interpreterGroups = new ConcurrentHashMap<>();
private long checkInterval;
@ -33,10 +33,10 @@ public class TimeoutLifecycleManager implements LifecycleManager {
private Timer checkTimer;
public TimeoutLifecycleManager(ZeppelinConfiguration zConf) {
this.checkInterval = zConf.getLong("zeppelin.interpreter.lifecyclemanager." +
"timeout.checkinterval", 60000);
this.timeoutThreshold = zConf.getLong("zeppelin.interpreter.lifecyclemanager.timeout.threshold",
1000 * 60 * 60);
this.checkInterval = zConf.getLong(ZeppelinConfiguration.ConfVars
.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL);
this.timeoutThreshold = zConf.getLong(
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD);
this.checkTimer = new Timer(true);
this.checkTimer.scheduleAtFixedRate(new TimerTask() {
@Override
@ -46,13 +46,15 @@ public class TimeoutLifecycleManager implements LifecycleManager {
ManagedInterpreterGroup interpreterGroup = entry.getKey();
Long lastTimeUsing = entry.getValue();
if ((now - lastTimeUsing) > timeoutThreshold ) {
LOGGER.info("Interpreter {} is timeout.", interpreterGroup.getId());
LOGGER.info("InterpreterGroup {} is timeout.", interpreterGroup.getId());
interpreterGroup.close();
interpreterGroups.remove(entry.getKey());
}
}
}
}, checkInterval, checkInterval);
LOGGER.info("TimeoutLifecycleManager is started with checkinterval: " + checkInterval
+ ", timeoutThreshold: " + timeoutThreshold);
}
@Override
@ -70,6 +72,4 @@ public class TimeoutLifecycleManager implements LifecycleManager {
public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) {
interpreterGroups.put(interpreterGroup, System.currentTimeMillis());
}
}

View file

@ -685,9 +685,11 @@ public class Note implements ParagraphJobListener, JsonSerializable {
}
for (InterpreterSetting setting : settings) {
InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
if (intpGroup != null) {
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
}
}
}
@ -700,7 +702,10 @@ public class Note implements ParagraphJobListener, JsonSerializable {
}
for (InterpreterSetting setting : settings) {
InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
if (setting.getInterpreterGroup(user, id) == null) {
continue;
}
InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {

View file

@ -334,39 +334,41 @@ public class Notebook implements NoteEventListener {
// remove from all interpreter instance's angular object registry
for (InterpreterSetting settings : interpreterSettingManager.get()) {
AngularObjectRegistry registry =
settings.getOrCreateInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {
// remove paragraph scope object
for (Paragraph p : note.getParagraphs()) {
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, p.getId());
InterpreterGroup interpreterGroup = settings.getInterpreterGroup(subject.getUser(), id);
if (interpreterGroup != null) {
AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {
// remove paragraph scope object
for (Paragraph p : note.getParagraphs()) {
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, p.getId());
// remove app scope object
List<ApplicationState> appStates = p.getAllApplicationStates();
if (appStates != null) {
for (ApplicationState app : appStates) {
((RemoteAngularObjectRegistry) registry)
.removeAllAndNotifyRemoteProcess(id, app.getId());
// remove app scope object
List<ApplicationState> appStates = p.getAllApplicationStates();
if (appStates != null) {
for (ApplicationState app : appStates) {
((RemoteAngularObjectRegistry) registry)
.removeAllAndNotifyRemoteProcess(id, app.getId());
}
}
}
}
// remove note scope object
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null);
} else {
// remove paragraph scope object
for (Paragraph p : note.getParagraphs()) {
registry.removeAll(id, p.getId());
// remove note scope object
((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null);
} else {
// remove paragraph scope object
for (Paragraph p : note.getParagraphs()) {
registry.removeAll(id, p.getId());
// remove app scope object
List<ApplicationState> appStates = p.getAllApplicationStates();
if (appStates != null) {
for (ApplicationState app : appStates) {
registry.removeAll(id, app.getId());
// remove app scope object
List<ApplicationState> appStates = p.getAllApplicationStates();
if (appStates != null) {
for (ApplicationState app : appStates) {
registry.removeAll(id, app.getId());
}
}
}
// remove note scope object
registry.removeAll(id, null);
}
// remove note scope object
registry.removeAll(id, null);
}
}
@ -517,9 +519,8 @@ public class Notebook implements NoteEventListener {
SnapshotAngularObject snapshot = angularObjectSnapshot.get(name);
List<InterpreterSetting> settings = interpreterSettingManager.get();
for (InterpreterSetting setting : settings) {
InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(subject.getUser(),
note.getId());
if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId());
if (intpGroup != null && intpGroup.getId().equals(snapshot.getIntpGroupId())) {
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
String noteId = snapshot.getAngularObject().getNoteId();
String paragraphId = snapshot.getAngularObject().getParagraphId();

View file

@ -52,7 +52,7 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
assertEquals("test", interpreterSetting.getGroup());
assertEquals(3, interpreterSetting.getInterpreterInfos().size());
// 3 other builtin properties:
// * zeppelin.interpeter.output.limit
// * zeppelin.interpreter.output.limit
// * zeppelin.interpreter.localRepo
// * zeppelin.interpreter.max.poolsize
assertEquals(6, interpreterSetting.getJavaProperties().size());

View file

@ -25,12 +25,14 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -42,8 +44,8 @@ public class TimeoutLifecycleManagerTest extends AbstractInterpreterTest {
public void setUp() throws Exception {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS.getVarName(),
TimeoutLifecycleManager.class.getName());
conf.setProperty("zeppelin.interpreter.lifecyclemanager.timeout.checkinterval", 5000);
conf.setProperty("zeppelin.interpreter.lifecyclemanager.timeout.threshold", 10000);
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL.getVarName(), "1000");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD.getVarName(), "10000");
super.setUp();
}
@ -65,4 +67,60 @@ public class TimeoutLifecycleManagerTest extends AbstractInterpreterTest {
assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
assertFalse(remoteInterpreter.isOpened());
}
@Test
public void testTimeout_2() throws InterpreterException, InterruptedException, IOException {
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.sleep") instanceof RemoteInterpreter);
final RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.sleep");
// simulate how zeppelin submit paragraph
remoteInterpreter.getScheduler().submit(new Job("test-job", null) {
@Override
public Object getReturn() {
return null;
}
@Override
public int progress() {
return 0;
}
@Override
public Map<String, Object> info() {
return null;
}
@Override
protected Object jobRun() throws Throwable {
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
null, null, new ArrayList<InterpreterContextRunner>(), null);
return remoteInterpreter.interpret("100000", context);
}
@Override
protected boolean jobAbort() {
return false;
}
@Override
public void setResult(Object results) {
}
});
while(!remoteInterpreter.isOpened()) {
Thread.sleep(1000);
LOGGER.info("Wait for interpreter to be started");
}
InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test");
assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
Thread.sleep(15 * 1000);
// interpreterGroup is not timeout because getStatus is called periodically.
assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
assertTrue(remoteInterpreter.isOpened());
}
}