mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Generalize SparkOutputStream
This commit is contained in:
parent
c4e722afd1
commit
7b596ea5c2
5 changed files with 18 additions and 13 deletions
|
|
@ -55,6 +55,7 @@ import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
|||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -71,7 +72,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
private GatewayServer gatewayServer;
|
||||
private DefaultExecutor executor;
|
||||
private int port;
|
||||
private SparkOutputStream outputStream;
|
||||
private InterpreterOutputStream outputStream;
|
||||
private BufferedWriter ins;
|
||||
private PipedInputStream in;
|
||||
private ByteArrayOutputStream input;
|
||||
|
|
@ -196,7 +197,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
cmd.addArgument(Integer.toString(port), false);
|
||||
cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
|
||||
executor = new DefaultExecutor();
|
||||
outputStream = new SparkOutputStream(logger);
|
||||
outputStream = new InterpreterOutputStream(logger);
|
||||
PipedOutputStream ps = new PipedOutputStream();
|
||||
in = null;
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -55,6 +55,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
|
|||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.resource.WellKnownResourceName;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
|
|
@ -110,7 +111,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
private static Integer sharedInterpreterLock = new Integer(0);
|
||||
private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0);
|
||||
|
||||
private SparkOutputStream out;
|
||||
private InterpreterOutputStream out;
|
||||
private SparkDependencyResolver dep;
|
||||
|
||||
/**
|
||||
|
|
@ -126,7 +127,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
public SparkInterpreter(Properties property) {
|
||||
super(property);
|
||||
out = new SparkOutputStream(logger);
|
||||
out = new InterpreterOutputStream(logger);
|
||||
}
|
||||
|
||||
public SparkInterpreter(Properties property, SparkContext sc) {
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils;
|
|||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -38,7 +39,7 @@ public class ZeppelinR implements ExecuteResultHandler {
|
|||
private final String rCmdPath;
|
||||
private final SparkVersion sparkVersion;
|
||||
private DefaultExecutor executor;
|
||||
private SparkOutputStream outputStream;
|
||||
private InterpreterOutputStream outputStream;
|
||||
private PipedOutputStream input;
|
||||
private final String scriptPath;
|
||||
private final String libPath;
|
||||
|
|
@ -146,7 +147,7 @@ public class ZeppelinR implements ExecuteResultHandler {
|
|||
logger.debug(cmd.toString());
|
||||
|
||||
executor = new DefaultExecutor();
|
||||
outputStream = new SparkOutputStream(logger);
|
||||
outputStream = new InterpreterOutputStream(logger);
|
||||
|
||||
input = new PipedOutputStream();
|
||||
PipedInputStream in = new PipedInputStream(input);
|
||||
|
|
|
|||
|
|
@ -14,7 +14,8 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.spark;
|
||||
|
||||
package org.apache.zeppelin.interpreter.util;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -22,14 +23,15 @@ import org.slf4j.Logger;
|
|||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* InterpreterOutput can be attached / detached.
|
||||
* Output Stream integrated with InterpreterOutput.
|
||||
*
|
||||
* Can be used to channel output from interpreters.
|
||||
*/
|
||||
public class SparkOutputStream extends LogOutputStream {
|
||||
|
||||
public class InterpreterOutputStream extends LogOutputStream {
|
||||
public static Logger logger;
|
||||
InterpreterOutput interpreterOutput;
|
||||
|
||||
public SparkOutputStream(Logger logger) {
|
||||
public InterpreterOutputStream(Logger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
|
|
@ -78,6 +80,7 @@ public class SparkOutputStream extends LogOutputStream {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
super.flush();
|
||||
|
|
@ -15,13 +15,12 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.spark;
|
||||
package org.apache.zeppelin.interpreter.util;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
|
||||
/**
|
||||
* Minor modification of LogOutputStream of apache commons exec.
|
||||
* LogOutputStream of apache commons exec has one issue that method flush doesn't throw IOException,
|
||||
Loading…
Reference in a new issue