Implement eventForWait class

This commit is contained in:
CloverHearts 2016-11-14 23:46:26 +09:00
parent 0570ae804b
commit 25232387d1

View file

@ -23,7 +23,10 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.rmi.server.RemoteServer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
@ -79,6 +82,9 @@ public class RemoteInterpreterServer
private final Map<String, RunningApplication> runningApplications =
Collections.synchronizedMap(new HashMap<String, RunningApplication>());
private Map<String, Object> remoteWorksResponsePool =
Collections.synchronizedMap(new HashMap<String, Object>());
public RemoteInterpreterServer(int port) throws TTransportException {
this.port = port;
@ -343,11 +349,14 @@ public class RemoteInterpreterServer
logger.info("clover remote zeppelin server conteroller body {}", response.getMsg());
if (response.getType() == RemoteZeppelinServerControlEvent.RES_RESOURCE_PARAGRAPH_RUN_CONTEXT) {
List<InterpreterContextRunner> intpContextRunners = new LinkedList<>();
List<ZeppelinServerResourceParagraphRunner> runners = gson.fromJson(response.getMsg(),
new TypeToken<List<ZeppelinServerResourceParagraphRunner>>() {}.getType());
for (ZeppelinServerResourceParagraphRunner r : runners) {
logger.info("clover runner nid {} pid {}", r.getNoteId(), r.getParagraphId());
intpContextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
}
remoteWorksResponsePool.put(response.getEventOwnerKey(), intpContextRunners);
}
}
@ -607,6 +616,38 @@ public class RemoteInterpreterServer
}
}
static class RemoteServerController {
//clover
private final long DEFAULT_TIMEOUT_VALUE = 30000;
private Map<String, Object> remoteWorksResponsePool;
public RemoteServerController(Map<String, Object> remoteWorksResponsePool) {
this.remoteWorksResponsePool = remoteWorksResponsePool;
}
public boolean waitForEvent(String eventOwnerKey) throws InterruptedException {
return waitForEvent(eventOwnerKey, DEFAULT_TIMEOUT_VALUE);
}
public boolean waitForEvent(String eventOwnerKey, long timeout) throws InterruptedException {
boolean wasGetData = false;
long now = System.currentTimeMillis();
long endTime = System.currentTimeMillis() + timeout;
while (endTime <= now) {
if (this.remoteWorksResponsePool.containsKey(eventOwnerKey) == true) {
wasGetData = true;
break;
}
now = System.currentTimeMillis();
sleep(500);
}
return wasGetData;
}
}
private RemoteInterpreterResult convert(InterpreterResult result,
Map<String, Object> config, GUI gui) {
return new RemoteInterpreterResult(