mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
retry
This commit is contained in:
parent
4922de1d46
commit
40b080a7b0
4 changed files with 44 additions and 26 deletions
|
|
@ -118,7 +118,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
private Map<String, Object> binder;
|
||||
private SparkVersion sparkVersion;
|
||||
private File outputDir; // class outputdir for scala 2.11
|
||||
private static File outputDir; // class outputdir for scala 2.11
|
||||
private Object classServer; // classserver for scala 2.11
|
||||
|
||||
|
||||
|
|
@ -572,8 +572,11 @@ public class SparkInterpreter extends Interpreter {
|
|||
sparkReplClassDir = System.getProperty("java.io.tmpdir");
|
||||
}
|
||||
|
||||
outputDir = createTempDir(sparkReplClassDir);
|
||||
|
||||
synchronized (sharedInterpreterLock) {
|
||||
if (outputDir == null) {
|
||||
outputDir = createTempDir(sparkReplClassDir);
|
||||
}
|
||||
}
|
||||
argList.add("-Yrepl-class-based");
|
||||
argList.add("-Yrepl-outdir");
|
||||
argList.add(outputDir.getAbsolutePath());
|
||||
|
|
@ -1276,7 +1279,12 @@ public class SparkInterpreter extends Interpreter {
|
|||
logger.info("Close interpreter");
|
||||
|
||||
if (numReferenceOfSparkContext.decrementAndGet() == 0) {
|
||||
sc.stop();
|
||||
if (sparkSession != null) {
|
||||
Utils.invokeMethod(sparkSession, "stop");
|
||||
} else if (sc != null){
|
||||
sc.stop();
|
||||
}
|
||||
sparkSession = null;
|
||||
sc = null;
|
||||
if (classServer != null) {
|
||||
Utils.invokeMethod(classServer, "stop");
|
||||
|
|
|
|||
|
|
@ -154,7 +154,7 @@ public abstract class AbstractTestRestApi {
|
|||
// set spark master and other properties
|
||||
sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071");
|
||||
sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");
|
||||
|
||||
sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", "false");
|
||||
// set spark home for pyspark
|
||||
sparkIntpSetting.getProperties().setProperty("spark.home", getSparkHome());
|
||||
pySpark = true;
|
||||
|
|
@ -180,6 +180,7 @@ public abstract class AbstractTestRestApi {
|
|||
sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");
|
||||
// set spark home for pyspark
|
||||
sparkIntpSetting.getProperties().setProperty("spark.home", sparkHome);
|
||||
sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", "false");
|
||||
pySpark = true;
|
||||
sparkR = true;
|
||||
}
|
||||
|
|
@ -199,7 +200,11 @@ public abstract class AbstractTestRestApi {
|
|||
}
|
||||
|
||||
private static String getSparkHome() {
|
||||
String sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName())));
|
||||
String sparkHome = System.getenv("SPARK_HOME");
|
||||
if (sparkHome != null) {
|
||||
return sparkHome;
|
||||
}
|
||||
sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName())));
|
||||
System.out.println("SPARK HOME detected " + sparkHome);
|
||||
return sparkHome;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -135,31 +135,37 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
config.put("enabled", true);
|
||||
p.setConfig(config);
|
||||
p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, b: a + b))");
|
||||
// p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
assertEquals(Status.FINISHED, p.getStatus());
|
||||
assertEquals("55\n", p.getResult().message());
|
||||
LOG.info("*************************result:" + p.getResult().message());
|
||||
// run sqlContext test
|
||||
p.setText("%pyspark from pyspark.sql import Row\n" +
|
||||
"df=sqlContext.createDataFrame([Row(name='Alice', age=20)])\n" +
|
||||
"df.count()");
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
assertEquals(Status.FINISHED, p.getStatus());
|
||||
assertEquals("1\n", p.getResult().message());
|
||||
LOG.info("*************************result:" + p.getResult().message());
|
||||
if (sparkVersion >= 20) {
|
||||
// run SparkSession test
|
||||
if (sparkVersion >= 13) {
|
||||
// run sqlContext test
|
||||
p = note.addParagraph();
|
||||
config = p.getConfig();
|
||||
config.put("enabled", true);
|
||||
p.setConfig(config);
|
||||
p.setText("%pyspark from pyspark.sql import Row\n" +
|
||||
"df=sqlContext.createDataFrame([Row(name='Alice', age=20)])\n" +
|
||||
"df.count()");
|
||||
"df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
|
||||
"df.collect()");
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
assertEquals(Status.FINISHED, p.getStatus());
|
||||
assertEquals("1\n", p.getResult().message());
|
||||
LOG.info("*************************result:" + p.getResult().message());
|
||||
assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
|
||||
}
|
||||
if (sparkVersion >= 20) {
|
||||
// run SparkSession test
|
||||
p = note.addParagraph();
|
||||
config = p.getConfig();
|
||||
config.put("enabled", true);
|
||||
p.setConfig(config);
|
||||
p.setText("%pyspark from pyspark.sql import Row\n" +
|
||||
"df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
|
||||
"df.collect()");
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
assertEquals(Status.FINISHED, p.getStatus());
|
||||
assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
|
||||
}
|
||||
}
|
||||
ZeppelinServer.notebook.removeNote(note.getId(), null);
|
||||
|
|
@ -187,7 +193,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
|
||||
p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
|
||||
+ "print(" + sqlContextName + ".range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())");
|
||||
// p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
assertEquals(Status.FINISHED, p.getStatus());
|
||||
|
|
@ -278,6 +283,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
assertEquals(Status.FINISHED, p1.getStatus());
|
||||
assertEquals("2\n", p1.getResult().message());
|
||||
}
|
||||
ZeppelinServer.notebook.removeNote(note.getId(), null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -291,7 +297,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
config.put("enabled", true);
|
||||
p.setConfig(config);
|
||||
p.setText("%spark print(sc.version)");
|
||||
// p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
assertEquals(Status.FINISHED, p.getStatus());
|
||||
|
|
|
|||
|
|
@ -43,4 +43,4 @@ log4j.logger.DataNucleus.Datastore=ERROR
|
|||
# Log all JDBC parameters
|
||||
log4j.logger.org.hibernate.type=ALL
|
||||
|
||||
log4j.logger.org.apache.zeppelin.interpreter=DEBUG
|
||||
log4j.logger.org.apache.zeppelin.interpreter.remote.RemoteInterpreter=DEBUG
|
||||
|
|
|
|||
Loading…
Reference in a new issue