initialize python interpreter using py4j

This commit is contained in:
astroshim 2017-03-07 10:43:58 +09:00
parent 252055571b
commit 7304919def
16 changed files with 560 additions and 1851 deletions

View file

@ -350,6 +350,7 @@ public class PythonCondaInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
PythonInterpreter pythonInterpreter = getPythonInterpreter();
if (pythonInterpreter != null) {
return pythonInterpreter.getScheduler();
} else {

View file

@ -1,175 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Helps run python interpreter on a docker container
*/
public class PythonDockerInterpreter extends Interpreter {
  Logger logger = LoggerFactory.getLogger(PythonDockerInterpreter.class);
  // Paragraph commands understood by this interpreter. matches() is used, so the
  // whole paragraph text has to match the pattern.
  Pattern activatePattern = Pattern.compile("activate\\s*(.*)");
  Pattern deactivatePattern = Pattern.compile("deactivate");
  Pattern helpPattern = Pattern.compile("help");

  public PythonDockerInterpreter(Properties property) {
    super(property);
  }

  /** Nothing to open: all work is delegated to the python interpreter of the session. */
  @Override
  public void open() {
  }

  /** Nothing to release here. */
  @Override
  public void close() {
  }

  /**
   * Handles the {@code activate <image>}, {@code deactivate} and {@code help} commands.
   *
   * <p>A null or empty paragraph, or {@code help}, prints the usage page.
   * {@code activate} pulls the image, points the python interpreter at
   * {@code docker run ... python -iu} and restarts it. {@code deactivate} resets the
   * python command and restarts the interpreter.
   *
   * @param st paragraph text, may be null
   * @param context paragraph context; its {@code out} receives the command output
   * @return SUCCESS for the supported commands, ERROR for anything else
   */
  @Override
  public InterpreterResult interpret(String st, InterpreterContext context) {
    InterpreterOutput out = context.out;

    // Test for null/empty BEFORE creating any matcher: Pattern.matcher(null) throws a
    // NullPointerException, which previously made the null check unreachable.
    if (st == null || st.isEmpty() || helpPattern.matcher(st).matches()) {
      printUsage(out);
      return new InterpreterResult(InterpreterResult.Code.SUCCESS);
    }

    Matcher activateMatcher = activatePattern.matcher(st);
    if (activateMatcher.matches()) {
      String image = activateMatcher.group(1);
      // The pull result is deliberately not checked: an image that only exists locally
      // cannot be pulled but can still be run.
      pull(out, image);
      setPythonCommand("docker run -i --rm " + image + " python -iu");
      restartPythonProcess();
      out.clear();
      return new InterpreterResult(InterpreterResult.Code.SUCCESS, "\"" + image + "\" activated");
    }

    if (deactivatePattern.matcher(st).matches()) {
      setPythonCommand(null);
      restartPythonProcess();
      return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Deactivated");
    }

    return new InterpreterResult(InterpreterResult.Code.ERROR, "Not supported command: " + st);
  }

  /**
   * Sets the command used to launch python on the session's python interpreter.
   *
   * @param cmd command line to use, or null to reset it
   */
  public void setPythonCommand(String cmd) {
    PythonInterpreter python = getPythonInterpreter();
    python.setPythonCommand(cmd);
  }

  /** Writes the bundled HTML usage page to the paragraph output. */
  private void printUsage(InterpreterOutput out) {
    try {
      out.setType(InterpreterResult.Type.HTML);
      out.writeResource("output_templates/docker_usage.html");
    } catch (IOException e) {
      logger.error("Can't print usage", e);
    }
  }

  /** Cancellation is not supported; this is a no-op. */
  @Override
  public void cancel(InterpreterContext context) {
  }

  @Override
  public FormType getFormType() {
    return FormType.NONE;
  }

  @Override
  public int getProgress(InterpreterContext context) {
    return 0;
  }

  /**
   * Use python interpreter's scheduler.
   * To make sure %python.docker paragraph and %python paragraph runs sequentially
   *
   * @return the python interpreter's scheduler, or null when no python interpreter is found
   */
  @Override
  public Scheduler getScheduler() {
    PythonInterpreter pythonInterpreter = getPythonInterpreter();
    if (pythonInterpreter != null) {
      return pythonInterpreter.getScheduler();
    } else {
      return null;
    }
  }

  /** Closes and re-opens the python interpreter so that a new python command takes effect. */
  private void restartPythonProcess() {
    PythonInterpreter python = getPythonInterpreter();
    python.close();
    python.open();
  }

  /**
   * Finds the {@link PythonInterpreter} of the same session, unwrapping any
   * {@link WrappedInterpreter} layers, and opens the lazy wrapper if one was found.
   *
   * @return the unwrapped python interpreter (null if the session has none)
   */
  protected PythonInterpreter getPythonInterpreter() {
    LazyOpenInterpreter lazy = null;
    PythonInterpreter python = null;
    Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());

    while (p instanceof WrappedInterpreter) {
      if (p instanceof LazyOpenInterpreter) {
        lazy = (LazyOpenInterpreter) p;
      }
      p = ((WrappedInterpreter) p).getInnerInterpreter();
    }
    python = (PythonInterpreter) p;

    if (lazy != null) {
      lazy.open();
    }
    return python;
  }

  /**
   * Runs {@code docker pull <image>} and streams its output to the paragraph.
   *
   * @param out paragraph output that receives the command output
   * @param image docker image name
   * @return true when the command exited with status 0
   * @throws InterpreterException if the command cannot be started or the wait is interrupted
   */
  public boolean pull(InterpreterOutput out, String image) {
    int exit = 0;
    try {
      exit = runCommand(out, "docker", "pull", image);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up can still observe the interruption.
      Thread.currentThread().interrupt();
      logger.error(e.getMessage(), e);
      throw new InterpreterException(e);
    } catch (IOException e) {
      logger.error(e.getMessage(), e);
      throw new InterpreterException(e);
    }
    return exit == 0;
  }

  /**
   * Runs an external command, forwarding its merged stdout/stderr line by line to {@code out}.
   *
   * @param out destination of the command output
   * @param command program and arguments
   * @return the exit status of the process
   * @throws IOException if the process cannot be started or its output cannot be copied
   * @throws InterruptedException if interrupted while waiting for the process to finish
   */
  protected int runCommand(InterpreterOutput out, String... command)
      throws IOException, InterruptedException {
    ProcessBuilder builder = new ProcessBuilder(command);
    builder.redirectErrorStream(true);
    Process process = builder.start();

    // try-with-resources closes the reader (and the process stream) even if copying fails.
    try (BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
      String line;
      while ((line = br.readLine()) != null) {
        out.write(line + "\n");
      }
    }
    return process.waitFor(); // Let the process finish.
  }
}

View file

@ -17,21 +17,36 @@
package org.apache.zeppelin.python;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.ServerSocket;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -43,28 +58,110 @@ import py4j.GatewayServer;
/**
* Python interpreter for Zeppelin.
*/
public class PythonInterpreter extends Interpreter {
public class PythonInterpreter extends Interpreter implements ExecuteResultHandler {
private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class);
public static final String BOOTSTRAP_PY = "/bootstrap.py";
public static final String BOOTSTRAP_INPUT_PY = "/bootstrap_input.py";
public static final String ZEPPELIN_PYTHON = "zeppelin.python";
public static final String ZEPPELIN_PYTHON = "python/zeppelin_python.py";
public static final String DEFAULT_ZEPPELIN_PYTHON = "python";
public static final String MAX_RESULT = "zeppelin.python.maxResult";
private Integer port;
private GatewayServer gatewayServer;
private Boolean py4JisInstalled = false;
private InterpreterContext context;
private Pattern errorInLastLine = Pattern.compile(".*(Error|Exception): .*$");
private String pythonPath;
private int maxResult;
PythonProcess process = null;
private String pythonCommand = null;
private String pythonCommand = DEFAULT_ZEPPELIN_PYTHON;
private GatewayServer gatewayServer;
private DefaultExecutor executor;
private int port;
private InterpreterOutputStream outputStream;
private BufferedWriter ins;
private PipedInputStream in;
private ByteArrayOutputStream input;
private String scriptPath;
boolean pythonscriptRunning = false;
private static final int MAX_TIMEOUT_SEC = 10;
private long pythonPid = 0;
Integer statementSetNotifier = new Integer(0);
public PythonInterpreter(Properties property) {
super(property);
try {
File scriptFile = File.createTempFile("zeppelin_python-", ".py");
scriptPath = scriptFile.getAbsolutePath();
} catch (IOException e) {
throw new InterpreterException(e);
}
}
/**
 * Copies the classpath resource named by {@code ZEPPELIN_PYTHON} into the file at
 * {@code scriptPath}.
 *
 * @throws InterpreterException if {@code scriptPath} is a directory, the resource is
 *     missing from the classpath, or the copy fails
 */
private void createPythonScript() {
  File out = new File(scriptPath);
  if (out.exists() && out.isDirectory()) {
    throw new InterpreterException("Can't create python script " + out.getAbsolutePath());
  }

  ClassLoader classLoader = getClass().getClassLoader();
  java.io.InputStream resource = classLoader.getResourceAsStream(ZEPPELIN_PYTHON);
  if (resource == null) {
    // getResourceAsStream returns null for a missing resource; fail with a clear message
    // instead of a NullPointerException from inside the copy.
    throw new InterpreterException("Can't find python script resource " + ZEPPELIN_PYTHON);
  }

  // try-with-resources closes both streams even when the copy throws.
  try (java.io.InputStream source = resource;
       FileOutputStream outStream = new FileOutputStream(out)) {
    IOUtils.copy(source, outStream);
  } catch (IOException e) {
    throw new InterpreterException(e);
  }

  logger.info("File {} created", scriptPath);
}
/**
 * Writes the python script to disk, starts a py4j gateway on a free port and launches
 * the python command with the script path and the gateway port as arguments.
 *
 * @throws InterpreterException if the pipes cannot be connected, the process cannot be
 *     launched, or the initial write fails
 */
private void createGatewayServerAndStartScript() {
// create python script
createPythonScript();
// The gateway exposes this interpreter instance as its entry point.
port = findRandomOpenPortOnAllLocalInterfaces();
gatewayServer = new GatewayServer(this, port);
gatewayServer.start();
// Run python shell
CommandLine cmd = CommandLine.parse(getPythonCommand());
cmd.addArgument(scriptPath, false);
cmd.addArgument(Integer.toString(port), false);
executor = new DefaultExecutor();
outputStream = new InterpreterOutputStream(logger);
// Whatever is written to `ins` travels through the pipe into `in`, which is handed to
// the stream handler below as the process input.
PipedOutputStream ps = new PipedOutputStream();
in = null;
try {
in = new PipedInputStream(ps);
} catch (IOException e1) {
throw new InterpreterException(e1);
}
ins = new BufferedWriter(new OutputStreamWriter(ps));
input = new ByteArrayOutputStream();
// The same outputStream is passed for both output arguments of the handler.
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in);
executor.setStreamHandler(streamHandler);
executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
try {
Map env = EnvironmentUtils.getProcEnvironment();
// NOTE(review): PYTHONPATH is a hard-coded absolute path under /Users/shim and will not
// exist on other machines; it should be derived from configuration before release.
env.put("PYTHONPATH", "/Users/shim/zeppelin/interpreter/python/py4j-0.8.2.1-src.zip");
// `this` is registered as the result handler (see onProcessComplete / onProcessFailed).
executor.execute(cmd, env, this);
pythonscriptRunning = true;
} catch (IOException e) {
throw new InterpreterException(e);
}
try {
// NOTE(review): the bytes go into the in-memory `input` buffer while `ins` is the
// stream that gets flushed; nothing visible here forwards `input` to the process.
input.write("import sys, getopt\n".getBytes());
ins.flush();
} catch (IOException e) {
throw new InterpreterException(e);
}
}
@Override
@ -72,83 +169,188 @@ public class PythonInterpreter extends Interpreter {
// Add matplotlib display hook
InterpreterGroup intpGroup = getInterpreterGroup();
if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
registerHook(HookType.POST_EXEC_DEV, "\nz._displayhook()");
registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
}
// Add zeppelin-bundled libs to PYTHONPATH
setPythonPath("../interpreter/lib/python:$PYTHONPATH");
LOG.info("Starting Python interpreter ---->");
LOG.info("Python path is set to:" + property.getProperty(ZEPPELIN_PYTHON));
maxResult = Integer.valueOf(getProperty(MAX_RESULT));
process = getPythonProcess();
try {
process.open();
} catch (IOException e) {
LOG.error("Can't start the python process", e);
}
try {
LOG.info("python PID : " + process.getPid());
} catch (Exception e) {
LOG.warn("Can't find python pid process", e);
}
try {
LOG.info("Bootstrap interpreter with " + BOOTSTRAP_PY);
bootStrapInterpreter(BOOTSTRAP_PY);
} catch (IOException e) {
LOG.error("Can't execute " + BOOTSTRAP_PY + " to initiate python process", e);
}
py4JisInstalled = isPy4jInstalled();
if (py4JisInstalled) {
port = findRandomOpenPortOnAllLocalInterfaces();
LOG.info("Py4j gateway port : " + port);
try {
gatewayServer = new GatewayServer(this, port);
gatewayServer.start();
LOG.info("Bootstrap inputs with " + BOOTSTRAP_INPUT_PY);
bootStrapInterpreter(BOOTSTRAP_INPUT_PY);
} catch (IOException e) {
LOG.error("Can't execute " + BOOTSTRAP_INPUT_PY + " to " +
"initialize Zeppelin inputs in python process", e);
}
}
// Add matplotlib display hook
createGatewayServerAndStartScript();
}
/**
 * Stops the interpreter: clears the running/initialized flags, closes the input
 * streams, destroys the launched process, deletes the script file and shuts the
 * gateway down.
 *
 * NOTE(review): this rendering appears to interleave two revisions of the method (the
 * PythonProcess based shutdown and the executor/gateway based one); confirm against
 * the actual file before relying on the exact statement order.
 */
@Override
public void close() {
LOG.info("closing Python interpreter <----");
// Clearing both flags makes the wait loop in getStatements() fall through.
pythonscriptRunning = false;
pythonScriptInitialized = false;
try {
if (process != null) {
process.close();
process = null;
}
if (gatewayServer != null) {
gatewayServer.shutdown();
}
ins.flush();
ins.close();
input.flush();
input.close();
} catch (IOException e) {
LOG.error("Can't close the interpreter", e);
e.printStackTrace();
}
executor.getWatchdog().destroyProcess();
new File(scriptPath).delete();
// NOTE(review): unlike the call inside the try block, this shutdown is not null-guarded.
gatewayServer.shutdown();
// wait until getStatements stop
synchronized (statementSetNotifier) {
try {
statementSetNotifier.wait(1500);
} catch (InterruptedException e) {
}
statementSetNotifier.notify();
}
}
PythonInterpretRequest pythonInterpretRequest = null;
/**
 * Holder for one batch of statements to run, together with its job group.
 * Both values are exposed as public fields and through accessor methods.
 */
public class PythonInterpretRequest {
  public String statements;
  public String jobGroup;

  /**
   * @param statements source text to execute
   * @param jobGroup job group of the request, may be null
   */
  public PythonInterpretRequest(String statements, String jobGroup) {
    this.jobGroup = jobGroup;
    this.statements = statements;
  }

  /** @return the job group given at construction */
  public String jobGroup() {
    return this.jobGroup;
  }

  /** @return the statements given at construction */
  public String statements() {
    return this.statements;
  }
}
/**
 * Hands over the pending interpret request and clears the slot.
 *
 * <p>While no request is pending and the script is both running and initialized, the
 * caller waits on {@code statementSetNotifier}, re-checking at least once per second.
 *
 * @return the pending request, or null when the wait ended because the script is not
 *     running or not initialized
 */
public PythonInterpretRequest getStatements() {
  synchronized (statementSetNotifier) {
    while (true) {
      if (pythonInterpretRequest != null || !pythonscriptRunning || !pythonScriptInitialized) {
        break;
      }
      try {
        logger.info("astro statementSetNotifier waiting... : {}, {}, {}",
            statementSetNotifier, pythonscriptRunning, pythonScriptInitialized);
        statementSetNotifier.wait(1000);
      } catch (InterruptedException e) {
        // interruption is ignored; the loop condition is simply evaluated again
      }
    }
    final PythonInterpretRequest pending = pythonInterpretRequest;
    pythonInterpretRequest = null;
    return pending;
  }
}
String statementOutput = null;
boolean statementError = false;
Integer statementFinishedNotifier = new Integer(0);
/**
 * Records the outcome of the last statements and wakes one thread waiting on
 * {@code statementFinishedNotifier}.
 *
 * @param out output produced by the statements
 * @param error true when the statements failed
 */
public void setStatementsFinished(String out, boolean error) {
  synchronized (statementFinishedNotifier) {
    this.statementOutput = out;
    this.statementError = error;
    this.statementFinishedNotifier.notify();
  }
}
boolean pythonScriptInitialized = false;
Integer pythonScriptInitializeNotifier = new Integer(0);
/**
 * Stores the reported process id, marks the script as initialized and wakes every
 * thread waiting on {@code pythonScriptInitializeNotifier}.
 *
 * @param pid process id reported by the caller
 */
public void onPythonScriptInitialized(long pid) {
  this.pythonPid = pid;
  synchronized (pythonScriptInitializeNotifier) {
    this.pythonScriptInitialized = true;
    this.pythonScriptInitializeNotifier.notifyAll();
  }
}
/**
 * Writes a message to the interpreter output currently attached to {@code outputStream}.
 *
 * @param message text to append
 * @throws IOException if the write fails
 */
public void appendOutput(String message) throws IOException {
  final InterpreterOutput target = outputStream.getInterpreterOutput();
  target.write(message);
}
/////////////////////////////////////////////
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
if (cmd == null || cmd.isEmpty()) {
return new InterpreterResult(Code.SUCCESS, "");
}
this.context = contextInterpreter;
String output = sendCommandToPython(cmd);
InterpreterResult result;
if (pythonErrorIn(output)) {
result = new InterpreterResult(Code.ERROR, output.replaceAll("\\.\\.\\.", ""));
} else {
result = new InterpreterResult(Code.SUCCESS, output);
this.context = contextInterpreter;
if (!pythonscriptRunning) {
return new InterpreterResult(Code.ERROR, "python process not running"
+ outputStream.toString());
}
outputStream.setInterpreterOutput(context.out);
synchronized (pythonScriptInitializeNotifier) {
long startTime = System.currentTimeMillis();
while (pythonScriptInitialized == false
&& pythonscriptRunning
&& System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) {
try {
pythonScriptInitializeNotifier.wait(1000);
} catch (InterruptedException e) {
}
}
}
List<InterpreterResultMessage> errorMessage;
try {
context.out.flush();
errorMessage = context.out.toInterpreterResultMessage();
} catch (IOException e) {
throw new InterpreterException(e);
}
if (pythonscriptRunning == false) {
// python script failed to initialize and terminated
errorMessage.add(new InterpreterResultMessage(
InterpreterResult.Type.TEXT, "failed to start python"));
return new InterpreterResult(Code.ERROR, errorMessage);
}
if (pythonScriptInitialized == false) {
// timeout. didn't get initialized message
errorMessage.add(new InterpreterResultMessage(
InterpreterResult.Type.TEXT, "python is not responding"));
return new InterpreterResult(Code.ERROR, errorMessage);
}
pythonInterpretRequest = new PythonInterpretRequest(cmd, null);
statementOutput = null;
synchronized (statementSetNotifier) {
statementSetNotifier.notify();
}
synchronized (statementFinishedNotifier) {
while (statementOutput == null) {
try {
statementFinishedNotifier.wait(1000);
} catch (InterruptedException e) {
}
}
}
if (statementError) {
return new InterpreterResult(Code.ERROR, statementOutput);
} else {
try {
context.out.flush();
} catch (IOException e) {
throw new InterpreterException(e);
}
return new InterpreterResult(Code.SUCCESS);
}
return result;
}
/**
@ -171,12 +373,22 @@ public class PythonInterpreter extends Interpreter {
return isError;
}
/**
 * Interrupts the running python code by sending SIGINT to the python process, or
 * closes the interpreter when no usable process id is known.
 *
 * @throws IOException if the {@code kill} command cannot be started
 */
public void interrupt() throws IOException {
  // Only a strictly positive PID is a real process. pythonPid is 0 until the script
  // reports its PID, and "kill -SIGINT 0" would signal every process in this JVM's
  // process group instead of the python process.
  if (pythonPid > 0) {
    logger.info("Sending SIGINT signal to PID : " + pythonPid);
    // Argument array instead of a single command string: no tokenizing is involved.
    Runtime.getRuntime().exec(new String[] {"kill", "-SIGINT", Long.toString(pythonPid)});
  } else {
    logger.warn("Non UNIX/Linux system, close the interpreter");
    close();
  }
}
/**
 * Cancels the running paragraph by interrupting the python process; an I/O failure
 * is logged and not propagated.
 *
 * NOTE(review): both process.interrupt() and interrupt() appear here; this looks like
 * the old and the new line of a diff shown together - confirm which one is current.
 */
@Override
public void cancel(InterpreterContext context) {
try {
process.interrupt();
interrupt();
} catch (IOException e) {
LOG.error("Can't interrupt the python interpreter", e);
e.printStackTrace();
}
}
@ -205,18 +417,6 @@ public class PythonInterpreter extends Interpreter {
this.pythonPath = pythonPath;
}
/**
 * Returns the current python process, or builds a new (unopened) one when none exists.
 * A command set via {@code pythonCommand} takes precedence over the configured
 * {@code ZEPPELIN_PYTHON} property. The new instance is not stored by this method.
 *
 * @return the existing process, or a freshly constructed {@link PythonProcess}
 */
public PythonProcess getPythonProcess() {
  if (process != null) {
    return process;
  }
  final String configured = getProperty(ZEPPELIN_PYTHON);
  final String binPath = (pythonCommand != null) ? pythonCommand : configured;
  return new PythonProcess(binPath, pythonPath);
}
/**
 * Replaces the command used to launch python.
 *
 * @param cmd command line, may be null
 */
public void setPythonCommand(String cmd) {
  this.pythonCommand = cmd;
}
@ -237,41 +437,6 @@ public class PythonInterpreter extends Interpreter {
return foundJob;
}
/**
 * Sends the given text to the python process and blocks until its output is returned.
 * An I/O failure is logged and yields an empty result.
 *
 * @param cmd Python expression text
 * @return output of the python process, or an empty string on failure
 */
String sendCommandToPython(String cmd) {
  // Only the first 200 characters are logged for long commands.
  final String preview;
  if (cmd.length() > 200) {
    preview = cmd.substring(0, 200) + "...";
  } else {
    preview = cmd;
  }
  LOG.debug("Sending : \n" + preview);

  String reply = "";
  try {
    reply = process.sendAndGetResult(cmd);
  } catch (IOException e) {
    LOG.error("Error when sending commands to python process", e);
  }
  LOG.debug("Got : \n" + reply);
  return reply;
}
/**
 * Reads a bootstrap script from the classpath and sends it to the python process.
 * When py4j is installed and a port is set, every {@code %PORT%} placeholder in the
 * script is replaced by that port first.
 *
 * @param file classpath location of the bootstrap script, e.g. {@code /bootstrap.py}
 * @throws IOException if the resource does not exist or cannot be read
 */
void bootStrapInterpreter(String file) throws IOException {
  java.io.InputStream resource = PythonInterpreter.class.getResourceAsStream(file);
  if (resource == null) {
    // Report a missing resource as the IOException callers already handle, rather
    // than letting a NullPointerException escape from the reader constructor.
    throw new IOException("Bootstrap resource not found: " + file);
  }

  StringBuilder script = new StringBuilder();
  // try-with-resources makes sure the reader (and the resource stream) gets closed.
  try (BufferedReader bootstrapReader = new BufferedReader(new InputStreamReader(resource))) {
    String line;
    while ((line = bootstrapReader.readLine()) != null) {
      script.append(line).append("\n");
    }
  }

  String bootstrapCode = script.toString();
  if (py4JisInstalled && port != null && port != -1) {
    bootstrapCode = bootstrapCode.replaceAll("\\%PORT\\%", port.toString());
  }
  LOG.info("Bootstrap python interpreter with code from \n " + file);
  sendCommandToPython(bootstrapCode);
}
/**
 * @return the GUI of the interpreter context stored in {@code context}
 */
public GUI getGui() {
  final GUI gui = context.getGui();
  return gui;
}
@ -280,11 +445,6 @@ public class PythonInterpreter extends Interpreter {
return port;
}
/**
 * Probes the python process by importing py4j.
 *
 * @return false when the reply contains "ImportError", true otherwise
 */
public Boolean isPy4jInstalled() {
  final String reply = sendCommandToPython("\n\nimport py4j\n");
  final boolean importFailed = reply.contains("ImportError");
  return !importFailed;
}
private int findRandomOpenPortOnAllLocalInterfaces() {
Integer port = -1;
try (ServerSocket socket = new ServerSocket(0);) {
@ -299,4 +459,16 @@ public class PythonInterpreter extends Interpreter {
/**
 * @return the current value of {@code maxResult}
 */
public int getMaxResult() {
  return this.maxResult;
}
/**
 * Marks the script as no longer running and logs the exit code.
 *
 * @param exitValue exit code of the finished process
 */
@Override
public void onProcessComplete(int exitValue) {
  this.pythonscriptRunning = false;
  final String message = "python process terminated. exit code " + exitValue;
  logger.info(message);
}
/**
 * Marks the script as no longer running and logs the failure.
 *
 * @param e the failure that was reported
 */
@Override
public void onProcessFailed(ExecuteException e) {
  this.pythonscriptRunning = false;
  final ExecuteException failure = e;
  logger.error("python process failed", failure);
}
}

View file

@ -1,115 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import java.io.IOException;
import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* SQL over Pandas DataFrame interpreter for %python group
*
* Matches the experience of %spark.sql over Spark DataFrame
*/
public class PythonInterpreterPandasSql extends Interpreter {
  private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreterPandasSql.class);

  // Classpath location of the script that imports the SQL dependencies.
  private static final String SQL_BOOTSTRAP_FILE_PY = "/bootstrap_sql.py";

  public PythonInterpreterPandasSql(Properties property) {
    super(property);
  }

  /**
   * Finds the {@link PythonInterpreter} of the same session, unwrapping any
   * {@link WrappedInterpreter} layers, and opens the lazy wrapper if one was found.
   *
   * @return the unwrapped python interpreter
   */
  PythonInterpreter getPythonInterpreter() {
    LazyOpenInterpreter lazy = null;
    PythonInterpreter python = null;
    Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());

    while (p instanceof WrappedInterpreter) {
      if (p instanceof LazyOpenInterpreter) {
        lazy = (LazyOpenInterpreter) p;
      }
      p = ((WrappedInterpreter) p).getInnerInterpreter();
    }
    python = (PythonInterpreter) p;

    if (lazy != null) {
      lazy.open();
    }
    return python;
  }

  /** Runs the SQL bootstrap script in the python interpreter; a failure is only logged. */
  @Override
  public void open() {
    LOG.info("Open Python SQL interpreter instance: {}", this.toString());
    try {
      LOG.info("Bootstrap {} interpreter with {}", this.toString(), SQL_BOOTSTRAP_FILE_PY);
      PythonInterpreter python = getPythonInterpreter();
      python.bootStrapInterpreter(SQL_BOOTSTRAP_FILE_PY);
    } catch (IOException e) {
      LOG.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e);
    }
  }

  /**
   * Checks if Python dependencies pandas and pandasql are installed
   * @return True if they are
   */
  boolean isPandasAndPandasqlInstalled() {
    PythonInterpreter python = getPythonInterpreter();
    String output = python.sendCommandToPython("\n\nimport pandas\nimport pandasql\n");
    return !output.contains("ImportError");
  }

  /** Closes the python interpreter of the session. */
  @Override
  public void close() {
    LOG.info("Close Python SQL interpreter instance: {}", this.toString());
    Interpreter python = getPythonInterpreter();
    python.close();
  }

  /**
   * Runs a SQL query through {@code pysqldf} in the python interpreter and shows the
   * result with {@code z.show}.
   *
   * @param st SQL text of the paragraph
   * @param context paragraph context, passed on to the python interpreter
   * @return the result produced by the python interpreter
   */
  @Override
  public InterpreterResult interpret(String st, InterpreterContext context) {
    LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
    Interpreter python = getPythonInterpreter();
    // The query is embedded as a Python string literal. It has to be escaped, otherwise
    // any quote or line break in the SQL (e.g. WHERE name = 'x') ends the literal early.
    return python.interpret("z.show(pysqldf(" + toPythonStringLiteral(st) + "))", context);
  }

  /**
   * Renders text as a single-quoted Python string literal, escaping backslashes,
   * single quotes and line breaks. Text without those characters is only wrapped in
   * quotes, so simple queries produce the same Python code as before.
   */
  private static String toPythonStringLiteral(String text) {
    String value = String.valueOf(text);
    StringBuilder literal = new StringBuilder(value.length() + 2);
    literal.append('\'');
    for (int i = 0; i < value.length(); i++) {
      char c = value.charAt(i);
      switch (c) {
        case '\\':
          literal.append("\\\\");
          break;
        case '\'':
          literal.append("\\'");
          break;
        case '\n':
          literal.append("\\n");
          break;
        case '\r':
          literal.append("\\r");
          break;
        default:
          literal.append(c);
      }
    }
    return literal.append('\'').toString();
  }

  /** Cancellation is not supported; this is a no-op. */
  @Override
  public void cancel(InterpreterContext context) {
  }

  @Override
  public FormType getFormType() {
    return FormType.SIMPLE;
  }

  @Override
  public int getProgress(InterpreterContext context) {
    return 0;
  }
}

View file

@ -1,138 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.OutputStream;
import java.lang.reflect.Field;
/**
* Object encapsulated interactive
* Python process (REPL) used by python interpreter
*/
public class PythonProcess {
  private static final Logger logger = LoggerFactory.getLogger(PythonProcess.class);
  // Marker sent after every command; reading stops at the first output line containing it.
  private static final String STATEMENT_END = "*!?flush reader!?*";

  InputStream stdout;
  OutputStream stdin;
  PrintWriter writer;
  BufferedReader reader;
  Process process;

  private String binPath;
  private String pythonPath;
  // -1 means "no usable PID". Starting at -1 rather than the implicit 0 keeps
  // interrupt() from ever running "kill -SIGINT 0" before open() has been called.
  private long pid = -1;

  /**
   * @param binPath python executable, optionally followed by its own arguments
   * @param pythonPath value for PYTHONPATH, or null to leave the environment untouched
   */
  public PythonProcess(String binPath, String pythonPath) {
    this.binPath = binPath;
    this.pythonPath = pythonPath;
  }

  /**
   * Starts the python process with stderr merged into stdout and wires up the
   * reader/writer used to talk to it.
   *
   * <p>When {@code binPath} carries no arguments, {@code -iu} is appended. On Windows
   * the command is started directly; elsewhere it is run through {@code bash -c} and
   * PYTHONPATH is set when a python path was given.
   *
   * @throws IOException if the process cannot be started
   */
  public void open() throws IOException {
    ProcessBuilder builder;
    boolean hasParams = binPath.split(" ").length > 1;
    if (System.getProperty("os.name").toLowerCase().contains("windows")) {
      if (hasParams) {
        builder = new ProcessBuilder(binPath.split(" "));
      } else {
        builder = new ProcessBuilder(binPath, "-iu");
      }
    } else {
      String cmd = hasParams ? binPath : binPath + " -iu";
      builder = new ProcessBuilder("bash", "-c", cmd);
      if (pythonPath != null) {
        builder.environment().put("PYTHONPATH", pythonPath);
      }
    }

    builder.redirectErrorStream(true);
    process = builder.start();
    stdout = process.getInputStream();
    stdin = process.getOutputStream();
    writer = new PrintWriter(stdin, true);
    reader = new BufferedReader(new InputStreamReader(stdout));
    try {
      pid = findPid();
    } catch (Exception e) {
      logger.warn("Can't find python pid process", e);
      pid = -1;
    }
  }

  /**
   * Destroys the process and closes every stream attached to it. Does nothing when the
   * process was never opened.
   *
   * @throws IOException the first failure met while closing; the remaining streams are
   *     still closed
   */
  public void close() throws IOException {
    if (process == null) {
      return;
    }
    process.destroy();

    // Close everything even if one close() fails, then report the first failure.
    IOException firstFailure = null;
    java.io.Closeable[] resources = {reader, writer, stdin, stdout};
    for (java.io.Closeable resource : resources) {
      if (resource == null) {
        continue;
      }
      try {
        resource.close();
      } catch (IOException e) {
        if (firstFailure == null) {
          firstFailure = e;
        }
      }
    }
    if (firstFailure != null) {
      throw firstFailure;
    }
  }

  /**
   * Sends SIGINT to the python process when its PID is known; otherwise closes it.
   *
   * @throws IOException if the {@code kill} command cannot be started or closing fails
   */
  public void interrupt() throws IOException {
    // PID 0 is never a real child process: "kill 0" addresses the whole process group.
    if (pid > 0) {
      logger.info("Sending SIGINT signal to PID : " + pid);
      Runtime.getRuntime().exec(new String[] {"kill", "-SIGINT", Long.toString(pid)});
    } else {
      logger.warn("Non UNIX/Linux system, close the interpreter");
      close();
    }
  }

  /**
   * Writes a command followed by the quoted end marker and collects the output lines
   * up to (excluding) the first line containing the marker.
   *
   * @param cmd python source to send
   * @return collected output, one {@code \n}-terminated line per line read
   * @throws IOException if reading from the process fails
   */
  public String sendAndGetResult(String cmd) throws IOException {
    writer.println(cmd);
    writer.println();
    writer.println("\"" + STATEMENT_END + "\"");

    StringBuilder output = new StringBuilder();
    String line = null;
    while ((line = reader.readLine()) != null && !line.contains(STATEMENT_END)) {
      logger.debug("Read line from python shell : " + line);
      output.append(line).append("\n");
    }
    return output.toString();
  }

  /**
   * Reads the private {@code pid} field of {@code java.lang.UNIXProcess} by reflection.
   *
   * @return the process id, or -1 when the process is of another implementation class
   */
  private long findPid() throws NoSuchFieldException, IllegalAccessException {
    long found = -1;
    if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
      Field f = process.getClass().getDeclaredField("pid");
      f.setAccessible(true);
      found = f.getLong(process);
      f.setAccessible(false);
    }
    return found;
  }

  /** @return the process id, or -1 when it is unknown */
  public long getPid() {
    return pid;
  }
}

View file

@ -1,14 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View file

@ -1,234 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# PYTHON 2 / 3 compatibility :
# bootstrap.py must be runnable with Python 2 or 3
import os
import sys
import signal
import base64
import warnings
from io import BytesIO
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
def intHandler(signum, frame):
    """Signal handler: announce the interruption, then raise KeyboardInterrupt.

    signum and frame are the standard signal-handler arguments; neither is used.
    """
    message = "Paragraph interrupted"
    print(message)
    raise KeyboardInterrupt()
signal.signal(signal.SIGINT, intHandler)
# set prompt as empty string so that java side don't need to remove the prompt.
sys.ps1=""
def help():
    """Print the interpreter help page as a %html paragraph.

    The text is the original help page with two markup slips repaired: the stray
    "')" after the checkbox example and the unterminated z.show(plt,width=...) example.
    """
    print("""%html
<h2>Python Interpreter help</h2>
<h3>Python 2 & 3 compatibility</h3>
<p>The interpreter is compatible with Python 2 & 3.<br/>
To change Python version,
change in the interpreter configuration the python to the
desired version (example : python=/usr/bin/python3)</p>
<h3>Python modules</h3>
<p>The interpreter can use all modules already installed
(with pip, easy_install, etc)</p>
<h3>Forms</h3>
You must install py4j in order to use
the form feature (pip install py4j)
<h4>Input form</h4>
<pre>print (z.input("f1","defaultValue"))</pre>
<h4>Selection form</h4>
<pre>print(z.select("f2", [("o1","1"), ("o2","2")],2))</pre>
<h4>Checkbox form</h4>
<pre> print("".join(z.checkbox("f3", [("o1","1"), ("o2","2")],["1"])))</pre>
<h3>Matplotlib graph</h3>
<div>The interpreter can display matplotlib graph with
the function z.show()</div>
<div> You need to already have matplotlib module installed
to use this functionality !</div><br/>
<pre>import matplotlib.pyplot as plt
plt.figure()
(.. ..)
z.show(plt)
plt.close()
</pre>
<div><br/> z.show function can take optional parameters
to adapt graph dimensions (width and height) and format (png or svg)</div>
<div><b>example </b>:
<pre>z.show(plt,width='50px')
z.show(plt,height='150px', fmt='svg') </pre></div>
<h3>Pandas DataFrame</h3>
<div> You need to have Pandas module installed
to use this functionality (pip install pandas) !</div><br/>
<div>The interpreter can visualize Pandas DataFrame
with the function z.show()
<pre>
import pandas as pd
df = pd.read_csv("bank.csv", sep=";")
z.show(df)
</pre></div>
<h3>SQL over Pandas DataFrame</h3>
<div> You need to have Pandas&Pandasql modules installed
to use this functionality (pip install pandas pandasql) !</div><br/>
<div>Python interpreter group includes %sql interpreter that can query
Pandas DataFrames using SQL and visualize results using Zeppelin Table Display System
<pre>
%python
import pandas as pd
df = pd.read_csv("bank.csv", sep=";")
</pre>
<br />
<pre>
%python.sql
%sql
SELECT * from df LIMIT 5
</pre>
</div>
""")
class PyZeppelinContext(object):
    """Zeppelin context exposing display helpers (tables and matplotlib figures).
    If py4j is detected, this class will be overridden by the implementation in
    bootstrap_input.py; until then the dynamic-form methods only print errorMsg.
    NOTE(review): the class body deliberately contains no empty lines --
    presumably this script is piped to an interactive interpreter, where an
    empty line would terminate the definition; confirm before reformatting.
    """
    errorMsg = "You must install py4j Python module " \
               "(pip install py4j) to use Zeppelin dynamic forms features"
    def __init__(self):
        # Upper bound on the number of DataFrame rows printed by show_dataframe()
        self.max_result = 1000
        # No-op display hook; replaced by the inline backend's hook when it loads
        self._displayhook = lambda *args: None
        self._setup_matplotlib()
    def input(self, name, defaultValue=""):
        """Dynamic form text input; unavailable here, prints errorMsg."""
        print(self.errorMsg)
    def select(self, name, options, defaultValue=""):
        """Dynamic form select box; unavailable here, prints errorMsg."""
        print(self.errorMsg)
    def checkbox(self, name, options, defaultChecked=[]):
        """Dynamic form checkbox group; unavailable here, prints errorMsg."""
        print(self.errorMsg)
    def show(self, p, **kwargs):
        """Display p: the matplotlib.pyplot module, a DataFrame, or a callable.
        Anything else is silently ignored. Extra keyword arguments are passed
        on to show_matplotlib() / show_dataframe().
        """
        if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot":
            self.show_matplotlib(p, **kwargs)
        elif type(p).__name__ == "DataFrame":  # does not play well with sub-classes
            # `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
            # and so a dependency on pandas
            self.show_dataframe(p, **kwargs)
        elif hasattr(p, '__call__'):
            p()  # error reporting
    def show_dataframe(self, df, show_index=False, **kwargs):
        """Pretty prints DF using Table Display System.
        Prints a "%table" block: one tab-separated header line followed by one
        tab-separated line per row, limited to the first max_result rows.
        With show_index=True the index name is prepended to the header and
        every row starts with its index value rendered in bold HTML.
        A frame without columns prints an empty header instead of failing.
        """
        limit = len(df) > self.max_result
        header = [str(col) for col in df.columns]
        if show_index:
            header.insert(0, str(df.index.name) if df.index.name is not None else "")
        lines = ["\t".join(header)]
        rows = df.head(self.max_result).values if limit else df.values
        # zip() stops at the shorter sequence, so a truncated `rows` also
        # truncates the index values
        for idx, row in zip(df.index.values, rows):
            cells = [str(cell) for cell in row]
            if show_index:
                cells.insert(0, "%html <strong>{}</strong>".format(idx))
            lines.append("\t".join(cells))
        #TODO(bzz): show a red "Results are limited by {max_result}." notice
        # when `limit` is true, as in Spark
        print("%table " + "\n".join(lines) + "\n")
    def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
                        **kwargs):
        """Matplotlib show function.
        Saves the current figure through p.savefig() and prints it as an
        "%html" block. fmt must be 'png' (embedded as a base64 data URI) or
        'svg' (embedded as markup), otherwise ValueError is raised. width and
        height are CSS sizes applied to the wrapping <div> and the <img>.
        """
        # BytesIO, StringIO and base64 are expected to be imported at the top
        # of this file (outside the part of the file reviewed here)
        if fmt == "png":
            img = BytesIO()
        elif fmt == "svg":
            img = StringIO()
        else:
            raise ValueError("fmt must be 'png' or 'svg'")
        try:
            p.savefig(img, format=fmt)
            if fmt == "png":
                img_str = b"data:image/png;base64,"
                img_str += base64.b64encode(img.getvalue().strip())
                img_tag = "<img src='{img}' style='width:{width};height:{height}'>"
                # Decoding is necessary for Python 3 compatibility
                img_str = img_tag.format(img=img_str.decode("ascii"),
                                         width=width, height=height)
            else:
                img_str = img.getvalue()
        finally:
            # Release the buffer even when savefig() fails
            img.close()
        html = "%html <div style='width:{width};height:{height}'>{img}</div>"
        print(html.format(width=width, height=height, img=img_str))
    def configure_mpl(self, **kwargs):
        """Forward keyword options to mpl_config.configure()."""
        import mpl_config
        mpl_config.configure(**kwargs)
    def _setup_matplotlib(self):
        """Select the matplotlib backend: Zeppelin inline one, or Agg as fallback."""
        # If we don't have matplotlib installed don't bother continuing
        try:
            import matplotlib
        except ImportError:
            return
        # Make sure custom backends are available in the PYTHONPATH
        rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
        mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
        if mpl_path not in sys.path:
            sys.path.append(mpl_path)
        # Finally check if backend exists, and if so configure as appropriate
        try:
            matplotlib.use('module://backend_zinline')
            import backend_zinline
            # Everything looks good so make config assuming that we are using
            # an inline backend
            self._displayhook = backend_zinline.displayhook
            self.configure_mpl(width=600, height=400, dpi=72,
                               fontsize=10, interactive=True, format='png')
        except ImportError:
            # Fall back to Agg if no custom backend installed
            matplotlib.use('Agg')
            warnings.warn("Unable to load inline matplotlib backend, "
                          "falling back to Agg")
# Default context instance; user code reaches the display helpers through `z`
# (e.g. z.show(plt), z.show(df)).
z = PyZeppelinContext()

View file

@ -1,58 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from py4j.java_gateway import JavaGateway
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
# NOTE(review): %PORT% is not valid Python -- presumably a placeholder that is
# replaced with the gateway port before this script is executed; confirm.
client = GatewayClient(port=%PORT%)
gateway = JavaGateway(client)
# Make org.apache.zeppelin.display.Input resolvable through gateway.jvm
java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
class Py4jZeppelinContext(PyZeppelinContext):
    """A context impl that uses Py4j to communicate to JVM.
    Dynamic-form calls are delegated to the GUI object returned by z.getGui()
    on the JVM-side object passed to the constructor.
    NOTE(review): the class body deliberately contains no empty lines --
    presumably this script is piped to an interactive interpreter, where an
    empty line would terminate the definition; confirm before reformatting.
    """
    def __init__(self, z):
        PyZeppelinContext.__init__(self)
        self.z = z
        self.paramOption = gateway.jvm.org.apache.zeppelin.display.Input.ParamOption
        self.javaList = gateway.jvm.java.util.ArrayList
        # Override the default row limit with the value provided by the JVM side
        self.max_result = self.z.getMaxResult()
    def _to_java_options(self, options):
        """Convert a sequence of 2-item entries into a Java ParamOption[] array.
        Each entry's first and second item are passed, in that order, to the
        ParamOption constructor.
        """
        java_options = gateway.new_array(self.paramOption, len(options))
        for i, option in enumerate(options):
            java_options[i] = self.paramOption(option[0], option[1])
        return java_options
    def input(self, name, defaultValue=""):
        """Delegate to the JVM GUI's input(name, defaultValue) and return its result."""
        return self.z.getGui().input(name, defaultValue)
    def select(self, name, options, defaultValue=""):
        """Delegate to the JVM GUI's select() with the options converted to Java."""
        return self.z.getGui().select(name, defaultValue,
                                      self._to_java_options(options))
    def checkbox(self, name, options, defaultChecked=[]):
        """Delegate to the JVM GUI's checkbox() with options and defaults converted to Java."""
        javaOptions = self._to_java_options(options)
        # defaultChecked is only read, never mutated, so the shared default is safe
        javaDefaultCheck = self.javaList()
        for check in defaultChecked:
            javaDefaultCheck.append(check)
        return self.z.getGui().checkbox(name, javaDefaultCheck, javaOptions)
# Rebind `z` to the py4j-backed context, wrapping the gateway's entry point object.
z = Py4jZeppelinContext(gateway.entry_point)

View file

@ -1,28 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Setup SQL over Pandas DataFrames
# It requires next dependencies to be installed:
# - pandas
# - pandasql
from __future__ import print_function
# Define pysqldf(q): runs SQL query `q` through pandasql, resolving table names
# against this module's globals; when pandasql cannot be imported it prints an
# explanatory message instead.
try:
    from pandasql import sqldf
    pysqldf = lambda q: sqldf(q, globals())
except ImportError:
    pysqldf = lambda q: print("Can not run SQL over Pandas DataFrame. " +
                              "Make sure 'pandas' and 'pandasql' libraries are installed")

View file

@ -0,0 +1,263 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os, sys, getopt, traceback, json, re
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
from py4j.protocol import Py4JJavaError
import warnings
import ast
import traceback
import warnings
import signal
# for back compatibility
class Logger(object):
    """Minimal file-like sink that forwards written text to the interpreter.

    It offers write(), reset() and flush(); only write() does any work, by
    handing the message to the module-level `intp` object.
    """

    def __init__(self):
        return None

    def write(self, message):
        # `intp` is looked up at call time, so it must be bound before the
        # first write happens
        intp.appendOutput(message)

    def reset(self):
        return None

    def flush(self):
        return None
class PyZeppelinContext(object):
    """Zeppelin context exposing display helpers (tables and matplotlib figures).

    The dynamic-form methods (input, select, checkbox) are placeholders that
    only print errorMsg.
    """
    errorMsg = "You must install py4j Python module " \
               "(pip install py4j) to use Zeppelin dynamic forms features"

    def __init__(self):
        # Upper bound on the number of DataFrame rows printed by show_dataframe()
        self.max_result = 1000
        # No-op display hook; replaced by the inline backend's hook when it loads
        self._displayhook = lambda *args: None
        self._setup_matplotlib()

    def input(self, name, defaultValue=""):
        """Dynamic form text input; unavailable here, prints errorMsg."""
        print(self.errorMsg)

    def select(self, name, options, defaultValue=""):
        """Dynamic form select box; unavailable here, prints errorMsg."""
        print(self.errorMsg)

    def checkbox(self, name, options, defaultChecked=[]):
        """Dynamic form checkbox group; unavailable here, prints errorMsg."""
        print(self.errorMsg)

    def show(self, p, **kwargs):
        """Display p: the matplotlib.pyplot module, a DataFrame, or a callable.

        Anything else is silently ignored. Extra keyword arguments are passed
        on to show_matplotlib() / show_dataframe().
        """
        if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot":
            self.show_matplotlib(p, **kwargs)
        elif type(p).__name__ == "DataFrame":  # does not play well with sub-classes
            # `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
            # and so a dependency on pandas
            self.show_dataframe(p, **kwargs)
        elif hasattr(p, '__call__'):
            p()  # error reporting

    def show_dataframe(self, df, show_index=False, **kwargs):
        """Pretty prints DF using Table Display System.

        Prints a "%table" block: one tab-separated header line followed by one
        tab-separated line per row, limited to the first max_result rows.
        With show_index=True the index name is prepended to the header and
        every row starts with its index value rendered in bold HTML.
        A frame without columns prints an empty header instead of failing.
        """
        limit = len(df) > self.max_result

        header = [str(col) for col in df.columns]
        if show_index:
            header.insert(0, str(df.index.name) if df.index.name is not None else "")
        lines = ["\t".join(header)]

        rows = df.head(self.max_result).values if limit else df.values
        # zip() stops at the shorter sequence, so a truncated `rows` also
        # truncates the index values
        for idx, row in zip(df.index.values, rows):
            cells = [str(cell) for cell in row]
            if show_index:
                cells.insert(0, "%html <strong>{}</strong>".format(idx))
            lines.append("\t".join(cells))

        #TODO(bzz): show a red "Results are limited by {max_result}." notice
        # when `limit` is true, as in Spark
        print("%table " + "\n".join(lines) + "\n")

    def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
                        **kwargs):
        """Matplotlib show function.

        Saves the current figure through p.savefig() and prints it as an
        "%html" block. fmt must be 'png' (embedded as a base64 data URI) or
        'svg' (embedded as markup), otherwise ValueError is raised. width and
        height are CSS sizes applied to the wrapping <div> and the <img>.
        """
        # This script's top-level imports do not provide these names, so they
        # are imported here to keep the method self-contained.
        import base64
        from io import BytesIO
        try:
            from StringIO import StringIO  # Python 2
        except ImportError:
            from io import StringIO

        if fmt == "png":
            img = BytesIO()
        elif fmt == "svg":
            img = StringIO()
        else:
            raise ValueError("fmt must be 'png' or 'svg'")

        try:
            p.savefig(img, format=fmt)
            if fmt == "png":
                img_str = b"data:image/png;base64,"
                img_str += base64.b64encode(img.getvalue().strip())
                img_tag = "<img src='{img}' style='width:{width};height:{height}'>"
                # Decoding is necessary for Python 3 compatibility
                img_str = img_tag.format(img=img_str.decode("ascii"),
                                         width=width, height=height)
            else:
                img_str = img.getvalue()
        finally:
            # Release the buffer even when savefig() fails
            img.close()

        html = "%html <div style='width:{width};height:{height}'>{img}</div>"
        print(html.format(width=width, height=height, img=img_str))

    def configure_mpl(self, **kwargs):
        """Forward keyword options to mpl_config.configure()."""
        import mpl_config
        mpl_config.configure(**kwargs)

    def _setup_matplotlib(self):
        """Select the matplotlib backend: Zeppelin inline one, or Agg as fallback."""
        # If we don't have matplotlib installed don't bother continuing
        try:
            import matplotlib
        except ImportError:
            return

        # Make sure custom backends are available in the PYTHONPATH
        rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
        mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
        if mpl_path not in sys.path:
            sys.path.append(mpl_path)

        # Finally check if backend exists, and if so configure as appropriate
        try:
            matplotlib.use('module://backend_zinline')
            import backend_zinline

            # Everything looks good so make config assuming that we are using
            # an inline backend
            self._displayhook = backend_zinline.displayhook
            self.configure_mpl(width=600, height=400, dpi=72,
                               fontsize=10, interactive=True, format='png')
        except ImportError:
            # Fall back to Agg if no custom backend installed
            matplotlib.use('Agg')
            warnings.warn("Unable to load inline matplotlib backend, "
                          "falling back to Agg")
def handler_stop_signals(sig, frame):
    """Signal handler: raise SystemExit carrying the received signal number.

    `frame` is accepted because the signal API passes it, but it is not used.
    """
    exit_message = "Got signal : %s" % (sig,)
    sys.exit(exit_message)
# Route SIGINT through handler_stop_signals, which raises SystemExit.
signal.signal(signal.SIGINT, handler_stop_signals)
# Send both stdout and stderr to the Logger, whose write() forwards the text
# to intp.appendOutput(). `intp` is only bound a few lines below, so nothing
# may be written to these streams before that assignment.
output = Logger()
sys.stdout = output
sys.stderr = output
# The py4j gateway port is taken from the first command-line argument.
client = GatewayClient(port=int(sys.argv[1]))
#gateway = JavaGateway(client, auto_convert = True)
gateway = JavaGateway(client)
intp = gateway.entry_point
# Hand this process's pid to the entry point object.
intp.onPythonScriptInitialized(os.getpid())
z = PyZeppelinContext()
# NOTE(review): PyZeppelinContext.__init__ already calls _setup_matplotlib(),
# so this second call looks redundant -- confirm before removing.
z._setup_matplotlib()
# Main request loop: fetch a batch of statements from the interpreter entry
# point, execute it, and report completion (or the error text) back.
while True:
    req = intp.getStatements()
    try:
        stmts = req.statements().split("\n")
        jobGroup = req.jobGroup()
        final_code = []

        # Get post-execute hooks. A failing lookup is deliberately treated as
        # "no hook registered" (best effort).
        try:
            global_hook = intp.getHook('post_exec_dev')
        except:
            global_hook = None

        try:
            user_hook = z.getHook('post_exec')
        except:
            user_hook = None

        # Number of registered hooks; that many trailing top-level statements
        # of the submitted code are run as hooks below.
        nhooks = 0
        for hook in (global_hook, user_hook):
            if hook:
                nhooks += 1

        for s in stmts:
            if s is None:
                continue

            # skip blank lines and comment lines
            s_stripped = s.strip()
            if len(s_stripped) == 0 or s_stripped.startswith("#"):
                continue

            final_code.append(s)

        if final_code:
            # use exec mode to compile the statements except the last statement,
            # so that the last statement's evaluation will be printed to stdout
            #sc.setJobGroup(jobGroup, "Zeppelin")
            code = compile('\n'.join(final_code), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
            to_run_hooks = []
            if (nhooks > 0):
                to_run_hooks = code.body[-nhooks:]
            to_run_exec, to_run_single = (code.body[:-(nhooks + 1)],
                                          [code.body[-(nhooks + 1)]])

            # exec() is intentionally called at module level so that the user
            # code runs in this module's global namespace.
            try:
                for node in to_run_exec:
                    mod = ast.Module([node])
                    # Python 3.8+ refuses to compile a Module node whose
                    # type_ignores field is unset; older versions ignore it.
                    mod.type_ignores = []
                    code = compile(mod, '<stdin>', 'exec')
                    exec(code)

                for node in to_run_single:
                    mod = ast.Interactive([node])
                    code = compile(mod, '<stdin>', 'single')
                    exec(code)

                for node in to_run_hooks:
                    mod = ast.Module([node])
                    mod.type_ignores = []
                    code = compile(mod, '<stdin>', 'exec')
                    exec(code)
            except:
                # Re-raise with the formatted traceback as the message so the
                # handlers below can report it as text.
                raise Exception(traceback.format_exc())

        intp.setStatementsFinished("", False)
    except Py4JJavaError:
        excInnerError = traceback.format_exc()  # format_tb() does not return the inner exception
        innerErrorStart = excInnerError.find("Py4JJavaError:")
        if innerErrorStart > -1:
            excInnerError = excInnerError[innerErrorStart:]
        intp.setStatementsFinished(excInnerError + str(sys.exc_info()), True)
    except:
        intp.setStatementsFinished(traceback.format_exc(), True)

    output.reset()

View file

@ -1,135 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.*;
import java.util.regex.Matcher;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
/**
 * Unit tests for {@link PythonCondaInterpreter}, run against a Mockito spy of the
 * conda interpreter and a mocked {@link PythonInterpreter}.
 */
public class PythonCondaInterpreterTest {
  private PythonCondaInterpreter conda;
  private PythonInterpreter python;

  /** Puts the conda spy and the python mock into one interpreter group. */
  @Before
  public void setUp() {
    conda = spy(new PythonCondaInterpreter(new Properties()));
    python = mock(PythonInterpreter.class);

    InterpreterGroup group = new InterpreterGroup();
    group.put("note", Arrays.asList(python, conda));
    python.setInterpreterGroup(group);
    conda.setInterpreterGroup(group);

    doReturn(python).when(conda).getPythonInterpreter();
  }

  /** Stubs {@code getCondaEnvs()} so that no real conda installation is needed. */
  private void setMockCondaEnvList() throws IOException, InterruptedException {
    Map<String, String> envList = new LinkedHashMap<String, String>();
    envList.put("env1", "/path1");
    envList.put("env2", "/path2");
    doReturn(envList).when(conda).getCondaEnvs();
  }

  @Test
  public void testListEnv() throws IOException, InterruptedException {
    setMockCondaEnvList();

    // list available env
    InterpreterContext context = getInterpreterContext();
    InterpreterResult result = conda.interpret("env list", context);
    assertEquals(InterpreterResult.Code.SUCCESS, result.code());

    assertTrue(result.toString().contains(">env1<"));
    assertTrue(result.toString().contains("/path1<"));
    assertTrue(result.toString().contains(">env2<"));
    assertTrue(result.toString().contains("/path2<"));
  }

  @Test
  public void testActivateEnv() throws IOException, InterruptedException {
    setMockCondaEnvList();

    InterpreterContext context = getInterpreterContext();
    conda.interpret("activate env1", context);

    // activation restarts the python interpreter with the env's own binary
    verify(python, times(1)).open();
    verify(python, times(1)).close();
    verify(python).setPythonCommand("/path1/bin/python");
  }

  @Test
  public void testDeactivate() {
    InterpreterContext context = getInterpreterContext();
    conda.interpret("deactivate", context);

    // deactivation restarts the python interpreter with the plain "python" command
    verify(python, times(1)).open();
    verify(python, times(1)).close();
    verify(python).setPythonCommand("python");
  }

  @Test
  public void testParseCondaCommonStdout()
      throws IOException, InterruptedException {
    StringBuilder sb = new StringBuilder()
        .append("# comment1\n")
        .append("# comment2\n")
        .append("env1 /location1\n")
        .append("env2 /location2\n");

    Map<String, String> locationPerEnv =
        PythonCondaInterpreter.parseCondaCommonStdout(sb.toString());

    assertEquals("/location1", locationPerEnv.get("env1"));
    assertEquals("/location2", locationPerEnv.get("env2"));
  }

  @Test
  public void testGetRestArgsFromMatcher() {
    Matcher m =
        PythonCondaInterpreter.PATTERN_COMMAND_ENV.matcher("env remove --name test --yes");
    // Fail here, with a clear message, if the pattern does not match at all,
    // rather than later while the groups are being read.
    assertTrue("PATTERN_COMMAND_ENV should match the command", m.matches());

    List<String> restArgs = PythonCondaInterpreter.getRestArgsFromMatcher(m);
    List<String> expected = Arrays.asList("remove", "--name", "test", "--yes");
    assertEquals(expected, restArgs);
  }

  /** Builds a minimal context; arguments that are not needed here are {@code null}. */
  private InterpreterContext getInterpreterContext() {
    return new InterpreterContext(
        "noteId",
        "paragraphId",
        null,
        "paragraphTitle",
        "paragraphText",
        new AuthenticationInfo(),
        new HashMap<String, Object>(),
        new GUI(),
        null,
        null,
        null,
        new InterpreterOutput(null));
  }
}

View file

@ -1,87 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
/**
 * Unit tests for {@link PythonDockerInterpreter}, run against a Mockito spy of the
 * docker interpreter (image pull stubbed out) and a mocked {@link PythonInterpreter}.
 */
public class PythonDockerInterpreterTest {
  private PythonDockerInterpreter docker;
  private PythonInterpreter python;

  /** Puts the docker spy and the python mock into one interpreter group. */
  @Before
  public void setUp() {
    docker = spy(new PythonDockerInterpreter(new Properties()));
    python = mock(PythonInterpreter.class);

    InterpreterGroup sharedGroup = new InterpreterGroup();
    sharedGroup.put("note", Arrays.asList(python, docker));
    python.setInterpreterGroup(sharedGroup);
    docker.setInterpreterGroup(sharedGroup);

    // never pull a real image; always report success
    doReturn(true).when(docker).pull(any(InterpreterOutput.class), anyString());
    doReturn(python).when(docker).getPythonInterpreter();
  }

  @Test
  public void testActivateEnv() {
    docker.interpret("activate env", getInterpreterContext());

    verifyPythonRestartedOnce();
    verify(docker).pull(any(InterpreterOutput.class), anyString());
    verify(python).setPythonCommand("docker run -i --rm env python -iu");
  }

  @Test
  public void testDeactivate() {
    docker.interpret("deactivate", getInterpreterContext());

    verifyPythonRestartedOnce();
    verify(python).setPythonCommand(null);
  }

  /** Checks that the python interpreter was opened and closed exactly once each. */
  private void verifyPythonRestartedOnce() {
    verify(python).open();
    verify(python).close();
  }

  /** Builds a minimal context; arguments that are not needed here are {@code null}. */
  private InterpreterContext getInterpreterContext() {
    AuthenticationInfo authInfo = new AuthenticationInfo();
    HashMap<String, Object> config = new HashMap<String, Object>();
    GUI gui = new GUI();
    InterpreterOutput out = new InterpreterOutput(null);
    return new InterpreterContext(
        "noteId", "paragraphId", "replName", "paragraphTitle", "paragraphText",
        authInfo, config, gui, null, null, null, out);
  }
}

View file

@ -1,160 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import java.util.*;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* In order for this test to work, test env must have installed:
* <ol>
* - <li>Python</li>
* - <li>Matplotlib</li>
* <ol>
*
* Your PYTHONPATH should also contain the directory of the Matplotlib
* backend files. Usually these can be found in $ZEPPELIN_HOME/interpreter/lib/python.
*
* To run manually on such environment, use:
* <code>
* mvn -Dpython.test.exclude='' test -pl python -am
* </code>
*/
public class PythonInterpreterMatplotlibTest {
private InterpreterGroup intpGroup;
private PythonInterpreter python;
private InterpreterContext context;
@Before
public void setUp() throws Exception {
Properties p = new Properties();
p.setProperty("zeppelin.python", "python");
p.setProperty("zeppelin.python.maxResult", "100");
intpGroup = new InterpreterGroup();
python = new PythonInterpreter(p);
python.setInterpreterGroup(intpGroup);
python.open();
List<Interpreter> interpreters = new LinkedList<>();
interpreters.add(python);
intpGroup.put("note", interpreters);
context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null), null,
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
}
@Test
public void dependenciesAreInstalled() {
// matplotlib
InterpreterResult ret = python.interpret("import matplotlib", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
// inline backend
ret = python.interpret("import backend_zinline", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
}
@Test
public void showPlot() {
// Simple plot test
InterpreterResult ret;
ret = python.interpret("import matplotlib.pyplot as plt", context);
ret = python.interpret("z.configure_mpl(interactive=False)", context);
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret = python.interpret("plt.show()", context);
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), Type.HTML, ret.message().get(0).getType());
assertTrue(ret.message().get(0).getData().contains("data:image/png;base64"));
assertTrue(ret.message().get(0).getData().contains("<div>"));
}
@Test
// Test for when configuration is set to auto-close figures after show().
public void testClose() {
InterpreterResult ret;
InterpreterResult ret1;
InterpreterResult ret2;
ret = python.interpret("import matplotlib.pyplot as plt", context);
ret = python.interpret("z.configure_mpl(interactive=False)", context);
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret1 = python.interpret("plt.show()", context);
// Second call to show() should print nothing, and Type should be TEXT.
// This is because when close=True, there should be no living instances
// of FigureManager, causing show() to return before setting the output
// type to HTML.
ret = python.interpret("plt.show()", context);
assertEquals(0, ret.message().size());
// Now test that new plot is drawn. It should be identical to the
// previous one.
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret2 = python.interpret("plt.show()", context);
assertEquals(ret1.message().get(0).getType(), ret2.message().get(0).getType());
assertEquals(ret1.message().get(0).getData(), ret2.message().get(0).getData());
}
@Test
// Test for when configuration is set to not auto-close figures after show().
public void testNoClose() {
InterpreterResult ret;
InterpreterResult ret1;
InterpreterResult ret2;
ret = python.interpret("import matplotlib.pyplot as plt", context);
ret = python.interpret("z.configure_mpl(interactive=False, close=False)", context);
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret1 = python.interpret("plt.show()", context);
// Second call to show() should print nothing, and Type should be HTML.
// This is because when close=False, there should be living instances
// of FigureManager, causing show() to set the output
// type to HTML even though the figure is inactive.
ret = python.interpret("plt.show()", context);
assertEquals("", ret.message().get(0).getData());
// Now test that plot can be reshown if it is updated. It should be
// different from the previous one because it will plot the same line
// again but in a different color.
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret2 = python.interpret("plt.show()", context);
assertNotSame(ret1.message().get(0).getData(), ret2.message().get(0).getData());
}
}

View file

@ -1,175 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
/**
* In order for this test to work, test env must have installed:
* <ol>
* - <li>Python</li>
* - <li>NumPy</li>
* - <li>Pandas</li>
* - <li>PandaSql</li>
* <ol>
*
* To run manually on such environment, use:
* <code>
* mvn -Dpython.test.exclude='' test -pl python -am
* </code>
*/
public class PythonInterpreterPandasSqlTest {
private InterpreterGroup intpGroup;
private PythonInterpreterPandasSql sql;
private PythonInterpreter python;
private InterpreterContext context;
@Before
public void setUp() throws Exception {
Properties p = new Properties();
p.setProperty("zeppelin.python", "python");
p.setProperty("zeppelin.python.maxResult", "100");
intpGroup = new InterpreterGroup();
python = new PythonInterpreter(p);
python.setInterpreterGroup(intpGroup);
python.open();
sql = new PythonInterpreterPandasSql(p);
sql.setInterpreterGroup(intpGroup);
intpGroup.put("note", Arrays.asList(python, sql));
context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null), null,
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
//important to be last step
sql.open();
//it depends on python interpreter presence in the same group
}
@Test
public void dependenciesAreInstalled() {
InterpreterResult ret = python.interpret("import pandas\nimport pandasql\nimport numpy\n", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
}
@Test
public void errorMessageIfDependenciesNotInstalled() {
InterpreterResult ret;
// given
ret = python.interpret(
"pysqldf = lambda q: print('Can not execute SQL as Python dependency is not installed')",
context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
// when
ret = sql.interpret("SELECT * from something", context);
// then
assertNotNull(ret);
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
assertTrue(ret.message().get(0).getData().contains("dependency is not installed"));
}
@Test
public void sqlOverTestDataPrintsTable() {
InterpreterResult ret;
// given
//String expectedTable = "name\tage\n\nmoon\t33\n\npark\t34";
ret = python.interpret("import pandas as pd", context);
ret = python.interpret("import numpy as np", context);
// DataFrame df2 \w test data
ret = python.interpret("df2 = pd.DataFrame({ 'age' : np.array([33, 51, 51, 34]), "+
"'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
//when
ret = sql.interpret("select name, age from df2 where age < 40", context);
//then
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType());
//assertEquals(expectedTable, ret.message()); //somehow it's same but not equal
assertTrue(ret.message().get(0).getData().indexOf("moon\t33") > 0);
assertTrue(ret.message().get(0).getData().indexOf("park\t34") > 0);
assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from df2", context).code());
}
@Test
public void badSqlSyntaxFails() {
//when
InterpreterResult ret = sql.interpret("select wrong syntax", context);
//then
assertNotNull("Interpreter returned 'null'", ret);
//System.out.println("\nInterpreter response: \n" + ret.message());
assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code());
assertTrue(ret.message().get(0).getData().length() > 0);
}
/**
 * Renders a pandas DataFrame through {@code z.show(df1, show_index=True)} and checks
 * that the result is a TABLE starting with the index name and containing the cell values.
 */
@Test
public void showDataFrame() {
  // given a Pandas DataFrame with an index and non-text data
  final String[] setupStatements = {
    "import pandas as pd",
    "import numpy as np",
    "index = pd.Index([10, 11, 12, 13], name='index_name')",
    "d1 = {1 : [np.nan, 1, 2, 3], 'two' : [3., 4., 5., 6.7]}",
    "df1 = pd.DataFrame(d1, index=index)"
  };
  InterpreterResult ret = null;
  for (String statement : setupStatements) {
    ret = python.interpret(statement, context);
  }
  // only the outcome of the last setup statement (the DataFrame creation) is checked
  assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());

  // when
  ret = python.interpret("z.show(df1, show_index=True)", context);

  // then
  final String rendered = ret.message().get(0).getData();
  assertEquals(rendered, InterpreterResult.Code.SUCCESS, ret.code());
  assertEquals(rendered, Type.TABLE, ret.message().get(0).getType());
  assertTrue(rendered.startsWith("index_name"));
  assertTrue(rendered.indexOf("13") > 0);
  assertTrue(rendered.indexOf("nan") > 0);
  assertTrue(rendered.indexOf("6.7") > 0);
}
}

View file

@ -1,283 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import static org.apache.zeppelin.python.PythonInterpreter.DEFAULT_ZEPPELIN_PYTHON;
import static org.apache.zeppelin.python.PythonInterpreter.MAX_RESULT;
import static org.apache.zeppelin.python.PythonInterpreter.ZEPPELIN_PYTHON;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.LinkedList;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Python interpreter unit test
*
* Important: ALL tests here DO NOT REQUIRE Python to be installed
* If Python dependency is required, please look at PythonInterpreterWithPythonInstalledTest
*/
public class PythonInterpreterTest {
// Logger for this test class, so its messages are reported under the test's own category.
private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreterTest.class);
// Interpreter created in beforeTest() without a spy or a mocked process.
PythonInterpreter zeppelinPythonInterpreter = null;
// Spied interpreter whose getPythonProcess() is stubbed to return mockPythonProcess.
PythonInterpreter pythonInterpreter = null;
// Mockito mock standing in for the Python process.
PythonProcess mockPythonProcess;
// Concatenation of every command passed to mockPythonProcess.sendAndGetResult(..).
String cmdHistory;
/**
 * Builds the interpreter properties used by the Python interpreter tests.
 *
 * @return a fresh {@link Properties} where {@code ZEPPELIN_PYTHON} is set to
 *         {@code DEFAULT_ZEPPELIN_PYTHON} and {@code MAX_RESULT} is set to {@code "1000"}
 */
public static Properties getPythonTestProperties() {
  final Properties testProperties = new Properties();
  testProperties.setProperty(ZEPPELIN_PYTHON, DEFAULT_ZEPPELIN_PYTHON);
  testProperties.setProperty(MAX_RESULT, "1000");
  return testProperties;
}
/**
 * Resets the recorded command history and wires a spied {@link PythonInterpreter}
 * to a mocked {@link PythonProcess} before every test.
 *
 * The order of the stubbing calls below is significant; do not reorder them.
 */
@Before
public void beforeTest() throws IOException {
cmdHistory = "";
/*Mock python process*/
mockPythonProcess = mock(PythonProcess.class);
when(mockPythonProcess.getPid()).thenReturn(1L);
// Generic stub: any command string is handed to answerFromPythonMock, which records it
// in cmdHistory and returns it with the line breaks removed.
when(mockPythonProcess.sendAndGetResult(anyString())).thenAnswer(new Answer<String>() {
@Override public String answer(InvocationOnMock invocationOnMock) throws Throwable {
return answerFromPythonMock(invocationOnMock);
}
});
// python interpreter
// pythonInterpreter is a spy so getPythonProcess() can be stubbed further down;
// zeppelinPythonInterpreter is a plain instance and is not added to the group.
pythonInterpreter = spy(new PythonInterpreter(getPythonTestProperties()));
zeppelinPythonInterpreter = new PythonInterpreter(getPythonTestProperties());
// create interpreter group
InterpreterGroup group = new InterpreterGroup();
group.put("note", new LinkedList<Interpreter>());
group.get("note").add(pythonInterpreter);
pythonInterpreter.setInterpreterGroup(group);
when(pythonInterpreter.getPythonProcess()).thenReturn(mockPythonProcess);
// Default reply for the "import py4j" probe is "ImportError"; testPy4jInstalled and
// testClose re-stub it with "".
// NOTE(review): registered after the anyString() stub and presumably relies on Mockito
// preferring the most recently registered matching stub - confirm.
when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn("ImportError");
}
/**
 * Closes both interpreters created in {@code beforeTest()} after every test.
 *
 * @throws IOException kept from the original signature
 */
@After
public void afterTest() throws IOException {
  try {
    pythonInterpreter.close();
  } finally {
    // release the un-mocked interpreter even when closing the spied one fails
    zeppelinPythonInterpreter.close();
  }
}
/**
 * Opening the interpreter must expose the (mocked) Python process, whose pid is
 * stubbed to {@code 1L} in {@code beforeTest()}.
 */
@Test
public void testOpenInterpreter() {
  pythonInterpreter.open();
  // assertEquals takes (expected, actual); keeping that order makes a failure
  // message report the stubbed pid as the expected value
  assertEquals(1L, pythonInterpreter.getPythonProcess().getPid());
}
/**
 * When Py4J is not installed, bootstrap_input.py is not sent to the Python process
 * and the py4j JavaGateway is not running.
 */
@Test
public void testPy4jIsNotInstalled() {
  pythonInterpreter.open();

  // no gateway port is exposed
  assertNull(pythonInterpreter.getPy4jPort());

  // the plain bootstrap code was sent to the process ...
  final String[] expectedFragments = {
    "def help()",
    "class PyZeppelinContext(object):",
    "z = PyZeppelinContext",
    "def show"
  };
  for (String fragment : expectedFragments) {
    assertTrue(cmdHistory.contains(fragment));
  }
  // ... but nothing that mentions the py4j gateway client
  assertFalse(cmdHistory.contains("GatewayClient"));
}
/**
 * When Py4J is installed, bootstrap_input.py is sent to the interpreter
 * and the JavaGateway is running until the interpreter is closed.
 */
@Test
public void testPy4jInstalled() throws IOException, InterruptedException {
  // given: the "import py4j" probe returns an empty reply
  when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn("");

  pythonInterpreter.open();
  final Integer gatewayPort = pythonInterpreter.getPy4jPort();
  assertNotNull(gatewayPort);

  // then: the py4j flavoured bootstrap was sent to the process
  final String[] expectedFragments = {
    "def help()",
    "class PyZeppelinContext(object):",
    "z = Py4jZeppelinContext",
    "def show",
    "GatewayClient(port=" + gatewayPort + ")",
    "org.apache.zeppelin.display.Input"
  };
  for (String fragment : expectedFragments) {
    assertTrue(cmdHistory.contains(fragment));
  }
  assertTrue(serverIsListeningOn(gatewayPort));

  // and: closing the interpreter stops the gateway
  pythonInterpreter.close();
  TimeUnit.MILLISECONDS.sleep(100);
  assertFalse(serverIsListeningOn(gatewayPort));
}
/**
 * Closing the interpreter must stop the py4j gateway and close the
 * Python process exactly once.
 */
@Test
public void testClose() throws IOException, InterruptedException {
  // given: py4j is installed
  when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn("");
  pythonInterpreter.open();
  final Integer gatewayPort = pythonInterpreter.getPy4jPort();
  assertNotNull(gatewayPort);

  // when
  pythonInterpreter.close();
  TimeUnit.MILLISECONDS.sleep(100);

  // then
  final boolean stillListening = serverIsListeningOn(gatewayPort);
  assertFalse(stillListening);
  verify(mockPythonProcess, times(1)).close();
}
/**
 * Interprets a single statement against the mocked process, which returns the
 * command it received, and expects that text back as a {@code %text} message.
 */
@Test
public void testInterpret() {
  pythonInterpreter.open();
  // drop whatever open() sent to the mocked process
  cmdHistory = "";

  final String statement = "print a";
  InterpreterResult outcome = pythonInterpreter.interpret(statement, null);

  assertEquals(InterpreterResult.Code.SUCCESS, outcome.code());
  final String firstMessage = outcome.message().get(0).toString();
  assertEquals("%text print a", firstMessage);
}
/**
 * Runs the same one-line loop twice on the un-mocked interpreter: followed by a blank
 * line it is expected to succeed and print "hi" three times, without the blank line
 * it is expected to fail with a SyntaxError.
 */
@Test
public void testInterpretInvalidSyntax() {
  final String loopWithBlankLine =
      "for x in range(0,3): print (\"hi\")\n\nz._displayhook()";
  final String loopWithoutBlankLine =
      "for x in range(0,3): print (\"hi\")\nz._displayhook()";

  zeppelinPythonInterpreter.open();

  InterpreterResult outcome = zeppelinPythonInterpreter.interpret(loopWithBlankLine, null);
  assertEquals(InterpreterResult.Code.SUCCESS, outcome.code());
  assertTrue(outcome.message().get(0).toString().contains("hi\nhi\nhi"));

  outcome = zeppelinPythonInterpreter.interpret(loopWithoutBlankLine, null);
  assertEquals(InterpreterResult.Code.ERROR, outcome.code());
  assertTrue(outcome.message().get(0).toString().contains("SyntaxError: invalid syntax"));
}
/**
 * Checks if the given port is open on 'localhost'.
 *
 * A single probe connection is attempted via {@code tryToConnect}; the probe socket
 * is always closed again, whether or not the connection succeeded.
 *
 * @param port TCP port to probe
 * @return {@code true} if the connection attempt succeeded, {@code false} otherwise
 */
private boolean serverIsListeningOn(Integer port) {
  final Socket probe = new Socket();
  try {
    return tryToConnect(probe, port);
  } finally {
    // close in finally so the probe socket is never left open
    tryToClose(probe);
  }
}
/**
 * Attempts to connect the given socket to 'localhost' on the given port,
 * waiting at most 10000 ms.
 *
 * @param s    unconnected socket to use for the attempt
 * @param port TCP port to connect to
 * @return {@code true} if the connection was established, {@code false} if
 *         an {@link IOException} was raised while connecting
 */
private boolean tryToConnect(Socket s, Integer port) {
  final SocketAddress target = new InetSocketAddress("localhost", port);
  try {
    s.connect(target, 10000);
    return true;
  } catch (IOException ignored) {
    // a failed connection is an expected outcome here and is reported via the return value
    return false;
  }
}
/**
 * Closes the given socket; a failure to close is logged and not propagated.
 *
 * @param s socket to close
 */
private void tryToClose(Socket s) {
  try {
    s.close();
  } catch (IOException closeFailure) {
    LOG.error("Can't close connection to " + s.getInetAddress(), closeFailure);
  }
}
/**
 * Answer used for the mocked Python process: appends the received command to
 * {@code cmdHistory} and returns it with the line breaks removed.
 *
 * @param invocationOnMock the intercepted call; its first argument is the command string
 * @return the command's lines concatenated without separators, or {@code ""} when the
 *         command is {@code null}
 */
private String answerFromPythonMock(InvocationOnMock invocationOnMock) {
  final String command = (String) invocationOnMock.getArguments()[0];
  if (command == null) {
    // nothing to record and nothing to echo
    return "";
  }

  cmdHistory += command;

  final StringBuilder echoed = new StringBuilder();
  for (String line : command.split("\\n")) {
    echoed.append(line);
  }
  return echoed.toString();
}
/**
 * A raised Python exception must yield an ERROR result, while merely printing text
 * that looks like an exception must yield SUCCESS.
 *
 * Uses its own un-mocked interpreter, which is closed again when the test ends.
 */
@Test
public void checkMultiRowErrorFails() {
  PythonInterpreter realInterpreter = new PythonInterpreter(
      PythonInterpreterTest.getPythonTestProperties());
  // create interpreter group
  InterpreterGroup group = new InterpreterGroup();
  group.put("note", new LinkedList<Interpreter>());
  group.get("note").add(realInterpreter);
  realInterpreter.setInterpreterGroup(group);
  realInterpreter.open();
  try {
    String codeRaiseException = "raise Exception(\"test exception\")";
    InterpreterResult ret = realInterpreter.interpret(codeRaiseException, null);
    assertNotNull("Interpreter result for raise exception is Null", ret);
    System.err.println("ret = '" + ret + "'");
    assertEquals(InterpreterResult.Code.ERROR, ret.code());
    assertTrue(ret.message().get(0).getData().length() > 0);

    String codePrintText = "print (\"Exception(\\\"test exception\\\")\")";
    ret = realInterpreter.interpret(codePrintText, null);
    // the null check belongs to the result of the print statement, so it has to
    // run after that statement was interpreted
    assertNotNull("Interpreter result for text is Null", ret);
    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
    assertTrue(ret.message().get(0).getData().length() > 0);
  } finally {
    // the interpreter is local to this test, so afterTest() does not close it
    realInterpreter.close();
  }
}
}

View file

@ -1,125 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.Test;
import java.util.Arrays;
/**
* Python interpreter unit test that uses real Python
*
* Important: ALL tests here REQUIRE Python to be installed
* They are excluded from default build, to run them manually do:
*
* <code>
* mvn "-Dtest=org.apache.zeppelin.python.PythonInterpreterWithPythonInstalledTest" test -pl python
* </code>
*
* or
* <code>
* mvn -Dpython.test.exclude='' test -pl python -am
* </code>
*/
public class PythonInterpreterWithPythonInstalledTest {

  /**
   * Creates a {@link PythonInterpreter} with the shared test properties, registers it
   * in a new interpreter group under the key "note" and opens it.
   *
   * @return the opened interpreter; the caller is responsible for closing it
   */
  private static PythonInterpreter openRealPython() {
    PythonInterpreter realPython = new PythonInterpreter(
        PythonInterpreterTest.getPythonTestProperties());
    // create interpreter group
    InterpreterGroup group = new InterpreterGroup();
    group.put("note", Arrays.asList((Interpreter) realPython));
    realPython.setInterpreterGroup(group);
    realPython.open();
    return realPython;
  }

  /** Code that is not valid Python must produce an ERROR result with a message. */
  @Test
  public void badPythonSyntaxFails() {
    //given
    PythonInterpreter realPython = openRealPython();
    try {
      //when
      InterpreterResult ret = realPython.interpret("select wrong syntax", null);

      //then
      assertNotNull("Interpreter returned 'null'", ret);
      assertEquals(InterpreterResult.Code.ERROR, ret.code());
      assertTrue(ret.message().get(0).getData().length() > 0);
    } finally {
      // close even when an assertion fails, so the interpreter is not left open
      realPython.close();
    }
  }

  /** Calling {@code help()} must produce a SUCCESS result with a message. */
  @Test
  public void goodPythonSyntaxRuns() {
    //given
    PythonInterpreter realPython = openRealPython();
    try {
      //when
      InterpreterResult ret = realPython.interpret("help()", null);

      //then
      assertNotNull("Interpreter returned 'null'", ret);
      assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
      assertTrue(ret.message().get(0).getData().length() > 0);
    } finally {
      // close even when an assertion fails, so the interpreter is not left open
      realPython.close();
    }
  }

  /**
   * A print statement must succeed with its exact output, and an unfinished
   * {@code for} header must fail with the expected IndentationError text.
   */
  @Test
  public void testZeppelin1555() {
    //given
    PythonInterpreter realPython = openRealPython();
    try {
      //when
      InterpreterResult ret1 = realPython.interpret("print(\"...\")", null);

      //then
      assertEquals(InterpreterResult.Code.SUCCESS, ret1.code());
      assertEquals("...\n", ret1.message().get(0).getData());

      //when
      InterpreterResult ret2 = realPython.interpret("for i in range(5):", null);

      //then
      assertEquals(InterpreterResult.Code.ERROR, ret2.code());
      assertEquals(" File \"<stdin>\", line 2\n" +
          " \n" +
          " ^\n" +
          "IndentationError: expected an indented block\n", ret2.message().get(0).getData());
    } finally {
      // close even when an assertion fails, so the interpreter is not left open
      realPython.close();
    }
  }
}