mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-3621. refactor getInterpreterInTheSameSession of Interpreter to reduce code duplication
This commit is contained in:
parent
480e6471ee
commit
06a6cc6246
21 changed files with 147 additions and 418 deletions
|
|
@ -77,8 +77,6 @@ import org.apache.zeppelin.interpreter.InterpreterException;
|
|||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
|
||||
/**
|
||||
|
|
@ -159,7 +157,7 @@ public abstract class BaseLivyInterpreter extends Interpreter {
|
|||
try {
|
||||
this.livyVersion = getLivyVersion();
|
||||
if (this.livyVersion.isSharedSupported()) {
|
||||
sharedInterpreter = getLivySharedInterpreter();
|
||||
sharedInterpreter = getInterpreterInTheSameSessionByClassName(LivySharedInterpreter.class);
|
||||
}
|
||||
if (sharedInterpreter == null || !sharedInterpreter.isSupported()) {
|
||||
initLivySession();
|
||||
|
|
@ -171,26 +169,6 @@ public abstract class BaseLivyInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
protected LivySharedInterpreter getLivySharedInterpreter() throws InterpreterException {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
LivySharedInterpreter sharedInterpreter = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(
|
||||
LivySharedInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
sharedInterpreter = (LivySharedInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return sharedInterpreter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (sharedInterpreter != null && sharedInterpreter.isSupported()) {
|
||||
|
|
|
|||
|
|
@ -18,16 +18,13 @@
|
|||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.ResultMessages;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
||||
|
|
@ -70,7 +67,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter {
|
|||
|
||||
@Override
|
||||
public void open() throws InterpreterException {
|
||||
this.sparkInterpreter = getSparkInterpreter();
|
||||
this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class);
|
||||
// As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession
|
||||
// to judge whether it is using spark2.
|
||||
try {
|
||||
|
|
@ -106,25 +103,6 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter {
|
|||
}
|
||||
}
|
||||
|
||||
private LivySparkInterpreter getSparkInterpreter() throws InterpreterException {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
LivySparkInterpreter spark = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
spark = (LivySparkInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return spark;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext context) {
|
||||
try {
|
||||
|
|
@ -248,10 +226,8 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter {
|
|||
return SchedulerFactory.singleton().createOrGetParallelScheduler(
|
||||
LivySparkInterpreter.class.getName() + this.hashCode(), maxConcurrency);
|
||||
} else {
|
||||
Interpreter intp =
|
||||
getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());
|
||||
if (intp != null) {
|
||||
return intp.getScheduler();
|
||||
if (sparkInterpreter != null) {
|
||||
return sparkInterpreter.getScheduler();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,14 +36,11 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.ResultMessages;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
@ -60,7 +57,7 @@ public class PigQueryInterpreter extends BasePigInterpreter {
|
|||
|
||||
@Override
|
||||
public void open() throws InterpreterException {
|
||||
pigServer = getPigInterpreter().getPigServer();
|
||||
pigServer = getInterpreterInTheSameSessionByClassName(PigInterpreter.class).getPigServer();
|
||||
maxResult = Integer.parseInt(getProperty(MAX_RESULTS));
|
||||
}
|
||||
|
||||
|
|
@ -162,23 +159,4 @@ public class PigQueryInterpreter extends BasePigInterpreter {
|
|||
public PigServer getPigServer() {
|
||||
return this.pigServer;
|
||||
}
|
||||
|
||||
private PigInterpreter getPigInterpreter() throws InterpreterException {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
PigInterpreter pig = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(PigInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
pig = (PigInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return pig;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
|
@ -44,8 +45,8 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
|
|||
*/
|
||||
public class PigQueryInterpreterTest {
|
||||
|
||||
private PigInterpreter pigInterpreter;
|
||||
private PigQueryInterpreter pigQueryInterpreter;
|
||||
private Interpreter pigInterpreter;
|
||||
private Interpreter pigQueryInterpreter;
|
||||
private InterpreterContext context;
|
||||
|
||||
@Before
|
||||
|
|
@ -54,8 +55,8 @@ public class PigQueryInterpreterTest {
|
|||
properties.put("zeppelin.pig.execType", "local");
|
||||
properties.put("zeppelin.pig.maxResult", "20");
|
||||
|
||||
pigInterpreter = new PigInterpreter(properties);
|
||||
pigQueryInterpreter = new PigQueryInterpreter(properties);
|
||||
pigInterpreter = new LazyOpenInterpreter(new PigInterpreter(properties));
|
||||
pigQueryInterpreter = new LazyOpenInterpreter(new PigQueryInterpreter(properties));
|
||||
List<Interpreter> interpreters = new ArrayList();
|
||||
interpreters.add(pigInterpreter);
|
||||
interpreters.add(pigQueryInterpreter);
|
||||
|
|
@ -70,13 +71,13 @@ public class PigQueryInterpreterTest {
|
|||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
public void tearDown() throws InterpreterException {
|
||||
pigInterpreter.close();
|
||||
pigQueryInterpreter.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasics() throws IOException {
|
||||
public void testBasics() throws IOException, InterpreterException {
|
||||
String content = "andy\tmale\t10\n"
|
||||
+ "peter\tmale\t20\n"
|
||||
+ "amy\tfemale\t14\n";
|
||||
|
|
@ -134,7 +135,7 @@ public class PigQueryInterpreterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testMaxResult() throws IOException {
|
||||
public void testMaxResult() throws IOException, InterpreterException {
|
||||
StringBuilder content = new StringBuilder();
|
||||
for (int i = 0; i < 30; ++i) {
|
||||
content.append(i + "\tname_" + i + "\n");
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
|
|||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -71,7 +70,7 @@ public class PythonCondaInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
public void open() throws InterpreterException {
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -142,7 +141,6 @@ public class PythonCondaInterpreter extends Interpreter {
|
|||
|
||||
private void changePythonEnvironment(String envName)
|
||||
throws IOException, InterruptedException, InterpreterException {
|
||||
PythonInterpreter python = getPythonInterpreter();
|
||||
String binPath = null;
|
||||
if (envName == null) {
|
||||
binPath = getProperty(ZEPPELIN_PYTHON);
|
||||
|
|
@ -159,22 +157,17 @@ public class PythonCondaInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
setCurrentCondaEnvName(envName);
|
||||
python.setPythonExec(binPath);
|
||||
getInterpreterInTheSameSessionByClassName(PythonInterpreter.class, false)
|
||||
.setPythonExec(binPath);
|
||||
}
|
||||
|
||||
private void restartPythonProcess() throws InterpreterException {
|
||||
logger.debug("Restarting PythonInterpreter");
|
||||
Interpreter python =
|
||||
getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
|
||||
python.close();
|
||||
python.open();
|
||||
}
|
||||
PythonInterpreter pythonInterpreter =
|
||||
getInterpreterInTheSameSessionByClassName(PythonInterpreter.class, false);
|
||||
pythonInterpreter.close();
|
||||
pythonInterpreter.open();
|
||||
|
||||
protected PythonInterpreter getPythonInterpreter() throws InterpreterException {
|
||||
PythonInterpreter python = null;
|
||||
Interpreter p =
|
||||
getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
|
||||
return (PythonInterpreter) ((LazyOpenInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
|
||||
public static String runCondaCommandForTextOutput(String title, List<String> commands)
|
||||
|
|
@ -379,16 +372,11 @@ public class PythonCondaInterpreter extends Interpreter {
|
|||
*/
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
PythonInterpreter pythonInterpreter = null;
|
||||
try {
|
||||
pythonInterpreter = getPythonInterpreter();
|
||||
if (pythonInterpreter != null) {
|
||||
return pythonInterpreter.getScheduler();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
PythonInterpreter pythonInterpreter =
|
||||
getInterpreterInTheSameSessionByClassName(PythonInterpreter.class, false);
|
||||
return pythonInterpreter.getScheduler();
|
||||
} catch (InterpreterException e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,8 +21,6 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
|
|||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -46,18 +44,20 @@ public class PythonDockerInterpreter extends Interpreter {
|
|||
Pattern deactivatePattern = Pattern.compile("deactivate");
|
||||
Pattern helpPattern = Pattern.compile("help");
|
||||
private File zeppelinHome;
|
||||
private PythonInterpreter pythonInterpreter;
|
||||
|
||||
public PythonDockerInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
public void open() throws InterpreterException {
|
||||
if (System.getenv("ZEPPELIN_HOME") != null) {
|
||||
zeppelinHome = new File(System.getenv("ZEPPELIN_HOME"));
|
||||
} else {
|
||||
zeppelinHome = Paths.get("..").toAbsolutePath().toFile();
|
||||
}
|
||||
this.pythonInterpreter = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -68,7 +68,7 @@ public class PythonDockerInterpreter extends Interpreter {
|
|||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context)
|
||||
throws InterpreterException {
|
||||
File pythonWorkDir = getPythonInterpreter().getPythonWorkDir();
|
||||
File pythonWorkDir = pythonInterpreter.getPythonWorkDir();
|
||||
InterpreterOutput out = context.out;
|
||||
|
||||
Matcher activateMatcher = activatePattern.matcher(st);
|
||||
|
|
@ -98,7 +98,7 @@ public class PythonDockerInterpreter extends Interpreter {
|
|||
mountPy4j +
|
||||
"-e PYTHONPATH=\"" + pythonPath + "\" " +
|
||||
image + " " +
|
||||
getPythonInterpreter().getPythonExec() + " " +
|
||||
pythonInterpreter.getPythonExec() + " " +
|
||||
"/_python_workdir/zeppelin_python.py");
|
||||
restartPythonProcess();
|
||||
out.clear();
|
||||
|
|
@ -114,8 +114,7 @@ public class PythonDockerInterpreter extends Interpreter {
|
|||
|
||||
|
||||
public void setPythonCommand(String cmd) throws InterpreterException {
|
||||
PythonInterpreter python = getPythonInterpreter();
|
||||
python.setPythonExec(cmd);
|
||||
pythonInterpreter.setPythonExec(cmd);
|
||||
}
|
||||
|
||||
private void printUsage(InterpreterOutput out) {
|
||||
|
|
@ -148,43 +147,18 @@ public class PythonDockerInterpreter extends Interpreter {
|
|||
*/
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
PythonInterpreter pythonInterpreter = null;
|
||||
try {
|
||||
pythonInterpreter = getPythonInterpreter();
|
||||
if (pythonInterpreter != null) {
|
||||
return pythonInterpreter.getScheduler();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
} catch (InterpreterException e) {
|
||||
e.printStackTrace();
|
||||
if (pythonInterpreter != null) {
|
||||
return pythonInterpreter.getScheduler();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void restartPythonProcess() throws InterpreterException {
|
||||
PythonInterpreter python = getPythonInterpreter();
|
||||
python.close();
|
||||
python.open();
|
||||
}
|
||||
|
||||
protected PythonInterpreter getPythonInterpreter() throws InterpreterException {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
PythonInterpreter python = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
if (pythonInterpreter != null) {
|
||||
pythonInterpreter.close();
|
||||
pythonInterpreter.open();
|
||||
}
|
||||
python = (PythonInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return python;
|
||||
}
|
||||
|
||||
public boolean pull(InterpreterOutput out, String image) throws InterpreterException {
|
||||
|
|
|
|||
|
|
@ -38,8 +38,6 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
|
|||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
|
||||
import org.apache.zeppelin.interpreter.InvalidHookException;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
|
|
@ -554,19 +552,8 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
|
|||
return resultCompletionText;
|
||||
}
|
||||
|
||||
protected IPythonInterpreter getIPythonInterpreter() {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
IPythonInterpreter iPython = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(IPythonInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
iPython = (IPythonInterpreter) p;
|
||||
return iPython;
|
||||
protected IPythonInterpreter getIPythonInterpreter() throws InterpreterException {
|
||||
return getInterpreterInTheSameSessionByClassName(IPythonInterpreter.class, false);
|
||||
}
|
||||
|
||||
protected BaseZeppelinContext createZeppelinContext() {
|
||||
|
|
|
|||
|
|
@ -21,8 +21,6 @@ import org.apache.zeppelin.interpreter.Interpreter;
|
|||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -39,38 +37,20 @@ public class PythonInterpreterPandasSql extends Interpreter {
|
|||
|
||||
private String SQL_BOOTSTRAP_FILE_PY = "python/bootstrap_sql.py";
|
||||
|
||||
private PythonInterpreter pythonInterpreter;
|
||||
|
||||
public PythonInterpreterPandasSql(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
PythonInterpreter getPythonInterpreter() throws InterpreterException {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
PythonInterpreter python = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
python = (PythonInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return python;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() throws InterpreterException {
|
||||
LOG.info("Open Python SQL interpreter instance: {}", this.toString());
|
||||
|
||||
try {
|
||||
LOG.info("Bootstrap {} interpreter with {}", this.toString(), SQL_BOOTSTRAP_FILE_PY);
|
||||
PythonInterpreter python = getPythonInterpreter();
|
||||
|
||||
python.bootstrapInterpreter(SQL_BOOTSTRAP_FILE_PY);
|
||||
this.pythonInterpreter = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class);
|
||||
this.pythonInterpreter.bootstrapInterpreter(SQL_BOOTSTRAP_FILE_PY);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e);
|
||||
}
|
||||
|
|
@ -79,17 +59,16 @@ public class PythonInterpreterPandasSql extends Interpreter {
|
|||
@Override
|
||||
public void close() throws InterpreterException {
|
||||
LOG.info("Close Python SQL interpreter instance: {}", this.toString());
|
||||
Interpreter python = getPythonInterpreter();
|
||||
python.close();
|
||||
if (pythonInterpreter != null) {
|
||||
pythonInterpreter.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context)
|
||||
throws InterpreterException {
|
||||
LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
|
||||
Interpreter python = getPythonInterpreter();
|
||||
|
||||
return python.interpret(
|
||||
return pythonInterpreter.interpret(
|
||||
"__zeppelin__.show(pysqldf('" + st + "'))\n__zeppelin__._displayhook()", context);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -58,8 +58,6 @@ public class PythonCondaInterpreterTest {
|
|||
group.put("note", Arrays.asList(python, conda));
|
||||
python.setInterpreterGroup(group);
|
||||
conda.setInterpreterGroup(group);
|
||||
|
||||
doReturn(python).when(conda).getPythonInterpreter();
|
||||
}
|
||||
|
||||
private void setMockCondaEnvList() throws IOException, InterruptedException {
|
||||
|
|
|
|||
|
|
@ -51,9 +51,9 @@ public class PythonDockerInterpreterTest {
|
|||
docker.setInterpreterGroup(group);
|
||||
|
||||
doReturn(true).when(docker).pull(any(InterpreterOutput.class), anyString());
|
||||
doReturn(python).when(docker).getPythonInterpreter();
|
||||
doReturn(new File("/scriptpath")).when(python).getPythonWorkDir();
|
||||
|
||||
doReturn(PythonDockerInterpreter.class.getName()).when(docker).getClassName();
|
||||
doReturn(PythonInterpreter.class.getName()).when(python).getClassName();
|
||||
docker.open();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -66,8 +66,14 @@ abstract class RInterpreter(properties : Properties, startSpark : Boolean = true
|
|||
| license so it can be used with this project.""".stripMargin)
|
||||
}
|
||||
|
||||
def getSparkInterpreter() : Option[SparkInterpreter] =
|
||||
getSparkInterpreter(getInterpreterInTheSameSessionByClassName(classOf[SparkInterpreter].getName))
|
||||
def getSparkInterpreter() : Option[SparkInterpreter] = {
|
||||
val sparkInterpreter = getInterpreterInTheSameSessionByClassName(classOf[SparkInterpreter])
|
||||
if (sparkInterpreter == null) {
|
||||
None
|
||||
} else {
|
||||
Some(sparkInterpreter)
|
||||
}
|
||||
}
|
||||
|
||||
def getSparkInterpreter(p1 : Interpreter) : Option[SparkInterpreter] = p1 match {
|
||||
case s : SparkInterpreter => Some[SparkInterpreter](s)
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ import org.apache.commons.lang.StringUtils;
|
|||
import org.apache.spark.repl.SparkILoop;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
|
|
@ -226,12 +227,14 @@ public class DepInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
public InterpreterResult interpret(String st, InterpreterContext context)
|
||||
throws InterpreterException {
|
||||
PrintStream printStream = new PrintStream(out);
|
||||
Console.setOut(printStream);
|
||||
out.reset();
|
||||
|
||||
SparkInterpreter sparkInterpreter = getSparkInterpreter();
|
||||
SparkInterpreter sparkInterpreter =
|
||||
getInterpreterInTheSameSessionByClassName(SparkInterpreter.class, false);
|
||||
|
||||
if (sparkInterpreter != null && sparkInterpreter.getDelegation().isSparkContextInitialized()) {
|
||||
return new InterpreterResult(Code.ERROR,
|
||||
|
|
@ -334,29 +337,17 @@ public class DepInterpreter extends Interpreter {
|
|||
return paths;
|
||||
}
|
||||
|
||||
private SparkInterpreter getSparkInterpreter() {
|
||||
InterpreterGroup intpGroup = getInterpreterGroup();
|
||||
if (intpGroup == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
|
||||
if (p == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
return (SparkInterpreter) p;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
SparkInterpreter sparkInterpreter = getSparkInterpreter();
|
||||
if (sparkInterpreter != null) {
|
||||
return getSparkInterpreter().getScheduler();
|
||||
} else {
|
||||
try {
|
||||
SparkInterpreter sparkInterpreter =
|
||||
getInterpreterInTheSameSessionByClassName(SparkInterpreter.class, false);
|
||||
if (sparkInterpreter != null) {
|
||||
return sparkInterpreter.getScheduler();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
} catch (InterpreterException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,14 +20,10 @@ package org.apache.zeppelin.spark;
|
|||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.python.IPythonInterpreter;
|
||||
import org.apache.zeppelin.python.PythonInterpreter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -50,9 +46,10 @@ public class IPySparkInterpreter extends IPythonInterpreter {
|
|||
|
||||
@Override
|
||||
public void open() throws InterpreterException {
|
||||
PySparkInterpreter pySparkInterpreter = getPySparkInterpreter();
|
||||
PySparkInterpreter pySparkInterpreter =
|
||||
getInterpreterInTheSameSessionByClassName(PySparkInterpreter.class, false);
|
||||
setProperty("zeppelin.python", pySparkInterpreter.getPythonExec());
|
||||
sparkInterpreter = getSparkInterpreter();
|
||||
sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
|
||||
setProperty("zeppelin.py4j.useAuth",
|
||||
sparkInterpreter.getSparkVersion().isSecretSocketSupported() + "");
|
||||
SparkConf conf = sparkInterpreter.getSparkContext().getConf();
|
||||
|
|
@ -80,35 +77,6 @@ public class IPySparkInterpreter extends IPythonInterpreter {
|
|||
return env;
|
||||
}
|
||||
|
||||
private SparkInterpreter getSparkInterpreter() throws InterpreterException {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
SparkInterpreter spark = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
spark = (SparkInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return spark;
|
||||
}
|
||||
|
||||
private PySparkInterpreter getPySparkInterpreter() throws InterpreterException {
|
||||
PySparkInterpreter pySpark = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(PySparkInterpreter.class.getName());
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
pySpark = (PySparkInterpreter) p;
|
||||
return pySpark;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BaseZeppelinContext buildZeppelinContext() {
|
||||
return sparkInterpreter.getZeppelinContext();
|
||||
|
|
|
|||
|
|
@ -24,13 +24,10 @@ import org.apache.spark.SparkConf;
|
|||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.zeppelin.interpreter.DefaultInterpreterProperty;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -207,19 +204,6 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter {
|
|||
return sparkVersion;
|
||||
}
|
||||
|
||||
private DepInterpreter getDepInterpreter() {
|
||||
Interpreter p = getParentSparkInterpreter()
|
||||
.getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
|
||||
if (p == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
return (DepInterpreter) p;
|
||||
}
|
||||
|
||||
private String extractScalaVersion() throws IOException, InterruptedException {
|
||||
String scalaVersionString = scala.util.Properties.versionString();
|
||||
if (scalaVersionString.contains("version 2.10")) {
|
||||
|
|
@ -233,10 +217,11 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter {
|
|||
return this.sc != null;
|
||||
}
|
||||
|
||||
private List<String> getDependencyFiles() {
|
||||
private List<String> getDependencyFiles() throws InterpreterException {
|
||||
List<String> depFiles = new ArrayList<>();
|
||||
// add jar from DepInterpreter
|
||||
DepInterpreter depInterpreter = getDepInterpreter();
|
||||
DepInterpreter depInterpreter = getParentSparkInterpreter().
|
||||
getInterpreterInTheSameSessionByClassName(DepInterpreter.class, false);
|
||||
if (depInterpreter != null) {
|
||||
SparkDependencyContext depc = depInterpreter.getDependencyContext();
|
||||
if (depc != null) {
|
||||
|
|
|
|||
|
|
@ -17,21 +17,6 @@
|
|||
|
||||
package org.apache.zeppelin.spark;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.spark.JobProgressUtil;
|
||||
|
|
@ -42,17 +27,15 @@ import org.apache.spark.SparkEnv;
|
|||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.repl.SparkILoop;
|
||||
import org.apache.spark.scheduler.Pool;
|
||||
import org.apache.spark.scheduler.SparkListener;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.spark.ui.SparkUI;
|
||||
import org.apache.spark.scheduler.SparkListener;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
|
|
@ -63,7 +46,6 @@ import org.apache.zeppelin.spark.dep.SparkDependencyContext;
|
|||
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import scala.Console;
|
||||
import scala.Enumeration.Value;
|
||||
import scala.None;
|
||||
|
|
@ -82,6 +64,21 @@ import scala.tools.nsc.settings.MutableSettings;
|
|||
import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
|
||||
import scala.tools.nsc.settings.MutableSettings.PathSetting;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Spark interpreter for Zeppelin.
|
||||
*
|
||||
|
|
@ -250,19 +247,6 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
|
|||
return dep;
|
||||
}
|
||||
|
||||
private DepInterpreter getDepInterpreter() {
|
||||
Interpreter p = getParentSparkInterpreter()
|
||||
.getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
|
||||
if (p == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
return (DepInterpreter) p;
|
||||
}
|
||||
|
||||
public boolean isYarnMode() {
|
||||
String master = getProperty("master");
|
||||
if (master == null) {
|
||||
|
|
@ -505,7 +489,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
|
|||
argList.add(arg);
|
||||
}
|
||||
|
||||
DepInterpreter depInterpreter = getDepInterpreter();
|
||||
DepInterpreter depInterpreter = getParentSparkInterpreter().
|
||||
getInterpreterInTheSameSessionByClassName(DepInterpreter.class, false);
|
||||
String depInterpreterClasspath = "";
|
||||
if (depInterpreter != null) {
|
||||
SparkDependencyContext depc = depInterpreter.getDependencyContext();
|
||||
|
|
|
|||
|
|
@ -22,12 +22,9 @@ import org.apache.spark.SparkConf;
|
|||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.python.IPythonInterpreter;
|
||||
import org.apache.zeppelin.python.PythonInterpreter;
|
||||
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
|
||||
|
|
@ -64,7 +61,8 @@ public class PySparkInterpreter extends PythonInterpreter {
|
|||
setProperty("zeppelin.python.useIPython", getProperty("zeppelin.pyspark.useIPython", "true"));
|
||||
|
||||
// create SparkInterpreter in JVM side TODO(zjffdu) move to SparkInterpreter
|
||||
DepInterpreter depInterpreter = getDepInterpreter();
|
||||
DepInterpreter depInterpreter =
|
||||
getInterpreterInTheSameSessionByClassName(DepInterpreter.class, false);
|
||||
// load libraries from Dependency Interpreter
|
||||
URL [] urls = new URL[0];
|
||||
List<URL> urlList = new LinkedList<>();
|
||||
|
|
@ -109,7 +107,7 @@ public class PySparkInterpreter extends PythonInterpreter {
|
|||
Thread.currentThread().setContextClassLoader(newCl);
|
||||
// must create spark interpreter after ClassLoader is set, otherwise the additional jars
|
||||
// can not be loaded by spark repl.
|
||||
this.sparkInterpreter = getSparkInterpreter();
|
||||
this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
|
||||
setProperty("zeppelin.py4j.useAuth",
|
||||
sparkInterpreter.getSparkVersion().isSecretSocketSupported() + "");
|
||||
// create Python Process and JVM gateway
|
||||
|
|
@ -136,6 +134,11 @@ public class PySparkInterpreter extends PythonInterpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected IPythonInterpreter getIPythonInterpreter() throws InterpreterException {
|
||||
return getInterpreterInTheSameSessionByClassName(IPySparkInterpreter.class, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BaseZeppelinContext createZeppelinContext() {
|
||||
return sparkInterpreter.getZeppelinContext();
|
||||
|
|
@ -183,37 +186,6 @@ public class PySparkInterpreter extends PythonInterpreter {
|
|||
return "python";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected IPythonInterpreter getIPythonInterpreter() {
|
||||
IPySparkInterpreter iPython = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(IPySparkInterpreter.class.getName());
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
iPython = (IPySparkInterpreter) p;
|
||||
return iPython;
|
||||
}
|
||||
|
||||
private SparkInterpreter getSparkInterpreter() throws InterpreterException {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
SparkInterpreter spark = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
spark = (SparkInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return spark;
|
||||
}
|
||||
|
||||
|
||||
public SparkZeppelinContext getZeppelinContext() {
|
||||
if (sparkInterpreter != null) {
|
||||
return sparkInterpreter.getZeppelinContext();
|
||||
|
|
@ -255,18 +227,6 @@ public class PySparkInterpreter extends PythonInterpreter {
|
|||
}
|
||||
}
|
||||
|
||||
private DepInterpreter getDepInterpreter() {
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
|
||||
if (p == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
return (DepInterpreter) p;
|
||||
}
|
||||
|
||||
public boolean isSpark2() {
|
||||
return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_0_0);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,17 +17,16 @@
|
|||
|
||||
package org.apache.zeppelin.spark;
|
||||
|
||||
import static org.apache.zeppelin.spark.ZeppelinRDisplay.render;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.SparkRBackend;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -39,7 +38,8 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.apache.zeppelin.spark.ZeppelinRDisplay.render;
|
||||
|
||||
/**
|
||||
* R and SparkR interpreter with visualization support.
|
||||
|
|
@ -81,7 +81,7 @@ public class SparkRInterpreter extends Interpreter {
|
|||
throw new InterpreterException(String.format("sparkRLib %s doesn't exist", sparkRLibPath));
|
||||
}
|
||||
|
||||
this.sparkInterpreter = getSparkInterpreter();
|
||||
this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
|
||||
this.sc = sparkInterpreter.getSparkContext();
|
||||
this.jsc = sparkInterpreter.getJavaSparkContext();
|
||||
// Share the same SparkRBackend across sessions
|
||||
|
|
@ -219,25 +219,6 @@ public class SparkRInterpreter extends Interpreter {
|
|||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
private SparkInterpreter getSparkInterpreter() throws InterpreterException {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
SparkInterpreter spark = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
spark = (SparkInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return spark;
|
||||
}
|
||||
|
||||
private boolean useKnitr() {
|
||||
return Boolean.parseBoolean(getProperty("zeppelin.R.knitr", "true"));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,12 +17,6 @@
|
|||
|
||||
package org.apache.zeppelin.spark;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
|
|
@ -30,46 +24,32 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
|
|||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Spark SQL interpreter for Zeppelin.
|
||||
*/
|
||||
public class SparkSqlInterpreter extends Interpreter {
|
||||
private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class);
|
||||
|
||||
private SparkInterpreter sparkInterpreter;
|
||||
|
||||
public SparkSqlInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
|
||||
}
|
||||
|
||||
private SparkInterpreter getSparkInterpreter() throws InterpreterException {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
SparkInterpreter spark = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
spark = (SparkInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return spark;
|
||||
public void open() throws InterpreterException {
|
||||
this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
|
||||
}
|
||||
|
||||
public boolean concurrentSQL() {
|
||||
|
|
@ -82,7 +62,6 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context)
|
||||
throws InterpreterException {
|
||||
SparkInterpreter sparkInterpreter = getSparkInterpreter();
|
||||
if (sparkInterpreter.isUnsupportedSparkVersion()) {
|
||||
return new InterpreterResult(Code.ERROR, "Spark "
|
||||
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
|
||||
|
|
@ -123,7 +102,6 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) throws InterpreterException {
|
||||
SparkInterpreter sparkInterpreter = getSparkInterpreter();
|
||||
SparkContext sc = sparkInterpreter.getSparkContext();
|
||||
sc.cancelJobGroup(Utils.buildJobGroupId(context));
|
||||
}
|
||||
|
|
@ -136,7 +114,6 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) throws InterpreterException {
|
||||
SparkInterpreter sparkInterpreter = getSparkInterpreter();
|
||||
return sparkInterpreter.getProgress(context);
|
||||
}
|
||||
|
||||
|
|
@ -153,13 +130,11 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
// It's because of scheduler is not created yet, and scheduler is created by this function.
|
||||
// Therefore, we can still use getSparkInterpreter() here, but it's better and safe
|
||||
// to getSparkInterpreter without opening it.
|
||||
|
||||
Interpreter intp =
|
||||
getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
|
||||
if (intp != null) {
|
||||
return intp.getScheduler();
|
||||
} else {
|
||||
return null;
|
||||
try {
|
||||
return getInterpreterInTheSameSessionByClassName(SparkInterpreter.class, false)
|
||||
.getScheduler();
|
||||
} catch (InterpreterException e) {
|
||||
throw new RuntimeException("Fail to getScheduler", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package org.apache.zeppelin.spark;
|
|||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
|
|
@ -72,7 +73,7 @@ public class DepInterpreterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testDefault() {
|
||||
public void testDefault() throws InterpreterException {
|
||||
dep.getDependencyContext().reset();
|
||||
InterpreterResult ret = dep.interpret("z.load(\"org.apache.commons:commons-csv:1.1\")", context);
|
||||
assertEquals(Code.SUCCESS, ret.code());
|
||||
|
|
|
|||
|
|
@ -338,13 +338,14 @@ public abstract class Interpreter {
|
|||
}
|
||||
|
||||
@ZeppelinApi
|
||||
public Interpreter getInterpreterInTheSameSessionByClassName(String className) {
|
||||
public <T> T getInterpreterInTheSameSessionByClassName(Class<T> interpreterClass, boolean open)
|
||||
throws InterpreterException {
|
||||
synchronized (interpreterGroup) {
|
||||
for (List<Interpreter> interpreters : interpreterGroup.values()) {
|
||||
boolean belongsToSameNoteGroup = false;
|
||||
Interpreter interpreterFound = null;
|
||||
for (Interpreter intp : interpreters) {
|
||||
if (intp.getClassName().equals(className)) {
|
||||
if (intp.getClassName().equals(interpreterClass.getName())) {
|
||||
interpreterFound = intp;
|
||||
}
|
||||
|
||||
|
|
@ -357,14 +358,32 @@ public abstract class Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
if (belongsToSameNoteGroup) {
|
||||
return interpreterFound;
|
||||
if (belongsToSameNoteGroup && interpreterFound != null) {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
T innerInterpreter = null;
|
||||
while (interpreterFound instanceof WrappedInterpreter) {
|
||||
if (interpreterFound instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) interpreterFound;
|
||||
}
|
||||
interpreterFound = ((WrappedInterpreter) interpreterFound).getInnerInterpreter();
|
||||
}
|
||||
innerInterpreter = (T) interpreterFound;
|
||||
|
||||
if (lazy != null && open) {
|
||||
lazy.open();
|
||||
}
|
||||
return innerInterpreter;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public <T> T getInterpreterInTheSameSessionByClassName(Class<T> interpreterClass)
|
||||
throws InterpreterException {
|
||||
return getInterpreterInTheSameSessionByClassName(interpreterClass, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Replace markers #{contextFieldName} by values from {@link InterpreterContext} fields
|
||||
* with same name and marker #{user}. If value == null then replace by empty string.
|
||||
|
|
|
|||
|
|
@ -47,7 +47,6 @@ public class SparkDownloadUtils {
|
|||
LOGGER.warn("Failed to download Spark", e);
|
||||
}
|
||||
}
|
||||
|
||||
// fallback to use apache archive
|
||||
// https://archive.apache.org/dist/spark/spark-1.6.3/spark-1.6.3-bin-hadoop2.6.tgz
|
||||
if (!downloaded) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue