added a null check to avoid testcase failures, another nullcheck for backward compatibility and added two new testcases

This commit is contained in:
Benoy Antony 2017-03-30 07:11:53 -07:00
parent ab1ead2a3d
commit 9eae68bbb8
3 changed files with 132 additions and 4 deletions

View file

@ -272,7 +272,9 @@ public abstract class BaseLivyInterprereter extends Interpreter {
throw new LivyException(e);
}
stmtInfo = getStatementInfo(stmtInfo.id);
paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
if (paragraphId != null) {
paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
}
}
if (appendSessionExpired) {
return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),

View file

@ -41,7 +41,10 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
public LivySparkSQLInterpreter(Properties property) {
super(property);
this.maxResult = Integer.parseInt(property.getProperty("zeppelin.livy.spark.sql.maxResult"));
this.truncate = Boolean.parseBoolean(property.getProperty("zeppelin.livy.spark.sql.truncate"));
if (property.getProperty("zeppelin.livy.spark.sql.truncate") != null) {
this.truncate =
Boolean.parseBoolean(property.getProperty("zeppelin.livy.spark.sql.truncate"));
}
}
@Override

View file

@ -20,7 +20,6 @@ package org.apache.zeppelin.livy;
import com.cloudera.livy.test.framework.Cluster;
import com.cloudera.livy.test.framework.Cluster$;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.user.AuthenticationInfo;
@ -33,7 +32,6 @@ import java.util.ArrayList;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class LivyInterpreterIT {
@ -308,6 +306,131 @@ public class LivyInterpreterIT {
}
}
@Test
public void testStringWithTruncation() {
if (!checkPreCondition()) {
return;
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
"title", "text", authInfo, null, null, null, null, null, output);
sparkInterpreter.open();
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
interpreterGroup.get("session_1").add(sqlInterpreter);
sqlInterpreter.setInterpreterGroup(interpreterGroup);
sqlInterpreter.open();
try {
// detect spark version
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
boolean isSpark2 = isSpark2(sparkInterpreter, context);
// test DataFrame api
if (!isSpark2) {
result = sparkInterpreter.interpret(
"val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
} else {
result = sparkInterpreter.interpret(
"val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
}
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
// test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
assertEquals("col_1\tcol_2\n12characters12cha...\t20", result.message().get(0).getData());
} finally {
sparkInterpreter.close();
sqlInterpreter.close();
}
}
@Test
public void testStringWithoutTruncation() {
if (!checkPreCondition()) {
return;
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
Properties newProps = new Properties();
for (Object name: properties.keySet()) {
newProps.put(name, properties.get(name));
}
newProps.put("zeppelin.livy.spark.sql.truncate", "false");
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(newProps);
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
"title", "text", authInfo, null, null, null, null, null, output);
sparkInterpreter.open();
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(newProps);
interpreterGroup.get("session_1").add(sqlInterpreter);
sqlInterpreter.setInterpreterGroup(interpreterGroup);
sqlInterpreter.open();
try {
// detect spark version
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
boolean isSpark2 = isSpark2(sparkInterpreter, context);
// test DataFrame api
if (!isSpark2) {
result = sparkInterpreter.interpret(
"val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
} else {
result = sparkInterpreter.interpret(
"val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
}
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
// test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
assertEquals("col_1\tcol_2\n12characters12characters\t20", result.message().get(0).getData());
} finally {
sparkInterpreter.close();
sqlInterpreter.close();
}
}
@Test
public void testPySparkInterpreter() {
if (!checkPreCondition()) {