mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
added a null check to avoid testcase failures, another nullcheck for backward compatibility and added two new testcases
This commit is contained in:
parent
ab1ead2a3d
commit
9eae68bbb8
3 changed files with 132 additions and 4 deletions
|
|
@ -272,7 +272,9 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
throw new LivyException(e);
|
||||
}
|
||||
stmtInfo = getStatementInfo(stmtInfo.id);
|
||||
paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
|
||||
if (paragraphId != null) {
|
||||
paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
|
||||
}
|
||||
}
|
||||
if (appendSessionExpired) {
|
||||
return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),
|
||||
|
|
|
|||
|
|
@ -41,7 +41,10 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
public LivySparkSQLInterpreter(Properties property) {
|
||||
super(property);
|
||||
this.maxResult = Integer.parseInt(property.getProperty("zeppelin.livy.spark.sql.maxResult"));
|
||||
this.truncate = Boolean.parseBoolean(property.getProperty("zeppelin.livy.spark.sql.truncate"));
|
||||
if (property.getProperty("zeppelin.livy.spark.sql.truncate") != null) {
|
||||
this.truncate =
|
||||
Boolean.parseBoolean(property.getProperty("zeppelin.livy.spark.sql.truncate"));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ package org.apache.zeppelin.livy;
|
|||
|
||||
import com.cloudera.livy.test.framework.Cluster;
|
||||
import com.cloudera.livy.test.framework.Cluster$;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
|
|
@ -33,7 +32,6 @@ import java.util.ArrayList;
|
|||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class LivyInterpreterIT {
|
||||
|
|
@ -308,6 +306,131 @@ public class LivyInterpreterIT {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringWithTruncation() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
}
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
|
||||
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
|
||||
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
|
||||
sparkInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
interpreterGroup.get("session_1").add(sparkInterpreter);
|
||||
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
|
||||
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
|
||||
InterpreterOutput output = new InterpreterOutput(outputListener);
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
|
||||
"title", "text", authInfo, null, null, null, null, null, output);
|
||||
sparkInterpreter.open();
|
||||
|
||||
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
|
||||
interpreterGroup.get("session_1").add(sqlInterpreter);
|
||||
sqlInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
sqlInterpreter.open();
|
||||
|
||||
try {
|
||||
// detect spark version
|
||||
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(1, result.message().size());
|
||||
|
||||
boolean isSpark2 = isSpark2(sparkInterpreter, context);
|
||||
|
||||
// test DataFrame api
|
||||
if (!isSpark2) {
|
||||
result = sparkInterpreter.interpret(
|
||||
"val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
|
||||
+ "df.collect()", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(1, result.message().size());
|
||||
assertTrue(result.message().get(0).getData()
|
||||
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
|
||||
} else {
|
||||
result = sparkInterpreter.interpret(
|
||||
"val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
|
||||
+ "df.collect()", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(1, result.message().size());
|
||||
assertTrue(result.message().get(0).getData()
|
||||
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
|
||||
}
|
||||
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
|
||||
// test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
|
||||
result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
|
||||
assertEquals("col_1\tcol_2\n12characters12cha...\t20", result.message().get(0).getData());
|
||||
} finally {
|
||||
sparkInterpreter.close();
|
||||
sqlInterpreter.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringWithoutTruncation() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
}
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
|
||||
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
|
||||
Properties newProps = new Properties();
|
||||
for (Object name: properties.keySet()) {
|
||||
newProps.put(name, properties.get(name));
|
||||
}
|
||||
newProps.put("zeppelin.livy.spark.sql.truncate", "false");
|
||||
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(newProps);
|
||||
sparkInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
interpreterGroup.get("session_1").add(sparkInterpreter);
|
||||
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
|
||||
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
|
||||
InterpreterOutput output = new InterpreterOutput(outputListener);
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
|
||||
"title", "text", authInfo, null, null, null, null, null, output);
|
||||
sparkInterpreter.open();
|
||||
|
||||
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(newProps);
|
||||
interpreterGroup.get("session_1").add(sqlInterpreter);
|
||||
sqlInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
sqlInterpreter.open();
|
||||
|
||||
try {
|
||||
// detect spark version
|
||||
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(1, result.message().size());
|
||||
|
||||
boolean isSpark2 = isSpark2(sparkInterpreter, context);
|
||||
|
||||
// test DataFrame api
|
||||
if (!isSpark2) {
|
||||
result = sparkInterpreter.interpret(
|
||||
"val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
|
||||
+ "df.collect()", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(1, result.message().size());
|
||||
assertTrue(result.message().get(0).getData()
|
||||
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
|
||||
} else {
|
||||
result = sparkInterpreter.interpret(
|
||||
"val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
|
||||
+ "df.collect()", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(1, result.message().size());
|
||||
assertTrue(result.message().get(0).getData()
|
||||
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
|
||||
}
|
||||
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
|
||||
// test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
|
||||
result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
|
||||
assertEquals("col_1\tcol_2\n12characters12characters\t20", result.message().get(0).getData());
|
||||
} finally {
|
||||
sparkInterpreter.close();
|
||||
sqlInterpreter.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPySparkInterpreter() {
|
||||
if (!checkPreCondition()) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue