ZEPPELIN-1442. UDF can not be found due to 2 instances of SparkSession is created

This commit is contained in:
Jeff Zhang 2016-09-23 13:08:49 +08:00
parent c717daf655
commit 93060b6fe3
2 changed files with 27 additions and 4 deletions

View file

@ -219,14 +219,12 @@ jconf = intp.getSparkConf()
conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
if sparkVersion.isSpark2():
sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext())
spark = SparkSession(sc, intp.getSparkSession())
sqlc = spark._wrapped
else:
sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
sqlContext = sqlc
if sparkVersion.isSpark2():
spark = SparkSession(sc, intp.getSparkSession())
completion = PySparkCompletion(intp)
z = PyZeppelinContext(intp.getZeppelinContext())

View file

@ -220,6 +220,18 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
assertEquals(InterpreterResult.Type.TABLE, p.getResult().type());
// TODO (zjffdu), one more \n is appended, need to investigate why.
assertEquals("age\tid\n20\t1\n\n", p.getResult().message());
// test udf
p = note.addParagraph();
config = p.getConfig();
config.put("enabled", true);
p.setConfig(config);
p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
"sqlContext.sql(\"select f1(\\\"abc\\\") as len\").collect()");
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[Row(len=u'3')]\n", p.getResult().message());
}
if (sparkVersion >= 20) {
// run SparkSession test
@ -234,6 +246,19 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
// test udf
p = note.addParagraph();
config = p.getConfig();
config.put("enabled", true);
p.setConfig(config);
// use SQLContext to register UDF but use this UDF through SparkSession
p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
"spark.sql(\"select f1(\\\"abc\\\")\").collect()");
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[Row(f1(abc)=u'3')]\n", p.getResult().message());
}
}
ZeppelinServer.notebook.removeNote(note.getId(), null);