update test

This commit is contained in:
Jeff Zhang 2016-11-08 17:44:30 +08:00
parent e99be8d451
commit 6c9795f3d9
5 changed files with 99 additions and 52 deletions

View file

@ -94,6 +94,8 @@ before_script:
- travis_retry ./testing/downloadSpark.sh $SPARK_VER $HADOOP_VER
- if [[ -n $LIVY_VER ]]; then travis_retry ./testing/downloadLivy.sh $LIVY_VER; fi
- echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
- if [[ -n $LIVY_VER ]]; then export LIVY_HOME=`pwd`/livy-server-$LIVY_VER; fi
- if [[ -n $LIVY_VER ]]; then export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER; fi
- tail conf/zeppelin-env.sh
script:
@ -110,3 +112,10 @@ after_failure:
- cat zeppelin-distribution/target/zeppelin-*-SNAPSHOT/zeppelin-*-SNAPSHOT/logs/zeppelin*.out
- cat zeppelin-web/npm-debug.log
- cat spark-*/logs/*
- cat livy/target/tmp/*/output.log
- ls -l livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*
- ls -l livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*
- ls -l livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*
- ls -l livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/*
- cat livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/stdout
- cat livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/stderr

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.livy;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
@ -92,7 +93,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
}
}
if (st == null || st.trim().length() == 0) {
if (StringUtils.isEmpty(st)) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.livy;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
@ -59,25 +60,31 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
@Override
public InterpreterResult interpret(String line, InterpreterContext context) {
try {
if (line == null || line.trim().length() == 0) {
if (StringUtils.isEmpty(line)) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
// create sqlContext implicitly, as in livy 0.2 sqlContext is not available.
if (!sqlContextCreated) {
InterpreterResult result = sparkInterpreter.interpret("sqlContext", context);
if (result.code() == InterpreterResult.Code.ERROR) {
result = sparkInterpreter.interpret(
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
+ "import sqlContext.implicits._", context);
// create sqlContext implicitly if it is not available, as in livy 0.2 sqlContext
// is not available.
synchronized (this) {
if (!sqlContextCreated) {
InterpreterResult result = sparkInterpreter.interpret("sqlContext", context);
if (result.code() == InterpreterResult.Code.ERROR) {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to create sqlContext,"
+ result.message());
result = sparkInterpreter.interpret(
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
+ "import sqlContext.implicits._", context);
if (result.code() == InterpreterResult.Code.ERROR) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Fail to create sqlContext," + result.message());
}
}
sqlContextCreated = true;
}
sqlContextCreated = true;
}
// delegate the work to LivySparkInterpreter in the same session.
// TODO(zjffdu), we may create multiple session for the same user here. This can be fixed
// after we move session creation to open()
InterpreterResult res = sparkInterpreter.interpret("sqlContext.sql(\"" +
line.replaceAll("\"", "\\\\\"")
.replaceAll("\\n", " ")
@ -143,4 +150,8 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
}
}
@Override
public void close() {
this.sparkInterpreter.close();
}
}

View file

@ -73,7 +73,7 @@ public class LivyInterpreterIT {
}
@Test
public void testSparkInterpreter() {
public void testSparkInterpreterRDD() {
if (!checkPreCondition()) {
return;
}
@ -85,8 +85,8 @@ public class LivyInterpreterIT {
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", null, "title",
"text", authInfo, null, null, null, null, null, output);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
"title", "text", authInfo, null, null, null, null, null, output);
sparkInterpreter.open();
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@ -101,32 +101,7 @@ public class LivyInterpreterIT {
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertNull(result.message());
assertTrue(outputListener.getOutputAppended().contains("Double = 55.0"));
// test DataFrame api
outputListener.reset();
sparkInterpreter.interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
+ "import sqlContext.implicits._", context);
result = sparkInterpreter.interpret("val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n"
+ "df.collect()" , context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertNull(result.message());
assertTrue(outputListener.getOutputAppended()
.contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
// test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
outputListener.reset();
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
interpreterGroup.get("session_1").add(sqlInterpreter);
sqlInterpreter.setInterpreterGroup(interpreterGroup);
sqlInterpreter.open();
result = sqlInterpreter.interpret("select * from df", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
// TODO (zjffdu), \t at the end of each line is not necessary, it is a bug of LivySparkSQLInterpreter
assertEquals("_1\t_2\t\nhello\t20\t\n", result.message().get(0).getData());
// single line comment
outputListener.reset();
String singleLineComment = "// my comment";
@ -183,8 +158,57 @@ public class LivyInterpreterIT {
sparkInterpreter.close();
}
@Test
public void testSparkInterpreterDataFrame() {
if (!checkPreCondition()) {
return;
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
"title", "text", authInfo, null, null, null, null, null, output);
sparkInterpreter.open();
// test DataFrame api
outputListener.reset();
sparkInterpreter.interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
+ "import sqlContext.implicits._", context);
InterpreterResult result = sparkInterpreter.interpret("val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n"
+ "df.collect()" , context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertNull(result.message());
assertTrue(outputListener.getOutputAppended()
.contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
// test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
outputListener.reset();
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
interpreterGroup.get("session_1").add(sqlInterpreter);
sqlInterpreter.setInterpreterGroup(interpreterGroup);
sqlInterpreter.open();
result = sqlInterpreter.interpret("select * from df", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.type());
// TODO(zjffdu), \t at the end of each line is not necessary, it is a bug of LivySparkSQLInterpreter
assertEquals("_1\t_2\t\nhello\t20\t\n", result.message());
sparkInterpreter.close();
sqlInterpreter.close();
}
@Test
public void testSparkSQLInterpreter() {
if (!checkPreCondition()) {
return;
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
@ -198,12 +222,14 @@ public class LivyInterpreterIT {
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "title",
"text", authInfo, null, null, null, null, null, output);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sql",
"title", "text", authInfo, null, null, null, null, null, output);
InterpreterResult result = sqlInterpreter.interpret("show tables", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.type());
assertTrue(result.message().contains("tableName"));
sqlInterpreter.close();
}
@Test
@ -216,8 +242,8 @@ public class LivyInterpreterIT {
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", null, "title",
"text", authInfo, null, null, null, null, null, output);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.pyspark",
"title", "text", authInfo, null, null, null, null, null, output);
pysparkInterpreter.open();
InterpreterResult result = pysparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());

View file

@ -49,8 +49,8 @@ download_with_retry() {
}
LIVY_CACHE=".livy-dist"
LIVY_ARCHIVE="v${LIVY_VERSION}"
export LIVY_HOME="${ZEPPELIN_HOME}/${LIVY_ARCHIVE}"
LIVY_ARCHIVE="livy-assembly-${LIVY_VERSION}"
export LIVY_HOME="${ZEPPELIN_HOME}/livy-server-$LIVY_VERSION"
echo "LIVY_HOME is ${LIVY_HOME}"
if [[ ! -d "${LIVY_HOME}" ]]; then
@ -64,18 +64,18 @@ if [[ ! -d "${LIVY_HOME}" ]]; then
# download livy from archive if not cached
echo "${LIVY_VERSION} being downloaded from archives"
STARTTIME=`date +%s`
download_with_retry "https://github.com/cloudera/livy/archive/${LIVY_ARCHIVE}.tgz"
download_with_retry "https://oss.sonatype.org/content/repositories/releases/com/cloudera/livy/livy-assembly/${LIVY_VERSION}/${LIVY_ARCHIVE}.zip"
ENDTIME=`date +%s`
DOWNLOADTIME="$((ENDTIME-STARTTIME))"
fi
# extract archive in un-cached root, clean-up on failure
cp "${LIVY_ARCHIVE}.tar.gz" ..
cp "${LIVY_ARCHIVE}.zip" ..
cd ..
if ! tar zxf "${LIVY_ARCHIVE}.tar.gz" ; then
echo "Unable to extract ${LIVY_ARCHIVE}.tar.gz" >&2
if ! unzip "${LIVY_ARCHIVE}.zip" ; then
echo "Unable to extract ${LIVY_ARCHIVE}.zip" >&2
rm -rf "${LIVY_ARCHIVE}"
rm -f "${LIVY_ARCHIVE}.tar.gz"
rm -f "${LIVY_ARCHIVE}.zip"
fi
fi