Implement runNote and re implement run method

This commit is contained in:
CloverHearts 2016-11-21 14:53:13 +09:00
parent f9661c843b
commit 10c2a47047
8 changed files with 78 additions and 18 deletions

View file

@ -340,6 +340,27 @@ public class ZeppelinContext {
}
public void runNote(String noteId) {
runNote(noteId, interpreterContext);
}
public void runNote(String noteId, InterpreterContext context) {
String runningNoteId = context.getNoteId();
String runningParagraphId = context.getParagraphId();
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
if (runners.size() <= 0) {
throw new InterpreterException("Note " + noteId + " not found " + runners.size());
}
for (InterpreterContextRunner r : runners) {
if (r.getNoteId().equals(runningNoteId) && r.getParagraphId().equals(runningParagraphId)) {
continue;
}
r.run();
}
}
/**
* get Zeppelin Paragraph Runner from zeppelin server
@ -440,13 +461,7 @@ public class ZeppelinContext {
*/
@ZeppelinApi
public void runAll(InterpreterContext context) {
for (InterpreterContextRunner r : context.getRunners()) {
if (r.getParagraphId().equals(context.getParagraphId())) {
// skip itself
continue;
}
r.run();
}
runNote(context.getNoteId());
}
@ZeppelinApi

View file

@ -148,8 +148,9 @@ public class RemoteInterpreterEventPoller extends Thread {
InterpreterContextRunner runnerFromRemote = gson.fromJson(
event.getData(), RemoteInterpreterContextRunner.class);
interpreterProcess.getInterpreterContextRunnerPool().run(
listener.onRemoteRunParagraph(
runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
} else if (event.getType() == RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL) {
ResourceSet resourceSet = getAllResourcePoolExcept();
sendResourcePoolResponseGetAll(resourceSet);
@ -199,13 +200,10 @@ public class RemoteInterpreterEventPoller extends Thread {
appListener.onStatusChange(noteId, paragraphId, appId, status);
} else if (event.getType() == RemoteInterpreterEventType.RESOURCE_PARAGRAPH_RUN_CONTEXT) {
//clover
RemoteZeppelinServerController remoteControlEvent = gson.fromJson(
event.getData(), RemoteZeppelinServerController.class);
progressRemoteZeppelinControlEvent(event.getType(), listener, remoteControlEvent);
} else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
Map<String, String> metaInfos = gson.fromJson(event.getData(),
new TypeToken<Map<String, String>>() {
@ -248,7 +246,6 @@ public class RemoteInterpreterEventPoller extends Thread {
new RemoteInterpreterProcessListener.RemoteWorksEventListener() {
@Override
public void onFinished(Object resultObject) {
logger.info("clover on Finished!!! send event finished");
boolean clientBroken = false;
if (resultObject != null && resultObject instanceof List) {
List<InterpreterContextRunner> runnerList =
@ -263,7 +260,6 @@ public class RemoteInterpreterEventPoller extends Thread {
RemoteInterpreterEventType.RESOURCE_PARAGRAPH_RUN_CONTEXT,
gson.toJson(resResource));
try {
logger.info("clover send event finished");
eventClient.onReceivedResourceParagraphRunners(response);
} catch (Exception e) {
clientBroken = true;
@ -277,7 +273,7 @@ public class RemoteInterpreterEventPoller extends Thread {
@Override
public void onError() {
logger.info("clover onError");
logger.info("onGetParagraphRunners onError");
}
});
}

View file

@ -25,6 +25,7 @@ public interface RemoteInterpreterProcessListener {
public void onOutputAppend(String noteId, String paragraphId, String output);
public void onOutputUpdated(String noteId, String paragraphId, String output);
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception;
public void onGetParagraphRunners(
String noteId, String paragraphId, RemoteWorksEventListener callback);

View file

@ -346,8 +346,6 @@ public class RemoteInterpreterServer
@Override
public void onReceivedResourceParagraphRunners(
RemoteInterpreterEvent response) throws TException {
//clover
logger.info("remote zeppelin server controller feedback {}", response);
if (response.getType() == RemoteInterpreterEventType.RESOURCE_PARAGRAPH_RUN_CONTEXT) {
List<InterpreterContextRunner> intpContextRunners = new LinkedList<>();
@ -625,7 +623,6 @@ public class RemoteInterpreterServer
}
static class ZeppelinRemoteWorksController implements RemoteWorksController{
//cloverhearts
Logger logger = LoggerFactory.getLogger(ZeppelinRemoteWorksController.class);
private final long DEFAULT_TIMEOUT_VALUE = 300000;

View file

@ -165,6 +165,13 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
@Override
public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
if (callback != null) {
callback.onFinished(new LinkedList<>());
}
}
@Override
public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception {
}
}

View file

@ -311,6 +311,13 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
@Override
public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
if (callback != null) {
callback.onFinished(new LinkedList<>());
}
}
@Override
public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception {
}
}

View file

@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.http.HttpServletRequest;
import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@ -60,6 +61,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.apache.zeppelin.notebook.socket.WatcherMessage;
import org.apache.zeppelin.rest.exception.ForbiddenException;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
@ -1486,11 +1488,11 @@ public class NotebookServer extends WebSocketServlet implements
@Override
public void onGetParagraphRunners(
String noteId, String paragraphId, RemoteWorksEventListener callback) {
LOG.info("clover onGetParagraphRunners {} {}", noteId, paragraphId);
Notebook notebookIns = notebook();
List<InterpreterContextRunner> runner = new LinkedList<>();
if (notebookIns == null) {
LOG.info("intepreter request notebook instance is null");
callback.onFinished(notebookIns);
}
@ -1515,6 +1517,40 @@ public class NotebookServer extends WebSocketServlet implements
}
}
@Override
public void onRemoteRunParagraph(String noteId, String paragraphId) throws Exception {
Notebook notebookIns = notebook();
try {
if (notebookIns == null) {
throw new Exception("onRemoteRunParagraph notebook instance is null");
}
Note noteIns = notebookIns.getNote(noteId);
if (noteIns == null) {
throw new Exception(String.format("Can't found note id %s", noteId));
}
Paragraph paragraph = noteIns.getParagraph(paragraphId);
if (paragraph == null) {
throw new Exception(String.format("Can't found paragraph %s %s", noteId, paragraphId));
}
Set<String> userAndRoles = Sets.newHashSet();
userAndRoles.add(SecurityUtils.getPrincipal());
userAndRoles.addAll(SecurityUtils.getRoles());
if (!notebookIns.getNotebookAuthorization().hasWriteAuthorization(userAndRoles, noteId)) {
throw new ForbiddenException(String.format("can't execute note %s", noteId));
}
AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
paragraph.setAuthenticationInfo(subject);
noteIns.run(paragraphId);
} catch (Exception e) {
throw e;
}
}
/**
* Notebook Information Change event
*/

View file

@ -483,6 +483,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
}
public InterpreterContextRunner getInterpreterContextRunner() {
return new ParagraphRunner(note, note.getId(), getId());
}