mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Implement runNote and re implement run method
This commit is contained in:
parent
f9661c843b
commit
10c2a47047
8 changed files with 78 additions and 18 deletions
|
|
@ -340,6 +340,27 @@ public class ZeppelinContext {
|
|||
|
||||
}
|
||||
|
||||
public void runNote(String noteId) {
|
||||
runNote(noteId, interpreterContext);
|
||||
}
|
||||
|
||||
public void runNote(String noteId, InterpreterContext context) {
|
||||
String runningNoteId = context.getNoteId();
|
||||
String runningParagraphId = context.getParagraphId();
|
||||
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
|
||||
|
||||
if (runners.size() <= 0) {
|
||||
throw new InterpreterException("Note " + noteId + " not found " + runners.size());
|
||||
}
|
||||
|
||||
for (InterpreterContextRunner r : runners) {
|
||||
if (r.getNoteId().equals(runningNoteId) && r.getParagraphId().equals(runningParagraphId)) {
|
||||
continue;
|
||||
}
|
||||
r.run();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* get Zeppelin Paragraph Runner from zeppelin server
|
||||
|
|
@ -440,13 +461,7 @@ public class ZeppelinContext {
|
|||
*/
|
||||
@ZeppelinApi
|
||||
public void runAll(InterpreterContext context) {
|
||||
for (InterpreterContextRunner r : context.getRunners()) {
|
||||
if (r.getParagraphId().equals(context.getParagraphId())) {
|
||||
// skip itself
|
||||
continue;
|
||||
}
|
||||
r.run();
|
||||
}
|
||||
runNote(context.getNoteId());
|
||||
}
|
||||
|
||||
@ZeppelinApi
|
||||
|
|
|
|||
|
|
@ -148,8 +148,9 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
InterpreterContextRunner runnerFromRemote = gson.fromJson(
|
||||
event.getData(), RemoteInterpreterContextRunner.class);
|
||||
|
||||
interpreterProcess.getInterpreterContextRunnerPool().run(
|
||||
listener.onRemoteRunParagraph(
|
||||
runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
|
||||
|
||||
} else if (event.getType() == RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL) {
|
||||
ResourceSet resourceSet = getAllResourcePoolExcept();
|
||||
sendResourcePoolResponseGetAll(resourceSet);
|
||||
|
|
@ -199,13 +200,10 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
|
||||
appListener.onStatusChange(noteId, paragraphId, appId, status);
|
||||
} else if (event.getType() == RemoteInterpreterEventType.RESOURCE_PARAGRAPH_RUN_CONTEXT) {
|
||||
//clover
|
||||
RemoteZeppelinServerController remoteControlEvent = gson.fromJson(
|
||||
event.getData(), RemoteZeppelinServerController.class);
|
||||
progressRemoteZeppelinControlEvent(event.getType(), listener, remoteControlEvent);
|
||||
|
||||
|
||||
|
||||
} else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
|
||||
Map<String, String> metaInfos = gson.fromJson(event.getData(),
|
||||
new TypeToken<Map<String, String>>() {
|
||||
|
|
@ -248,7 +246,6 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
new RemoteInterpreterProcessListener.RemoteWorksEventListener() {
|
||||
@Override
|
||||
public void onFinished(Object resultObject) {
|
||||
logger.info("clover on Finished!!! send event finished");
|
||||
boolean clientBroken = false;
|
||||
if (resultObject != null && resultObject instanceof List) {
|
||||
List<InterpreterContextRunner> runnerList =
|
||||
|
|
@ -263,7 +260,6 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
RemoteInterpreterEventType.RESOURCE_PARAGRAPH_RUN_CONTEXT,
|
||||
gson.toJson(resResource));
|
||||
try {
|
||||
logger.info("clover send event finished");
|
||||
eventClient.onReceivedResourceParagraphRunners(response);
|
||||
} catch (Exception e) {
|
||||
clientBroken = true;
|
||||
|
|
@ -277,7 +273,7 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
|
||||
@Override
|
||||
public void onError() {
|
||||
logger.info("clover onError");
|
||||
logger.info("onGetParagraphRunners onError");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ public interface RemoteInterpreterProcessListener {
|
|||
public void onOutputAppend(String noteId, String paragraphId, String output);
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String output);
|
||||
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
|
||||
public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception;
|
||||
public void onGetParagraphRunners(
|
||||
String noteId, String paragraphId, RemoteWorksEventListener callback);
|
||||
|
||||
|
|
|
|||
|
|
@ -346,8 +346,6 @@ public class RemoteInterpreterServer
|
|||
@Override
|
||||
public void onReceivedResourceParagraphRunners(
|
||||
RemoteInterpreterEvent response) throws TException {
|
||||
//clover
|
||||
logger.info("remote zeppelin server controller feedback {}", response);
|
||||
if (response.getType() == RemoteInterpreterEventType.RESOURCE_PARAGRAPH_RUN_CONTEXT) {
|
||||
List<InterpreterContextRunner> intpContextRunners = new LinkedList<>();
|
||||
|
||||
|
|
@ -625,7 +623,6 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
|
||||
static class ZeppelinRemoteWorksController implements RemoteWorksController{
|
||||
//cloverhearts
|
||||
Logger logger = LoggerFactory.getLogger(ZeppelinRemoteWorksController.class);
|
||||
|
||||
private final long DEFAULT_TIMEOUT_VALUE = 300000;
|
||||
|
|
|
|||
|
|
@ -165,6 +165,13 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
|
|||
|
||||
@Override
|
||||
public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
|
||||
if (callback != null) {
|
||||
callback.onFinished(new LinkedList<>());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -311,6 +311,13 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
|
|||
|
||||
@Override
|
||||
public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
|
||||
if (callback != null) {
|
||||
callback.onFinished(new LinkedList<>());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.vfs2.FileSystemException;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
|
|
@ -60,6 +61,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
|
|||
import org.apache.zeppelin.notebook.socket.Message;
|
||||
import org.apache.zeppelin.notebook.socket.Message.OP;
|
||||
import org.apache.zeppelin.notebook.socket.WatcherMessage;
|
||||
import org.apache.zeppelin.rest.exception.ForbiddenException;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.apache.zeppelin.server.ZeppelinServer;
|
||||
|
|
@ -1486,11 +1488,11 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
@Override
|
||||
public void onGetParagraphRunners(
|
||||
String noteId, String paragraphId, RemoteWorksEventListener callback) {
|
||||
LOG.info("clover onGetParagraphRunners {} {}", noteId, paragraphId);
|
||||
Notebook notebookIns = notebook();
|
||||
List<InterpreterContextRunner> runner = new LinkedList<>();
|
||||
|
||||
if (notebookIns == null) {
|
||||
LOG.info("intepreter request notebook instance is null");
|
||||
callback.onFinished(notebookIns);
|
||||
}
|
||||
|
||||
|
|
@ -1515,6 +1517,40 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteRunParagraph(String noteId, String paragraphId) throws Exception {
|
||||
Notebook notebookIns = notebook();
|
||||
try {
|
||||
if (notebookIns == null) {
|
||||
throw new Exception("onRemoteRunParagraph notebook instance is null");
|
||||
}
|
||||
Note noteIns = notebookIns.getNote(noteId);
|
||||
if (noteIns == null) {
|
||||
throw new Exception(String.format("Can't found note id %s", noteId));
|
||||
}
|
||||
|
||||
Paragraph paragraph = noteIns.getParagraph(paragraphId);
|
||||
if (paragraph == null) {
|
||||
throw new Exception(String.format("Can't found paragraph %s %s", noteId, paragraphId));
|
||||
}
|
||||
|
||||
Set<String> userAndRoles = Sets.newHashSet();
|
||||
userAndRoles.add(SecurityUtils.getPrincipal());
|
||||
userAndRoles.addAll(SecurityUtils.getRoles());
|
||||
if (!notebookIns.getNotebookAuthorization().hasWriteAuthorization(userAndRoles, noteId)) {
|
||||
throw new ForbiddenException(String.format("can't execute note %s", noteId));
|
||||
}
|
||||
|
||||
AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
|
||||
paragraph.setAuthenticationInfo(subject);
|
||||
|
||||
noteIns.run(paragraphId);
|
||||
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Notebook Information Change event
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -483,6 +483,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
}
|
||||
|
||||
public InterpreterContextRunner getInterpreterContextRunner() {
|
||||
|
||||
return new ParagraphRunner(note, note.getId(), getId());
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue