Truncate output

This commit is contained in:
Lee moon soo 2017-01-17 10:49:38 -08:00
parent 638e4c2f51
commit a9b4139d9d
10 changed files with 93 additions and 6 deletions

View file

@ -216,6 +216,11 @@
<description>Interpreter process connect timeout in msec.</description>
</property>
<property>
<name>zeppelin.interpreter.output.limit</name>
<value>102400</value>
<description>Interpreter output that exceeds this limit will be truncated.</description>
</property>
<property>
<name>zeppelin.ssl</name>

View file

@ -248,6 +248,12 @@ If both are defined, then the **environment variables** will take priority.
<td>interpreter</td>
<td>Interpreter directory</td>
</tr>
<tr>
<td>ZEPPELIN_INTERPRETER_OUTPUT_LIMIT</td>
<td>zeppelin.interpreter.output.limit</td>
<td>102400</td>
<td>Interpreter output that exceeds this limit will be truncated.</td>
</tr>
<tr>
<td>ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE</td>
<td>zeppelin.websocket.max.text.message.size</td>

View file

@ -30,4 +30,6 @@ public class Constants {
public static final int ZEPPELIN_INTERPRETER_DEFAUlT_PORT = 29914;
public static final int ZEPPELIN_INTERPRETER_OUTPUT_LIMIT = 1024 * 100;
}

View file

@ -45,6 +45,13 @@ public class InterpreterOutput extends OutputStream {
private final InterpreterOutputListener flushListener;
private final InterpreterOutputChangeListener changeListener;
private int size = 0;
// Assign this static variable to change the interpreter output limit.
// Because it is static, the limit applies to every InterpreterOutput object,
// so truncation behaves consistently across all of them.
public static int limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
public InterpreterOutput(InterpreterOutputListener flushListener) {
this.flushListener = flushListener;
changeListener = null;
@ -52,7 +59,8 @@ public class InterpreterOutput extends OutputStream {
}
public InterpreterOutput(InterpreterOutputListener flushListener,
InterpreterOutputChangeListener listener) throws IOException {
InterpreterOutputChangeListener listener)
throws IOException {
this.flushListener = flushListener;
this.changeListener = listener;
clear();
@ -74,6 +82,7 @@ public class InterpreterOutput extends OutputStream {
out.setResourceSearchPaths(resourceSearchPaths);
buffer.reset();
size = 0;
if (currentOut != null) {
currentOut.flush();
@ -125,6 +134,8 @@ public class InterpreterOutput extends OutputStream {
}
public void clear() {
size = 0;
truncated = false;
buffer.reset();
synchronized (resultMessageOutputs) {
@ -157,11 +168,31 @@ public class InterpreterOutput extends OutputStream {
boolean startOfTheNewLine = true;
boolean firstCharIsPercentSign = false;
boolean truncated = false;
@Override
public void write(int b) throws IOException {
InterpreterResultMessageOutput out;
if (truncated) {
return;
}
synchronized (resultMessageOutputs) {
currentOut = getCurrentOutput();
if (++size > limit) {
if (b == NEW_LINE_CHAR && currentOut != null) {
InterpreterResult.Type type = currentOut.getType();
if (type == InterpreterResult.Type.TEXT || type == InterpreterResult.Type.TABLE) {
setType(InterpreterResult.Type.TEXT);
getCurrentOutput().write("Output exceeds " + limit + ". Truncated.\n");
truncated = true;
return;
}
}
}
if (startOfTheNewLine) {
if (b == '%') {
startOfTheNewLine = false;
@ -175,7 +206,6 @@ public class InterpreterOutput extends OutputStream {
}
if (b == NEW_LINE_CHAR) {
currentOut = getCurrentOutput();
if (currentOut != null && currentOut.getType() == InterpreterResult.Type.TABLE) {
if (previousChar == NEW_LINE_CHAR) {
startOfTheNewLine = true;

View file

@ -63,6 +63,7 @@ public class RemoteInterpreter extends Interpreter {
private int port;
private String userName;
private Boolean isUserImpersonate;
private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
/**
* Remote interpreter and manage interpreter process
@ -70,7 +71,8 @@ public class RemoteInterpreter extends Interpreter {
public RemoteInterpreter(Properties property, String sessionKey, String className,
String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
int outputLimit) {
super(property);
this.sessionKey = sessionKey;
this.className = className;
@ -85,6 +87,7 @@ public class RemoteInterpreter extends Interpreter {
this.applicationEventListener = appListener;
this.userName = userName;
this.isUserImpersonate = isUserImpersonate;
this.outputLimit = outputLimit;
}
@ -94,7 +97,8 @@ public class RemoteInterpreter extends Interpreter {
public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
int port, String localRepoPath, int connectTimeout, int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
int outputLimit) {
super(property);
this.sessionKey = sessionKey;
this.className = className;
@ -108,6 +112,7 @@ public class RemoteInterpreter extends Interpreter {
this.applicationEventListener = appListener;
this.userName = userName;
this.isUserImpersonate = isUserImpersonate;
this.outputLimit = outputLimit;
}
@ -217,6 +222,8 @@ public class RemoteInterpreter extends Interpreter {
if (localRepoPath != null) {
property.put("zeppelin.interpreter.localRepo", localRepoPath);
}
property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
client.createInterpreter(groupId, sessionKey,
getClassName(), (Map) property, userName);
// Push angular object loaded from JSON file to remote interpreter

View file

@ -162,6 +162,11 @@ public class RemoteInterpreterServer
interpreterGroup.setResourcePool(resourcePool);
String localRepoPath = properties.get("zeppelin.interpreter.localRepo");
if (properties.containsKey("zeppelin.interpreter.output.limit")) {
InterpreterOutput.limit = Integer.parseInt(
properties.get("zeppelin.interpreter.output.limit"));
}
depLoader = new DependencyResolver(localRepoPath);
appLoader = new ApplicationLoader(resourcePool, depLoader);
}

View file

@ -162,6 +162,33 @@ public class InterpreterOutputTest implements InterpreterOutputListener {
assertEquals("val1\tval2\n", new String(out.getOutputAt(1).toByteArray()));
}
/**
 * Verifies that output beyond {@code InterpreterOutput.limit} is truncated for text and
 * table results, and that html results are left untouched.
 *
 * @throws IOException if writing to or flushing the output fails
 */
@Test
public void testTruncate() throws IOException {
  // The limit is a static shared by every InterpreterOutput. Restore it in a finally block
  // so that a failing assertion cannot leak the tiny limit into other tests.
  InterpreterOutput.limit = 3;
  try {
    // output is truncated after the new line
    out = new InterpreterOutput(this);

    // truncate text
    out.write("%text hello\nworld\n");
    assertEquals("hello", new String(out.getOutputAt(0).toByteArray()));
    assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("Truncated"));

    // truncate table
    out = new InterpreterOutput(this);
    out.write("%table key\tvalue\nhello\t100\nworld\t200\n");
    assertEquals("key\tvalue", new String(out.getOutputAt(0).toByteArray()));
    assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("Truncated"));

    // does not truncate html
    out = new InterpreterOutput(this);
    out.write("%html hello\nworld\n");
    out.flush();
    assertEquals("hello\nworld\n", new String(out.getOutputAt(0).toByteArray()));
  } finally {
    // restore default
    InterpreterOutput.limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
  }
}
@Override
public void onUpdateAll(InterpreterOutput out) {

View file

@ -37,6 +37,7 @@ import org.apache.zeppelin.helium.Helium;
import org.apache.zeppelin.helium.HeliumApplicationFactory;
import org.apache.zeppelin.helium.HeliumVisualizationFactory;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookAuthorization;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
@ -98,6 +99,8 @@ public class ZeppelinServer extends Application {
this.depResolver = new DependencyResolver(
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
HeliumApplicationFactory heliumApplicationFactory = new HeliumApplicationFactory();
HeliumVisualizationFactory heliumVisualizationFactory;

View file

@ -572,6 +572,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
+ "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
+ "scalding,jdbc,hbase,bigquery,beam,pig,scio"),
ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
// use specified notebook (id) as homescreen

View file

@ -1150,7 +1150,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
LazyOpenInterpreter intp = new LazyOpenInterpreter(
new RemoteInterpreter(property, interpreterSessionKey, className, host, port, localRepoPath,
connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener,
userName, isUserImpersonate));
userName, isUserImpersonate, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT)));
return intp;
}
@ -1175,7 +1175,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
RemoteInterpreter remoteInterpreter =
new RemoteInterpreter(property, interpreterSessionKey, className,
interpreterRunnerPath, interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate);
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate,
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT));
remoteInterpreter.addEnv(env);
return new LazyOpenInterpreter(remoteInterpreter);