mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-1442. UDF can not be found due to 2 instances of SparkSession is created
This commit is contained in:
parent
c717daf655
commit
93060b6fe3
2 changed files with 27 additions and 4 deletions
|
|
@ -219,14 +219,12 @@ jconf = intp.getSparkConf()
|
|||
conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
|
||||
sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
|
||||
if sparkVersion.isSpark2():
|
||||
sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext())
|
||||
spark = SparkSession(sc, intp.getSparkSession())
|
||||
sqlc = spark._wrapped
|
||||
else:
|
||||
sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
|
||||
sqlContext = sqlc
|
||||
|
||||
if sparkVersion.isSpark2():
|
||||
spark = SparkSession(sc, intp.getSparkSession())
|
||||
|
||||
completion = PySparkCompletion(intp)
|
||||
z = PyZeppelinContext(intp.getZeppelinContext())
|
||||
|
||||
|
|
|
|||
|
|
@ -220,6 +220,18 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
assertEquals(InterpreterResult.Type.TABLE, p.getResult().type());
|
||||
// TODO (zjffdu), one more \n is appended, need to investigate why.
|
||||
assertEquals("age\tid\n20\t1\n\n", p.getResult().message());
|
||||
|
||||
// test udf
|
||||
p = note.addParagraph();
|
||||
config = p.getConfig();
|
||||
config.put("enabled", true);
|
||||
p.setConfig(config);
|
||||
p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
|
||||
"sqlContext.sql(\"select f1(\\\"abc\\\") as len\").collect()");
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
assertEquals(Status.FINISHED, p.getStatus());
|
||||
assertEquals("[Row(len=u'3')]\n", p.getResult().message());
|
||||
}
|
||||
if (sparkVersion >= 20) {
|
||||
// run SparkSession test
|
||||
|
|
@ -234,6 +246,19 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
waitForFinish(p);
|
||||
assertEquals(Status.FINISHED, p.getStatus());
|
||||
assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
|
||||
|
||||
// test udf
|
||||
p = note.addParagraph();
|
||||
config = p.getConfig();
|
||||
config.put("enabled", true);
|
||||
p.setConfig(config);
|
||||
// use SQLContext to register UDF but use this UDF through SparkSession
|
||||
p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
|
||||
"spark.sql(\"select f1(\\\"abc\\\")\").collect()");
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
assertEquals(Status.FINISHED, p.getStatus());
|
||||
assertEquals("[Row(f1(abc)=u'3')]\n", p.getResult().message());
|
||||
}
|
||||
}
|
||||
ZeppelinServer.notebook.removeNote(note.getId(), null);
|
||||
|
|
|
|||
Loading…
Reference in a new issue