mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Fail SQL gracefully if no python dependencies installed
This commit is contained in:
parent
aca2bdf76c
commit
0f2f852b16
8 changed files with 78 additions and 44 deletions
|
|
@ -37,7 +37,7 @@
|
|||
<py4j.version>0.9.2</py4j.version>
|
||||
<python.test.exclude>
|
||||
**/PythonInterpreterWithPythonInstalledTest.java,
|
||||
**/PythonPandasSqlInterpreterTest.java
|
||||
**/PythonInterpreterPandasSqlTest.java
|
||||
</python.test.exclude>
|
||||
</properties>
|
||||
|
||||
|
|
|
|||
|
|
@ -204,15 +204,20 @@ public class PythonInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
|
||||
private String sendCommandToPython(String cmd) {
|
||||
/**
|
||||
* Sends given text to Python interpreter, blocks and returns the output
|
||||
* @param cmd Python expression text
|
||||
* @return output
|
||||
*/
|
||||
String sendCommandToPython(String cmd) {
|
||||
String output = "";
|
||||
LOG.info("Sending : \n" + (cmd.length() > 200 ? cmd.substring(0, 200) + "..." : cmd));
|
||||
LOG.debug("Sending : \n" + (cmd.length() > 200 ? cmd.substring(0, 200) + "..." : cmd));
|
||||
try {
|
||||
output = process.sendAndGetResult(cmd);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error when sending commands to python process", e);
|
||||
}
|
||||
//logger.info("Got : \n" + output);
|
||||
LOG.debug("Got : \n" + output);
|
||||
return output;
|
||||
}
|
||||
|
||||
|
|
@ -243,11 +248,7 @@ public class PythonInterpreter extends Interpreter {
|
|||
|
||||
public Boolean isPy4jInstalled() {
|
||||
String output = sendCommandToPython("\n\nimport py4j\n");
|
||||
if (output.contains("ImportError")) {
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
return !output.contains("ImportError");
|
||||
}
|
||||
|
||||
private int findRandomOpenPortOnAllLocalInterfaces() {
|
||||
|
|
|
|||
|
|
@ -33,16 +33,16 @@ import org.slf4j.LoggerFactory;
|
|||
*
|
||||
* Match experience of %spark.sql over Spark DataFrame
|
||||
*/
|
||||
public class PythonPandasSqlInterpreter extends Interpreter {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(PythonPandasSqlInterpreter.class);
|
||||
public class PythonInterpreterPandasSql extends Interpreter {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreterPandasSql.class);
|
||||
|
||||
private String SQL_BOOTSTRAP_FILE_PY = "/bootstrap_sql.py";
|
||||
|
||||
public PythonPandasSqlInterpreter(Properties property) {
|
||||
public PythonInterpreterPandasSql(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
private PythonInterpreter getPythonInterpreter() {
|
||||
PythonInterpreter getPythonInterpreter() {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
PythonInterpreter python = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
|
||||
|
|
@ -64,9 +64,6 @@ public class PythonPandasSqlInterpreter extends Interpreter {
|
|||
@Override
|
||||
public void open() {
|
||||
LOG.info("Open Python SQL interpreter instance: {}", this.toString());
|
||||
|
||||
//TODO(bzz): check i.e by importing and catching ImportError
|
||||
//if (py4jAndPandasAndPandasqlAreInstalled) {
|
||||
try {
|
||||
LOG.info("Bootstrap {} interpreter with {}", this.toString(), SQL_BOOTSTRAP_FILE_PY);
|
||||
PythonInterpreter python = getPythonInterpreter();
|
||||
|
|
@ -74,7 +71,16 @@ public class PythonPandasSqlInterpreter extends Interpreter {
|
|||
} catch (IOException e) {
|
||||
LOG.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e);
|
||||
}
|
||||
//}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if Python dependencies pandas and pandasql are installed
|
||||
* @return True if they are
|
||||
*/
|
||||
boolean isPandasAndPandasqlInstalled() {
|
||||
PythonInterpreter python = getPythonInterpreter();
|
||||
String output = python.sendCommandToPython("\n\nimport pandas\nimport pandasql\n");
|
||||
return !output.contains("ImportError");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -132,6 +132,8 @@ class PyZeppelinContext(object):
|
|||
# `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
|
||||
# and so a dependency on pandas
|
||||
self.show_dataframe(p, **kwargs)
|
||||
elif hasattr(p, '__call__'):
|
||||
p() #error reporting
|
||||
|
||||
def show_dataframe(self, df, **kwargs):
|
||||
"""Pretty prints DF using Table Display System
|
||||
|
|
|
|||
|
|
@ -13,15 +13,16 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# This is for org.apache.zeppelin.python.PythonPandasSqlInterpreterTest
|
||||
# Setup SQL over Pandas DataFrames
|
||||
# It requires the following dependencies to be installed:
|
||||
# - numpy
|
||||
# - pandas
|
||||
# - pandasql
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from pandasql import sqldf
|
||||
|
||||
pysqldf = lambda q: sqldf(q, globals())
|
||||
try:
|
||||
from pandasql import sqldf
|
||||
pysqldf = lambda q: sqldf(q, globals())
|
||||
except ImportError:
|
||||
pysqldf = lambda q: print("Can not run SQL over Pandas DataFrame" +
|
||||
"Make sure 'pandas' and 'pandasql' libraries are installed")
|
||||
|
|
@ -44,7 +44,8 @@ import org.junit.Test;
|
|||
* <ol>
|
||||
* - <li>Python</li>
|
||||
* - <li>NumPy</li>
|
||||
* - <li>Pandas DataFrame</li>
|
||||
* - <li>Pandas</li>
|
||||
* - <li>PandaSql</li>
|
||||
* </ol>
|
||||
*
|
||||
* To run manually on such environment, use:
|
||||
|
|
@ -52,10 +53,10 @@ import org.junit.Test;
|
|||
* mvn -Dpython.test.exclude='' test -pl python -am
|
||||
* </code>
|
||||
*/
|
||||
public class PythonPandasSqlInterpreterTest {
|
||||
public class PythonInterpreterPandasSqlTest {
|
||||
|
||||
private InterpreterGroup intpGroup;
|
||||
private PythonPandasSqlInterpreter sql;
|
||||
private PythonInterpreterPandasSql sql;
|
||||
private PythonInterpreter python;
|
||||
|
||||
private InterpreterContext context;
|
||||
|
|
@ -72,7 +73,7 @@ public class PythonPandasSqlInterpreterTest {
|
|||
python.setInterpreterGroup(intpGroup);
|
||||
python.open();
|
||||
|
||||
sql = new PythonPandasSqlInterpreter(p);
|
||||
sql = new PythonInterpreterPandasSql(p);
|
||||
sql.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpGroup.put("note", Arrays.asList(python, sql));
|
||||
|
|
@ -92,23 +93,47 @@ public class PythonPandasSqlInterpreterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void sqlOverTestDataPrintsTable() {
|
||||
//given
|
||||
// `import pandas as pd` and `import numpy as np` done
|
||||
// DataFrame \w test data
|
||||
String expectedTable = "name\tage\n\nmoon\t33\n\npark\t34";
|
||||
python.interpret("df2 = pd.DataFrame({ 'age' : np.array([33, 51, 51, 34]), "+
|
||||
"'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
|
||||
public void dependenciesAreInstalled() {
|
||||
InterpreterResult ret = python.interpret("import pandas\nimport pandasql\nimport numpy\n", context);
|
||||
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void errorMessageIfDependenciesNotInstalled() {
|
||||
InterpreterResult ret;
|
||||
// given
|
||||
ret = python.interpret(
|
||||
"pysqldf = lambda q: print('Can not execute SQL as Python dependency is not installed')",
|
||||
context);
|
||||
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
|
||||
// when
|
||||
ret = sql.interpret("SELECT * from something", context);
|
||||
|
||||
// then
|
||||
assertNotNull(ret);
|
||||
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
assertTrue(ret.message().contains("dependency is not installed"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sqlOverTestDataPrintsTable() {
|
||||
InterpreterResult ret;
|
||||
// given
|
||||
//String expectedTable = "name\tage\n\nmoon\t33\n\npark\t34";
|
||||
ret = python.interpret("import pandas as pd", context);
|
||||
ret = python.interpret("import numpy as np", context);
|
||||
// DataFrame df2 with test data
|
||||
ret = python.interpret("df2 = pd.DataFrame({ 'age' : np.array([33, 51, 51, 34]), "+
|
||||
"'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
|
||||
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
|
||||
//when
|
||||
InterpreterResult ret = sql.interpret("select name, age from df2 where age < 40", context);
|
||||
ret = sql.interpret("select name, age from df2 where age < 40", context);
|
||||
|
||||
//then
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
|
||||
assertEquals(Type.TABLE, ret.type());
|
||||
//System.out.println(ret.message());
|
||||
//System.out.println(expectedTable);
|
||||
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
assertEquals(ret.message(), Type.TABLE, ret.type());
|
||||
//assertEquals(expectedTable, ret.message()); //somehow it's same but not equal
|
||||
assertTrue(ret.message().indexOf("moon\t33") > 0);
|
||||
assertTrue(ret.message().indexOf("park\t34") > 0);
|
||||
|
|
@ -124,9 +149,8 @@ public class PythonPandasSqlInterpreterTest {
|
|||
//then
|
||||
assertNotNull("Interpreter returned 'null'", ret);
|
||||
//System.out.println("\nInterpreter response: \n" + ret.message());
|
||||
assertEquals(InterpreterResult.Code.ERROR, ret.code());
|
||||
assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code());
|
||||
assertTrue(ret.message().length() > 0);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -187,7 +187,7 @@ public class PythonInterpreterTest {
|
|||
s.connect(sa, 10000);
|
||||
connected = true;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't open connection to " + sa, e);
|
||||
//LOG.warn("Can't open connection to " + sa, e);
|
||||
}
|
||||
return connected;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ import org.junit.Test;
|
|||
*
|
||||
* or
|
||||
* <code>
|
||||
* mvn -Dpython.test.exclude='' test -pl python
|
||||
* mvn -Dpython.test.exclude='' test -pl python -am
|
||||
* </code>
|
||||
*/
|
||||
public class PythonInterpreterWithPythonInstalledTest {
|
||||
|
|
|
|||
Loading…
Reference in a new issue