mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Complete implementation of the PythonPandasSqlInterpreter
This commit is contained in:
parent
f6ca1eb0de
commit
76bbb44dc6
1 changed files with 30 additions and 9 deletions
|
|
@ -23,11 +23,13 @@ import java.util.Properties;
|
|||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* SQL over Pandas DataFrame interpreter for %python group
|
||||
* SQL over Pandas DataFrame interpreter for %python group
|
||||
*
|
||||
* Match experience of %sparpk.sql over Spark DataFrame
|
||||
*/
|
||||
|
|
@ -40,16 +42,34 @@ public class PythonPandasSqlInterpreter extends Interpreter {
|
|||
super(property);
|
||||
}
|
||||
|
||||
private PythonInterpreter getPythonInterpreter() {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
PythonInterpreter python = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
python = (PythonInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return python;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
LOG.info("Open Python SQL interpreter instance: {}", this.toString());
|
||||
|
||||
|
||||
//TODO(bzz): check by importing and catching ImportError
|
||||
//if (pandasAndNumpyAndPandasqlAreInstalled) {
|
||||
try {
|
||||
LOG.info("Bootstrap {} interpreter with {}", this.toString(), SQL_BOOTSTRAP_FILE_PY);
|
||||
PythonInterpreter python = (PythonInterpreter) this.getInterpreterInTheSameSessionByClassName(
|
||||
PythonInterpreter.class.getName());
|
||||
PythonInterpreter python = getPythonInterpreter();
|
||||
python.bootStrapInterpreter(SQL_BOOTSTRAP_FILE_PY);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e);
|
||||
|
|
@ -59,15 +79,16 @@ public class PythonPandasSqlInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
LOG.info("Close Python SQL interpreter instance: {}", this.toString());
|
||||
LOG.info("Close Python SQL interpreter instance: {}", this.toString());
|
||||
Interpreter python = getPythonInterpreter();
|
||||
python.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
|
||||
Interpreter python = this.getInterpreterInTheSameSessionByClassName(
|
||||
PythonInterpreter.class.getName());
|
||||
return python.interpret("print pysqldf('" + st + "')", context);
|
||||
Interpreter python = getPythonInterpreter();
|
||||
return python.interpret("z.show(pysqldf('" + st + "'))", context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -77,7 +98,7 @@ public class PythonPandasSqlInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return null;
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in a new issue