mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Test pyspark with spark cluster
This commit is contained in:
parent
be0b7c4244
commit
aa6d1fd86c
5 changed files with 119 additions and 16 deletions
|
|
@ -30,7 +30,7 @@ before_script:
|
|||
script:
|
||||
- mvn package -Pbuild-distr -Phadoop-2.3 -B
|
||||
- ./testing/startSparkCluster.sh 1.3.1 2.3
|
||||
- mvn verify -Pusing-packaged-distr -Phadoop-2.3 -B
|
||||
- SPARK_HOME=./spark-1.3.1-bin-hadoop2.3 mvn verify -Pusing-packaged-distr -Phadoop-2.3 -B
|
||||
- ./testing/stopSparkCluster.sh 1.3.1 2.3
|
||||
|
||||
after_failure:
|
||||
|
|
|
|||
|
|
@ -26,13 +26,23 @@ fi
|
|||
SPARK_VERSION="${1}"
|
||||
HADOOP_VERSION="${2}"
|
||||
|
||||
if [ ! -d "spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}" ]; then
|
||||
FWDIR=$(dirname "${BASH_SOURCE-$0}")
|
||||
ZEPPELIN_HOME="$(cd "${FWDIR}/.."; pwd)"
|
||||
export SPARK_HOME=${ZEPPELIN_HOME}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}
|
||||
|
||||
if [ ! -d "${SPARK_HOME}" ]; then
|
||||
wget -q http://www.us.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
|
||||
tar zxf spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
|
||||
fi
|
||||
|
||||
# create PID dir. test case detect pid file so they can select active spark home dir for test
|
||||
mkdir -p ${SPARK_HOME}/run
|
||||
export SPARK_PID_DIR=${SPARK_HOME}/run
|
||||
|
||||
|
||||
# start
|
||||
export SPARK_MASTER_PORT=7071
|
||||
export SPARK_MASTER_WEBUI_PORT=7072
|
||||
./spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/sbin/start-master.sh
|
||||
./spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/sbin/start-slave.sh 1 `hostname`:${SPARK_MASTER_PORT}
|
||||
export SPARK_WORKER_WEBUI_PORT=8082
|
||||
${SPARK_HOME}/sbin/start-master.sh
|
||||
${SPARK_HOME}/sbin/start-slave.sh 1 `hostname`:${SPARK_MASTER_PORT}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,13 @@ fi
|
|||
SPARK_VERSION="${1}"
|
||||
HADOOP_VERSION="${2}"
|
||||
|
||||
./spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/sbin/spark-daemon.sh stop org.apache.spark.deploy.worker.Worker 1
|
||||
./spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/sbin/stop-master.sh
|
||||
FWDIR=$(dirname "${BASH_SOURCE-$0}")
|
||||
ZEPPELIN_HOME="$(cd "${FWDIR}/.."; pwd)"
|
||||
export SPARK_HOME=${ZEPPELIN_HOME}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}
|
||||
|
||||
# set create PID dir
|
||||
export SPARK_PID_DIR=${SPARK_HOME}/run
|
||||
|
||||
|
||||
${SPARK_HOME}/sbin/spark-daemon.sh stop org.apache.spark.deploy.worker.Worker 1
|
||||
${SPARK_HOME}/sbin/stop-master.sh
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.rest;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.net.InetAddress;
|
||||
|
|
@ -48,6 +49,7 @@ public abstract class AbstractTestRestApi {
|
|||
static final String restApiUrl = "/api";
|
||||
static final String url = getUrlToTest();
|
||||
protected static final boolean wasRunning = checkIfServerIsRuning();
|
||||
static boolean pySpark = false;
|
||||
|
||||
private String getUrl(String path) {
|
||||
String url;
|
||||
|
|
@ -108,7 +110,26 @@ public abstract class AbstractTestRestApi {
|
|||
if ("true".equals(System.getenv("CI"))) {
|
||||
// assume first one is spark
|
||||
InterpreterSetting sparkIntpSetting = ZeppelinServer.notebook.getInterpreterFactory().get().get(0);
|
||||
|
||||
// set spark master
|
||||
sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071");
|
||||
|
||||
// set spark home for pyspark
|
||||
sparkIntpSetting.getProperties().setProperty("spark.home", getSparkHome());
|
||||
pySpark = true;
|
||||
|
||||
ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id());
|
||||
} else {
|
||||
// assume first one is spark
|
||||
InterpreterSetting sparkIntpSetting = ZeppelinServer.notebook.getInterpreterFactory().get().get(0);
|
||||
|
||||
String sparkHome = getSparkHome();
|
||||
if (sparkHome != null) {
|
||||
// set spark home for pyspark
|
||||
sparkIntpSetting.getProperties().setProperty("spark.home", sparkHome);
|
||||
pySpark = true;
|
||||
}
|
||||
|
||||
ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id());
|
||||
}
|
||||
}
|
||||
|
|
@ -123,6 +144,46 @@ public abstract class AbstractTestRestApi {
|
|||
}
|
||||
}
|
||||
|
||||
private static String getSparkHome() {
|
||||
String sparkHome = getSparkHomeRecursively(new File(System.getProperty("user.dir")));
|
||||
System.out.println("SPARK HOME detected " + sparkHome);
|
||||
return sparkHome;
|
||||
}
|
||||
|
||||
boolean isPyspark() {
|
||||
return pySpark;
|
||||
}
|
||||
|
||||
private static String getSparkHomeRecursively(File dir) {
|
||||
if (dir == null) return null;
|
||||
File files [] = dir.listFiles();
|
||||
if (files == null) return null;
|
||||
|
||||
File homeDetected = null;
|
||||
for (File f : files) {
|
||||
if (isActiveSparkHome(f)) {
|
||||
homeDetected = f;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (homeDetected != null) {
|
||||
return homeDetected.getAbsolutePath();
|
||||
} else {
|
||||
return getSparkHomeRecursively(dir.getParentFile());
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isActiveSparkHome(File dir) {
|
||||
if (dir.getName().matches("spark-[0-9\\.]+-bin-hadoop[0-9\\.]+")) {
|
||||
File pidDir = new File(dir, "run");
|
||||
if (pidDir.isDirectory() && pidDir.listFiles().length > 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
protected static void shutDown() throws Exception {
|
||||
if (!wasRunning) {
|
||||
LOG.info("Terminating test Zeppelin...");
|
||||
|
|
|
|||
|
|
@ -58,24 +58,50 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void getApiRoot() throws IOException {
|
||||
public void basicRDDTransformationAndActionTest() throws IOException {
|
||||
// create new note
|
||||
Note note = ZeppelinServer.notebook.createNote();
|
||||
note.addParagraph();
|
||||
Paragraph p = note.getLastParagraph();
|
||||
|
||||
// get spark version string
|
||||
p.setText("print(sc.version)");
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
String sparkVersion = p.getResult().message();
|
||||
|
||||
// run markdown paragraph, again
|
||||
p = note.addParagraph();
|
||||
Paragraph p = note.addParagraph();
|
||||
p.setText("print(sc.parallelize(1 to 10).reduce(_ + _))");
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
assertEquals("55", p.getResult().message());
|
||||
ZeppelinServer.notebook.removeNote(note.id());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pySparkTest() throws IOException {
|
||||
// create new note
|
||||
Note note = ZeppelinServer.notebook.createNote();
|
||||
|
||||
int sparkVersion = getSparkVersionNumber(note);
|
||||
|
||||
if (isPyspark() && sparkVersion >= 12) { // pyspark supported from 1.2.1
|
||||
// run markdown paragraph, again
|
||||
Paragraph p = note.addParagraph();
|
||||
p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, b: a + b))");
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
assertEquals("55\n", p.getResult().message());
|
||||
}
|
||||
ZeppelinServer.notebook.removeNote(note.id());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get spark version number as a numerical value.
|
||||
* eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
|
||||
*/
|
||||
private int getSparkVersionNumber(Note note) {
|
||||
Paragraph p = note.addParagraph();
|
||||
p.setText("print(sc.version)");
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
String sparkVersion = p.getResult().message();
|
||||
System.out.println("Spark version detected " + sparkVersion);
|
||||
String[] split = sparkVersion.split("\\.");
|
||||
int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
|
||||
return version;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue