mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge remote rscala-z-rs
This commit is contained in:
commit
eb6c40ca64
18 changed files with 725 additions and 516 deletions
4
.gitignore
vendored
4
.gitignore
vendored
|
|
@ -54,10 +54,6 @@ zeppelin-web/bower_components
|
|||
**/testing/
|
||||
!/testing/
|
||||
|
||||
**/lib/rscala
|
||||
|
||||
**/lib/rscala
|
||||
|
||||
# OS generated files #
|
||||
######################
|
||||
.DS_Store
|
||||
|
|
|
|||
20
.travis.yml
20
.travis.yml
|
|
@ -15,19 +15,26 @@
|
|||
|
||||
language: java
|
||||
|
||||
addons:
|
||||
apt:
|
||||
sources:
|
||||
r-packages-precise
|
||||
packages:
|
||||
r-base
|
||||
|
||||
matrix:
|
||||
include:
|
||||
# Test all modules
|
||||
- jdk: "oraclejdk7"
|
||||
env: SPARK_VER="1.6.0" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Phadoop-2.3 -Ppyspark -Pscalding" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
|
||||
env: SPARK_VER="1.6.0" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
|
||||
|
||||
# Test spark module for 1.5.2
|
||||
- jdk: "oraclejdk7"
|
||||
env: SPARK_VER="1.5.2" HADOOP_VER="2.3" PROFILE="-Pspark-1.5 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false"
|
||||
env: SPARK_VER="1.5.2" HADOOP_VER="2.3" PROFILE="-Pspark-1.5 -Phadoop-2.3 -Ppyspark -Psparkr" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false"
|
||||
|
||||
# Test spark module for 1.4.1
|
||||
- jdk: "oraclejdk7"
|
||||
env: SPARK_VER="1.4.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.4 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false"
|
||||
env: SPARK_VER="1.4.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.4 -Phadoop-2.3 -Ppyspark -Psparkr" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false"
|
||||
|
||||
# Test spark module for 1.3.1
|
||||
- jdk: "oraclejdk7"
|
||||
|
|
@ -49,13 +56,18 @@ before_install:
|
|||
- "export DISPLAY=:99.0"
|
||||
- "sh -e /etc/init.d/xvfb start"
|
||||
|
||||
# install R packages
|
||||
- mkdir -p ~/Rlib
|
||||
- echo 'R_LIBS=~/Rlib' > ~/.Renviron
|
||||
- Rscript -e "install.packages('knitr', repos = 'http://cran.us.r-project.org')"
|
||||
|
||||
install:
|
||||
- mvn $BUILD_FLAG $PROFILE -B
|
||||
|
||||
before_script:
|
||||
- travis_retry ./testing/downloadSpark.sh $SPARK_VER $HADOOP_VER
|
||||
- ./testing/startSparkCluster.sh $SPARK_VER $HADOOP_VER
|
||||
- echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
|
||||
- echo -e "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER\nexport ZEPPELIN_R_CMD=Rscript" > conf/zeppelin-env.sh
|
||||
|
||||
script:
|
||||
- mvn $TEST_FLAG $PROFILE -B $TEST_PROJECTS
|
||||
|
|
|
|||
|
|
@ -129,7 +129,6 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
|
|||
|
||||
export SPARK_CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
|
||||
fi
|
||||
CLASSPATH+=":${ZEPPELIN_CLASSPATH}":"${ZEPPELIN_HOME}/interpreter/spark/R/rscala_2.10-1.0.6.jar"
|
||||
fi
|
||||
|
||||
addJarInDir "${LOCAL_INTERPRETER_REPO}"
|
||||
|
|
|
|||
|
|
@ -21,21 +21,11 @@ Validate your installation with a simple R command:
|
|||
R -e "print(1+1)"
|
||||
```
|
||||
|
||||
Then install the required R libraries;
|
||||
To enjoy plots, install additional libraries with:
|
||||
|
||||
```
|
||||
+ devtools with `R -e "install.packages('devtools', repos = 'http://cran.us.r-project.org')"`
|
||||
+ knitr with `R -e "install.packages('knitr', repos = 'http://cran.us.r-project.org')"`
|
||||
+ rscala: You need version 1.0.6 of RScala, so the commands will be [1]
|
||||
|
||||
```
|
||||
[1]
|
||||
curl https://cran.r-project.org/src/contrib/Archive/rscala/rscala_1.0.6.tar.gz -o /tmp/rscala_1.0.6.tar.gz
|
||||
R CMD INSTALL /tmp/rscala_1.0.6.tar.gz
|
||||
```
|
||||
|
||||
To enjoy plots, you need:
|
||||
|
||||
```
|
||||
+ ggplot2 with `R -e "install.packages('ggplot2', repos = 'http://cran.us.r-project.org')"`
|
||||
+ Other visualization libraries: `R -e "install.packages(c('devtools','mplot', 'googleVis'), repos = 'http://cran.us.r-project.org'); require(devtools); install_github('ramnathv/rCharts')"`
|
||||
```
|
||||
|
|
|
|||
20
pom.xml
20
pom.xml
|
|
@ -116,7 +116,6 @@
|
|||
<libthrift.version>0.9.2</libthrift.version>
|
||||
<gson.version>2.2</gson.version>
|
||||
<guava.version>15.0</guava.version>
|
||||
<rscala.version>1.0.6</rscala.version>
|
||||
|
||||
<PermGen>64m</PermGen>
|
||||
<MaxPermGen>512m</MaxPermGen>
|
||||
|
|
@ -230,25 +229,6 @@
|
|||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>com.googlecode.maven-download-plugin</groupId>
|
||||
<artifactId>download-maven-plugin</artifactId>
|
||||
<version>1.2.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>download-rscala-lib</id>
|
||||
<phase>validate</phase>
|
||||
<goals>
|
||||
<goal>wget</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<url>https://cran.r-project.org/src/contrib/Archive/rscala/rscala_${rscala.version}.tar.gz</url>
|
||||
<unpack>true</unpack>
|
||||
<outputDirectory>${user.dir}/spark/lib</outputDirectory>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.rat</groupId>
|
||||
<artifactId>apache-rat-plugin</artifactId>
|
||||
|
|
|
|||
|
|
@ -51,6 +51,7 @@
|
|||
<akka.version>2.3.4-spark</akka.version>
|
||||
|
||||
<spark.download.url>http://archive.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}.tgz</spark.download.url>
|
||||
<spark.bin.download.url>http://archive.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}-bin-without-hadoop.tgz</spark.bin.download.url>
|
||||
<py4j.version>0.8.2.1</py4j.version>
|
||||
</properties>
|
||||
|
||||
|
|
@ -759,10 +760,10 @@
|
|||
<configuration>
|
||||
<filesets>
|
||||
<fileset>
|
||||
<directory>${basedir}/../python/build</directory>
|
||||
<directory>${project.build.directory}/spark-dist</directory>
|
||||
</fileset>
|
||||
<fileset>
|
||||
<directory>${project.build.directory}/spark-dist</directory>
|
||||
<directory>${basedir}/../python/build</directory>
|
||||
</fileset>
|
||||
</filesets>
|
||||
</configuration>
|
||||
|
|
@ -794,6 +795,65 @@
|
|||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>sparkr</id>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>com.googlecode.maven-download-plugin</groupId>
|
||||
<artifactId>download-maven-plugin</artifactId>
|
||||
<version>1.2.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>download-sparkr-files</id>
|
||||
<phase>validate</phase>
|
||||
<goals>
|
||||
<goal>wget</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<url>${spark.bin.download.url}</url>
|
||||
<unpack>true</unpack>
|
||||
<outputDirectory>${project.build.directory}/spark-bin-dist</outputDirectory>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-clean-plugin</artifactId>
|
||||
<configuration>
|
||||
<filesets>
|
||||
<fileset>
|
||||
<directory>${project.build.directory}/spark-bin-dist</directory>
|
||||
</fileset>
|
||||
</filesets>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-resources-plugin</artifactId>
|
||||
<version>2.7</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-sparkr-files</id>
|
||||
<phase>generate-resources</phase>
|
||||
<goals>
|
||||
<goal>copy-resources</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/spark/R/lib</outputDirectory>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>${project.build.directory}/spark-bin-dist/spark-${spark.version}-bin-without-hadoop/R/lib</directory>
|
||||
</resource>
|
||||
</resources>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
|
||||
</profiles>
|
||||
|
||||
<build>
|
||||
|
|
@ -924,28 +984,6 @@
|
|||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-antrun-plugin</artifactId>
|
||||
<version>1.7</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-rscala</id>
|
||||
<phase>generate-resources</phase>
|
||||
<goals>
|
||||
<goal>run</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<target>
|
||||
<copy todir="${user.dir}/interpreter/spark/R"
|
||||
file="${user.dir}/spark/lib/rscala/inst/java/rscala_${scala.binary.version}-${rscala.version}.jar"/>
|
||||
</target>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
|
|||
|
|
@ -1,3 +0,0 @@
|
|||
This folder contains 3rd party jars.
|
||||
|
||||
DO NOT COMMIT THESE JARS IN THE SRC REPO.
|
||||
|
|
@ -239,14 +239,6 @@
|
|||
<version>${jsoup.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.ddahl</groupId>
|
||||
<artifactId>rscala</artifactId>
|
||||
<version>${rscala.version}</version>
|
||||
<scope>system</scope>
|
||||
<systemPath>${project.basedir}/lib/rscala/inst/java/rscala_${scala.binary.version}-${rscala.version}.jar</systemPath>
|
||||
</dependency>
|
||||
|
||||
<!--TEST-->
|
||||
<dependency>
|
||||
<groupId>org.scalatest</groupId>
|
||||
|
|
@ -321,7 +313,6 @@
|
|||
<exclude>**/metastore_db/</exclude>
|
||||
<exclude>**/README.md</exclude>
|
||||
<exclude>**/dependency-reduced-pom.xml</exclude>
|
||||
<exclude>**/lib/rscala/**</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
|
@ -441,84 +432,17 @@
|
|||
</build>
|
||||
|
||||
<profiles>
|
||||
|
||||
<!-- to deactivate 'exclude-sparkr' automatically when 'spark' is activated -->
|
||||
<profile>
|
||||
<id>spark-1.1</id>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes>
|
||||
<exclude>**/SparkRInterpreter.java</exclude>
|
||||
</excludes>
|
||||
<testExcludes>
|
||||
<testExclude>**/SparkRInterpreterTest.java</testExclude>
|
||||
</testExcludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.scala-tools</groupId>
|
||||
<artifactId>maven-scala-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes>
|
||||
<exclude>**/ZeppelinR.scala</exclude>
|
||||
<exclude>**/SparkRBackend.scala</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes>
|
||||
<exclude>**/SparkRInterpreterTest.java</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
<id>sparkr</id>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>spark-1.2</id>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes>
|
||||
<exclude>**/SparkRInterpreter.java</exclude>
|
||||
</excludes>
|
||||
<testExcludes>
|
||||
<testExclude>**/SparkRInterpreterTest.java</testExclude>
|
||||
</testExcludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.scala-tools</groupId>
|
||||
<artifactId>maven-scala-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes>
|
||||
<exclude>**/ZeppelinR.scala</exclude>
|
||||
<exclude>**/SparkRBackend.scala</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes>
|
||||
<exclude>**/SparkRInterpreterTest.java</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
<profile>
|
||||
<id>spark-1.3</id>
|
||||
<id>exclude-sparkr</id>
|
||||
<activation>
|
||||
<activeByDefault>true</activeByDefault>
|
||||
</activation>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
|
|
@ -530,6 +454,7 @@
|
|||
</excludes>
|
||||
<testExcludes>
|
||||
<testExclude>**/SparkRInterpreterTest.java</testExclude>
|
||||
<testExclude>**/ZeppelinRTest.java</testExclude>
|
||||
</testExcludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
|
|
|||
|
|
@ -21,16 +21,14 @@ import static org.apache.zeppelin.spark.ZeppelinRDisplay.render;
|
|||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.spark.SparkRBackend;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.jsoup.Jsoup;
|
||||
import org.jsoup.nodes.Document;
|
||||
import org.jsoup.nodes.Element;
|
||||
import org.jsoup.select.Elements;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
|
@ -42,6 +40,7 @@ public class SparkRInterpreter extends Interpreter {
|
|||
private static final Logger logger = LoggerFactory.getLogger(SparkRInterpreter.class);
|
||||
|
||||
private static String renderOptions;
|
||||
private ZeppelinR zeppelinR;
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
|
|
@ -49,38 +48,72 @@ public class SparkRInterpreter extends Interpreter {
|
|||
"spark",
|
||||
SparkRInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add("spark.master",
|
||||
SparkInterpreter.getSystemDefault("MASTER", "spark.master", "local[*]"),
|
||||
"Spark master uri. ex) spark://masterhost:7077")
|
||||
.add("spark.home",
|
||||
SparkInterpreter.getSystemDefault("SPARK_HOME", "spark.home", "/opt/spark"),
|
||||
"Spark distribution location")
|
||||
.add("zeppelin.R.image.width",
|
||||
SparkInterpreter.getSystemDefault("ZEPPELIN_R_IMAGE_WIDTH",
|
||||
"zeppelin.R.image.width", "100%"),
|
||||
"")
|
||||
.add("zeppelin.R.render.options",
|
||||
SparkInterpreter.getSystemDefault("ZEPPELIN_R_RENDER_OPTIONS",
|
||||
"zeppelin.R.render.options",
|
||||
"out.format = 'html', comment = NA, "
|
||||
+ "echo = FALSE, results = 'asis', message = F, warning = F"),
|
||||
"")
|
||||
.build());
|
||||
.add("zeppelin.R.cmd",
|
||||
SparkInterpreter.getSystemDefault("ZEPPELIN_R_CMD", "zeppelin.R.cmd", "R"),
|
||||
"R repl path")
|
||||
.add("zeppelin.R.knitr",
|
||||
SparkInterpreter.getSystemDefault("ZEPPELIN_R_KNITR", "zeppelin.R.knitr", "true"),
|
||||
"whether use knitr or not")
|
||||
.add("zeppelin.R.image.width",
|
||||
SparkInterpreter.getSystemDefault("ZEPPELIN_R_IMAGE_WIDTH",
|
||||
"zeppelin.R.image.width", "100%"),
|
||||
"")
|
||||
.add("zeppelin.R.render.options",
|
||||
SparkInterpreter.getSystemDefault("ZEPPELIN_R_RENDER_OPTIONS",
|
||||
"zeppelin.R.render.options",
|
||||
"out.format = 'html', comment = NA, "
|
||||
+ "echo = FALSE, results = 'asis', message = F, warning = F"),
|
||||
"")
|
||||
.build());
|
||||
}
|
||||
|
||||
|
||||
public SparkRInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
zeppelinR().open(getProperty("spark.master"),
|
||||
"/opt/spark", getSparkInterpreter());
|
||||
String rCmdPath = getProperty("zeppelin.R.cmd");
|
||||
String sparkRLibPath;
|
||||
|
||||
if (System.getenv("SPARK_HOME") != null) {
|
||||
sparkRLibPath = System.getenv("SPARK_HOME") + "/R/lib";
|
||||
} else {
|
||||
sparkRLibPath = System.getenv("ZEPPELIN_HOME") + "/interpreter/spark/R/lib";
|
||||
// workaround to make sparkr work without SPARK_HOME
|
||||
System.setProperty("spark.test.home", System.getenv("ZEPPELIN_HOME") + "/interpreter/spark");
|
||||
}
|
||||
|
||||
synchronized (SparkRBackend.backend()) {
|
||||
if (!SparkRBackend.isStarted()) {
|
||||
SparkRBackend.init();
|
||||
SparkRBackend.start();
|
||||
}
|
||||
}
|
||||
|
||||
int port = SparkRBackend.port();
|
||||
|
||||
SparkInterpreter sparkInterpreter = getSparkInterpreter();
|
||||
ZeppelinRContext.setSparkContext(sparkInterpreter.getSparkContext());
|
||||
ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
|
||||
ZeppelinRContext.setZepplinContext(sparkInterpreter.getZeppelinContext());
|
||||
|
||||
zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port);
|
||||
try {
|
||||
zeppelinR.open();
|
||||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
if (useKnitr()) {
|
||||
zeppelinR.eval("library('knitr')");
|
||||
}
|
||||
renderOptions = getProperty("zeppelin.R.render.options");
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String lines, InterpreterContext contextInterpreter) {
|
||||
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) {
|
||||
|
||||
String imageWidth = getProperty("zeppelin.R.image.width");
|
||||
|
||||
|
|
@ -102,20 +135,26 @@ public class SparkRInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
try {
|
||||
// render output with knitr
|
||||
if (useKnitr()) {
|
||||
zeppelinR.setInterpreterOutput(null);
|
||||
zeppelinR.set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```");
|
||||
zeppelinR.eval(".zres <- knit2html(text=.zcmd)");
|
||||
String html = zeppelinR.getS0(".zres");
|
||||
|
||||
zeppelinR().set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```");
|
||||
zeppelinR().eval(".zres <- knit2html(text=.zcmd)");
|
||||
String html = zeppelinR().getS0(".zres");
|
||||
|
||||
RDisplay rDisplay = render(html, imageWidth);
|
||||
|
||||
return new InterpreterResult(
|
||||
rDisplay.code(),
|
||||
rDisplay.type(),
|
||||
rDisplay.content()
|
||||
);
|
||||
|
||||
RDisplay rDisplay = render(html, imageWidth);
|
||||
|
||||
return new InterpreterResult(
|
||||
rDisplay.code(),
|
||||
rDisplay.type(),
|
||||
rDisplay.content()
|
||||
);
|
||||
} else {
|
||||
// alternatively, stream the output (without knitr)
|
||||
zeppelinR.setInterpreterOutput(interpreterContext.out);
|
||||
zeppelinR.eval(lines);
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Exception while connecting to R", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
|
|
@ -129,7 +168,7 @@ public class SparkRInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
zeppelinR().close();
|
||||
zeppelinR.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -175,43 +214,11 @@ public class SparkRInterpreter extends Interpreter {
|
|||
return spark;
|
||||
}
|
||||
|
||||
protected static ZeppelinRFactory zeppelinR() {
|
||||
return ZeppelinRFactory.instance();
|
||||
}
|
||||
|
||||
/**
|
||||
* Java Factory to support tests with Mockito.
|
||||
*
|
||||
* (Mockito can not mock the zeppelinR final scala object class).
|
||||
*/
|
||||
protected static class ZeppelinRFactory {
|
||||
private static ZeppelinRFactory instance;
|
||||
private static ZeppelinR zeppelinR;
|
||||
private ZeppelinRFactory() {
|
||||
// Singleton
|
||||
}
|
||||
|
||||
protected static synchronized ZeppelinRFactory instance() {
|
||||
if (instance == null) instance = new ZeppelinRFactory();
|
||||
return instance;
|
||||
}
|
||||
protected void open(String master, String sparkHome, SparkInterpreter sparkInterpreter) {
|
||||
zeppelinR.open(master, sparkHome, sparkInterpreter);
|
||||
}
|
||||
protected Object eval(String command) {
|
||||
return zeppelinR.eval(command);
|
||||
}
|
||||
protected void set(String key, Object value) {
|
||||
zeppelinR.set(key, value);
|
||||
}
|
||||
protected Object get(String key) {
|
||||
return zeppelinR.get(key);
|
||||
}
|
||||
protected String getS0(String key) {
|
||||
return zeppelinR.getS0(key);
|
||||
}
|
||||
protected void close() {
|
||||
zeppelinR.close();
|
||||
private boolean useKnitr() {
|
||||
try {
|
||||
return Boolean.parseBoolean(getProperty("zeppelin.R.knitr"));
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
404
spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
Normal file
404
spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
Normal file
|
|
@ -0,0 +1,404 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.spark;
|
||||
|
||||
import org.apache.commons.exec.*;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
|
||||
import parquet.org.slf4j.Logger;
|
||||
import parquet.org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* R repl interaction
|
||||
*/
|
||||
public class ZeppelinR implements ExecuteResultHandler {
|
||||
Logger logger = LoggerFactory.getLogger(ZeppelinR.class);
|
||||
private final String rCmdPath;
|
||||
private DefaultExecutor executor;
|
||||
private SparkOutputStream outputStream;
|
||||
private PipedOutputStream input;
|
||||
private final String scriptPath;
|
||||
private final String libPath;
|
||||
static Map<Integer, ZeppelinR> zeppelinR = Collections.synchronizedMap(
|
||||
new HashMap<Integer, ZeppelinR>());
|
||||
|
||||
private InterpreterOutput initialOutput;
|
||||
private final int port;
|
||||
private boolean rScriptRunning;
|
||||
|
||||
/**
|
||||
* Flag and monitor used to signal that the R repl has finished initializing
|
||||
*/
|
||||
boolean rScriptInitialized = false;
|
||||
Integer rScriptInitializeNotifier = new Integer(0);
|
||||
|
||||
|
||||
/**
|
||||
* Request to R repl
|
||||
*/
|
||||
Request rRequestObject = null;
|
||||
Integer rRequestNotifier = new Integer(0);
|
||||
|
||||
/**
|
||||
* Request object
|
||||
*
|
||||
* type : "eval", "set", "get"
|
||||
* stmt : statement to evaluate when type is "eval"
|
||||
* key when type is "set" or "get"
|
||||
* value : value object when type is "set"
|
||||
*/
|
||||
public static class Request {
|
||||
String type;
|
||||
String stmt;
|
||||
Object value;
|
||||
|
||||
public Request(String type, String stmt, Object value) {
|
||||
this.type = type;
|
||||
this.stmt = stmt;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public String getStmt() {
|
||||
return stmt;
|
||||
}
|
||||
|
||||
public Object getValue() {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Response from R repl
|
||||
*/
|
||||
Object rResponseValue = null;
|
||||
boolean rResponseError = false;
|
||||
Integer rResponseNotifier = new Integer(0);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Create ZeppelinR instance
|
||||
* @param rCmdPath R repl commandline path
|
||||
* @param libPath sparkr library path
|
||||
*/
|
||||
public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort) {
|
||||
this.rCmdPath = rCmdPath;
|
||||
this.libPath = libPath;
|
||||
this.port = sparkRBackendPort;
|
||||
scriptPath = System.getProperty("java.io.tmpdir") + "/zeppelin_sparkr.R";
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Start R repl
|
||||
* @throws IOException
|
||||
*/
|
||||
public void open() throws IOException {
|
||||
createRScript();
|
||||
|
||||
zeppelinR.put(hashCode(), this);
|
||||
|
||||
CommandLine cmd = CommandLine.parse(rCmdPath);
|
||||
cmd.addArgument("--no-save");
|
||||
cmd.addArgument("--no-restore");
|
||||
cmd.addArgument("-f");
|
||||
cmd.addArgument(scriptPath);
|
||||
cmd.addArgument("--args");
|
||||
cmd.addArgument(Integer.toString(hashCode()));
|
||||
cmd.addArgument(Integer.toString(port));
|
||||
cmd.addArgument(libPath);
|
||||
|
||||
executor = new DefaultExecutor();
|
||||
outputStream = new SparkOutputStream();
|
||||
|
||||
input = new PipedOutputStream();
|
||||
PipedInputStream in = new PipedInputStream(input);
|
||||
|
||||
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in);
|
||||
executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
|
||||
executor.setStreamHandler(streamHandler);
|
||||
Map env = EnvironmentUtils.getProcEnvironment();
|
||||
|
||||
|
||||
initialOutput = new InterpreterOutput(new InterpreterOutputListener() {
|
||||
@Override
|
||||
public void onAppend(InterpreterOutput out, byte[] line) {
|
||||
logger.debug(new String(line));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdate(InterpreterOutput out, byte[] output) {
|
||||
}
|
||||
});
|
||||
outputStream.setInterpreterOutput(initialOutput);
|
||||
executor.execute(cmd, env, this);
|
||||
rScriptRunning = true;
|
||||
|
||||
// flush output
|
||||
eval("cat('')");
|
||||
}
|
||||
|
||||
/**
|
||||
* Evaluate expression
|
||||
* @param expr
|
||||
* @return
|
||||
*/
|
||||
public Object eval(String expr) {
|
||||
synchronized (this) {
|
||||
rRequestObject = new Request("eval", expr, null);
|
||||
return request();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* assign value to key
|
||||
* @param key
|
||||
* @param value
|
||||
*/
|
||||
public void set(String key, Object value) {
|
||||
synchronized (this) {
|
||||
rRequestObject = new Request("set", key, value);
|
||||
request();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get value of key
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public Object get(String key) {
|
||||
synchronized (this) {
|
||||
rRequestObject = new Request("get", key, null);
|
||||
return request();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get value of key, as a string
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public String getS0(String key) {
|
||||
synchronized (this) {
|
||||
rRequestObject = new Request("getS", key, null);
|
||||
return (String) request();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Send request to r repl and return response
|
||||
* @return responseValue
|
||||
*/
|
||||
private Object request() throws RuntimeException {
|
||||
if (!rScriptRunning) {
|
||||
throw new RuntimeException("r repl is not running");
|
||||
}
|
||||
|
||||
// wait for rscript initialized
|
||||
if (!rScriptInitialized) {
|
||||
waitForRScriptInitialized();
|
||||
}
|
||||
|
||||
rResponseValue = null;
|
||||
|
||||
synchronized (rRequestNotifier) {
|
||||
rRequestNotifier.notify();
|
||||
}
|
||||
|
||||
Object respValue = null;
|
||||
synchronized (rResponseNotifier) {
|
||||
while (rResponseValue == null && rScriptRunning) {
|
||||
try {
|
||||
rResponseNotifier.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
respValue = rResponseValue;
|
||||
rResponseValue = null;
|
||||
}
|
||||
|
||||
if (rResponseError) {
|
||||
throw new RuntimeException(respValue.toString());
|
||||
} else {
|
||||
return respValue;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Wait until src/main/resources/R/zeppelin_sparkr.R is initialized
|
||||
* and call onScriptInitialized()
|
||||
*
|
||||
* @throws InterpreterException
|
||||
*/
|
||||
private void waitForRScriptInitialized() throws InterpreterException {
|
||||
synchronized (rScriptInitializeNotifier) {
|
||||
long startTime = System.nanoTime();
|
||||
while (rScriptInitialized == false &&
|
||||
rScriptRunning &&
|
||||
System.nanoTime() - startTime < 10L * 1000 * 1000000) {
|
||||
try {
|
||||
rScriptInitializeNotifier.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String errorMessage = "";
|
||||
try {
|
||||
initialOutput.flush();
|
||||
errorMessage = new String(initialOutput.toByteArray());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
|
||||
if (rScriptInitialized == false) {
|
||||
throw new InterpreterException("sparkr is not responding " + errorMessage);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* invoked by src/main/resources/R/zeppelin_sparkr.R
|
||||
* @return
|
||||
*/
|
||||
public Request getRequest() {
|
||||
synchronized (rRequestNotifier) {
|
||||
while (rRequestObject == null) {
|
||||
try {
|
||||
rRequestNotifier.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
Request req = rRequestObject;
|
||||
rRequestObject = null;
|
||||
return req;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* invoked by src/main/resources/R/zeppelin_sparkr.R
|
||||
* @param value
|
||||
* @param error
|
||||
*/
|
||||
public void setResponse(Object value, boolean error) {
|
||||
synchronized (rResponseNotifier) {
|
||||
rResponseValue = value;
|
||||
rResponseError = error;
|
||||
rResponseNotifier.notify();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* invoked by src/main/resources/R/zeppelin_sparkr.R
|
||||
*/
|
||||
public void onScriptInitialized() {
|
||||
synchronized (rScriptInitializeNotifier) {
|
||||
rScriptInitialized = true;
|
||||
rScriptInitializeNotifier.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create R script in tmp dir
|
||||
*/
|
||||
private void createRScript() {
|
||||
ClassLoader classLoader = getClass().getClassLoader();
|
||||
File out = new File(scriptPath);
|
||||
|
||||
if (out.exists() && out.isDirectory()) {
|
||||
throw new InterpreterException("Can't create r script " + out.getAbsolutePath());
|
||||
}
|
||||
|
||||
try {
|
||||
FileOutputStream outStream = new FileOutputStream(out);
|
||||
IOUtils.copy(
|
||||
classLoader.getResourceAsStream("R/zeppelin_sparkr.R"),
|
||||
outStream);
|
||||
outStream.close();
|
||||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
logger.info("File {} created", scriptPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Terminate this R repl
|
||||
*/
|
||||
public void close() {
|
||||
executor.getWatchdog().destroyProcess();
|
||||
new File(scriptPath).delete();
|
||||
zeppelinR.remove(hashCode());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get instance
|
||||
* This method will be invoded from zeppelin_sparkr.R
|
||||
* @param hashcode
|
||||
* @return
|
||||
*/
|
||||
public static ZeppelinR getZeppelinR(int hashcode) {
|
||||
return zeppelinR.get(hashcode);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Pass InterpreterOutput to capture the repl output
|
||||
* @param out
|
||||
*/
|
||||
public void setInterpreterOutput(InterpreterOutput out) {
|
||||
outputStream.setInterpreterOutput(out);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void onProcessComplete(int i) {
|
||||
logger.info("process complete {}", i);
|
||||
rScriptRunning = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
rScriptRunning = false;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
99
spark/src/main/resources/R/zeppelin_sparkr.R
Normal file
99
spark/src/main/resources/R/zeppelin_sparkr.R
Normal file
|
|
@ -0,0 +1,99 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
# Positional command-line arguments:
#   1. hash code used to look up the owning ZeppelinR instance on the JVM side
#   2. port of the backend to connect to on localhost
#   3. library directory that is searched first when loading SparkR
args <- commandArgs(trailingOnly = TRUE)

hashCode <- as.integer(args[1])
port <- as.integer(args[2])
libPath <- args[3]
rm(args)

print(paste("Port ", toString(port)))
print(paste("LibPath ", libPath))

# Prepend libPath so library(SparkR) resolves against it before the default library paths.
.libPaths(c(file.path(libPath), .libPaths()))
library(SparkR)

SparkR:::connectBackend("localhost", port)

# scStartTime is needed by R/pkg/R/sparkR.R
assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)

# Fetch the JVM-side ZeppelinR instance this script serves.
.zeppelinR <- SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinR", "getZeppelinR", hashCode)

# Set up the Spark environment: each context is stored in SparkR's private environment and
# also exposed to user code in the global environment.
assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv)
assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv)
assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv)
|
||||
|
||||
# Calls the ZeppelinContext's "put" method with the given name and object.
z.put <- function(name, object) {
  return(SparkR:::callJMethod(.zeppelinContext, "put", name, object))
}
|
||||
# Calls the ZeppelinContext's "get" method with the given name and returns its result.
z.get <- function(name) {
  return(SparkR:::callJMethod(.zeppelinContext, "get", name))
}
|
||||
# Calls the ZeppelinContext's "input" method with the given name and value and returns its
# result.
z.input <- function(name, value) {
  return(SparkR:::callJMethod(.zeppelinContext, "input", name, value))
}
|
||||
|
||||
# notify script is initialized
SparkR:::callJMethod(.zeppelinR, "onScriptInitialized")

# Request loop: fetch the next request from the JVM, act on its type and report the outcome
# through setResponse(value, error). Every branch answers exactly once per request; failures
# are reported as text with the error flag set.
while (TRUE) {
  req <- SparkR:::callJMethod(.zeppelinR, "getRequest")
  type <- SparkR:::callJMethod(req, "getType")
  stmt <- SparkR:::callJMethod(req, "getStmt")
  value <- SparkR:::callJMethod(req, "getValue")

  if (type == "eval") {
    # run the statement; the result itself is not sent back
    tryCatch({
      ret <- eval(parse(text=stmt))
      SparkR:::callJMethod(.zeppelinR, "setResponse", "", FALSE)
    }, error = function(e) {
      SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
    })
  } else if (type == "set") {
    # bind the request's value to the variable named by stmt
    tryCatch({
      ret <- assign(stmt, value)
      SparkR:::callJMethod(.zeppelinR, "setResponse", "", FALSE)
    }, error = function(e) {
      SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
    })
  } else if (type == "get") {
    # evaluate stmt and send the resulting object back as-is
    tryCatch({
      ret <- eval(parse(text=stmt))
      SparkR:::callJMethod(.zeppelinR, "setResponse", ret, FALSE)
    }, error = function(e) {
      SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
    })
  } else if (type == "getS") {
    # evaluate stmt and send the result back converted to a string
    tryCatch({
      ret <- eval(parse(text=stmt))
      SparkR:::callJMethod(.zeppelinR, "setResponse", toString(ret), FALSE)
    }, error = function(e) {
      SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
    })
  } else {
    # unsupported type
    SparkR:::callJMethod(.zeppelinR, "setResponse", paste("Unsupported type ", type), TRUE)
  }
}
|
||||
|
|
@ -20,6 +20,8 @@ import org.apache.spark.api.r.RBackend
|
|||
|
||||
object SparkRBackend {
|
||||
val backend : RBackend = new RBackend()
|
||||
private var started = false;
|
||||
private var portNumber = 0;
|
||||
|
||||
val backendThread : Thread = new Thread("SparkRBackend") {
|
||||
override def run() {
|
||||
|
|
@ -27,10 +29,26 @@ object SparkRBackend {
|
|||
}
|
||||
}
|
||||
|
||||
def init() : Int = backend.init()
|
||||
/** Initializes the backend, remembers the port it reports and returns that port. */
def init(): Int = {
  val backendPort = backend.init()
  portNumber = backendPort
  backendPort
}
|
||||
|
||||
def start() : Unit = backendThread.start()
|
||||
/** Starts the backend thread and records that the backend has been started. */
def start(): Unit = {
  backendThread.start()
  started = true
}
|
||||
|
||||
def close() : Unit = backend.close()
|
||||
/** Closes the backend, then waits for the backend thread to finish. */
def close(): Unit = {
  backend.close()
  backendThread.join()
}
|
||||
|
||||
/** Returns true once `start()` has been called. */
def isStarted(): Boolean = started
|
||||
|
||||
/** Returns the port recorded by the last `init()` call, or 0 if `init()` has not run yet. */
def port(): Int = portNumber
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,126 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.spark
|
||||
|
||||
import org.apache.spark.SparkRBackend
|
||||
import org.ddahl.rscala.callback._
|
||||
|
||||
/**
 * Singleton bridge to one R session driven through an rscala `RClient`.
 *
 * `open` loads SparkR in that session and wires it to the JVM; `eval`, `set`, `get` and
 * `getS0` delegate to the client; `close` stops SparkR.
 */
object ZeppelinR {

  private val R = RClient()

  /**
   * Loads SparkR in the R session, connects it to a freshly started `SparkRBackend` and
   * exposes `sc`, `sqlContext`, `.zeppelinContext` and the `z.put` / `z.get` / `z.input`
   * helpers to R code. Finally loads the `knitr` package.
   *
   * @param master           not used by this method
   * @param sparkHome        exported to R as SPARK_HOME; SparkR is loaded from its R/lib directory
   * @param sparkInterpreter supplies the SparkContext, SQLContext and ZeppelinContext handed to R
   */
  def open(master: String = "local[*]", sparkHome: String = "/opt/spark", sparkInterpreter: SparkInterpreter): Unit = {

    eval(
      s"""
         |Sys.setenv(SPARK_HOME="$sparkHome")
         |.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
         |library(SparkR)
       """.stripMargin
    )

    // See ./core/src/main/scala/org/apache/spark/deploy/RRunner.scala for RBackend usage
    val port = SparkRBackend.init()
    SparkRBackend.start()
    eval(
      s"""
         |SparkR:::connectBackend("localhost", ${port})
         |""".stripMargin)

    // scStartTime is needed by R/pkg/R/sparkR.R
    eval(
      """
        |assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)
      """.stripMargin)

    // Each context is first published on ZeppelinRContext, then pulled into R from there.
    ZeppelinRContext.setSparkContext(sparkInterpreter.getSparkContext())
    eval(
      """
        |assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv)
        |""".stripMargin)
    eval(
      """
        |assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
      """.stripMargin)

    ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext())
    eval(
      """
        |assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv)
        |""".stripMargin)
    eval(
      """
        |assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
        |""".stripMargin)

    ZeppelinRContext.setZepplinContext(sparkInterpreter.getZeppelinContext())
    eval(
      """
        |assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv)
        |""".stripMargin
    )

    eval(
      """
        |z.put <- function(name, object) {
        |  SparkR:::callJMethod(.zeppelinContext, "put", name, object)
        |}
        |z.get <- function(name) {
        |  SparkR:::callJMethod(.zeppelinContext, "get", name)
        |}
        |z.input <- function(name, value) {
        |  SparkR:::callJMethod(.zeppelinContext, "input", name, value)
        |}
        |""".stripMargin
    )

    eval(
      """
        |library("knitr")
      """.stripMargin)

  }

  /**
   * Evaluates an R command in the session.
   *
   * @param command R source to evaluate
   * @return whatever the client's `eval` returns
   * @throws RuntimeException if evaluation fails; the message names the offending command and
   *                          the original exception is attached as the cause
   */
  def eval(command: String): Any = {
    try {
      R.eval(command)
    } catch {
      case e: Exception =>
        // keep the original exception as the cause so its stack trace is not lost
        throw new RuntimeException(e.getMessage + " - Given R command=" + command, e)
    }
  }

  /** Passes `key` and `value` to the client's `set`. */
  def set(key: String, value: AnyRef): Unit = {
    R.set(key, value)
  }

  /** Returns the first element of the pair the client's `get` yields for `key`. */
  def get(key: String): Any = {
    R.get(key)._1
  }

  /** Returns the string the client's `getS0` yields for `key`. */
  def getS0(key: String): String = {
    R.getS0(key)
  }

  /** Stops SparkR inside the R session. */
  def close():Unit = {
    R.eval("""
             |sparkR.stop()
           """.stripMargin
    )
  }

}
|
||||
|
|
@ -1,108 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.spark;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.powermock.api.mockito.PowerMockito.*;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.*;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mockito;
|
||||
import org.powermock.core.classloader.annotations.PowerMockIgnore;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.powermock.core.classloader.annotations.PrepareForTest;
|
||||
import org.powermock.modules.junit4.PowerMockRunner;
|
||||
|
||||
@RunWith(PowerMockRunner.class)
|
||||
@PrepareForTest(SparkRInterpreter.ZeppelinRFactory.class)
|
||||
@PowerMockIgnore({"org.apache.spark.*", "org.apache.hadoop.*", "akka.*", "org.w3c.*", "javax.xml.*", "org.xml.*", "scala.*", "org.apache.cxf.*"})
|
||||
public class SparkRInterpreterTest {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SparkRInterpreterTest.class);
|
||||
|
||||
private static final String MOCK_RSCALA_RESULT = "<body><p> Mock R Result </p></body>";
|
||||
private static final String MOCK_R_INTERPRETER_RESULT = "Mock R Result";
|
||||
|
||||
private static InterpreterContext context;
|
||||
private static InterpreterGroup intpGroup;
|
||||
private static SparkInterpreter sparkInterpreter;
|
||||
private static SparkRInterpreter.ZeppelinRFactory zeppelinRFactory;
|
||||
private static SparkRInterpreter sparkRInterpreter;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
initInterpreters();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuccess() throws Exception {
|
||||
InterpreterResult ret = sparkRInterpreter.interpret(MOCK_RSCALA_RESULT, context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
|
||||
assertEquals(MOCK_R_INTERPRETER_RESULT, ret.message());
|
||||
assertEquals(InterpreterResult.Type.TEXT, ret.type());
|
||||
}
|
||||
|
||||
private static void initInterpreters() {
|
||||
|
||||
Properties p = new Properties();
|
||||
|
||||
sparkInterpreter = new SparkInterpreter(p);
|
||||
intpGroup = new InterpreterGroup();
|
||||
|
||||
zeppelinRFactory = mock(SparkRInterpreter.ZeppelinRFactory.class);
|
||||
doNothing().when(zeppelinRFactory).open(Mockito.anyString(), Mockito.anyString(), any(SparkInterpreter.class));
|
||||
when(zeppelinRFactory.getS0(anyString())).thenReturn(MOCK_RSCALA_RESULT);
|
||||
|
||||
mockStatic(SparkRInterpreter.ZeppelinRFactory.class);
|
||||
when(SparkRInterpreter.ZeppelinRFactory.instance()).thenReturn(zeppelinRFactory);
|
||||
|
||||
sparkRInterpreter = new SparkRInterpreter(p);
|
||||
sparkRInterpreter.setInterpreterGroup(intpGroup);
|
||||
sparkRInterpreter.open();
|
||||
|
||||
context = new InterpreterContext("note", "id", "title", "text",
|
||||
new AuthenticationInfo(),
|
||||
new HashMap<String, Object>(), new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
null,
|
||||
new LinkedList<InterpreterContextRunner>(),
|
||||
new InterpreterOutput(new InterpreterOutputListener() {
|
||||
@Override
|
||||
public void onAppend(InterpreterOutput out, byte[] line) {
|
||||
}
|
||||
@Override
|
||||
public void onUpdate(InterpreterOutput out, byte[] output) {
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -159,7 +159,6 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
|
|||
(BSD 3 Clause) highlightjs v8.4.0 (https://highlightjs.org/) - https://github.com/isagalaev/highlight.js/blob/8.4/LICENSE
|
||||
(BSD 3 Clause) hamcrest v1.3 (http://hamcrest.org/JavaHamcrest/) - http://opensource.org/licenses/BSD-3-Clause
|
||||
(BSD Style) JLine v2.12.1 (https://github.com/jline/jline2) - https://github.com/jline/jline2/blob/master/LICENSE.txt
|
||||
(BSD Clause) RScala 1.0.6 (https://cran.r-project.org/web/packages/rscala/index.html) - https://cran.r-project.org/web/licenses/BSD_3_clause
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,51 +0,0 @@
|
|||
https://cran.r-project.org/web/packages/rscala/LICENSE
|
||||
|
||||
YEAR: 2013-2015
|
||||
COPYRIGHT HOLDER: David B. Dahl
|
||||
ORGANIZATION: Brigham Young University
|
||||
|
||||
https://cran.r-project.org/web/licenses/BSD_3_clause
|
||||
|
||||
Based on http://opensource.org/licenses/BSD-3-Clause
|
||||
|
||||
This is a template. Complete and ship as file LICENSE the following 3
|
||||
lines (only)
|
||||
|
||||
YEAR:
|
||||
COPYRIGHT HOLDER:
|
||||
ORGANIZATION:
|
||||
|
||||
and specify as
|
||||
|
||||
License: BSD_3_clause + file LICENSE
|
||||
|
||||
|
||||
Copyright (c) <YEAR>, <COPYRIGHT HOLDER>
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are
|
||||
met:
|
||||
|
||||
Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
|
||||
Redistributions in binary form must reproduce the above copyright
|
||||
notice, this list of conditions and the following disclaimer in
|
||||
the documentation and/or other materials provided with the
|
||||
distribution.
|
||||
|
||||
Neither the name of the <ORGANIZATION> nor the names of its
|
||||
contributors may be used to endorse or promote products derived
|
||||
from this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
|
@ -60,6 +60,7 @@ public abstract class AbstractTestRestApi {
|
|||
static final String url = getUrlToTest();
|
||||
protected static final boolean wasRunning = checkIfServerIsRunning();
|
||||
static boolean pySpark = false;
|
||||
static boolean sparkR = false;
|
||||
|
||||
private String getUrl(String path) {
|
||||
String url;
|
||||
|
|
@ -132,7 +133,7 @@ public abstract class AbstractTestRestApi {
|
|||
// set spark home for pyspark
|
||||
sparkIntpSetting.getProperties().setProperty("spark.home", getSparkHome());
|
||||
pySpark = true;
|
||||
|
||||
sparkR = true;
|
||||
ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id());
|
||||
} else {
|
||||
// assume first one is spark
|
||||
|
|
@ -148,6 +149,7 @@ public abstract class AbstractTestRestApi {
|
|||
// set spark home for pyspark
|
||||
sparkIntpSetting.getProperties().setProperty("spark.home", sparkHome);
|
||||
pySpark = true;
|
||||
sparkR = true;
|
||||
}
|
||||
|
||||
ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id());
|
||||
|
|
@ -174,6 +176,10 @@ public abstract class AbstractTestRestApi {
|
|||
return pySpark;
|
||||
}
|
||||
|
||||
boolean isSparkR() {
|
||||
return sparkR;
|
||||
}
|
||||
|
||||
private static String getSparkHomeRecursively(File dir) {
|
||||
if (dir == null) return null;
|
||||
File files [] = dir.listFiles();
|
||||
|
|
|
|||
|
|
@ -83,6 +83,30 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
ZeppelinServer.notebook.removeNote(note.id());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sparkRTest() throws IOException {
|
||||
// create new note
|
||||
Note note = ZeppelinServer.notebook.createNote();
|
||||
int sparkVersion = getSparkVersionNumber(note);
|
||||
|
||||
if (isSparkR() && sparkVersion >= 14) { // sparkr supported from 1.4.0
|
||||
// run markdown paragraph, again
|
||||
Paragraph p = note.addParagraph();
|
||||
Map config = p.getConfig();
|
||||
config.put("enabled", true);
|
||||
p.setConfig(config);
|
||||
p.setText("%r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
|
||||
"df <- createDataFrame(sqlContext, localDF)\n" +
|
||||
"count(df)"
|
||||
);
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
assertEquals(Status.FINISHED, p.getStatus());
|
||||
assertEquals("[1] 3", p.getResult().message());
|
||||
}
|
||||
ZeppelinServer.notebook.removeNote(note.id());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pySparkTest() throws IOException {
|
||||
// create new note
|
||||
|
|
|
|||
Loading…
Reference in a new issue