Generalize SparkOutputStream

This commit is contained in:
Rafal Wojdyla 2016-09-28 23:23:57 -04:00
parent c4e722afd1
commit 7b596ea5c2
5 changed files with 18 additions and 13 deletions

View file

@ -55,6 +55,7 @@ import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -71,7 +72,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
private GatewayServer gatewayServer;
private DefaultExecutor executor;
private int port;
private SparkOutputStream outputStream;
private InterpreterOutputStream outputStream;
private BufferedWriter ins;
private PipedInputStream in;
private ByteArrayOutputStream input;
@ -196,7 +197,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
cmd.addArgument(Integer.toString(port), false);
cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
executor = new DefaultExecutor();
outputStream = new SparkOutputStream(logger);
outputStream = new InterpreterOutputStream(logger);
PipedOutputStream ps = new PipedOutputStream();
in = null;
try {

View file

@ -55,6 +55,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.WellKnownResourceName;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@ -110,7 +111,7 @@ public class SparkInterpreter extends Interpreter {
private static Integer sharedInterpreterLock = new Integer(0);
private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0);
private SparkOutputStream out;
private InterpreterOutputStream out;
private SparkDependencyResolver dep;
/**
@ -126,7 +127,7 @@ public class SparkInterpreter extends Interpreter {
public SparkInterpreter(Properties property) {
super(property);
out = new SparkOutputStream(logger);
out = new InterpreterOutputStream(logger);
}
public SparkInterpreter(Properties property, SparkContext sc) {

View file

@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,7 +39,7 @@ public class ZeppelinR implements ExecuteResultHandler {
private final String rCmdPath;
private final SparkVersion sparkVersion;
private DefaultExecutor executor;
private SparkOutputStream outputStream;
private InterpreterOutputStream outputStream;
private PipedOutputStream input;
private final String scriptPath;
private final String libPath;
@ -146,7 +147,7 @@ public class ZeppelinR implements ExecuteResultHandler {
logger.debug(cmd.toString());
executor = new DefaultExecutor();
outputStream = new SparkOutputStream(logger);
outputStream = new InterpreterOutputStream(logger);
input = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(input);

View file

@ -14,7 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;
package org.apache.zeppelin.interpreter.util;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.slf4j.Logger;
@ -22,14 +23,15 @@ import org.slf4j.Logger;
import java.io.IOException;
/**
* InterpreterOutput can be attached / detached.
* Output Stream integrated with InterpreterOutput.
*
* Can be used to channel output from interpreters.
*/
public class SparkOutputStream extends LogOutputStream {
public class InterpreterOutputStream extends LogOutputStream {
public static Logger logger;
InterpreterOutput interpreterOutput;
public SparkOutputStream(Logger logger) {
public InterpreterOutputStream(Logger logger) {
this.logger = logger;
}
@ -78,6 +80,7 @@ public class SparkOutputStream extends LogOutputStream {
}
}
@Override
public void flush() throws IOException {
super.flush();

View file

@ -15,13 +15,12 @@
* limitations under the License.
*/
package org.apache.zeppelin.spark;
package org.apache.zeppelin.interpreter.util;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
/**
* Minor modification of LogOutputStream of apache commons exec.
* LogOutputStream of apache commons exec has one issue that method flush doesn't throw IOException,