ZEPPELIN-3975. Add limit local property for SparkInterpreter and JdbcInterpreter

This commit is contained in:
Jeff Zhang 2019-01-29 11:09:04 +08:00
parent 727600b5ca
commit 6350433cc1
14 changed files with 63 additions and 15 deletions

View file

@ -54,7 +54,7 @@ class FlinkZeppelinContext(val btenv: BatchTableEnvironment,
override def getInterpreterClassMap: util.Map[String, String] =
JavaConversions.mapAsJavaMap(interpreterClassMap)
override def showData(obj: Any): String = {
override def showData(obj: Any, maxResult: Int): String = {
def showTable(table: Table): String = {
val columnNames: Array[String] = table.getSchema.getColumnNames
val dsRow: DataSet[Row] = btenv.toDataSet[Row](table)

View file

@ -43,7 +43,7 @@ public class GroovyZeppelinContext extends BaseZeppelinContext {
}
@Override
public String showData(Object obj) {
public String showData(Object obj, int maxResult) {
return null;
}
}

View file

@ -43,7 +43,7 @@ public class DevZeppelinContext extends BaseZeppelinContext {
}
@Override
public String showData(Object obj) {
public String showData(Object obj, int maxResult) {
return null;
}
}

View file

@ -721,8 +721,8 @@ public class JDBCInterpreter extends KerberosInterpreter {
statement = connection.createStatement();
// fetch n+1 rows in order to indicate there's more rows available (for large selects)
statement.setFetchSize(getMaxResult());
statement.setMaxRows(maxRows);
statement.setFetchSize(interpreterContext.getIntLocalProperty("limit", getMaxResult()));
statement.setMaxRows(interpreterContext.getIntLocalProperty("limit", maxRows));
if (statement == null) {
return new InterpreterResult(Code.ERROR, "Prefix not found.");

View file

@ -188,6 +188,13 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message().get(0).getData());
interpreterContext.getLocalProperties().put("limit", "1");
interpreterResult = t.interpret(sqlQuery, interpreterContext);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
assertEquals("ID\tNAME\na\ta_name\n", interpreterResult.message().get(0).getData());
}
@Test

View file

@ -43,7 +43,7 @@ public class PythonZeppelinContext extends BaseZeppelinContext {
}
@Override
public String showData(Object obj) {
public String showData(Object obj, int maxResult) {
return null;
}
}

View file

@ -86,8 +86,10 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
try {
Method method = sqlc.getClass().getMethod("sql", String.class);
int maxResult = Integer.parseInt(context.getLocalProperties().getOrDefault("limit",
"" + sparkInterpreter.getZeppelinContext().getMaxResult()));
String msg = sparkInterpreter.getZeppelinContext().showData(
method.invoke(sqlc, st));
method.invoke(sqlc, st), maxResult);
sc.clearJobGroup();
return new InterpreterResult(Code.SUCCESS, msg);
} catch (Exception e) {

View file

@ -67,7 +67,7 @@ class SparkZeppelinContext(val sc: SparkContext,
override def getInterpreterClassMap: util.Map[String, String] =
JavaConversions.mapAsJavaMap(interpreterClassMap)
override def showData(obj: Any): String = sparkShims.showDataFrame(obj, maxResult)
override def showData(obj: Any, maxResult: Int): String = sparkShims.showDataFrame(obj, maxResult)
@ZeppelinApi
def select(name: String, options: Seq[(Any, String)]): Any = select(name, null, options)

View file

@ -114,6 +114,7 @@ public class NewSparkSqlInterpreterTest {
InterpreterResult ret = sqlInterpreter.interpret("select * from gr", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
}
public void test_null_value_in_row() throws InterpreterException {
@ -155,7 +156,16 @@ public class NewSparkSqlInterpreterTest {
InterpreterResult ret = sqlInterpreter.interpret("select * from gr", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
// the number of rows is 10+1, 1 is the head of table
assertEquals(11, ret.message().get(0).getData().split("\n").length);
assertTrue(ret.message().get(1).getData().contains("alert-warning"));
// test limit local property
context.getLocalProperties().put("limit", "5");
ret = sqlInterpreter.interpret("select * from gr", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
// the number of rows is 5+1, 1 is the head of table
assertEquals(6, ret.message().get(0).getData().split("\n").length);
}
@Test

View file

@ -56,12 +56,16 @@ public class Spark1Shims extends SparkShims {
if (obj instanceof DataFrame) {
DataFrame df = (DataFrame) obj;
String[] columns = df.columns();
// fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult
List<Row> rows = df.takeAsList(maxResult + 1);
StringBuilder msg = new StringBuilder();
msg.append("%table ");
msg.append(StringUtils.join(columns, "\t"));
msg.append("\n");
boolean isLargerThanMaxResult = rows.size() > maxResult;
if (isLargerThanMaxResult) {
rows = rows.subList(0, maxResult);
}
for (Row row : rows) {
for (int i = 0; i < row.size(); ++i) {
msg.append(row.get(i));
@ -72,7 +76,7 @@ public class Spark1Shims extends SparkShims {
msg.append("\n");
}
if (rows.size() > maxResult) {
if (isLargerThanMaxResult) {
msg.append("\n");
msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
}

View file

@ -57,12 +57,16 @@ public class Spark2Shims extends SparkShims {
if (obj instanceof Dataset) {
Dataset<Row> df = ((Dataset) obj).toDF();
String[] columns = df.columns();
// fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult
List<Row> rows = df.takeAsList(maxResult + 1);
StringBuilder msg = new StringBuilder();
msg.append("%table ");
msg.append(StringUtils.join(columns, "\t"));
msg.append("\n");
boolean isLargerThanMaxResult = rows.size() > maxResult;
if (isLargerThanMaxResult) {
rows = rows.subList(0, maxResult);
}
for (Row row : rows) {
for (int i = 0; i < row.size(); ++i) {
msg.append(row.get(i));
@ -73,7 +77,7 @@ public class Spark2Shims extends SparkShims {
msg.append("\n");
}
if (rows.size() > maxResult) {
if (isLargerThanMaxResult) {
msg.append("\n");
msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
}

View file

@ -65,13 +65,18 @@ public abstract class BaseZeppelinContext {
return this.maxResult;
}
public String showData(Object obj) {
return showData(obj, maxResult);
}
/**
* subclasses should implement this method to display specific data type
*
* @param obj
* @param maxResult max number of rows to display
* @return
*/
public abstract String showData(Object obj);
public abstract String showData(Object obj, int maxResult);
/**
* @deprecated use z.textbox instead

View file

@ -201,6 +201,22 @@ public class InterpreterContext {
return localProperties;
}
public String getStringLocalProperty(String key, String defaultValue) {
return localProperties.getOrDefault(key, defaultValue);
}
public int getIntLocalProperty(String key, int defaultValue) {
return Integer.parseInt(localProperties.getOrDefault(key, defaultValue + ""));
}
public long getLongLocalProperty(String key, int defaultValue) {
return Long.parseLong(localProperties.getOrDefault(key, defaultValue + ""));
}
public double getDoubleLocalProperty(String key, double defaultValue) {
return Double.parseDouble(localProperties.getOrDefault(key, defaultValue + ""));
}
public AuthenticationInfo getAuthenticationInfo() {
return authenticationInfo;
}
@ -228,7 +244,7 @@ public class InterpreterContext {
public String getInterpreterClassName() {
return interpreterClassName;
}
public void setInterpreterClassName(String className) {
this.interpreterClassName = className;
}

View file

@ -133,7 +133,7 @@ public class BaseZeppelinContextTest {
}
@Override
public String showData(Object obj) {
public String showData(Object obj, int maxResult) {
return null;
}
}