diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 582cb6b06c..13407b22a3 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -55,6 +55,7 @@ import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.WrappedInterpreter; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; import org.apache.zeppelin.spark.dep.SparkDependencyContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +72,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand private GatewayServer gatewayServer; private DefaultExecutor executor; private int port; - private SparkOutputStream outputStream; + private InterpreterOutputStream outputStream; private BufferedWriter ins; private PipedInputStream in; private ByteArrayOutputStream input; @@ -196,7 +197,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand cmd.addArgument(Integer.toString(port), false); cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false); executor = new DefaultExecutor(); - outputStream = new SparkOutputStream(logger); + outputStream = new InterpreterOutputStream(logger); PipedOutputStream ps = new PipedOutputStream(); in = null; try { diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 53bf30b953..41e83ef6f6 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -55,6 +55,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterUtils; import org.apache.zeppelin.interpreter.WrappedInterpreter; +import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.resource.WellKnownResourceName; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -110,7 +111,7 @@ public class SparkInterpreter extends Interpreter { private static Integer sharedInterpreterLock = new Integer(0); private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0); - private SparkOutputStream out; + private InterpreterOutputStream out; private SparkDependencyResolver dep; /** @@ -126,7 +127,7 @@ public class SparkInterpreter extends Interpreter { public SparkInterpreter(Properties property) { super(property); - out = new SparkOutputStream(logger); + out = new InterpreterOutputStream(logger); } public SparkInterpreter(Properties property, SparkContext sc) { diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java index 961793db17..98c6de301f 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java @@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterOutputListener; +import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,7 @@ public class ZeppelinR implements ExecuteResultHandler { private final String rCmdPath; private final SparkVersion sparkVersion; private DefaultExecutor executor; - private SparkOutputStream outputStream; + private InterpreterOutputStream outputStream; private PipedOutputStream input; private final String scriptPath; private final String libPath; @@ -146,7 +147,7 @@ public class ZeppelinR implements ExecuteResultHandler { logger.debug(cmd.toString()); executor = new DefaultExecutor(); - outputStream = new SparkOutputStream(logger); + outputStream = new InterpreterOutputStream(logger); input = new PipedOutputStream(); PipedInputStream in = new PipedInputStream(input); diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java similarity index 89% rename from spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java index e454994aa0..b6f01b1a7c 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java @@ -14,7 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.zeppelin.spark; + +package org.apache.zeppelin.interpreter.util; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.slf4j.Logger; @@ -22,14 +23,15 @@ import org.slf4j.Logger; import java.io.IOException; /** - * InterpreterOutput can be attached / detached. + * Output Stream integrated with InterpreterOutput. + * + * Can be used to channel output from interpreters. */ -public class SparkOutputStream extends LogOutputStream { - +public class InterpreterOutputStream extends LogOutputStream { public static Logger logger; InterpreterOutput interpreterOutput; - public SparkOutputStream(Logger logger) { + public InterpreterOutputStream(Logger logger) { this.logger = logger; } @@ -78,6 +80,7 @@ public class SparkOutputStream extends LogOutputStream { } } + @Override public void flush() throws IOException { super.flush(); diff --git a/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/LogOutputStream.java similarity index 98% rename from spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/LogOutputStream.java index d941cd772c..e77f441f90 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/LogOutputStream.java @@ -15,13 +15,12 @@ * limitations under the License. */ -package org.apache.zeppelin.spark; +package org.apache.zeppelin.interpreter.util; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; - /** * Minor modification of LogOutputStream of apache commons exec. * LogOutputStream of apache commons exec has one issue that method flush doesn't throw IOException,