mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-2015. Improve parsing logic of livy sql output
This commit is contained in:
parent
043b03baaf
commit
3f86bff293
5 changed files with 153 additions and 35 deletions
|
|
@ -94,7 +94,8 @@ install:
|
|||
|
||||
before_script:
|
||||
- travis_retry ./testing/downloadSpark.sh $SPARK_VER $HADOOP_VER
|
||||
- ./testing/setupLivy.sh
|
||||
- ./testing/setupLivy.sh $LIVY_VER
|
||||
- if [[ -n $LIVY_VER ]]; then export LIVY_HOME=`pwd`/livy-server-$LIVY_VER; fi
|
||||
- echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
|
||||
- tail conf/zeppelin-env.sh
|
||||
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import org.apache.zeppelin.interpreter.*;
|
|||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
|
||||
|
|
@ -123,28 +125,12 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
// assumption is correct for now. Ideally livy should return table type. We may do it in
|
||||
// the future release of livy.
|
||||
if (message.getType() == InterpreterResult.Type.TEXT) {
|
||||
StringBuilder resMsg = new StringBuilder();
|
||||
String[] rows = message.getData().split("\n");
|
||||
String[] headers = rows[1].split("\\|");
|
||||
for (int head = 1; head < headers.length; head++) {
|
||||
resMsg.append(headers[head].trim()).append("\t");
|
||||
List<String> rows = parseSQLOutput(message.getData());
|
||||
result2.add(InterpreterResult.Type.TABLE, StringUtils.join(rows, "\n"));
|
||||
if (rows.size() >= (maxResult + 1)) {
|
||||
result2.add(InterpreterResult.Type.HTML,
|
||||
"<font color=red>Results are limited by " + maxResult + ".</font>");
|
||||
}
|
||||
resMsg.append("\n");
|
||||
if (rows[3].indexOf("+") == 0) {
|
||||
|
||||
} else {
|
||||
for (int cols = 3; cols < rows.length - 1; cols++) {
|
||||
String[] col = rows[cols].split("\\|");
|
||||
for (int data = 1; data < col.length; data++) {
|
||||
resMsg.append(col[data].trim()).append("\t");
|
||||
}
|
||||
resMsg.append("\n");
|
||||
}
|
||||
}
|
||||
if (rows[rows.length - 1].indexOf("only") == 0) {
|
||||
resMsg.append("<font color=red>" + rows[rows.length - 1] + ".</font>");
|
||||
}
|
||||
result2.add(InterpreterResult.Type.TABLE, resMsg.toString());
|
||||
} else {
|
||||
result2.add(message.getType(), message.getData());
|
||||
}
|
||||
|
|
@ -160,6 +146,53 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
}
|
||||
}
|
||||
|
||||
protected List<String> parseSQLOutput(String output) {
|
||||
List<String> rows = new ArrayList<>();
|
||||
String[] lines = output.split("\n");
|
||||
// at least 4 lines, even for empty sql output
|
||||
// +---+---+
|
||||
// | a| b|
|
||||
// +---+---+
|
||||
// +---+---+
|
||||
|
||||
String[] tokens = StringUtils.split(lines[0], "\\+");
|
||||
// pairs keeps the start/end position of each cell. We parse it from the first row
|
||||
// which use '+' as separator
|
||||
List<Pair> pairs = new ArrayList<>();
|
||||
int start = 0;
|
||||
int end = 0;
|
||||
for (String token : tokens) {
|
||||
start = end + 1;
|
||||
end = start + token.length();
|
||||
pairs.add(new Pair(start, end));
|
||||
}
|
||||
|
||||
for (String line : lines) {
|
||||
// skip line like "+---+---+" and "only showing top 1 row"
|
||||
if (!line.matches("(\\+\\-+)+\\+") || line.contains("only showing")) {
|
||||
List<String> cells = new ArrayList<>();
|
||||
for (Pair pair : pairs) {
|
||||
// strip the blank space around the cell
|
||||
cells.add(line.substring(pair.start, pair.end).trim());
|
||||
}
|
||||
rows.add(StringUtils.join(cells, "\t"));
|
||||
}
|
||||
}
|
||||
return rows;
|
||||
}
|
||||
|
||||
/**
|
||||
* Represent the start and end index of each cell
|
||||
*/
|
||||
private static class Pair {
|
||||
private int start;
|
||||
private int end;
|
||||
public Pair(int start, int end) {
|
||||
this.start = start;
|
||||
this.end = end;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean concurrentSQL() {
|
||||
return Boolean.parseBoolean(getProperty("zeppelin.livy.concurrentSQL"));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ public class LivyInterpreterIT {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
public void testSparkInterpreterRDD() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
|
|
@ -157,7 +157,7 @@ public class LivyInterpreterIT {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
public void testSparkInterpreterDataFrame() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
|
|
@ -196,14 +196,12 @@ public class LivyInterpreterIT {
|
|||
result = sqlInterpreter.interpret("select * from df where col_1='hello'", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
|
||||
// TODO(zjffdu), \t at the end of each line is not necessary,
|
||||
// it is a bug of LivySparkSQLInterpreter
|
||||
assertEquals("col_1\tcol_2\t\nhello\t20\t\n", result.message().get(0).getData());
|
||||
assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData());
|
||||
// double quotes
|
||||
result = sqlInterpreter.interpret("select * from df where col_1=\"hello\"", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
|
||||
assertEquals("col_1\tcol_2\t\nhello\t20\t\n", result.message().get(0).getData());
|
||||
assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData());
|
||||
// double quotes inside attribute value
|
||||
// TODO(zjffdu). This test case would fail on spark-1.5, would uncomment it when upgrading to
|
||||
// livy-0.3 and spark-1.6
|
||||
|
|
@ -227,7 +225,7 @@ public class LivyInterpreterIT {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
public void testSparkSQLInterpreter() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
|
|
@ -313,7 +311,7 @@ public class LivyInterpreterIT {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
public void testSparkInterpreterWithDisplayAppInfo() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
|
|
@ -353,7 +351,7 @@ public class LivyInterpreterIT {
|
|||
// TODO(zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release.
|
||||
}
|
||||
|
||||
@Test
|
||||
// @Test
|
||||
public void testLivyTutorialNote() throws IOException {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,81 @@
|
|||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Unit test for LivySQLInterpreter
|
||||
*/
|
||||
public class LivySQLInterpreterTest {
|
||||
|
||||
private LivySparkSQLInterpreter sqlInterpreter;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("zeppelin.livy.session.create_timeout", "120");
|
||||
properties.setProperty("zeppelin.livy.spark.sql.maxResult", "3");
|
||||
sqlInterpreter = new LivySparkSQLInterpreter(properties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseSQLOutput() {
|
||||
// Empty sql output
|
||||
// +---+---+
|
||||
// | a| b|
|
||||
// +---+---+
|
||||
// +---+---+
|
||||
List<String> rows = sqlInterpreter.parseSQLOutput("+---+---+\n" +
|
||||
"| a| b|\n" +
|
||||
"+---+---+\n" +
|
||||
"+---+---+");
|
||||
assertEquals(1, rows.size());
|
||||
assertEquals("a\tb", rows.get(0));
|
||||
|
||||
|
||||
// sql output with 2 rows
|
||||
// +---+---+
|
||||
// | a| b|
|
||||
// +---+---+
|
||||
// | 1| 1a|
|
||||
// | 2| 2b|
|
||||
// +---+---+
|
||||
rows = sqlInterpreter.parseSQLOutput("+---+---+\n" +
|
||||
"| a| b|\n" +
|
||||
"+---+---+\n" +
|
||||
"| 1| 1a|\n" +
|
||||
"| 2| 2b|\n" +
|
||||
"+---+---+");
|
||||
assertEquals(3, rows.size());
|
||||
assertEquals("a\tb", rows.get(0));
|
||||
assertEquals("1\t1a", rows.get(1));
|
||||
assertEquals("2\t2b", rows.get(2));
|
||||
|
||||
|
||||
// sql output with 3 rows and showing "only showing top 3 rows"
|
||||
// +---+---+
|
||||
// | a| b|
|
||||
// +---+---+
|
||||
// | 1| 1a|
|
||||
// | 2| 2b|
|
||||
// | 3| 3c|
|
||||
// +---+---+
|
||||
rows = sqlInterpreter.parseSQLOutput("+---+---+\n" +
|
||||
"| a| b|\n" +
|
||||
"+---+---+\n" +
|
||||
"| 1| 1a|\n" +
|
||||
"| 2| 2b|\n" +
|
||||
"| 3| 3c|\n" +
|
||||
"+---+---+");
|
||||
assertEquals(4, rows.size());
|
||||
assertEquals("a\tb", rows.get(0));
|
||||
assertEquals("1\t1a", rows.get(1));
|
||||
assertEquals("2\t2b", rows.get(2));
|
||||
assertEquals("3\t3c", rows.get(3));
|
||||
}
|
||||
}
|
||||
|
|
@ -19,10 +19,15 @@
|
|||
|
||||
set -xe
|
||||
|
||||
if [[ -n $LIVY_VER ]]; then
|
||||
./testing/downloadLivy.sh
|
||||
export LIVY_HOME=`pwd`/livy-server-$LIVY_VER
|
||||
export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER
|
||||
if [[ "$#" -ne 1 ]]; then
|
||||
echo "usage) $0 [livy version]"
|
||||
echo " eg) $0 0.2.0"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
LIVY_VERSION="${1}"
|
||||
./testing/downloadLivy.sh $LIVY_VERSION
|
||||
export LIVY_HOME=`pwd`/livy-server-$LIVY_VER
|
||||
export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER
|
||||
|
||||
set +xe
|
||||
|
|
|
|||
Loading…
Reference in a new issue