diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 41697786e9..538a8446ef 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -356,7 +356,6 @@ public class RemoteInterpreterServer context.out.flush(); InterpreterResult.Type outputType = context.out.getType(); byte[] interpreterOutput = context.out.toByteArray(); - context.out.clear(); if (interpreterOutput != null && interpreterOutput.length > 0) { message = new String(interpreterOutput); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 0ef4b4c5e0..8976f37521 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -212,7 +212,7 @@ public class Paragraph extends Job implements Serializable, Cloneable { String replName = getRequiredReplName(); Interpreter repl = getRepl(replName); if (repl != null) { - return repl.getProgress(getInterpreterContext()); + return repl.getProgress(getInterpreterContext(null)); } else { return 0; } @@ -259,7 +259,6 @@ public class Paragraph extends Job implements Serializable, Cloneable { context.out.flush(); InterpreterResult.Type outputType = context.out.getType(); byte[] interpreterOutput = context.out.toByteArray(); - context.out.clear(); if (interpreterOutput != null && interpreterOutput.length > 0) { message = new String(interpreterOutput); @@ -298,12 +297,44 @@ public class Paragraph extends Job implements Serializable, Cloneable { if (job != null) { job.setStatus(Status.ABORT); } else { - repl.cancel(getInterpreterContext()); + repl.cancel(getInterpreterContext(null)); } return true; } private InterpreterContext getInterpreterContext() { + final Paragraph self = this; + + return getInterpreterContext(new InterpreterOutput(new InterpreterOutputListener() { + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + updateParagraphResult(out); + ((ParagraphJobListener) getListener()).onOutputAppend(self, out, new String(line)); + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + updateParagraphResult(out); + ((ParagraphJobListener) getListener()).onOutputUpdate(self, out, + new String(output)); + } + + private void updateParagraphResult(InterpreterOutput out) { + // update paragraph result + Throwable t = null; + String message = null; + try { + message = new String(out.toByteArray()); + } catch (IOException e) { + logger().error(e.getMessage(), e); + t = e; + } + setReturn(new InterpreterResult(Code.SUCCESS, out.getType(), message), t); + } + })); + } + + private InterpreterContext getInterpreterContext(InterpreterOutput output) { AngularObjectRegistry registry = null; ResourcePool resourcePool = null; @@ -318,7 +349,6 @@ public class Paragraph extends Job implements Serializable, Cloneable { runners.add(new ParagraphRunner(note, note.id(), p.getId())); } - final Paragraph self = this; InterpreterContext interpreterContext = new InterpreterContext( note.id(), getId(), @@ -330,33 +360,7 @@ public class Paragraph extends Job implements Serializable, Cloneable { registry, resourcePool, runners, - new InterpreterOutput(new InterpreterOutputListener() { - @Override - public void onAppend(InterpreterOutput out, byte[] line) { - updateParagraphResult(out); - ((ParagraphJobListener) getListener()).onOutputAppend(self, out, new String(line)); - } - - @Override - public void onUpdate(InterpreterOutput out, byte[] output) { - updateParagraphResult(out); - ((ParagraphJobListener) getListener()).onOutputUpdate(self, out, - new String(output)); - } - - private void updateParagraphResult(InterpreterOutput out) { - // update paragraph result - Throwable t = null; - String message = null; - try { - message = new String(out.toByteArray()); - } catch (IOException e) { - logger().error(e.getMessage(), e); - t = e; - } - setReturn(new InterpreterResult(Code.SUCCESS, out.getType(), message), t); - } - })); + output); return interpreterContext; }