Remove rscala dependency

This commit is contained in:
Lee moon soo 2016-03-16 21:18:46 -07:00
parent 1e2c99bbf3
commit 9df95351ad
9 changed files with 688 additions and 240 deletions

View file

@ -794,6 +794,55 @@
</plugins>
</build>
</profile>
<profile>
<id>sparkr</id>
<build>
<plugins>
<plugin>
<groupId>com.googlecode.maven-download-plugin</groupId>
<artifactId>download-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<id>download-pyspark-files</id>
<phase>validate</phase>
<goals>
<goal>wget</goal>
</goals>
<configuration>
<url>${spark.download.url}</url>
<unpack>true</unpack>
<outputDirectory>${project.build.directory}/spark-dist</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>2.7</version>
<executions>
<execution>
<id>download-and-copy-sparkr-files</id>
<phase>generate-resources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/spark/R</outputDirectory>
<resources>
<resource>
<directory>${project.build.directory}/spark-dist/spark-${spark.version}/R/lib/SparkR</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
@ -924,28 +973,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>copy-rscala</id>
<phase>generate-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<copy todir="${user.dir}/interpreter/spark/R"
file="${user.dir}/spark/lib/rscala/inst/java/rscala_${scala.binary.version}-${rscala.version}.jar"/>
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -239,14 +239,6 @@
<version>${jsoup.version}</version>
</dependency>
<dependency>
<groupId>org.ddahl</groupId>
<artifactId>rscala</artifactId>
<version>${rscala.version}</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/rscala/inst/java/rscala_${scala.binary.version}-${rscala.version}.jar</systemPath>
</dependency>
<!--TEST-->
<dependency>
<groupId>org.scalatest</groupId>
@ -321,7 +313,6 @@
<exclude>**/metastore_db/</exclude>
<exclude>**/README.md</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
<exclude>**/lib/rscala/**</exclude>
</excludes>
</configuration>
</plugin>

View file

@ -21,16 +21,15 @@ import static org.apache.zeppelin.spark.ZeppelinRDisplay.render;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkRBackend;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@ -42,45 +41,70 @@ public class SparkRInterpreter extends Interpreter {
private static final Logger logger = LoggerFactory.getLogger(SparkRInterpreter.class);
private static String renderOptions;
private ZeppelinR zeppelinR;
static {
Interpreter.register(
"r",
"R",
"spark",
SparkRInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("spark.master",
SparkInterpreter.getSystemDefault("MASTER", "spark.master", "local[*]"),
"Spark master uri. ex) spark://masterhost:7077")
.add("spark.home",
SparkInterpreter.getSystemDefault("SPARK_HOME", "spark.home", "/opt/spark"),
"Spark distribution location")
.add("zeppelin.R.image.width",
SparkInterpreter.getSystemDefault("ZEPPELIN_R_IMAGE_WIDTH",
"zeppelin.R.image.width", "100%"),
"")
.add("zeppelin.R.render.options",
SparkInterpreter.getSystemDefault("ZEPPELIN_R_RENDER_OPTIONS",
"zeppelin.R.render.options",
"out.format = 'html', comment = NA, "
+ "echo = FALSE, results = 'asis', message = F, warning = F"),
"")
.build());
.add("zeppelin.R.cmd",
SparkInterpreter.getSystemDefault("ZEPPELIN_R_CMD", "zeppelin.R.cmd", "R"),
"R repl path")
.add("zeppelin.R.image.width",
SparkInterpreter.getSystemDefault("ZEPPELIN_R_IMAGE_WIDTH",
"zeppelin.R.image.width", "100%"),
"")
.add("zeppelin.R.render.options",
SparkInterpreter.getSystemDefault("ZEPPELIN_R_RENDER_OPTIONS",
"zeppelin.R.render.options",
"out.format = 'html', comment = NA, "
+ "echo = FALSE, results = 'asis', message = F, warning = F"),
"")
.build());
}
public SparkRInterpreter(Properties property) {
super(property);
}
@Override
public void open() {
zeppelinR().open(getProperty("spark.master"),
"/opt/spark", getSparkInterpreter());
String rCmdPath = getProperty("zeppelin.R.cmd");
String sparkRLibPath;
if (System.getenv("SPARK_HOME") != null) {
sparkRLibPath = System.getenv("SPARK_HOME") + "/R/lib";
} else {
sparkRLibPath = System.getenv("ZEPPELIN_HOME") + "/interpreter/spark/R";
}
synchronized (SparkRBackend.backend()) {
if (!SparkRBackend.isStarted()) {
SparkRBackend.init();
SparkRBackend.start();
}
}
int port = SparkRBackend.port();
// SparkInterpreter sparkInterpreter = getSparkInterpreter();
// ZeppelinRContext.setSparkContext(sparkInterpreter.getSparkContext());
// ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port);
try {
zeppelinR.open();
} catch (IOException e) {
throw new InterpreterException(e);
}
renderOptions = getProperty("zeppelin.R.render.options");
}
@Override
public InterpreterResult interpret(String lines, InterpreterContext contextInterpreter) {
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) {
String imageWidth = getProperty("zeppelin.R.image.width");
@ -101,11 +125,14 @@ public class SparkRInterpreter extends Interpreter {
}
}
try {
interpreterContext.out.clear();
zeppelinR.setInterpreterOutput(interpreterContext.out);
zeppelinR().set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```");
zeppelinR().eval(".zres <- knit2html(text=.zcmd)");
String html = zeppelinR().getS0(".zres");
try {
/*
zeppelinR.set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```");
zeppelinR.eval(".zres <- knit2html(text=.zcmd)");
String html = zeppelinR.getS0(".zres");
RDisplay rDisplay = render(html, imageWidth);
@ -114,8 +141,10 @@ public class SparkRInterpreter extends Interpreter {
rDisplay.type(),
rDisplay.content()
);
*/
zeppelinR.eval(lines);
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
} catch (Exception e) {
logger.error("Exception while connecting to R", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
@ -129,7 +158,7 @@ public class SparkRInterpreter extends Interpreter {
@Override
public void close() {
zeppelinR().close();
zeppelinR.close();
}
@Override
@ -175,44 +204,4 @@ public class SparkRInterpreter extends Interpreter {
return spark;
}
protected static ZeppelinRFactory zeppelinR() {
return ZeppelinRFactory.instance();
}
/**
* Java Factory to support tests with Mockito.
*
* (Mockito can not mock the zeppelinR final scala object class).
*/
protected static class ZeppelinRFactory {
private static ZeppelinRFactory instance;
private static ZeppelinR zeppelinR;
private ZeppelinRFactory() {
// Singleton
}
protected static synchronized ZeppelinRFactory instance() {
if (instance == null) instance = new ZeppelinRFactory();
return instance;
}
protected void open(String master, String sparkHome, SparkInterpreter sparkInterpreter) {
zeppelinR.open(master, sparkHome, sparkInterpreter);
}
protected Object eval(String command) {
return zeppelinR.eval(command);
}
protected void set(String key, Object value) {
zeppelinR.set(key, value);
}
protected Object get(String key) {
return zeppelinR.get(key);
}
protected String getS0(String key) {
return zeppelinR.getS0(key);
}
protected void close() {
zeppelinR.close();
}
}
}

View file

@ -0,0 +1,401 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;
import org.apache.commons.exec.*;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import parquet.org.slf4j.Logger;
import parquet.org.slf4j.LoggerFactory;
import java.io.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* R repl interaction
*/
public class ZeppelinR implements ExecuteResultHandler {
Logger logger = LoggerFactory.getLogger(ZeppelinR.class);
private final String rCmdPath;
private DefaultExecutor executor;
private SparkOutputStream outputStream;
private PipedOutputStream input;
private final String scriptPath;
private final String libPath;
static Map<Integer, ZeppelinR> zeppelinR = Collections.synchronizedMap(
new HashMap<Integer, ZeppelinR>());
private InterpreterOutput initialOutput;
private final int port;
private boolean rScriptRunning;
/**
* To be notified R repl initialization
*/
boolean rScriptInitialized = false;
Integer rScriptInitializeNotifier = new Integer(0);
/**
* Request to R repl
*/
Request rRequestObject = null;
Integer rRequestNotifier = new Integer(0);
/**
* Request object
*
* type : "eval", "set", "get"
* stmt : statement to evaluate when type is "eval"
* key when type is "set" or "get"
* value : value object when type is "put"
*/
public static class Request {
String type;
String stmt;
Object value;
public Request(String type, String stmt, Object value) {
this.type = type;
this.stmt = stmt;
this.value = value;
}
public String getType() {
return type;
}
public String getStmt() {
return stmt;
}
public Object getValue() {
return value;
}
}
/**
* Response from R repl
*/
Object rResponseValue = null;
boolean rResponseError = false;
Integer rResponseNotifier = new Integer(0);
/**
* Create ZeppelinR instance
* @param rCmdPath R repl commandline path
* @param libPath sparkr library path
*/
public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort) {
this.rCmdPath = rCmdPath;
this.libPath = libPath;
this.port = sparkRBackendPort;
scriptPath = System.getProperty("java.io.tmpdir") + "/zeppelin_sparkr.R";
}
/**
* Start R repl
* @throws IOException
*/
public void open() throws IOException {
createRScript();
zeppelinR.put(hashCode(), this);
CommandLine cmd = CommandLine.parse(rCmdPath);
cmd.addArgument("--no-save");
cmd.addArgument("--no-restore");
cmd.addArgument("-f");
cmd.addArgument(scriptPath);
cmd.addArgument("--args");
cmd.addArgument(Integer.toString(hashCode()));
cmd.addArgument(Integer.toString(port));
cmd.addArgument(libPath);
executor = new DefaultExecutor();
outputStream = new SparkOutputStream();
input = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(input);
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in);
executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
executor.setStreamHandler(streamHandler);
Map env = EnvironmentUtils.getProcEnvironment();
initialOutput = new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
logger.debug(new String(line));
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
}
});
//outputStream.setInterpreterOutput(initialOutput);
executor.execute(cmd, env, this);
rScriptRunning = true;
}
/**
* Evaluate expression
* @param expr
* @return
*/
public Object eval(String expr) {
synchronized (this) {
rRequestObject = new Request("eval", expr, null);
return request();
}
}
/**
* assign value to key
* @param key
* @param value
*/
public void set(String key, Object value) {
synchronized (this) {
rRequestObject = new Request("set", key, value);
request();
}
}
/**
* get value of key
* @param key
* @return
*/
public Object get(String key) {
synchronized (this) {
rRequestObject = new Request("get", key, null);
return request();
}
}
/**
* get value of key, as a string
* @param key
* @return
*/
public String getS0(String key) {
synchronized (this) {
rRequestObject = new Request("getS", key, null);
return (String) request();
}
}
/**
* Send request to r repl and return response
* @return responseValue
*/
private Object request() throws RuntimeException {
if (!rScriptRunning) {
throw new RuntimeException("r repl is not running");
}
// wait for rscript initialized
if (!rScriptInitialized) {
waitForRScriptInitialized();
}
rResponseValue = null;
synchronized (rRequestNotifier) {
rRequestNotifier.notify();
}
Object respValue = null;
synchronized (rResponseNotifier) {
while (rResponseValue == null && rScriptRunning) {
try {
rResponseNotifier.wait(1000);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
respValue = rResponseValue;
rResponseValue = null;
}
if (rResponseError) {
throw new RuntimeException(respValue.toString());
} else {
return respValue;
}
}
/**
* Wait until src/main/resources/R/zeppelin_sparkr.R is initialized
* and call onScriptInitialized()
*
* @throws InterpreterException
*/
private void waitForRScriptInitialized() throws InterpreterException {
synchronized (rScriptInitializeNotifier) {
long startTime = System.nanoTime();
while (rScriptInitialized == false &&
rScriptRunning &&
System.nanoTime() - startTime < 10L * 1000 * 1000000) {
try {
rScriptInitializeNotifier.wait(1000);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
String errorMessage = "";
try {
initialOutput.flush();
errorMessage = new String(initialOutput.toByteArray());
} catch (IOException e) {
e.printStackTrace();
}
if (rScriptInitialized == false) {
throw new InterpreterException("sparkr is not responding " + errorMessage);
}
}
/**
* invoked by src/main/resources/R/zeppelin_sparkr.R
* @return
*/
public Request getRequest() {
synchronized (rRequestNotifier) {
while (rRequestObject == null) {
try {
rRequestNotifier.wait(1000);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
Request req = rRequestObject;
rRequestObject = null;
return req;
}
}
/**
* invoked by src/main/resources/R/zeppelin_sparkr.R
* @param value
* @param error
*/
public void setResponse(Object value, boolean error) {
synchronized (rResponseNotifier) {
rResponseValue = value;
rResponseError = error;
rResponseNotifier.notify();
}
}
/**
* invoked by src/main/resources/R/zeppelin_sparkr.R
*/
public void onScriptInitialized() {
synchronized (rScriptInitializeNotifier) {
rScriptInitialized = true;
rScriptInitializeNotifier.notifyAll();
}
}
/**
* Create R script in tmp dir
*/
private void createRScript() {
ClassLoader classLoader = getClass().getClassLoader();
File out = new File(scriptPath);
if (out.exists() && out.isDirectory()) {
throw new InterpreterException("Can't create r script " + out.getAbsolutePath());
}
try {
FileOutputStream outStream = new FileOutputStream(out);
IOUtils.copy(
classLoader.getResourceAsStream("R/zeppelin_sparkr.R"),
outStream);
outStream.close();
} catch (IOException e) {
throw new InterpreterException(e);
}
logger.info("File {} created", scriptPath);
}
/**
* Terminate this R repl
*/
public void close() {
executor.getWatchdog().destroyProcess();
new File(scriptPath).delete();
zeppelinR.remove(hashCode());
}
/**
* Get instance
* This method will be invoded from zeppelin_sparkr.R
* @param hashcode
* @return
*/
public static ZeppelinR getZeppelinR(int hashcode) {
return zeppelinR.get(hashcode);
}
/**
* Pass InterpreterOutput to capture the repl output
* @param out
*/
public void setInterpreterOutput(InterpreterOutput out) {
outputStream.setInterpreterOutput(out);
}
@Override
public void onProcessComplete(int i) {
logger.info("process complete {}", i);
rScriptRunning = false;
}
@Override
public void onProcessFailed(ExecuteException e) {
logger.error(e.getMessage(), e);
rScriptRunning = false;
}
}

View file

@ -0,0 +1,83 @@
args <- commandArgs(trailingOnly = TRUE)
hashCode <- as.integer(args[1])
port <- as.integer(args[2])
libPath <- args[3]
rm(args)
print(paste("Port ", toString(port)))
print(paste("LibPath ", libPath))
.libPaths(c(file.path(libPath), .libPaths()))
library(SparkR)
SparkR:::connectBackend("localhost", port)
# scStartTime is needed by R/pkg/R/sparkR.R
assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)
# getZeppelinR
.zeppelinR = SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinR", "getZeppelinR", hashCode)
# setup spark env
assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv)
assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv)
assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv)
z.put <- function(name, object) {
SparkR:::callJMethod(.zeppelinContext, "put", name, object)
}
z.get <- function(name) {
SparkR:::callJMethod(.zeppelinContext, "get", name)
}
z.input <- function(name, value) {
SparkR:::callJMethod(.zeppelinContext, "input", name, value)
}
library("knitr")
# notify script is initialized
SparkR:::callJMethod(.zeppelinR, "onScriptInitialized")
while (TRUE) {
req <- SparkR:::callJMethod(.zeppelinR, "getRequest")
type <- SparkR:::callJMethod(req, "getType")
stmt <- SparkR:::callJMethod(req, "getStmt")
value <- SparkR:::callJMethod(req, "getValue")
if (type == "eval") {
tryCatch({
ret <- eval(parse(text=stmt))
SparkR:::callJMethod(.zeppelinR, "setResponse", "", FALSE)
}, error = function(e) {
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
})
} else if (type == "set") {
tryCatch({
ret <- assign(stmt, value)
SparkR:::callJMethod(.zeppelinR, "setResponse", "", FALSE)
}, error = function(e) {
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
})
} else if (type == "get") {
tryCatch({
ret <- eval(parse(text=stmt))
SparkR:::callJMethod(.zeppelinR, "setResponse", ret, FALSE)
}, error = function(e) {
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
})
} else if (type == "getS") {
tryCatch({
ret <- eval(parse(text=stmt))
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(ret), FALSE)
}, error = function(e) {
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
})
} else {
# unsupported type
SparkR:::callJMethod(.zeppelinR, "setResponse", paste("Unsupported type ", type), TRUE)
}
}

View file

@ -20,6 +20,8 @@ import org.apache.spark.api.r.RBackend
object SparkRBackend {
val backend : RBackend = new RBackend()
private var started = false;
private var portNumber = 0;
val backendThread : Thread = new Thread("SparkRBackend") {
override def run() {
@ -27,10 +29,26 @@ object SparkRBackend {
}
}
def init() : Int = backend.init()
def init() : Int = {
portNumber = backend.init()
portNumber
}
def start() : Unit = backendThread.start()
def start() : Unit = {
backendThread.start()
started = true
}
def close() : Unit = backend.close()
def close() : Unit = {
backend.close()
backendThread.join()
}
def isStarted() : Boolean = {
started
}
def port(): Int = {
return portNumber
}
}

View file

@ -1,126 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark
import org.apache.spark.SparkRBackend
import org.ddahl.rscala.callback._
object ZeppelinR {
private val R = RClient()
def open(master: String = "local[*]", sparkHome: String = "/opt/spark", sparkInterpreter: SparkInterpreter): Unit = {
eval(
s"""
|Sys.setenv(SPARK_HOME="$sparkHome")
|.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
|library(SparkR)
""".stripMargin
)
// See ./core/src/main/scala/org/apache/spark/deploy/RRunner.scala for RBackend usage
val port = SparkRBackend.init()
SparkRBackend.start()
eval(
s"""
|SparkR:::connectBackend("localhost", ${port})
|""".stripMargin)
// scStartTime is needed by R/pkg/R/sparkR.R
eval(
"""
|assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)
""".stripMargin)
ZeppelinRContext.setSparkContext(sparkInterpreter.getSparkContext())
eval(
"""
|assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv)
|""".stripMargin)
eval(
"""
|assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
""".stripMargin)
ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext())
eval(
"""
|assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv)
|""".stripMargin)
eval(
"""
|assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
|""".stripMargin)
ZeppelinRContext.setZepplinContext(sparkInterpreter.getZeppelinContext())
eval(
"""
|assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv)
|""".stripMargin
)
eval(
"""
|z.put <- function(name, object) {
| SparkR:::callJMethod(.zeppelinContext, "put", name, object)
|}
|z.get <- function(name) {
| SparkR:::callJMethod(.zeppelinContext, "get", name)
|}
|z.input <- function(name, value) {
| SparkR:::callJMethod(.zeppelinContext, "input", name, value)
|}
|""".stripMargin
)
eval(
"""
|library("knitr")
""".stripMargin)
}
def eval(command: String): Any = {
try {
R.eval(command)
} catch {
case e: Exception => throw new RuntimeException(e.getMessage + " - Given R command=" + command)
}
}
def set(key: String, value: AnyRef): Unit = {
R.set(key, value)
}
def get(key: String): Any = {
R.get(key)._1
}
def getS0(key: String): String = {
R.getS0(key)
}
def close():Unit = {
R.eval("""
|sparkR.stop()
""".stripMargin
)
}
}

View file

@ -24,6 +24,7 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.*;
import org.apache.spark.SparkRBackend;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
@ -43,7 +44,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest(SparkRInterpreter.ZeppelinRFactory.class)
@PowerMockIgnore({"org.apache.spark.*", "org.apache.hadoop.*", "akka.*", "org.w3c.*", "javax.xml.*", "org.xml.*", "scala.*", "org.apache.cxf.*"})
public class SparkRInterpreterTest {
private static final Logger LOGGER = LoggerFactory.getLogger(SparkRInterpreterTest.class);
@ -54,7 +54,6 @@ public class SparkRInterpreterTest {
private static InterpreterContext context;
private static InterpreterGroup intpGroup;
private static SparkInterpreter sparkInterpreter;
private static SparkRInterpreter.ZeppelinRFactory zeppelinRFactory;
private static SparkRInterpreter sparkRInterpreter;
@BeforeClass
@ -70,6 +69,7 @@ public class SparkRInterpreterTest {
assertEquals(InterpreterResult.Type.TEXT, ret.type());
}
private static void initInterpreters() {
Properties p = new Properties();
@ -77,13 +77,6 @@ public class SparkRInterpreterTest {
sparkInterpreter = new SparkInterpreter(p);
intpGroup = new InterpreterGroup();
zeppelinRFactory = mock(SparkRInterpreter.ZeppelinRFactory.class);
doNothing().when(zeppelinRFactory).open(Mockito.anyString(), Mockito.anyString(), any(SparkInterpreter.class));
when(zeppelinRFactory.getS0(anyString())).thenReturn(MOCK_RSCALA_RESULT);
mockStatic(SparkRInterpreter.ZeppelinRFactory.class);
when(SparkRInterpreter.ZeppelinRFactory.instance()).thenReturn(zeppelinRFactory);
sparkRInterpreter = new SparkRInterpreter(p);
sparkRInterpreter.setInterpreterGroup(intpGroup);
sparkRInterpreter.open();

View file

@ -0,0 +1,72 @@
package org.apache.zeppelin.spark;
import org.apache.spark.SparkRBackend;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.*;
import org.junit.runners.MethodSorters;
import java.io.File;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* SparkR
*/
public class ZeppelinRTest implements InterpreterOutputListener {
private static ZeppelinR zr;
@BeforeClass
public static void setUp() throws IOException {
int port = SparkRBackend.init();
SparkRBackend.start();
zr = new ZeppelinR("/Library/Frameworks/R.framework/Resources/bin/R",
new File("../spark-1.6.0-bin-hadoop2.6/R/lib").getAbsolutePath(),
port);
zr.open();
}
@AfterClass
public static void tearDown() {
zr.close();
SparkRBackend.close();
}
@Test
public void testEval() throws IOException, InterruptedException {
zr.eval("a = 1+1");
assertEquals(2.0, zr.get("a"));
}
@Test
public void testEvalError() {
try {
zr.eval("nonExistObject");
assertTrue(false);
} catch (RuntimeException e) {
assertTrue(true);
}
zr.eval("a = \"Hello\"");
}
@Test
public void testSetGet() {
zr.set("a", 1);
assertEquals(1, zr.get("a"));
}
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
}
}