mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
App output display
This commit is contained in:
parent
bd0f467d23
commit
6223cd44d9
20 changed files with 441 additions and 51 deletions
|
|
@ -22,4 +22,6 @@ package org.apache.zeppelin.helium;
|
|||
public interface ApplicationEventListener {
|
||||
public void onOutputAppend(String noteId, String paragraphId, String appId, String output);
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output);
|
||||
public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg);
|
||||
public void onStatusChange(String noteId, String paragraphId, String appId, String status);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ public class HeliumPackage {
|
|||
private String className; // entry point
|
||||
private String [][] resources; // resource classnames that requires
|
||||
// [[ .. and .. and .. ] or [ .. and .. and ..] ..]
|
||||
|
||||
private String icon;
|
||||
/**
|
||||
* Type of package
|
||||
*/
|
||||
|
|
@ -89,4 +89,8 @@ public class HeliumPackage {
|
|||
public String[][] getResources() {
|
||||
return resources;
|
||||
}
|
||||
|
||||
public String getIcon() {
|
||||
return icon;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -147,7 +147,7 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
String paragraphId = outputAppend.get("paragraphId");
|
||||
String outputToAppend = outputAppend.get("data");
|
||||
String appId = outputAppend.get("appId");
|
||||
|
||||
logger.info("Append " + outputToAppend + ", appId = " + appId);
|
||||
if (appId == null) {
|
||||
listener.onOutputAppend(noteId, paragraphId, outputToAppend);
|
||||
} else {
|
||||
|
|
@ -161,7 +161,7 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
String paragraphId = outputAppend.get("paragraphId");
|
||||
String outputToUpdate = outputAppend.get("data");
|
||||
String appId = outputAppend.get("appId");
|
||||
|
||||
logger.info("Update " + outputToUpdate + ", appId = " + appId);
|
||||
if (appId == null) {
|
||||
listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -44,6 +44,9 @@ public class HeliumRestApi {
|
|||
private Notebook notebook;
|
||||
private Gson gson = new Gson();
|
||||
|
||||
public HeliumRestApi() {
|
||||
}
|
||||
|
||||
public HeliumRestApi(Helium helium,
|
||||
HeliumApplicationFactory heliumApplicationFactory,
|
||||
Notebook notebook) {
|
||||
|
|
|
|||
|
|
@ -114,7 +114,9 @@ public class Message {
|
|||
// @param checkpointName
|
||||
|
||||
APP_APPEND_OUTPUT, // [s-c] append output
|
||||
APP_UPDATE_OUTPUT // [s-c] update (replace) output
|
||||
APP_UPDATE_OUTPUT, // [s-c] update (replace) output
|
||||
APP_LOAD, // [s-c] on app load
|
||||
APP_STATUS_CHANGE // [s-c] on app status change
|
||||
}
|
||||
|
||||
public OP op;
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ import org.apache.zeppelin.display.AngularObject;
|
|||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.helium.HeliumPackage;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
|
|
@ -1018,6 +1019,26 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
broadcast(noteId, msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg) {
|
||||
Message msg = new Message(OP.APP_LOAD)
|
||||
.put("noteId", noteId)
|
||||
.put("paragraphId", paragraphId)
|
||||
.put("appId", appId)
|
||||
.put("pkg", pkg);
|
||||
broadcast(noteId, msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStatusChange(String noteId, String paragraphId, String appId, String status) {
|
||||
Message msg = new Message(OP.APP_STATUS_CHANGE)
|
||||
.put("noteId", noteId)
|
||||
.put("paragraphId", paragraphId)
|
||||
.put("appId", appId)
|
||||
.put("status", status);
|
||||
broadcast(noteId, msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Need description here.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -13,35 +13,76 @@ limitations under the License.
|
|||
-->
|
||||
|
||||
<div id="{{paragraph.id}}_switch"
|
||||
ng-if="paragraph.result.type == 'TABLE' && !asIframe && !viewOnly"
|
||||
ng-if="(paragraph.result.type == 'TABLE' || apps.length > 0 || suggestion.available && suggestion.available.length > 0) && !asIframe && !viewOnly"
|
||||
class="btn-group"
|
||||
style='margin-bottom: 10px;'>
|
||||
<button type="button" class="btn btn-default btn-sm"
|
||||
ng-if="paragraph.result.type == 'TABLE'"
|
||||
ng-class="{'active': isGraphMode('table')}"
|
||||
ng-click="setGraphMode('table', true)" ><i class="fa fa-table"></i>
|
||||
</button>
|
||||
<button type="button" class="btn btn-default btn-sm"
|
||||
ng-if="paragraph.result.type == 'TABLE'"
|
||||
ng-class="{'active': isGraphMode('multiBarChart')}"
|
||||
ng-click="setGraphMode('multiBarChart', true)"><i class="fa fa-bar-chart"></i>
|
||||
</button>
|
||||
<button type="button" class="btn btn-default btn-sm"
|
||||
ng-if="paragraph.result.type == 'TABLE'"
|
||||
ng-class="{'active': isGraphMode('pieChart')}"
|
||||
ng-click="setGraphMode('pieChart', true)"><i class="fa fa-pie-chart"></i>
|
||||
</button>
|
||||
<button type="button" class="btn btn-default btn-sm"
|
||||
ng-if="paragraph.result.type == 'TABLE'"
|
||||
ng-class="{'active': isGraphMode('stackedAreaChart')}"
|
||||
ng-click="setGraphMode('stackedAreaChart', true)"><i class="fa fa-area-chart"></i>
|
||||
</button>
|
||||
<button type="button" class="btn btn-default btn-sm"
|
||||
ng-if="paragraph.result.type == 'TABLE'"
|
||||
ng-class="{'active': isGraphMode('lineChart') || isGraphMode('lineWithFocusChart')}"
|
||||
ng-click="paragraph.config.graph.lineWithFocus ? setGraphMode('lineWithFocusChart', true) : setGraphMode('lineChart', true)"><i class="fa fa-line-chart"></i>
|
||||
</button>
|
||||
<button type="button" class="btn btn-default btn-sm"
|
||||
ng-if="paragraph.result.type == 'TABLE'"
|
||||
ng-class="{'active': isGraphMode('scatterChart')}"
|
||||
ng-click="setGraphMode('scatterChart', true)"><i class="cf cf-scatter-chart"></i>
|
||||
</button>
|
||||
<button type="button"
|
||||
class="btn btn-default btn-sm"
|
||||
ng-repeat="app in apps"
|
||||
ng-click="switchApp(app.id)"
|
||||
ng-class="{'active' : app.id == paragraph.config.helium.activeApp}"
|
||||
ng-bind-html="app.pkg.icon">
|
||||
</button>
|
||||
|
||||
</div>
|
||||
<span ng-if="getResultType()=='TABLE' && getGraphMode()!='table' && !asIframe && !viewOnly"
|
||||
|
||||
<div id="{{paragraph.id}}_helium"
|
||||
ng-if="(suggestion.available && suggestion.available.length > 0) && !asIframe && !viewOnly"
|
||||
class="btn-group"
|
||||
style='margin-bottom: 10px;'>
|
||||
<button type="button"
|
||||
class="btn btn-default btn-sm dropdown-toggle"
|
||||
ng-if="suggestion.available && suggestion.available.length > 0"
|
||||
data-toggle="dropdown"
|
||||
style="font-weight:bold">
|
||||
Helium
|
||||
</button>
|
||||
<ul class="dropdown-menu"
|
||||
ng-if="suggestion.available && suggestion.available.length > 0"
|
||||
role="menu">
|
||||
<li class="appSuggestion">
|
||||
<div ng-repeat="pkgInfo in suggestion.available">
|
||||
<a ng-click="loadApp(pkgInfo.pkg)">
|
||||
{{pkgInfo.pkg.name}}
|
||||
</a>
|
||||
</div>
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
|
||||
|
||||
<span
|
||||
ng-if="getResultType()=='TABLE' && !paragraph.config.helium.activeApp && getGraphMode()!='table' && !asIframe && !viewOnly"
|
||||
style="margin-left:10px; cursor:pointer; display: inline-block; vertical-align:top; position: relative; line-height:30px;">
|
||||
<a class="btnText" ng-click="toggleGraphOption()">
|
||||
settings <span ng-class="paragraph.config.graph.optionOpen ? 'fa fa-caret-up' : 'fa fa-caret-down'"></span>
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ limitations under the License.
|
|||
-->
|
||||
<div
|
||||
id="p{{paragraph.id}}_resize"
|
||||
ng-if="!paragraph.config.helium.activeApp"
|
||||
resize='{"allowresize": "{{!asIframe && !viewOnly}}", "graphType": "{{getResultType()}}"}'
|
||||
resizable on-resize="resizeParagraph(width, height);">
|
||||
<div ng-include src="'app/notebook/paragraph/paragraph-graph.html'"></div>
|
||||
|
|
@ -59,3 +60,11 @@ limitations under the License.
|
|||
ng-bind="paragraph.errorMessage">
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div ng-repeat="app in apps">
|
||||
<div id="p{{app.id}}"
|
||||
ng-show="paragraph.config.helium.activeApp == app.id">
|
||||
</div>
|
||||
</div>
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@
|
|||
|
||||
angular.module('zeppelinWebApp')
|
||||
.controller('ParagraphCtrl', function($scope,$rootScope, $route, $window, $element, $routeParams, $location,
|
||||
$timeout, $compile, websocketMsgSrv) {
|
||||
$timeout, $compile, $http, websocketMsgSrv, baseUrlSrv) {
|
||||
var ANGULAR_FUNCTION_OBJECT_NAME_PREFIX = '_Z_ANGULAR_FUNC_';
|
||||
$scope.paragraph = null;
|
||||
$scope.originalText = '';
|
||||
|
|
@ -75,10 +75,21 @@ angular.module('zeppelinWebApp')
|
|||
} else if ($scope.getResultType() === 'TEXT') {
|
||||
$scope.renderText();
|
||||
}
|
||||
|
||||
getApplicationStates();
|
||||
getSuggestions();
|
||||
|
||||
var activeApp = _.get($scope.paragraph.config, 'helium.activeApp');
|
||||
if (activeApp) {
|
||||
var app = _.find($scope.apps, {id: activeApp});
|
||||
renderApp(app);
|
||||
}
|
||||
};
|
||||
|
||||
$scope.renderHtml = function() {
|
||||
var retryRenderer = function() {
|
||||
|
||||
|
||||
$scope.renderHtml = function() {
|
||||
var retryRenderer = function() {
|
||||
if (angular.element('#p' + $scope.paragraph.id + '_html').length) {
|
||||
try {
|
||||
angular.element('#p' + $scope.paragraph.id + '_html').html($scope.paragraph.result.msg);
|
||||
|
|
@ -328,6 +339,9 @@ angular.module('zeppelinWebApp')
|
|||
|
||||
var statusChanged = (data.paragraph.status !== $scope.paragraph.status);
|
||||
|
||||
var oldActiveApp = _.get($scope.paragraph.config, 'helium.activeApp');
|
||||
var newActiveApp = _.get(data.paragraph.config, 'helium.activeApp');
|
||||
|
||||
//console.log("updateParagraph oldData %o, newData %o. type %o -> %o, mode %o -> %o", $scope.paragraph, data, oldType, newType, oldGraphMode, newGraphMode);
|
||||
|
||||
if ($scope.paragraph.text !== data.paragraph.text) {
|
||||
|
|
@ -388,6 +402,14 @@ angular.module('zeppelinWebApp')
|
|||
$scope.renderText();
|
||||
}
|
||||
|
||||
getApplicationStates();
|
||||
getSuggestions();
|
||||
|
||||
if (newActiveApp && newActiveApp !== oldActiveApp) {
|
||||
var app = _.find($scope.apps, { id : newActiveApp });
|
||||
renderApp(app);
|
||||
}
|
||||
|
||||
if (statusChanged || resultRefreshed) {
|
||||
// when last paragraph runs, zeppelin automatically appends new paragraph.
|
||||
// this broadcast will focus to the newly inserted paragraph
|
||||
|
|
@ -1160,6 +1182,9 @@ angular.module('zeppelinWebApp')
|
|||
// graph options
|
||||
newConfig.graph.mode = newMode;
|
||||
|
||||
// see switchApp()
|
||||
_.set(newConfig, 'helium.activeApp', undefined);
|
||||
|
||||
commitParagraph($scope.paragraph.title, $scope.paragraph.text, newConfig, newParams);
|
||||
};
|
||||
|
||||
|
|
@ -1417,7 +1442,8 @@ angular.module('zeppelinWebApp')
|
|||
};
|
||||
|
||||
$scope.isGraphMode = function(graphName) {
|
||||
if ($scope.getResultType() === 'TABLE' && $scope.getGraphMode()===graphName) {
|
||||
var activeAppId = _.get($scope.paragraph.config, 'helium.activeApp');
|
||||
if ($scope.getResultType() === 'TABLE' && $scope.getGraphMode()===graphName && !activeAppId) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
|
|
@ -2142,4 +2168,173 @@ angular.module('zeppelinWebApp')
|
|||
$scope.keepScrollDown = false;
|
||||
};
|
||||
|
||||
// Helium ---------------------------------------------
|
||||
|
||||
// app states
|
||||
$scope.apps = [];
|
||||
|
||||
// suggested apps
|
||||
$scope.suggestion = {};
|
||||
|
||||
$scope.switchApp = function(appId) {
|
||||
var app = _.find($scope.apps, { id : appId });
|
||||
var config = $scope.paragraph.config;
|
||||
var settings = $scope.paragraph.settings;
|
||||
|
||||
var newConfig = angular.copy(config);
|
||||
var newParams = angular.copy(settings.params);
|
||||
|
||||
// 'helium.activeApp' can be cleared by setGraphMode()
|
||||
_.set(newConfig, 'helium.activeApp', appId);
|
||||
|
||||
commitConfig(newConfig, newParams);
|
||||
};
|
||||
|
||||
$scope.loadApp = function(heliumPackage) {
|
||||
var noteId = $route.current.pathParams.noteId;
|
||||
$http.post(baseUrlSrv.getRestApiBase() + '/helium/load/' + noteId + '/' + $scope.paragraph.id,
|
||||
heliumPackage)
|
||||
.success(function(data, status, headers, config) {
|
||||
console.log('Load app %o', data);
|
||||
})
|
||||
.error(function(err, status, headers, config) {
|
||||
console.log('Error %o', err);
|
||||
});
|
||||
};
|
||||
|
||||
var commitConfig = function(config, params) {
|
||||
var paragraph = $scope.paragraph;
|
||||
commitParagraph(paragraph.title, paragraph.text, config, params);
|
||||
};
|
||||
|
||||
var getApplicationStates = function() {
|
||||
var appStates = [];
|
||||
var paragraph = $scope.paragraph;
|
||||
|
||||
// Display ApplicationState
|
||||
if (paragraph.apps) {
|
||||
_.forEach(paragraph.apps, function (app) {
|
||||
appStates.push({
|
||||
id: app.id,
|
||||
pkg: app.pkg,
|
||||
status: app.status,
|
||||
output: app.output
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// update or remove app states no longer exists
|
||||
_.forEach($scope.apps, function(currentAppState, idx) {
|
||||
var newAppState = _.find(appStates, { id : currentAppState.id });
|
||||
if (newAppState) {
|
||||
angular.extend($scope.apps[idx], newAppState);
|
||||
} else {
|
||||
$scope.apps.splice(idx, 1);
|
||||
}
|
||||
});
|
||||
|
||||
// add new app states
|
||||
_.forEach(appStates, function(app, idx) {
|
||||
if ($scope.apps.length <= idx || $scope.apps[idx].id !== app.id) {
|
||||
$scope.apps.splice(idx, 0, app);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
var getSuggestions = function() {
|
||||
// Get suggested apps
|
||||
var noteId = $route.current.pathParams.noteId;
|
||||
$http.get(baseUrlSrv.getRestApiBase() + '/helium/suggest/' + noteId + '/' + $scope.paragraph.id)
|
||||
.success(function(data, status, headers, config) {
|
||||
console.log('Suggested apps %o', data);
|
||||
$scope.suggestion = data.body;
|
||||
})
|
||||
.error(function(err, status, headers, config) {
|
||||
console.log('Error %o', err);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
var renderApp = function(appState) {
|
||||
var retryRenderer = function() {
|
||||
var targetEl = angular.element(document.getElementById('p' + appState.id));
|
||||
console.log('retry renderApp %o', targetEl);
|
||||
if (targetEl.length) {
|
||||
try {
|
||||
console.log('renderApp %o', appState);
|
||||
targetEl.html(appState.output);
|
||||
$compile(targetEl.contents())(paragraphScope);
|
||||
} catch(err) {
|
||||
console.log('App rendering error %o', err);
|
||||
}
|
||||
} else {
|
||||
$timeout(retryRenderer, 1000);
|
||||
}
|
||||
};
|
||||
$timeout(retryRenderer);
|
||||
};
|
||||
|
||||
$scope.$on('appendAppOutput', function(event, data) {
|
||||
if ($scope.paragraph.id === data.paragraphId) {
|
||||
var app = _.find($scope.apps, { id : data.appId });
|
||||
if (app) {
|
||||
app.output += data.data;
|
||||
|
||||
var paragraphAppState = _.find($scope.paragraph.apps, { id : data.appId });
|
||||
paragraphAppState.output = app.output;
|
||||
|
||||
var targetEl = angular.element(document.getElementById('p' + app.id));
|
||||
targetEl.html(app.output);
|
||||
$compile(targetEl.contents())(paragraphScope);
|
||||
console.log('append app output %o', $scope.apps);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
$scope.$on('updateAppOutput', function(event, data) {
|
||||
if ($scope.paragraph.id === data.paragraphId) {
|
||||
var app = _.find($scope.apps, { id : data.appId });
|
||||
if (app) {
|
||||
app.output = data.data;
|
||||
|
||||
var paragraphAppState = _.find($scope.paragraph.apps, { id : data.appId });
|
||||
paragraphAppState.output = app.output;
|
||||
|
||||
var targetEl = angular.element(document.getElementById('p' + app.id));
|
||||
targetEl.html(app.output);
|
||||
$compile(targetEl.contents())(paragraphScope);
|
||||
console.log('append app output');
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
$scope.$on('appLoad', function(event, data) {
|
||||
if ($scope.paragraph.id === data.paragraphId) {
|
||||
var app = _.find($scope.apps, {id: data.appId});
|
||||
if (!app) {
|
||||
app = {
|
||||
id: data.appId,
|
||||
pkg: data.pkg,
|
||||
status: 'UNLOADED',
|
||||
output: ''
|
||||
};
|
||||
|
||||
$scope.apps.push(app);
|
||||
$scope.paragraph.apps.push(app);
|
||||
}
|
||||
|
||||
$scope.switchApp(app.id);
|
||||
}
|
||||
});
|
||||
|
||||
$scope.$on('appStatusChange', function(event, data) {
|
||||
if ($scope.paragraph.id === data.paragraphId) {
|
||||
var app = _.find($scope.apps, {id: data.appId});
|
||||
if (app) {
|
||||
app.status = data.status;
|
||||
var paragraphAppState = _.find($scope.paragraph.apps, { id : data.appId });
|
||||
paragraphAppState.status = app.status;
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -69,6 +69,14 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope,
|
|||
$rootScope.$broadcast('angularObjectUpdate', data);
|
||||
} else if (op === 'ANGULAR_OBJECT_REMOVE') {
|
||||
$rootScope.$broadcast('angularObjectRemove', data);
|
||||
} else if (op === 'APP_APPEND_OUTPUT') {
|
||||
$rootScope.$broadcast('appendAppOutput', data);
|
||||
} else if (op === 'APP_UPDATE_OUTPUT') {
|
||||
$rootScope.$broadcast('updateAppOutput', data);
|
||||
} else if (op === 'APP_LOAD') {
|
||||
$rootScope.$broadcast('appLoad', data);
|
||||
} else if (op === 'APP_STATUS_CHANGE') {
|
||||
$rootScope.$broadcast('appStatusChange', data);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -10,6 +10,13 @@ describe('Controller: ParagraphCtrl', function() {
|
|||
var paragraphMock = {
|
||||
config: {}
|
||||
};
|
||||
var route = {
|
||||
current : {
|
||||
pathParams : {
|
||||
noteId : 'noteId'
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
beforeEach(inject(function($controller, $rootScope) {
|
||||
scope = $rootScope.$new();
|
||||
|
|
@ -18,8 +25,10 @@ describe('Controller: ParagraphCtrl', function() {
|
|||
ParagraphCtrl = $controller('ParagraphCtrl', {
|
||||
$scope: scope,
|
||||
websocketMsgSrv: websocketMsgSrvMock,
|
||||
$element: {}
|
||||
$element: {},
|
||||
$route: route
|
||||
});
|
||||
|
||||
scope.init(paragraphMock);
|
||||
}));
|
||||
|
||||
|
|
|
|||
|
|
@ -62,6 +62,7 @@ public class Helium {
|
|||
|
||||
/**
|
||||
* Add HeliumRegistry
|
||||
*
|
||||
* @param registry
|
||||
*/
|
||||
public void addRegistry(HeliumRegistry registry) {
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
|
|||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.notebook.*;
|
||||
import org.apache.zeppelin.scheduler.ExecutorFactory;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -36,12 +37,7 @@ import java.util.concurrent.ExecutorService;
|
|||
/**
|
||||
* HeliumApplicationFactory
|
||||
*
|
||||
* 2. unload on interpreter restart
|
||||
* 3. front-end job
|
||||
* 4. example app
|
||||
* 5. dev mode
|
||||
* 6. app launcher
|
||||
* 7. offline mode. front-end table data / pivot panel access
|
||||
* TODO(moon): unload apps on interpreter restart
|
||||
*/
|
||||
public class HeliumApplicationFactory implements ApplicationEventListener, NotebookEventListener {
|
||||
private final Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class);
|
||||
|
|
@ -65,6 +61,8 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
*/
|
||||
public String loadAndRun(HeliumPackage pkg, Paragraph paragraph) {
|
||||
ApplicationState appState = paragraph.createOrGetApplicationState(pkg);
|
||||
onLoad(paragraph.getNote().getId(), paragraph.getId(), appState.getId(),
|
||||
appState.getHeliumPackage());
|
||||
executor.submit(new LoadApplication(appState, pkg, paragraph));
|
||||
return appState.getId();
|
||||
}
|
||||
|
|
@ -104,7 +102,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
logger.error(e.getMessage(), e);
|
||||
|
||||
if (appState != null) {
|
||||
appState.setStatus(ApplicationState.Status.ERROR);
|
||||
appStatusChange(paragraph, appState.getId(), ApplicationState.Status.ERROR);
|
||||
appState.setOutput(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
|
@ -113,23 +111,20 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
private void load(RemoteInterpreterProcess intpProcess, ApplicationState appState)
|
||||
throws Exception {
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
} catch (Exception e) {
|
||||
throw new ApplicationException(e);
|
||||
}
|
||||
RemoteInterpreterService.Client client = null;
|
||||
|
||||
synchronized (appState) {
|
||||
if (appState.getStatus() == ApplicationState.Status.LOADED) {
|
||||
// already loaded
|
||||
return;
|
||||
}
|
||||
appState.setStatus(ApplicationState.Status.LOADING);
|
||||
|
||||
try {
|
||||
appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADING);
|
||||
String pkgInfo = gson.toJson(pkg);
|
||||
String appId = appState.getId();
|
||||
|
||||
client = intpProcess.getClient();
|
||||
RemoteApplicationResult ret = client.loadApplication(
|
||||
appId,
|
||||
pkgInfo,
|
||||
|
|
@ -137,7 +132,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
paragraph.getId());
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
appState.setStatus(ApplicationState.Status.LOADED);
|
||||
appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADED);
|
||||
} else {
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
}
|
||||
|
|
@ -145,7 +140,9 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
intpProcess.releaseBrokenClient(client);
|
||||
throw e;
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
if (client != null) {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -194,12 +191,15 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
logger.warn("Can not find {} to unload from {}", appId, paragraph.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
if (appState.getStatus() == ApplicationState.Status.UNLOADED) {
|
||||
// not loaded
|
||||
return;
|
||||
}
|
||||
unload(appState);
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
if (appState != null) {
|
||||
appState.setStatus(ApplicationState.Status.ERROR);
|
||||
appStatusChange(paragraph, appId, ApplicationState.Status.ERROR);
|
||||
appState.setOutput(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
|
@ -211,7 +211,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
throw new ApplicationException(
|
||||
"Can't unload application status " + appsToUnload.getStatus());
|
||||
}
|
||||
appsToUnload.setStatus(ApplicationState.Status.UNLOADING);
|
||||
appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADING);
|
||||
Interpreter intp = paragraph.getCurrentRepl();
|
||||
if (intp == null) {
|
||||
throw new ApplicationException("No interpreter found");
|
||||
|
|
@ -234,7 +234,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
RemoteApplicationResult ret = client.unloadApplication(appsToUnload.getId());
|
||||
|
||||
if (ret.isSuccess()) {
|
||||
appsToUnload.setStatus(ApplicationState.Status.UNLOADED);
|
||||
appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADED);
|
||||
} else {
|
||||
throw new ApplicationException(ret.getMsg());
|
||||
}
|
||||
|
|
@ -286,7 +286,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
if (appState != null) {
|
||||
appState.setStatus(ApplicationState.Status.ERROR);
|
||||
appStatusChange(paragraph, appId, ApplicationState.Status.UNLOADED);
|
||||
appState.setOutput(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
|
@ -309,7 +309,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
if (intpProcess == null) {
|
||||
throw new ApplicationException("Target interpreter process is not running");
|
||||
}
|
||||
RemoteInterpreterService.Client client;
|
||||
RemoteInterpreterService.Client client = null;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
} catch (Exception e) {
|
||||
|
|
@ -326,9 +326,12 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
}
|
||||
} catch (TException e) {
|
||||
intpProcess.releaseBrokenClient(client);
|
||||
client = null;
|
||||
throw new ApplicationException(e);
|
||||
} finally {
|
||||
intpProcess.releaseClient(client);
|
||||
if (client != null) {
|
||||
intpProcess.releaseClient(client);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -364,6 +367,28 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg) {
|
||||
if (applicationEventListener != null) {
|
||||
applicationEventListener.onLoad(noteId, paragraphId, appId, pkg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStatusChange(String noteId, String paragraphId, String appId, String status) {
|
||||
if (applicationEventListener != null) {
|
||||
applicationEventListener.onStatusChange(noteId, paragraphId, appId, status);
|
||||
}
|
||||
}
|
||||
|
||||
private void appStatusChange(Paragraph paragraph,
|
||||
String appId,
|
||||
ApplicationState.Status status) {
|
||||
ApplicationState app = paragraph.getApplicationState(appId);
|
||||
app.setStatus(status);
|
||||
onStatusChange(paragraph.getNote().getId(), paragraph.getId(), appId, status.toString());
|
||||
}
|
||||
|
||||
private ApplicationState getAppState(String noteId, String paragraphId, String appId) {
|
||||
if (notebook == null) {
|
||||
return null;
|
||||
|
|
@ -437,4 +462,16 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
public void onParagraphCreate(Paragraph p) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onParagraphStatusChange(Paragraph p, Job.Status status) {
|
||||
if (status == Job.Status.FINISHED) {
|
||||
// refresh application
|
||||
List<ApplicationState> appStates = p.getAllApplicationStates();
|
||||
|
||||
for (ApplicationState app : appStates) {
|
||||
loadAndRun(app.getHeliumPackage(), p);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.zeppelin.notebook;
|
||||
|
||||
import org.apache.zeppelin.helium.HeliumPackage;
|
||||
|
||||
/**
|
||||
* Current state of application
|
||||
*/
|
||||
|
|
@ -35,12 +37,12 @@ public class ApplicationState {
|
|||
Status status = Status.UNLOADED;
|
||||
|
||||
String id; // unique id for this instance. Similar to note id or paragraph id
|
||||
String name; // name of app
|
||||
HeliumPackage pkg;
|
||||
String output;
|
||||
|
||||
public ApplicationState(String id, String name) {
|
||||
public ApplicationState(String id, HeliumPackage pkg) {
|
||||
this.id = id;
|
||||
this.name = name;
|
||||
this.pkg = pkg;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -59,19 +61,17 @@ public class ApplicationState {
|
|||
public boolean equals(Object o) {
|
||||
String compareName;
|
||||
if (o instanceof ApplicationState) {
|
||||
compareName = ((ApplicationState) o).name;
|
||||
} else if (o instanceof String) {
|
||||
compareName = (String) o;
|
||||
return pkg.equals(((ApplicationState) o).getHeliumPackage());
|
||||
} else if (o instanceof HeliumPackage) {
|
||||
return pkg.equals((HeliumPackage) o);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
return name.equals(compareName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return name.hashCode();
|
||||
return pkg.hashCode();
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
|
|
@ -102,7 +102,7 @@ public class ApplicationState {
|
|||
}
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
public HeliumPackage getHeliumPackage() {
|
||||
return pkg;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.Input;
|
||||
import org.apache.zeppelin.helium.HeliumApplicationFactory;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepo;
|
||||
|
|
@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory;
|
|||
/**
|
||||
* Binded interpreters for a note
|
||||
*/
|
||||
public class Note implements Serializable, JobListener {
|
||||
public class Note implements Serializable, ParagraphJobListener {
|
||||
static Logger logger = LoggerFactory.getLogger(Note.class);
|
||||
private static final long serialVersionUID = 7920699076577612429L;
|
||||
|
||||
|
|
@ -369,7 +370,7 @@ public class Note implements Serializable, JobListener {
|
|||
continue;
|
||||
}
|
||||
p.setNoteReplLoader(replLoader);
|
||||
p.setListener(jobListenerFactory.getParagraphJobListener(this));
|
||||
p.setListener(this);
|
||||
Interpreter intp = replLoader.get(p.getRequiredReplName());
|
||||
intp.getScheduler().submit(p);
|
||||
}
|
||||
|
|
@ -384,7 +385,7 @@ public class Note implements Serializable, JobListener {
|
|||
public void run(String paragraphId) {
|
||||
Paragraph p = getParagraph(paragraphId);
|
||||
p.setNoteReplLoader(replLoader);
|
||||
p.setListener(jobListenerFactory.getParagraphJobListener(this));
|
||||
p.setListener(this);
|
||||
Interpreter intp = replLoader.get(p.getRequiredReplName());
|
||||
if (intp == null) {
|
||||
throw new InterpreterException("Interpreter " + p.getRequiredReplName() + " not found");
|
||||
|
|
@ -517,14 +518,45 @@ public class Note implements Serializable, JobListener {
|
|||
|
||||
@Override
|
||||
public void beforeStatusChange(Job job, Status before, Status after) {
|
||||
if (jobListenerFactory != null) {
|
||||
ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this);
|
||||
listener.beforeStatusChange(job, before, after);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterStatusChange(Job job, Status before, Status after) {
|
||||
if (jobListenerFactory != null) {
|
||||
ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this);
|
||||
listener.afterStatusChange(job, before, after);
|
||||
}
|
||||
noteEventListener.onParagraphStatusChange((Paragraph) job, after);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProgressUpdate(Job job, int progress) {}
|
||||
public void onProgressUpdate(Job job, int progress) {
|
||||
if (jobListenerFactory != null) {
|
||||
ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this);
|
||||
listener.onProgressUpdate(job, progress);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
|
||||
if (jobListenerFactory != null) {
|
||||
ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this);
|
||||
listener.onOutputAppend(paragraph, out, output);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
|
||||
if (jobListenerFactory != null) {
|
||||
ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this);
|
||||
listener.onOutputUpdate(paragraph, out, output);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
@ -535,4 +567,5 @@ public class Note implements Serializable, JobListener {
|
|||
public void setNoteEventListener(NoteEventListener noteEventListener) {
|
||||
this.noteEventListener = noteEventListener;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,12 @@
|
|||
package org.apache.zeppelin.notebook;
|
||||
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
|
||||
/**
|
||||
* NoteEventListener
|
||||
*/
|
||||
public interface NoteEventListener {
|
||||
public void onParagraphRemove(Paragraph p);
|
||||
public void onParagraphCreate(Paragraph p);
|
||||
public void onParagraphStatusChange(Paragraph p, Job.Status status);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
|||
import org.apache.zeppelin.notebook.repo.NotebookRepo;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
|
||||
import org.apache.zeppelin.resource.ResourcePoolUtils;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.apache.zeppelin.search.SearchService;
|
||||
import org.quartz.CronScheduleBuilder;
|
||||
|
|
@ -646,4 +647,11 @@ public class Notebook implements NoteEventListener {
|
|||
listener.onParagraphCreate(p);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onParagraphStatusChange(Paragraph p, Job.Status status) {
|
||||
for (NotebookEventListener listener : notebookEventListeners) {
|
||||
listener.onParagraphStatusChange(p, status);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -408,13 +408,13 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
public ApplicationState createOrGetApplicationState(HeliumPackage pkg) {
|
||||
synchronized (apps) {
|
||||
for (ApplicationState as : apps) {
|
||||
if (as.getName().equals(pkg.getName())) {
|
||||
if (as.equals(pkg)) {
|
||||
return as;
|
||||
}
|
||||
}
|
||||
|
||||
String appId = getApplicationId(pkg);
|
||||
ApplicationState appState = new ApplicationState(appId, pkg.getName());
|
||||
ApplicationState appState = new ApplicationState(appId, pkg);
|
||||
apps.add(appState);
|
||||
return appState;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import org.apache.commons.vfs2.Selectors;
|
|||
import org.apache.commons.vfs2.VFS;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
||||
import org.apache.zeppelin.notebook.ApplicationState;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.NoteInfo;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
|
|
@ -169,6 +170,15 @@ public class VFSNotebookRepo implements NotebookRepo {
|
|||
if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) {
|
||||
p.setStatus(Status.ABORT);
|
||||
}
|
||||
|
||||
List<ApplicationState> appStates = p.getAllApplicationStates();
|
||||
if (appStates != null) {
|
||||
for (ApplicationState app : appStates) {
|
||||
if (app.getStatus() != ApplicationState.Status.ERROR) {
|
||||
app.setStatus(ApplicationState.Status.UNLOADED);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return note;
|
||||
|
|
|
|||
|
|
@ -659,6 +659,10 @@ public class NotebookTest implements JobListenerFactory{
|
|||
public void onParagraphCreate(Paragraph p) {
|
||||
onParagraphCreate.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onParagraphStatusChange(Paragraph p, Status status) {
|
||||
}
|
||||
});
|
||||
|
||||
Note note1 = notebook.createNote();
|
||||
|
|
|
|||
Loading…
Reference in a new issue