Test pyspark with spark cluster

This commit is contained in:
Lee moon soo 2015-06-04 15:36:52 +09:00
parent be0b7c4244
commit aa6d1fd86c
5 changed files with 119 additions and 16 deletions

View file

@ -30,7 +30,7 @@ before_script:
script:
- mvn package -Pbuild-distr -Phadoop-2.3 -B
- ./testing/startSparkCluster.sh 1.3.1 2.3
- mvn verify -Pusing-packaged-distr -Phadoop-2.3 -B
- SPARK_HOME=./spark-1.3.1-bin-hadoop2.3 mvn verify -Pusing-packaged-distr -Phadoop-2.3 -B
- ./testing/stopSparkCluster.sh 1.3.1 2.3
after_failure:

View file

@ -26,13 +26,23 @@ fi
SPARK_VERSION="${1}"
HADOOP_VERSION="${2}"
if [ ! -d "spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}" ]; then
FWDIR=$(dirname "${BASH_SOURCE-$0}")
ZEPPELIN_HOME="$(cd "${FWDIR}/.."; pwd)"
export SPARK_HOME=${ZEPPELIN_HOME}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}
if [ ! -d "${SPARK_HOME}" ]; then
wget -q http://www.us.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
tar zxf spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
fi
# create PID dir. test case detect pid file so they can select active spark home dir for test
mkdir -p ${SPARK_HOME}/run
export SPARK_PID_DIR=${SPARK_HOME}/run
# start
export SPARK_MASTER_PORT=7071
export SPARK_MASTER_WEBUI_PORT=7072
./spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/sbin/start-master.sh
./spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/sbin/start-slave.sh 1 `hostname`:${SPARK_MASTER_PORT}
export SPARK_WORKER_WEBUI_PORT=8082
${SPARK_HOME}/sbin/start-master.sh
${SPARK_HOME}/sbin/start-slave.sh 1 `hostname`:${SPARK_MASTER_PORT}

View file

@ -25,7 +25,13 @@ fi
SPARK_VERSION="${1}"
HADOOP_VERSION="${2}"
./spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/sbin/spark-daemon.sh stop org.apache.spark.deploy.worker.Worker 1
./spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/sbin/stop-master.sh
FWDIR=$(dirname "${BASH_SOURCE-$0}")
ZEPPELIN_HOME="$(cd "${FWDIR}/.."; pwd)"
export SPARK_HOME=${ZEPPELIN_HOME}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}
# set create PID dir
export SPARK_PID_DIR=${SPARK_HOME}/run
${SPARK_HOME}/sbin/spark-daemon.sh stop org.apache.spark.deploy.worker.Worker 1
${SPARK_HOME}/sbin/stop-master.sh

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.rest;
import java.io.File;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetAddress;
@ -48,6 +49,7 @@ public abstract class AbstractTestRestApi {
static final String restApiUrl = "/api";
static final String url = getUrlToTest();
protected static final boolean wasRunning = checkIfServerIsRuning();
static boolean pySpark = false;
private String getUrl(String path) {
String url;
@ -108,7 +110,26 @@ public abstract class AbstractTestRestApi {
if ("true".equals(System.getenv("CI"))) {
// assume first one is spark
InterpreterSetting sparkIntpSetting = ZeppelinServer.notebook.getInterpreterFactory().get().get(0);
// set spark master
sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071");
// set spark home for pyspark
sparkIntpSetting.getProperties().setProperty("spark.home", getSparkHome());
pySpark = true;
ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id());
} else {
// assume first one is spark
InterpreterSetting sparkIntpSetting = ZeppelinServer.notebook.getInterpreterFactory().get().get(0);
String sparkHome = getSparkHome();
if (sparkHome != null) {
// set spark home for pyspark
sparkIntpSetting.getProperties().setProperty("spark.home", sparkHome);
pySpark = true;
}
ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id());
}
}
@ -123,6 +144,46 @@ public abstract class AbstractTestRestApi {
}
}
private static String getSparkHome() {
String sparkHome = getSparkHomeRecursively(new File(System.getProperty("user.dir")));
System.out.println("SPARK HOME detected " + sparkHome);
return sparkHome;
}
boolean isPyspark() {
return pySpark;
}
private static String getSparkHomeRecursively(File dir) {
if (dir == null) return null;
File files [] = dir.listFiles();
if (files == null) return null;
File homeDetected = null;
for (File f : files) {
if (isActiveSparkHome(f)) {
homeDetected = f;
break;
}
}
if (homeDetected != null) {
return homeDetected.getAbsolutePath();
} else {
return getSparkHomeRecursively(dir.getParentFile());
}
}
private static boolean isActiveSparkHome(File dir) {
if (dir.getName().matches("spark-[0-9\\.]+-bin-hadoop[0-9\\.]+")) {
File pidDir = new File(dir, "run");
if (pidDir.isDirectory() && pidDir.listFiles().length > 0) {
return true;
}
}
return false;
}
protected static void shutDown() throws Exception {
if (!wasRunning) {
LOG.info("Terminating test Zeppelin...");

View file

@ -58,24 +58,50 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
}
@Test
public void getApiRoot() throws IOException {
public void basicRDDTransformationAndActionTest() throws IOException {
// create new note
Note note = ZeppelinServer.notebook.createNote();
note.addParagraph();
Paragraph p = note.getLastParagraph();
// get spark version string
p.setText("print(sc.version)");
note.run(p.getId());
waitForFinish(p);
String sparkVersion = p.getResult().message();
// run markdown paragraph, again
p = note.addParagraph();
Paragraph p = note.addParagraph();
p.setText("print(sc.parallelize(1 to 10).reduce(_ + _))");
note.run(p.getId());
waitForFinish(p);
assertEquals("55", p.getResult().message());
ZeppelinServer.notebook.removeNote(note.id());
}
@Test
public void pySparkTest() throws IOException {
// create new note
Note note = ZeppelinServer.notebook.createNote();
int sparkVersion = getSparkVersionNumber(note);
if (isPyspark() && sparkVersion >= 12) { // pyspark supported from 1.2.1
// run markdown paragraph, again
Paragraph p = note.addParagraph();
p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, b: a + b))");
note.run(p.getId());
waitForFinish(p);
assertEquals("55\n", p.getResult().message());
}
ZeppelinServer.notebook.removeNote(note.id());
}
/**
* Get spark version number as a numerical value.
* eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
*/
private int getSparkVersionNumber(Note note) {
Paragraph p = note.addParagraph();
p.setText("print(sc.version)");
note.run(p.getId());
waitForFinish(p);
String sparkVersion = p.getResult().message();
System.out.println("Spark version detected " + sparkVersion);
String[] split = sparkVersion.split("\\.");
int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
return version;
}
}