ZEPPELIN-3621. refactor getInterpreterInTheSameSession of Interpreter to reduce code duplication

This commit is contained in:
Jeff Zhang 2018-07-13 14:42:53 +08:00
parent 480e6471ee
commit 06a6cc6246
21 changed files with 147 additions and 418 deletions

View file

@ -77,8 +77,6 @@ import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
/**
@ -159,7 +157,7 @@ public abstract class BaseLivyInterpreter extends Interpreter {
try {
this.livyVersion = getLivyVersion();
if (this.livyVersion.isSharedSupported()) {
sharedInterpreter = getLivySharedInterpreter();
sharedInterpreter = getInterpreterInTheSameSessionByClassName(LivySharedInterpreter.class);
}
if (sharedInterpreter == null || !sharedInterpreter.isSupported()) {
initLivySession();
@ -171,26 +169,6 @@ public abstract class BaseLivyInterpreter extends Interpreter {
}
}
protected LivySharedInterpreter getLivySharedInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
LivySharedInterpreter sharedInterpreter = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(
LivySharedInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
sharedInterpreter = (LivySharedInterpreter) p;
if (lazy != null) {
lazy.open();
}
return sharedInterpreter;
}
@Override
public void close() {
if (sharedInterpreter != null && sharedInterpreter.isSupported()) {

View file

@ -18,16 +18,13 @@
package org.apache.zeppelin.livy;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.ResultMessages;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -70,7 +67,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter {
@Override
public void open() throws InterpreterException {
this.sparkInterpreter = getSparkInterpreter();
this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class);
// As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession
// to judge whether it is using spark2.
try {
@ -106,25 +103,6 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter {
}
}
private LivySparkInterpreter getSparkInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
LivySparkInterpreter spark = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
spark = (LivySparkInterpreter) p;
if (lazy != null) {
lazy.open();
}
return spark;
}
@Override
public InterpreterResult interpret(String line, InterpreterContext context) {
try {
@ -248,10 +226,8 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter {
return SchedulerFactory.singleton().createOrGetParallelScheduler(
LivySparkInterpreter.class.getName() + this.hashCode(), maxConcurrency);
} else {
Interpreter intp =
getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());
if (intp != null) {
return intp.getScheduler();
if (sparkInterpreter != null) {
return sparkInterpreter.getScheduler();
} else {
return null;
}

View file

@ -36,14 +36,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.ResultMessages;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
/**
*
@ -60,7 +57,7 @@ public class PigQueryInterpreter extends BasePigInterpreter {
@Override
public void open() throws InterpreterException {
pigServer = getPigInterpreter().getPigServer();
pigServer = getInterpreterInTheSameSessionByClassName(PigInterpreter.class).getPigServer();
maxResult = Integer.parseInt(getProperty(MAX_RESULTS));
}
@ -162,23 +159,4 @@ public class PigQueryInterpreter extends BasePigInterpreter {
public PigServer getPigServer() {
return this.pigServer;
}
private PigInterpreter getPigInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
PigInterpreter pig = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(PigInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
pig = (PigInterpreter) p;
if (lazy != null) {
lazy.open();
}
return pig;
}
}

View file

@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -44,8 +45,8 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
*/
public class PigQueryInterpreterTest {
private PigInterpreter pigInterpreter;
private PigQueryInterpreter pigQueryInterpreter;
private Interpreter pigInterpreter;
private Interpreter pigQueryInterpreter;
private InterpreterContext context;
@Before
@ -54,8 +55,8 @@ public class PigQueryInterpreterTest {
properties.put("zeppelin.pig.execType", "local");
properties.put("zeppelin.pig.maxResult", "20");
pigInterpreter = new PigInterpreter(properties);
pigQueryInterpreter = new PigQueryInterpreter(properties);
pigInterpreter = new LazyOpenInterpreter(new PigInterpreter(properties));
pigQueryInterpreter = new LazyOpenInterpreter(new PigQueryInterpreter(properties));
List<Interpreter> interpreters = new ArrayList();
interpreters.add(pigInterpreter);
interpreters.add(pigQueryInterpreter);
@ -70,13 +71,13 @@ public class PigQueryInterpreterTest {
}
@After
public void tearDown() {
public void tearDown() throws InterpreterException {
pigInterpreter.close();
pigQueryInterpreter.close();
}
@Test
public void testBasics() throws IOException {
public void testBasics() throws IOException, InterpreterException {
String content = "andy\tmale\t10\n"
+ "peter\tmale\t20\n"
+ "amy\tfemale\t14\n";
@ -134,7 +135,7 @@ public class PigQueryInterpreterTest {
}
@Test
public void testMaxResult() throws IOException {
public void testMaxResult() throws IOException, InterpreterException {
StringBuilder content = new StringBuilder();
for (int i = 0; i < 30; ++i) {
content.append(i + "\tname_" + i + "\n");

View file

@ -24,7 +24,6 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -71,7 +70,7 @@ public class PythonCondaInterpreter extends Interpreter {
}
@Override
public void open() {
public void open() throws InterpreterException {
}
@ -142,7 +141,6 @@ public class PythonCondaInterpreter extends Interpreter {
private void changePythonEnvironment(String envName)
throws IOException, InterruptedException, InterpreterException {
PythonInterpreter python = getPythonInterpreter();
String binPath = null;
if (envName == null) {
binPath = getProperty(ZEPPELIN_PYTHON);
@ -159,22 +157,17 @@ public class PythonCondaInterpreter extends Interpreter {
}
}
setCurrentCondaEnvName(envName);
python.setPythonExec(binPath);
getInterpreterInTheSameSessionByClassName(PythonInterpreter.class, false)
.setPythonExec(binPath);
}
private void restartPythonProcess() throws InterpreterException {
logger.debug("Restarting PythonInterpreter");
Interpreter python =
getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
python.close();
python.open();
}
PythonInterpreter pythonInterpreter =
getInterpreterInTheSameSessionByClassName(PythonInterpreter.class, false);
pythonInterpreter.close();
pythonInterpreter.open();
protected PythonInterpreter getPythonInterpreter() throws InterpreterException {
PythonInterpreter python = null;
Interpreter p =
getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
return (PythonInterpreter) ((LazyOpenInterpreter) p).getInnerInterpreter();
}
public static String runCondaCommandForTextOutput(String title, List<String> commands)
@ -379,16 +372,11 @@ public class PythonCondaInterpreter extends Interpreter {
*/
@Override
public Scheduler getScheduler() {
PythonInterpreter pythonInterpreter = null;
try {
pythonInterpreter = getPythonInterpreter();
if (pythonInterpreter != null) {
return pythonInterpreter.getScheduler();
} else {
return null;
}
PythonInterpreter pythonInterpreter =
getInterpreterInTheSameSessionByClassName(PythonInterpreter.class, false);
return pythonInterpreter.getScheduler();
} catch (InterpreterException e) {
e.printStackTrace();
return null;
}
}

View file

@ -21,8 +21,6 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -46,18 +44,20 @@ public class PythonDockerInterpreter extends Interpreter {
Pattern deactivatePattern = Pattern.compile("deactivate");
Pattern helpPattern = Pattern.compile("help");
private File zeppelinHome;
private PythonInterpreter pythonInterpreter;
public PythonDockerInterpreter(Properties property) {
super(property);
}
@Override
public void open() {
public void open() throws InterpreterException {
if (System.getenv("ZEPPELIN_HOME") != null) {
zeppelinHome = new File(System.getenv("ZEPPELIN_HOME"));
} else {
zeppelinHome = Paths.get("..").toAbsolutePath().toFile();
}
this.pythonInterpreter = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class);
}
@Override
@ -68,7 +68,7 @@ public class PythonDockerInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
File pythonWorkDir = getPythonInterpreter().getPythonWorkDir();
File pythonWorkDir = pythonInterpreter.getPythonWorkDir();
InterpreterOutput out = context.out;
Matcher activateMatcher = activatePattern.matcher(st);
@ -98,7 +98,7 @@ public class PythonDockerInterpreter extends Interpreter {
mountPy4j +
"-e PYTHONPATH=\"" + pythonPath + "\" " +
image + " " +
getPythonInterpreter().getPythonExec() + " " +
pythonInterpreter.getPythonExec() + " " +
"/_python_workdir/zeppelin_python.py");
restartPythonProcess();
out.clear();
@ -114,8 +114,7 @@ public class PythonDockerInterpreter extends Interpreter {
public void setPythonCommand(String cmd) throws InterpreterException {
PythonInterpreter python = getPythonInterpreter();
python.setPythonExec(cmd);
pythonInterpreter.setPythonExec(cmd);
}
private void printUsage(InterpreterOutput out) {
@ -148,43 +147,18 @@ public class PythonDockerInterpreter extends Interpreter {
*/
@Override
public Scheduler getScheduler() {
PythonInterpreter pythonInterpreter = null;
try {
pythonInterpreter = getPythonInterpreter();
if (pythonInterpreter != null) {
return pythonInterpreter.getScheduler();
} else {
return null;
}
} catch (InterpreterException e) {
e.printStackTrace();
if (pythonInterpreter != null) {
return pythonInterpreter.getScheduler();
} else {
return null;
}
}
private void restartPythonProcess() throws InterpreterException {
PythonInterpreter python = getPythonInterpreter();
python.close();
python.open();
}
protected PythonInterpreter getPythonInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
PythonInterpreter python = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
if (pythonInterpreter != null) {
pythonInterpreter.close();
pythonInterpreter.open();
}
python = (PythonInterpreter) p;
if (lazy != null) {
lazy.open();
}
return python;
}
public boolean pull(InterpreterOutput out, String image) throws InterpreterException {

View file

@ -38,8 +38,6 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InvalidHookException;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
@ -554,19 +552,8 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
return resultCompletionText;
}
protected IPythonInterpreter getIPythonInterpreter() {
LazyOpenInterpreter lazy = null;
IPythonInterpreter iPython = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(IPythonInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
iPython = (IPythonInterpreter) p;
return iPython;
protected IPythonInterpreter getIPythonInterpreter() throws InterpreterException {
return getInterpreterInTheSameSessionByClassName(IPythonInterpreter.class, false);
}
protected BaseZeppelinContext createZeppelinContext() {

View file

@ -21,8 +21,6 @@ import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,38 +37,20 @@ public class PythonInterpreterPandasSql extends Interpreter {
private String SQL_BOOTSTRAP_FILE_PY = "python/bootstrap_sql.py";
private PythonInterpreter pythonInterpreter;
public PythonInterpreterPandasSql(Properties property) {
super(property);
}
PythonInterpreter getPythonInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
PythonInterpreter python = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
python = (PythonInterpreter) p;
if (lazy != null) {
lazy.open();
}
return python;
}
@Override
public void open() throws InterpreterException {
LOG.info("Open Python SQL interpreter instance: {}", this.toString());
try {
LOG.info("Bootstrap {} interpreter with {}", this.toString(), SQL_BOOTSTRAP_FILE_PY);
PythonInterpreter python = getPythonInterpreter();
python.bootstrapInterpreter(SQL_BOOTSTRAP_FILE_PY);
this.pythonInterpreter = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class);
this.pythonInterpreter.bootstrapInterpreter(SQL_BOOTSTRAP_FILE_PY);
} catch (IOException e) {
LOG.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e);
}
@ -79,17 +59,16 @@ public class PythonInterpreterPandasSql extends Interpreter {
@Override
public void close() throws InterpreterException {
LOG.info("Close Python SQL interpreter instance: {}", this.toString());
Interpreter python = getPythonInterpreter();
python.close();
if (pythonInterpreter != null) {
pythonInterpreter.close();
}
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
Interpreter python = getPythonInterpreter();
return python.interpret(
return pythonInterpreter.interpret(
"__zeppelin__.show(pysqldf('" + st + "'))\n__zeppelin__._displayhook()", context);
}

View file

@ -58,8 +58,6 @@ public class PythonCondaInterpreterTest {
group.put("note", Arrays.asList(python, conda));
python.setInterpreterGroup(group);
conda.setInterpreterGroup(group);
doReturn(python).when(conda).getPythonInterpreter();
}
private void setMockCondaEnvList() throws IOException, InterruptedException {

View file

@ -51,9 +51,9 @@ public class PythonDockerInterpreterTest {
docker.setInterpreterGroup(group);
doReturn(true).when(docker).pull(any(InterpreterOutput.class), anyString());
doReturn(python).when(docker).getPythonInterpreter();
doReturn(new File("/scriptpath")).when(python).getPythonWorkDir();
doReturn(PythonDockerInterpreter.class.getName()).when(docker).getClassName();
doReturn(PythonInterpreter.class.getName()).when(python).getClassName();
docker.open();
}

View file

@ -66,8 +66,14 @@ abstract class RInterpreter(properties : Properties, startSpark : Boolean = true
| license so it can be used with this project.""".stripMargin)
}
def getSparkInterpreter() : Option[SparkInterpreter] =
getSparkInterpreter(getInterpreterInTheSameSessionByClassName(classOf[SparkInterpreter].getName))
def getSparkInterpreter() : Option[SparkInterpreter] = {
val sparkInterpreter = getInterpreterInTheSameSessionByClassName(classOf[SparkInterpreter])
if (sparkInterpreter == null) {
None
} else {
Some(sparkInterpreter)
}
}
def getSparkInterpreter(p1 : Interpreter) : Option[SparkInterpreter] = p1 match {
case s : SparkInterpreter => Some[SparkInterpreter](s)

View file

@ -38,6 +38,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.spark.repl.SparkILoop;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
@ -226,12 +227,14 @@ public class DepInterpreter extends Interpreter {
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
PrintStream printStream = new PrintStream(out);
Console.setOut(printStream);
out.reset();
SparkInterpreter sparkInterpreter = getSparkInterpreter();
SparkInterpreter sparkInterpreter =
getInterpreterInTheSameSessionByClassName(SparkInterpreter.class, false);
if (sparkInterpreter != null && sparkInterpreter.getDelegation().isSparkContextInitialized()) {
return new InterpreterResult(Code.ERROR,
@ -334,29 +337,17 @@ public class DepInterpreter extends Interpreter {
return paths;
}
private SparkInterpreter getSparkInterpreter() {
InterpreterGroup intpGroup = getInterpreterGroup();
if (intpGroup == null) {
return null;
}
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
if (p == null) {
return null;
}
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
return (SparkInterpreter) p;
}
@Override
public Scheduler getScheduler() {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
if (sparkInterpreter != null) {
return getSparkInterpreter().getScheduler();
} else {
try {
SparkInterpreter sparkInterpreter =
getInterpreterInTheSameSessionByClassName(SparkInterpreter.class, false);
if (sparkInterpreter != null) {
return sparkInterpreter.getScheduler();
} else {
return null;
}
} catch (InterpreterException e) {
return null;
}
}

View file

@ -20,14 +20,10 @@ package org.apache.zeppelin.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.python.IPythonInterpreter;
import org.apache.zeppelin.python.PythonInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -50,9 +46,10 @@ public class IPySparkInterpreter extends IPythonInterpreter {
@Override
public void open() throws InterpreterException {
PySparkInterpreter pySparkInterpreter = getPySparkInterpreter();
PySparkInterpreter pySparkInterpreter =
getInterpreterInTheSameSessionByClassName(PySparkInterpreter.class, false);
setProperty("zeppelin.python", pySparkInterpreter.getPythonExec());
sparkInterpreter = getSparkInterpreter();
sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
setProperty("zeppelin.py4j.useAuth",
sparkInterpreter.getSparkVersion().isSecretSocketSupported() + "");
SparkConf conf = sparkInterpreter.getSparkContext().getConf();
@ -80,35 +77,6 @@ public class IPySparkInterpreter extends IPythonInterpreter {
return env;
}
private SparkInterpreter getSparkInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
SparkInterpreter spark = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
spark = (SparkInterpreter) p;
if (lazy != null) {
lazy.open();
}
return spark;
}
private PySparkInterpreter getPySparkInterpreter() throws InterpreterException {
PySparkInterpreter pySpark = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(PySparkInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
pySpark = (PySparkInterpreter) p;
return pySpark;
}
@Override
public BaseZeppelinContext buildZeppelinContext() {
return sparkInterpreter.getZeppelinContext();

View file

@ -24,13 +24,10 @@ import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.zeppelin.interpreter.DefaultInterpreterProperty;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
import org.slf4j.Logger;
@ -207,19 +204,6 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter {
return sparkVersion;
}
private DepInterpreter getDepInterpreter() {
Interpreter p = getParentSparkInterpreter()
.getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
if (p == null) {
return null;
}
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
return (DepInterpreter) p;
}
private String extractScalaVersion() throws IOException, InterruptedException {
String scalaVersionString = scala.util.Properties.versionString();
if (scalaVersionString.contains("version 2.10")) {
@ -233,10 +217,11 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter {
return this.sc != null;
}
private List<String> getDependencyFiles() {
private List<String> getDependencyFiles() throws InterpreterException {
List<String> depFiles = new ArrayList<>();
// add jar from DepInterpreter
DepInterpreter depInterpreter = getDepInterpreter();
DepInterpreter depInterpreter = getParentSparkInterpreter().
getInterpreterInTheSameSessionByClassName(DepInterpreter.class, false);
if (depInterpreter != null) {
SparkDependencyContext depc = depInterpreter.getDependencyContext();
if (depc != null) {

View file

@ -17,21 +17,6 @@
package org.apache.zeppelin.spark;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.JobProgressUtil;
@ -42,17 +27,15 @@ import org.apache.spark.SparkEnv;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.repl.SparkILoop;
import org.apache.spark.scheduler.Pool;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.SparkUI;
import org.apache.spark.scheduler.SparkListener;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.resource.ResourcePool;
@ -63,7 +46,6 @@ import org.apache.zeppelin.spark.dep.SparkDependencyContext;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Console;
import scala.Enumeration.Value;
import scala.None;
@ -82,6 +64,21 @@ import scala.tools.nsc.settings.MutableSettings;
import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
import scala.tools.nsc.settings.MutableSettings.PathSetting;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Spark interpreter for Zeppelin.
*
@ -250,19 +247,6 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
return dep;
}
private DepInterpreter getDepInterpreter() {
Interpreter p = getParentSparkInterpreter()
.getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
if (p == null) {
return null;
}
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
return (DepInterpreter) p;
}
public boolean isYarnMode() {
String master = getProperty("master");
if (master == null) {
@ -505,7 +489,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
argList.add(arg);
}
DepInterpreter depInterpreter = getDepInterpreter();
DepInterpreter depInterpreter = getParentSparkInterpreter().
getInterpreterInTheSameSessionByClassName(DepInterpreter.class, false);
String depInterpreterClasspath = "";
if (depInterpreter != null) {
SparkDependencyContext depc = depInterpreter.getDependencyContext();

View file

@ -22,12 +22,9 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.python.IPythonInterpreter;
import org.apache.zeppelin.python.PythonInterpreter;
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
@ -64,7 +61,8 @@ public class PySparkInterpreter extends PythonInterpreter {
setProperty("zeppelin.python.useIPython", getProperty("zeppelin.pyspark.useIPython", "true"));
// create SparkInterpreter in JVM side TODO(zjffdu) move to SparkInterpreter
DepInterpreter depInterpreter = getDepInterpreter();
DepInterpreter depInterpreter =
getInterpreterInTheSameSessionByClassName(DepInterpreter.class, false);
// load libraries from Dependency Interpreter
URL [] urls = new URL[0];
List<URL> urlList = new LinkedList<>();
@ -109,7 +107,7 @@ public class PySparkInterpreter extends PythonInterpreter {
Thread.currentThread().setContextClassLoader(newCl);
// must create spark interpreter after ClassLoader is set, otherwise the additional jars
// can not be loaded by spark repl.
this.sparkInterpreter = getSparkInterpreter();
this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
setProperty("zeppelin.py4j.useAuth",
sparkInterpreter.getSparkVersion().isSecretSocketSupported() + "");
// create Python Process and JVM gateway
@ -136,6 +134,11 @@ public class PySparkInterpreter extends PythonInterpreter {
}
}
@Override
protected IPythonInterpreter getIPythonInterpreter() throws InterpreterException {
return getInterpreterInTheSameSessionByClassName(IPySparkInterpreter.class, false);
}
@Override
protected BaseZeppelinContext createZeppelinContext() {
return sparkInterpreter.getZeppelinContext();
@ -183,37 +186,6 @@ public class PySparkInterpreter extends PythonInterpreter {
return "python";
}
@Override
protected IPythonInterpreter getIPythonInterpreter() {
IPySparkInterpreter iPython = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(IPySparkInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
iPython = (IPySparkInterpreter) p;
return iPython;
}
private SparkInterpreter getSparkInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
SparkInterpreter spark = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
spark = (SparkInterpreter) p;
if (lazy != null) {
lazy.open();
}
return spark;
}
public SparkZeppelinContext getZeppelinContext() {
if (sparkInterpreter != null) {
return sparkInterpreter.getZeppelinContext();
@ -255,18 +227,6 @@ public class PySparkInterpreter extends PythonInterpreter {
}
}
private DepInterpreter getDepInterpreter() {
Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
if (p == null) {
return null;
}
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
return (DepInterpreter) p;
}
public boolean isSpark2() {
return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_0_0);
}

View file

@ -17,17 +17,16 @@
package org.apache.zeppelin.spark;
import static org.apache.zeppelin.spark.ZeppelinRDisplay.render;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkRBackend;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
@ -39,7 +38,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.zeppelin.spark.ZeppelinRDisplay.render;
/**
* R and SparkR interpreter with visualization support.
@ -81,7 +81,7 @@ public class SparkRInterpreter extends Interpreter {
throw new InterpreterException(String.format("sparkRLib %s doesn't exist", sparkRLibPath));
}
this.sparkInterpreter = getSparkInterpreter();
this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
this.sc = sparkInterpreter.getSparkContext();
this.jsc = sparkInterpreter.getJavaSparkContext();
// Share the same SparkRBackend across sessions
@ -219,25 +219,6 @@ public class SparkRInterpreter extends Interpreter {
return new ArrayList<>();
}
private SparkInterpreter getSparkInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
SparkInterpreter spark = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
spark = (SparkInterpreter) p;
if (lazy != null) {
lazy.open();
}
return spark;
}
private boolean useKnitr() {
return Boolean.parseBoolean(getProperty("zeppelin.R.knitr", "true"));
}

View file

@ -17,12 +17,6 @@
package org.apache.zeppelin.spark;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.zeppelin.interpreter.Interpreter;
@ -30,46 +24,32 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Properties;
/**
* Spark SQL interpreter for Zeppelin.
*/
public class SparkSqlInterpreter extends Interpreter {
private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class);
private SparkInterpreter sparkInterpreter;
public SparkSqlInterpreter(Properties property) {
super(property);
}
@Override
public void open() {
}
private SparkInterpreter getSparkInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
SparkInterpreter spark = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
spark = (SparkInterpreter) p;
if (lazy != null) {
lazy.open();
}
return spark;
public void open() throws InterpreterException {
this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
}
public boolean concurrentSQL() {
@ -82,7 +62,6 @@ public class SparkSqlInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
if (sparkInterpreter.isUnsupportedSparkVersion()) {
return new InterpreterResult(Code.ERROR, "Spark "
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
@ -123,7 +102,6 @@ public class SparkSqlInterpreter extends Interpreter {
@Override
public void cancel(InterpreterContext context) throws InterpreterException {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
SparkContext sc = sparkInterpreter.getSparkContext();
sc.cancelJobGroup(Utils.buildJobGroupId(context));
}
@ -136,7 +114,6 @@ public class SparkSqlInterpreter extends Interpreter {
@Override
public int getProgress(InterpreterContext context) throws InterpreterException {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
return sparkInterpreter.getProgress(context);
}
@ -153,13 +130,11 @@ public class SparkSqlInterpreter extends Interpreter {
// It's because of scheduler is not created yet, and scheduler is created by this function.
// Therefore, we can still use getSparkInterpreter() here, but it's better and safe
// to getSparkInterpreter without opening it.
Interpreter intp =
getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
if (intp != null) {
return intp.getScheduler();
} else {
return null;
try {
return getInterpreterInTheSameSessionByClassName(SparkInterpreter.class, false)
.getScheduler();
} catch (InterpreterException e) {
throw new RuntimeException("Fail to getScheduler", e);
}
}
}

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.spark;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
@ -72,7 +73,7 @@ public class DepInterpreterTest {
}
@Test
public void testDefault() {
public void testDefault() throws InterpreterException {
dep.getDependencyContext().reset();
InterpreterResult ret = dep.interpret("z.load(\"org.apache.commons:commons-csv:1.1\")", context);
assertEquals(Code.SUCCESS, ret.code());

View file

@ -338,13 +338,14 @@ public abstract class Interpreter {
}
@ZeppelinApi
public Interpreter getInterpreterInTheSameSessionByClassName(String className) {
public <T> T getInterpreterInTheSameSessionByClassName(Class<T> interpreterClass, boolean open)
throws InterpreterException {
synchronized (interpreterGroup) {
for (List<Interpreter> interpreters : interpreterGroup.values()) {
boolean belongsToSameNoteGroup = false;
Interpreter interpreterFound = null;
for (Interpreter intp : interpreters) {
if (intp.getClassName().equals(className)) {
if (intp.getClassName().equals(interpreterClass.getName())) {
interpreterFound = intp;
}
@ -357,14 +358,32 @@ public abstract class Interpreter {
}
}
if (belongsToSameNoteGroup) {
return interpreterFound;
if (belongsToSameNoteGroup && interpreterFound != null) {
LazyOpenInterpreter lazy = null;
T innerInterpreter = null;
while (interpreterFound instanceof WrappedInterpreter) {
if (interpreterFound instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) interpreterFound;
}
interpreterFound = ((WrappedInterpreter) interpreterFound).getInnerInterpreter();
}
innerInterpreter = (T) interpreterFound;
if (lazy != null && open) {
lazy.open();
}
return innerInterpreter;
}
}
}
return null;
}
public <T> T getInterpreterInTheSameSessionByClassName(Class<T> interpreterClass)
throws InterpreterException {
return getInterpreterInTheSameSessionByClassName(interpreterClass, true);
}
/**
* Replace markers #{contextFieldName} by values from {@link InterpreterContext} fields
* with same name and marker #{user}. If value == null then replace by empty string.

View file

@ -47,7 +47,6 @@ public class SparkDownloadUtils {
LOGGER.warn("Failed to download Spark", e);
}
}
// fallback to use apache archive
// https://archive.apache.org/dist/spark/spark-1.6.3/spark-1.6.3-bin-hadoop2.6.tgz
if (!downloaded) {