mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Implement eventForWait class
This commit is contained in:
parent
0570ae804b
commit
25232387d1
1 changed files with 41 additions and 0 deletions
|
|
@ -23,7 +23,10 @@ import java.lang.reflect.Constructor;
|
|||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.rmi.server.RemoteServer;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.server.TThreadPoolServer;
|
||||
|
|
@ -79,6 +82,9 @@ public class RemoteInterpreterServer
|
|||
private final Map<String, RunningApplication> runningApplications =
|
||||
Collections.synchronizedMap(new HashMap<String, RunningApplication>());
|
||||
|
||||
private Map<String, Object> remoteWorksResponsePool =
|
||||
Collections.synchronizedMap(new HashMap<String, Object>());
|
||||
|
||||
public RemoteInterpreterServer(int port) throws TTransportException {
|
||||
this.port = port;
|
||||
|
||||
|
|
@ -343,11 +349,14 @@ public class RemoteInterpreterServer
|
|||
logger.info("clover remote zeppelin server conteroller body {}", response.getMsg());
|
||||
|
||||
if (response.getType() == RemoteZeppelinServerControlEvent.RES_RESOURCE_PARAGRAPH_RUN_CONTEXT) {
|
||||
List<InterpreterContextRunner> intpContextRunners = new LinkedList<>();
|
||||
List<ZeppelinServerResourceParagraphRunner> runners = gson.fromJson(response.getMsg(),
|
||||
new TypeToken<List<ZeppelinServerResourceParagraphRunner>>() {}.getType());
|
||||
for (ZeppelinServerResourceParagraphRunner r : runners) {
|
||||
logger.info("clover runner nid {} pid {}", r.getNoteId(), r.getParagraphId());
|
||||
intpContextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
|
||||
}
|
||||
remoteWorksResponsePool.put(response.getEventOwnerKey(), intpContextRunners);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -607,6 +616,38 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
}
|
||||
|
||||
static class RemoteServerController {
|
||||
//clover
|
||||
private final long DEFAULT_TIMEOUT_VALUE = 30000;
|
||||
private Map<String, Object> remoteWorksResponsePool;
|
||||
public RemoteServerController(Map<String, Object> remoteWorksResponsePool) {
|
||||
this.remoteWorksResponsePool = remoteWorksResponsePool;
|
||||
}
|
||||
|
||||
public boolean waitForEvent(String eventOwnerKey) throws InterruptedException {
|
||||
return waitForEvent(eventOwnerKey, DEFAULT_TIMEOUT_VALUE);
|
||||
}
|
||||
|
||||
public boolean waitForEvent(String eventOwnerKey, long timeout) throws InterruptedException {
|
||||
boolean wasGetData = false;
|
||||
long now = System.currentTimeMillis();
|
||||
long endTime = System.currentTimeMillis() + timeout;
|
||||
|
||||
while (endTime <= now) {
|
||||
if (this.remoteWorksResponsePool.containsKey(eventOwnerKey) == true) {
|
||||
wasGetData = true;
|
||||
break;
|
||||
}
|
||||
now = System.currentTimeMillis();
|
||||
sleep(500);
|
||||
}
|
||||
|
||||
return wasGetData;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
private RemoteInterpreterResult convert(InterpreterResult result,
|
||||
Map<String, Object> config, GUI gui) {
|
||||
return new RemoteInterpreterResult(
|
||||
|
|
|
|||
Loading…
Reference in a new issue