mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
feat: Save spell result and propagate
This commit is contained in:
parent
3cdf2daebd
commit
9fb74388d3
7 changed files with 344 additions and 122 deletions
|
|
@ -262,6 +262,9 @@ public class NotebookServer extends WebSocketServlet
|
|||
case RUN_PARAGRAPH:
|
||||
runParagraph(conn, userAndRoles, notebook, messagereceived);
|
||||
break;
|
||||
case PARAGRAPH_EXECUTED_BY_SPELL:
|
||||
broadcastSpellExecution(conn, userAndRoles, notebook, messagereceived);
|
||||
break;
|
||||
case RUN_ALL_PARAGRAPHS:
|
||||
runAllParagraphs(conn, userAndRoles, notebook, messagereceived);
|
||||
break;
|
||||
|
|
@ -1574,6 +1577,45 @@ public class NotebookServer extends WebSocketServlet
|
|||
}
|
||||
}
|
||||
|
||||
private void broadcastSpellExecution(NotebookSocket conn, HashSet<String> userAndRoles,
|
||||
Notebook notebook, Message fromMessage)
|
||||
throws IOException {
|
||||
|
||||
final String paragraphId = (String) fromMessage.get("id");
|
||||
if (paragraphId == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
String noteId = getOpenNoteId(conn);
|
||||
final Note note = notebook.getNote(noteId);
|
||||
NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
|
||||
if (!notebookAuthorization.isWriter(noteId, userAndRoles)) {
|
||||
permissionError(conn, "write", fromMessage.principal, userAndRoles,
|
||||
notebookAuthorization.getWriters(noteId));
|
||||
return;
|
||||
}
|
||||
|
||||
String text = (String) fromMessage.get("paragraph");
|
||||
String title = (String) fromMessage.get("title");
|
||||
Status status = Status.valueOf((String) fromMessage.get("status"));
|
||||
Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
|
||||
Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
|
||||
|
||||
Paragraph p = setParagraphUsingMessage(note, fromMessage, paragraphId,
|
||||
text, title, params, config);
|
||||
p.setStatus(status);
|
||||
p.setResult(fromMessage.get("results"));
|
||||
|
||||
addNewParagraphIfLastParagraphIsExecuted(note, p);
|
||||
if (!persistNoteWithAuthInfo(conn, note, p)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// broadcast to other clients only
|
||||
broadcastExcept(note.getId(),
|
||||
new Message(OP.RUN_PARAGRAPH_USING_SPELL).put("paragraph", p), conn);
|
||||
}
|
||||
|
||||
private void runParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
|
||||
Message fromMessage) throws IOException {
|
||||
final String paragraphId = (String) fromMessage.get("id");
|
||||
|
|
@ -1600,8 +1642,7 @@ public class NotebookServer extends WebSocketServlet
|
|||
persistAndExecuteSingleParagraph(conn, note, p);
|
||||
}
|
||||
|
||||
private void persistAndExecuteSingleParagraph(NotebookSocket conn,
|
||||
Note note, Paragraph p) throws IOException {
|
||||
private void addNewParagraphIfLastParagraphIsExecuted(Note note, Paragraph p) {
|
||||
// if it's the last paragraph and empty, let's add a new one
|
||||
boolean isTheLastParagraph = note.isLastParagraph(p.getId());
|
||||
if (!(p.getText().trim().equals(p.getMagic()) ||
|
||||
|
|
@ -1610,15 +1651,30 @@ public class NotebookServer extends WebSocketServlet
|
|||
Paragraph newPara = note.addParagraph(p.getAuthenticationInfo());
|
||||
broadcastNewParagraph(note, newPara);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return false if failed to save a note
|
||||
*/
|
||||
private boolean persistNoteWithAuthInfo(NotebookSocket conn,
|
||||
Note note, Paragraph p) throws IOException {
|
||||
try {
|
||||
note.persist(p.getAuthenticationInfo());
|
||||
return true;
|
||||
} catch (FileSystemException ex) {
|
||||
LOG.error("Exception from run", ex);
|
||||
conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
|
||||
"Oops! There is something wrong with the notebook file system. "
|
||||
+ "Please check the logs for more details.")));
|
||||
// don't run the paragraph when there is error on persisting the note information
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void persistAndExecuteSingleParagraph(NotebookSocket conn,
|
||||
Note note, Paragraph p) throws IOException {
|
||||
addNewParagraphIfLastParagraphIsExecuted(note, p);
|
||||
if (!persistNoteWithAuthInfo(conn, note, p)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,9 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { SpellResult } from '../../spell'
|
||||
import {
|
||||
SpellResult,
|
||||
} from '../../spell';
|
||||
|
||||
angular.module('zeppelinWebApp').controller('ParagraphCtrl', ParagraphCtrl);
|
||||
|
||||
|
|
@ -225,15 +227,32 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat
|
|||
websocketMsgSrv.cancelParagraphRun(paragraph.id);
|
||||
};
|
||||
|
||||
$scope.handleSpellError = function(error, digestRequired) {
|
||||
$scope.propagateSpellResult = function(paragraphId, paragraphTitle,
|
||||
paragraphText, paragraphResults, paragraphStatus,
|
||||
paragraphConfig, paragraphSettingsParam) {
|
||||
websocketMsgSrv.paragraphExecutedBySpell(
|
||||
paragraphId, paragraphTitle,
|
||||
paragraphText, paragraphResults, paragraphStatus,
|
||||
paragraphConfig, paragraphSettingsParam);
|
||||
};
|
||||
|
||||
$scope.handleSpellError = function(paragraphText, error,
|
||||
digestRequired, propagated) {
|
||||
$scope.paragraph.status = 'ERROR';
|
||||
$scope.paragraph.errorMessage = error.stack;
|
||||
console.error('Failed to execute interpret() in spell\n', error);
|
||||
if (digestRequired) { $scope.$digest(); }
|
||||
|
||||
if (!propagated) {
|
||||
$scope.propagateSpellResult(
|
||||
$scope.paragraph.id, $scope.paragraph.title,
|
||||
paragraphText, [], $scope.paragraph.status,
|
||||
$scope.paragraph.config, $scope.paragraph.settings.params);
|
||||
}
|
||||
};
|
||||
|
||||
$scope.runParagraphUsingSpell = function(spell, paragraphText,
|
||||
magic, digestRequired) {
|
||||
magic, digestRequired, propagated) {
|
||||
$scope.paragraph.results = {};
|
||||
if (digestRequired) { $scope.$digest(); }
|
||||
|
||||
|
|
@ -242,21 +261,34 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat
|
|||
const splited = paragraphText.split(magic);
|
||||
// remove leading spaces
|
||||
const textWithoutMagic = splited[1].replace(/^\s+/g, '');
|
||||
$scope.paragraph.status = 'FINISHED';
|
||||
const spellResult = spell.interpret(textWithoutMagic);
|
||||
const parsed = spellResult.getAllParsedDataWithTypes(
|
||||
heliumService.getAllSpells());
|
||||
heliumService.getAllSpells(), magic, textWithoutMagic);
|
||||
|
||||
// handle actual result message in promise
|
||||
parsed.then(resultsMsg => {
|
||||
const status = 'FINISHED';
|
||||
$scope.paragraph.status = status;
|
||||
$scope.paragraph.errorMessage = '';
|
||||
$scope.paragraph.results.code = status;
|
||||
$scope.paragraph.results.msg = resultsMsg;
|
||||
$scope.paragraph.config.tableHide = false;
|
||||
if (digestRequired) { $scope.$digest(); }
|
||||
|
||||
if (!propagated) {
|
||||
const propagable = SpellResult.createPropagable(resultsMsg);
|
||||
$scope.propagateSpellResult(
|
||||
$scope.paragraph.id, $scope.paragraph.title,
|
||||
paragraphText, propagable, status,
|
||||
$scope.paragraph.config, $scope.paragraph.settings.params);
|
||||
}
|
||||
}).catch(error => {
|
||||
$scope.handleSpellError(error, digestRequired);
|
||||
$scope.handleSpellError(paragraphText, error,
|
||||
digestRequired, propagated);
|
||||
});
|
||||
} catch (error) {
|
||||
$scope.handleSpellError(error, digestRequired);
|
||||
$scope.handleSpellError(paragraphText, error,
|
||||
digestRequired, propagated);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -280,7 +312,12 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat
|
|||
commitParagraph(paragraph);
|
||||
};
|
||||
|
||||
$scope.runParagraph = function(paragraphText, digestRequired) {
|
||||
/**
|
||||
* @param paragraphText to be parsed
|
||||
* @param digestRequired true if calling `$digest` is required
|
||||
* @param propagated true if update request is sent from other client
|
||||
*/
|
||||
$scope.runParagraph = function(paragraphText, digestRequired, propagated) {
|
||||
if (!paragraphText || $scope.isRunning($scope.paragraph)) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -290,7 +327,7 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat
|
|||
|
||||
if (spell) {
|
||||
$scope.runParagraphUsingSpell(
|
||||
spell, paragraphText, magic, digestRequired);
|
||||
spell, paragraphText, magic, digestRequired, propagated);
|
||||
} else {
|
||||
$scope.runParagraphUsingBackendInterpreter(paragraphText);
|
||||
}
|
||||
|
|
@ -312,12 +349,12 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat
|
|||
$scope.runParagraphFromShortcut = function(paragraphText) {
|
||||
// passing `digestRequired` as true to update view immediately
|
||||
// without this, results cannot be rendered in view more than once
|
||||
$scope.runParagraph(paragraphText, true);
|
||||
$scope.runParagraph(paragraphText, true, false);
|
||||
};
|
||||
|
||||
$scope.runParagraphFromButton = function(paragraphText) {
|
||||
// we come here from `$scope.on`, so we don't need to call `$digest()`
|
||||
$scope.runParagraph(paragraphText, false)
|
||||
// we come here from the view, so we don't need to call `$digest()`
|
||||
$scope.runParagraph(paragraphText, false, false)
|
||||
};
|
||||
|
||||
$scope.moveUp = function(paragraph) {
|
||||
|
|
@ -1032,101 +1069,146 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat
|
|||
}
|
||||
});
|
||||
|
||||
$scope.$on('updateParagraph', function(event, data) {
|
||||
if (data.paragraph.id === $scope.paragraph.id &&
|
||||
(data.paragraph.dateCreated !== $scope.paragraph.dateCreated ||
|
||||
data.paragraph.dateFinished !== $scope.paragraph.dateFinished ||
|
||||
data.paragraph.dateStarted !== $scope.paragraph.dateStarted ||
|
||||
data.paragraph.dateUpdated !== $scope.paragraph.dateUpdated ||
|
||||
data.paragraph.status !== $scope.paragraph.status ||
|
||||
data.paragraph.jobName !== $scope.paragraph.jobName ||
|
||||
data.paragraph.title !== $scope.paragraph.title ||
|
||||
isEmpty(data.paragraph.results) !== isEmpty($scope.paragraph.results) ||
|
||||
data.paragraph.errorMessage !== $scope.paragraph.errorMessage ||
|
||||
!angular.equals(data.paragraph.settings, $scope.paragraph.settings) ||
|
||||
!angular.equals(data.paragraph.config, $scope.paragraph.config))
|
||||
) {
|
||||
var statusChanged = (data.paragraph.status !== $scope.paragraph.status);
|
||||
var resultRefreshed = (data.paragraph.dateFinished !== $scope.paragraph.dateFinished) ||
|
||||
isEmpty(data.paragraph.results) !== isEmpty($scope.paragraph.results) ||
|
||||
data.paragraph.status === 'ERROR' || (data.paragraph.status === 'FINISHED' && statusChanged);
|
||||
/**
|
||||
* @returns {boolean} true if updated is needed
|
||||
*/
|
||||
function isUpdateRequired(oldPara, newPara) {
|
||||
return (newPara.id === oldPara.id &&
|
||||
(newPara.dateCreated !== oldPara.dateCreated ||
|
||||
newPara.dateFinished !== oldPara.dateFinished ||
|
||||
newPara.dateStarted !== oldPara.dateStarted ||
|
||||
newPara.dateUpdated !== oldPara.dateUpdated ||
|
||||
newPara.status !== oldPara.status ||
|
||||
newPara.jobName !== oldPara.jobName ||
|
||||
newPara.title !== oldPara.title ||
|
||||
isEmpty(newPara.results) !== isEmpty(oldPara.results) ||
|
||||
newPara.errorMessage !== oldPara.errorMessage ||
|
||||
!angular.equals(newPara.settings, oldPara.settings) ||
|
||||
!angular.equals(newPara.config, oldPara.config)))
|
||||
}
|
||||
|
||||
if ($scope.paragraph.text !== data.paragraph.text) {
|
||||
if ($scope.dirtyText) { // check if editor has local update
|
||||
if ($scope.dirtyText === data.paragraph.text) { // when local update is the same from remote, clear local update
|
||||
$scope.paragraph.text = data.paragraph.text;
|
||||
$scope.dirtyText = undefined;
|
||||
$scope.originalText = angular.copy(data.paragraph.text);
|
||||
} else { // if there're local update, keep it.
|
||||
$scope.paragraph.text = data.paragraph.text;
|
||||
}
|
||||
} else {
|
||||
$scope.paragraph.text = data.paragraph.text;
|
||||
$scope.originalText = angular.copy(data.paragraph.text);
|
||||
$scope.updateAllScopeTexts = function(oldPara, newPara) {
|
||||
if (oldPara.text !== newPara.text) {
|
||||
if ($scope.dirtyText) { // check if editor has local update
|
||||
if ($scope.dirtyText === newPara.text) { // when local update is the same from remote, clear local update
|
||||
$scope.paragraph.text = newPara.text;
|
||||
$scope.dirtyText = undefined;
|
||||
$scope.originalText = angular.copy(newPara.text);
|
||||
|
||||
} else { // if there're local update, keep it.
|
||||
$scope.paragraph.text = newPara.text;
|
||||
}
|
||||
}
|
||||
|
||||
/** broadcast update to result controller **/
|
||||
if (data.paragraph.results && data.paragraph.results.msg) {
|
||||
for (var i in data.paragraph.results.msg) {
|
||||
var newResult = data.paragraph.results.msg ? data.paragraph.results.msg[i] : {};
|
||||
var oldResult = ($scope.paragraph.results && $scope.paragraph.results.msg) ?
|
||||
$scope.paragraph.results.msg[i] : {};
|
||||
var newConfig = data.paragraph.config.results ? data.paragraph.config.results[i] : {};
|
||||
var oldConfig = $scope.paragraph.config.results ? $scope.paragraph.config.results[i] : {};
|
||||
if (!angular.equals(newResult, oldResult) ||
|
||||
!angular.equals(newConfig, oldConfig)) {
|
||||
$rootScope.$broadcast('updateResult', newResult, newConfig, data.paragraph, parseInt(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// resize col width
|
||||
if ($scope.paragraph.config.colWidth !== data.paragraph.colWidth) {
|
||||
$rootScope.$broadcast('paragraphResized', $scope.paragraph.id);
|
||||
}
|
||||
|
||||
/** push the rest */
|
||||
$scope.paragraph.aborted = data.paragraph.aborted;
|
||||
$scope.paragraph.user = data.paragraph.user;
|
||||
$scope.paragraph.dateUpdated = data.paragraph.dateUpdated;
|
||||
$scope.paragraph.dateCreated = data.paragraph.dateCreated;
|
||||
$scope.paragraph.dateFinished = data.paragraph.dateFinished;
|
||||
$scope.paragraph.dateStarted = data.paragraph.dateStarted;
|
||||
$scope.paragraph.errorMessage = data.paragraph.errorMessage;
|
||||
$scope.paragraph.jobName = data.paragraph.jobName;
|
||||
$scope.paragraph.title = data.paragraph.title;
|
||||
$scope.paragraph.lineNumbers = data.paragraph.lineNumbers;
|
||||
$scope.paragraph.status = data.paragraph.status;
|
||||
if (data.paragraph.status !== 'RUNNING') {
|
||||
$scope.paragraph.results = data.paragraph.results;
|
||||
}
|
||||
$scope.paragraph.settings = data.paragraph.settings;
|
||||
if ($scope.editor) {
|
||||
$scope.editor.setReadOnly($scope.isRunning(data.paragraph));
|
||||
}
|
||||
|
||||
if (!$scope.asIframe) {
|
||||
$scope.paragraph.config = data.paragraph.config;
|
||||
initializeDefault(data.paragraph.config);
|
||||
} else {
|
||||
data.paragraph.config.editorHide = true;
|
||||
data.paragraph.config.tableHide = false;
|
||||
$scope.paragraph.config = data.paragraph.config;
|
||||
}
|
||||
|
||||
if (statusChanged || resultRefreshed) {
|
||||
// when last paragraph runs, zeppelin automatically appends new paragraph.
|
||||
// this broadcast will focus to the newly inserted paragraph
|
||||
var paragraphs = angular.element('div[id$="_paragraphColumn_main"]');
|
||||
if (paragraphs.length >= 2 && paragraphs[paragraphs.length - 2].id.indexOf($scope.paragraph.id) === 0) {
|
||||
// rendering output can took some time. So delay scrolling event firing for sometime.
|
||||
setTimeout(function() {
|
||||
$rootScope.$broadcast('scrollToCursor');
|
||||
}, 500);
|
||||
}
|
||||
$scope.paragraph.text = newPara.text;
|
||||
$scope.originalText = angular.copy(newPara.text);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
$scope.updateParagraphObjectWhenUpdated = function(newPara) {
|
||||
// resize col width
|
||||
if ($scope.paragraph.config.colWidth !== newPara.colWidth) {
|
||||
$rootScope.$broadcast('paragraphResized', $scope.paragraph.id);
|
||||
}
|
||||
|
||||
/** push the rest */
|
||||
$scope.paragraph.aborted = newPara.aborted;
|
||||
$scope.paragraph.user = newPara.user;
|
||||
$scope.paragraph.dateUpdated = newPara.dateUpdated;
|
||||
$scope.paragraph.dateCreated = newPara.dateCreated;
|
||||
$scope.paragraph.dateFinished = newPara.dateFinished;
|
||||
$scope.paragraph.dateStarted = newPara.dateStarted;
|
||||
$scope.paragraph.errorMessage = newPara.errorMessage;
|
||||
$scope.paragraph.jobName = newPara.jobName;
|
||||
$scope.paragraph.title = newPara.title;
|
||||
$scope.paragraph.lineNumbers = newPara.lineNumbers;
|
||||
$scope.paragraph.status = newPara.status;
|
||||
if (newPara.status !== 'RUNNING') {
|
||||
$scope.paragraph.results = newPara.results;
|
||||
}
|
||||
$scope.paragraph.settings = newPara.settings;
|
||||
if ($scope.editor) {
|
||||
$scope.editor.setReadOnly($scope.isRunning(newPara));
|
||||
}
|
||||
|
||||
if (!$scope.asIframe) {
|
||||
$scope.paragraph.config = newPara.config;
|
||||
initializeDefault(newPara.config);
|
||||
} else {
|
||||
newPara.config.editorHide = true;
|
||||
newPara.config.tableHide = false;
|
||||
$scope.paragraph.config = newPara.config;
|
||||
}
|
||||
};
|
||||
|
||||
$scope.updateParagraph = function(oldPara, newPara, updateCallback) {
|
||||
// 1. get status, refreshed
|
||||
const statusChanged = (newPara.status !== oldPara.status);
|
||||
const resultRefreshed = (newPara.dateFinished !== oldPara.dateFinished) ||
|
||||
isEmpty(newPara.results) !== isEmpty(oldPara.results) ||
|
||||
newPara.status === 'ERROR' || (newPara.status === 'FINISHED' && statusChanged);
|
||||
|
||||
// 2. update texts managed by $scope
|
||||
$scope.updateAllScopeTexts(oldPara, newPara);
|
||||
|
||||
// 3. execute callback to update result
|
||||
updateCallback();
|
||||
|
||||
// 4. update remaining paragraph objects
|
||||
$scope.updateParagraphObjectWhenUpdated(newPara);
|
||||
|
||||
// 5. handle scroll down by key properly if new paragraph is added
|
||||
if (statusChanged || resultRefreshed) {
|
||||
// when last paragraph runs, zeppelin automatically appends new paragraph.
|
||||
// this broadcast will focus to the newly inserted paragraph
|
||||
const paragraphs = angular.element('div[id$="_paragraphColumn_main"]');
|
||||
if (paragraphs.length >= 2 && paragraphs[paragraphs.length - 2].id.indexOf($scope.paragraph.id) === 0) {
|
||||
// rendering output can took some time. So delay scrolling event firing for sometime.
|
||||
setTimeout(() => { $rootScope.$broadcast('scrollToCursor'); }, 500);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
$scope.$on('runParagraphUsingSpell', function(event, data) {
|
||||
const oldPara = $scope.paragraph;
|
||||
let newPara = data.paragraph;
|
||||
const updateCallback = () => {
|
||||
$scope.runParagraph(newPara.text, true, true);
|
||||
};
|
||||
|
||||
if (!isUpdateRequired(oldPara, newPara)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$scope.updateParagraph(oldPara, newPara, updateCallback)
|
||||
});
|
||||
|
||||
$scope.$on('updateParagraph', function(event, data) {
|
||||
const oldPara = $scope.paragraph;
|
||||
const newPara = data.paragraph;
|
||||
|
||||
if (!isUpdateRequired(oldPara, newPara)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const updateCallback = () => {
|
||||
// broadcast `updateResult` message to trigger result update
|
||||
if (newPara.results && newPara.results.msg) {
|
||||
for (let i in newPara.results.msg) {
|
||||
const newResult = newPara.results.msg ? newPara.results.msg[i] : {};
|
||||
const oldResult = (newPara.results && newPara.results.msg) ?
|
||||
newPara.results.msg[i] : {};
|
||||
const newConfig = newPara.config.results ? newPara.config.results[i] : {};
|
||||
const oldConfig = newPara.config.results ? newPara.config.results[i] : {};
|
||||
if (!angular.equals(newResult, oldResult) ||
|
||||
!angular.equals(newConfig, oldConfig)) {
|
||||
$rootScope.$broadcast('updateResult', newResult, newConfig, newPara, parseInt(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
$scope.updateParagraph(oldPara, newPara, updateCallback)
|
||||
});
|
||||
|
||||
$scope.$on('updateProgress', function(event, data) {
|
||||
|
|
|
|||
|
|
@ -326,20 +326,30 @@ function ResultCtrl($scope, $rootScope, $route, $window, $routeParams, $location
|
|||
|
||||
// custom display result can include multiple subset results
|
||||
parsed.then(dataWithTypes => {
|
||||
const containerDOM = document.getElementById(`p${$scope.id}_custom`);
|
||||
for(let i = 0; i < dataWithTypes.length; i++) {
|
||||
const dt = dataWithTypes[i];
|
||||
const data = dt.data;
|
||||
const type = dt.type;
|
||||
function retry() {
|
||||
const containerDOM = angular.element(`#p${$scope.id}_custom`);
|
||||
if (!containerDOM.length) {
|
||||
$timeout(retry, 10);
|
||||
return;
|
||||
}
|
||||
|
||||
// prepare DOM to be filled
|
||||
const subResultDOMId = $scope.createDisplayDOMId(`p${$scope.id}_custom`, type);
|
||||
const subResultDOM = document.createElement('div');
|
||||
containerDOM.appendChild(subResultDOM);
|
||||
subResultDOM.setAttribute('id', subResultDOMId);
|
||||
// Spell.interpret() can create multiple outputs
|
||||
for(let i = 0; i < dataWithTypes.length; i++) {
|
||||
const dt = dataWithTypes[i];
|
||||
const data = dt.data;
|
||||
const type = dt.type;
|
||||
|
||||
$scope.renderDefaultDisplay(subResultDOMId, type, data, true);
|
||||
// prepare each DOM to be filled
|
||||
const subResultDOMId = $scope.createDisplayDOMId(`p${$scope.id}_custom_${i}`, type);
|
||||
const subResultDOM = document.createElement('div');
|
||||
containerDOM.append(subResultDOM);
|
||||
subResultDOM.setAttribute('id', subResultDOMId);
|
||||
|
||||
$scope.renderDefaultDisplay(subResultDOMId, type, data, true);
|
||||
}
|
||||
}
|
||||
|
||||
$timeout(retry);
|
||||
}).catch(error => {
|
||||
console.error(`Failed to render custom display: ${$scope.type}\n` + error);
|
||||
});
|
||||
|
|
|
|||
|
|
@ -32,9 +32,20 @@ export const DefaultDisplayMagic = {
|
|||
};
|
||||
|
||||
export class DataWithType {
|
||||
constructor(data, type) {
|
||||
constructor(data, type, magic, text) {
|
||||
this.data = data;
|
||||
this.type = type;
|
||||
|
||||
/**
|
||||
* keep for `DefaultDisplayType.ELEMENT` (function data type)
|
||||
* to propagate a result to other client.
|
||||
*
|
||||
* otherwise we will send function as `data` and it will not work
|
||||
* since they don't have context where they are created.
|
||||
*/
|
||||
|
||||
this.magic = magic;
|
||||
this.text = text;
|
||||
}
|
||||
|
||||
static handleDefaultMagic(m) {
|
||||
|
|
@ -47,6 +58,17 @@ export class DataWithType {
|
|||
}
|
||||
}
|
||||
|
||||
static createPropagable(dataWithType) {
|
||||
if (!SpellResult.isFunction(dataWithType.data)) {
|
||||
return dataWithType;
|
||||
}
|
||||
|
||||
const data = dataWithType.getText();
|
||||
const type = dataWithType.getMagic();
|
||||
|
||||
return new DataWithType(data, type);
|
||||
}
|
||||
|
||||
/**
|
||||
* consume 1 data and produce multiple
|
||||
* @param data {string}
|
||||
|
|
@ -100,9 +122,12 @@ export class DataWithType {
|
|||
* object, function or promise
|
||||
* @param dataWithType {DataWithType}
|
||||
* @param availableDisplays {Object} Map for available displays
|
||||
* @param magic
|
||||
* @param textWithoutMagic
|
||||
* @return {Promise<Array<DataWithType>>}
|
||||
*/
|
||||
static produceMultipleData(dataWithType, customDisplayType) {
|
||||
static produceMultipleData(dataWithType, customDisplayType,
|
||||
magic, textWithoutMagic) {
|
||||
const data = dataWithType.getData();
|
||||
const type = dataWithType.getType();
|
||||
|
||||
|
|
@ -117,7 +142,9 @@ export class DataWithType {
|
|||
if (SpellResult.isFunction(data)) {
|
||||
// if data is a function, we consider it as ELEMENT type.
|
||||
wrapped = new Promise((resolve) => {
|
||||
const result = [new DataWithType(data, DefaultDisplayType.ELEMENT)];
|
||||
const dt = new DataWithType(
|
||||
data, DefaultDisplayType.ELEMENT, magic, textWithoutMagic);
|
||||
const result = [dt];
|
||||
return resolve(result);
|
||||
});
|
||||
} else if (SpellResult.isPromise(data)) {
|
||||
|
|
@ -162,6 +189,14 @@ export class DataWithType {
|
|||
getType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
getMagic() {
|
||||
return this.magic;
|
||||
}
|
||||
|
||||
getText() {
|
||||
return this.text;
|
||||
}
|
||||
}
|
||||
|
||||
export class SpellResult {
|
||||
|
|
@ -198,6 +233,11 @@ export class SpellResult {
|
|||
return undefined;
|
||||
}
|
||||
|
||||
static createPropagable(resultMsg) {
|
||||
return resultMsg.map(dt => {
|
||||
return DataWithType.createPropagable(dt);
|
||||
})
|
||||
}
|
||||
|
||||
add(resultData, resultType) {
|
||||
if (resultData) {
|
||||
|
|
@ -210,11 +250,13 @@ export class SpellResult {
|
|||
|
||||
/**
|
||||
* @param customDisplayType
|
||||
* @param textWithoutMagic
|
||||
* @return {Promise<Array<DataWithType>>}
|
||||
*/
|
||||
getAllParsedDataWithTypes(customDisplayType) {
|
||||
const promises = this.dataWithTypes.map(gt => {
|
||||
return DataWithType.produceMultipleData(gt, customDisplayType);
|
||||
getAllParsedDataWithTypes(customDisplayType, magic, textWithoutMagic) {
|
||||
const promises = this.dataWithTypes.map(dt => {
|
||||
return DataWithType.produceMultipleData(
|
||||
dt, customDisplayType, magic, textWithoutMagic);
|
||||
});
|
||||
|
||||
// some promises can include an array so we need to flatten them
|
||||
|
|
|
|||
|
|
@ -105,6 +105,8 @@ function websocketEvents($rootScope, $websocket, $location, baseUrlSrv) {
|
|||
|
||||
} else if (op === 'PARAGRAPH') {
|
||||
$rootScope.$broadcast('updateParagraph', data);
|
||||
} else if (op === 'RUN_PARAGRAPH_USING_SPELL') {
|
||||
$rootScope.$broadcast('runParagraphUsingSpell', data);
|
||||
} else if (op === 'PARAGRAPH_APPEND_OUTPUT') {
|
||||
$rootScope.$broadcast('appendParagraphOutput', data);
|
||||
} else if (op === 'PARAGRAPH_UPDATE_OUTPUT') {
|
||||
|
|
|
|||
|
|
@ -12,6 +12,11 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import {
|
||||
SpellResult,
|
||||
DefaultDisplayType,
|
||||
} from '../../app/spell';
|
||||
|
||||
angular.module('zeppelinWebApp').service('websocketMsgSrv', websocketMsgSrv);
|
||||
|
||||
websocketMsgSrv.$inject = ['$rootScope', 'websocketEvents'];
|
||||
|
|
@ -159,6 +164,29 @@ function websocketMsgSrv($rootScope, websocketEvents) {
|
|||
websocketEvents.sendNewEvent({op: 'CANCEL_PARAGRAPH', data: {id: paragraphId}});
|
||||
},
|
||||
|
||||
paragraphExecutedBySpell: function(paragraphId, paragraphTitle,
|
||||
paragraphText, paragraphResultsMsg, paragraphStatus,
|
||||
paragraphConfig, paragraphParams) {
|
||||
websocketEvents.sendNewEvent({
|
||||
op: 'PARAGRAPH_EXECUTED_BY_SPELL',
|
||||
data: {
|
||||
id: paragraphId,
|
||||
title: paragraphTitle,
|
||||
paragraph: paragraphText,
|
||||
results: {
|
||||
code: paragraphStatus,
|
||||
msg: paragraphResultsMsg.map(dataWithType => {
|
||||
let serializedData = dataWithType.data;
|
||||
return { type: dataWithType.type, data: serializedData, };
|
||||
})
|
||||
},
|
||||
status: paragraphStatus,
|
||||
config: paragraphConfig,
|
||||
params: paragraphParams
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
runParagraph: function(paragraphId, paragraphTitle, paragraphData, paragraphConfig, paragraphParams) {
|
||||
websocketEvents.sendNewEvent({
|
||||
op: 'RUN_PARAGRAPH',
|
||||
|
|
|
|||
|
|
@ -172,7 +172,9 @@ public class Message {
|
|||
PARAGRAPH_REMOVED, // [s-c] paragraph deleted
|
||||
PARAGRAPH_MOVED, // [s-c] paragraph moved
|
||||
NOTE_UPDATED, // [s-c] paragraph updated(name, config)
|
||||
RUN_ALL_PARAGRAPHS // [c-s] run all paragraphs
|
||||
RUN_ALL_PARAGRAPHS, // [c-s] run all paragraphs
|
||||
PARAGRAPH_EXECUTED_BY_SPELL, // [c-s] paragraph was executed by spell
|
||||
RUN_PARAGRAPH_USING_SPELL // [s-c] run paragraph using spell
|
||||
}
|
||||
|
||||
public static final Message EMPTY = new Message(null);
|
||||
|
|
|
|||
Loading…
Reference in a new issue