add pandas

This commit is contained in:
astroshim 2017-03-14 02:25:08 +09:00
parent 60e982045f
commit e49ad24644
10 changed files with 540 additions and 16 deletions

View file

@ -98,6 +98,43 @@
</executions>
</plugin>
<!--
  During the package phase, download the single file py4j-${py4j.version}.zip
  from the PyPI package URL below and store it under interpreter/python
  (path resolved relative to ${project.build.directory}/../..).
-->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>wagon-maven-plugin</artifactId>
<version>1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals><goal>download-single</goal></goals>
<configuration>
<url>https://pypi.python.org/packages/64/5c/01e13b68e8caafece40d549f232c9b5677ad1016071a48d04cc3895acaa3</url>
<fromFile>py4j-${py4j.version}.zip</fromFile>
<toFile>${project.build.directory}/../../interpreter/python/py4j-${py4j.version}.zip</toFile>
</configuration>
</execution>
</executions>
</plugin>
<!--
  During the package phase, unzip interpreter/python/py4j-${py4j.version}.zip
  in place (destination is the same interpreter/python directory).
  NOTE(review): this expects the zip to already exist at that path when the
  Ant target runs - confirm plugin execution order within the package phase.
-->
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<phase>package</phase>
<configuration>
<target>
<unzip src="${project.build.directory}/../../interpreter/python/py4j-${py4j.version}.zip"
dest="${project.build.directory}/../../interpreter/python"/>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>

View file

@ -135,7 +135,6 @@ public class PythonCondaInterpreter extends Interpreter {
/**
 * Closes and then re-opens the {@link PythonInterpreter} returned by
 * {@code getPythonInterpreter()}, logging the instance beforehand.
 */
private void restartPythonProcess() {
PythonInterpreter python = getPythonInterpreter();
logger.info("-----------> " + python);
// close() followed by open() on the same instance performs the restart
python.close();
python.open();
}

View file

@ -17,11 +17,13 @@
package org.apache.zeppelin.python;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
@ -60,7 +62,7 @@ import py4j.GatewayServer;
public class PythonInterpreter extends Interpreter implements ExecuteResultHandler {
private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class);
public static final String ZEPPELIN_PYTHON = "python/zeppelin_python.py";
public static final String ZEPPELIN_PY4JPATH = "python/py4j-0.9-src.zip";
public static final String ZEPPELIN_PY4JPATH = "/interpreter/python/py4j-0.9.2/src";
public static final String DEFAULT_ZEPPELIN_PYTHON = "python";
public static final String MAX_RESULT = "zeppelin.python.maxResult";
@ -98,18 +100,6 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
}
}
/**
 * Sets {@code py4jLibPath} to {@code <user.dir>/interpreter/<ZEPPELIN_PY4JPATH>} and,
 * when no file exists at that location yet, populates it via
 * {@code copyFile(py4jLib, ZEPPELIN_PY4JPATH)}.
 */
private void createPy4jLib() {
py4jLibPath = System.getProperty("user.dir") +
File.separator + "interpreter" + File.separator + ZEPPELIN_PY4JPATH;
File py4jLib = new File(py4jLibPath);
// Already present: nothing to copy (note that the path is not logged in this case)
if (py4jLib.exists()) {
return;
}
copyFile(py4jLib, ZEPPELIN_PY4JPATH);
logger.info("py4j library path : {}", py4jLibPath);
}
private void createPythonScript() {
File out = new File(scriptPath);
@ -136,7 +126,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
private void createGatewayServerAndStartScript() {
createPythonScript();
createPy4jLib();
py4jLibPath = System.getenv("ZEPPELIN_HOME") + ZEPPELIN_PY4JPATH;
port = findRandomOpenPortOnAllLocalInterfaces();
gatewayServer = new GatewayServer(this, port);
@ -253,7 +243,6 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
}
PythonInterpretRequest req = pythonInterpretRequest;
pythonInterpretRequest = null;
return req;
}
}
@ -427,6 +416,19 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
return foundJob;
}
/**
 * Loads a Python script from the classpath and runs it through
 * {@code interpret(String, InterpreterContext)} using the current {@code context}.
 *
 * @param file resource path of the script, resolved via
 *             {@code PythonInterpreter.class.getResourceAsStream(file)}
 * @throws IOException if the resource does not exist or cannot be read
 */
void bootStrapInterpreter(String file) throws IOException {
  java.io.InputStream resource = PythonInterpreter.class.getResourceAsStream(file);
  if (resource == null) {
    // getResourceAsStream signals "not found" with null; report it as an I/O problem
    // so callers that already handle IOException see a meaningful message, not an NPE.
    throw new IOException("Bootstrap script not found on classpath: " + file);
  }

  StringBuilder bootstrapCode = new StringBuilder();
  // try-with-resources guarantees the reader (and the underlying stream) is closed
  try (BufferedReader bootstrapReader =
      new BufferedReader(new InputStreamReader(resource, "UTF-8"))) {
    String line;
    while ((line = bootstrapReader.readLine()) != null) {
      bootstrapCode.append(line).append('\n');
    }
  }
  interpret(bootstrapCode.toString(), context);
}
/**
 * @return the {@link GUI} obtained from this interpreter's {@code context} field
 */
public GUI getGui() {
return context.getGui();
}

View file

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import java.io.IOException;
import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * SQL over Pandas DataFrame interpreter for the %python group.
 *
 * Matches the experience of %spark.sql over a Spark DataFrame: the query text is
 * handed to the Python function {@code pysqldf} inside the {@link PythonInterpreter}
 * of the same session and the result is rendered with {@code z.show}.
 */
public class PythonInterpreterPandasSql extends Interpreter {
  private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreterPandasSql.class);

  /** Classpath resource executed in the Python interpreter when this interpreter opens. */
  private static final String SQL_BOOTSTRAP_FILE_PY = "/python/bootstrap_sql.py";

  public PythonInterpreterPandasSql(Properties property) {
    super(property);
  }

  /**
   * Finds the {@link PythonInterpreter} of the same session, unwrapping any
   * {@link WrappedInterpreter} layers. If one of the wrappers is a
   * {@link LazyOpenInterpreter}, it is opened before returning.
   *
   * @return the unwrapped Python interpreter
   */
  PythonInterpreter getPythonInterpreter() {
    LazyOpenInterpreter lazy = null;
    Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
    while (p instanceof WrappedInterpreter) {
      if (p instanceof LazyOpenInterpreter) {
        lazy = (LazyOpenInterpreter) p;
      }
      p = ((WrappedInterpreter) p).getInnerInterpreter();
    }
    PythonInterpreter python = (PythonInterpreter) p;

    if (lazy != null) {
      lazy.open();
    }
    return python;
  }

  /**
   * Runs the SQL bootstrap script in the Python interpreter. A failure to read the
   * script is logged and otherwise ignored.
   */
  @Override
  public void open() {
    LOG.info("Open Python SQL interpreter instance: {}", this.toString());
    try {
      LOG.info("Bootstrap {} interpreter with {}", this.toString(), SQL_BOOTSTRAP_FILE_PY);
      PythonInterpreter python = getPythonInterpreter();
      python.bootStrapInterpreter(SQL_BOOTSTRAP_FILE_PY);
    } catch (IOException e) {
      LOG.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e);
    }
  }

  /** Closes the Python interpreter of the same session. */
  @Override
  public void close() {
    LOG.info("Close Python SQL interpreter instance: {}", this.toString());
    Interpreter python = getPythonInterpreter();
    python.close();
  }

  /**
   * Executes {@code st} as SQL by evaluating {@code z.show(pysqldf('<st>'))} in Python.
   *
   * @param st      SQL query text; may contain quotes, backslashes and line breaks
   * @param context interpreter context passed through to the Python interpreter
   * @return the result produced by the Python interpreter
   */
  @Override
  public InterpreterResult interpret(String st, InterpreterContext context) {
    LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
    Interpreter python = getPythonInterpreter();
    // The query is embedded in a single-quoted Python string literal, so it must be
    // escaped; otherwise a quote or line break in the SQL yields broken (or injected)
    // Python code instead of a query.
    String pythonCode =
        "z.show(pysqldf('" + escapeForPythonLiteral(st) + "'))\nz._displayhook()";
    return python.interpret(pythonCode, context);
  }

  /**
   * Escapes text for use inside a single-quoted Python string literal.
   *
   * @param text raw text; {@code null} is treated as the empty string
   * @return text with backslashes, single quotes, and CR/LF characters escaped
   */
  private static String escapeForPythonLiteral(String text) {
    if (text == null) {
      return "";
    }
    StringBuilder escaped = new StringBuilder(text.length() + 16);
    for (int i = 0; i < text.length(); i++) {
      char c = text.charAt(i);
      switch (c) {
        case '\\':
          escaped.append("\\\\");
          break;
        case '\'':
          escaped.append("\\'");
          break;
        case '\n':
          escaped.append("\\n");
          break;
        case '\r':
          escaped.append("\\r");
          break;
        default:
          escaped.append(c);
      }
    }
    return escaped.toString();
  }

  /** Cancellation is not supported; this method does nothing. */
  @Override
  public void cancel(InterpreterContext context) {
  }

  @Override
  public FormType getFormType() {
    return FormType.SIMPLE;
  }

  /** Progress is not tracked; always returns 0. */
  @Override
  public int getProgress(InterpreterContext context) {
    return 0;
  }
}

View file

@ -22,6 +22,17 @@
"editOnDblClick": false
}
},
{
"group": "python",
"name": "sql",
"className": "org.apache.zeppelin.python.PythonInterpreterPandasSql",
"properties": {
},
"editor":{
"language": "sql",
"editOnDblClick": false
}
},
{
"group": "python",
"name": "conda",

View file

@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Setup SQL over Pandas DataFrames
# It requires next dependencies to be installed:
# - pandas
# - pandasql
from __future__ import print_function
try:
    from pandasql import sqldf

    def pysqldf(q):
        """Run SQL query `q` with pandasql, resolving table names in this module's globals."""
        return sqldf(q, globals())
except ImportError:
    def pysqldf(q):
        """Fallback used when pandasql cannot be imported: print an installation hint."""
        # The two sentences need an explicit separator; plain concatenation
        # used to produce "...DataFrameMake sure...".
        print("Can not run SQL over Pandas DataFrame. " +
              "Make sure 'pandas' and 'pandasql' libraries are installed")

View file

@ -25,6 +25,11 @@ import traceback
import warnings
import signal
from io import BytesIO
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
# for back compatibility

View file

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import java.util.*;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
/**
 * In order for this test to work, test env must have installed:
 * <ol>
 *   <li>Python</li>
 *   <li>Matplotlib</li>
 * </ol>
 *
 * Your PYTHONPATH should also contain the directory of the Matplotlib
 * backend files. Usually these can be found in $ZEPPELIN_HOME/interpreter/lib/python.
 *
 * To run manually on such environment, use:
 * <code>
 *   mvn -Dpython.test.exclude='' test -pl python -am
 * </code>
 */
public class PythonInterpreterMatplotlibTest {

  private InterpreterGroup intpGroup;
  private PythonInterpreter python;

  private InterpreterContext context;

  /** Opens a fresh Python interpreter and builds the context shared by all tests. */
  @Before
  public void setUp() throws Exception {
    Properties p = new Properties();
    p.setProperty("zeppelin.python", "python");
    p.setProperty("zeppelin.python.maxResult", "100");

    intpGroup = new InterpreterGroup();

    python = new PythonInterpreter(p);
    python.setInterpreterGroup(intpGroup);
    python.open();

    List<Interpreter> interpreters = new LinkedList<>();
    interpreters.add(python);
    intpGroup.put("note", interpreters);

    context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
        new HashMap<String, Object>(), new GUI(),
        new AngularObjectRegistry(intpGroup.getId(), null), null,
        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
  }

  @Test
  public void dependenciesAreInstalled() {
    // matplotlib
    InterpreterResult ret = python.interpret("import matplotlib", context);
    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());

    // inline backend
    ret = python.interpret("import backend_zinline", context);
    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
  }

  @Test
  public void showPlot() {
    // Simple plot test
    InterpreterResult ret;
    ret = python.interpret("import matplotlib.pyplot as plt", context);
    ret = python.interpret("z.configure_mpl(interactive=False)", context);
    ret = python.interpret("plt.plot([1, 2, 3])", context);
    ret = python.interpret("plt.show()", context);

    assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
    assertEquals(ret.message().get(0).getData(), Type.HTML, ret.message().get(0).getType());
    assertTrue(ret.message().get(0).getData().contains("data:image/png;base64"));
    assertTrue(ret.message().get(0).getData().contains("<div>"));
  }

  @Test
  // Test for when configuration is set to auto-close figures after show().
  public void testClose() {
    InterpreterResult ret;
    InterpreterResult ret1;
    InterpreterResult ret2;
    ret = python.interpret("import matplotlib.pyplot as plt", context);
    ret = python.interpret("z.configure_mpl(interactive=False)", context);
    ret = python.interpret("plt.plot([1, 2, 3])", context);
    ret1 = python.interpret("plt.show()", context);

    // Second call to show() should print nothing, and Type should be TEXT.
    // This is because when close=True, there should be no living instances
    // of FigureManager, causing show() to return before setting the output
    // type to HTML.
    ret = python.interpret("plt.show()", context);
    assertEquals(0, ret.message().size());

    // Now test that new plot is drawn. It should be identical to the
    // previous one.
    ret = python.interpret("plt.plot([1, 2, 3])", context);
    ret2 = python.interpret("plt.show()", context);
    assertEquals(ret1.message().get(0).getType(), ret2.message().get(0).getType());
    assertEquals(ret1.message().get(0).getData(), ret2.message().get(0).getData());
  }

  @Test
  // Test for when configuration is set to not auto-close figures after show().
  public void testNoClose() {
    InterpreterResult ret;
    InterpreterResult ret1;
    InterpreterResult ret2;
    ret = python.interpret("import matplotlib.pyplot as plt", context);
    ret = python.interpret("z.configure_mpl(interactive=False, close=False)", context);
    ret = python.interpret("plt.plot([1, 2, 3])", context);
    ret1 = python.interpret("plt.show()", context);

    // Second call to show() should print nothing, and Type should be HTML.
    // This is because when close=False, there should be living instances
    // of FigureManager, causing show() to set the output
    // type to HTML even though the figure is inactive.
    ret = python.interpret("plt.show()", context);
    assertEquals("", ret.message().get(0).getData());

    // Now test that plot can be reshown if it is updated. It should be
    // different from the previous one because it will plot the same line
    // again but in a different color.
    ret = python.interpret("plt.plot([1, 2, 3])", context);
    ret2 = python.interpret("plt.show()", context);
    // Compare by content: assertNotSame only checks reference identity, which is
    // trivially true for two separately produced Strings and would never fail.
    assertFalse("Re-shown plot should differ from the first one",
        ret1.message().get(0).getData().equals(ret2.message().get(0).getData()));
  }
}

View file

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
/**
* In order for this test to work, test env must have installed:
* <ol>
* - <li>Python</li>
* - <li>NumPy</li>
* - <li>Pandas</li>
* - <li>PandaSql</li>
* <ol>
*
* To run manually on such environment, use:
* <code>
* mvn -Dpython.test.exclude='' test -pl python -am
* </code>
*/
public class PythonInterpreterPandasSqlTest {
private InterpreterGroup intpGroup;
private PythonInterpreterPandasSql sql;
private PythonInterpreter python;
private InterpreterContext context;
@Before
public void setUp() throws Exception {
Properties p = new Properties();
p.setProperty("zeppelin.python", "python");
p.setProperty("zeppelin.python.maxResult", "100");
intpGroup = new InterpreterGroup();
python = new PythonInterpreter(p);
python.setInterpreterGroup(intpGroup);
python.open();
sql = new PythonInterpreterPandasSql(p);
sql.setInterpreterGroup(intpGroup);
intpGroup.put("note", Arrays.asList(python, sql));
context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null), null,
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
//important to be last step
sql.open();
//it depends on python interpreter presence in the same group
}
@Test
public void dependenciesAreInstalled() {
InterpreterResult ret = python.interpret("import pandas\nimport pandasql\nimport numpy\n", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
}
@Test
public void errorMessageIfDependenciesNotInstalled() {
InterpreterResult ret;
// given
ret = python.interpret(
"pysqldf = lambda q: print('Can not execute SQL as Python dependency is not installed')",
context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
// when
ret = sql.interpret("SELECT * from something", context);
// then
assertNotNull(ret);
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
assertTrue(ret.message().get(0).getData().contains("dependency is not installed"));
}
@Test
public void sqlOverTestDataPrintsTable() {
InterpreterResult ret;
// given
//String expectedTable = "name\tage\n\nmoon\t33\n\npark\t34";
ret = python.interpret("import pandas as pd", context);
ret = python.interpret("import numpy as np", context);
// DataFrame df2 \w test data
ret = python.interpret("df2 = pd.DataFrame({ 'age' : np.array([33, 51, 51, 34]), "+
"'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
//when
ret = sql.interpret("select name, age from df2 where age < 40", context);
//then
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType());
//assertEquals(expectedTable, ret.message()); //somehow it's same but not equal
assertTrue(ret.message().get(0).getData().indexOf("moon\t33") > 0);
assertTrue(ret.message().get(0).getData().indexOf("park\t34") > 0);
assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from df2", context).code());
}
@Test
public void badSqlSyntaxFails() {
//when
InterpreterResult ret = sql.interpret("select wrong syntax", context);
//then
assertNotNull("Interpreter returned 'null'", ret);
//System.out.println("\nInterpreter response: \n" + ret.message());
assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code());
assertTrue(ret.message().get(0).getData().length() > 0);
}
@Test
public void showDataFrame() {
InterpreterResult ret;
ret = python.interpret("import pandas as pd", context);
ret = python.interpret("import numpy as np", context);
// given a Pandas DataFrame with an index and non-text data
ret = python.interpret("index = pd.Index([10, 11, 12, 13], name='index_name')", context);
ret = python.interpret("d1 = {1 : [np.nan, 1, 2, 3], 'two' : [3., 4., 5., 6.7]}", context);
ret = python.interpret("df1 = pd.DataFrame(d1, index=index)", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
// when
ret = python.interpret("z.show(df1, show_index=True)", context);
// then
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType());
assertTrue(ret.message().get(0).getData().indexOf("index_name") == 0);
assertTrue(ret.message().get(0).getData().indexOf("13") > 0);
assertTrue(ret.message().get(0).getData().indexOf("nan") > 0);
assertTrue(ret.message().get(0).getData().indexOf("6.7") > 0);
}
}