mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Take care output streaming
This commit is contained in:
parent
57d6b2f647
commit
804768dfd9
18 changed files with 231 additions and 166 deletions
|
|
@ -16,12 +16,17 @@
|
|||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
|
||||
/**
|
||||
* Event from HeliumApplication running on remote interpreter process
|
||||
*/
|
||||
public interface ApplicationEventListener {
|
||||
public void onOutputAppend(String noteId, String paragraphId, String appId, String output);
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output);
|
||||
public void onOutputAppend(
|
||||
String noteId, String paragraphId, int index, String appId, String output);
|
||||
public void onOutputUpdated(
|
||||
String noteId, String paragraphId, int index, String appId,
|
||||
InterpreterResult.Type type, String output);
|
||||
public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg);
|
||||
public void onStatusChange(String noteId, String paragraphId, String appId, String status);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -96,7 +96,9 @@ public class InterpreterResultMessageOutput extends OutputStream {
|
|||
firstWrite = false;
|
||||
}
|
||||
|
||||
flush();
|
||||
if (isAppendSupported()) {
|
||||
flush(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -200,22 +202,34 @@ public class InterpreterResultMessageOutput extends OutputStream {
|
|||
return new InterpreterResultMessage(type, new String(toByteArray()));
|
||||
}
|
||||
|
||||
public void flush() throws IOException {
|
||||
private void flush(boolean append) throws IOException {
|
||||
synchronized (outList) {
|
||||
buffer.flush();
|
||||
byte[] bytes = buffer.toByteArray();
|
||||
if (bytes != null && bytes.length > 0) {
|
||||
outList.add(bytes);
|
||||
if (type == InterpreterResult.Type.TEXT) {
|
||||
if (append) {
|
||||
if (flushListener != null) {
|
||||
flushListener.onAppend(this, bytes);
|
||||
}
|
||||
} else {
|
||||
if (flushListener != null) {
|
||||
flushListener.onUpdate(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
buffer.reset();
|
||||
}
|
||||
}
|
||||
|
||||
public void flush() throws IOException {
|
||||
flush(isAppendSupported());
|
||||
}
|
||||
|
||||
public boolean isAppendSupported() {
|
||||
return type == InterpreterResult.Type.TEXT;
|
||||
}
|
||||
|
||||
private void copyStream(InputStream in, OutputStream out) throws IOException {
|
||||
int bufferSize = 8192;
|
||||
byte[] buffer = new byte[bufferSize];
|
||||
|
|
|
|||
|
|
@ -157,8 +157,7 @@ public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
|
|||
|
||||
@Override
|
||||
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
|
||||
eventClient.onInterpreterOutputAppend(noteId, paragraphId,
|
||||
index, out.getType(), new String(line));
|
||||
eventClient.onInterpreterOutputAppend(noteId, paragraphId, index, new String(line));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -83,8 +83,7 @@ public class ZeppelinDevServer extends
|
|||
|
||||
@Override
|
||||
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
|
||||
eventClient.onInterpreterOutputAppend(noteId, paragraphId,
|
||||
index, out.getType(), new String(line));
|
||||
eventClient.onInterpreterOutputAppend(noteId, paragraphId, index, new String(line));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -25,11 +25,13 @@ public class AppendOutputBuffer {
|
|||
|
||||
private String noteId;
|
||||
private String paragraphId;
|
||||
private int index;
|
||||
private String data;
|
||||
|
||||
public AppendOutputBuffer(String noteId, String paragraphId, String data) {
|
||||
public AppendOutputBuffer(String noteId, String paragraphId, int index, String data) {
|
||||
this.noteId = noteId;
|
||||
this.paragraphId = paragraphId;
|
||||
this.index = index;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
|
|
@ -41,6 +43,10 @@ public class AppendOutputBuffer {
|
|||
return paragraphId;
|
||||
}
|
||||
|
||||
public int getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
public String getData() {
|
||||
return data;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ public class AppendOutputRunner implements Runnable {
|
|||
@Override
|
||||
public void run() {
|
||||
|
||||
Map<String, Map<String, StringBuilder> > noteMap = new HashMap<>();
|
||||
Map<String, StringBuilder> stringBufferMap = new HashMap<>();
|
||||
List<AppendOutputBuffer> list = new LinkedList<>();
|
||||
|
||||
/* "drainTo" method does not wait for any element
|
||||
|
|
@ -73,15 +73,14 @@ public class AppendOutputRunner implements Runnable {
|
|||
for (AppendOutputBuffer buffer: list) {
|
||||
String noteId = buffer.getNoteId();
|
||||
String paragraphId = buffer.getParagraphId();
|
||||
int index = buffer.getIndex();
|
||||
String stringBufferKey = noteId + ":" + paragraphId + ":" + index;
|
||||
|
||||
Map<String, StringBuilder> paragraphMap = (noteMap.containsKey(noteId)) ?
|
||||
noteMap.get(noteId) : new HashMap<String, StringBuilder>();
|
||||
StringBuilder builder = paragraphMap.containsKey(paragraphId) ?
|
||||
paragraphMap.get(paragraphId) : new StringBuilder();
|
||||
StringBuilder builder = stringBufferMap.containsKey(stringBufferKey) ?
|
||||
stringBufferMap.get(stringBufferKey) : new StringBuilder();
|
||||
|
||||
builder.append(buffer.getData());
|
||||
paragraphMap.put(paragraphId, builder);
|
||||
noteMap.put(noteId, paragraphMap);
|
||||
stringBufferMap.put(stringBufferKey, builder);
|
||||
}
|
||||
Long processingTime = System.currentTimeMillis() - processingStartTime;
|
||||
|
||||
|
|
@ -94,12 +93,11 @@ public class AppendOutputRunner implements Runnable {
|
|||
}
|
||||
|
||||
Long sizeProcessed = new Long(0);
|
||||
for (String noteId: noteMap.keySet()) {
|
||||
for (String paragraphId: noteMap.get(noteId).keySet()) {
|
||||
String data = noteMap.get(noteId).get(paragraphId).toString();
|
||||
sizeProcessed += data.length();
|
||||
listener.onOutputAppend(noteId, paragraphId, data);
|
||||
}
|
||||
for (String stringBufferKey : stringBufferMap.keySet()) {
|
||||
StringBuilder buffer = stringBufferMap.get(stringBufferKey);
|
||||
sizeProcessed += buffer.length();
|
||||
String[] keys = stringBufferKey.split(":");
|
||||
listener.onOutputAppend(keys[0], keys[1], Integer.parseInt(keys[2]), buffer.toString());
|
||||
}
|
||||
|
||||
if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) {
|
||||
|
|
@ -111,8 +109,8 @@ public class AppendOutputRunner implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
public void appendBuffer(String noteId, String paragraphId, String outputToAppend) {
|
||||
queue.offer(new AppendOutputBuffer(noteId, paragraphId, outputToAppend));
|
||||
public void appendBuffer(String noteId, String paragraphId, int index, String outputToAppend) {
|
||||
queue.offer(new AppendOutputBuffer(noteId, paragraphId, index, outputToAppend));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -215,13 +215,11 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
|
|||
}
|
||||
|
||||
public void onInterpreterOutputAppend(
|
||||
String noteId, String paragraphId, int outputIndex,
|
||||
InterpreterResult.Type type, String output) {
|
||||
Map<String, Object> appendOutput = new HashMap<>();
|
||||
String noteId, String paragraphId, int outputIndex, String output) {
|
||||
Map<String, String> appendOutput = new HashMap<>();
|
||||
appendOutput.put("noteId", noteId);
|
||||
appendOutput.put("paragraphId", paragraphId);
|
||||
appendOutput.put("index", outputIndex);
|
||||
appendOutput.put("type", type.name());
|
||||
appendOutput.put("index", Integer.toString(outputIndex));
|
||||
appendOutput.put("data", output);
|
||||
|
||||
sendEvent(new RemoteInterpreterEvent(
|
||||
|
|
@ -232,10 +230,10 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
|
|||
public void onInterpreterOutputUpdate(
|
||||
String noteId, String paragraphId, int outputIndex,
|
||||
InterpreterResult.Type type, String output) {
|
||||
Map<String, Object> appendOutput = new HashMap<>();
|
||||
Map<String, String> appendOutput = new HashMap<>();
|
||||
appendOutput.put("noteId", noteId);
|
||||
appendOutput.put("paragraphId", paragraphId);
|
||||
appendOutput.put("index", outputIndex);
|
||||
appendOutput.put("index", Integer.toString(outputIndex));
|
||||
appendOutput.put("type", type.name());
|
||||
appendOutput.put("data", output);
|
||||
|
||||
|
|
@ -275,10 +273,12 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
|
|||
}
|
||||
}
|
||||
|
||||
public void onAppOutputAppend(String noteId, String paragraphId, String appId, String output) {
|
||||
Map<String, String> appendOutput = new HashMap<>();
|
||||
public void onAppOutputAppend(
|
||||
String noteId, String paragraphId, int index, String appId, String output) {
|
||||
Map<String, Object> appendOutput = new HashMap<>();
|
||||
appendOutput.put("noteId", noteId);
|
||||
appendOutput.put("paragraphId", paragraphId);
|
||||
appendOutput.put("index", index);
|
||||
appendOutput.put("appId", appId);
|
||||
appendOutput.put("data", output);
|
||||
|
||||
|
|
@ -288,11 +288,15 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
|
|||
}
|
||||
|
||||
|
||||
public void onAppOutputUpdate(String noteId, String paragraphId, String appId, String output) {
|
||||
Map<String, String> appendOutput = new HashMap<>();
|
||||
public void onAppOutputUpdate(
|
||||
String noteId, String paragraphId, int index, String appId,
|
||||
InterpreterResult.Type type, String output) {
|
||||
Map<String, Object> appendOutput = new HashMap<>();
|
||||
appendOutput.put("noteId", noteId);
|
||||
appendOutput.put("paragraphId", paragraphId);
|
||||
appendOutput.put("index", index);
|
||||
appendOutput.put("appId", appId);
|
||||
appendOutput.put("type", type);
|
||||
appendOutput.put("data", output);
|
||||
|
||||
sendEvent(new RemoteInterpreterEvent(
|
||||
|
|
|
|||
|
|
@ -159,38 +159,36 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
sendResourceResponseGet(resourceId, o);
|
||||
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
|
||||
// on output append
|
||||
Map<String, Object> outputAppend = gson.fromJson(
|
||||
Map<String, String> outputAppend = gson.fromJson(
|
||||
event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
|
||||
String noteId = (String) outputAppend.get("noteId");
|
||||
String paragraphId = (String) outputAppend.get("paragraphId");
|
||||
int index = (int) outputAppend.get("index");
|
||||
InterpreterResult.Type type =
|
||||
InterpreterResult.Type.valueOf((String) outputAppend.get("type"));
|
||||
int index = Integer.parseInt(outputAppend.get("index"));
|
||||
String outputToAppend = (String) outputAppend.get("data");
|
||||
|
||||
String appId = (String) outputAppend.get("appId");
|
||||
|
||||
if (appId == null) {
|
||||
runner.appendBuffer(noteId, paragraphId, outputToAppend);
|
||||
runner.appendBuffer(noteId, paragraphId, index, outputToAppend);
|
||||
} else {
|
||||
appListener.onOutputAppend(noteId, paragraphId, appId, outputToAppend);
|
||||
appListener.onOutputAppend(noteId, paragraphId, index, appId, outputToAppend);
|
||||
}
|
||||
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
|
||||
// on output update
|
||||
Map<String, Object> outputAppend = gson.fromJson(
|
||||
Map<String, String> outputAppend = gson.fromJson(
|
||||
event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
|
||||
String noteId = (String) outputAppend.get("noteId");
|
||||
String paragraphId = (String) outputAppend.get("paragraphId");
|
||||
int index = (int) outputAppend.get("index");
|
||||
int index = Integer.parseInt(outputAppend.get("index"));
|
||||
InterpreterResult.Type type =
|
||||
InterpreterResult.Type.valueOf((String) outputAppend.get("type"));
|
||||
String outputToUpdate = (String) outputAppend.get("data");
|
||||
String appId = (String) outputAppend.get("appId");
|
||||
|
||||
if (appId == null) {
|
||||
listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
|
||||
listener.onOutputUpdated(noteId, paragraphId, index, type, outputToUpdate);
|
||||
} else {
|
||||
appListener.onOutputUpdated(noteId, paragraphId, appId, outputToUpdate);
|
||||
appListener.onOutputUpdated(noteId, paragraphId, index, appId, type, outputToUpdate);
|
||||
}
|
||||
} else if (event.getType() == RemoteInterpreterEventType.APP_STATUS_UPDATE) {
|
||||
// on output update
|
||||
|
|
|
|||
|
|
@ -16,13 +16,16 @@
|
|||
*/
|
||||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Event from remoteInterpreterProcess
|
||||
*/
|
||||
public interface RemoteInterpreterProcessListener {
|
||||
public void onOutputAppend(String noteId, String paragraphId, String output);
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String output);
|
||||
public void onOutputAppend(String noteId, String paragraphId, int index, String output);
|
||||
public void onOutputUpdated(
|
||||
String noteId, String paragraphId, int index, InterpreterResult.Type type, String output);
|
||||
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -573,7 +573,7 @@ public class RemoteInterpreterServer
|
|||
String output = new String(line);
|
||||
logger.debug("Output Append: {}", output);
|
||||
eventClient.onInterpreterOutputAppend(
|
||||
noteId, paragraphId, index, out.getType(), output);
|
||||
noteId, paragraphId, index, output);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ package org.apache.zeppelin.interpreter.remote;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Mockito.atMost;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
|
@ -67,8 +68,8 @@ public class AppendOutputRunnerTest {
|
|||
String[][] buffer = {{"note", "para", "data\n"}};
|
||||
|
||||
loopForCompletingEvents(listener, 1, buffer);
|
||||
verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class));
|
||||
verify(listener, times(1)).onOutputAppend("note", "para", "data\n");
|
||||
verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
|
||||
verify(listener, times(1)).onOutputAppend("note", "para", 0, "data\n");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -83,8 +84,8 @@ public class AppendOutputRunnerTest {
|
|||
};
|
||||
|
||||
loopForCompletingEvents(listener, 1, buffer);
|
||||
verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class));
|
||||
verify(listener, times(1)).onOutputAppend(note1, para1, "data1\ndata2\ndata3\n");
|
||||
verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
|
||||
verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\ndata2\ndata3\n");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -102,11 +103,11 @@ public class AppendOutputRunnerTest {
|
|||
};
|
||||
loopForCompletingEvents(listener, 4, buffer);
|
||||
|
||||
verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), any(String.class));
|
||||
verify(listener, times(1)).onOutputAppend(note1, para1, "data1\n");
|
||||
verify(listener, times(1)).onOutputAppend(note1, para2, "data2\n");
|
||||
verify(listener, times(1)).onOutputAppend(note2, para1, "data3\n");
|
||||
verify(listener, times(1)).onOutputAppend(note2, para2, "data4\n");
|
||||
verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
|
||||
verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\n");
|
||||
verify(listener, times(1)).onOutputAppend(note1, para2, 0, "data2\n");
|
||||
verify(listener, times(1)).onOutputAppend(note2, para1, 0, "data3\n");
|
||||
verify(listener, times(1)).onOutputAppend(note2, para2, 0, "data4\n");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -125,7 +126,7 @@ public class AppendOutputRunnerTest {
|
|||
* calls, 30-40 Web-socket calls are made. Keeping
|
||||
* the unit-test to a pessimistic 100 web-socket calls.
|
||||
*/
|
||||
verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), any(String.class));
|
||||
verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -136,7 +137,7 @@ public class AppendOutputRunnerTest {
|
|||
int numEvents = 100000;
|
||||
|
||||
for (int i=0; i<numEvents; i++) {
|
||||
runner.appendBuffer("noteId", "paraId", data);
|
||||
runner.appendBuffer("noteId", "paraId", 0, data);
|
||||
}
|
||||
|
||||
TestAppender appender = new TestAppender();
|
||||
|
|
@ -178,7 +179,7 @@ public class AppendOutputRunnerTest {
|
|||
String noteId = "noteId";
|
||||
String paraId = "paraId";
|
||||
for (int i=0; i<NUM_EVENTS; i++) {
|
||||
runner.appendBuffer(noteId, paraId, "data\n");
|
||||
runner.appendBuffer(noteId, paraId, 0, "data\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -212,7 +213,7 @@ public class AppendOutputRunnerTest {
|
|||
numInvocations += 1;
|
||||
return null;
|
||||
}
|
||||
}).when(listener).onOutputAppend(any(String.class), any(String.class), any(String.class));
|
||||
}).when(listener).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
|
||||
}
|
||||
|
||||
private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
|
||||
|
|
@ -221,7 +222,7 @@ public class AppendOutputRunnerTest {
|
|||
prepareInvocationCounts(listener);
|
||||
AppendOutputRunner runner = new AppendOutputRunner(listener);
|
||||
for (String[] bufferElement: buffer) {
|
||||
runner.appendBuffer(bufferElement[0], bufferElement[1], bufferElement[2]);
|
||||
runner.appendBuffer(bufferElement[0], bufferElement[1], 0, bufferElement[2]);
|
||||
}
|
||||
future = service.scheduleWithFixedDelay(runner, 0,
|
||||
AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
|
||||
|
|
|
|||
|
|
@ -149,12 +149,12 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onOutputAppend(String noteId, String paragraphId, String output) {
|
||||
public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String output) {
|
||||
public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -29,12 +29,9 @@ import java.util.Map;
|
|||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
|
||||
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
|
||||
|
|
@ -295,12 +292,12 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onOutputAppend(String noteId, String paragraphId, String output) {
|
||||
public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String output) {
|
||||
public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1402,11 +1402,12 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
* @param output output to append
|
||||
*/
|
||||
@Override
|
||||
public void onOutputAppend(String noteId, String paragraphId, String output) {
|
||||
public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
|
||||
Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT)
|
||||
.put("noteId", noteId)
|
||||
.put("paragraphId", paragraphId)
|
||||
.put("data", output);
|
||||
.put("noteId", noteId)
|
||||
.put("paragraphId", paragraphId)
|
||||
.put("index", index)
|
||||
.put("data", output);
|
||||
broadcast(noteId, msg);
|
||||
}
|
||||
|
||||
|
|
@ -1417,11 +1418,14 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
* @param output output to update (replace)
|
||||
*/
|
||||
@Override
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String output) {
|
||||
public void onOutputUpdated(
|
||||
String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
|
||||
Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT)
|
||||
.put("noteId", noteId)
|
||||
.put("paragraphId", paragraphId)
|
||||
.put("data", output);
|
||||
.put("noteId", noteId)
|
||||
.put("paragraphId", paragraphId)
|
||||
.put("index", index)
|
||||
.put("type", type)
|
||||
.put("data", output);
|
||||
broadcast(noteId, msg);
|
||||
}
|
||||
|
||||
|
|
@ -1433,10 +1437,12 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
* @param output
|
||||
*/
|
||||
@Override
|
||||
public void onOutputAppend(String noteId, String paragraphId, String appId, String output) {
|
||||
public void onOutputAppend(
|
||||
String noteId, String paragraphId, int index, String appId, String output) {
|
||||
Message msg = new Message(OP.APP_APPEND_OUTPUT)
|
||||
.put("noteId", noteId)
|
||||
.put("paragraphId", paragraphId)
|
||||
.put("index", index)
|
||||
.put("appId", appId)
|
||||
.put("data", output);
|
||||
broadcast(noteId, msg);
|
||||
|
|
@ -1450,10 +1456,14 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
* @param output
|
||||
*/
|
||||
@Override
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output) {
|
||||
public void onOutputUpdated(
|
||||
String noteId, String paragraphId, int index, String appId, InterpreterResult.Type type,
|
||||
String output) {
|
||||
Message msg = new Message(OP.APP_UPDATE_OUTPUT)
|
||||
.put("noteId", noteId)
|
||||
.put("paragraphId", paragraphId)
|
||||
.put("index", index)
|
||||
.put("type", type)
|
||||
.put("appId", appId)
|
||||
.put("data", output);
|
||||
broadcast(noteId, msg);
|
||||
|
|
|
|||
|
|
@ -141,6 +141,30 @@
|
|||
}
|
||||
};
|
||||
|
||||
$scope.$on('updateParagraphOutput', function(event, data) {
|
||||
if ($scope.paragraph.id === data.paragraphId) {
|
||||
if (!$scope.paragraph.result.msg) {
|
||||
$scope.paragraph.result.msg = [];
|
||||
}
|
||||
|
||||
var update = ($scope.paragraph.result.msg[data.index]) ? true : false;
|
||||
|
||||
$scope.paragraph.result.msg[data.index] = {
|
||||
data : data.data,
|
||||
type : data.type
|
||||
};
|
||||
|
||||
if (update) {
|
||||
$rootScope.$broadcast(
|
||||
'updateResult',
|
||||
$scope.paragraph.result.msg[data.index],
|
||||
$scope.paragraph.config.result[data.index],
|
||||
$scope.paragraph,
|
||||
data.index);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
$scope.getIframeDimensions = function() {
|
||||
if ($scope.asIframe) {
|
||||
var paragraphid = '#' + $routeParams.paragraphId + '_container';
|
||||
|
|
@ -769,19 +793,6 @@
|
|||
}
|
||||
};
|
||||
|
||||
$scope.getBase64ImageSrc = function(base64Data) {
|
||||
return 'data:image/png;base64,' + base64Data;
|
||||
};
|
||||
|
||||
$scope.getGraphMode = function(paragraph) {
|
||||
var pdata = (paragraph) ? paragraph : $scope.paragraph;
|
||||
if (pdata.config.graph && pdata.config.graph.mode) {
|
||||
return pdata.config.graph.mode;
|
||||
} else {
|
||||
return 'table';
|
||||
}
|
||||
};
|
||||
|
||||
$scope.parseTableCell = function(cell) {
|
||||
if (!isNaN(cell)) {
|
||||
if (cell.length === 0 || Number(cell) > Number.MAX_SAFE_INTEGER || Number(cell) < Number.MIN_SAFE_INTEGER) {
|
||||
|
|
@ -1175,7 +1186,7 @@
|
|||
}
|
||||
|
||||
/** broadcast update to result controller **/
|
||||
if (data.paragraph.result) {
|
||||
if (data.paragraph.result && data.paragraph.result.msg) {
|
||||
for (var i in data.paragraph.result.msg) {
|
||||
var newResult = data.paragraph.result.msg[i];
|
||||
var oldResult = $scope.paragraph.result.msg[i];
|
||||
|
|
@ -1228,31 +1239,6 @@
|
|||
|
||||
});
|
||||
|
||||
$scope.$on('appendParagraphOutput', function(event, data) {
|
||||
/* It has been observed that append events
|
||||
* can be errorneously called even if paragraph
|
||||
* execution has ended, and in that case, no append
|
||||
* should be made. Also, it was observed that between PENDING
|
||||
* and RUNNING states, append-events can be called and we can't
|
||||
* miss those, else during the length of paragraph run, few
|
||||
* initial output line/s will be missing.
|
||||
*/
|
||||
if ($scope.paragraph.id === data.paragraphId &&
|
||||
($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING')) {
|
||||
if ($scope.flushStreamingOutput) {
|
||||
$scope.clearTextOutput();
|
||||
$scope.flushStreamingOutput = false;
|
||||
}
|
||||
$scope.appendTextOutput(data.data);
|
||||
}
|
||||
});
|
||||
|
||||
$scope.$on('updateParagraphOutput', function(event, data) {
|
||||
if ($scope.paragraph.id === data.paragraphId) {
|
||||
$scope.clearTextOutput();
|
||||
$scope.appendTextOutput(data.data);
|
||||
}
|
||||
});
|
||||
|
||||
$scope.$on('updateProgress', function(event, data) {
|
||||
if (data.id === $scope.paragraph.id) {
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ limitations under the License.
|
|||
ng-if="!config.helium.activeApp"
|
||||
style="padding-bottom: 5px;"
|
||||
resize='{"allowresize": "{{!asIframe && !viewOnly}}", "graphType": "{{type}}"}'
|
||||
resizable on-resize="resizeParagraph(width, height);">
|
||||
resizable on-resize="resize(width, height);">
|
||||
<div ng-include src="'app/notebook/paragraph/result/result-graph.html'"></div>
|
||||
|
||||
<div id="{{id}}_comment"
|
||||
|
|
|
|||
|
|
@ -160,12 +160,36 @@
|
|||
renderResult($scope.type, true);
|
||||
});
|
||||
|
||||
$scope.$on('appendParagraphOutput', function(event, data) {
|
||||
/* It has been observed that append events
|
||||
* can be errorneously called even if paragraph
|
||||
* execution has ended, and in that case, no append
|
||||
* should be made. Also, it was observed that between PENDING
|
||||
* and RUNNING states, append-events can be called and we can't
|
||||
* miss those, else during the length of paragraph run, few
|
||||
* initial output line/s will be missing.
|
||||
*/
|
||||
if (paragraph.id === data.paragraphId &&
|
||||
resultIndex === data.index &&
|
||||
(paragraph.status === 'RUNNING' || paragraph.status === 'PENDING')) {
|
||||
appendTextOutput(data.data);
|
||||
}
|
||||
});
|
||||
|
||||
$scope.$on('updateParagraphOutput', function(event, data) {
|
||||
if (paragraph.id === data.paragraphId &&
|
||||
resultIndex === data.index) {
|
||||
clearTextOutput();
|
||||
appendTextOutput(data.data);
|
||||
}
|
||||
});
|
||||
|
||||
var updateData = function(result, config, paragraphRef, index) {
|
||||
data = result.data;
|
||||
paragraph = paragraphRef;
|
||||
resultIndex = parseInt(index);
|
||||
|
||||
$scope.id = paragraph.id + "_" + index;
|
||||
$scope.id = paragraph.id + '_' + index;
|
||||
$scope.type = result.type;
|
||||
config = config ? config : {};
|
||||
|
||||
|
|
@ -201,7 +225,7 @@
|
|||
if (!config.graph.scatter) {
|
||||
config.graph.scatter = {};
|
||||
}
|
||||
|
||||
|
||||
$scope.graphMode = config.graph.mode;
|
||||
$scope.config = angular.copy(config);
|
||||
|
||||
|
|
@ -211,6 +235,8 @@
|
|||
tableData.loadParagraphResult({type: $scope.type, msg: data});
|
||||
$scope.tableDataColumns = tableData.columns;
|
||||
$scope.tableDataComment = tableData.comment;
|
||||
|
||||
selectDefaultColsForGraphOption();
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -275,21 +301,24 @@
|
|||
$timeout(retryRenderer);
|
||||
};
|
||||
|
||||
var textRendererInitialized = false;
|
||||
var renderText = function() {
|
||||
var retryRenderer = function() {
|
||||
var textEl = angular.element('#p' + $scope.id + '_text');
|
||||
if (textEl.length) {
|
||||
// clear all lines before render
|
||||
clearTextOutput();
|
||||
|
||||
textRendererInitialized = true;
|
||||
|
||||
if (data) {
|
||||
appendTextOutput(data);
|
||||
} else {
|
||||
flushAppendQueue();
|
||||
}
|
||||
|
||||
angular.element('#p' + $scope.id + '_text').bind('mousewheel', function(e) {
|
||||
$scope.keepScrollDown = false;
|
||||
});
|
||||
$scope.flushStreamingOutput = true;
|
||||
});
|
||||
} else {
|
||||
$timeout(retryRenderer, 10);
|
||||
}
|
||||
|
|
@ -304,17 +333,30 @@
|
|||
}
|
||||
};
|
||||
|
||||
var appendTextOutput = function(msg) {
|
||||
var textEl = angular.element('#p' + $scope.id + '_text');
|
||||
if (textEl.length) {
|
||||
var lines = msg.split('\n');
|
||||
for (var i = 0; i < lines.length; i++) {
|
||||
textEl.append(angular.element('<div></div>').text(lines[i]));
|
||||
}
|
||||
var textAppendQueueBeforeInitialize = [];
|
||||
|
||||
var flushAppendQueue = function() {
|
||||
while (textAppendQueueBeforeInitialize.length > 0) {
|
||||
appendTextOutput(textAppendQueueBeforeInitialize.pop());
|
||||
}
|
||||
if ($scope.keepScrollDown) {
|
||||
var doc = angular.element('#p' + $scope.id + '_text');
|
||||
doc[0].scrollTop = doc[0].scrollHeight;
|
||||
};
|
||||
|
||||
var appendTextOutput = function(msg) {
|
||||
if (!textRendererInitialized) {
|
||||
textAppendQueueBeforeInitialize.push(msg);
|
||||
} else {
|
||||
flushAppendQueue();
|
||||
var textEl = angular.element('#p' + $scope.id + '_text');
|
||||
if (textEl.length) {
|
||||
var lines = msg.split('\n');
|
||||
for (var i = 0; i < lines.length; i++) {
|
||||
textEl.append(angular.element('<div></div>').text(lines[i]));
|
||||
}
|
||||
}
|
||||
if ($scope.keepScrollDown) {
|
||||
var doc = angular.element('#p' + $scope.id + '_text');
|
||||
doc[0].scrollTop = doc[0].scrollHeight;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -526,33 +568,33 @@
|
|||
}
|
||||
};
|
||||
|
||||
unique($scope.paragraph.config.graph.keys);
|
||||
removeUnknown($scope.paragraph.config.graph.keys);
|
||||
unique($scope.config.graph.keys);
|
||||
removeUnknown($scope.config.graph.keys);
|
||||
|
||||
removeUnknown($scope.paragraph.config.graph.values);
|
||||
removeUnknown($scope.config.graph.values);
|
||||
|
||||
unique($scope.paragraph.config.graph.groups);
|
||||
removeUnknown($scope.paragraph.config.graph.groups);
|
||||
unique($scope.config.graph.groups);
|
||||
removeUnknown($scope.config.graph.groups);
|
||||
|
||||
removeUnknownFromFields($scope.paragraph.config.graph.scatter);
|
||||
removeUnknownFromFields($scope.config.graph.scatter);
|
||||
};
|
||||
|
||||
/* select default key and value if there're none selected */
|
||||
var selectDefaultColsForGraphOption = function() {
|
||||
if ($scope.paragraph.config.graph.keys.length === 0 && tableData.columns.length > 0) {
|
||||
$scope.paragraph.config.graph.keys.push(tableData.columns[0]);
|
||||
if ($scope.config.graph.keys.length === 0 && tableData.columns.length > 0) {
|
||||
$scope.config.graph.keys.push(tableData.columns[0]);
|
||||
}
|
||||
|
||||
if ($scope.paragraph.config.graph.values.length === 0 && tableData.columns.length > 1) {
|
||||
$scope.paragraph.config.graph.values.push(tableData.columns[1]);
|
||||
if ($scope.config.graph.values.length === 0 && tableData.columns.length > 1) {
|
||||
$scope.config.graph.values.push(tableData.columns[1]);
|
||||
}
|
||||
|
||||
if (!$scope.paragraph.config.graph.scatter.xAxis && !$scope.paragraph.config.graph.scatter.yAxis) {
|
||||
if (!$scope.config.graph.scatter.xAxis && !$scope.config.graph.scatter.yAxis) {
|
||||
if (tableData.columns.length > 1) {
|
||||
$scope.paragraph.config.graph.scatter.xAxis = tableData.columns[0];
|
||||
$scope.paragraph.config.graph.scatter.yAxis = tableData.columns[1];
|
||||
$scope.config.graph.scatter.xAxis = tableData.columns[0];
|
||||
$scope.config.graph.scatter.yAxis = tableData.columns[1];
|
||||
} else if (tableData.columns.length === 1) {
|
||||
$scope.paragraph.config.graph.scatter.xAxis = tableData.columns[0];
|
||||
$scope.config.graph.scatter.xAxis = tableData.columns[0];
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
@ -566,21 +608,20 @@
|
|||
}
|
||||
};
|
||||
|
||||
$scope.resizeParagraph = function(width, height) {
|
||||
$scope.changeColWidth(width);
|
||||
$scope.resize = function(width, height) {
|
||||
$timeout(function() {
|
||||
autoAdjustEditorHeight($scope.paragraph.id + '_editor');
|
||||
$scope.changeHeight(height);
|
||||
changeHeight(width, height);
|
||||
}, 200);
|
||||
};
|
||||
|
||||
$scope.changeHeight = function(height) {
|
||||
var newParams = angular.copy($scope.paragraph.settings.params);
|
||||
var newConfig = angular.copy($scope.paragraph.config);
|
||||
var changeHeight = function(width, height) {
|
||||
var newParams = angular.copy(paragraph.settings.params);
|
||||
var newConfig = angular.copy($scope.config);
|
||||
|
||||
newConfig.graph.height = height;
|
||||
paragraph.config.colWidth = width;
|
||||
|
||||
commitParagraph($scope.paragraph.title, $scope.paragraph.text, newConfig, newParams);
|
||||
commitParagraphResult(paragraph.title, paragraph.text, newConfig, newParams);
|
||||
};
|
||||
|
||||
$scope.exportToDSV = function(delimiter) {
|
||||
|
|
@ -610,5 +651,9 @@
|
|||
}
|
||||
saveAsService.saveAs(dsv, 'data', extension);
|
||||
};
|
||||
|
||||
$scope.getBase64ImageSrc = function(base64Data) {
|
||||
return 'data:image/png;base64,' + base64Data;
|
||||
};
|
||||
};
|
||||
})();
|
||||
|
|
|
|||
|
|
@ -18,10 +18,7 @@ package org.apache.zeppelin.helium;
|
|||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterInfo;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
|
||||
|
|
@ -337,7 +334,8 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onOutputAppend(String noteId, String paragraphId, String appId, String output) {
|
||||
public void onOutputAppend(
|
||||
String noteId, String paragraphId, int index, String appId, String output) {
|
||||
ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId);
|
||||
|
||||
if (appToUpdate != null) {
|
||||
|
|
@ -347,12 +345,14 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
}
|
||||
|
||||
if (applicationEventListener != null) {
|
||||
applicationEventListener.onOutputAppend(noteId, paragraphId, appId, output);
|
||||
applicationEventListener.onOutputAppend(noteId, paragraphId, index, appId, output);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String appId, String output) {
|
||||
public void onOutputUpdated(
|
||||
String noteId, String paragraphId, int index, String appId,
|
||||
InterpreterResult.Type type, String output) {
|
||||
ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId);
|
||||
|
||||
if (appToUpdate != null) {
|
||||
|
|
@ -362,7 +362,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
}
|
||||
|
||||
if (applicationEventListener != null) {
|
||||
applicationEventListener.onOutputUpdated(noteId, paragraphId, appId, output);
|
||||
applicationEventListener.onOutputUpdated(noteId, paragraphId, index, appId, type, output);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue