Take care output streaming

This commit is contained in:
Lee moon soo 2016-11-19 17:27:17 -08:00
parent 57d6b2f647
commit 804768dfd9
18 changed files with 231 additions and 166 deletions

View file

@ -16,12 +16,17 @@
*/
package org.apache.zeppelin.helium;
import org.apache.zeppelin.interpreter.InterpreterResult;
/**
* Event from HeliumApplication running on remote interpreter process
*/
public interface ApplicationEventListener {
public void onOutputAppend(String noteId, String paragraphId, String appId, String output);
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output);
public void onOutputAppend(
String noteId, String paragraphId, int index, String appId, String output);
public void onOutputUpdated(
String noteId, String paragraphId, int index, String appId,
InterpreterResult.Type type, String output);
public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg);
public void onStatusChange(String noteId, String paragraphId, String appId, String status);
}

View file

@ -96,7 +96,9 @@ public class InterpreterResultMessageOutput extends OutputStream {
firstWrite = false;
}
flush();
if (isAppendSupported()) {
flush(true);
}
}
}
}
@ -200,22 +202,34 @@ public class InterpreterResultMessageOutput extends OutputStream {
return new InterpreterResultMessage(type, new String(toByteArray()));
}
public void flush() throws IOException {
private void flush(boolean append) throws IOException {
synchronized (outList) {
buffer.flush();
byte[] bytes = buffer.toByteArray();
if (bytes != null && bytes.length > 0) {
outList.add(bytes);
if (type == InterpreterResult.Type.TEXT) {
if (append) {
if (flushListener != null) {
flushListener.onAppend(this, bytes);
}
} else {
if (flushListener != null) {
flushListener.onUpdate(this);
}
}
}
buffer.reset();
}
}
public void flush() throws IOException {
flush(isAppendSupported());
}
public boolean isAppendSupported() {
return type == InterpreterResult.Type.TEXT;
}
private void copyStream(InputStream in, OutputStream out) throws IOException {
int bufferSize = 8192;
byte[] buffer = new byte[bufferSize];

View file

@ -157,8 +157,7 @@ public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
@Override
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
eventClient.onInterpreterOutputAppend(noteId, paragraphId,
index, out.getType(), new String(line));
eventClient.onInterpreterOutputAppend(noteId, paragraphId, index, new String(line));
}
@Override

View file

@ -83,8 +83,7 @@ public class ZeppelinDevServer extends
@Override
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
eventClient.onInterpreterOutputAppend(noteId, paragraphId,
index, out.getType(), new String(line));
eventClient.onInterpreterOutputAppend(noteId, paragraphId, index, new String(line));
}
@Override

View file

@ -25,11 +25,13 @@ public class AppendOutputBuffer {
private String noteId;
private String paragraphId;
private int index;
private String data;
public AppendOutputBuffer(String noteId, String paragraphId, String data) {
public AppendOutputBuffer(String noteId, String paragraphId, int index, String data) {
this.noteId = noteId;
this.paragraphId = paragraphId;
this.index = index;
this.data = data;
}
@ -41,6 +43,10 @@ public class AppendOutputBuffer {
return paragraphId;
}
public int getIndex() {
return index;
}
public String getData() {
return data;
}

View file

@ -51,7 +51,7 @@ public class AppendOutputRunner implements Runnable {
@Override
public void run() {
Map<String, Map<String, StringBuilder> > noteMap = new HashMap<>();
Map<String, StringBuilder> stringBufferMap = new HashMap<>();
List<AppendOutputBuffer> list = new LinkedList<>();
/* "drainTo" method does not wait for any element
@ -73,15 +73,14 @@ public class AppendOutputRunner implements Runnable {
for (AppendOutputBuffer buffer: list) {
String noteId = buffer.getNoteId();
String paragraphId = buffer.getParagraphId();
int index = buffer.getIndex();
String stringBufferKey = noteId + ":" + paragraphId + ":" + index;
Map<String, StringBuilder> paragraphMap = (noteMap.containsKey(noteId)) ?
noteMap.get(noteId) : new HashMap<String, StringBuilder>();
StringBuilder builder = paragraphMap.containsKey(paragraphId) ?
paragraphMap.get(paragraphId) : new StringBuilder();
StringBuilder builder = stringBufferMap.containsKey(stringBufferKey) ?
stringBufferMap.get(stringBufferKey) : new StringBuilder();
builder.append(buffer.getData());
paragraphMap.put(paragraphId, builder);
noteMap.put(noteId, paragraphMap);
stringBufferMap.put(stringBufferKey, builder);
}
Long processingTime = System.currentTimeMillis() - processingStartTime;
@ -94,12 +93,11 @@ public class AppendOutputRunner implements Runnable {
}
Long sizeProcessed = new Long(0);
for (String noteId: noteMap.keySet()) {
for (String paragraphId: noteMap.get(noteId).keySet()) {
String data = noteMap.get(noteId).get(paragraphId).toString();
sizeProcessed += data.length();
listener.onOutputAppend(noteId, paragraphId, data);
}
for (String stringBufferKey : stringBufferMap.keySet()) {
StringBuilder buffer = stringBufferMap.get(stringBufferKey);
sizeProcessed += buffer.length();
String[] keys = stringBufferKey.split(":");
listener.onOutputAppend(keys[0], keys[1], Integer.parseInt(keys[2]), buffer.toString());
}
if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) {
@ -111,8 +109,8 @@ public class AppendOutputRunner implements Runnable {
}
}
public void appendBuffer(String noteId, String paragraphId, String outputToAppend) {
queue.offer(new AppendOutputBuffer(noteId, paragraphId, outputToAppend));
public void appendBuffer(String noteId, String paragraphId, int index, String outputToAppend) {
queue.offer(new AppendOutputBuffer(noteId, paragraphId, index, outputToAppend));
}
}

View file

@ -215,13 +215,11 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
}
public void onInterpreterOutputAppend(
String noteId, String paragraphId, int outputIndex,
InterpreterResult.Type type, String output) {
Map<String, Object> appendOutput = new HashMap<>();
String noteId, String paragraphId, int outputIndex, String output) {
Map<String, String> appendOutput = new HashMap<>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("index", outputIndex);
appendOutput.put("type", type.name());
appendOutput.put("index", Integer.toString(outputIndex));
appendOutput.put("data", output);
sendEvent(new RemoteInterpreterEvent(
@ -232,10 +230,10 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
public void onInterpreterOutputUpdate(
String noteId, String paragraphId, int outputIndex,
InterpreterResult.Type type, String output) {
Map<String, Object> appendOutput = new HashMap<>();
Map<String, String> appendOutput = new HashMap<>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("index", outputIndex);
appendOutput.put("index", Integer.toString(outputIndex));
appendOutput.put("type", type.name());
appendOutput.put("data", output);
@ -275,10 +273,12 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
}
}
public void onAppOutputAppend(String noteId, String paragraphId, String appId, String output) {
Map<String, String> appendOutput = new HashMap<>();
public void onAppOutputAppend(
String noteId, String paragraphId, int index, String appId, String output) {
Map<String, Object> appendOutput = new HashMap<>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("index", index);
appendOutput.put("appId", appId);
appendOutput.put("data", output);
@ -288,11 +288,15 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
}
public void onAppOutputUpdate(String noteId, String paragraphId, String appId, String output) {
Map<String, String> appendOutput = new HashMap<>();
public void onAppOutputUpdate(
String noteId, String paragraphId, int index, String appId,
InterpreterResult.Type type, String output) {
Map<String, Object> appendOutput = new HashMap<>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("index", index);
appendOutput.put("appId", appId);
appendOutput.put("type", type);
appendOutput.put("data", output);
sendEvent(new RemoteInterpreterEvent(

View file

@ -159,38 +159,36 @@ public class RemoteInterpreterEventPoller extends Thread {
sendResourceResponseGet(resourceId, o);
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
// on output append
Map<String, Object> outputAppend = gson.fromJson(
Map<String, String> outputAppend = gson.fromJson(
event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
String noteId = (String) outputAppend.get("noteId");
String paragraphId = (String) outputAppend.get("paragraphId");
int index = (int) outputAppend.get("index");
InterpreterResult.Type type =
InterpreterResult.Type.valueOf((String) outputAppend.get("type"));
int index = Integer.parseInt(outputAppend.get("index"));
String outputToAppend = (String) outputAppend.get("data");
String appId = (String) outputAppend.get("appId");
if (appId == null) {
runner.appendBuffer(noteId, paragraphId, outputToAppend);
runner.appendBuffer(noteId, paragraphId, index, outputToAppend);
} else {
appListener.onOutputAppend(noteId, paragraphId, appId, outputToAppend);
appListener.onOutputAppend(noteId, paragraphId, index, appId, outputToAppend);
}
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
// on output update
Map<String, Object> outputAppend = gson.fromJson(
Map<String, String> outputAppend = gson.fromJson(
event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
String noteId = (String) outputAppend.get("noteId");
String paragraphId = (String) outputAppend.get("paragraphId");
int index = (int) outputAppend.get("index");
int index = Integer.parseInt(outputAppend.get("index"));
InterpreterResult.Type type =
InterpreterResult.Type.valueOf((String) outputAppend.get("type"));
String outputToUpdate = (String) outputAppend.get("data");
String appId = (String) outputAppend.get("appId");
if (appId == null) {
listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
listener.onOutputUpdated(noteId, paragraphId, index, type, outputToUpdate);
} else {
appListener.onOutputUpdated(noteId, paragraphId, appId, outputToUpdate);
appListener.onOutputUpdated(noteId, paragraphId, index, appId, type, outputToUpdate);
}
} else if (event.getType() == RemoteInterpreterEventType.APP_STATUS_UPDATE) {
// on output update

View file

@ -16,13 +16,16 @@
*/
package org.apache.zeppelin.interpreter.remote;
import org.apache.zeppelin.interpreter.InterpreterResult;
import java.util.Map;
/**
* Event from remoteInterpreterProcess
*/
public interface RemoteInterpreterProcessListener {
public void onOutputAppend(String noteId, String paragraphId, String output);
public void onOutputUpdated(String noteId, String paragraphId, String output);
public void onOutputAppend(String noteId, String paragraphId, int index, String output);
public void onOutputUpdated(
String noteId, String paragraphId, int index, InterpreterResult.Type type, String output);
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
}

View file

@ -573,7 +573,7 @@ public class RemoteInterpreterServer
String output = new String(line);
logger.debug("Output Append: {}", output);
eventClient.onInterpreterOutputAppend(
noteId, paragraphId, index, out.getType(), output);
noteId, paragraphId, index, output);
}
@Override

View file

@ -20,6 +20,7 @@ package org.apache.zeppelin.interpreter.remote;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@ -67,8 +68,8 @@ public class AppendOutputRunnerTest {
String[][] buffer = {{"note", "para", "data\n"}};
loopForCompletingEvents(listener, 1, buffer);
verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class));
verify(listener, times(1)).onOutputAppend("note", "para", "data\n");
verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
verify(listener, times(1)).onOutputAppend("note", "para", 0, "data\n");
}
@Test
@ -83,8 +84,8 @@ public class AppendOutputRunnerTest {
};
loopForCompletingEvents(listener, 1, buffer);
verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class));
verify(listener, times(1)).onOutputAppend(note1, para1, "data1\ndata2\ndata3\n");
verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\ndata2\ndata3\n");
}
@Test
@ -102,11 +103,11 @@ public class AppendOutputRunnerTest {
};
loopForCompletingEvents(listener, 4, buffer);
verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), any(String.class));
verify(listener, times(1)).onOutputAppend(note1, para1, "data1\n");
verify(listener, times(1)).onOutputAppend(note1, para2, "data2\n");
verify(listener, times(1)).onOutputAppend(note2, para1, "data3\n");
verify(listener, times(1)).onOutputAppend(note2, para2, "data4\n");
verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\n");
verify(listener, times(1)).onOutputAppend(note1, para2, 0, "data2\n");
verify(listener, times(1)).onOutputAppend(note2, para1, 0, "data3\n");
verify(listener, times(1)).onOutputAppend(note2, para2, 0, "data4\n");
}
@Test
@ -125,7 +126,7 @@ public class AppendOutputRunnerTest {
* calls, 30-40 Web-socket calls are made. Keeping
* the unit-test to a pessimistic 100 web-socket calls.
*/
verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), any(String.class));
verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
}
@Test
@ -136,7 +137,7 @@ public class AppendOutputRunnerTest {
int numEvents = 100000;
for (int i=0; i<numEvents; i++) {
runner.appendBuffer("noteId", "paraId", data);
runner.appendBuffer("noteId", "paraId", 0, data);
}
TestAppender appender = new TestAppender();
@ -178,7 +179,7 @@ public class AppendOutputRunnerTest {
String noteId = "noteId";
String paraId = "paraId";
for (int i=0; i<NUM_EVENTS; i++) {
runner.appendBuffer(noteId, paraId, "data\n");
runner.appendBuffer(noteId, paraId, 0, "data\n");
}
}
}
@ -212,7 +213,7 @@ public class AppendOutputRunnerTest {
numInvocations += 1;
return null;
}
}).when(listener).onOutputAppend(any(String.class), any(String.class), any(String.class));
}).when(listener).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
}
private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
@ -221,7 +222,7 @@ public class AppendOutputRunnerTest {
prepareInvocationCounts(listener);
AppendOutputRunner runner = new AppendOutputRunner(listener);
for (String[] bufferElement: buffer) {
runner.appendBuffer(bufferElement[0], bufferElement[1], bufferElement[2]);
runner.appendBuffer(bufferElement[0], bufferElement[1], 0, bufferElement[2]);
}
future = service.scheduleWithFixedDelay(runner, 0,
AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);

View file

@ -149,12 +149,12 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
}
@Override
public void onOutputAppend(String noteId, String paragraphId, String output) {
public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
}
@Override
public void onOutputUpdated(String noteId, String paragraphId, String output) {
public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
}

View file

@ -29,12 +29,9 @@ import java.util.Map;
import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
@ -295,12 +292,12 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
}
@Override
public void onOutputAppend(String noteId, String paragraphId, String output) {
public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
}
@Override
public void onOutputUpdated(String noteId, String paragraphId, String output) {
public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
}

View file

@ -1402,11 +1402,12 @@ public class NotebookServer extends WebSocketServlet implements
* @param output output to append
*/
@Override
public void onOutputAppend(String noteId, String paragraphId, String output) {
public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT)
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("data", output);
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("index", index)
.put("data", output);
broadcast(noteId, msg);
}
@ -1417,11 +1418,14 @@ public class NotebookServer extends WebSocketServlet implements
* @param output output to update (replace)
*/
@Override
public void onOutputUpdated(String noteId, String paragraphId, String output) {
public void onOutputUpdated(
String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT)
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("data", output);
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("index", index)
.put("type", type)
.put("data", output);
broadcast(noteId, msg);
}
@ -1433,10 +1437,12 @@ public class NotebookServer extends WebSocketServlet implements
* @param output
*/
@Override
public void onOutputAppend(String noteId, String paragraphId, String appId, String output) {
public void onOutputAppend(
String noteId, String paragraphId, int index, String appId, String output) {
Message msg = new Message(OP.APP_APPEND_OUTPUT)
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("index", index)
.put("appId", appId)
.put("data", output);
broadcast(noteId, msg);
@ -1450,10 +1456,14 @@ public class NotebookServer extends WebSocketServlet implements
* @param output
*/
@Override
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output) {
public void onOutputUpdated(
String noteId, String paragraphId, int index, String appId, InterpreterResult.Type type,
String output) {
Message msg = new Message(OP.APP_UPDATE_OUTPUT)
.put("noteId", noteId)
.put("paragraphId", paragraphId)
.put("index", index)
.put("type", type)
.put("appId", appId)
.put("data", output);
broadcast(noteId, msg);

View file

@ -141,6 +141,30 @@
}
};
$scope.$on('updateParagraphOutput', function(event, data) {
if ($scope.paragraph.id === data.paragraphId) {
if (!$scope.paragraph.result.msg) {
$scope.paragraph.result.msg = [];
}
var update = ($scope.paragraph.result.msg[data.index]) ? true : false;
$scope.paragraph.result.msg[data.index] = {
data : data.data,
type : data.type
};
if (update) {
$rootScope.$broadcast(
'updateResult',
$scope.paragraph.result.msg[data.index],
$scope.paragraph.config.result[data.index],
$scope.paragraph,
data.index);
}
}
});
$scope.getIframeDimensions = function() {
if ($scope.asIframe) {
var paragraphid = '#' + $routeParams.paragraphId + '_container';
@ -769,19 +793,6 @@
}
};
$scope.getBase64ImageSrc = function(base64Data) {
return 'data:image/png;base64,' + base64Data;
};
$scope.getGraphMode = function(paragraph) {
var pdata = (paragraph) ? paragraph : $scope.paragraph;
if (pdata.config.graph && pdata.config.graph.mode) {
return pdata.config.graph.mode;
} else {
return 'table';
}
};
$scope.parseTableCell = function(cell) {
if (!isNaN(cell)) {
if (cell.length === 0 || Number(cell) > Number.MAX_SAFE_INTEGER || Number(cell) < Number.MIN_SAFE_INTEGER) {
@ -1175,7 +1186,7 @@
}
/** broadcast update to result controller **/
if (data.paragraph.result) {
if (data.paragraph.result && data.paragraph.result.msg) {
for (var i in data.paragraph.result.msg) {
var newResult = data.paragraph.result.msg[i];
var oldResult = $scope.paragraph.result.msg[i];
@ -1228,31 +1239,6 @@
});
$scope.$on('appendParagraphOutput', function(event, data) {
/* It has been observed that append events
* can be errorneously called even if paragraph
* execution has ended, and in that case, no append
* should be made. Also, it was observed that between PENDING
* and RUNNING states, append-events can be called and we can't
* miss those, else during the length of paragraph run, few
* initial output line/s will be missing.
*/
if ($scope.paragraph.id === data.paragraphId &&
($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING')) {
if ($scope.flushStreamingOutput) {
$scope.clearTextOutput();
$scope.flushStreamingOutput = false;
}
$scope.appendTextOutput(data.data);
}
});
$scope.$on('updateParagraphOutput', function(event, data) {
if ($scope.paragraph.id === data.paragraphId) {
$scope.clearTextOutput();
$scope.appendTextOutput(data.data);
}
});
$scope.$on('updateProgress', function(event, data) {
if (data.id === $scope.paragraph.id) {

View file

@ -16,7 +16,7 @@ limitations under the License.
ng-if="!config.helium.activeApp"
style="padding-bottom: 5px;"
resize='{"allowresize": "{{!asIframe && !viewOnly}}", "graphType": "{{type}}"}'
resizable on-resize="resizeParagraph(width, height);">
resizable on-resize="resize(width, height);">
<div ng-include src="'app/notebook/paragraph/result/result-graph.html'"></div>
<div id="{{id}}_comment"

View file

@ -160,12 +160,36 @@
renderResult($scope.type, true);
});
$scope.$on('appendParagraphOutput', function(event, data) {
/* It has been observed that append events
* can be errorneously called even if paragraph
* execution has ended, and in that case, no append
* should be made. Also, it was observed that between PENDING
* and RUNNING states, append-events can be called and we can't
* miss those, else during the length of paragraph run, few
* initial output line/s will be missing.
*/
if (paragraph.id === data.paragraphId &&
resultIndex === data.index &&
(paragraph.status === 'RUNNING' || paragraph.status === 'PENDING')) {
appendTextOutput(data.data);
}
});
$scope.$on('updateParagraphOutput', function(event, data) {
if (paragraph.id === data.paragraphId &&
resultIndex === data.index) {
clearTextOutput();
appendTextOutput(data.data);
}
});
var updateData = function(result, config, paragraphRef, index) {
data = result.data;
paragraph = paragraphRef;
resultIndex = parseInt(index);
$scope.id = paragraph.id + "_" + index;
$scope.id = paragraph.id + '_' + index;
$scope.type = result.type;
config = config ? config : {};
@ -201,7 +225,7 @@
if (!config.graph.scatter) {
config.graph.scatter = {};
}
$scope.graphMode = config.graph.mode;
$scope.config = angular.copy(config);
@ -211,6 +235,8 @@
tableData.loadParagraphResult({type: $scope.type, msg: data});
$scope.tableDataColumns = tableData.columns;
$scope.tableDataComment = tableData.comment;
selectDefaultColsForGraphOption();
}
};
@ -275,21 +301,24 @@
$timeout(retryRenderer);
};
var textRendererInitialized = false;
var renderText = function() {
var retryRenderer = function() {
var textEl = angular.element('#p' + $scope.id + '_text');
if (textEl.length) {
// clear all lines before render
clearTextOutput();
textRendererInitialized = true;
if (data) {
appendTextOutput(data);
} else {
flushAppendQueue();
}
angular.element('#p' + $scope.id + '_text').bind('mousewheel', function(e) {
$scope.keepScrollDown = false;
});
$scope.flushStreamingOutput = true;
});
} else {
$timeout(retryRenderer, 10);
}
@ -304,17 +333,30 @@
}
};
var appendTextOutput = function(msg) {
var textEl = angular.element('#p' + $scope.id + '_text');
if (textEl.length) {
var lines = msg.split('\n');
for (var i = 0; i < lines.length; i++) {
textEl.append(angular.element('<div></div>').text(lines[i]));
}
var textAppendQueueBeforeInitialize = [];
var flushAppendQueue = function() {
while (textAppendQueueBeforeInitialize.length > 0) {
appendTextOutput(textAppendQueueBeforeInitialize.pop());
}
if ($scope.keepScrollDown) {
var doc = angular.element('#p' + $scope.id + '_text');
doc[0].scrollTop = doc[0].scrollHeight;
};
var appendTextOutput = function(msg) {
if (!textRendererInitialized) {
textAppendQueueBeforeInitialize.push(msg);
} else {
flushAppendQueue();
var textEl = angular.element('#p' + $scope.id + '_text');
if (textEl.length) {
var lines = msg.split('\n');
for (var i = 0; i < lines.length; i++) {
textEl.append(angular.element('<div></div>').text(lines[i]));
}
}
if ($scope.keepScrollDown) {
var doc = angular.element('#p' + $scope.id + '_text');
doc[0].scrollTop = doc[0].scrollHeight;
}
}
};
@ -526,33 +568,33 @@
}
};
unique($scope.paragraph.config.graph.keys);
removeUnknown($scope.paragraph.config.graph.keys);
unique($scope.config.graph.keys);
removeUnknown($scope.config.graph.keys);
removeUnknown($scope.paragraph.config.graph.values);
removeUnknown($scope.config.graph.values);
unique($scope.paragraph.config.graph.groups);
removeUnknown($scope.paragraph.config.graph.groups);
unique($scope.config.graph.groups);
removeUnknown($scope.config.graph.groups);
removeUnknownFromFields($scope.paragraph.config.graph.scatter);
removeUnknownFromFields($scope.config.graph.scatter);
};
/* select default key and value if there're none selected */
var selectDefaultColsForGraphOption = function() {
if ($scope.paragraph.config.graph.keys.length === 0 && tableData.columns.length > 0) {
$scope.paragraph.config.graph.keys.push(tableData.columns[0]);
if ($scope.config.graph.keys.length === 0 && tableData.columns.length > 0) {
$scope.config.graph.keys.push(tableData.columns[0]);
}
if ($scope.paragraph.config.graph.values.length === 0 && tableData.columns.length > 1) {
$scope.paragraph.config.graph.values.push(tableData.columns[1]);
if ($scope.config.graph.values.length === 0 && tableData.columns.length > 1) {
$scope.config.graph.values.push(tableData.columns[1]);
}
if (!$scope.paragraph.config.graph.scatter.xAxis && !$scope.paragraph.config.graph.scatter.yAxis) {
if (!$scope.config.graph.scatter.xAxis && !$scope.config.graph.scatter.yAxis) {
if (tableData.columns.length > 1) {
$scope.paragraph.config.graph.scatter.xAxis = tableData.columns[0];
$scope.paragraph.config.graph.scatter.yAxis = tableData.columns[1];
$scope.config.graph.scatter.xAxis = tableData.columns[0];
$scope.config.graph.scatter.yAxis = tableData.columns[1];
} else if (tableData.columns.length === 1) {
$scope.paragraph.config.graph.scatter.xAxis = tableData.columns[0];
$scope.config.graph.scatter.xAxis = tableData.columns[0];
}
}
};
@ -566,21 +608,20 @@
}
};
$scope.resizeParagraph = function(width, height) {
$scope.changeColWidth(width);
$scope.resize = function(width, height) {
$timeout(function() {
autoAdjustEditorHeight($scope.paragraph.id + '_editor');
$scope.changeHeight(height);
changeHeight(width, height);
}, 200);
};
$scope.changeHeight = function(height) {
var newParams = angular.copy($scope.paragraph.settings.params);
var newConfig = angular.copy($scope.paragraph.config);
var changeHeight = function(width, height) {
var newParams = angular.copy(paragraph.settings.params);
var newConfig = angular.copy($scope.config);
newConfig.graph.height = height;
paragraph.config.colWidth = width;
commitParagraph($scope.paragraph.title, $scope.paragraph.text, newConfig, newParams);
commitParagraphResult(paragraph.title, paragraph.text, newConfig, newParams);
};
$scope.exportToDSV = function(delimiter) {
@ -610,5 +651,9 @@
}
saveAsService.saveAs(dsv, 'data', extension);
};
$scope.getBase64ImageSrc = function(base64Data) {
return 'data:image/png;base64,' + base64Data;
};
};
})();

View file

@ -18,10 +18,7 @@ package org.apache.zeppelin.helium;
import com.google.gson.Gson;
import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterInfo;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
@ -337,7 +334,8 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
}
@Override
public void onOutputAppend(String noteId, String paragraphId, String appId, String output) {
public void onOutputAppend(
String noteId, String paragraphId, int index, String appId, String output) {
ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId);
if (appToUpdate != null) {
@ -347,12 +345,14 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
}
if (applicationEventListener != null) {
applicationEventListener.onOutputAppend(noteId, paragraphId, appId, output);
applicationEventListener.onOutputAppend(noteId, paragraphId, index, appId, output);
}
}
@Override
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output) {
public void onOutputUpdated(
String noteId, String paragraphId, int index, String appId,
InterpreterResult.Type type, String output) {
ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId);
if (appToUpdate != null) {
@ -362,7 +362,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
}
if (applicationEventListener != null) {
applicationEventListener.onOutputUpdated(noteId, paragraphId, appId, output);
applicationEventListener.onOutputUpdated(noteId, paragraphId, index, appId, type, output);
}
}