mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Remove rscala dependency
This commit is contained in:
parent
1e2c99bbf3
commit
9df95351ad
9 changed files with 688 additions and 240 deletions
|
|
@ -794,6 +794,55 @@
|
|||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>sparkr</id>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>com.googlecode.maven-download-plugin</groupId>
|
||||
<artifactId>download-maven-plugin</artifactId>
|
||||
<version>1.2.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>download-pyspark-files</id>
|
||||
<phase>validate</phase>
|
||||
<goals>
|
||||
<goal>wget</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<url>${spark.download.url}</url>
|
||||
<unpack>true</unpack>
|
||||
<outputDirectory>${project.build.directory}/spark-dist</outputDirectory>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-resources-plugin</artifactId>
|
||||
<version>2.7</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>download-and-copy-sparkr-files</id>
|
||||
<phase>generate-resources</phase>
|
||||
<goals>
|
||||
<goal>copy-resources</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/spark/R</outputDirectory>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>${project.build.directory}/spark-dist/spark-${spark.version}/R/lib/SparkR</directory>
|
||||
</resource>
|
||||
</resources>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
|
||||
</profiles>
|
||||
|
||||
<build>
|
||||
|
|
@ -924,28 +973,6 @@
|
|||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-antrun-plugin</artifactId>
|
||||
<version>1.7</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-rscala</id>
|
||||
<phase>generate-resources</phase>
|
||||
<goals>
|
||||
<goal>run</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<target>
|
||||
<copy todir="${user.dir}/interpreter/spark/R"
|
||||
file="${user.dir}/spark/lib/rscala/inst/java/rscala_${scala.binary.version}-${rscala.version}.jar"/>
|
||||
</target>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
|
|||
|
|
@ -239,14 +239,6 @@
|
|||
<version>${jsoup.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.ddahl</groupId>
|
||||
<artifactId>rscala</artifactId>
|
||||
<version>${rscala.version}</version>
|
||||
<scope>system</scope>
|
||||
<systemPath>${project.basedir}/lib/rscala/inst/java/rscala_${scala.binary.version}-${rscala.version}.jar</systemPath>
|
||||
</dependency>
|
||||
|
||||
<!--TEST-->
|
||||
<dependency>
|
||||
<groupId>org.scalatest</groupId>
|
||||
|
|
@ -321,7 +313,6 @@
|
|||
<exclude>**/metastore_db/</exclude>
|
||||
<exclude>**/README.md</exclude>
|
||||
<exclude>**/dependency-reduced-pom.xml</exclude>
|
||||
<exclude>**/lib/rscala/**</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
|
|
|||
|
|
@ -21,16 +21,15 @@ import static org.apache.zeppelin.spark.ZeppelinRDisplay.render;
|
|||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.spark.SparkRBackend;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.jsoup.Jsoup;
|
||||
import org.jsoup.nodes.Document;
|
||||
import org.jsoup.nodes.Element;
|
||||
import org.jsoup.select.Elements;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
|
@ -42,45 +41,70 @@ public class SparkRInterpreter extends Interpreter {
|
|||
private static final Logger logger = LoggerFactory.getLogger(SparkRInterpreter.class);
|
||||
|
||||
private static String renderOptions;
|
||||
private ZeppelinR zeppelinR;
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
"r",
|
||||
"R",
|
||||
"spark",
|
||||
SparkRInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add("spark.master",
|
||||
SparkInterpreter.getSystemDefault("MASTER", "spark.master", "local[*]"),
|
||||
"Spark master uri. ex) spark://masterhost:7077")
|
||||
.add("spark.home",
|
||||
SparkInterpreter.getSystemDefault("SPARK_HOME", "spark.home", "/opt/spark"),
|
||||
"Spark distribution location")
|
||||
.add("zeppelin.R.image.width",
|
||||
SparkInterpreter.getSystemDefault("ZEPPELIN_R_IMAGE_WIDTH",
|
||||
"zeppelin.R.image.width", "100%"),
|
||||
"")
|
||||
.add("zeppelin.R.render.options",
|
||||
SparkInterpreter.getSystemDefault("ZEPPELIN_R_RENDER_OPTIONS",
|
||||
"zeppelin.R.render.options",
|
||||
"out.format = 'html', comment = NA, "
|
||||
+ "echo = FALSE, results = 'asis', message = F, warning = F"),
|
||||
"")
|
||||
.build());
|
||||
.add("zeppelin.R.cmd",
|
||||
SparkInterpreter.getSystemDefault("ZEPPELIN_R_CMD", "zeppelin.R.cmd", "R"),
|
||||
"R repl path")
|
||||
.add("zeppelin.R.image.width",
|
||||
SparkInterpreter.getSystemDefault("ZEPPELIN_R_IMAGE_WIDTH",
|
||||
"zeppelin.R.image.width", "100%"),
|
||||
"")
|
||||
.add("zeppelin.R.render.options",
|
||||
SparkInterpreter.getSystemDefault("ZEPPELIN_R_RENDER_OPTIONS",
|
||||
"zeppelin.R.render.options",
|
||||
"out.format = 'html', comment = NA, "
|
||||
+ "echo = FALSE, results = 'asis', message = F, warning = F"),
|
||||
"")
|
||||
.build());
|
||||
}
|
||||
|
||||
|
||||
public SparkRInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
zeppelinR().open(getProperty("spark.master"),
|
||||
"/opt/spark", getSparkInterpreter());
|
||||
String rCmdPath = getProperty("zeppelin.R.cmd");
|
||||
String sparkRLibPath;
|
||||
|
||||
if (System.getenv("SPARK_HOME") != null) {
|
||||
sparkRLibPath = System.getenv("SPARK_HOME") + "/R/lib";
|
||||
} else {
|
||||
sparkRLibPath = System.getenv("ZEPPELIN_HOME") + "/interpreter/spark/R";
|
||||
}
|
||||
|
||||
synchronized (SparkRBackend.backend()) {
|
||||
if (!SparkRBackend.isStarted()) {
|
||||
SparkRBackend.init();
|
||||
SparkRBackend.start();
|
||||
}
|
||||
}
|
||||
|
||||
int port = SparkRBackend.port();
|
||||
|
||||
// SparkInterpreter sparkInterpreter = getSparkInterpreter();
|
||||
// ZeppelinRContext.setSparkContext(sparkInterpreter.getSparkContext());
|
||||
// ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
|
||||
|
||||
zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port);
|
||||
try {
|
||||
zeppelinR.open();
|
||||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
renderOptions = getProperty("zeppelin.R.render.options");
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String lines, InterpreterContext contextInterpreter) {
|
||||
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) {
|
||||
|
||||
String imageWidth = getProperty("zeppelin.R.image.width");
|
||||
|
||||
|
|
@ -101,11 +125,14 @@ public class SparkRInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
try {
|
||||
interpreterContext.out.clear();
|
||||
zeppelinR.setInterpreterOutput(interpreterContext.out);
|
||||
|
||||
zeppelinR().set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```");
|
||||
zeppelinR().eval(".zres <- knit2html(text=.zcmd)");
|
||||
String html = zeppelinR().getS0(".zres");
|
||||
try {
|
||||
/*
|
||||
zeppelinR.set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```");
|
||||
zeppelinR.eval(".zres <- knit2html(text=.zcmd)");
|
||||
String html = zeppelinR.getS0(".zres");
|
||||
|
||||
RDisplay rDisplay = render(html, imageWidth);
|
||||
|
||||
|
|
@ -114,8 +141,10 @@ public class SparkRInterpreter extends Interpreter {
|
|||
rDisplay.type(),
|
||||
rDisplay.content()
|
||||
);
|
||||
*/
|
||||
zeppelinR.eval(lines);
|
||||
|
||||
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
} catch (Exception e) {
|
||||
logger.error("Exception while connecting to R", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
|
|
@ -129,7 +158,7 @@ public class SparkRInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
zeppelinR().close();
|
||||
zeppelinR.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -175,44 +204,4 @@ public class SparkRInterpreter extends Interpreter {
|
|||
return spark;
|
||||
}
|
||||
|
||||
protected static ZeppelinRFactory zeppelinR() {
|
||||
return ZeppelinRFactory.instance();
|
||||
}
|
||||
|
||||
/**
|
||||
* Java Factory to support tests with Mockito.
|
||||
*
|
||||
* (Mockito can not mock the zeppelinR final scala object class).
|
||||
*/
|
||||
protected static class ZeppelinRFactory {
|
||||
private static ZeppelinRFactory instance;
|
||||
private static ZeppelinR zeppelinR;
|
||||
private ZeppelinRFactory() {
|
||||
// Singleton
|
||||
}
|
||||
|
||||
protected static synchronized ZeppelinRFactory instance() {
|
||||
if (instance == null) instance = new ZeppelinRFactory();
|
||||
return instance;
|
||||
}
|
||||
protected void open(String master, String sparkHome, SparkInterpreter sparkInterpreter) {
|
||||
zeppelinR.open(master, sparkHome, sparkInterpreter);
|
||||
}
|
||||
protected Object eval(String command) {
|
||||
return zeppelinR.eval(command);
|
||||
}
|
||||
protected void set(String key, Object value) {
|
||||
zeppelinR.set(key, value);
|
||||
}
|
||||
protected Object get(String key) {
|
||||
return zeppelinR.get(key);
|
||||
}
|
||||
protected String getS0(String key) {
|
||||
return zeppelinR.getS0(key);
|
||||
}
|
||||
protected void close() {
|
||||
zeppelinR.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
401
spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
Normal file
401
spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
Normal file
|
|
@ -0,0 +1,401 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.spark;
|
||||
|
||||
import org.apache.commons.exec.*;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
|
||||
import parquet.org.slf4j.Logger;
|
||||
import parquet.org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* R repl interaction
|
||||
*/
|
||||
public class ZeppelinR implements ExecuteResultHandler {
|
||||
Logger logger = LoggerFactory.getLogger(ZeppelinR.class);
|
||||
private final String rCmdPath;
|
||||
private DefaultExecutor executor;
|
||||
private SparkOutputStream outputStream;
|
||||
private PipedOutputStream input;
|
||||
private final String scriptPath;
|
||||
private final String libPath;
|
||||
static Map<Integer, ZeppelinR> zeppelinR = Collections.synchronizedMap(
|
||||
new HashMap<Integer, ZeppelinR>());
|
||||
|
||||
private InterpreterOutput initialOutput;
|
||||
private final int port;
|
||||
private boolean rScriptRunning;
|
||||
|
||||
/**
|
||||
* To be notified R repl initialization
|
||||
*/
|
||||
boolean rScriptInitialized = false;
|
||||
Integer rScriptInitializeNotifier = new Integer(0);
|
||||
|
||||
|
||||
/**
|
||||
* Request to R repl
|
||||
*/
|
||||
Request rRequestObject = null;
|
||||
Integer rRequestNotifier = new Integer(0);
|
||||
|
||||
/**
|
||||
* Request object
|
||||
*
|
||||
* type : "eval", "set", "get"
|
||||
* stmt : statement to evaluate when type is "eval"
|
||||
* key when type is "set" or "get"
|
||||
* value : value object when type is "put"
|
||||
*/
|
||||
public static class Request {
|
||||
String type;
|
||||
String stmt;
|
||||
Object value;
|
||||
|
||||
public Request(String type, String stmt, Object value) {
|
||||
this.type = type;
|
||||
this.stmt = stmt;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public String getStmt() {
|
||||
return stmt;
|
||||
}
|
||||
|
||||
public Object getValue() {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Response from R repl
|
||||
*/
|
||||
Object rResponseValue = null;
|
||||
boolean rResponseError = false;
|
||||
Integer rResponseNotifier = new Integer(0);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Create ZeppelinR instance
|
||||
* @param rCmdPath R repl commandline path
|
||||
* @param libPath sparkr library path
|
||||
*/
|
||||
public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort) {
|
||||
this.rCmdPath = rCmdPath;
|
||||
this.libPath = libPath;
|
||||
this.port = sparkRBackendPort;
|
||||
scriptPath = System.getProperty("java.io.tmpdir") + "/zeppelin_sparkr.R";
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Start R repl
|
||||
* @throws IOException
|
||||
*/
|
||||
public void open() throws IOException {
|
||||
createRScript();
|
||||
|
||||
zeppelinR.put(hashCode(), this);
|
||||
|
||||
CommandLine cmd = CommandLine.parse(rCmdPath);
|
||||
cmd.addArgument("--no-save");
|
||||
cmd.addArgument("--no-restore");
|
||||
cmd.addArgument("-f");
|
||||
cmd.addArgument(scriptPath);
|
||||
cmd.addArgument("--args");
|
||||
cmd.addArgument(Integer.toString(hashCode()));
|
||||
cmd.addArgument(Integer.toString(port));
|
||||
cmd.addArgument(libPath);
|
||||
|
||||
executor = new DefaultExecutor();
|
||||
outputStream = new SparkOutputStream();
|
||||
|
||||
input = new PipedOutputStream();
|
||||
PipedInputStream in = new PipedInputStream(input);
|
||||
|
||||
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in);
|
||||
executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
|
||||
executor.setStreamHandler(streamHandler);
|
||||
Map env = EnvironmentUtils.getProcEnvironment();
|
||||
|
||||
|
||||
initialOutput = new InterpreterOutput(new InterpreterOutputListener() {
|
||||
@Override
|
||||
public void onAppend(InterpreterOutput out, byte[] line) {
|
||||
logger.debug(new String(line));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdate(InterpreterOutput out, byte[] output) {
|
||||
}
|
||||
});
|
||||
//outputStream.setInterpreterOutput(initialOutput);
|
||||
executor.execute(cmd, env, this);
|
||||
rScriptRunning = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Evaluate expression
|
||||
* @param expr
|
||||
* @return
|
||||
*/
|
||||
public Object eval(String expr) {
|
||||
synchronized (this) {
|
||||
rRequestObject = new Request("eval", expr, null);
|
||||
return request();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* assign value to key
|
||||
* @param key
|
||||
* @param value
|
||||
*/
|
||||
public void set(String key, Object value) {
|
||||
synchronized (this) {
|
||||
rRequestObject = new Request("set", key, value);
|
||||
request();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get value of key
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public Object get(String key) {
|
||||
synchronized (this) {
|
||||
rRequestObject = new Request("get", key, null);
|
||||
return request();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get value of key, as a string
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public String getS0(String key) {
|
||||
synchronized (this) {
|
||||
rRequestObject = new Request("getS", key, null);
|
||||
return (String) request();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Send request to r repl and return response
|
||||
* @return responseValue
|
||||
*/
|
||||
private Object request() throws RuntimeException {
|
||||
if (!rScriptRunning) {
|
||||
throw new RuntimeException("r repl is not running");
|
||||
}
|
||||
|
||||
// wait for rscript initialized
|
||||
if (!rScriptInitialized) {
|
||||
waitForRScriptInitialized();
|
||||
}
|
||||
|
||||
rResponseValue = null;
|
||||
|
||||
synchronized (rRequestNotifier) {
|
||||
rRequestNotifier.notify();
|
||||
}
|
||||
|
||||
Object respValue = null;
|
||||
synchronized (rResponseNotifier) {
|
||||
while (rResponseValue == null && rScriptRunning) {
|
||||
try {
|
||||
rResponseNotifier.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
respValue = rResponseValue;
|
||||
rResponseValue = null;
|
||||
}
|
||||
|
||||
if (rResponseError) {
|
||||
throw new RuntimeException(respValue.toString());
|
||||
} else {
|
||||
return respValue;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Wait until src/main/resources/R/zeppelin_sparkr.R is initialized
|
||||
* and call onScriptInitialized()
|
||||
*
|
||||
* @throws InterpreterException
|
||||
*/
|
||||
private void waitForRScriptInitialized() throws InterpreterException {
|
||||
synchronized (rScriptInitializeNotifier) {
|
||||
long startTime = System.nanoTime();
|
||||
while (rScriptInitialized == false &&
|
||||
rScriptRunning &&
|
||||
System.nanoTime() - startTime < 10L * 1000 * 1000000) {
|
||||
try {
|
||||
rScriptInitializeNotifier.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String errorMessage = "";
|
||||
try {
|
||||
initialOutput.flush();
|
||||
errorMessage = new String(initialOutput.toByteArray());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
|
||||
if (rScriptInitialized == false) {
|
||||
throw new InterpreterException("sparkr is not responding " + errorMessage);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* invoked by src/main/resources/R/zeppelin_sparkr.R
|
||||
* @return
|
||||
*/
|
||||
public Request getRequest() {
|
||||
synchronized (rRequestNotifier) {
|
||||
while (rRequestObject == null) {
|
||||
try {
|
||||
rRequestNotifier.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
Request req = rRequestObject;
|
||||
rRequestObject = null;
|
||||
return req;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* invoked by src/main/resources/R/zeppelin_sparkr.R
|
||||
* @param value
|
||||
* @param error
|
||||
*/
|
||||
public void setResponse(Object value, boolean error) {
|
||||
synchronized (rResponseNotifier) {
|
||||
rResponseValue = value;
|
||||
rResponseError = error;
|
||||
rResponseNotifier.notify();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* invoked by src/main/resources/R/zeppelin_sparkr.R
|
||||
*/
|
||||
public void onScriptInitialized() {
|
||||
synchronized (rScriptInitializeNotifier) {
|
||||
rScriptInitialized = true;
|
||||
rScriptInitializeNotifier.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create R script in tmp dir
|
||||
*/
|
||||
private void createRScript() {
|
||||
ClassLoader classLoader = getClass().getClassLoader();
|
||||
File out = new File(scriptPath);
|
||||
|
||||
if (out.exists() && out.isDirectory()) {
|
||||
throw new InterpreterException("Can't create r script " + out.getAbsolutePath());
|
||||
}
|
||||
|
||||
try {
|
||||
FileOutputStream outStream = new FileOutputStream(out);
|
||||
IOUtils.copy(
|
||||
classLoader.getResourceAsStream("R/zeppelin_sparkr.R"),
|
||||
outStream);
|
||||
outStream.close();
|
||||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
logger.info("File {} created", scriptPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Terminate this R repl
|
||||
*/
|
||||
public void close() {
|
||||
executor.getWatchdog().destroyProcess();
|
||||
new File(scriptPath).delete();
|
||||
zeppelinR.remove(hashCode());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get instance
|
||||
* This method will be invoded from zeppelin_sparkr.R
|
||||
* @param hashcode
|
||||
* @return
|
||||
*/
|
||||
public static ZeppelinR getZeppelinR(int hashcode) {
|
||||
return zeppelinR.get(hashcode);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Pass InterpreterOutput to capture the repl output
|
||||
* @param out
|
||||
*/
|
||||
public void setInterpreterOutput(InterpreterOutput out) {
|
||||
outputStream.setInterpreterOutput(out);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void onProcessComplete(int i) {
|
||||
logger.info("process complete {}", i);
|
||||
rScriptRunning = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
rScriptRunning = false;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
83
spark/src/main/resources/R/zeppelin_sparkr.R
Normal file
83
spark/src/main/resources/R/zeppelin_sparkr.R
Normal file
|
|
@ -0,0 +1,83 @@
|
|||
args <- commandArgs(trailingOnly = TRUE)
|
||||
|
||||
hashCode <- as.integer(args[1])
|
||||
port <- as.integer(args[2])
|
||||
libPath <- args[3]
|
||||
rm(args)
|
||||
|
||||
print(paste("Port ", toString(port)))
|
||||
print(paste("LibPath ", libPath))
|
||||
|
||||
.libPaths(c(file.path(libPath), .libPaths()))
|
||||
library(SparkR)
|
||||
|
||||
|
||||
SparkR:::connectBackend("localhost", port)
|
||||
|
||||
# scStartTime is needed by R/pkg/R/sparkR.R
|
||||
assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)
|
||||
|
||||
# getZeppelinR
|
||||
.zeppelinR = SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinR", "getZeppelinR", hashCode)
|
||||
|
||||
# setup spark env
|
||||
assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv)
|
||||
assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
|
||||
assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv)
|
||||
assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
|
||||
assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv)
|
||||
|
||||
z.put <- function(name, object) {
|
||||
SparkR:::callJMethod(.zeppelinContext, "put", name, object)
|
||||
}
|
||||
z.get <- function(name) {
|
||||
SparkR:::callJMethod(.zeppelinContext, "get", name)
|
||||
}
|
||||
z.input <- function(name, value) {
|
||||
SparkR:::callJMethod(.zeppelinContext, "input", name, value)
|
||||
}
|
||||
|
||||
library("knitr")
|
||||
|
||||
# notify script is initialized
|
||||
SparkR:::callJMethod(.zeppelinR, "onScriptInitialized")
|
||||
|
||||
while (TRUE) {
|
||||
req <- SparkR:::callJMethod(.zeppelinR, "getRequest")
|
||||
type <- SparkR:::callJMethod(req, "getType")
|
||||
stmt <- SparkR:::callJMethod(req, "getStmt")
|
||||
value <- SparkR:::callJMethod(req, "getValue")
|
||||
|
||||
if (type == "eval") {
|
||||
tryCatch({
|
||||
ret <- eval(parse(text=stmt))
|
||||
SparkR:::callJMethod(.zeppelinR, "setResponse", "", FALSE)
|
||||
}, error = function(e) {
|
||||
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
|
||||
})
|
||||
} else if (type == "set") {
|
||||
tryCatch({
|
||||
ret <- assign(stmt, value)
|
||||
SparkR:::callJMethod(.zeppelinR, "setResponse", "", FALSE)
|
||||
}, error = function(e) {
|
||||
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
|
||||
})
|
||||
} else if (type == "get") {
|
||||
tryCatch({
|
||||
ret <- eval(parse(text=stmt))
|
||||
SparkR:::callJMethod(.zeppelinR, "setResponse", ret, FALSE)
|
||||
}, error = function(e) {
|
||||
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
|
||||
})
|
||||
} else if (type == "getS") {
|
||||
tryCatch({
|
||||
ret <- eval(parse(text=stmt))
|
||||
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(ret), FALSE)
|
||||
}, error = function(e) {
|
||||
SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE)
|
||||
})
|
||||
} else {
|
||||
# unsupported type
|
||||
SparkR:::callJMethod(.zeppelinR, "setResponse", paste("Unsupported type ", type), TRUE)
|
||||
}
|
||||
}
|
||||
|
|
@ -20,6 +20,8 @@ import org.apache.spark.api.r.RBackend
|
|||
|
||||
object SparkRBackend {
|
||||
val backend : RBackend = new RBackend()
|
||||
private var started = false;
|
||||
private var portNumber = 0;
|
||||
|
||||
val backendThread : Thread = new Thread("SparkRBackend") {
|
||||
override def run() {
|
||||
|
|
@ -27,10 +29,26 @@ object SparkRBackend {
|
|||
}
|
||||
}
|
||||
|
||||
def init() : Int = backend.init()
|
||||
def init() : Int = {
|
||||
portNumber = backend.init()
|
||||
portNumber
|
||||
}
|
||||
|
||||
def start() : Unit = backendThread.start()
|
||||
def start() : Unit = {
|
||||
backendThread.start()
|
||||
started = true
|
||||
}
|
||||
|
||||
def close() : Unit = backend.close()
|
||||
def close() : Unit = {
|
||||
backend.close()
|
||||
backendThread.join()
|
||||
}
|
||||
|
||||
def isStarted() : Boolean = {
|
||||
started
|
||||
}
|
||||
|
||||
def port(): Int = {
|
||||
return portNumber
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,126 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.spark
|
||||
|
||||
import org.apache.spark.SparkRBackend
|
||||
import org.ddahl.rscala.callback._
|
||||
|
||||
object ZeppelinR {
|
||||
|
||||
private val R = RClient()
|
||||
|
||||
def open(master: String = "local[*]", sparkHome: String = "/opt/spark", sparkInterpreter: SparkInterpreter): Unit = {
|
||||
|
||||
eval(
|
||||
s"""
|
||||
|Sys.setenv(SPARK_HOME="$sparkHome")
|
||||
|.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
|
||||
|library(SparkR)
|
||||
""".stripMargin
|
||||
)
|
||||
|
||||
// See ./core/src/main/scala/org/apache/spark/deploy/RRunner.scala for RBackend usage
|
||||
val port = SparkRBackend.init()
|
||||
SparkRBackend.start()
|
||||
eval(
|
||||
s"""
|
||||
|SparkR:::connectBackend("localhost", ${port})
|
||||
|""".stripMargin)
|
||||
|
||||
// scStartTime is needed by R/pkg/R/sparkR.R
|
||||
eval(
|
||||
"""
|
||||
|assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)
|
||||
""".stripMargin)
|
||||
|
||||
ZeppelinRContext.setSparkContext(sparkInterpreter.getSparkContext())
|
||||
eval(
|
||||
"""
|
||||
|assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv)
|
||||
|""".stripMargin)
|
||||
eval(
|
||||
"""
|
||||
|assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
|
||||
""".stripMargin)
|
||||
|
||||
ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext())
|
||||
eval(
|
||||
"""
|
||||
|assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv)
|
||||
|""".stripMargin)
|
||||
eval(
|
||||
"""
|
||||
|assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
|
||||
|""".stripMargin)
|
||||
|
||||
ZeppelinRContext.setZepplinContext(sparkInterpreter.getZeppelinContext())
|
||||
eval(
|
||||
"""
|
||||
|assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv)
|
||||
|""".stripMargin
|
||||
)
|
||||
|
||||
eval(
|
||||
"""
|
||||
|z.put <- function(name, object) {
|
||||
| SparkR:::callJMethod(.zeppelinContext, "put", name, object)
|
||||
|}
|
||||
|z.get <- function(name) {
|
||||
| SparkR:::callJMethod(.zeppelinContext, "get", name)
|
||||
|}
|
||||
|z.input <- function(name, value) {
|
||||
| SparkR:::callJMethod(.zeppelinContext, "input", name, value)
|
||||
|}
|
||||
|""".stripMargin
|
||||
)
|
||||
|
||||
eval(
|
||||
"""
|
||||
|library("knitr")
|
||||
""".stripMargin)
|
||||
|
||||
}
|
||||
|
||||
def eval(command: String): Any = {
|
||||
try {
|
||||
R.eval(command)
|
||||
} catch {
|
||||
case e: Exception => throw new RuntimeException(e.getMessage + " - Given R command=" + command)
|
||||
}
|
||||
}
|
||||
|
||||
def set(key: String, value: AnyRef): Unit = {
|
||||
R.set(key, value)
|
||||
}
|
||||
|
||||
def get(key: String): Any = {
|
||||
R.get(key)._1
|
||||
}
|
||||
|
||||
def getS0(key: String): String = {
|
||||
R.getS0(key)
|
||||
}
|
||||
|
||||
def close():Unit = {
|
||||
R.eval("""
|
||||
|sparkR.stop()
|
||||
""".stripMargin
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -24,6 +24,7 @@ import static org.mockito.Matchers.anyString;
|
|||
import static org.mockito.Mockito.when;
|
||||
import static org.powermock.api.mockito.PowerMockito.*;
|
||||
|
||||
import org.apache.spark.SparkRBackend;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
|
|
@ -43,7 +44,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
|
|||
import org.powermock.modules.junit4.PowerMockRunner;
|
||||
|
||||
@RunWith(PowerMockRunner.class)
|
||||
@PrepareForTest(SparkRInterpreter.ZeppelinRFactory.class)
|
||||
@PowerMockIgnore({"org.apache.spark.*", "org.apache.hadoop.*", "akka.*", "org.w3c.*", "javax.xml.*", "org.xml.*", "scala.*", "org.apache.cxf.*"})
|
||||
public class SparkRInterpreterTest {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SparkRInterpreterTest.class);
|
||||
|
|
@ -54,7 +54,6 @@ public class SparkRInterpreterTest {
|
|||
private static InterpreterContext context;
|
||||
private static InterpreterGroup intpGroup;
|
||||
private static SparkInterpreter sparkInterpreter;
|
||||
private static SparkRInterpreter.ZeppelinRFactory zeppelinRFactory;
|
||||
private static SparkRInterpreter sparkRInterpreter;
|
||||
|
||||
@BeforeClass
|
||||
|
|
@ -70,6 +69,7 @@ public class SparkRInterpreterTest {
|
|||
assertEquals(InterpreterResult.Type.TEXT, ret.type());
|
||||
}
|
||||
|
||||
|
||||
private static void initInterpreters() {
|
||||
|
||||
Properties p = new Properties();
|
||||
|
|
@ -77,13 +77,6 @@ public class SparkRInterpreterTest {
|
|||
sparkInterpreter = new SparkInterpreter(p);
|
||||
intpGroup = new InterpreterGroup();
|
||||
|
||||
zeppelinRFactory = mock(SparkRInterpreter.ZeppelinRFactory.class);
|
||||
doNothing().when(zeppelinRFactory).open(Mockito.anyString(), Mockito.anyString(), any(SparkInterpreter.class));
|
||||
when(zeppelinRFactory.getS0(anyString())).thenReturn(MOCK_RSCALA_RESULT);
|
||||
|
||||
mockStatic(SparkRInterpreter.ZeppelinRFactory.class);
|
||||
when(SparkRInterpreter.ZeppelinRFactory.instance()).thenReturn(zeppelinRFactory);
|
||||
|
||||
sparkRInterpreter = new SparkRInterpreter(p);
|
||||
sparkRInterpreter.setInterpreterGroup(intpGroup);
|
||||
sparkRInterpreter.open();
|
||||
|
|
|
|||
|
|
@ -0,0 +1,72 @@
|
|||
package org.apache.zeppelin.spark;
|
||||
|
||||
import org.apache.spark.SparkRBackend;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.junit.*;
|
||||
import org.junit.runners.MethodSorters;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* SparkR
|
||||
*/
|
||||
|
||||
public class ZeppelinRTest implements InterpreterOutputListener {
|
||||
private static ZeppelinR zr;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws IOException {
|
||||
int port = SparkRBackend.init();
|
||||
SparkRBackend.start();
|
||||
zr = new ZeppelinR("/Library/Frameworks/R.framework/Resources/bin/R",
|
||||
new File("../spark-1.6.0-bin-hadoop2.6/R/lib").getAbsolutePath(),
|
||||
port);
|
||||
zr.open();
|
||||
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
zr.close();
|
||||
SparkRBackend.close();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testEval() throws IOException, InterruptedException {
|
||||
zr.eval("a = 1+1");
|
||||
assertEquals(2.0, zr.get("a"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEvalError() {
|
||||
try {
|
||||
zr.eval("nonExistObject");
|
||||
assertTrue(false);
|
||||
} catch (RuntimeException e) {
|
||||
assertTrue(true);
|
||||
}
|
||||
zr.eval("a = \"Hello\"");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetGet() {
|
||||
zr.set("a", 1);
|
||||
assertEquals(1, zr.get("a"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAppend(InterpreterOutput out, byte[] line) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdate(InterpreterOutput out, byte[] output) {
|
||||
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue