mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
update test
This commit is contained in:
parent
e99be8d451
commit
6c9795f3d9
5 changed files with 99 additions and 52 deletions
|
|
@ -94,6 +94,8 @@ before_script:
|
|||
- travis_retry ./testing/downloadSpark.sh $SPARK_VER $HADOOP_VER
|
||||
- if [[ -n $LIVY_VER ]]; then travis_retry ./testing/downloadLivy.sh $LIVY_VER; fi
|
||||
- echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
|
||||
- if [[ -n $LIVY_VER ]]; then export LIVY_HOME=`pwd`/livy-server-$LIVY_VER; fi
|
||||
- if [[ -n $LIVY_VER ]]; then export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER; fi
|
||||
- tail conf/zeppelin-env.sh
|
||||
|
||||
script:
|
||||
|
|
@ -110,3 +112,10 @@ after_failure:
|
|||
- cat zeppelin-distribution/target/zeppelin-*-SNAPSHOT/zeppelin-*-SNAPSHOT/logs/zeppelin*.out
|
||||
- cat zeppelin-web/npm-debug.log
|
||||
- cat spark-*/logs/*
|
||||
- cat livy/target/tmp/*/output.log
|
||||
- ls -l livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*
|
||||
- ls -l livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*
|
||||
- ls -l livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*
|
||||
- ls -l livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/*
|
||||
- cat livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/stdout
|
||||
- cat livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/stderr
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
|
|
@ -92,7 +93,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (st == null || st.trim().length() == 0) {
|
||||
if (StringUtils.isEmpty(st)) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
|
|
@ -59,25 +60,31 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext context) {
|
||||
try {
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
if (StringUtils.isEmpty(line)) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
// create sqlContext implicitly, as in livy 0.2 sqlContext is not available.
|
||||
if (!sqlContextCreated) {
|
||||
InterpreterResult result = sparkInterpreter.interpret("sqlContext", context);
|
||||
if (result.code() == InterpreterResult.Code.ERROR) {
|
||||
result = sparkInterpreter.interpret(
|
||||
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
|
||||
+ "import sqlContext.implicits._", context);
|
||||
// create sqlContext implicitly if it is not available, as in livy 0.2 sqlContext
|
||||
// is not available.
|
||||
synchronized (this) {
|
||||
if (!sqlContextCreated) {
|
||||
InterpreterResult result = sparkInterpreter.interpret("sqlContext", context);
|
||||
if (result.code() == InterpreterResult.Code.ERROR) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to create sqlContext,"
|
||||
+ result.message());
|
||||
result = sparkInterpreter.interpret(
|
||||
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
|
||||
+ "import sqlContext.implicits._", context);
|
||||
if (result.code() == InterpreterResult.Code.ERROR) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
"Fail to create sqlContext," + result.message());
|
||||
}
|
||||
}
|
||||
sqlContextCreated = true;
|
||||
}
|
||||
sqlContextCreated = true;
|
||||
}
|
||||
|
||||
// delegate the work to LivySparkInterpreter in the same session.
|
||||
// TODO(zjffdu), we may create multiple session for the same user here. This can be fixed
|
||||
// after we move session creation to open()
|
||||
InterpreterResult res = sparkInterpreter.interpret("sqlContext.sql(\"" +
|
||||
line.replaceAll("\"", "\\\\\"")
|
||||
.replaceAll("\\n", " ")
|
||||
|
|
@ -143,4 +150,8 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
this.sparkInterpreter.close();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ public class LivyInterpreterIT {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSparkInterpreter() {
|
||||
public void testSparkInterpreterRDD() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -85,8 +85,8 @@ public class LivyInterpreterIT {
|
|||
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
|
||||
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
|
||||
InterpreterOutput output = new InterpreterOutput(outputListener);
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", null, "title",
|
||||
"text", authInfo, null, null, null, null, null, output);
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
|
||||
"title", "text", authInfo, null, null, null, null, null, output);
|
||||
sparkInterpreter.open();
|
||||
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
|
@ -101,32 +101,7 @@ public class LivyInterpreterIT {
|
|||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertNull(result.message());
|
||||
assertTrue(outputListener.getOutputAppended().contains("Double = 55.0"));
|
||||
|
||||
// test DataFrame api
|
||||
outputListener.reset();
|
||||
sparkInterpreter.interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
|
||||
+ "import sqlContext.implicits._", context);
|
||||
result = sparkInterpreter.interpret("val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n"
|
||||
+ "df.collect()" , context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertNull(result.message());
|
||||
assertTrue(outputListener.getOutputAppended()
|
||||
.contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
|
||||
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
|
||||
|
||||
// test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
|
||||
outputListener.reset();
|
||||
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
|
||||
interpreterGroup.get("session_1").add(sqlInterpreter);
|
||||
sqlInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
sqlInterpreter.open();
|
||||
result = sqlInterpreter.interpret("select * from df", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
|
||||
// TODO (zjffdu), \t at the end of each line is not necessary, it is a bug of LivySparkSQLInterpreter
|
||||
assertEquals("_1\t_2\t\nhello\t20\t\n", result.message().get(0).getData());
|
||||
|
||||
|
||||
// single line comment
|
||||
outputListener.reset();
|
||||
String singleLineComment = "// my comment";
|
||||
|
|
@ -183,8 +158,57 @@ public class LivyInterpreterIT {
|
|||
sparkInterpreter.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSparkInterpreterDataFrame() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
}
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
|
||||
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
|
||||
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
|
||||
sparkInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
interpreterGroup.get("session_1").add(sparkInterpreter);
|
||||
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
|
||||
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
|
||||
InterpreterOutput output = new InterpreterOutput(outputListener);
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
|
||||
"title", "text", authInfo, null, null, null, null, null, output);
|
||||
sparkInterpreter.open();
|
||||
|
||||
// test DataFrame api
|
||||
outputListener.reset();
|
||||
sparkInterpreter.interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
|
||||
+ "import sqlContext.implicits._", context);
|
||||
InterpreterResult result = sparkInterpreter.interpret("val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n"
|
||||
+ "df.collect()" , context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertNull(result.message());
|
||||
assertTrue(outputListener.getOutputAppended()
|
||||
.contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
|
||||
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
|
||||
|
||||
// test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
|
||||
outputListener.reset();
|
||||
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
|
||||
interpreterGroup.get("session_1").add(sqlInterpreter);
|
||||
sqlInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
sqlInterpreter.open();
|
||||
result = sqlInterpreter.interpret("select * from df", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.type());
|
||||
// TODO(zjffdu), \t at the end of each line is not necessary, it is a bug of LivySparkSQLInterpreter
|
||||
assertEquals("_1\t_2\t\nhello\t20\t\n", result.message());
|
||||
|
||||
sparkInterpreter.close();
|
||||
sqlInterpreter.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSparkSQLInterpreter() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
}
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
|
||||
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
|
||||
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
|
||||
|
|
@ -198,12 +222,14 @@ public class LivyInterpreterIT {
|
|||
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
|
||||
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
|
||||
InterpreterOutput output = new InterpreterOutput(outputListener);
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "title",
|
||||
"text", authInfo, null, null, null, null, null, output);
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sql",
|
||||
"title", "text", authInfo, null, null, null, null, null, output);
|
||||
InterpreterResult result = sqlInterpreter.interpret("show tables", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.type());
|
||||
assertTrue(result.message().contains("tableName"));
|
||||
|
||||
sqlInterpreter.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -216,8 +242,8 @@ public class LivyInterpreterIT {
|
|||
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
|
||||
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
|
||||
InterpreterOutput output = new InterpreterOutput(outputListener);
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", null, "title",
|
||||
"text", authInfo, null, null, null, null, null, output);
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.pyspark",
|
||||
"title", "text", authInfo, null, null, null, null, null, output);
|
||||
pysparkInterpreter.open();
|
||||
InterpreterResult result = pysparkInterpreter.interpret("sc.version", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
|
|
|||
|
|
@ -49,8 +49,8 @@ download_with_retry() {
|
|||
}
|
||||
|
||||
LIVY_CACHE=".livy-dist"
|
||||
LIVY_ARCHIVE="v${LIVY_VERSION}"
|
||||
export LIVY_HOME="${ZEPPELIN_HOME}/${LIVY_ARCHIVE}"
|
||||
LIVY_ARCHIVE="livy-assembly-${LIVY_VERSION}"
|
||||
export LIVY_HOME="${ZEPPELIN_HOME}/livy-server-$LIVY_VERSION"
|
||||
echo "LIVY_HOME is ${LIVY_HOME}"
|
||||
|
||||
if [[ ! -d "${LIVY_HOME}" ]]; then
|
||||
|
|
@ -64,18 +64,18 @@ if [[ ! -d "${LIVY_HOME}" ]]; then
|
|||
# download livy from archive if not cached
|
||||
echo "${LIVY_VERSION} being downloaded from archives"
|
||||
STARTTIME=`date +%s`
|
||||
download_with_retry "https://github.com/cloudera/livy/archive/${LIVY_ARCHIVE}.tgz"
|
||||
download_with_retry "https://oss.sonatype.org/content/repositories/releases/com/cloudera/livy/livy-assembly/${LIVY_VERSION}/${LIVY_ARCHIVE}.zip"
|
||||
ENDTIME=`date +%s`
|
||||
DOWNLOADTIME="$((ENDTIME-STARTTIME))"
|
||||
fi
|
||||
|
||||
# extract archive in un-cached root, clean-up on failure
|
||||
cp "${LIVY_ARCHIVE}.tar.gz" ..
|
||||
cp "${LIVY_ARCHIVE}.zip" ..
|
||||
cd ..
|
||||
if ! tar zxf "${LIVY_ARCHIVE}.tar.gz" ; then
|
||||
echo "Unable to extract ${LIVY_ARCHIVE}.tar.gz" >&2
|
||||
if ! unzip "${LIVY_ARCHIVE}.zip" ; then
|
||||
echo "Unable to extract ${LIVY_ARCHIVE}.zip" >&2
|
||||
rm -rf "${LIVY_ARCHIVE}"
|
||||
rm -f "${LIVY_ARCHIVE}.tar.gz"
|
||||
rm -f "${LIVY_ARCHIVE}.zip"
|
||||
fi
|
||||
fi
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue