[ZEPPELIN-502] Add tests and reformating code to help tests writing

This commit is contained in:
Hervé RIVIERE 2016-05-05 20:21:04 +02:00
parent fecaf256ef
commit 7d533e128a
4 changed files with 420 additions and 96 deletions

View file

@ -73,8 +73,18 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>

View file

@ -18,17 +18,22 @@
package org.apache.zeppelin.python;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;
import java.io.*;
import java.lang.reflect.Field;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.util.Collection;
import java.util.List;
@ -44,33 +49,27 @@ public class PythonInterpreter extends Interpreter {
public static final String BOOTSTRAP_INPUT_PY = "/bootstrap_input.py";
public static final String PYTHON_PATH = "python";
public static final String DEFAULT_PYTHON_PATH = "python";
private String pythonPath;
private Integer port;
private GatewayServer gatewayServer;
InputStream stdout;
OutputStream stdin;
BufferedWriter writer;
BufferedReader reader;
Process process = null;
PythonProcess process = null;
private long pythonPid;
private Boolean py4J = false;
private InterpreterContext context;
static {
Interpreter.register(
"python",
"python",
PythonInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(PYTHON_PATH, DEFAULT_PYTHON_PATH,
"python",
"python",
PythonInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(PYTHON_PATH, DEFAULT_PYTHON_PATH,
"Python directory. Default : python (assume python is in your $PATH)")
.build()
.build()
);
}
public PythonInterpreter(Properties property) {
super(property);
}
@ -80,28 +79,22 @@ public class PythonInterpreter extends Interpreter {
logger.info("Starting Python interpreter .....");
pythonPath = getProperty(PYTHON_PATH);
logger.info("Python path is set to:" + pythonPath );
logger.info("Python path is set to:" + property.getProperty(PYTHON_PATH));
ProcessBuilder builder = new ProcessBuilder(pythonPath, "-iu");
builder.redirectErrorStream(true);
process = getPythonProcess();
try {
process = builder.start();
process.open();
} catch (IOException e) {
logger.error("Can't start python process", e);
logger.error("Can't start the python process", e);
}
pythonPid = getPidOfProcess(process);
logger.info("python PID : " + pythonPid);
stdout = process.getInputStream ();
stdin = process.getOutputStream();
writer = new BufferedWriter(new OutputStreamWriter(stdin));
reader = new BufferedReader(new InputStreamReader(stdout));
try {
logger.info("python PID : " + process.getPid());
} catch (Exception e) {
logger.warn("Can't find python pid process", e);
}
try {
logger.info("Bootstrap interpreter with " + BOOTSTRAP_PY);
@ -110,7 +103,7 @@ public class PythonInterpreter extends Interpreter {
logger.error("Can't execute " + BOOTSTRAP_PY + " to initiate python process", e);
}
if (py4J = isPy4jInstalled()){
if (py4J = isPy4jInstalled()) {
port = findRandomOpenPortOnAllLocalInterfaces();
logger.info("Py4j gateway port : " + port);
try {
@ -120,7 +113,7 @@ public class PythonInterpreter extends Interpreter {
bootStrapInterpreter(BOOTSTRAP_INPUT_PY);
} catch (IOException e) {
logger.error("Can't execute " + BOOTSTRAP_INPUT_PY + " to " +
"initialize Zeppelin inputs in python process", e);
"initialize Zeppelin inputs in python process", e);
}
}
@ -132,11 +125,8 @@ public class PythonInterpreter extends Interpreter {
logger.info("closing Python interpreter .....");
try {
process.destroy();
reader.close();
writer.close();
stdin.close();
stdout.close();
process.close();
gatewayServer.shutdown();
} catch (IOException e) {
logger.error("Can't close the interpreter", e);
}
@ -157,19 +147,13 @@ public class PythonInterpreter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
if (pythonPid > -1) {
try {
logger.info("Sending SIGINT signal to PID : " + pythonPid);
Runtime.getRuntime().exec("kill -SIGINT " + pythonPid);
} catch (IOException e) {
logger.error("Can't send SiGINT to PID : " + pythonPid, e);
}
}
else {
logger.warn("Non UNIX/Linux system, close the interpreter");
close();
try {
process.interrupt();
} catch (IOException e) {
logger.error("Can't interrupt the python interpreter", e);
}
}
@Override
public FormType getFormType() {
return FormType.NATIVE;
@ -191,6 +175,13 @@ public class PythonInterpreter extends Interpreter {
return null;
}
public PythonProcess getPythonProcess() {
if (process == null)
return new PythonProcess(getProperty(PYTHON_PATH));
else
return process;
}
private Job getRunningJob(String paragraphId) {
Job foundJob = null;
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
@ -204,33 +195,15 @@ public class PythonInterpreter extends Interpreter {
private String sendCommandToPython(String cmd) {
try {
logger.info("Sending : \n " + cmd);
writer.write(cmd + "\n\n");
writer.write("print (\"*!?flush reader!?*\")\n\n");
writer.flush();
} catch (IOException e) {
logger.error("Error when sending commands to python process stdin", e);
}
String output = "";
String line;
logger.info("Sending : \n " + cmd);
try {
while (!(line = reader.readLine ()).contains("*!?flush reader!?*")){
logger.info("Readed line from python shell : " + line);
if (line.equals("...")) {
logger.info("Syntax error ! ");
output += "Syntax error ! ";
break;
}
output += "\r" + line + "\n";
}
output = process.sendAndGetResult(cmd);
} catch (IOException e) {
logger.error("Error when reading from python process stdout", e);
logger.error("Error when sending commands to python process", e);
}
return output;
}
@ -238,47 +211,34 @@ public class PythonInterpreter extends Interpreter {
private void bootStrapInterpreter(String file) throws IOException {
BufferedReader bootstrapReader = new BufferedReader(
new InputStreamReader(
PythonInterpreter.class.getResourceAsStream(file)));
new InputStreamReader(
PythonInterpreter.class.getResourceAsStream(file)));
String line = null;
String bootstrapCode = "";
while ((line = bootstrapReader.readLine()) != null)
{
while ((line = bootstrapReader.readLine()) != null) {
bootstrapCode += line + "\n";
}
if (py4J && port != null && port != -1)
bootstrapCode = bootstrapCode.replaceAll("\\%PORT\\%", port.toString());
logger.info("Bootstrap python interpreter with \n " + bootstrapCode);
writer.write(bootstrapCode);
writer.flush();
sendCommandToPython(bootstrapCode);
}
private long getPidOfProcess(Process p) {
long pid = -1;
try {
if (p.getClass().getName().equals("java.lang.UNIXProcess")) {
Field f = p.getClass().getDeclaredField("pid");
f.setAccessible(true);
pid = f.getLong(p);
f.setAccessible(false);
}
}
catch (Exception e) {
logger.warn("Can't find python pid process", e);
pid = -1;
}
return pid;
}
public GUI getGui(){
public GUI getGui() {
return context.getGui();
}
private Boolean isPy4jInstalled(){
public Integer getPy4JPort() {
return port;
}
public Boolean isPy4jInstalled() {
String output = sendCommandToPython("\n\nimport py4j\n");
if (output.contains("ImportError"))

View file

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
/**
* Object encapsulated interactive
* Python process (REPL) used by python interpreter
*/
public class PythonProcess {
Logger logger = LoggerFactory.getLogger(PythonProcess.class);
InputStream stdout;
OutputStream stdin;
BufferedWriter writer;
BufferedReader reader;
Process process;
private String binPath;
private long pid;
public PythonProcess(String binPath) {
this.binPath = binPath;
}
public void open() throws IOException {
ProcessBuilder builder = new ProcessBuilder(binPath, "-iu");
builder.redirectErrorStream(true);
process = builder.start();
stdout = process.getInputStream();
stdin = process.getOutputStream();
writer = new BufferedWriter(new OutputStreamWriter(stdin));
reader = new BufferedReader(new InputStreamReader(stdout));
try {
pid = findPid();
} catch (Exception e) {
logger.warn("Can't find python pid process", e);
pid = -1;
}
}
public void close() throws IOException {
process.destroy();
reader.close();
writer.close();
stdin.close();
stdout.close();
}
public void interrupt() throws IOException {
if (pid > -1) {
logger.info("Sending SIGINT signal to PID : " + pid);
Runtime.getRuntime().exec("kill -SIGINT " + pid);
} else {
logger.warn("Non UNIX/Linux system, close the interpreter");
close();
}
}
public String sendAndGetResult(String cmd) throws IOException {
writer.write(cmd + "\n\n");
writer.write("print (\"*!?flush reader!?*\")\n\n");
writer.flush();
String output = "";
String line;
while (!(line = reader.readLine()).contains("*!?flush reader!?*")) {
logger.debug("Readed line from python shell : " + line);
if (line.equals("...")) {
logger.warn("Syntax error ! ");
output += "Syntax error ! ";
break;
}
output += "\r" + line + "\n";
}
return output;
}
private long findPid() throws NoSuchFieldException, IllegalAccessException {
long pid = -1;
if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
Field f = process.getClass().getDeclaredField("pid");
f.setAccessible(true);
pid = f.getLong(process);
f.setAccessible(false);
}
return pid;
}
public long getPid() {
return pid;
}
}

View file

@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.anyString;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Properties;
public class PythonInterpreterTest {
Logger logger = LoggerFactory.getLogger(PythonProcess.class);
public static final String PYTHON_PATH = "python";
public static final String DEFAULT_PYTHON_PATH = "python";
PythonInterpreter pythonInterpreter = null;
PythonProcess mockPythonProcess;
String cmdHistory;
@Before
public void beforeTest() {
cmdHistory = "";
/*Mock python process*/
mockPythonProcess = mock(PythonProcess.class);
when(mockPythonProcess.getPid()).thenReturn((long) 1);
try {
when(mockPythonProcess.sendAndGetResult(anyString())).thenAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocationOnMock) throws Throwable {
Object[] args = invocationOnMock.getArguments();
String cmd = (String) args[0];
if (cmd != null) {
cmdHistory += cmd;
String[] lines = cmd.split("\\n");
String output = "";
for (int i = 0; i < lines.length; i++) {
output += ">>>" + lines[i];
}
return output;
} else return ">>>";
}
});
} catch (IOException e) {
logger.error("Can't initiate python process", e);
}
Properties properties = new Properties();
properties.put(PYTHON_PATH, DEFAULT_PYTHON_PATH);
pythonInterpreter = spy(new PythonInterpreter(properties));
when(pythonInterpreter.getPythonProcess()).thenReturn(mockPythonProcess);
try {
when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn("ImportError");
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void testOpenInterpreter() {
pythonInterpreter.open();
assertEquals(pythonInterpreter.getPythonProcess().getPid(), 1);
}
@Test
public void testPy4jIsNotInstalled() {
/*
If Py4J is not installed, bootstrap_input.py
is not sent to Python process and
py4j JavaGateway is not running
*/
pythonInterpreter.open();
assertNull(pythonInterpreter.getPy4JPort());
assertTrue(cmdHistory.contains("def help()"));
assertTrue(cmdHistory.contains("class PyZeppelinContext():"));
assertTrue(cmdHistory.contains("z = PyZeppelinContext"));
assertTrue(cmdHistory.contains("def zeppelin_show"));
assertFalse(cmdHistory.contains("GatewayClient"));
}
@Test
public void testPy4JInstalled() {
/*
If Py4J installed, bootstrap_input.py
is sent to interpreter and JavaGateway is
running
*/
try {
when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn(">>>");
} catch (IOException e) {
e.printStackTrace();
}
pythonInterpreter.open();
Integer py4jPort = pythonInterpreter.getPy4JPort();
assertNotNull(py4jPort);
assertTrue(cmdHistory.contains("def help()"));
assertTrue(cmdHistory.contains("class PyZeppelinContext():"));
assertTrue(cmdHistory.contains("z = PyZeppelinContext"));
assertTrue(cmdHistory.contains("def zeppelin_show"));
assertTrue(cmdHistory.contains("GatewayClient(port=" + py4jPort + ")"));
assertTrue(cmdHistory.contains("org.apache.zeppelin.display.Input"));
assertTrue(checkSocketAdress(py4jPort));
}
@Test
public void testClose() {
try {
when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn(">>>");
} catch (IOException e) {
e.printStackTrace();
}
pythonInterpreter.open();
Integer py4jPort = pythonInterpreter.getPy4JPort();
assertNotNull(py4jPort);
pythonInterpreter.close();
assertFalse(checkSocketAdress(py4jPort));
try {
verify(mockPythonProcess, times(1)).close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void testInterpret() {
pythonInterpreter.open();
cmdHistory = "";
InterpreterResult result = pythonInterpreter.interpret("print a", null);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("%text print a", result.toString());
}
private boolean checkSocketAdress(Integer py4jPort) {
Socket s = new Socket();
SocketAddress sa = new InetSocketAddress("localhost", py4jPort);
Boolean working = null;
try {
s.connect(sa, 10000);
} catch (IOException e) {
working = false;
}
if (working == null) {
working = s.isConnected();
try {
s.close();
} catch (IOException e) {
logger.error("Can't close connection to localhost:" + py4jPort, e);
}
}
return working;
}
}