mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Truncate output
This commit is contained in:
parent
638e4c2f51
commit
a9b4139d9d
10 changed files with 93 additions and 6 deletions
|
|
@ -216,6 +216,11 @@
|
|||
<description>Interpreter process connect timeout in msec.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.interpreter.output.limit</name>
|
||||
<value>102400</value>
|
||||
<description>Output message from interpreter exceed the limit will be truncated</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>zeppelin.ssl</name>
|
||||
|
|
|
|||
|
|
@ -248,6 +248,12 @@ If both are defined, then the **environment variables** will take priority.
|
|||
<td>interpreter</td>
|
||||
<td>Interpreter directory</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>ZEPPELIN_INTERPRETER_OUTPUT_LIMIT</td>
|
||||
<td>zeppelin.interpreter.output.limit</td>
|
||||
<td>102400</td>
|
||||
<td>Output message from interpreter exceed the limit will be truncated</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE</td>
|
||||
<td>zeppelin.websocket.max.text.message.size</td>
|
||||
|
|
|
|||
|
|
@ -30,4 +30,6 @@ public class Constants {
|
|||
|
||||
public static final int ZEPPELIN_INTERPRETER_DEFAUlT_PORT = 29914;
|
||||
|
||||
public static final int ZEPPELIN_INTERPRETER_OUTPUT_LIMIT = 1024 * 100;
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,6 +45,13 @@ public class InterpreterOutput extends OutputStream {
|
|||
private final InterpreterOutputListener flushListener;
|
||||
private final InterpreterOutputChangeListener changeListener;
|
||||
|
||||
private int size = 0;
|
||||
|
||||
// change static var to set interpreter output limit
|
||||
// limit will be applied to all InterpreterOutput object.
|
||||
// so we can expect the consistent behavior
|
||||
public static int limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
|
||||
|
||||
public InterpreterOutput(InterpreterOutputListener flushListener) {
|
||||
this.flushListener = flushListener;
|
||||
changeListener = null;
|
||||
|
|
@ -52,7 +59,8 @@ public class InterpreterOutput extends OutputStream {
|
|||
}
|
||||
|
||||
public InterpreterOutput(InterpreterOutputListener flushListener,
|
||||
InterpreterOutputChangeListener listener) throws IOException {
|
||||
InterpreterOutputChangeListener listener)
|
||||
throws IOException {
|
||||
this.flushListener = flushListener;
|
||||
this.changeListener = listener;
|
||||
clear();
|
||||
|
|
@ -74,6 +82,7 @@ public class InterpreterOutput extends OutputStream {
|
|||
out.setResourceSearchPaths(resourceSearchPaths);
|
||||
|
||||
buffer.reset();
|
||||
size = 0;
|
||||
|
||||
if (currentOut != null) {
|
||||
currentOut.flush();
|
||||
|
|
@ -125,6 +134,8 @@ public class InterpreterOutput extends OutputStream {
|
|||
}
|
||||
|
||||
public void clear() {
|
||||
size = 0;
|
||||
truncated = false;
|
||||
buffer.reset();
|
||||
|
||||
synchronized (resultMessageOutputs) {
|
||||
|
|
@ -157,11 +168,31 @@ public class InterpreterOutput extends OutputStream {
|
|||
boolean startOfTheNewLine = true;
|
||||
boolean firstCharIsPercentSign = false;
|
||||
|
||||
boolean truncated = false;
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
InterpreterResultMessageOutput out;
|
||||
if (truncated) {
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (resultMessageOutputs) {
|
||||
currentOut = getCurrentOutput();
|
||||
|
||||
if (++size > limit) {
|
||||
if (b == NEW_LINE_CHAR && currentOut != null) {
|
||||
InterpreterResult.Type type = currentOut.getType();
|
||||
if (type == InterpreterResult.Type.TEXT || type == InterpreterResult.Type.TABLE) {
|
||||
|
||||
setType(InterpreterResult.Type.TEXT);
|
||||
getCurrentOutput().write("Output exceeds " + limit + ". Truncated.\n");
|
||||
truncated = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (startOfTheNewLine) {
|
||||
if (b == '%') {
|
||||
startOfTheNewLine = false;
|
||||
|
|
@ -175,7 +206,6 @@ public class InterpreterOutput extends OutputStream {
|
|||
}
|
||||
|
||||
if (b == NEW_LINE_CHAR) {
|
||||
currentOut = getCurrentOutput();
|
||||
if (currentOut != null && currentOut.getType() == InterpreterResult.Type.TABLE) {
|
||||
if (previousChar == NEW_LINE_CHAR) {
|
||||
startOfTheNewLine = true;
|
||||
|
|
|
|||
|
|
@ -63,6 +63,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
private int port;
|
||||
private String userName;
|
||||
private Boolean isUserImpersonate;
|
||||
private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
|
||||
|
||||
/**
|
||||
* Remote interpreter and manage interpreter process
|
||||
|
|
@ -70,7 +71,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
public RemoteInterpreter(Properties property, String sessionKey, String className,
|
||||
String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
|
||||
int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
|
||||
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
|
||||
int outputLimit) {
|
||||
super(property);
|
||||
this.sessionKey = sessionKey;
|
||||
this.className = className;
|
||||
|
|
@ -85,6 +87,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.applicationEventListener = appListener;
|
||||
this.userName = userName;
|
||||
this.isUserImpersonate = isUserImpersonate;
|
||||
this.outputLimit = outputLimit;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -94,7 +97,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
|
||||
int port, String localRepoPath, int connectTimeout, int maxPoolSize,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
|
||||
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
|
||||
int outputLimit) {
|
||||
super(property);
|
||||
this.sessionKey = sessionKey;
|
||||
this.className = className;
|
||||
|
|
@ -108,6 +112,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.applicationEventListener = appListener;
|
||||
this.userName = userName;
|
||||
this.isUserImpersonate = isUserImpersonate;
|
||||
this.outputLimit = outputLimit;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -217,6 +222,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
if (localRepoPath != null) {
|
||||
property.put("zeppelin.interpreter.localRepo", localRepoPath);
|
||||
}
|
||||
|
||||
property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
|
||||
client.createInterpreter(groupId, sessionKey,
|
||||
getClassName(), (Map) property, userName);
|
||||
// Push angular object loaded from JSON file to remote interpreter
|
||||
|
|
|
|||
|
|
@ -162,6 +162,11 @@ public class RemoteInterpreterServer
|
|||
interpreterGroup.setResourcePool(resourcePool);
|
||||
|
||||
String localRepoPath = properties.get("zeppelin.interpreter.localRepo");
|
||||
if (properties.containsKey("zeppelin.interpreter.output.limit")) {
|
||||
InterpreterOutput.limit = Integer.parseInt(
|
||||
properties.get("zeppelin.interpreter.output.limit"));
|
||||
}
|
||||
|
||||
depLoader = new DependencyResolver(localRepoPath);
|
||||
appLoader = new ApplicationLoader(resourcePool, depLoader);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -162,6 +162,33 @@ public class InterpreterOutputTest implements InterpreterOutputListener {
|
|||
assertEquals("val1\tval2\n", new String(out.getOutputAt(1).toByteArray()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTruncate() throws IOException {
|
||||
// output is truncated after the new line
|
||||
InterpreterOutput.limit = 3;
|
||||
out = new InterpreterOutput(this);
|
||||
|
||||
// truncate text
|
||||
out.write("%text hello\nworld\n");
|
||||
assertEquals("hello", new String(out.getOutputAt(0).toByteArray()));
|
||||
assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("Truncated"));
|
||||
|
||||
// truncate table
|
||||
out = new InterpreterOutput(this);
|
||||
out.write("%table key\tvalue\nhello\t100\nworld\t200\n");
|
||||
assertEquals("key\tvalue", new String(out.getOutputAt(0).toByteArray()));
|
||||
assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("Truncated"));
|
||||
|
||||
// does not truncate html
|
||||
out = new InterpreterOutput(this);
|
||||
out.write("%html hello\nworld\n");
|
||||
out.flush();
|
||||
assertEquals("hello\nworld\n", new String(out.getOutputAt(0).toByteArray()));
|
||||
|
||||
// restore default
|
||||
InterpreterOutput.limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onUpdateAll(InterpreterOutput out) {
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ import org.apache.zeppelin.helium.Helium;
|
|||
import org.apache.zeppelin.helium.HeliumApplicationFactory;
|
||||
import org.apache.zeppelin.helium.HeliumVisualizationFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.notebook.Notebook;
|
||||
import org.apache.zeppelin.notebook.NotebookAuthorization;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
|
||||
|
|
@ -98,6 +99,8 @@ public class ZeppelinServer extends Application {
|
|||
this.depResolver = new DependencyResolver(
|
||||
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
|
||||
|
||||
InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
|
||||
|
||||
HeliumApplicationFactory heliumApplicationFactory = new HeliumApplicationFactory();
|
||||
HeliumVisualizationFactory heliumVisualizationFactory;
|
||||
|
||||
|
|
|
|||
|
|
@ -572,6 +572,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
|
||||
+ "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
|
||||
+ "scalding,jdbc,hbase,bigquery,beam,pig,scio"),
|
||||
ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
|
||||
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
|
||||
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
|
||||
// use specified notebook (id) as homescreen
|
||||
|
|
|
|||
|
|
@ -1150,7 +1150,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
LazyOpenInterpreter intp = new LazyOpenInterpreter(
|
||||
new RemoteInterpreter(property, interpreterSessionKey, className, host, port, localRepoPath,
|
||||
connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener,
|
||||
userName, isUserImpersonate));
|
||||
userName, isUserImpersonate, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT)));
|
||||
return intp;
|
||||
}
|
||||
|
||||
|
|
@ -1175,7 +1175,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
RemoteInterpreter remoteInterpreter =
|
||||
new RemoteInterpreter(property, interpreterSessionKey, className,
|
||||
interpreterRunnerPath, interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
|
||||
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate);
|
||||
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate,
|
||||
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT));
|
||||
remoteInterpreter.addEnv(env);
|
||||
|
||||
return new LazyOpenInterpreter(remoteInterpreter);
|
||||
|
|
|
|||
Loading…
Reference in a new issue