App output display

This commit is contained in:
Lee moon soo 2016-04-12 10:10:58 +09:00
parent bd0f467d23
commit 6223cd44d9
20 changed files with 441 additions and 51 deletions

View file

@ -22,4 +22,6 @@ package org.apache.zeppelin.helium;
public interface ApplicationEventListener {
public void onOutputAppend(String noteId, String paragraphId, String appId, String output);
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output);
public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg);
public void onStatusChange(String noteId, String paragraphId, String appId, String status);
}

View file

@ -27,7 +27,7 @@ public class HeliumPackage {
private String className; // entry point
private String [][] resources; // resource classnames that requires
// [[ .. and .. and .. ] or [ .. and .. and ..] ..]
private String icon;
/**
* Type of package
*/
@ -89,4 +89,8 @@ public class HeliumPackage {
public String[][] getResources() {
return resources;
}
public String getIcon() {
return icon;
}
}

View file

@ -147,7 +147,7 @@ public class RemoteInterpreterEventPoller extends Thread {
String paragraphId = outputAppend.get("paragraphId");
String outputToAppend = outputAppend.get("data");
String appId = outputAppend.get("appId");
logger.info("Append " + outputToAppend + ", appId = " + appId);
if (appId == null) {
listener.onOutputAppend(noteId, paragraphId, outputToAppend);
} else {
@ -161,7 +161,7 @@ public class RemoteInterpreterEventPoller extends Thread {
String paragraphId = outputAppend.get("paragraphId");
String outputToUpdate = outputAppend.get("data");
String appId = outputAppend.get("appId");
logger.info("Update " + outputToUpdate + ", appId = " + appId);
if (appId == null) {
listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
} else {

View file

@ -44,6 +44,9 @@ public class HeliumRestApi {
private Notebook notebook;
private Gson gson = new Gson();
public HeliumRestApi() {
}
public HeliumRestApi(Helium helium,
HeliumApplicationFactory heliumApplicationFactory,
Notebook notebook) {

View file

@ -114,7 +114,9 @@ public class Message {
// @param checkpointName
APP_APPEND_OUTPUT, // [s-c] append output
APP_UPDATE_OUTPUT // [s-c] update (replace) output
APP_UPDATE_OUTPUT, // [s-c] update (replace) output
APP_LOAD, // [s-c] on app load
APP_STATUS_CHANGE // [s-c] on app status change
}
public OP op;

View file

@ -35,6 +35,7 @@ import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.user.AuthenticationInfo;
@ -1018,6 +1019,26 @@ public class NotebookServer extends WebSocketServlet implements
broadcast(noteId, msg);
}
@Override
public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg) {
Message msg = new Message(OP.APP_LOAD)
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("appId", appId)
.put("pkg", pkg);
broadcast(noteId, msg);
}
@Override
public void onStatusChange(String noteId, String paragraphId, String appId, String status) {
Message msg = new Message(OP.APP_STATUS_CHANGE)
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("appId", appId)
.put("status", status);
broadcast(noteId, msg);
}
/**
* Need description here.
*

View file

@ -13,35 +13,76 @@ limitations under the License.
-->
<div id="{{paragraph.id}}_switch"
ng-if="paragraph.result.type == 'TABLE' && !asIframe && !viewOnly"
ng-if="(paragraph.result.type == 'TABLE' || apps.length > 0 || suggestion.available && suggestion.available.length > 0) && !asIframe && !viewOnly"
class="btn-group"
style='margin-bottom: 10px;'>
<button type="button" class="btn btn-default btn-sm"
ng-if="paragraph.result.type == 'TABLE'"
ng-class="{'active': isGraphMode('table')}"
ng-click="setGraphMode('table', true)" ><i class="fa fa-table"></i>
</button>
<button type="button" class="btn btn-default btn-sm"
ng-if="paragraph.result.type == 'TABLE'"
ng-class="{'active': isGraphMode('multiBarChart')}"
ng-click="setGraphMode('multiBarChart', true)"><i class="fa fa-bar-chart"></i>
</button>
<button type="button" class="btn btn-default btn-sm"
ng-if="paragraph.result.type == 'TABLE'"
ng-class="{'active': isGraphMode('pieChart')}"
ng-click="setGraphMode('pieChart', true)"><i class="fa fa-pie-chart"></i>
</button>
<button type="button" class="btn btn-default btn-sm"
ng-if="paragraph.result.type == 'TABLE'"
ng-class="{'active': isGraphMode('stackedAreaChart')}"
ng-click="setGraphMode('stackedAreaChart', true)"><i class="fa fa-area-chart"></i>
</button>
<button type="button" class="btn btn-default btn-sm"
ng-if="paragraph.result.type == 'TABLE'"
ng-class="{'active': isGraphMode('lineChart') || isGraphMode('lineWithFocusChart')}"
ng-click="paragraph.config.graph.lineWithFocus ? setGraphMode('lineWithFocusChart', true) : setGraphMode('lineChart', true)"><i class="fa fa-line-chart"></i>
</button>
<button type="button" class="btn btn-default btn-sm"
ng-if="paragraph.result.type == 'TABLE'"
ng-class="{'active': isGraphMode('scatterChart')}"
ng-click="setGraphMode('scatterChart', true)"><i class="cf cf-scatter-chart"></i>
</button>
<button type="button"
class="btn btn-default btn-sm"
ng-repeat="app in apps"
ng-click="switchApp(app.id)"
ng-class="{'active' : app.id == paragraph.config.helium.activeApp}"
ng-bind-html="app.pkg.icon">
</button>
</div>
<span ng-if="getResultType()=='TABLE' && getGraphMode()!='table' && !asIframe && !viewOnly"
<div id="{{paragraph.id}}_helium"
ng-if="(suggestion.available && suggestion.available.length > 0) && !asIframe && !viewOnly"
class="btn-group"
style='margin-bottom: 10px;'>
<button type="button"
class="btn btn-default btn-sm dropdown-toggle"
ng-if="suggestion.available && suggestion.available.length > 0"
data-toggle="dropdown"
style="font-weight:bold">
Helium
</button>
<ul class="dropdown-menu"
ng-if="suggestion.available && suggestion.available.length > 0"
role="menu">
<li class="appSuggestion">
<div ng-repeat="pkgInfo in suggestion.available">
<a ng-click="loadApp(pkgInfo.pkg)">
{{pkgInfo.pkg.name}}
</a>
</div>
</li>
</ul>
</div>
<span
ng-if="getResultType()=='TABLE' && !paragraph.config.helium.activeApp && getGraphMode()!='table' && !asIframe && !viewOnly"
style="margin-left:10px; cursor:pointer; display: inline-block; vertical-align:top; position: relative; line-height:30px;">
<a class="btnText" ng-click="toggleGraphOption()">
settings <span ng-class="paragraph.config.graph.optionOpen ? 'fa fa-caret-up' : 'fa fa-caret-down'"></span>

View file

@ -13,6 +13,7 @@ limitations under the License.
-->
<div
id="p{{paragraph.id}}_resize"
ng-if="!paragraph.config.helium.activeApp"
resize='{"allowresize": "{{!asIframe && !viewOnly}}", "graphType": "{{getResultType()}}"}'
resizable on-resize="resizeParagraph(width, height);">
<div ng-include src="'app/notebook/paragraph/paragraph-graph.html'"></div>
@ -59,3 +60,11 @@ limitations under the License.
ng-bind="paragraph.errorMessage">
</div>
</div>
<div ng-repeat="app in apps">
<div id="p{{app.id}}"
ng-show="paragraph.config.helium.activeApp == app.id">
</div>
</div>

View file

@ -16,7 +16,7 @@
angular.module('zeppelinWebApp')
.controller('ParagraphCtrl', function($scope,$rootScope, $route, $window, $element, $routeParams, $location,
$timeout, $compile, websocketMsgSrv) {
$timeout, $compile, $http, websocketMsgSrv, baseUrlSrv) {
var ANGULAR_FUNCTION_OBJECT_NAME_PREFIX = '_Z_ANGULAR_FUNC_';
$scope.paragraph = null;
$scope.originalText = '';
@ -75,10 +75,21 @@ angular.module('zeppelinWebApp')
} else if ($scope.getResultType() === 'TEXT') {
$scope.renderText();
}
getApplicationStates();
getSuggestions();
var activeApp = _.get($scope.paragraph.config, 'helium.activeApp');
if (activeApp) {
var app = _.find($scope.apps, {id: activeApp});
renderApp(app);
}
};
$scope.renderHtml = function() {
var retryRenderer = function() {
$scope.renderHtml = function() {
var retryRenderer = function() {
if (angular.element('#p' + $scope.paragraph.id + '_html').length) {
try {
angular.element('#p' + $scope.paragraph.id + '_html').html($scope.paragraph.result.msg);
@ -328,6 +339,9 @@ angular.module('zeppelinWebApp')
var statusChanged = (data.paragraph.status !== $scope.paragraph.status);
var oldActiveApp = _.get($scope.paragraph.config, 'helium.activeApp');
var newActiveApp = _.get(data.paragraph.config, 'helium.activeApp');
//console.log("updateParagraph oldData %o, newData %o. type %o -> %o, mode %o -> %o", $scope.paragraph, data, oldType, newType, oldGraphMode, newGraphMode);
if ($scope.paragraph.text !== data.paragraph.text) {
@ -388,6 +402,14 @@ angular.module('zeppelinWebApp')
$scope.renderText();
}
getApplicationStates();
getSuggestions();
if (newActiveApp && newActiveApp !== oldActiveApp) {
var app = _.find($scope.apps, { id : newActiveApp });
renderApp(app);
}
if (statusChanged || resultRefreshed) {
// when last paragraph runs, zeppelin automatically appends new paragraph.
// this broadcast will focus to the newly inserted paragraph
@ -1160,6 +1182,9 @@ angular.module('zeppelinWebApp')
// graph options
newConfig.graph.mode = newMode;
// see switchApp()
_.set(newConfig, 'helium.activeApp', undefined);
commitParagraph($scope.paragraph.title, $scope.paragraph.text, newConfig, newParams);
};
@ -1417,7 +1442,8 @@ angular.module('zeppelinWebApp')
};
$scope.isGraphMode = function(graphName) {
if ($scope.getResultType() === 'TABLE' && $scope.getGraphMode()===graphName) {
var activeAppId = _.get($scope.paragraph.config, 'helium.activeApp');
if ($scope.getResultType() === 'TABLE' && $scope.getGraphMode()===graphName && !activeAppId) {
return true;
} else {
return false;
@ -2142,4 +2168,173 @@ angular.module('zeppelinWebApp')
$scope.keepScrollDown = false;
};
// Helium ---------------------------------------------
// app states
$scope.apps = [];
// suggested apps
$scope.suggestion = {};
$scope.switchApp = function(appId) {
var app = _.find($scope.apps, { id : appId });
var config = $scope.paragraph.config;
var settings = $scope.paragraph.settings;
var newConfig = angular.copy(config);
var newParams = angular.copy(settings.params);
// 'helium.activeApp' can be cleared by setGraphMode()
_.set(newConfig, 'helium.activeApp', appId);
commitConfig(newConfig, newParams);
};
$scope.loadApp = function(heliumPackage) {
var noteId = $route.current.pathParams.noteId;
$http.post(baseUrlSrv.getRestApiBase() + '/helium/load/' + noteId + '/' + $scope.paragraph.id,
heliumPackage)
.success(function(data, status, headers, config) {
console.log('Load app %o', data);
})
.error(function(err, status, headers, config) {
console.log('Error %o', err);
});
};
var commitConfig = function(config, params) {
var paragraph = $scope.paragraph;
commitParagraph(paragraph.title, paragraph.text, config, params);
};
var getApplicationStates = function() {
var appStates = [];
var paragraph = $scope.paragraph;
// Display ApplicationState
if (paragraph.apps) {
_.forEach(paragraph.apps, function (app) {
appStates.push({
id: app.id,
pkg: app.pkg,
status: app.status,
output: app.output
});
});
}
// update or remove app states no longer exists
_.forEach($scope.apps, function(currentAppState, idx) {
var newAppState = _.find(appStates, { id : currentAppState.id });
if (newAppState) {
angular.extend($scope.apps[idx], newAppState);
} else {
$scope.apps.splice(idx, 1);
}
});
// add new app states
_.forEach(appStates, function(app, idx) {
if ($scope.apps.length <= idx || $scope.apps[idx].id !== app.id) {
$scope.apps.splice(idx, 0, app);
}
});
};
var getSuggestions = function() {
// Get suggested apps
var noteId = $route.current.pathParams.noteId;
$http.get(baseUrlSrv.getRestApiBase() + '/helium/suggest/' + noteId + '/' + $scope.paragraph.id)
.success(function(data, status, headers, config) {
console.log('Suggested apps %o', data);
$scope.suggestion = data.body;
})
.error(function(err, status, headers, config) {
console.log('Error %o', err);
});
};
var renderApp = function(appState) {
var retryRenderer = function() {
var targetEl = angular.element(document.getElementById('p' + appState.id));
console.log('retry renderApp %o', targetEl);
if (targetEl.length) {
try {
console.log('renderApp %o', appState);
targetEl.html(appState.output);
$compile(targetEl.contents())(paragraphScope);
} catch(err) {
console.log('App rendering error %o', err);
}
} else {
$timeout(retryRenderer, 1000);
}
};
$timeout(retryRenderer);
};
$scope.$on('appendAppOutput', function(event, data) {
if ($scope.paragraph.id === data.paragraphId) {
var app = _.find($scope.apps, { id : data.appId });
if (app) {
app.output += data.data;
var paragraphAppState = _.find($scope.paragraph.apps, { id : data.appId });
paragraphAppState.output = app.output;
var targetEl = angular.element(document.getElementById('p' + app.id));
targetEl.html(app.output);
$compile(targetEl.contents())(paragraphScope);
console.log('append app output %o', $scope.apps);
}
}
});
$scope.$on('updateAppOutput', function(event, data) {
if ($scope.paragraph.id === data.paragraphId) {
var app = _.find($scope.apps, { id : data.appId });
if (app) {
app.output = data.data;
var paragraphAppState = _.find($scope.paragraph.apps, { id : data.appId });
paragraphAppState.output = app.output;
var targetEl = angular.element(document.getElementById('p' + app.id));
targetEl.html(app.output);
$compile(targetEl.contents())(paragraphScope);
console.log('append app output');
}
}
});
$scope.$on('appLoad', function(event, data) {
if ($scope.paragraph.id === data.paragraphId) {
var app = _.find($scope.apps, {id: data.appId});
if (!app) {
app = {
id: data.appId,
pkg: data.pkg,
status: 'UNLOADED',
output: ''
};
$scope.apps.push(app);
$scope.paragraph.apps.push(app);
}
$scope.switchApp(app.id);
}
});
$scope.$on('appStatusChange', function(event, data) {
if ($scope.paragraph.id === data.paragraphId) {
var app = _.find($scope.apps, {id: data.appId});
if (app) {
app.status = data.status;
var paragraphAppState = _.find($scope.paragraph.apps, { id : data.appId });
paragraphAppState.status = app.status;
}
}
});
});

View file

@ -69,6 +69,14 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope,
$rootScope.$broadcast('angularObjectUpdate', data);
} else if (op === 'ANGULAR_OBJECT_REMOVE') {
$rootScope.$broadcast('angularObjectRemove', data);
} else if (op === 'APP_APPEND_OUTPUT') {
$rootScope.$broadcast('appendAppOutput', data);
} else if (op === 'APP_UPDATE_OUTPUT') {
$rootScope.$broadcast('updateAppOutput', data);
} else if (op === 'APP_LOAD') {
$rootScope.$broadcast('appLoad', data);
} else if (op === 'APP_STATUS_CHANGE') {
$rootScope.$broadcast('appStatusChange', data);
}
});

View file

@ -10,6 +10,13 @@ describe('Controller: ParagraphCtrl', function() {
var paragraphMock = {
config: {}
};
var route = {
current : {
pathParams : {
noteId : 'noteId'
}
}
};
beforeEach(inject(function($controller, $rootScope) {
scope = $rootScope.$new();
@ -18,8 +25,10 @@ describe('Controller: ParagraphCtrl', function() {
ParagraphCtrl = $controller('ParagraphCtrl', {
$scope: scope,
websocketMsgSrv: websocketMsgSrvMock,
$element: {}
$element: {},
$route: route
});
scope.init(paragraphMock);
}));

View file

@ -62,6 +62,7 @@ public class Helium {
/**
* Add HeliumRegistry
*
* @param registry
*/
public void addRegistry(HeliumRegistry registry) {

View file

@ -27,6 +27,7 @@ import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.scheduler.ExecutorFactory;
import org.apache.zeppelin.scheduler.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,12 +37,7 @@ import java.util.concurrent.ExecutorService;
/**
* HeliumApplicationFactory
*
* 2. unload on interpreter restart
* 3. front-end job
* 4. example app
* 5. dev mode
* 6. app launcher
* 7. offline mode. front-end table data / pivot panel access
* TODO(moon): unload apps on interpreter restart
*/
public class HeliumApplicationFactory implements ApplicationEventListener, NotebookEventListener {
private final Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class);
@ -65,6 +61,8 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
*/
public String loadAndRun(HeliumPackage pkg, Paragraph paragraph) {
ApplicationState appState = paragraph.createOrGetApplicationState(pkg);
onLoad(paragraph.getNote().getId(), paragraph.getId(), appState.getId(),
appState.getHeliumPackage());
executor.submit(new LoadApplication(appState, pkg, paragraph));
return appState.getId();
}
@ -104,7 +102,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
logger.error(e.getMessage(), e);
if (appState != null) {
appState.setStatus(ApplicationState.Status.ERROR);
appStatusChange(paragraph, appState.getId(), ApplicationState.Status.ERROR);
appState.setOutput(e.getMessage());
}
}
@ -113,23 +111,20 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
private void load(RemoteInterpreterProcess intpProcess, ApplicationState appState)
throws Exception {
RemoteInterpreterService.Client client;
try {
client = intpProcess.getClient();
} catch (Exception e) {
throw new ApplicationException(e);
}
RemoteInterpreterService.Client client = null;
synchronized (appState) {
if (appState.getStatus() == ApplicationState.Status.LOADED) {
// already loaded
return;
}
appState.setStatus(ApplicationState.Status.LOADING);
try {
appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADING);
String pkgInfo = gson.toJson(pkg);
String appId = appState.getId();
client = intpProcess.getClient();
RemoteApplicationResult ret = client.loadApplication(
appId,
pkgInfo,
@ -137,7 +132,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
paragraph.getId());
if (ret.isSuccess()) {
appState.setStatus(ApplicationState.Status.LOADED);
appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADED);
} else {
throw new ApplicationException(ret.getMsg());
}
@ -145,7 +140,9 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
intpProcess.releaseBrokenClient(client);
throw e;
} finally {
intpProcess.releaseClient(client);
if (client != null) {
intpProcess.releaseClient(client);
}
}
}
}
@ -194,12 +191,15 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
logger.warn("Can not find {} to unload from {}", appId, paragraph.getId());
return;
}
if (appState.getStatus() == ApplicationState.Status.UNLOADED) {
// not loaded
return;
}
unload(appState);
} catch (Exception e) {
logger.error(e.getMessage(), e);
if (appState != null) {
appState.setStatus(ApplicationState.Status.ERROR);
appStatusChange(paragraph, appId, ApplicationState.Status.ERROR);
appState.setOutput(e.getMessage());
}
}
@ -211,7 +211,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
throw new ApplicationException(
"Can't unload application status " + appsToUnload.getStatus());
}
appsToUnload.setStatus(ApplicationState.Status.UNLOADING);
appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADING);
Interpreter intp = paragraph.getCurrentRepl();
if (intp == null) {
throw new ApplicationException("No interpreter found");
@ -234,7 +234,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
RemoteApplicationResult ret = client.unloadApplication(appsToUnload.getId());
if (ret.isSuccess()) {
appsToUnload.setStatus(ApplicationState.Status.UNLOADED);
appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADED);
} else {
throw new ApplicationException(ret.getMsg());
}
@ -286,7 +286,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
} catch (Exception e) {
logger.error(e.getMessage(), e);
if (appState != null) {
appState.setStatus(ApplicationState.Status.ERROR);
appStatusChange(paragraph, appId, ApplicationState.Status.UNLOADED);
appState.setOutput(e.getMessage());
}
}
@ -309,7 +309,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
if (intpProcess == null) {
throw new ApplicationException("Target interpreter process is not running");
}
RemoteInterpreterService.Client client;
RemoteInterpreterService.Client client = null;
try {
client = intpProcess.getClient();
} catch (Exception e) {
@ -326,9 +326,12 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
}
} catch (TException e) {
intpProcess.releaseBrokenClient(client);
client = null;
throw new ApplicationException(e);
} finally {
intpProcess.releaseClient(client);
if (client != null) {
intpProcess.releaseClient(client);
}
}
}
}
@ -364,6 +367,28 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
}
}
@Override
public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg) {
if (applicationEventListener != null) {
applicationEventListener.onLoad(noteId, paragraphId, appId, pkg);
}
}
@Override
public void onStatusChange(String noteId, String paragraphId, String appId, String status) {
if (applicationEventListener != null) {
applicationEventListener.onStatusChange(noteId, paragraphId, appId, status);
}
}
private void appStatusChange(Paragraph paragraph,
String appId,
ApplicationState.Status status) {
ApplicationState app = paragraph.getApplicationState(appId);
app.setStatus(status);
onStatusChange(paragraph.getNote().getId(), paragraph.getId(), appId, status.toString());
}
private ApplicationState getAppState(String noteId, String paragraphId, String appId) {
if (notebook == null) {
return null;
@ -437,4 +462,16 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
public void onParagraphCreate(Paragraph p) {
}
@Override
public void onParagraphStatusChange(Paragraph p, Job.Status status) {
if (status == Job.Status.FINISHED) {
// refresh application
List<ApplicationState> appStates = p.getAllApplicationStates();
for (ApplicationState app : appStates) {
loadAndRun(app.getHeliumPackage(), p);
}
}
}
}

View file

@ -16,6 +16,8 @@
*/
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.helium.HeliumPackage;
/**
* Current state of application
*/
@ -35,12 +37,12 @@ public class ApplicationState {
Status status = Status.UNLOADED;
String id; // unique id for this instance. Similar to note id or paragraph id
String name; // name of app
HeliumPackage pkg;
String output;
public ApplicationState(String id, String name) {
public ApplicationState(String id, HeliumPackage pkg) {
this.id = id;
this.name = name;
this.pkg = pkg;
}
/**
@ -59,19 +61,17 @@ public class ApplicationState {
public boolean equals(Object o) {
String compareName;
if (o instanceof ApplicationState) {
compareName = ((ApplicationState) o).name;
} else if (o instanceof String) {
compareName = (String) o;
return pkg.equals(((ApplicationState) o).getHeliumPackage());
} else if (o instanceof HeliumPackage) {
return pkg.equals((HeliumPackage) o);
} else {
return false;
}
return name.equals(compareName);
}
@Override
public int hashCode() {
return name.hashCode();
return pkg.hashCode();
}
public String getId() {
@ -102,7 +102,7 @@ public class ApplicationState {
}
}
public String getName() {
return name;
public HeliumPackage getHeliumPackage() {
return pkg;
}
}

View file

@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.helium.HeliumApplicationFactory;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory;
/**
* Binded interpreters for a note
*/
public class Note implements Serializable, JobListener {
public class Note implements Serializable, ParagraphJobListener {
static Logger logger = LoggerFactory.getLogger(Note.class);
private static final long serialVersionUID = 7920699076577612429L;
@ -369,7 +370,7 @@ public class Note implements Serializable, JobListener {
continue;
}
p.setNoteReplLoader(replLoader);
p.setListener(jobListenerFactory.getParagraphJobListener(this));
p.setListener(this);
Interpreter intp = replLoader.get(p.getRequiredReplName());
intp.getScheduler().submit(p);
}
@ -384,7 +385,7 @@ public class Note implements Serializable, JobListener {
public void run(String paragraphId) {
Paragraph p = getParagraph(paragraphId);
p.setNoteReplLoader(replLoader);
p.setListener(jobListenerFactory.getParagraphJobListener(this));
p.setListener(this);
Interpreter intp = replLoader.get(p.getRequiredReplName());
if (intp == null) {
throw new InterpreterException("Interpreter " + p.getRequiredReplName() + " not found");
@ -517,14 +518,45 @@ public class Note implements Serializable, JobListener {
@Override
public void beforeStatusChange(Job job, Status before, Status after) {
if (jobListenerFactory != null) {
ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this);
listener.beforeStatusChange(job, before, after);
}
}
@Override
public void afterStatusChange(Job job, Status before, Status after) {
if (jobListenerFactory != null) {
ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this);
listener.afterStatusChange(job, before, after);
}
noteEventListener.onParagraphStatusChange((Paragraph) job, after);
}
@Override
public void onProgressUpdate(Job job, int progress) {}
public void onProgressUpdate(Job job, int progress) {
if (jobListenerFactory != null) {
ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this);
listener.onProgressUpdate(job, progress);
}
}
@Override
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
if (jobListenerFactory != null) {
ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this);
listener.onOutputAppend(paragraph, out, output);
}
}
@Override
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
if (jobListenerFactory != null) {
ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this);
listener.onOutputUpdate(paragraph, out, output);
}
}
@ -535,4 +567,5 @@ public class Note implements Serializable, JobListener {
public void setNoteEventListener(NoteEventListener noteEventListener) {
this.noteEventListener = noteEventListener;
}
}

View file

@ -1,9 +1,12 @@
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.scheduler.Job;
/**
* NoteEventListener
*/
public interface NoteEventListener {
public void onParagraphRemove(Paragraph p);
public void onParagraphCreate(Paragraph p);
public void onParagraphStatusChange(Paragraph p, Job.Status status);
}

View file

@ -41,6 +41,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.quartz.CronScheduleBuilder;
@ -646,4 +647,11 @@ public class Notebook implements NoteEventListener {
listener.onParagraphCreate(p);
}
}
@Override
public void onParagraphStatusChange(Paragraph p, Job.Status status) {
for (NotebookEventListener listener : notebookEventListeners) {
listener.onParagraphStatusChange(p, status);
}
}
}

View file

@ -408,13 +408,13 @@ public class Paragraph extends Job implements Serializable, Cloneable {
public ApplicationState createOrGetApplicationState(HeliumPackage pkg) {
synchronized (apps) {
for (ApplicationState as : apps) {
if (as.getName().equals(pkg.getName())) {
if (as.equals(pkg)) {
return as;
}
}
String appId = getApplicationId(pkg);
ApplicationState appState = new ApplicationState(appId, pkg.getName());
ApplicationState appState = new ApplicationState(appId, pkg);
apps.add(appState);
return appState;
}

View file

@ -36,6 +36,7 @@ import org.apache.commons.vfs2.Selectors;
import org.apache.commons.vfs2.VFS;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.notebook.ApplicationState;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.Paragraph;
@ -169,6 +170,15 @@ public class VFSNotebookRepo implements NotebookRepo {
if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) {
p.setStatus(Status.ABORT);
}
List<ApplicationState> appStates = p.getAllApplicationStates();
if (appStates != null) {
for (ApplicationState app : appStates) {
if (app.getStatus() != ApplicationState.Status.ERROR) {
app.setStatus(ApplicationState.Status.UNLOADED);
}
}
}
}
return note;

View file

@ -659,6 +659,10 @@ public class NotebookTest implements JobListenerFactory{
public void onParagraphCreate(Paragraph p) {
onParagraphCreate.incrementAndGet();
}
@Override
public void onParagraphStatusChange(Paragraph p, Status status) {
}
});
Note note1 = notebook.createNote();