Merge remote rscala-z-rs

This commit is contained in:
Eric Charles 2016-03-27 07:42:42 +02:00
commit eb6c40ca64
18 changed files with 725 additions and 516 deletions

4
.gitignore vendored
View file

@ -54,10 +54,6 @@ zeppelin-web/bower_components
**/testing/
!/testing/
**/lib/rscala
**/lib/rscala
# OS generated files #
######################
.DS_Store

View file

@ -15,19 +15,26 @@
language: java
addons:
apt:
sources:
r-packages-precise
packages:
r-base
matrix:
include:
# Test all modules
- jdk: "oraclejdk7"
env: SPARK_VER="1.6.0" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Phadoop-2.3 -Ppyspark -Pscalding" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
env: SPARK_VER="1.6.0" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
# Test spark module for 1.5.2
- jdk: "oraclejdk7"
env: SPARK_VER="1.5.2" HADOOP_VER="2.3" PROFILE="-Pspark-1.5 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false"
env: SPARK_VER="1.5.2" HADOOP_VER="2.3" PROFILE="-Pspark-1.5 -Phadoop-2.3 -Ppyspark -Psparkr" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false"
# Test spark module for 1.4.1
- jdk: "oraclejdk7"
env: SPARK_VER="1.4.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.4 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false"
env: SPARK_VER="1.4.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.4 -Phadoop-2.3 -Ppyspark -Psparkr" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false"
# Test spark module for 1.3.1
- jdk: "oraclejdk7"
@ -49,13 +56,18 @@ before_install:
- "export DISPLAY=:99.0"
- "sh -e /etc/init.d/xvfb start"
# install R packages
- mkdir -p ~/Rlib
- echo 'R_LIBS=~/Rlib' > ~/.Renviron
- Rscript -e "install.packages('knitr', repos = 'http://cran.us.r-project.org')"
install:
- mvn $BUILD_FLAG $PROFILE -B
before_script:
- travis_retry ./testing/downloadSpark.sh $SPARK_VER $HADOOP_VER
- ./testing/startSparkCluster.sh $SPARK_VER $HADOOP_VER
- echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
- echo -e "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER\nexport ZEPPELIN_R_CMD=Rscript" > conf/zeppelin-env.sh
script:
- mvn $TEST_FLAG $PROFILE -B $TEST_PROJECTS

View file

@ -129,7 +129,6 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
export SPARK_CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
fi
CLASSPATH+=":${ZEPPELIN_CLASSPATH}":"${ZEPPELIN_HOME}/interpreter/spark/R/rscala_2.10-1.0.6.jar"
fi
addJarInDir "${LOCAL_INTERPRETER_REPO}"

View file

@ -21,21 +21,11 @@ Validate your installation with a simple R command:
R -e "print(1+1)"
```
Then install the required R libraries;
To enjoy plots, install additional libraries with:
```
+ devtools with `R -e "install.packages('devtools', repos = 'http://cran.us.r-project.org')"`
+ knitr with `R -e "install.packages('knitr', repos = 'http://cran.us.r-project.org')"`
+ rscala: You need version 1.0.6 of RScala, so the commands will be [1]
```
[1]
curl https://cran.r-project.org/src/contrib/Archive/rscala/rscala_1.0.6.tar.gz -o /tmp/rscala_1.0.6.tar.gz
R CMD INSTALL /tmp/rscala_1.0.6.tar.gz
```
To enjoy plots, you need:
```
+ ggplot2 with `R -e "install.packages('ggplot2', repos = 'http://cran.us.r-project.org')"`
+ Other visualization libraries: `R -e "install.packages(c('devtools','mplot', 'googleVis'), repos = 'http://cran.us.r-project.org'); require(devtools); install_github('ramnathv/rCharts')"`
```

20
pom.xml
View file

@ -116,7 +116,6 @@
<libthrift.version>0.9.2</libthrift.version>
<gson.version>2.2</gson.version>
<guava.version>15.0</guava.version>
<rscala.version>1.0.6</rscala.version>
<PermGen>64m</PermGen>
<MaxPermGen>512m</MaxPermGen>
@ -230,25 +229,6 @@
<build>
<plugins>
<plugin>
<groupId>com.googlecode.maven-download-plugin</groupId>
<artifactId>download-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<id>download-rscala-lib</id>
<phase>validate</phase>
<goals>
<goal>wget</goal>
</goals>
<configuration>
<url>https://cran.r-project.org/src/contrib/Archive/rscala/rscala_${rscala.version}.tar.gz</url>
<unpack>true</unpack>
<outputDirectory>${user.dir}/spark/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>

View file

@ -51,6 +51,7 @@
<akka.version>2.3.4-spark</akka.version>
<spark.download.url>http://archive.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}.tgz</spark.download.url>
<spark.bin.download.url>http://archive.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}-bin-without-hadoop.tgz</spark.bin.download.url>
<py4j.version>0.8.2.1</py4j.version>
</properties>
@ -759,10 +760,10 @@
<configuration>
<filesets>
<fileset>
<directory>${basedir}/../python/build</directory>
<directory>${project.build.directory}/spark-dist</directory>
</fileset>
<fileset>
<directory>${project.build.directory}/spark-dist</directory>
<directory>${basedir}/../python/build</directory>
</fileset>
</filesets>
</configuration>
@ -794,6 +795,65 @@
</plugins>
</build>
</profile>
<profile>
<id>sparkr</id>
<build>
<plugins>
<plugin>
<groupId>com.googlecode.maven-download-plugin</groupId>
<artifactId>download-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<id>download-sparkr-files</id>
<phase>validate</phase>
<goals>
<goal>wget</goal>
</goals>
<configuration>
<url>${spark.bin.download.url}</url>
<unpack>true</unpack>
<outputDirectory>${project.build.directory}/spark-bin-dist</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<configuration>
<filesets>
<fileset>
<directory>${project.build.directory}/spark-bin-dist</directory>
</fileset>
</filesets>
</configuration>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>2.7</version>
<executions>
<execution>
<id>copy-sparkr-files</id>
<phase>generate-resources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/spark/R/lib</outputDirectory>
<resources>
<resource>
<directory>${project.build.directory}/spark-bin-dist/spark-${spark.version}-bin-without-hadoop/R/lib</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
@ -924,28 +984,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>copy-rscala</id>
<phase>generate-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<copy todir="${user.dir}/interpreter/spark/R"
file="${user.dir}/spark/lib/rscala/inst/java/rscala_${scala.binary.version}-${rscala.version}.jar"/>
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -1,3 +0,0 @@
This folder contains 3rd party jars.
DO NOT COMMIT THESE JARS IN THE SRC REPO.

View file

@ -239,14 +239,6 @@
<version>${jsoup.version}</version>
</dependency>
<dependency>
<groupId>org.ddahl</groupId>
<artifactId>rscala</artifactId>
<version>${rscala.version}</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/rscala/inst/java/rscala_${scala.binary.version}-${rscala.version}.jar</systemPath>
</dependency>
<!--TEST-->
<dependency>
<groupId>org.scalatest</groupId>
@ -321,7 +313,6 @@
<exclude>**/metastore_db/</exclude>
<exclude>**/README.md</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
<exclude>**/lib/rscala/**</exclude>
</excludes>
</configuration>
</plugin>
@ -441,84 +432,17 @@
</build>
<profiles>
<!-- to deactivate 'exclude-sparkr' automatically when 'spark' is activated -->
<profile>
<id>spark-1.1</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/SparkRInterpreter.java</exclude>
</excludes>
<testExcludes>
<testExclude>**/SparkRInterpreterTest.java</testExclude>
</testExcludes>
</configuration>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/ZeppelinR.scala</exclude>
<exclude>**/SparkRBackend.scala</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/SparkRInterpreterTest.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
<id>sparkr</id>
</profile>
<profile>
<id>spark-1.2</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/SparkRInterpreter.java</exclude>
</excludes>
<testExcludes>
<testExclude>**/SparkRInterpreterTest.java</testExclude>
</testExcludes>
</configuration>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/ZeppelinR.scala</exclude>
<exclude>**/SparkRBackend.scala</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/SparkRInterpreterTest.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>spark-1.3</id>
<id>exclude-sparkr</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
@ -530,6 +454,7 @@
</excludes>
<testExcludes>
<testExclude>**/SparkRInterpreterTest.java</testExclude>
<testExclude>**/ZeppelinRTest.java</testExclude>
</testExcludes>
</configuration>
</plugin>

View file

@ -21,16 +21,14 @@ import static org.apache.zeppelin.spark.ZeppelinRDisplay.render;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkRBackend;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@ -42,6 +40,7 @@ public class SparkRInterpreter extends Interpreter {
private static final Logger logger = LoggerFactory.getLogger(SparkRInterpreter.class);
private static String renderOptions;
private ZeppelinR zeppelinR;
static {
Interpreter.register(
@ -49,38 +48,72 @@ public class SparkRInterpreter extends Interpreter {
"spark",
SparkRInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("spark.master",
SparkInterpreter.getSystemDefault("MASTER", "spark.master", "local[*]"),
"Spark master uri. ex) spark://masterhost:7077")
.add("spark.home",
SparkInterpreter.getSystemDefault("SPARK_HOME", "spark.home", "/opt/spark"),
"Spark distribution location")
.add("zeppelin.R.image.width",
SparkInterpreter.getSystemDefault("ZEPPELIN_R_IMAGE_WIDTH",
"zeppelin.R.image.width", "100%"),
"")
.add("zeppelin.R.render.options",
SparkInterpreter.getSystemDefault("ZEPPELIN_R_RENDER_OPTIONS",
"zeppelin.R.render.options",
"out.format = 'html', comment = NA, "
+ "echo = FALSE, results = 'asis', message = F, warning = F"),
"")
.build());
.add("zeppelin.R.cmd",
SparkInterpreter.getSystemDefault("ZEPPELIN_R_CMD", "zeppelin.R.cmd", "R"),
"R repl path")
.add("zeppelin.R.knitr",
SparkInterpreter.getSystemDefault("ZEPPELIN_R_KNITR", "zeppelin.R.knitr", "true"),
"whether use knitr or not")
.add("zeppelin.R.image.width",
SparkInterpreter.getSystemDefault("ZEPPELIN_R_IMAGE_WIDTH",
"zeppelin.R.image.width", "100%"),
"")
.add("zeppelin.R.render.options",
SparkInterpreter.getSystemDefault("ZEPPELIN_R_RENDER_OPTIONS",
"zeppelin.R.render.options",
"out.format = 'html', comment = NA, "
+ "echo = FALSE, results = 'asis', message = F, warning = F"),
"")
.build());
}
public SparkRInterpreter(Properties property) {
super(property);
}
@Override
public void open() {
zeppelinR().open(getProperty("spark.master"),
"/opt/spark", getSparkInterpreter());
String rCmdPath = getProperty("zeppelin.R.cmd");
String sparkRLibPath;
if (System.getenv("SPARK_HOME") != null) {
sparkRLibPath = System.getenv("SPARK_HOME") + "/R/lib";
} else {
sparkRLibPath = System.getenv("ZEPPELIN_HOME") + "/interpreter/spark/R/lib";
// workaround to make sparkr work without SPARK_HOME
System.setProperty("spark.test.home", System.getenv("ZEPPELIN_HOME") + "/interpreter/spark");
}
synchronized (SparkRBackend.backend()) {
if (!SparkRBackend.isStarted()) {
SparkRBackend.init();
SparkRBackend.start();
}
}
int port = SparkRBackend.port();
SparkInterpreter sparkInterpreter = getSparkInterpreter();
ZeppelinRContext.setSparkContext(sparkInterpreter.getSparkContext());
ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
ZeppelinRContext.setZepplinContext(sparkInterpreter.getZeppelinContext());
zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port);
try {
zeppelinR.open();
} catch (IOException e) {
throw new InterpreterException(e);
}
if (useKnitr()) {
zeppelinR.eval("library('knitr')");
}
renderOptions = getProperty("zeppelin.R.render.options");
}
@Override
public InterpreterResult interpret(String lines, InterpreterContext contextInterpreter) {
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) {
String imageWidth = getProperty("zeppelin.R.image.width");
@ -102,20 +135,26 @@ public class SparkRInterpreter extends Interpreter {
}
try {
// render output with knitr
if (useKnitr()) {
zeppelinR.setInterpreterOutput(null);
zeppelinR.set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```");
zeppelinR.eval(".zres <- knit2html(text=.zcmd)");
String html = zeppelinR.getS0(".zres");
zeppelinR().set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```");
zeppelinR().eval(".zres <- knit2html(text=.zcmd)");
String html = zeppelinR().getS0(".zres");
RDisplay rDisplay = render(html, imageWidth);
return new InterpreterResult(
rDisplay.code(),
rDisplay.type(),
rDisplay.content()
);
RDisplay rDisplay = render(html, imageWidth);
return new InterpreterResult(
rDisplay.code(),
rDisplay.type(),
rDisplay.content()
);
} else {
// alternatively, stream the output (without knitr)
zeppelinR.setInterpreterOutput(interpreterContext.out);
zeppelinR.eval(lines);
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
} catch (Exception e) {
logger.error("Exception while connecting to R", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
@ -129,7 +168,7 @@ public class SparkRInterpreter extends Interpreter {
@Override
public void close() {
zeppelinR().close();
zeppelinR.close();
}
@Override
@ -175,43 +214,11 @@ public class SparkRInterpreter extends Interpreter {
return spark;
}
protected static ZeppelinRFactory zeppelinR() {
return ZeppelinRFactory.instance();
}
/**
* Java Factory to support tests with Mockito.
*
* (Mockito can not mock the zeppelinR final scala object class).
*/
protected static class ZeppelinRFactory {
private static ZeppelinRFactory instance;
private static ZeppelinR zeppelinR;
private ZeppelinRFactory() {
// Singleton
}
protected static synchronized ZeppelinRFactory instance() {
if (instance == null) instance = new ZeppelinRFactory();
return instance;
}
protected void open(String master, String sparkHome, SparkInterpreter sparkInterpreter) {
zeppelinR.open(master, sparkHome, sparkInterpreter);
}
protected Object eval(String command) {
return zeppelinR.eval(command);
}
protected void set(String key, Object value) {
zeppelinR.set(key, value);
}
protected Object get(String key) {
return zeppelinR.get(key);
}
protected String getS0(String key) {
return zeppelinR.getS0(key);
}
protected void close() {
zeppelinR.close();
private boolean useKnitr() {
try {
return Boolean.parseBoolean(getProperty("zeppelin.R.knitr"));
} catch (Exception e) {
return false;
}
}

View file

@ -0,0 +1,404 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;
import org.apache.commons.exec.*;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import parquet.org.slf4j.Logger;
import parquet.org.slf4j.LoggerFactory;
import java.io.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* R repl interaction
*/
public class ZeppelinR implements ExecuteResultHandler {
Logger logger = LoggerFactory.getLogger(ZeppelinR.class);
private final String rCmdPath;
private DefaultExecutor executor;
private SparkOutputStream outputStream;
private PipedOutputStream input;
private final String scriptPath;
private final String libPath;
static Map<Integer, ZeppelinR> zeppelinR = Collections.synchronizedMap(
new HashMap<Integer, ZeppelinR>());
private InterpreterOutput initialOutput;
private final int port;
private boolean rScriptRunning;
/**
* To be notified R repl initialization
*/
boolean rScriptInitialized = false;
Integer rScriptInitializeNotifier = new Integer(0);
/**
* Request to R repl
*/
Request rRequestObject = null;
Integer rRequestNotifier = new Integer(0);
/**
 * A single command handed over to the R repl.
 *
 * type  : "eval", "set", "get" or "getS"
 * stmt  : statement to evaluate when type is "eval";
 *         variable name (key) when type is "set", "get" or "getS"
 * value : object to assign when type is "set", otherwise null
 */
public static class Request {
  String type;
  String stmt;
  Object value;

  /**
   * @param type kind of request
   * @param stmt statement or key the request operates on
   * @param value payload of the request, may be null
   */
  public Request(String type, String stmt, Object value) {
    this.value = value;
    this.stmt = stmt;
    this.type = type;
  }

  /** @return kind of request */
  public String getType() {
    return this.type;
  }

  /** @return statement or key the request operates on */
  public String getStmt() {
    return this.stmt;
  }

  /** @return payload of the request, may be null */
  public Object getValue() {
    return this.value;
  }
}
/**
* Response from R repl
*/
Object rResponseValue = null;
boolean rResponseError = false;
Integer rResponseNotifier = new Integer(0);
/**
 * Create ZeppelinR instance
 * @param rCmdPath R repl commandline path
 * @param libPath sparkr library path
 * @param sparkRBackendPort port of the SparkR backend; handed to the R script as an argument
 * @throws InterpreterException if no temporary file can be reserved for the R script
 */
public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort) {
  this.rCmdPath = rCmdPath;
  this.libPath = libPath;
  this.port = sparkRBackendPort;
  // Reserve a script file that is unique to this instance. A fixed name in the shared
  // tmp dir would be overwritten by every other repl that opens and deleted by
  // whichever repl closes first.
  try {
    scriptPath = File.createTempFile("zeppelin_sparkr-", ".R").getAbsolutePath();
  } catch (IOException e) {
    throw new InterpreterException(e);
  }
}
/**
 * Start R repl: writes the bootstrap script, registers this instance so the script can
 * look it up by hash code, launches the R process asynchronously and issues a first,
 * empty evaluation.
 * @throws IOException
 */
public void open() throws IOException {
  createRScript();
  zeppelinR.put(hashCode(), this);

  // <rCmd> --no-save --no-restore -f <script> --args <hashCode> <port> <libPath>
  CommandLine commandLine = CommandLine.parse(rCmdPath)
      .addArgument("--no-save")
      .addArgument("--no-restore")
      .addArgument("-f")
      .addArgument(scriptPath)
      .addArgument("--args")
      .addArgument(Integer.toString(hashCode()))
      .addArgument(Integer.toString(port))
      .addArgument(libPath);

  executor = new DefaultExecutor();
  outputStream = new SparkOutputStream();

  // stdout and stderr of the process both go to outputStream; stdin is fed from the pipe
  input = new PipedOutputStream();
  PipedInputStream processStdin = new PipedInputStream(input);
  PumpStreamHandler pumps = new PumpStreamHandler(outputStream, outputStream, processStdin);

  executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
  executor.setStreamHandler(pumps);
  Map environment = EnvironmentUtils.getProcEnvironment();

  // Until a caller supplies its own output, repl output is collected here and
  // echoed to the debug log.
  initialOutput = new InterpreterOutput(new InterpreterOutputListener() {
    @Override
    public void onAppend(InterpreterOutput out, byte[] line) {
      logger.debug(new String(line));
    }

    @Override
    public void onUpdate(InterpreterOutput out, byte[] output) {
    }
  });
  outputStream.setInterpreterOutput(initialOutput);

  // Asynchronous launch; this object receives the completion callbacks
  executor.execute(commandLine, environment, this);
  rScriptRunning = true;

  // flush output
  eval("cat('')");
}
/**
 * Evaluate an R expression in the repl.
 * @param expr R source text to evaluate
 * @return the response value reported by the repl
 */
public Object eval(String expr) {
  Request evalRequest = new Request("eval", expr, null);
  synchronized (this) {
    rRequestObject = evalRequest;
    Object response = request();
    return response;
  }
}
/**
 * Assign a value to a variable in the repl.
 * @param key name of the variable
 * @param value object to assign
 */
public void set(String key, Object value) {
  Request setRequest = new Request("set", key, value);
  synchronized (this) {
    rRequestObject = setRequest;
    request();
  }
}
/**
 * Read the value bound to a key in the repl.
 * @param key name (or expression) to look up
 * @return the response value reported by the repl
 */
public Object get(String key) {
  Request getRequest = new Request("get", key, null);
  synchronized (this) {
    rRequestObject = getRequest;
    Object response = request();
    return response;
  }
}
/**
 * Read the value bound to a key in the repl, as a string.
 * @param key name (or expression) to look up
 * @return the response of the repl cast to String
 */
public String getS0(String key) {
  Request getStringRequest = new Request("getS", key, null);
  synchronized (this) {
    rRequestObject = getStringRequest;
    Object response = request();
    return (String) response;
  }
}
/**
 * Send the request stored in rRequestObject to the r repl and return its response.
 * Blocks until the repl answers through setResponse() or the R process terminates.
 *
 * @return responseValue
 * @throws RuntimeException if the repl is not running, terminates before answering,
 *         or reports an error for the request
 */
private Object request() throws RuntimeException {
  if (!rScriptRunning) {
    throw new RuntimeException("r repl is not running");
  }

  // wait for rscript initialized
  if (!rScriptInitialized) {
    waitForRScriptInitialized();
  }

  // Drop any outcome left over from an earlier request. The error flag must be
  // cleared too, otherwise a stale "true" is reported when the process dies
  // without answering.
  synchronized (rResponseNotifier) {
    rResponseValue = null;
    rResponseError = false;
  }

  synchronized (rRequestNotifier) {
    rRequestNotifier.notify();
  }

  Object respValue;
  boolean respError;
  boolean interrupted = false;
  synchronized (rResponseNotifier) {
    while (rResponseValue == null && rScriptRunning) {
      try {
        rResponseNotifier.wait(1000);
      } catch (InterruptedException e) {
        logger.error(e.getMessage(), e);
        // keep waiting for the answer; the interrupt is re-asserted below
        interrupted = true;
      }
    }
    // value and error flag are read together so they belong to the same response
    respValue = rResponseValue;
    respError = rResponseError;
    rResponseValue = null;
  }

  if (interrupted) {
    Thread.currentThread().interrupt();
  }

  if (respError) {
    throw new RuntimeException(String.valueOf(respValue));
  }
  if (respValue == null) {
    // the wait loop only ends without a value when the process has gone away
    throw new RuntimeException("r repl terminated before sending a response");
  }
  return respValue;
}
/**
 * Wait until src/main/resources/R/zeppelin_sparkr.R is initialized
 * and has called onScriptInitialized().
 * Gives up after 10 seconds or as soon as the R process is no longer running.
 *
 * @throws InterpreterException if the script did not report initialization; the
 *         message carries whatever the process has written to its output so far
 */
private void waitForRScriptInitialized() throws InterpreterException {
  boolean interrupted = false;
  synchronized (rScriptInitializeNotifier) {
    long startTime = System.nanoTime();
    while (!rScriptInitialized
        && rScriptRunning
        && System.nanoTime() - startTime < 10L * 1000 * 1000000) {
      try {
        rScriptInitializeNotifier.wait(1000);
      } catch (InterruptedException e) {
        logger.error(e.getMessage(), e);
        // keep waiting; the interrupt is re-asserted once the wait is over
        interrupted = true;
      }
    }
  }

  String errorMessage = "";
  try {
    initialOutput.flush();
    errorMessage = new String(initialOutput.toByteArray());
  } catch (IOException e) {
    // report through the logger instead of printing to stderr
    logger.error(e.getMessage(), e);
  }

  if (interrupted) {
    Thread.currentThread().interrupt();
  }

  if (!rScriptInitialized) {
    throw new InterpreterException("sparkr is not responding " + errorMessage);
  }
}
/**
 * Hand the pending request to the R side, blocking until one is available.
 * Invoked by src/main/resources/R/zeppelin_sparkr.R
 * @return the pending request; the pending slot is cleared before returning
 */
public Request getRequest() {
  synchronized (rRequestNotifier) {
    for (;;) {
      Request pending = rRequestObject;
      if (pending != null) {
        rRequestObject = null;
        return pending;
      }
      try {
        // re-check at least once per second even without a notification
        rRequestNotifier.wait(1000);
      } catch (InterruptedException e) {
        logger.error(e.getMessage(), e);
      }
    }
  }
}
/**
* Publish the outcome of the current request and wake the thread waiting on
* rResponseNotifier.
* Invoked by src/main/resources/R/zeppelin_sparkr.R
* @param value result of the request, or the error text when error is true
* @param error true when the R side failed while handling the request
*/
public void setResponse(Object value, boolean error) {
synchronized (rResponseNotifier) {
rResponseValue = value;
rResponseError = error;
rResponseNotifier.notify();
}
}
/**
* Mark the R script as initialized and wake every thread waiting on
* rScriptInitializeNotifier.
* Invoked by src/main/resources/R/zeppelin_sparkr.R
*/
public void onScriptInitialized() {
synchronized (rScriptInitializeNotifier) {
rScriptInitialized = true;
rScriptInitializeNotifier.notifyAll();
}
}
/**
 * Create R script in tmp dir by copying the classpath resource R/zeppelin_sparkr.R
 * to scriptPath.
 *
 * @throws InterpreterException if scriptPath is a directory, the resource is missing
 *         from the classpath, or the copy fails
 */
private void createRScript() {
  File out = new File(scriptPath);

  if (out.exists() && out.isDirectory()) {
    throw new InterpreterException("Can't create r script " + out.getAbsolutePath());
  }

  InputStream resource = getClass().getClassLoader().getResourceAsStream("R/zeppelin_sparkr.R");
  if (resource == null) {
    // getResourceAsStream signals a missing resource with null rather than an exception
    throw new InterpreterException("Can't find resource R/zeppelin_sparkr.R");
  }

  FileOutputStream outStream = null;
  try {
    outStream = new FileOutputStream(out);
    IOUtils.copy(resource, outStream);
    // close explicitly on the success path so a failing close is still reported
    outStream.close();
  } catch (IOException e) {
    throw new InterpreterException(e);
  } finally {
    // release both streams on every path; closing an already closed stream is harmless
    IOUtils.closeQuietly(outStream);
    IOUtils.closeQuietly(resource);
  }

  logger.info("File {} created", scriptPath);
}
/**
 * Terminate this R repl: destroys the R process if one was launched, deletes the
 * script file and removes this instance from the static registry.
 * Safe to call when open() was never called or failed before launching the process.
 */
public void close() {
  // executor (and its watchdog) only exist once open() got far enough to create them
  if (executor != null) {
    ExecuteWatchdog watchdog = executor.getWatchdog();
    if (watchdog != null) {
      watchdog.destroyProcess();
    }
  }
  new File(scriptPath).delete();
  zeppelinR.remove(hashCode());
}
/**
* Look up the instance registered under the given hash code.
* This method will be invoked from zeppelin_sparkr.R, which receives the hash code
* as a command line argument.
* @param hashcode hash code under which the instance was registered in open()
* @return the registered instance, or null if the registry has no entry for it
*/
public static ZeppelinR getZeppelinR(int hashcode) {
return zeppelinR.get(hashcode);
}
/**
* Pass InterpreterOutput to capture the repl output.
* The value is forwarded unchanged to the stream that receives the process output.
* @param out output to forward the repl output to
*/
public void setInterpreterOutput(InterpreterOutput out) {
outputStream.setInterpreterOutput(out);
}
/**
* ExecuteResultHandler callback: logs the given value and marks the repl as no
* longer running.
* @param i value passed by the executor (logged as-is)
*/
@Override
public void onProcessComplete(int i) {
logger.info("process complete {}", i);
rScriptRunning = false;
}
/**
* ExecuteResultHandler callback: logs the failure and marks the repl as no
* longer running.
* @param e failure reported by the executor
*/
@Override
public void onProcessFailed(ExecuteException e) {
logger.error(e.getMessage(), e);
rScriptRunning = false;
}
}

View file

@ -0,0 +1,99 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Start-up: read the three trailing command line arguments
# (instance hash code, backend port, SparkR library path), connect to the
# backend and expose the Spark / Zeppelin objects to user code.
args <- commandArgs(trailingOnly = TRUE)
hashCode <- as.integer(args[1])
port <- as.integer(args[2])
libPath <- args[3]
rm(args)
print(paste("Port ", toString(port)))
print(paste("LibPath ", libPath))
# put the given library path first so that library(SparkR) is resolved from it
.libPaths(c(file.path(libPath), .libPaths()))
library(SparkR)
SparkR:::connectBackend("localhost", port)
# scStartTime is needed by R/pkg/R/sparkR.R
assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)
# getZeppelinR: handle to the Java object registered under hashCode
.zeppelinR = SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinR", "getZeppelinR", hashCode)
# setup spark env: the contexts are stored in SparkR's environment and also
# published as `sc` / `sqlContext` in the global environment
assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv)
assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv)
assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv)
# Call "put" on the Zeppelin context object with the given name and object.
z.put <- function(name, object) {
SparkR:::callJMethod(.zeppelinContext, "put", name, object)
}
# Call "get" on the Zeppelin context object with the given name and return the result.
z.get <- function(name) {
SparkR:::callJMethod(.zeppelinContext, "get", name)
}
# Call "input" on the Zeppelin context object with the given name and value
# and return the result.
z.input <- function(name, value) {
SparkR:::callJMethod(.zeppelinContext, "input", name, value)
}
# notify script is initialized
SparkR:::callJMethod(.zeppelinR, "onScriptInitialized")
# Request loop: fetch the next request from the Java side, handle it according
# to its type and report exactly one response (value, error flag) per request.
# The loop never ends on its own.
while (TRUE) {
req <- SparkR:::callJMethod(.zeppelinR, "getRequest")
type <- SparkR:::callJMethod(req, "getType")
stmt <- SparkR:::callJMethod(req, "getStmt")
value <- SparkR:::callJMethod(req, "getValue")
if (type == "eval") {
# evaluate the statement; the result is discarded and "" is reported on success
tryCatch({
ret <- eval(parse(text=stmt))
SparkR:::callJMethod(.zeppelinR, "setResponse", "", FALSE)
}, error = function(e) {
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
})
} else if (type == "set") {
# bind `value` to the variable named by stmt; "" is reported on success
tryCatch({
ret <- assign(stmt, value)
SparkR:::callJMethod(.zeppelinR, "setResponse", "", FALSE)
}, error = function(e) {
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
})
} else if (type == "get") {
# evaluate stmt and report the resulting object itself
tryCatch({
ret <- eval(parse(text=stmt))
SparkR:::callJMethod(.zeppelinR, "setResponse", ret, FALSE)
}, error = function(e) {
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
})
} else if (type == "getS") {
# evaluate stmt and report the result converted with toString()
tryCatch({
ret <- eval(parse(text=stmt))
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(ret), FALSE)
}, error = function(e) {
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
})
} else {
# unsupported type
SparkR:::callJMethod(.zeppelinR, "setResponse", paste("Unsupported type ", type), TRUE)
}
}

View file

@ -20,6 +20,8 @@ import org.apache.spark.api.r.RBackend
object SparkRBackend {
val backend : RBackend = new RBackend()
private var started = false;
private var portNumber = 0;
val backendThread : Thread = new Thread("SparkRBackend") {
override def run() {
@ -27,10 +29,26 @@ object SparkRBackend {
}
}
def init() : Int = backend.init()
def init() : Int = {
portNumber = backend.init()
portNumber
}
def start() : Unit = backendThread.start()
def start() : Unit = {
backendThread.start()
started = true
}
def close() : Unit = backend.close()
def close() : Unit = {
backend.close()
backendThread.join()
}
def isStarted() : Boolean = {
started
}
def port(): Int = {
return portNumber
}
}

View file

@ -1,126 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark
import org.apache.spark.SparkRBackend
import org.ddahl.rscala.callback._
object ZeppelinR {
private val R = RClient()
/**
 * Prepares the R session: loads SparkR from the given Spark home, starts the SparkR
 * backend and connects R to it, exposes the Spark, SQL and Zeppelin contexts to R,
 * defines the z.put / z.get / z.input helpers and loads knitr.
 *
 * NOTE(review): `master` is not referenced anywhere in this body.
 *
 * @param master Spark master uri (unused here)
 * @param sparkHome Spark installation whose R/lib directory is prepended to the R library paths
 * @param sparkInterpreter source of the SparkContext, SQLContext and ZeppelinContext handed to R
 */
def open(master: String = "local[*]", sparkHome: String = "/opt/spark", sparkInterpreter: SparkInterpreter): Unit = {
// make SparkR loadable from $sparkHome/R/lib
eval(
s"""
|Sys.setenv(SPARK_HOME="$sparkHome")
|.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
|library(SparkR)
""".stripMargin
)
// See ./core/src/main/scala/org/apache/spark/deploy/RRunner.scala for RBackend usage
val port = SparkRBackend.init()
SparkRBackend.start()
// connect the R session to the backend port returned by init()
eval(
s"""
|SparkR:::connectBackend("localhost", ${port})
|""".stripMargin)
// scStartTime is needed by R/pkg/R/sparkR.R
eval(
"""
|assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)
""".stripMargin)
// each context is first stored in ZeppelinRContext, then fetched from R via callJStatic
ZeppelinRContext.setSparkContext(sparkInterpreter.getSparkContext())
eval(
"""
|assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv)
|""".stripMargin)
eval(
"""
|assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
""".stripMargin)
ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext())
eval(
"""
|assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv)
|""".stripMargin)
eval(
"""
|assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
|""".stripMargin)
ZeppelinRContext.setZepplinContext(sparkInterpreter.getZeppelinContext())
eval(
"""
|assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv)
|""".stripMargin
)
// R-side helpers that delegate to the Zeppelin context object
eval(
"""
|z.put <- function(name, object) {
| SparkR:::callJMethod(.zeppelinContext, "put", name, object)
|}
|z.get <- function(name) {
| SparkR:::callJMethod(.zeppelinContext, "get", name)
|}
|z.input <- function(name, value) {
| SparkR:::callJMethod(.zeppelinContext, "input", name, value)
|}
|""".stripMargin
)
eval(
"""
|library("knitr")
""".stripMargin)
}
def eval(command: String): Any = {
try {
R.eval(command)
} catch {
case e: Exception => throw new RuntimeException(e.getMessage + " - Given R command=" + command)
}
}
def set(key: String, value: AnyRef): Unit = {
R.set(key, value)
}
def get(key: String): Any = {
R.get(key)._1
}
def getS0(key: String): String = {
R.getS0(key)
}
def close():Unit = {
R.eval("""
|sparkR.stop()
""".stripMargin
)
}
}

View file

@ -1,108 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.*;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.*;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
 * Exercises {@link SparkRInterpreter} against a mocked
 * {@link SparkRInterpreter.ZeppelinRFactory}, so no R process is involved.
 *
 * <p>The factory mock answers every {@code getS0} call with a small HTML snippet; the
 * test expects the interpreter to report the plain text contained in it.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(SparkRInterpreter.ZeppelinRFactory.class)
@PowerMockIgnore({"org.apache.spark.*", "org.apache.hadoop.*", "akka.*", "org.w3c.*", "javax.xml.*", "org.xml.*", "scala.*", "org.apache.cxf.*"})
public class SparkRInterpreterTest {

  private static final Logger LOGGER = LoggerFactory.getLogger(SparkRInterpreterTest.class);

  /** Value the mocked factory returns from {@code getS0}; also used as the paragraph text. */
  private static final String MOCK_RSCALA_RESULT = "<body><p> Mock R Result </p></body>";
  /** Message the interpreter is expected to produce from {@link #MOCK_RSCALA_RESULT}. */
  private static final String MOCK_R_INTERPRETER_RESULT = "Mock R Result";

  private static InterpreterContext context;
  private static InterpreterGroup intpGroup;
  private static SparkInterpreter sparkInterpreter;
  private static SparkRInterpreter.ZeppelinRFactory zeppelinRFactory;
  private static SparkRInterpreter sparkRInterpreter;

  @BeforeClass
  public static void beforeClass() {
    initInterpreters();
  }

  @Test
  public void testSuccess() throws Exception {
    InterpreterResult result = sparkRInterpreter.interpret(MOCK_RSCALA_RESULT, context);

    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
    assertEquals(MOCK_R_INTERPRETER_RESULT, result.message());
    assertEquals(InterpreterResult.Type.TEXT, result.type());
  }

  /** Wires the mocked factory, opens the interpreter under test and prepares the shared context. */
  private static void initInterpreters() {
    Properties properties = new Properties();
    sparkInterpreter = new SparkInterpreter(properties);
    intpGroup = new InterpreterGroup();

    // Instance mock first, then route the static instance() accessor to it.
    zeppelinRFactory = mock(SparkRInterpreter.ZeppelinRFactory.class);
    doNothing().when(zeppelinRFactory).open(anyString(), anyString(), any(SparkInterpreter.class));
    when(zeppelinRFactory.getS0(anyString())).thenReturn(MOCK_RSCALA_RESULT);
    mockStatic(SparkRInterpreter.ZeppelinRFactory.class);
    when(SparkRInterpreter.ZeppelinRFactory.instance()).thenReturn(zeppelinRFactory);

    sparkRInterpreter = new SparkRInterpreter(properties);
    sparkRInterpreter.setInterpreterGroup(intpGroup);
    sparkRInterpreter.open();

    context = newInterpreterContext();
  }

  /** Builds the interpreter context used by the test; its output listener discards everything. */
  private static InterpreterContext newInterpreterContext() {
    InterpreterOutputListener discardingListener = new InterpreterOutputListener() {
      @Override
      public void onAppend(InterpreterOutput out, byte[] line) {
      }

      @Override
      public void onUpdate(InterpreterOutput out, byte[] output) {
      }
    };

    return new InterpreterContext("note", "id", "title", "text",
        new AuthenticationInfo(),
        new HashMap<String, Object>(), new GUI(),
        new AngularObjectRegistry(intpGroup.getId(), null),
        null,
        new LinkedList<InterpreterContextRunner>(),
        new InterpreterOutput(discardingListener)
    );
  }
}

View file

@ -159,7 +159,6 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
(BSD 3 Clause) highlightjs v8.4.0 (https://highlightjs.org/) - https://github.com/isagalaev/highlight.js/blob/8.4/LICENSE
(BSD 3 Clause) hamcrest v1.3 (http://hamcrest.org/JavaHamcrest/) - http://opensource.org/licenses/BSD-3-Clause
(BSD Style) JLine v2.12.1 (https://github.com/jline/jline2) - https://github.com/jline/jline2/blob/master/LICENSE.txt
(BSD 3 Clause) RScala 1.0.6 (https://cran.r-project.org/web/packages/rscala/index.html) - https://cran.r-project.org/web/licenses/BSD_3_clause

View file

@ -1,51 +0,0 @@
https://cran.r-project.org/web/packages/rscala/LICENSE
YEAR: 2013-2015
COPYRIGHT HOLDER: David B. Dahl
ORGANIZATION: Brigham Young University
https://cran.r-project.org/web/licenses/BSD_3_clause
Based on http://opensource.org/licenses/BSD-3-Clause
This is a template. Complete and ship as file LICENSE the following 3
lines (only)
YEAR:
COPYRIGHT HOLDER:
ORGANIZATION:
and specify as
License: BSD_3_clause + file LICENSE
Copyright (c) <YEAR>, <COPYRIGHT HOLDER>
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the
distribution.
Neither the name of the <ORGANIZATION> nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View file

@ -60,6 +60,7 @@ public abstract class AbstractTestRestApi {
static final String url = getUrlToTest();
protected static final boolean wasRunning = checkIfServerIsRunning();
static boolean pySpark = false;
static boolean sparkR = false;
private String getUrl(String path) {
String url;
@ -132,7 +133,7 @@ public abstract class AbstractTestRestApi {
// set spark home for pyspark
sparkIntpSetting.getProperties().setProperty("spark.home", getSparkHome());
pySpark = true;
sparkR = true;
ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id());
} else {
// assume first one is spark
@ -148,6 +149,7 @@ public abstract class AbstractTestRestApi {
// set spark home for pyspark
sparkIntpSetting.getProperties().setProperty("spark.home", sparkHome);
pySpark = true;
sparkR = true;
}
ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id());
@ -174,6 +176,10 @@ public abstract class AbstractTestRestApi {
return pySpark;
}
/**
 * Returns the static {@code sparkR} flag, which the test-server setup sets to
 * {@code true} alongside {@code pySpark} once a Spark home has been configured.
 */
boolean isSparkR() {
return sparkR;
}
private static String getSparkHomeRecursively(File dir) {
if (dir == null) return null;
File files [] = dir.listFiles();

View file

@ -83,6 +83,30 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
ZeppelinServer.notebook.removeNote(note.id());
}
/**
 * Runs a {@code %r} paragraph that turns a three-row local data.frame into a SparkR
 * DataFrame and counts it, expecting the paragraph to finish with the output
 * {@code [1] 3}.
 *
 * <p>The paragraph is only executed when SparkR is available and the Spark version
 * number is at least 14 (1.4.0). The note is removed in all cases, including when an
 * assertion fails, so a failing run does not leave the note behind in the notebook.
 *
 * @throws IOException propagated from note creation
 */
@Test
public void sparkRTest() throws IOException {
  // create new note
  Note note = ZeppelinServer.notebook.createNote();
  try {
    int sparkVersion = getSparkVersionNumber(note);

    if (isSparkR() && sparkVersion >= 14) {   // sparkr supported from 1.4.0
      // run an R paragraph that builds a SparkR DataFrame and counts its rows
      Paragraph p = note.addParagraph();
      Map config = p.getConfig();
      config.put("enabled", true);
      p.setConfig(config);
      p.setText("%r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
          "df <- createDataFrame(sqlContext, localDF)\n" +
          "count(df)"
      );
      note.run(p.getId());
      waitForFinish(p);
      assertEquals(Status.FINISHED, p.getStatus());
      assertEquals("[1] 3", p.getResult().message());
    }
  } finally {
    // clean up even when an assertion above throws
    ZeppelinServer.notebook.removeNote(note.id());
  }
}
@Test
public void pySparkTest() throws IOException {
// create new note