mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-3975. Add limit local property for SparkInterpreter and JdbcInterpreter
This commit is contained in:
parent
727600b5ca
commit
6350433cc1
14 changed files with 63 additions and 15 deletions
|
|
@ -54,7 +54,7 @@ class FlinkZeppelinContext(val btenv: BatchTableEnvironment,
|
|||
override def getInterpreterClassMap: util.Map[String, String] =
|
||||
JavaConversions.mapAsJavaMap(interpreterClassMap)
|
||||
|
||||
override def showData(obj: Any): String = {
|
||||
override def showData(obj: Any, maxResult: Int): String = {
|
||||
def showTable(table: Table): String = {
|
||||
val columnNames: Array[String] = table.getSchema.getColumnNames
|
||||
val dsRow: DataSet[Row] = btenv.toDataSet[Row](table)
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ public class GroovyZeppelinContext extends BaseZeppelinContext {
|
|||
}
|
||||
|
||||
@Override
|
||||
public String showData(Object obj) {
|
||||
public String showData(Object obj, int maxResult) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ public class DevZeppelinContext extends BaseZeppelinContext {
|
|||
}
|
||||
|
||||
@Override
|
||||
public String showData(Object obj) {
|
||||
public String showData(Object obj, int maxResult) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -721,8 +721,8 @@ public class JDBCInterpreter extends KerberosInterpreter {
|
|||
statement = connection.createStatement();
|
||||
|
||||
// fetch n+1 rows in order to indicate there's more rows available (for large selects)
|
||||
statement.setFetchSize(getMaxResult());
|
||||
statement.setMaxRows(maxRows);
|
||||
statement.setFetchSize(interpreterContext.getIntLocalProperty("limit", getMaxResult()));
|
||||
statement.setMaxRows(interpreterContext.getIntLocalProperty("limit", maxRows));
|
||||
|
||||
if (statement == null) {
|
||||
return new InterpreterResult(Code.ERROR, "Prefix not found.");
|
||||
|
|
|
|||
|
|
@ -188,6 +188,13 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
|
||||
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message().get(0).getData());
|
||||
|
||||
interpreterContext.getLocalProperties().put("limit", "1");
|
||||
interpreterResult = t.interpret(sqlQuery, interpreterContext);
|
||||
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
|
||||
assertEquals("ID\tNAME\na\ta_name\n", interpreterResult.message().get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ public class PythonZeppelinContext extends BaseZeppelinContext {
|
|||
}
|
||||
|
||||
@Override
|
||||
public String showData(Object obj) {
|
||||
public String showData(Object obj, int maxResult) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -86,8 +86,10 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
|
|||
|
||||
try {
|
||||
Method method = sqlc.getClass().getMethod("sql", String.class);
|
||||
int maxResult = Integer.parseInt(context.getLocalProperties().getOrDefault("limit",
|
||||
"" + sparkInterpreter.getZeppelinContext().getMaxResult()));
|
||||
String msg = sparkInterpreter.getZeppelinContext().showData(
|
||||
method.invoke(sqlc, st));
|
||||
method.invoke(sqlc, st), maxResult);
|
||||
sc.clearJobGroup();
|
||||
return new InterpreterResult(Code.SUCCESS, msg);
|
||||
} catch (Exception e) {
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ class SparkZeppelinContext(val sc: SparkContext,
|
|||
override def getInterpreterClassMap: util.Map[String, String] =
|
||||
JavaConversions.mapAsJavaMap(interpreterClassMap)
|
||||
|
||||
override def showData(obj: Any): String = sparkShims.showDataFrame(obj, maxResult)
|
||||
override def showData(obj: Any, maxResult: Int): String = sparkShims.showDataFrame(obj, maxResult)
|
||||
|
||||
@ZeppelinApi
|
||||
def select(name: String, options: Seq[(Any, String)]): Any = select(name, null, options)
|
||||
|
|
|
|||
|
|
@ -114,6 +114,7 @@ public class NewSparkSqlInterpreterTest {
|
|||
|
||||
InterpreterResult ret = sqlInterpreter.interpret("select * from gr", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
|
||||
|
||||
}
|
||||
|
||||
public void test_null_value_in_row() throws InterpreterException {
|
||||
|
|
@ -155,7 +156,16 @@ public class NewSparkSqlInterpreterTest {
|
|||
|
||||
InterpreterResult ret = sqlInterpreter.interpret("select * from gr", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
|
||||
// the number of rows is 10+1, 1 is the head of table
|
||||
assertEquals(11, ret.message().get(0).getData().split("\n").length);
|
||||
assertTrue(ret.message().get(1).getData().contains("alert-warning"));
|
||||
|
||||
// test limit local property
|
||||
context.getLocalProperties().put("limit", "5");
|
||||
ret = sqlInterpreter.interpret("select * from gr", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
|
||||
// the number of rows is 5+1, 1 is the head of table
|
||||
assertEquals(6, ret.message().get(0).getData().split("\n").length);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -56,12 +56,16 @@ public class Spark1Shims extends SparkShims {
|
|||
if (obj instanceof DataFrame) {
|
||||
DataFrame df = (DataFrame) obj;
|
||||
String[] columns = df.columns();
|
||||
// fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult
|
||||
List<Row> rows = df.takeAsList(maxResult + 1);
|
||||
|
||||
StringBuilder msg = new StringBuilder();
|
||||
msg.append("%table ");
|
||||
msg.append(StringUtils.join(columns, "\t"));
|
||||
msg.append("\n");
|
||||
boolean isLargerThanMaxResult = rows.size() > maxResult;
|
||||
if (isLargerThanMaxResult) {
|
||||
rows = rows.subList(0, maxResult);
|
||||
}
|
||||
for (Row row : rows) {
|
||||
for (int i = 0; i < row.size(); ++i) {
|
||||
msg.append(row.get(i));
|
||||
|
|
@ -72,7 +76,7 @@ public class Spark1Shims extends SparkShims {
|
|||
msg.append("\n");
|
||||
}
|
||||
|
||||
if (rows.size() > maxResult) {
|
||||
if (isLargerThanMaxResult) {
|
||||
msg.append("\n");
|
||||
msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -57,12 +57,16 @@ public class Spark2Shims extends SparkShims {
|
|||
if (obj instanceof Dataset) {
|
||||
Dataset<Row> df = ((Dataset) obj).toDF();
|
||||
String[] columns = df.columns();
|
||||
// fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult
|
||||
List<Row> rows = df.takeAsList(maxResult + 1);
|
||||
|
||||
StringBuilder msg = new StringBuilder();
|
||||
msg.append("%table ");
|
||||
msg.append(StringUtils.join(columns, "\t"));
|
||||
msg.append("\n");
|
||||
boolean isLargerThanMaxResult = rows.size() > maxResult;
|
||||
if (isLargerThanMaxResult) {
|
||||
rows = rows.subList(0, maxResult);
|
||||
}
|
||||
for (Row row : rows) {
|
||||
for (int i = 0; i < row.size(); ++i) {
|
||||
msg.append(row.get(i));
|
||||
|
|
@ -73,7 +77,7 @@ public class Spark2Shims extends SparkShims {
|
|||
msg.append("\n");
|
||||
}
|
||||
|
||||
if (rows.size() > maxResult) {
|
||||
if (isLargerThanMaxResult) {
|
||||
msg.append("\n");
|
||||
msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,13 +65,18 @@ public abstract class BaseZeppelinContext {
|
|||
return this.maxResult;
|
||||
}
|
||||
|
||||
public String showData(Object obj) {
|
||||
return showData(obj, maxResult);
|
||||
}
|
||||
|
||||
/**
|
||||
* subclasses should implement this method to display specific data type
|
||||
*
|
||||
* @param obj
|
||||
* @param maxResult max number of rows to display
|
||||
* @return
|
||||
*/
|
||||
public abstract String showData(Object obj);
|
||||
public abstract String showData(Object obj, int maxResult);
|
||||
|
||||
/**
|
||||
* @deprecated use z.textbox instead
|
||||
|
|
|
|||
|
|
@ -201,6 +201,22 @@ public class InterpreterContext {
|
|||
return localProperties;
|
||||
}
|
||||
|
||||
public String getStringLocalProperty(String key, String defaultValue) {
|
||||
return localProperties.getOrDefault(key, defaultValue);
|
||||
}
|
||||
|
||||
public int getIntLocalProperty(String key, int defaultValue) {
|
||||
return Integer.parseInt(localProperties.getOrDefault(key, defaultValue + ""));
|
||||
}
|
||||
|
||||
public long getLongLocalProperty(String key, int defaultValue) {
|
||||
return Long.parseLong(localProperties.getOrDefault(key, defaultValue + ""));
|
||||
}
|
||||
|
||||
public double getDoubleLocalProperty(String key, double defaultValue) {
|
||||
return Double.parseDouble(localProperties.getOrDefault(key, defaultValue + ""));
|
||||
}
|
||||
|
||||
public AuthenticationInfo getAuthenticationInfo() {
|
||||
return authenticationInfo;
|
||||
}
|
||||
|
|
@ -228,7 +244,7 @@ public class InterpreterContext {
|
|||
public String getInterpreterClassName() {
|
||||
return interpreterClassName;
|
||||
}
|
||||
|
||||
|
||||
public void setInterpreterClassName(String className) {
|
||||
this.interpreterClassName = className;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -133,7 +133,7 @@ public class BaseZeppelinContextTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public String showData(Object obj) {
|
||||
public String showData(Object obj, int maxResult) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue