Add test against spark cluster

This commit is contained in:
Lee moon soo 2015-06-03 09:50:15 +09:00
parent 654d76148a
commit 80698e9d8d
7 changed files with 138 additions and 21 deletions

View file

@ -21,20 +21,24 @@ before_install:
- "export DISPLAY=:99.0"
- "sh -e /etc/init.d/xvfb start"
install:
- mvn package -DskipTests -B
before_script:
- mvn package -Pbuild-distr -B
- ./testing/startSparkCluster.sh
- ./testing/startSparkCluster.sh 1.1.1
script:
- mvn verify -Pusing-packaged-distr -B
after_failure:
- cat target/rat.txt
- cat zeppelin-server/target/rat.txt
- cat zeppelin-distribution/target/zeppelin-*-SNAPSHOT/zeppelin-*-SNAPSHOT/logs/zeppelin*.log
- cat zeppelin-distribution/target/zeppelin-*-SNAPSHOT/zeppelin-*-SNAPSHOT/logs/zeppelin*.out
after_script:
- ./testing/stopSparkCluster.sh
- ./testing/stopSparkCluster.sh 1.1.1
notifications:
slack:

View file

@ -416,6 +416,7 @@
<exclude>STYLE.md</exclude>
<exclude>Roadmap.md</exclude>
<exclude>conf/interpreter.json</exclude>
<exclude>spark-*-bin*/**</exclude>
</excludes>
</configuration>

View file

@ -16,11 +16,22 @@
# limitations under the License.
#
wget http://apache.mesi.com.ar/spark/spark-1.1.1/spark-1.1.1-bin-hadoop2.3.tgz
tar zxvf spark-1.1.1-bin-hadoop2.3.tgz
cd spark-1.1.1-bin-hadoop2.3
if [ $# -ne 1 ]; then
echo "usage) $0 [spark version]"
echo " eg) $0 1.3.1"
exit 1
fi
SPARK_VERSION="${1}"
if [ ! -d "spark-${SPARK_VERSION}-bin-hadoop2.3" ]; then
wget -q http://www.us.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop2.3.tgz
tar zxf spark-${SPARK_VERSION}-bin-hadoop2.3.tgz
fi
# start
export SPARK_MASTER_PORT=7071
export SPARK_MASTER_WEBUI_PORT=7072
./sbin/start-master.sh
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7071 &> worker.log &
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7071 &> worker2.log &
./spark-${SPARK_VERSION}-bin-hadoop2.3/sbin/start-master.sh
./spark-${SPARK_VERSION}-bin-hadoop2.3/sbin/start-slave.sh 1 `hostname`:${SPARK_MASTER_PORT}

View file

@ -16,8 +16,15 @@
# limitations under the License.
#
cd spark-1.1.1-bin-hadoop2.3
./sbin/stop-master.sh
kill $(ps -ef | grep 'org.apache.spark.deploy.worker.Worker' | awk '{print $2}')
cd ..
rm -rf spark-1.1.1-bin-hadoop2.3*
if [ $# -ne 1 ]; then
echo "usage) $0 [spark version]"
echo " eg) $0 1.3.1"
exit 1
fi
SPARK_VERSION="${1}"
./spark-${SPARK_VERSION}-bin-hadoop2.3/sbin/spark-daemon.sh stop org.apache.spark.deploy.worker.Worker 1
./spark-${SPARK_VERSION}-bin-hadoop2.3/sbin/stop-master.sh

View file

@ -19,15 +19,17 @@ package org.apache.zeppelin.rest;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.server.ZeppelinServer;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
@ -98,19 +100,46 @@ public abstract class AbstractTestRestApi {
throw new RuntimeException("Can not start Zeppelin server");
}
LOG.info("Test Zeppelin stared.");
// ci environment runs spark cluster for testing
// so configure zeppelin use spark cluster
if ("true".equals(System.getenv("CI"))) {
// assume first one is spark
InterpreterSetting sparkIntpSetting = ZeppelinServer.notebook.getInterpreterFactory().get().get(0);
sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071");
ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id());
}
}
}
protected static void shutDown() {
private static String getHostname() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
e.printStackTrace();
return "localhost";
}
}
protected static void shutDown() throws Exception {
if (!wasRunning) {
LOG.info("Terminating test Zeppelin...");
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
long s = System.currentTimeMillis();
boolean started = true;
while (System.currentTimeMillis() - s < 1000 * 60 * 3) { // 3 minutes
Thread.sleep(2000);
started = checkIfServerIsRuning();
if (started == false) {
break;
}
}
if (started == true) {
throw new RuntimeException("Can not stop Zeppelin server");
}
LOG.info("Test Zeppelin terminated.");
}
}

View file

@ -57,7 +57,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
}
@AfterClass
public static void destroy() {
public static void destroy() throws Exception {
AbstractTestRestApi.shutDown();
}

View file

@ -0,0 +1,65 @@
package org.apache.zeppelin.rest;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.gson.Gson;
/**
* Test against spark cluster.
* Spark cluster is started by CI server using testing/startSparkCluster.sh
*/
public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
Gson gson = new Gson();
@BeforeClass
public static void init() throws Exception {
AbstractTestRestApi.startUp();
}
@AfterClass
public static void destroy() throws Exception {
AbstractTestRestApi.shutDown();
}
private void waitForFinish(Paragraph p) {
while (p.getStatus() != Status.FINISHED) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Test
public void getApiRoot() throws IOException {
// create new note
Note note = ZeppelinServer.notebook.createNote();
note.addParagraph();
Paragraph p = note.getLastParagraph();
// get spark version string
p.setText("print(sc.version)");
note.run(p.getId());
waitForFinish(p);
String sparkVersion = p.getResult().message();
// run markdown paragraph, again
p = note.addParagraph();
p.setText("print(sc.parallelize(1 to 10).reduce(_ + _))");
note.run(p.getId());
waitForFinish(p);
assertEquals("55", p.getResult().message());
ZeppelinServer.notebook.removeNote(note.id());
}
}