mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
initialize python interpreter using py4j
This commit is contained in:
parent
252055571b
commit
7304919def
16 changed files with 560 additions and 1851 deletions
|
|
@ -350,6 +350,7 @@ public class PythonCondaInterpreter extends Interpreter {
|
|||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
PythonInterpreter pythonInterpreter = getPythonInterpreter();
|
||||
|
||||
if (pythonInterpreter != null) {
|
||||
return pythonInterpreter.getScheduler();
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -1,175 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.Properties;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Helps run python interpreter on a docker container
|
||||
*/
|
||||
public class PythonDockerInterpreter extends Interpreter {
|
||||
Logger logger = LoggerFactory.getLogger(PythonDockerInterpreter.class);
|
||||
Pattern activatePattern = Pattern.compile("activate\\s*(.*)");
|
||||
Pattern deactivatePattern = Pattern.compile("deactivate");
|
||||
Pattern helpPattern = Pattern.compile("help");
|
||||
|
||||
public PythonDockerInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
InterpreterOutput out = context.out;
|
||||
|
||||
Matcher activateMatcher = activatePattern.matcher(st);
|
||||
Matcher deactivateMatcher = deactivatePattern.matcher(st);
|
||||
Matcher helpMatcher = helpPattern.matcher(st);
|
||||
|
||||
if (st == null || st.isEmpty() || helpMatcher.matches()) {
|
||||
printUsage(out);
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
|
||||
} else if (activateMatcher.matches()) {
|
||||
String image = activateMatcher.group(1);
|
||||
pull(out, image);
|
||||
setPythonCommand("docker run -i --rm " + image + " python -iu");
|
||||
restartPythonProcess();
|
||||
out.clear();
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "\"" + image + "\" activated");
|
||||
} else if (deactivateMatcher.matches()) {
|
||||
setPythonCommand(null);
|
||||
restartPythonProcess();
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Deactivated");
|
||||
} else {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, "Not supported command: " + st);
|
||||
}
|
||||
}
|
||||
|
||||
public void setPythonCommand(String cmd) {
|
||||
PythonInterpreter python = getPythonInterpreter();
|
||||
python.setPythonCommand(cmd);
|
||||
}
|
||||
|
||||
private void printUsage(InterpreterOutput out) {
|
||||
try {
|
||||
out.setType(InterpreterResult.Type.HTML);
|
||||
out.writeResource("output_templates/docker_usage.html");
|
||||
} catch (IOException e) {
|
||||
logger.error("Can't print usage", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.NONE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Use python interpreter's scheduler.
|
||||
* To make sure %python.docker paragraph and %python paragraph runs sequentially
|
||||
*/
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
PythonInterpreter pythonInterpreter = getPythonInterpreter();
|
||||
if (pythonInterpreter != null) {
|
||||
return pythonInterpreter.getScheduler();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void restartPythonProcess() {
|
||||
PythonInterpreter python = getPythonInterpreter();
|
||||
python.close();
|
||||
python.open();
|
||||
}
|
||||
|
||||
protected PythonInterpreter getPythonInterpreter() {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
PythonInterpreter python = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
python = (PythonInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return python;
|
||||
}
|
||||
|
||||
public boolean pull(InterpreterOutput out, String image) {
|
||||
int exit = 0;
|
||||
try {
|
||||
exit = runCommand(out, "docker", "pull", image);
|
||||
} catch (IOException | InterruptedException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
return exit == 0;
|
||||
}
|
||||
|
||||
protected int runCommand(InterpreterOutput out, String... command)
|
||||
throws IOException, InterruptedException {
|
||||
ProcessBuilder builder = new ProcessBuilder(command);
|
||||
builder.redirectErrorStream(true);
|
||||
Process process = builder.start();
|
||||
InputStream stdout = process.getInputStream();
|
||||
BufferedReader br = new BufferedReader(new InputStreamReader(stdout));
|
||||
String line;
|
||||
while ((line = br.readLine()) != null) {
|
||||
out.write(line + "\n");
|
||||
}
|
||||
int r = process.waitFor(); // Let the process finish.
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
|
@ -17,21 +17,36 @@
|
|||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.PipedInputStream;
|
||||
import java.io.PipedOutputStream;
|
||||
import java.net.ServerSocket;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteResultHandler;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -43,28 +58,110 @@ import py4j.GatewayServer;
|
|||
/**
|
||||
* Python interpreter for Zeppelin.
|
||||
*/
|
||||
public class PythonInterpreter extends Interpreter {
|
||||
public class PythonInterpreter extends Interpreter implements ExecuteResultHandler {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class);
|
||||
|
||||
public static final String BOOTSTRAP_PY = "/bootstrap.py";
|
||||
public static final String BOOTSTRAP_INPUT_PY = "/bootstrap_input.py";
|
||||
public static final String ZEPPELIN_PYTHON = "zeppelin.python";
|
||||
public static final String ZEPPELIN_PYTHON = "python/zeppelin_python.py";
|
||||
public static final String DEFAULT_ZEPPELIN_PYTHON = "python";
|
||||
public static final String MAX_RESULT = "zeppelin.python.maxResult";
|
||||
|
||||
private Integer port;
|
||||
private GatewayServer gatewayServer;
|
||||
private Boolean py4JisInstalled = false;
|
||||
private InterpreterContext context;
|
||||
private Pattern errorInLastLine = Pattern.compile(".*(Error|Exception): .*$");
|
||||
private String pythonPath;
|
||||
private int maxResult;
|
||||
|
||||
PythonProcess process = null;
|
||||
private String pythonCommand = null;
|
||||
private String pythonCommand = DEFAULT_ZEPPELIN_PYTHON;
|
||||
|
||||
private GatewayServer gatewayServer;
|
||||
private DefaultExecutor executor;
|
||||
private int port;
|
||||
private InterpreterOutputStream outputStream;
|
||||
private BufferedWriter ins;
|
||||
private PipedInputStream in;
|
||||
private ByteArrayOutputStream input;
|
||||
private String scriptPath;
|
||||
boolean pythonscriptRunning = false;
|
||||
private static final int MAX_TIMEOUT_SEC = 10;
|
||||
|
||||
private long pythonPid = 0;
|
||||
|
||||
Integer statementSetNotifier = new Integer(0);
|
||||
|
||||
public PythonInterpreter(Properties property) {
|
||||
super(property);
|
||||
try {
|
||||
File scriptFile = File.createTempFile("zeppelin_python-", ".py");
|
||||
scriptPath = scriptFile.getAbsolutePath();
|
||||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void createPythonScript() {
|
||||
ClassLoader classLoader = getClass().getClassLoader();
|
||||
File out = new File(scriptPath);
|
||||
|
||||
if (out.exists() && out.isDirectory()) {
|
||||
throw new InterpreterException("Can't create python script " + out.getAbsolutePath());
|
||||
}
|
||||
|
||||
try {
|
||||
FileOutputStream outStream = new FileOutputStream(out);
|
||||
IOUtils.copy(
|
||||
classLoader.getResourceAsStream(ZEPPELIN_PYTHON),
|
||||
outStream);
|
||||
outStream.close();
|
||||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
logger.info("File {} created", scriptPath);
|
||||
}
|
||||
|
||||
private void createGatewayServerAndStartScript() {
|
||||
// create python script
|
||||
createPythonScript();
|
||||
|
||||
port = findRandomOpenPortOnAllLocalInterfaces();
|
||||
|
||||
gatewayServer = new GatewayServer(this, port);
|
||||
gatewayServer.start();
|
||||
|
||||
// Run python shell
|
||||
CommandLine cmd = CommandLine.parse(getPythonCommand());
|
||||
cmd.addArgument(scriptPath, false);
|
||||
cmd.addArgument(Integer.toString(port), false);
|
||||
executor = new DefaultExecutor();
|
||||
outputStream = new InterpreterOutputStream(logger);
|
||||
PipedOutputStream ps = new PipedOutputStream();
|
||||
in = null;
|
||||
try {
|
||||
in = new PipedInputStream(ps);
|
||||
} catch (IOException e1) {
|
||||
throw new InterpreterException(e1);
|
||||
}
|
||||
ins = new BufferedWriter(new OutputStreamWriter(ps));
|
||||
input = new ByteArrayOutputStream();
|
||||
|
||||
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in);
|
||||
executor.setStreamHandler(streamHandler);
|
||||
executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
|
||||
|
||||
try {
|
||||
Map env = EnvironmentUtils.getProcEnvironment();
|
||||
env.put("PYTHONPATH", "/Users/shim/zeppelin/interpreter/python/py4j-0.8.2.1-src.zip");
|
||||
executor.execute(cmd, env, this);
|
||||
pythonscriptRunning = true;
|
||||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
input.write("import sys, getopt\n".getBytes());
|
||||
ins.flush();
|
||||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -72,83 +169,188 @@ public class PythonInterpreter extends Interpreter {
|
|||
// Add matplotlib display hook
|
||||
InterpreterGroup intpGroup = getInterpreterGroup();
|
||||
if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
|
||||
registerHook(HookType.POST_EXEC_DEV, "\nz._displayhook()");
|
||||
registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
|
||||
}
|
||||
|
||||
// Add zeppelin-bundled libs to PYTHONPATH
|
||||
setPythonPath("../interpreter/lib/python:$PYTHONPATH");
|
||||
LOG.info("Starting Python interpreter ---->");
|
||||
LOG.info("Python path is set to:" + property.getProperty(ZEPPELIN_PYTHON));
|
||||
|
||||
maxResult = Integer.valueOf(getProperty(MAX_RESULT));
|
||||
process = getPythonProcess();
|
||||
|
||||
try {
|
||||
process.open();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't start the python process", e);
|
||||
}
|
||||
|
||||
try {
|
||||
LOG.info("python PID : " + process.getPid());
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Can't find python pid process", e);
|
||||
}
|
||||
|
||||
try {
|
||||
LOG.info("Bootstrap interpreter with " + BOOTSTRAP_PY);
|
||||
bootStrapInterpreter(BOOTSTRAP_PY);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't execute " + BOOTSTRAP_PY + " to initiate python process", e);
|
||||
}
|
||||
|
||||
py4JisInstalled = isPy4jInstalled();
|
||||
if (py4JisInstalled) {
|
||||
port = findRandomOpenPortOnAllLocalInterfaces();
|
||||
LOG.info("Py4j gateway port : " + port);
|
||||
try {
|
||||
gatewayServer = new GatewayServer(this, port);
|
||||
gatewayServer.start();
|
||||
LOG.info("Bootstrap inputs with " + BOOTSTRAP_INPUT_PY);
|
||||
bootStrapInterpreter(BOOTSTRAP_INPUT_PY);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't execute " + BOOTSTRAP_INPUT_PY + " to " +
|
||||
"initialize Zeppelin inputs in python process", e);
|
||||
}
|
||||
}
|
||||
// Add matplotlib display hook
|
||||
createGatewayServerAndStartScript();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
LOG.info("closing Python interpreter <----");
|
||||
pythonscriptRunning = false;
|
||||
pythonScriptInitialized = false;
|
||||
|
||||
try {
|
||||
if (process != null) {
|
||||
process.close();
|
||||
process = null;
|
||||
}
|
||||
if (gatewayServer != null) {
|
||||
gatewayServer.shutdown();
|
||||
}
|
||||
ins.flush();
|
||||
ins.close();
|
||||
input.flush();
|
||||
input.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't close the interpreter", e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
executor.getWatchdog().destroyProcess();
|
||||
new File(scriptPath).delete();
|
||||
gatewayServer.shutdown();
|
||||
|
||||
// wait until getStatements stop
|
||||
synchronized (statementSetNotifier) {
|
||||
try {
|
||||
statementSetNotifier.wait(1500);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
statementSetNotifier.notify();
|
||||
}
|
||||
}
|
||||
|
||||
PythonInterpretRequest pythonInterpretRequest = null;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class PythonInterpretRequest {
|
||||
public String statements;
|
||||
public String jobGroup;
|
||||
|
||||
public PythonInterpretRequest(String statements, String jobGroup) {
|
||||
this.statements = statements;
|
||||
this.jobGroup = jobGroup;
|
||||
}
|
||||
|
||||
public String statements() {
|
||||
return statements;
|
||||
}
|
||||
|
||||
public String jobGroup() {
|
||||
return jobGroup;
|
||||
}
|
||||
}
|
||||
|
||||
public PythonInterpretRequest getStatements() {
|
||||
synchronized (statementSetNotifier) {
|
||||
|
||||
while (pythonInterpretRequest == null && pythonscriptRunning && pythonScriptInitialized) {
|
||||
try {
|
||||
logger.info("astro statementSetNotifier waiting... : {}, {}, {}",
|
||||
statementSetNotifier, pythonscriptRunning, pythonScriptInitialized);
|
||||
|
||||
statementSetNotifier.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
PythonInterpretRequest req = pythonInterpretRequest;
|
||||
pythonInterpretRequest = null;
|
||||
|
||||
return req;
|
||||
}
|
||||
}
|
||||
|
||||
String statementOutput = null;
|
||||
boolean statementError = false;
|
||||
Integer statementFinishedNotifier = new Integer(0);
|
||||
|
||||
public void setStatementsFinished(String out, boolean error) {
|
||||
synchronized (statementFinishedNotifier) {
|
||||
statementOutput = out;
|
||||
statementError = error;
|
||||
statementFinishedNotifier.notify();
|
||||
}
|
||||
}
|
||||
|
||||
boolean pythonScriptInitialized = false;
|
||||
Integer pythonScriptInitializeNotifier = new Integer(0);
|
||||
|
||||
public void onPythonScriptInitialized(long pid) {
|
||||
pythonPid = pid;
|
||||
synchronized (pythonScriptInitializeNotifier) {
|
||||
pythonScriptInitialized = true;
|
||||
pythonScriptInitializeNotifier.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
public void appendOutput(String message) throws IOException {
|
||||
outputStream.getInterpreterOutput().write(message);
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
if (cmd == null || cmd.isEmpty()) {
|
||||
return new InterpreterResult(Code.SUCCESS, "");
|
||||
}
|
||||
this.context = contextInterpreter;
|
||||
String output = sendCommandToPython(cmd);
|
||||
|
||||
InterpreterResult result;
|
||||
if (pythonErrorIn(output)) {
|
||||
result = new InterpreterResult(Code.ERROR, output.replaceAll("\\.\\.\\.", ""));
|
||||
} else {
|
||||
result = new InterpreterResult(Code.SUCCESS, output);
|
||||
this.context = contextInterpreter;
|
||||
|
||||
if (!pythonscriptRunning) {
|
||||
return new InterpreterResult(Code.ERROR, "python process not running"
|
||||
+ outputStream.toString());
|
||||
}
|
||||
|
||||
outputStream.setInterpreterOutput(context.out);
|
||||
|
||||
synchronized (pythonScriptInitializeNotifier) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (pythonScriptInitialized == false
|
||||
&& pythonscriptRunning
|
||||
&& System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) {
|
||||
try {
|
||||
pythonScriptInitializeNotifier.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
List<InterpreterResultMessage> errorMessage;
|
||||
try {
|
||||
context.out.flush();
|
||||
errorMessage = context.out.toInterpreterResultMessage();
|
||||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
if (pythonscriptRunning == false) {
|
||||
// python script failed to initialize and terminated
|
||||
errorMessage.add(new InterpreterResultMessage(
|
||||
InterpreterResult.Type.TEXT, "failed to start python"));
|
||||
return new InterpreterResult(Code.ERROR, errorMessage);
|
||||
}
|
||||
if (pythonScriptInitialized == false) {
|
||||
// timeout. didn't get initialized message
|
||||
errorMessage.add(new InterpreterResultMessage(
|
||||
InterpreterResult.Type.TEXT, "python is not responding"));
|
||||
return new InterpreterResult(Code.ERROR, errorMessage);
|
||||
}
|
||||
|
||||
pythonInterpretRequest = new PythonInterpretRequest(cmd, null);
|
||||
statementOutput = null;
|
||||
|
||||
synchronized (statementSetNotifier) {
|
||||
statementSetNotifier.notify();
|
||||
}
|
||||
|
||||
synchronized (statementFinishedNotifier) {
|
||||
while (statementOutput == null) {
|
||||
try {
|
||||
statementFinishedNotifier.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (statementError) {
|
||||
return new InterpreterResult(Code.ERROR, statementOutput);
|
||||
} else {
|
||||
|
||||
try {
|
||||
context.out.flush();
|
||||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -171,12 +373,22 @@ public class PythonInterpreter extends Interpreter {
|
|||
return isError;
|
||||
}
|
||||
|
||||
public void interrupt() throws IOException {
|
||||
if (pythonPid > -1) {
|
||||
logger.info("Sending SIGINT signal to PID : " + pythonPid);
|
||||
Runtime.getRuntime().exec("kill -SIGINT " + pythonPid);
|
||||
} else {
|
||||
logger.warn("Non UNIX/Linux system, close the interpreter");
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
try {
|
||||
process.interrupt();
|
||||
interrupt();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't interrupt the python interpreter", e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -205,18 +417,6 @@ public class PythonInterpreter extends Interpreter {
|
|||
this.pythonPath = pythonPath;
|
||||
}
|
||||
|
||||
public PythonProcess getPythonProcess() {
|
||||
if (process == null) {
|
||||
String binPath = getProperty(ZEPPELIN_PYTHON);
|
||||
if (pythonCommand != null) {
|
||||
binPath = pythonCommand;
|
||||
}
|
||||
return new PythonProcess(binPath, pythonPath);
|
||||
} else {
|
||||
return process;
|
||||
}
|
||||
}
|
||||
|
||||
public void setPythonCommand(String cmd) {
|
||||
pythonCommand = cmd;
|
||||
}
|
||||
|
|
@ -237,41 +437,6 @@ public class PythonInterpreter extends Interpreter {
|
|||
return foundJob;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Sends given text to Python interpreter, blocks and returns the output
|
||||
* @param cmd Python expression text
|
||||
* @return output
|
||||
*/
|
||||
String sendCommandToPython(String cmd) {
|
||||
String output = "";
|
||||
LOG.debug("Sending : \n" + (cmd.length() > 200 ? cmd.substring(0, 200) + "..." : cmd));
|
||||
try {
|
||||
output = process.sendAndGetResult(cmd);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error when sending commands to python process", e);
|
||||
}
|
||||
LOG.debug("Got : \n" + output);
|
||||
return output;
|
||||
}
|
||||
|
||||
void bootStrapInterpreter(String file) throws IOException {
|
||||
BufferedReader bootstrapReader = new BufferedReader(
|
||||
new InputStreamReader(
|
||||
PythonInterpreter.class.getResourceAsStream(file)));
|
||||
String line = null;
|
||||
String bootstrapCode = "";
|
||||
|
||||
while ((line = bootstrapReader.readLine()) != null) {
|
||||
bootstrapCode += line + "\n";
|
||||
}
|
||||
if (py4JisInstalled && port != null && port != -1) {
|
||||
bootstrapCode = bootstrapCode.replaceAll("\\%PORT\\%", port.toString());
|
||||
}
|
||||
LOG.info("Bootstrap python interpreter with code from \n " + file);
|
||||
sendCommandToPython(bootstrapCode);
|
||||
}
|
||||
|
||||
public GUI getGui() {
|
||||
return context.getGui();
|
||||
}
|
||||
|
|
@ -280,11 +445,6 @@ public class PythonInterpreter extends Interpreter {
|
|||
return port;
|
||||
}
|
||||
|
||||
public Boolean isPy4jInstalled() {
|
||||
String output = sendCommandToPython("\n\nimport py4j\n");
|
||||
return !output.contains("ImportError");
|
||||
}
|
||||
|
||||
private int findRandomOpenPortOnAllLocalInterfaces() {
|
||||
Integer port = -1;
|
||||
try (ServerSocket socket = new ServerSocket(0);) {
|
||||
|
|
@ -299,4 +459,16 @@ public class PythonInterpreter extends Interpreter {
|
|||
public int getMaxResult() {
|
||||
return maxResult;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessComplete(int exitValue) {
|
||||
pythonscriptRunning = false;
|
||||
logger.info("python process terminated. exit code " + exitValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
pythonscriptRunning = false;
|
||||
logger.error("python process failed", e);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,115 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* SQL over Pandas DataFrame interpreter for %python group
|
||||
*
|
||||
* Match experience of %sparpk.sql over Spark DataFrame
|
||||
*/
|
||||
public class PythonInterpreterPandasSql extends Interpreter {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreterPandasSql.class);
|
||||
|
||||
private String SQL_BOOTSTRAP_FILE_PY = "/bootstrap_sql.py";
|
||||
|
||||
public PythonInterpreterPandasSql(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
PythonInterpreter getPythonInterpreter() {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
PythonInterpreter python = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
python = (PythonInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return python;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
LOG.info("Open Python SQL interpreter instance: {}", this.toString());
|
||||
try {
|
||||
LOG.info("Bootstrap {} interpreter with {}", this.toString(), SQL_BOOTSTRAP_FILE_PY);
|
||||
PythonInterpreter python = getPythonInterpreter();
|
||||
python.bootStrapInterpreter(SQL_BOOTSTRAP_FILE_PY);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if Python dependencies pandas and pandasql are installed
|
||||
* @return True if they are
|
||||
*/
|
||||
boolean isPandasAndPandasqlInstalled() {
|
||||
PythonInterpreter python = getPythonInterpreter();
|
||||
String output = python.sendCommandToPython("\n\nimport pandas\nimport pandasql\n");
|
||||
return !output.contains("ImportError");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
LOG.info("Close Python SQL interpreter instance: {}", this.toString());
|
||||
Interpreter python = getPythonInterpreter();
|
||||
python.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
|
||||
Interpreter python = getPythonInterpreter();
|
||||
return python.interpret("z.show(pysqldf('" + st + "'))", context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,138 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.OutputStream;
|
||||
import java.lang.reflect.Field;
|
||||
|
||||
/**
|
||||
* Object encapsulated interactive
|
||||
* Python process (REPL) used by python interpreter
|
||||
*/
|
||||
public class PythonProcess {
|
||||
private static final Logger logger = LoggerFactory.getLogger(PythonProcess.class);
|
||||
private static final String STATEMENT_END = "*!?flush reader!?*";
|
||||
InputStream stdout;
|
||||
OutputStream stdin;
|
||||
PrintWriter writer;
|
||||
BufferedReader reader;
|
||||
Process process;
|
||||
|
||||
private String binPath;
|
||||
private String pythonPath;
|
||||
private long pid;
|
||||
|
||||
public PythonProcess(String binPath, String pythonPath) {
|
||||
this.binPath = binPath;
|
||||
this.pythonPath = pythonPath;
|
||||
}
|
||||
|
||||
public void open() throws IOException {
|
||||
ProcessBuilder builder;
|
||||
boolean hasParams = binPath.split(" ").length > 1;
|
||||
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
|
||||
if (hasParams) {
|
||||
builder = new ProcessBuilder(binPath.split(" "));
|
||||
} else {
|
||||
builder = new ProcessBuilder(binPath, "-iu");
|
||||
}
|
||||
} else {
|
||||
String cmd;
|
||||
if (hasParams) {
|
||||
cmd = binPath;
|
||||
} else {
|
||||
cmd = binPath + " -iu";
|
||||
}
|
||||
builder = new ProcessBuilder("bash", "-c", cmd);
|
||||
if (pythonPath != null) {
|
||||
builder.environment().put("PYTHONPATH", pythonPath);
|
||||
}
|
||||
}
|
||||
|
||||
builder.redirectErrorStream(true);
|
||||
process = builder.start();
|
||||
stdout = process.getInputStream();
|
||||
stdin = process.getOutputStream();
|
||||
writer = new PrintWriter(stdin, true);
|
||||
reader = new BufferedReader(new InputStreamReader(stdout));
|
||||
try {
|
||||
pid = findPid();
|
||||
} catch (Exception e) {
|
||||
logger.warn("Can't find python pid process", e);
|
||||
pid = -1;
|
||||
}
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
process.destroy();
|
||||
reader.close();
|
||||
writer.close();
|
||||
stdin.close();
|
||||
stdout.close();
|
||||
}
|
||||
|
||||
public void interrupt() throws IOException {
|
||||
if (pid > -1) {
|
||||
logger.info("Sending SIGINT signal to PID : " + pid);
|
||||
Runtime.getRuntime().exec("kill -SIGINT " + pid);
|
||||
} else {
|
||||
logger.warn("Non UNIX/Linux system, close the interpreter");
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
public String sendAndGetResult(String cmd) throws IOException {
|
||||
writer.println(cmd);
|
||||
writer.println();
|
||||
writer.println("\"" + STATEMENT_END + "\"");
|
||||
StringBuilder output = new StringBuilder();
|
||||
String line = null;
|
||||
|
||||
while ((line = reader.readLine()) != null &&
|
||||
!line.contains(STATEMENT_END)) {
|
||||
logger.debug("Read line from python shell : " + line);
|
||||
output.append(line + "\n");
|
||||
}
|
||||
|
||||
return output.toString();
|
||||
}
|
||||
|
||||
private long findPid() throws NoSuchFieldException, IllegalAccessException {
|
||||
long pid = -1;
|
||||
if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
|
||||
Field f = process.getClass().getDeclaredField("pid");
|
||||
f.setAccessible(true);
|
||||
pid = f.getLong(process);
|
||||
f.setAccessible(false);
|
||||
}
|
||||
return pid;
|
||||
}
|
||||
|
||||
public long getPid() {
|
||||
return pid;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,14 +0,0 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
|
@ -1,234 +0,0 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# PYTHON 2 / 3 compatibility :
|
||||
# bootstrap.py must be runnable with Python 2 or 3
|
||||
|
||||
import os
|
||||
import sys
|
||||
import signal
|
||||
import base64
|
||||
import warnings
|
||||
from io import BytesIO
|
||||
try:
|
||||
from StringIO import StringIO
|
||||
except ImportError:
|
||||
from io import StringIO
|
||||
|
||||
def intHandler(signum, frame): # Set the signal handler
|
||||
print ("Paragraph interrupted")
|
||||
raise KeyboardInterrupt()
|
||||
|
||||
signal.signal(signal.SIGINT, intHandler)
|
||||
# set prompt as empty string so that java side don't need to remove the prompt.
|
||||
sys.ps1=""
|
||||
|
||||
def help():
|
||||
print("""%html
|
||||
<h2>Python Interpreter help</h2>
|
||||
|
||||
<h3>Python 2 & 3 compatibility</h3>
|
||||
<p>The interpreter is compatible with Python 2 & 3.<br/>
|
||||
To change Python version,
|
||||
change in the interpreter configuration the python to the
|
||||
desired version (example : python=/usr/bin/python3)</p>
|
||||
|
||||
<h3>Python modules</h3>
|
||||
<p>The interpreter can use all modules already installed
|
||||
(with pip, easy_install, etc)</p>
|
||||
|
||||
<h3>Forms</h3>
|
||||
You must install py4j in order to use
|
||||
the form feature (pip install py4j)
|
||||
<h4>Input form</h4>
|
||||
<pre>print (z.input("f1","defaultValue"))</pre>
|
||||
<h4>Selection form</h4>
|
||||
<pre>print(z.select("f2", [("o1","1"), ("o2","2")],2))</pre>
|
||||
<h4>Checkbox form</h4>
|
||||
<pre> print("".join(z.checkbox("f3", [("o1","1"), ("o2","2")],["1"])))</pre>')
|
||||
|
||||
<h3>Matplotlib graph</h3>
|
||||
<div>The interpreter can display matplotlib graph with
|
||||
the function z.show()</div>
|
||||
<div> You need to already have matplotlib module installed
|
||||
to use this functionality !</div><br/>
|
||||
<pre>import matplotlib.pyplot as plt
|
||||
plt.figure()
|
||||
(.. ..)
|
||||
z.show(plt)
|
||||
plt.close()
|
||||
</pre>
|
||||
<div><br/> z.show function can take optional parameters
|
||||
to adapt graph dimensions (width and height) and format (png or svg)</div>
|
||||
<div><b>example </b>:
|
||||
<pre>z.show(plt,width='50px
|
||||
z.show(plt,height='150px', fmt='svg') </pre></div>
|
||||
|
||||
<h3>Pandas DataFrame</h3>
|
||||
<div> You need to have Pandas module installed
|
||||
to use this functionality (pip install pandas) !</div><br/>
|
||||
<div>The interpreter can visualize Pandas DataFrame
|
||||
with the function z.show()
|
||||
<pre>
|
||||
import pandas as pd
|
||||
df = pd.read_csv("bank.csv", sep=";")
|
||||
z.show(df)
|
||||
</pre></div>
|
||||
|
||||
<h3>SQL over Pandas DataFrame</h3>
|
||||
<div> You need to have Pandas&Pandasql modules installed
|
||||
to use this functionality (pip install pandas pandasql) !</div><br/>
|
||||
|
||||
<div>Python interpreter group includes %sql interpreter that can query
|
||||
Pandas DataFrames using SQL and visualize results using Zeppelin Table Display System
|
||||
|
||||
<pre>
|
||||
%python
|
||||
import pandas as pd
|
||||
df = pd.read_csv("bank.csv", sep=";")
|
||||
</pre>
|
||||
<br />
|
||||
<pre>
|
||||
%python.sql
|
||||
%sql
|
||||
SELECT * from df LIMIT 5
|
||||
</pre>
|
||||
</div>
|
||||
""")
|
||||
|
||||
|
||||
class PyZeppelinContext(object):
|
||||
""" If py4j is detected, these class will be override
|
||||
with the implementation in bootstrap_input.py
|
||||
"""
|
||||
errorMsg = "You must install py4j Python module " \
|
||||
"(pip install py4j) to use Zeppelin dynamic forms features"
|
||||
|
||||
def __init__(self):
|
||||
self.max_result = 1000
|
||||
self._displayhook = lambda *args: None
|
||||
self._setup_matplotlib()
|
||||
|
||||
def input(self, name, defaultValue=""):
|
||||
print(self.errorMsg)
|
||||
|
||||
def select(self, name, options, defaultValue=""):
|
||||
print(self.errorMsg)
|
||||
|
||||
def checkbox(self, name, options, defaultChecked=[]):
|
||||
print(self.errorMsg)
|
||||
|
||||
def show(self, p, **kwargs):
|
||||
if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot":
|
||||
self.show_matplotlib(p, **kwargs)
|
||||
elif type(p).__name__ == "DataFrame": # does not play well with sub-classes
|
||||
# `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
|
||||
# and so a dependency on pandas
|
||||
self.show_dataframe(p, **kwargs)
|
||||
elif hasattr(p, '__call__'):
|
||||
p() #error reporting
|
||||
|
||||
def show_dataframe(self, df, show_index=False, **kwargs):
|
||||
"""Pretty prints DF using Table Display System
|
||||
"""
|
||||
limit = len(df) > self.max_result
|
||||
header_buf = StringIO("")
|
||||
if show_index:
|
||||
idx_name = str(df.index.name) if df.index.name is not None else ""
|
||||
header_buf.write(idx_name + "\t")
|
||||
header_buf.write(str(df.columns[0]))
|
||||
for col in df.columns[1:]:
|
||||
header_buf.write("\t")
|
||||
header_buf.write(str(col))
|
||||
header_buf.write("\n")
|
||||
|
||||
body_buf = StringIO("")
|
||||
rows = df.head(self.max_result).values if limit else df.values
|
||||
index = df.index.values
|
||||
for idx, row in zip(index, rows):
|
||||
if show_index:
|
||||
body_buf.write("%html <strong>{}</strong>".format(idx))
|
||||
body_buf.write("\t")
|
||||
body_buf.write(str(row[0]))
|
||||
for cell in row[1:]:
|
||||
body_buf.write("\t")
|
||||
body_buf.write(str(cell))
|
||||
body_buf.write("\n")
|
||||
body_buf.seek(0); header_buf.seek(0)
|
||||
#TODO(bzz): fix it, so it shows red notice, as in Spark
|
||||
print("%table " + header_buf.read() + body_buf.read()) # +
|
||||
# ("\n<font color=red>Results are limited by {}.</font>" \
|
||||
# .format(self.max_result) if limit else "")
|
||||
#)
|
||||
body_buf.close(); header_buf.close()
|
||||
|
||||
def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
|
||||
**kwargs):
|
||||
"""Matplotlib show function
|
||||
"""
|
||||
if fmt == "png":
|
||||
img = BytesIO()
|
||||
p.savefig(img, format=fmt)
|
||||
img_str = b"data:image/png;base64,"
|
||||
img_str += base64.b64encode(img.getvalue().strip())
|
||||
img_tag = "<img src={img} style='width={width};height:{height}'>"
|
||||
# Decoding is necessary for Python 3 compability
|
||||
img_str = img_str.decode("ascii")
|
||||
img_str = img_tag.format(img=img_str, width=width, height=height)
|
||||
elif fmt == "svg":
|
||||
img = StringIO()
|
||||
p.savefig(img, format=fmt)
|
||||
img_str = img.getvalue()
|
||||
else:
|
||||
raise ValueError("fmt must be 'png' or 'svg'")
|
||||
|
||||
html = "%html <div style='width:{width};height:{height}'>{img}<div>"
|
||||
print(html.format(width=width, height=height, img=img_str))
|
||||
img.close()
|
||||
|
||||
def configure_mpl(self, **kwargs):
|
||||
import mpl_config
|
||||
mpl_config.configure(**kwargs)
|
||||
|
||||
def _setup_matplotlib(self):
|
||||
# If we don't have matplotlib installed don't bother continuing
|
||||
try:
|
||||
import matplotlib
|
||||
except ImportError:
|
||||
return
|
||||
# Make sure custom backends are available in the PYTHONPATH
|
||||
rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
|
||||
mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
|
||||
if mpl_path not in sys.path:
|
||||
sys.path.append(mpl_path)
|
||||
|
||||
# Finally check if backend exists, and if so configure as appropriate
|
||||
try:
|
||||
matplotlib.use('module://backend_zinline')
|
||||
import backend_zinline
|
||||
|
||||
# Everything looks good so make config assuming that we are using
|
||||
# an inline backend
|
||||
self._displayhook = backend_zinline.displayhook
|
||||
self.configure_mpl(width=600, height=400, dpi=72,
|
||||
fontsize=10, interactive=True, format='png')
|
||||
except ImportError:
|
||||
# Fall back to Agg if no custom backend installed
|
||||
matplotlib.use('Agg')
|
||||
warnings.warn("Unable to load inline matplotlib backend, "
|
||||
"falling back to Agg")
|
||||
|
||||
|
||||
z = PyZeppelinContext()
|
||||
|
|
@ -1,58 +0,0 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from py4j.java_gateway import JavaGateway
|
||||
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
|
||||
|
||||
|
||||
client = GatewayClient(port=%PORT%)
|
||||
gateway = JavaGateway(client)
|
||||
java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
|
||||
|
||||
|
||||
class Py4jZeppelinContext(PyZeppelinContext):
|
||||
"""A context impl that uses Py4j to communicate to JVM
|
||||
"""
|
||||
def __init__(self, z):
|
||||
PyZeppelinContext.__init__(self)
|
||||
self.z = z
|
||||
self.paramOption = gateway.jvm.org.apache.zeppelin.display.Input.ParamOption
|
||||
self.javaList = gateway.jvm.java.util.ArrayList
|
||||
self.max_result = self.z.getMaxResult()
|
||||
|
||||
def input(self, name, defaultValue=""):
|
||||
return self.z.getGui().input(name, defaultValue)
|
||||
|
||||
def select(self, name, options, defaultValue=""):
|
||||
javaOptions = gateway.new_array(self.paramOption, len(options))
|
||||
i = 0
|
||||
for tuple in options:
|
||||
javaOptions[i] = self.paramOption(tuple[0], tuple[1])
|
||||
i += 1
|
||||
return self.z.getGui().select(name, defaultValue, javaOptions)
|
||||
|
||||
def checkbox(self, name, options, defaultChecked=[]):
|
||||
javaOptions = gateway.new_array(self.paramOption, len(options))
|
||||
i = 0
|
||||
for tuple in options:
|
||||
javaOptions[i] = self.paramOption(tuple[0], tuple[1])
|
||||
i += 1
|
||||
javaDefaultCheck = self.javaList()
|
||||
for check in defaultChecked:
|
||||
javaDefaultCheck.append(check)
|
||||
return self.z.getGui().checkbox(name, javaDefaultCheck, javaOptions)
|
||||
|
||||
|
||||
z = Py4jZeppelinContext(gateway.entry_point)
|
||||
|
|
@ -1,28 +0,0 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Setup SQL over Pandas DataFrames
|
||||
# It requires next dependencies to be installed:
|
||||
# - pandas
|
||||
# - pandasql
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
try:
|
||||
from pandasql import sqldf
|
||||
pysqldf = lambda q: sqldf(q, globals())
|
||||
except ImportError:
|
||||
pysqldf = lambda q: print("Can not run SQL over Pandas DataFrame" +
|
||||
"Make sure 'pandas' and 'pandasql' libraries are installed")
|
||||
263
python/src/main/resources/python/zeppelin_python.py
Normal file
263
python/src/main/resources/python/zeppelin_python.py
Normal file
|
|
@ -0,0 +1,263 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
import os, sys, getopt, traceback, json, re
|
||||
|
||||
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
|
||||
from py4j.protocol import Py4JJavaError
|
||||
import warnings
|
||||
import ast
|
||||
import traceback
|
||||
import warnings
|
||||
import signal
|
||||
|
||||
|
||||
# for back compatibility
|
||||
|
||||
class Logger(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def write(self, message):
|
||||
intp.appendOutput(message)
|
||||
|
||||
def reset(self):
|
||||
pass
|
||||
|
||||
def flush(self):
|
||||
pass
|
||||
|
||||
|
||||
class PyZeppelinContext(object):
|
||||
""" If py4j is detected, these class will be override
|
||||
with the implementation in bootstrap_input.py
|
||||
"""
|
||||
errorMsg = "You must install py4j Python module " \
|
||||
"(pip install py4j) to use Zeppelin dynamic forms features"
|
||||
|
||||
def __init__(self):
|
||||
self.max_result = 1000
|
||||
self._displayhook = lambda *args: None
|
||||
self._setup_matplotlib()
|
||||
|
||||
def input(self, name, defaultValue=""):
|
||||
print(self.errorMsg)
|
||||
|
||||
def select(self, name, options, defaultValue=""):
|
||||
print(self.errorMsg)
|
||||
|
||||
def checkbox(self, name, options, defaultChecked=[]):
|
||||
print(self.errorMsg)
|
||||
|
||||
def show(self, p, **kwargs):
|
||||
if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot":
|
||||
self.show_matplotlib(p, **kwargs)
|
||||
elif type(p).__name__ == "DataFrame": # does not play well with sub-classes
|
||||
# `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
|
||||
# and so a dependency on pandas
|
||||
self.show_dataframe(p, **kwargs)
|
||||
elif hasattr(p, '__call__'):
|
||||
p() #error reporting
|
||||
|
||||
def show_dataframe(self, df, show_index=False, **kwargs):
|
||||
"""Pretty prints DF using Table Display System
|
||||
"""
|
||||
limit = len(df) > self.max_result
|
||||
header_buf = StringIO("")
|
||||
if show_index:
|
||||
idx_name = str(df.index.name) if df.index.name is not None else ""
|
||||
header_buf.write(idx_name + "\t")
|
||||
header_buf.write(str(df.columns[0]))
|
||||
for col in df.columns[1:]:
|
||||
header_buf.write("\t")
|
||||
header_buf.write(str(col))
|
||||
header_buf.write("\n")
|
||||
|
||||
body_buf = StringIO("")
|
||||
rows = df.head(self.max_result).values if limit else df.values
|
||||
index = df.index.values
|
||||
for idx, row in zip(index, rows):
|
||||
if show_index:
|
||||
body_buf.write("%html <strong>{}</strong>".format(idx))
|
||||
body_buf.write("\t")
|
||||
body_buf.write(str(row[0]))
|
||||
for cell in row[1:]:
|
||||
body_buf.write("\t")
|
||||
body_buf.write(str(cell))
|
||||
body_buf.write("\n")
|
||||
body_buf.seek(0); header_buf.seek(0)
|
||||
#TODO(bzz): fix it, so it shows red notice, as in Spark
|
||||
print("%table " + header_buf.read() + body_buf.read()) # +
|
||||
# ("\n<font color=red>Results are limited by {}.</font>" \
|
||||
# .format(self.max_result) if limit else "")
|
||||
#)
|
||||
body_buf.close(); header_buf.close()
|
||||
|
||||
def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
|
||||
**kwargs):
|
||||
"""Matplotlib show function
|
||||
"""
|
||||
if fmt == "png":
|
||||
img = BytesIO()
|
||||
p.savefig(img, format=fmt)
|
||||
img_str = b"data:image/png;base64,"
|
||||
img_str += base64.b64encode(img.getvalue().strip())
|
||||
img_tag = "<img src={img} style='width={width};height:{height}'>"
|
||||
# Decoding is necessary for Python 3 compability
|
||||
img_str = img_str.decode("ascii")
|
||||
img_str = img_tag.format(img=img_str, width=width, height=height)
|
||||
elif fmt == "svg":
|
||||
img = StringIO()
|
||||
p.savefig(img, format=fmt)
|
||||
img_str = img.getvalue()
|
||||
else:
|
||||
raise ValueError("fmt must be 'png' or 'svg'")
|
||||
|
||||
html = "%html <div style='width:{width};height:{height}'>{img}<div>"
|
||||
print(html.format(width=width, height=height, img=img_str))
|
||||
img.close()
|
||||
|
||||
def configure_mpl(self, **kwargs):
|
||||
import mpl_config
|
||||
mpl_config.configure(**kwargs)
|
||||
|
||||
def _setup_matplotlib(self):
|
||||
# If we don't have matplotlib installed don't bother continuing
|
||||
try:
|
||||
import matplotlib
|
||||
except ImportError:
|
||||
return
|
||||
# Make sure custom backends are available in the PYTHONPATH
|
||||
rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
|
||||
mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
|
||||
if mpl_path not in sys.path:
|
||||
sys.path.append(mpl_path)
|
||||
|
||||
# Finally check if backend exists, and if so configure as appropriate
|
||||
try:
|
||||
matplotlib.use('module://backend_zinline')
|
||||
import backend_zinline
|
||||
|
||||
# Everything looks good so make config assuming that we are using
|
||||
# an inline backend
|
||||
self._displayhook = backend_zinline.displayhook
|
||||
self.configure_mpl(width=600, height=400, dpi=72,
|
||||
fontsize=10, interactive=True, format='png')
|
||||
except ImportError:
|
||||
# Fall back to Agg if no custom backend installed
|
||||
matplotlib.use('Agg')
|
||||
warnings.warn("Unable to load inline matplotlib backend, "
|
||||
"falling back to Agg")
|
||||
|
||||
|
||||
def handler_stop_signals(sig, frame):
|
||||
sys.exit("Got signal : " + str(sig))
|
||||
|
||||
|
||||
signal.signal(signal.SIGINT, handler_stop_signals)
|
||||
|
||||
output = Logger()
|
||||
sys.stdout = output
|
||||
sys.stderr = output
|
||||
|
||||
client = GatewayClient(port=int(sys.argv[1]))
|
||||
|
||||
#gateway = JavaGateway(client, auto_convert = True)
|
||||
gateway = JavaGateway(client)
|
||||
|
||||
intp = gateway.entry_point
|
||||
intp.onPythonScriptInitialized(os.getpid())
|
||||
|
||||
z = PyZeppelinContext()
|
||||
z._setup_matplotlib()
|
||||
|
||||
while True :
|
||||
req = intp.getStatements()
|
||||
try:
|
||||
stmts = req.statements().split("\n")
|
||||
jobGroup = req.jobGroup()
|
||||
final_code = []
|
||||
|
||||
# Get post-execute hooks
|
||||
try:
|
||||
global_hook = intp.getHook('post_exec_dev')
|
||||
except:
|
||||
global_hook = None
|
||||
|
||||
try:
|
||||
user_hook = z.getHook('post_exec')
|
||||
except:
|
||||
user_hook = None
|
||||
|
||||
nhooks = 0
|
||||
for hook in (global_hook, user_hook):
|
||||
if hook:
|
||||
nhooks += 1
|
||||
|
||||
for s in stmts:
|
||||
if s == None:
|
||||
continue
|
||||
|
||||
# skip comment
|
||||
s_stripped = s.strip()
|
||||
if len(s_stripped) == 0 or s_stripped.startswith("#"):
|
||||
continue
|
||||
|
||||
final_code.append(s)
|
||||
|
||||
if final_code:
|
||||
# use exec mode to compile the statements except the last statement,
|
||||
# so that the last statement's evaluation will be printed to stdout
|
||||
#sc.setJobGroup(jobGroup, "Zeppelin")
|
||||
code = compile('\n'.join(final_code), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
|
||||
|
||||
to_run_hooks = []
|
||||
if (nhooks > 0):
|
||||
to_run_hooks = code.body[-nhooks:]
|
||||
|
||||
to_run_exec, to_run_single = (code.body[:-(nhooks + 1)],
|
||||
[code.body[-(nhooks + 1)]])
|
||||
|
||||
try:
|
||||
for node in to_run_exec:
|
||||
mod = ast.Module([node])
|
||||
code = compile(mod, '<stdin>', 'exec')
|
||||
exec(code)
|
||||
|
||||
for node in to_run_single:
|
||||
mod = ast.Interactive([node])
|
||||
code = compile(mod, '<stdin>', 'single')
|
||||
exec(code)
|
||||
|
||||
for node in to_run_hooks:
|
||||
mod = ast.Module([node])
|
||||
code = compile(mod, '<stdin>', 'exec')
|
||||
exec(code)
|
||||
except:
|
||||
raise Exception(traceback.format_exc())
|
||||
|
||||
intp.setStatementsFinished("", False)
|
||||
except Py4JJavaError:
|
||||
excInnerError = traceback.format_exc() # format_tb() does not return the inner exception
|
||||
innerErrorStart = excInnerError.find("Py4JJavaError:")
|
||||
if innerErrorStart > -1:
|
||||
excInnerError = excInnerError[innerErrorStart:]
|
||||
intp.setStatementsFinished(excInnerError + str(sys.exc_info()), True)
|
||||
except:
|
||||
intp.setStatementsFinished(traceback.format_exc(), True)
|
||||
|
||||
output.reset()
|
||||
|
|
@ -1,135 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.regex.Matcher;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class PythonCondaInterpreterTest {
|
||||
private PythonCondaInterpreter conda;
|
||||
private PythonInterpreter python;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
conda = spy(new PythonCondaInterpreter(new Properties()));
|
||||
python = mock(PythonInterpreter.class);
|
||||
|
||||
InterpreterGroup group = new InterpreterGroup();
|
||||
group.put("note", Arrays.asList(python, conda));
|
||||
python.setInterpreterGroup(group);
|
||||
conda.setInterpreterGroup(group);
|
||||
|
||||
doReturn(python).when(conda).getPythonInterpreter();
|
||||
}
|
||||
|
||||
private void setMockCondaEnvList() throws IOException, InterruptedException {
|
||||
Map<String, String> envList = new LinkedHashMap<String, String>();
|
||||
envList.put("env1", "/path1");
|
||||
envList.put("env2", "/path2");
|
||||
doReturn(envList).when(conda).getCondaEnvs();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListEnv() throws IOException, InterruptedException {
|
||||
setMockCondaEnvList();
|
||||
|
||||
// list available env
|
||||
InterpreterContext context = getInterpreterContext();
|
||||
InterpreterResult result = conda.interpret("env list", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
||||
assertTrue(result.toString().contains(">env1<"));
|
||||
assertTrue(result.toString().contains("/path1<"));
|
||||
assertTrue(result.toString().contains(">env2<"));
|
||||
assertTrue(result.toString().contains("/path2<"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testActivateEnv() throws IOException, InterruptedException {
|
||||
setMockCondaEnvList();
|
||||
|
||||
InterpreterContext context = getInterpreterContext();
|
||||
conda.interpret("activate env1", context);
|
||||
verify(python, times(1)).open();
|
||||
verify(python, times(1)).close();
|
||||
verify(python).setPythonCommand("/path1/bin/python");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeactivate() {
|
||||
InterpreterContext context = getInterpreterContext();
|
||||
conda.interpret("deactivate", context);
|
||||
verify(python, times(1)).open();
|
||||
verify(python, times(1)).close();
|
||||
verify(python).setPythonCommand("python");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseCondaCommonStdout()
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
StringBuilder sb = new StringBuilder()
|
||||
.append("# comment1\n")
|
||||
.append("# comment2\n")
|
||||
.append("env1 /location1\n")
|
||||
.append("env2 /location2\n");
|
||||
|
||||
Map<String, String> locationPerEnv =
|
||||
PythonCondaInterpreter.parseCondaCommonStdout(sb.toString());
|
||||
|
||||
assertEquals("/location1", locationPerEnv.get("env1"));
|
||||
assertEquals("/location2", locationPerEnv.get("env2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetRestArgsFromMatcher() {
|
||||
Matcher m =
|
||||
PythonCondaInterpreter.PATTERN_COMMAND_ENV.matcher("env remove --name test --yes");
|
||||
m.matches();
|
||||
|
||||
List<String> restArgs = PythonCondaInterpreter.getRestArgsFromMatcher(m);
|
||||
List<String> expected = Arrays.asList(new String[]{"remove", "--name", "test", "--yes"});
|
||||
assertEquals(expected, restArgs);
|
||||
}
|
||||
|
||||
private InterpreterContext getInterpreterContext() {
|
||||
return new InterpreterContext(
|
||||
"noteId",
|
||||
"paragraphId",
|
||||
null,
|
||||
"paragraphTitle",
|
||||
"paragraphText",
|
||||
new AuthenticationInfo(),
|
||||
new HashMap<String, Object>(),
|
||||
new GUI(),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new InterpreterOutput(null));
|
||||
}
|
||||
}
|
||||
|
|
@ -1,87 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class PythonDockerInterpreterTest {
|
||||
private PythonDockerInterpreter docker;
|
||||
private PythonInterpreter python;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
docker = spy(new PythonDockerInterpreter(new Properties()));
|
||||
python = mock(PythonInterpreter.class);
|
||||
|
||||
InterpreterGroup group = new InterpreterGroup();
|
||||
group.put("note", Arrays.asList(python, docker));
|
||||
python.setInterpreterGroup(group);
|
||||
docker.setInterpreterGroup(group);
|
||||
doReturn(true).when(docker).pull(any(InterpreterOutput.class), anyString());
|
||||
doReturn(python).when(docker).getPythonInterpreter();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testActivateEnv() {
|
||||
InterpreterContext context = getInterpreterContext();
|
||||
docker.interpret("activate env", context);
|
||||
verify(python, times(1)).open();
|
||||
verify(python, times(1)).close();
|
||||
verify(docker, times(1)).pull(any(InterpreterOutput.class), anyString());
|
||||
verify(python).setPythonCommand("docker run -i --rm env python -iu");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeactivate() {
|
||||
InterpreterContext context = getInterpreterContext();
|
||||
docker.interpret("deactivate", context);
|
||||
verify(python, times(1)).open();
|
||||
verify(python, times(1)).close();
|
||||
verify(python).setPythonCommand(null);
|
||||
}
|
||||
|
||||
private InterpreterContext getInterpreterContext() {
|
||||
return new InterpreterContext(
|
||||
"noteId",
|
||||
"paragraphId",
|
||||
"replName",
|
||||
"paragraphTitle",
|
||||
"paragraphText",
|
||||
new AuthenticationInfo(),
|
||||
new HashMap<String, Object>(),
|
||||
new GUI(),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new InterpreterOutput(null));
|
||||
}
|
||||
}
|
||||
|
|
@ -1,160 +0,0 @@
|
|||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* In order for this test to work, test env must have installed:
|
||||
* <ol>
|
||||
* - <li>Python</li>
|
||||
* - <li>Matplotlib</li>
|
||||
* <ol>
|
||||
*
|
||||
* Your PYTHONPATH should also contain the directory of the Matplotlib
|
||||
* backend files. Usually these can be found in $ZEPPELIN_HOME/interpreter/lib/python.
|
||||
*
|
||||
* To run manually on such environment, use:
|
||||
* <code>
|
||||
* mvn -Dpython.test.exclude='' test -pl python -am
|
||||
* </code>
|
||||
*/
|
||||
public class PythonInterpreterMatplotlibTest {
|
||||
|
||||
private InterpreterGroup intpGroup;
|
||||
private PythonInterpreter python;
|
||||
|
||||
private InterpreterContext context;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Properties p = new Properties();
|
||||
p.setProperty("zeppelin.python", "python");
|
||||
p.setProperty("zeppelin.python.maxResult", "100");
|
||||
|
||||
intpGroup = new InterpreterGroup();
|
||||
|
||||
python = new PythonInterpreter(p);
|
||||
python.setInterpreterGroup(intpGroup);
|
||||
python.open();
|
||||
|
||||
List<Interpreter> interpreters = new LinkedList<>();
|
||||
interpreters.add(python);
|
||||
intpGroup.put("note", interpreters);
|
||||
|
||||
context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
|
||||
new HashMap<String, Object>(), new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null), null,
|
||||
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dependenciesAreInstalled() {
|
||||
// matplotlib
|
||||
InterpreterResult ret = python.interpret("import matplotlib", context);
|
||||
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
|
||||
// inline backend
|
||||
ret = python.interpret("import backend_zinline", context);
|
||||
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void showPlot() {
|
||||
// Simple plot test
|
||||
InterpreterResult ret;
|
||||
ret = python.interpret("import matplotlib.pyplot as plt", context);
|
||||
ret = python.interpret("z.configure_mpl(interactive=False)", context);
|
||||
ret = python.interpret("plt.plot([1, 2, 3])", context);
|
||||
ret = python.interpret("plt.show()", context);
|
||||
|
||||
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
assertEquals(ret.message().get(0).getData(), Type.HTML, ret.message().get(0).getType());
|
||||
assertTrue(ret.message().get(0).getData().contains("data:image/png;base64"));
|
||||
assertTrue(ret.message().get(0).getData().contains("<div>"));
|
||||
}
|
||||
|
||||
@Test
|
||||
// Test for when configuration is set to auto-close figures after show().
|
||||
public void testClose() {
|
||||
InterpreterResult ret;
|
||||
InterpreterResult ret1;
|
||||
InterpreterResult ret2;
|
||||
ret = python.interpret("import matplotlib.pyplot as plt", context);
|
||||
ret = python.interpret("z.configure_mpl(interactive=False)", context);
|
||||
ret = python.interpret("plt.plot([1, 2, 3])", context);
|
||||
ret1 = python.interpret("plt.show()", context);
|
||||
|
||||
// Second call to show() should print nothing, and Type should be TEXT.
|
||||
// This is because when close=True, there should be no living instances
|
||||
// of FigureManager, causing show() to return before setting the output
|
||||
// type to HTML.
|
||||
ret = python.interpret("plt.show()", context);
|
||||
assertEquals(0, ret.message().size());
|
||||
|
||||
// Now test that new plot is drawn. It should be identical to the
|
||||
// previous one.
|
||||
ret = python.interpret("plt.plot([1, 2, 3])", context);
|
||||
ret2 = python.interpret("plt.show()", context);
|
||||
assertEquals(ret1.message().get(0).getType(), ret2.message().get(0).getType());
|
||||
assertEquals(ret1.message().get(0).getData(), ret2.message().get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
// Test for when configuration is set to not auto-close figures after show().
|
||||
public void testNoClose() {
|
||||
InterpreterResult ret;
|
||||
InterpreterResult ret1;
|
||||
InterpreterResult ret2;
|
||||
ret = python.interpret("import matplotlib.pyplot as plt", context);
|
||||
ret = python.interpret("z.configure_mpl(interactive=False, close=False)", context);
|
||||
ret = python.interpret("plt.plot([1, 2, 3])", context);
|
||||
ret1 = python.interpret("plt.show()", context);
|
||||
|
||||
// Second call to show() should print nothing, and Type should be HTML.
|
||||
// This is because when close=False, there should be living instances
|
||||
// of FigureManager, causing show() to set the output
|
||||
// type to HTML even though the figure is inactive.
|
||||
ret = python.interpret("plt.show()", context);
|
||||
assertEquals("", ret.message().get(0).getData());
|
||||
|
||||
// Now test that plot can be reshown if it is updated. It should be
|
||||
// different from the previous one because it will plot the same line
|
||||
// again but in a different color.
|
||||
ret = python.interpret("plt.plot([1, 2, 3])", context);
|
||||
ret2 = python.interpret("plt.show()", context);
|
||||
assertNotSame(ret1.message().get(0).getData(), ret2.message().get(0).getData());
|
||||
}
|
||||
}
|
||||
|
|
@ -1,175 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* In order for this test to work, test env must have installed:
|
||||
* <ol>
|
||||
* - <li>Python</li>
|
||||
* - <li>NumPy</li>
|
||||
* - <li>Pandas</li>
|
||||
* - <li>PandaSql</li>
|
||||
* <ol>
|
||||
*
|
||||
* To run manually on such environment, use:
|
||||
* <code>
|
||||
* mvn -Dpython.test.exclude='' test -pl python -am
|
||||
* </code>
|
||||
*/
|
||||
public class PythonInterpreterPandasSqlTest {
|
||||
|
||||
private InterpreterGroup intpGroup;
|
||||
private PythonInterpreterPandasSql sql;
|
||||
private PythonInterpreter python;
|
||||
|
||||
private InterpreterContext context;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Properties p = new Properties();
|
||||
p.setProperty("zeppelin.python", "python");
|
||||
p.setProperty("zeppelin.python.maxResult", "100");
|
||||
|
||||
intpGroup = new InterpreterGroup();
|
||||
|
||||
python = new PythonInterpreter(p);
|
||||
python.setInterpreterGroup(intpGroup);
|
||||
python.open();
|
||||
|
||||
sql = new PythonInterpreterPandasSql(p);
|
||||
sql.setInterpreterGroup(intpGroup);
|
||||
|
||||
intpGroup.put("note", Arrays.asList(python, sql));
|
||||
|
||||
context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
|
||||
new HashMap<String, Object>(), new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null), null,
|
||||
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
|
||||
|
||||
//important to be last step
|
||||
sql.open();
|
||||
//it depends on python interpreter presence in the same group
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dependenciesAreInstalled() {
|
||||
InterpreterResult ret = python.interpret("import pandas\nimport pandasql\nimport numpy\n", context);
|
||||
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void errorMessageIfDependenciesNotInstalled() {
|
||||
InterpreterResult ret;
|
||||
// given
|
||||
ret = python.interpret(
|
||||
"pysqldf = lambda q: print('Can not execute SQL as Python dependency is not installed')",
|
||||
context);
|
||||
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
|
||||
// when
|
||||
ret = sql.interpret("SELECT * from something", context);
|
||||
|
||||
// then
|
||||
assertNotNull(ret);
|
||||
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
assertTrue(ret.message().get(0).getData().contains("dependency is not installed"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sqlOverTestDataPrintsTable() {
|
||||
InterpreterResult ret;
|
||||
// given
|
||||
//String expectedTable = "name\tage\n\nmoon\t33\n\npark\t34";
|
||||
ret = python.interpret("import pandas as pd", context);
|
||||
ret = python.interpret("import numpy as np", context);
|
||||
// DataFrame df2 \w test data
|
||||
ret = python.interpret("df2 = pd.DataFrame({ 'age' : np.array([33, 51, 51, 34]), "+
|
||||
"'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
|
||||
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
|
||||
//when
|
||||
ret = sql.interpret("select name, age from df2 where age < 40", context);
|
||||
|
||||
//then
|
||||
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType());
|
||||
//assertEquals(expectedTable, ret.message()); //somehow it's same but not equal
|
||||
assertTrue(ret.message().get(0).getData().indexOf("moon\t33") > 0);
|
||||
assertTrue(ret.message().get(0).getData().indexOf("park\t34") > 0);
|
||||
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from df2", context).code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void badSqlSyntaxFails() {
|
||||
//when
|
||||
InterpreterResult ret = sql.interpret("select wrong syntax", context);
|
||||
|
||||
//then
|
||||
assertNotNull("Interpreter returned 'null'", ret);
|
||||
//System.out.println("\nInterpreter response: \n" + ret.message());
|
||||
assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code());
|
||||
assertTrue(ret.message().get(0).getData().length() > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void showDataFrame() {
|
||||
InterpreterResult ret;
|
||||
ret = python.interpret("import pandas as pd", context);
|
||||
ret = python.interpret("import numpy as np", context);
|
||||
|
||||
// given a Pandas DataFrame with an index and non-text data
|
||||
ret = python.interpret("index = pd.Index([10, 11, 12, 13], name='index_name')", context);
|
||||
ret = python.interpret("d1 = {1 : [np.nan, 1, 2, 3], 'two' : [3., 4., 5., 6.7]}", context);
|
||||
ret = python.interpret("df1 = pd.DataFrame(d1, index=index)", context);
|
||||
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
|
||||
// when
|
||||
ret = python.interpret("z.show(df1, show_index=True)", context);
|
||||
|
||||
// then
|
||||
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
|
||||
assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType());
|
||||
assertTrue(ret.message().get(0).getData().indexOf("index_name") == 0);
|
||||
assertTrue(ret.message().get(0).getData().indexOf("13") > 0);
|
||||
assertTrue(ret.message().get(0).getData().indexOf("nan") > 0);
|
||||
assertTrue(ret.message().get(0).getData().indexOf("6.7") > 0);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,283 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import static org.apache.zeppelin.python.PythonInterpreter.DEFAULT_ZEPPELIN_PYTHON;
|
||||
import static org.apache.zeppelin.python.PythonInterpreter.MAX_RESULT;
|
||||
import static org.apache.zeppelin.python.PythonInterpreter.ZEPPELIN_PYTHON;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
/**
|
||||
* Python interpreter unit test
|
||||
*
|
||||
* Important: ALL tests here DO NOT REQUIRE Python to be installed
|
||||
* If Python dependency is required, please look at PythonInterpreterWithPythonInstalledTest
|
||||
*/
|
||||
public class PythonInterpreterTest {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(PythonProcess.class);
|
||||
|
||||
PythonInterpreter zeppelinPythonInterpreter = null;
|
||||
PythonInterpreter pythonInterpreter = null;
|
||||
PythonProcess mockPythonProcess;
|
||||
String cmdHistory;
|
||||
|
||||
public static Properties getPythonTestProperties() {
|
||||
Properties p = new Properties();
|
||||
p.setProperty(ZEPPELIN_PYTHON, DEFAULT_ZEPPELIN_PYTHON);
|
||||
p.setProperty(MAX_RESULT, "1000");
|
||||
return p;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void beforeTest() throws IOException {
|
||||
cmdHistory = "";
|
||||
|
||||
/*Mock python process*/
|
||||
mockPythonProcess = mock(PythonProcess.class);
|
||||
when(mockPythonProcess.getPid()).thenReturn(1L);
|
||||
when(mockPythonProcess.sendAndGetResult(anyString())).thenAnswer(new Answer<String>() {
|
||||
@Override public String answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
return answerFromPythonMock(invocationOnMock);
|
||||
}
|
||||
});
|
||||
|
||||
// python interpreter
|
||||
pythonInterpreter = spy(new PythonInterpreter(getPythonTestProperties()));
|
||||
zeppelinPythonInterpreter = new PythonInterpreter(getPythonTestProperties());
|
||||
|
||||
// create interpreter group
|
||||
InterpreterGroup group = new InterpreterGroup();
|
||||
group.put("note", new LinkedList<Interpreter>());
|
||||
group.get("note").add(pythonInterpreter);
|
||||
pythonInterpreter.setInterpreterGroup(group);
|
||||
|
||||
when(pythonInterpreter.getPythonProcess()).thenReturn(mockPythonProcess);
|
||||
when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn("ImportError");
|
||||
}
|
||||
|
||||
@After
|
||||
public void afterTest() throws IOException {
|
||||
pythonInterpreter.close();
|
||||
zeppelinPythonInterpreter.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOpenInterpreter() {
|
||||
pythonInterpreter.open();
|
||||
assertEquals(pythonInterpreter.getPythonProcess().getPid(), 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* If Py4J is not installed, bootstrap_input.py
|
||||
* is not sent to Python process and py4j JavaGateway is not running
|
||||
*/
|
||||
@Test
|
||||
public void testPy4jIsNotInstalled() {
|
||||
pythonInterpreter.open();
|
||||
assertNull(pythonInterpreter.getPy4jPort());
|
||||
assertTrue(cmdHistory.contains("def help()"));
|
||||
assertTrue(cmdHistory.contains("class PyZeppelinContext(object):"));
|
||||
assertTrue(cmdHistory.contains("z = PyZeppelinContext"));
|
||||
assertTrue(cmdHistory.contains("def show"));
|
||||
assertFalse(cmdHistory.contains("GatewayClient"));
|
||||
}
|
||||
|
||||
/**
|
||||
* If Py4J installed, bootstrap_input.py
|
||||
* is sent to interpreter and JavaGateway is running
|
||||
*/
|
||||
@Test
|
||||
public void testPy4jInstalled() throws IOException, InterruptedException {
|
||||
when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn("");
|
||||
|
||||
pythonInterpreter.open();
|
||||
Integer py4jPort = pythonInterpreter.getPy4jPort();
|
||||
assertNotNull(py4jPort);
|
||||
|
||||
assertTrue(cmdHistory.contains("def help()"));
|
||||
assertTrue(cmdHistory.contains("class PyZeppelinContext(object):"));
|
||||
assertTrue(cmdHistory.contains("z = Py4jZeppelinContext"));
|
||||
assertTrue(cmdHistory.contains("def show"));
|
||||
assertTrue(cmdHistory.contains("GatewayClient(port=" + py4jPort + ")"));
|
||||
assertTrue(cmdHistory.contains("org.apache.zeppelin.display.Input"));
|
||||
|
||||
assertTrue(serverIsListeningOn(py4jPort));
|
||||
pythonInterpreter.close();
|
||||
TimeUnit.MILLISECONDS.sleep(100);
|
||||
assertFalse(serverIsListeningOn(py4jPort));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClose() throws IOException, InterruptedException {
|
||||
//given: py4j is installed
|
||||
when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn("");
|
||||
|
||||
pythonInterpreter.open();
|
||||
Integer py4jPort = pythonInterpreter.getPy4jPort();
|
||||
assertNotNull(py4jPort);
|
||||
|
||||
//when
|
||||
pythonInterpreter.close();
|
||||
TimeUnit.MILLISECONDS.sleep(100);
|
||||
|
||||
//then
|
||||
assertFalse(serverIsListeningOn(py4jPort));
|
||||
verify(mockPythonProcess, times(1)).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterpret() {
|
||||
pythonInterpreter.open();
|
||||
cmdHistory = "";
|
||||
InterpreterResult result = pythonInterpreter.interpret("print a", null);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals("%text print a", result.message().get(0).toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterpretInvalidSyntax() {
|
||||
zeppelinPythonInterpreter.open();
|
||||
InterpreterResult result = zeppelinPythonInterpreter.interpret("for x in range(0,3): print (\"hi\")\n\nz._displayhook()", null);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertTrue(result.message().get(0).toString().contains("hi\nhi\nhi"));
|
||||
|
||||
result = zeppelinPythonInterpreter.interpret("for x in range(0,3): print (\"hi\")\nz._displayhook()", null);
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
assertTrue(result.message().get(0).toString().contains("SyntaxError: invalid syntax"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if given port is open on 'localhost'
|
||||
* @param port
|
||||
*/
|
||||
private boolean serverIsListeningOn(Integer port) {
|
||||
Socket s = new Socket();
|
||||
boolean serverIsListening = false;
|
||||
|
||||
int retryCount = 0;
|
||||
boolean connected = false;
|
||||
while (connected = tryToConnect(s, port) && retryCount < 10) {
|
||||
serverIsListening = connected;
|
||||
tryToClose(s);
|
||||
retryCount++;
|
||||
s = new Socket();
|
||||
}
|
||||
return serverIsListening;
|
||||
}
|
||||
|
||||
private boolean tryToConnect(Socket s, Integer port) {
|
||||
boolean connected = false;
|
||||
SocketAddress sa = new InetSocketAddress("localhost", port);
|
||||
try {
|
||||
s.connect(sa, 10000);
|
||||
connected = true;
|
||||
} catch (IOException e) {
|
||||
//LOG.warn("Can't open connection to " + sa, e);
|
||||
}
|
||||
return connected;
|
||||
}
|
||||
|
||||
private void tryToClose(Socket s) {
|
||||
try {
|
||||
s.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't close connection to " + s.getInetAddress(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private String answerFromPythonMock(InvocationOnMock invocationOnMock) {
|
||||
Object[] inputs = invocationOnMock.getArguments();
|
||||
String cmdToExecute = (String) inputs[0];
|
||||
|
||||
if (cmdToExecute != null) {
|
||||
cmdHistory += cmdToExecute;
|
||||
String[] lines = cmdToExecute.split("\\n");
|
||||
String output = "";
|
||||
|
||||
for (int i = 0; i < lines.length; i++) {
|
||||
output += lines[i];
|
||||
}
|
||||
return output;
|
||||
} else {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkMultiRowErrorFails() {
|
||||
|
||||
PythonInterpreter pythonInterpreter = new PythonInterpreter(
|
||||
PythonInterpreterTest.getPythonTestProperties()
|
||||
);
|
||||
// create interpreter group
|
||||
InterpreterGroup group = new InterpreterGroup();
|
||||
group.put("note", new LinkedList<Interpreter>());
|
||||
group.get("note").add(pythonInterpreter);
|
||||
pythonInterpreter.setInterpreterGroup(group);
|
||||
|
||||
pythonInterpreter.open();
|
||||
|
||||
String codeRaiseException = "raise Exception(\"test exception\")";
|
||||
InterpreterResult ret = pythonInterpreter.interpret(codeRaiseException, null);
|
||||
|
||||
assertNotNull("Interpreter result for raise exception is Null", ret);
|
||||
|
||||
System.err.println("ret = '" + ret + "'");
|
||||
assertEquals(InterpreterResult.Code.ERROR, ret.code());
|
||||
assertTrue(ret.message().get(0).getData().length() > 0);
|
||||
|
||||
assertNotNull("Interpreter result for text is Null", ret);
|
||||
String codePrintText = "print (\"Exception(\\\"test exception\\\")\")";
|
||||
ret = pythonInterpreter.interpret(codePrintText, null);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
|
||||
assertTrue(ret.message().get(0).getData().length() > 0);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,125 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* Python interpreter unit test that user real Python
|
||||
*
|
||||
* Important: ALL tests here REQUIRE Python to be installed
|
||||
* They are excluded from default build, to run them manually do:
|
||||
*
|
||||
* <code>
|
||||
* mvn "-Dtest=org.apache.zeppelin.python.PythonInterpreterWithPythonInstalledTest" test -pl python
|
||||
* </code>
|
||||
*
|
||||
* or
|
||||
* <code>
|
||||
* mvn -Dpython.test.exclude='' test -pl python -am
|
||||
* </code>
|
||||
*/
|
||||
public class PythonInterpreterWithPythonInstalledTest {
|
||||
|
||||
@Test
|
||||
public void badPythonSyntaxFails() {
|
||||
//given
|
||||
PythonInterpreter realPython = new PythonInterpreter(
|
||||
PythonInterpreterTest.getPythonTestProperties());
|
||||
// create interpreter group
|
||||
InterpreterGroup group = new InterpreterGroup();
|
||||
group.put("note", Arrays.asList((Interpreter) realPython));
|
||||
realPython.setInterpreterGroup(group);
|
||||
|
||||
realPython.open();
|
||||
|
||||
//when
|
||||
InterpreterResult ret = realPython.interpret("select wrong syntax", null);
|
||||
|
||||
//then
|
||||
assertNotNull("Interpreter returned 'null'", ret);
|
||||
//System.out.println("\nInterpreter response: \n" + ret.message());
|
||||
assertEquals(InterpreterResult.Code.ERROR, ret.code());
|
||||
assertTrue(ret.message().get(0).getData().length() > 0);
|
||||
|
||||
realPython.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void goodPythonSyntaxRuns() {
|
||||
//given
|
||||
PythonInterpreter realPython = new PythonInterpreter(
|
||||
PythonInterpreterTest.getPythonTestProperties());
|
||||
InterpreterGroup group = new InterpreterGroup();
|
||||
group.put("note", Arrays.asList((Interpreter) realPython));
|
||||
realPython.setInterpreterGroup(group);
|
||||
realPython.open();
|
||||
|
||||
//when
|
||||
InterpreterResult ret = realPython.interpret("help()", null);
|
||||
|
||||
//then
|
||||
assertNotNull("Interpreter returned 'null'", ret);
|
||||
//System.out.println("\nInterpreter response: \n" + ret.message());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
|
||||
assertTrue(ret.message().get(0).getData().length() > 0);
|
||||
|
||||
realPython.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testZeppelin1555() {
|
||||
//given
|
||||
PythonInterpreter realPython = new PythonInterpreter(
|
||||
PythonInterpreterTest.getPythonTestProperties());
|
||||
InterpreterGroup group = new InterpreterGroup();
|
||||
group.put("note", Arrays.asList((Interpreter) realPython));
|
||||
realPython.setInterpreterGroup(group);
|
||||
realPython.open();
|
||||
|
||||
//when
|
||||
InterpreterResult ret1 = realPython.interpret("print(\"...\")", null);
|
||||
|
||||
//then
|
||||
//System.out.println("\nInterpreter response: \n" + ret.message());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, ret1.code());
|
||||
assertEquals("...\n", ret1.message().get(0).getData());
|
||||
|
||||
|
||||
InterpreterResult ret2 = realPython.interpret("for i in range(5):", null);
|
||||
//then
|
||||
//System.out.println("\nInterpreterResultterpreter response: \n" + ret2.message());
|
||||
assertEquals(InterpreterResult.Code.ERROR, ret2.code());
|
||||
assertEquals(" File \"<stdin>\", line 2\n" +
|
||||
" \n" +
|
||||
" ^\n" +
|
||||
"IndentationError: expected an indented block\n", ret2.message().get(0).getData());
|
||||
|
||||
realPython.close();
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in a new issue