This commit is contained in:
Jeff Zhang 2016-09-14 15:42:39 +08:00
parent 4922de1d46
commit 40b080a7b0
4 changed files with 44 additions and 26 deletions

View file

@ -118,7 +118,7 @@ public class SparkInterpreter extends Interpreter {
private Map<String, Object> binder;
private SparkVersion sparkVersion;
private File outputDir; // class outputdir for scala 2.11
private static File outputDir; // class outputdir for scala 2.11
private Object classServer; // classserver for scala 2.11
@ -572,8 +572,11 @@ public class SparkInterpreter extends Interpreter {
sparkReplClassDir = System.getProperty("java.io.tmpdir");
}
outputDir = createTempDir(sparkReplClassDir);
synchronized (sharedInterpreterLock) {
if (outputDir == null) {
outputDir = createTempDir(sparkReplClassDir);
}
}
argList.add("-Yrepl-class-based");
argList.add("-Yrepl-outdir");
argList.add(outputDir.getAbsolutePath());
@ -1276,7 +1279,12 @@ public class SparkInterpreter extends Interpreter {
logger.info("Close interpreter");
if (numReferenceOfSparkContext.decrementAndGet() == 0) {
sc.stop();
if (sparkSession != null) {
Utils.invokeMethod(sparkSession, "stop");
} else if (sc != null){
sc.stop();
}
sparkSession = null;
sc = null;
if (classServer != null) {
Utils.invokeMethod(classServer, "stop");

View file

@ -154,7 +154,7 @@ public abstract class AbstractTestRestApi {
// set spark master and other properties
sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071");
sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");
sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", "false");
// set spark home for pyspark
sparkIntpSetting.getProperties().setProperty("spark.home", getSparkHome());
pySpark = true;
@ -180,6 +180,7 @@ public abstract class AbstractTestRestApi {
sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");
// set spark home for pyspark
sparkIntpSetting.getProperties().setProperty("spark.home", sparkHome);
sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", "false");
pySpark = true;
sparkR = true;
}
@ -199,7 +200,11 @@ public abstract class AbstractTestRestApi {
}
private static String getSparkHome() {
String sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName())));
String sparkHome = System.getenv("SPARK_HOME");
if (sparkHome != null) {
return sparkHome;
}
sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName())));
System.out.println("SPARK HOME detected " + sparkHome);
return sparkHome;
}

View file

@ -135,31 +135,37 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
config.put("enabled", true);
p.setConfig(config);
p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, b: a + b))");
// p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("55\n", p.getResult().message());
LOG.info("*************************result:" + p.getResult().message());
// run sqlContext test
p.setText("%pyspark from pyspark.sql import Row\n" +
"df=sqlContext.createDataFrame([Row(name='Alice', age=20)])\n" +
"df.count()");
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("1\n", p.getResult().message());
LOG.info("*************************result:" + p.getResult().message());
if (sparkVersion >= 20) {
// run SparkSession test
if (sparkVersion >= 13) {
// run sqlContext test
p = note.addParagraph();
config = p.getConfig();
config.put("enabled", true);
p.setConfig(config);
p.setText("%pyspark from pyspark.sql import Row\n" +
"df=sqlContext.createDataFrame([Row(name='Alice', age=20)])\n" +
"df.count()");
"df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
"df.collect()");
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("1\n", p.getResult().message());
LOG.info("*************************result:" + p.getResult().message());
assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
}
if (sparkVersion >= 20) {
// run SparkSession test
p = note.addParagraph();
config = p.getConfig();
config.put("enabled", true);
p.setConfig(config);
p.setText("%pyspark from pyspark.sql import Row\n" +
"df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
"df.collect()");
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
}
}
ZeppelinServer.notebook.removeNote(note.getId(), null);
@ -187,7 +193,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
+ "print(" + sqlContextName + ".range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())");
// p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
@ -278,6 +283,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
assertEquals(Status.FINISHED, p1.getStatus());
assertEquals("2\n", p1.getResult().message());
}
ZeppelinServer.notebook.removeNote(note.getId(), null);
}
/**
@ -291,7 +297,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
config.put("enabled", true);
p.setConfig(config);
p.setText("%spark print(sc.version)");
// p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());

View file

@ -43,4 +43,4 @@ log4j.logger.DataNucleus.Datastore=ERROR
# Log all JDBC parameters
log4j.logger.org.hibernate.type=ALL
log4j.logger.org.apache.zeppelin.interpreter=DEBUG
log4j.logger.org.apache.zeppelin.interpreter.remote.RemoteInterpreter=DEBUG