[ZEPPELIN-4354]. Enhance z.angular for more flexiable data sharing

This commit is contained in:
Jeff Zhang 2019-09-27 14:43:48 +08:00
parent 08bfcd9579
commit 34b11e7465
5 changed files with 77 additions and 21 deletions

View file

@ -61,6 +61,12 @@ class PyZeppelinContext(object):
def get(self, key):
return self.__getitem__(key)
def angular(self, key, noteId = None, paragraphId = None):
return self.z.angular(key, noteId, paragraphId)
def angularBind(self, key, value, noteId = None, paragraphId = None):
return self.z.angularBind(key, value, noteId, paragraphId)
def getInterpreterContext(self):
return self.z.getInterpreterContext()

View file

@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkRBackend;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.zeppelin.interpreter.AbstractInterpreter;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@ -44,7 +46,7 @@ import static org.apache.zeppelin.spark.ZeppelinRDisplay.render;
/**
* R and SparkR interpreter with visualization support.
*/
public class SparkRInterpreter extends Interpreter {
public class SparkRInterpreter extends AbstractInterpreter {
private static final Logger logger = LoggerFactory.getLogger(SparkRInterpreter.class);
private String renderOptions;
@ -54,7 +56,6 @@ public class SparkRInterpreter extends Interpreter {
private AtomicBoolean rbackendDead = new AtomicBoolean(false);
private SparkContext sc;
private JavaSparkContext jsc;
private String secret;
public SparkRInterpreter(Properties property) {
super(property);
@ -120,7 +121,7 @@ public class SparkRInterpreter extends Interpreter {
}
@Override
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext)
public InterpreterResult internalInterpret(String lines, InterpreterContext interpreterContext)
throws InterpreterException {
Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(),
interpreterContext, properties);
@ -185,8 +186,13 @@ public class SparkRInterpreter extends Interpreter {
@Override
public void close() throws InterpreterException {
zeppelinR.close();
this.sparkInterpreter.close();
if (this.zeppelinR != null) {
zeppelinR.close();
}
if (this.sparkInterpreter != null) {
this.sparkInterpreter.close();
this.sparkInterpreter = null;
}
}
@Override
@ -216,6 +222,11 @@ public class SparkRInterpreter extends Interpreter {
SparkRInterpreter.class.getName() + this.hashCode());
}
@Override
public BaseZeppelinContext getZeppelinContext() {
return sparkInterpreter.getZeppelinContext();
}
@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {

View file

@ -66,6 +66,14 @@ z.put <- function(name, object) {
z.get <- function(name) {
SparkR:::callJMethod(.zeppelinContext, "get", name)
}
z.angular <- function(name, noteId=NULL, paragraphId=NULL) {
SparkR:::callJMethod(.zeppelinContext, "angular", name, noteId, paragraphId)
}
z.angularBind <- function(name, value, noteId=NULL, paragraphId=NULL) {
SparkR:::callJMethod(.zeppelinContext, "angularBind", name, value, noteId, paragraphId)
}
z.input <- function(name, value) {
SparkR:::callJMethod(.zeppelinContext, "input", name, value)
}

View file

@ -35,9 +35,7 @@ public class AngularObjectRegistry {
private final String GLOBAL_KEY = "_GLOBAL_";
private AngularObjectRegistryListener listener;
private String interpreterId;
AngularObjectListener angularObjectListener;
private AngularObjectListener angularObjectListener;
public AngularObjectRegistry(final String interpreterId,
final AngularObjectRegistryListener listener) {

View file

@ -407,19 +407,12 @@ public abstract class BaseZeppelinContext {
runNote(context.getNoteId());
}
private AngularObject getAngularObject(String name, InterpreterContext interpreterContext) {
private AngularObject getAngularObject(String name,
String noteId,
String paragraphId,
InterpreterContext interpreterContext) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
String noteId = interpreterContext.getNoteId();
// try get local object
AngularObject paragraphAo = registry.get(name, noteId, interpreterContext.getParagraphId());
AngularObject noteAo = registry.get(name, noteId, null);
AngularObject ao = paragraphAo != null ? paragraphAo : noteAo;
if (ao == null) {
// then global object
ao = registry.get(name, null, null);
}
AngularObject ao = registry.get(name, noteId, paragraphId);
return ao;
}
@ -432,7 +425,27 @@ public abstract class BaseZeppelinContext {
*/
@ZeppelinApi
public Object angular(String name) {
AngularObject ao = getAngularObject(name, interpreterContext);
AngularObject ao = getAngularObject(name, interpreterContext.getNoteId(),
interpreterContext.getParagraphId(), interpreterContext);
if (ao == null) {
return null;
} else {
return ao.get();
}
}
public Object angular(String name, String noteId) {
AngularObject ao = getAngularObject(name, noteId,
interpreterContext.getParagraphId(), interpreterContext);
if (ao == null) {
return null;
} else {
return ao.get();
}
}
public Object angular(String name, String noteId, String paragraphId) {
AngularObject ao = getAngularObject(name, noteId, paragraphId, interpreterContext);
if (ao == null) {
return null;
} else {
@ -601,6 +614,7 @@ public abstract class BaseZeppelinContext {
*
* @param name name of the variable
* @param o value
* @param noteId
*/
public void angularBind(String name, Object o, String noteId) throws TException {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
@ -612,6 +626,25 @@ public abstract class BaseZeppelinContext {
}
}
/**
* Create angular variable in notebook scope and bind with front end Angular display system.
* If variable exists, it'll be overwritten.
*
* @param name name of the variable
* @param o value
* @param noteId
* @param paragraphId
*/
public void angularBind(String name, Object o, String noteId, String paragraphId) throws TException {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
if (registry.get(name, noteId, paragraphId) == null) {
registry.add(name, o, noteId, paragraphId);
} else {
registry.get(name, noteId, paragraphId).set(o);
}
}
/**
* Create angular variable in notebook scope and bind with front end Angular display
* system.