[ZEPPELIN-502] Input form, change from simple input form to native (pyspark syntax)

This commit is contained in:
Hervé RIVIERE 2016-05-01 18:56:29 +02:00
parent 60d2956d2e
commit 02d1320bd3
5 changed files with 193 additions and 39 deletions

View file

@ -42,7 +42,14 @@ You can leverage [Zeppelin Dynamic Form]({{BASE_PATH}}/manual/dynamicform.html)
Example :
```bash
%python
print "${input_form(name)=defaultValue}"
# Input fom
print (z.input("f1","defaultValue"))
# Select fom
print (z.select("f1",[("o1","1"),("o2","2")],"2"))
#Checkbox form
print("".join(z.checkbox("f3", [("o1","1"), ("o2","2")],["1"])))
```
## Matplotlib integration

View file

@ -33,6 +33,10 @@
<name>Zeppelin: Python interpreter</name>
<url>http://zeppelin.incubator.apache.org</url>
<properties>
<py4j.version>0.9.2</py4j.version>
</properties>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
@ -46,7 +50,14 @@
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>${py4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View file

@ -17,19 +17,19 @@
package org.apache.zeppelin.python;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;
import java.io.*;
import java.lang.reflect.Field;
import java.net.ServerSocket;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
@ -40,16 +40,22 @@ import java.util.Properties;
public class PythonInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(PythonInterpreter.class);
public static final String BOOTSTRAP_PY = "/bootstrap.py";
public static final String BOOTSTRAP_INPUT_PY = "/bootstrap_input.py";
public static final String PYTHON_PATH = "python.path";
public static final String DEFAULT_PYTHON_PATH = "/usr/bin/python";
private String pythonPath;
private Integer port;
private GatewayServer gatewayServer;
InputStream stdout;
OutputStream stdin;
BufferedWriter writer;
BufferedReader reader;
Process process = null;
private long pythonPid;
private Boolean py4J = false;
private InterpreterContext context;
static {
Interpreter.register(
@ -64,6 +70,7 @@ public class PythonInterpreter extends Interpreter {
}
public PythonInterpreter(Properties property) {
super(property);
}
@ -95,11 +102,29 @@ public class PythonInterpreter extends Interpreter {
writer = new BufferedWriter(new OutputStreamWriter(stdin));
reader = new BufferedReader(new InputStreamReader(stdout));
try {
bootStrapInterpreter();
logger.info("Bootstrap interpreter with " + BOOTSTRAP_PY);
bootStrapInterpreter(BOOTSTRAP_PY);
} catch (IOException e) {
logger.error("Can't execute bootstrap.py to initiate python process", e);
logger.error("Can't execute " + BOOTSTRAP_PY + " to initiate python process", e);
}
if (py4J = isPy4jInstalled()){
port = findRandomOpenPortOnAllLocalInterfaces();
logger.info("Py4j gateway port : " + port);
try {
gatewayServer = new GatewayServer(this, port);
gatewayServer.start();
logger.info("Bootstrap inputs with " + BOOTSTRAP_INPUT_PY);
bootStrapInterpreter(BOOTSTRAP_INPUT_PY);
} catch (IOException e) {
logger.error("Can't execute " + BOOTSTRAP_INPUT_PY + " to " +
"initialize Zeppelin inputs in python process", e);
}
}
}
@Override
@ -121,36 +146,15 @@ public class PythonInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
try {
logger.info("Sending : \n " + cmd);
writer.write(cmd + "\n\n");
writer.write("print (\"*!?flush reader!?*\")\n\n");
writer.flush();
} catch (IOException e) {
logger.error("Error when sending commands to python process stdin", e);
}
String output = "";
String line;
this.context = contextInterpreter;
try {
while (!(line = reader.readLine ()).contains("*!?flush reader!?*")){
logger.info("Readed line from python shell : " + line);
if (line.equals("...")) {
logger.info("Syntax error ! ");
output += "Syntax error ! ";
break;
}
output += "\r" + line + "\n";
}
} catch (IOException e) {
logger.error("Error when sending commands to python process stdout", e);
}
String output = sendCommandToPython(cmd);
return new InterpreterResult(Code.SUCCESS, output.replaceAll(">>>", "")
.replaceAll("\\.\\.\\.", "").trim());
}
@Override
public void cancel(InterpreterContext context) {
if (pythonPid > -1) {
@ -158,7 +162,7 @@ public class PythonInterpreter extends Interpreter {
logger.info("Sending SIGINT signal to PID : " + pythonPid);
Runtime.getRuntime().exec("kill -SIGINT " + pythonPid);
} catch (IOException e) {
e.printStackTrace();
logger.error("Can't send SiGINT to PID : " + pythonPid, e);
}
}
else {
@ -168,7 +172,7 @@ public class PythonInterpreter extends Interpreter {
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
return FormType.NATIVE;
}
@Override
@ -199,11 +203,43 @@ public class PythonInterpreter extends Interpreter {
}
private void bootStrapInterpreter() throws IOException {
private String sendCommandToPython(String cmd) {
try {
logger.info("Sending : \n " + cmd);
writer.write(cmd + "\n\n");
writer.write("print (\"*!?flush reader!?*\")\n\n");
writer.flush();
} catch (IOException e) {
logger.error("Error when sending commands to python process stdin", e);
}
String output = "";
String line;
try {
while (!(line = reader.readLine ()).contains("*!?flush reader!?*")){
logger.info("Readed line from python shell : " + line);
if (line.equals("...")) {
logger.info("Syntax error ! ");
output += "Syntax error ! ";
break;
}
output += "\r" + line + "\n";
}
} catch (IOException e) {
logger.error("Error when reading from python process stdout", e);
}
return output;
}
private void bootStrapInterpreter(String file) throws IOException {
BufferedReader bootstrapReader = new BufferedReader(
new InputStreamReader(
PythonInterpreter.class.getResourceAsStream("/bootstrap.py")));
PythonInterpreter.class.getResourceAsStream(file)));
String line = null;
String bootstrapCode = "";
while ((line = bootstrapReader.readLine()) != null)
@ -211,6 +247,8 @@ public class PythonInterpreter extends Interpreter {
bootstrapCode += line + "\n";
}
if (py4J && port != null && port != -1)
bootstrapCode = bootstrapCode.replaceAll("\\%PORT\\%", port.toString());
logger.info("Bootstrap python interpreter with \n " + bootstrapCode);
writer.write(bootstrapCode);
writer.flush();
@ -234,4 +272,30 @@ public class PythonInterpreter extends Interpreter {
return pid;
}
public GUI getGui(){
return context.getGui();
}
private Boolean isPy4jInstalled(){
String output = sendCommandToPython("\n\nimport py4j\n");
if (output.contains("ImportError"))
return false;
else return true;
}
private int findRandomOpenPortOnAllLocalInterfaces() {
Integer port = -1;
try (ServerSocket socket = new ServerSocket(0);) {
port = socket.getLocalPort();
socket.close();
} catch (IOException e) {
logger.error("Can't find an open port", e);
}
return port;
}
}

View file

@ -46,12 +46,15 @@ def help():
print ('<p>The interpreter can use all modules already installed ')
print ('(with pip, easy_install, etc)</p>')
print ('<h3>Forms</h3>')
print ('You must install py4j in order to use '
'the form feature (pip install py4j)')
print ('<h4>Input form</h4>')
print ('<pre> print "&#36{input_form(name)=defaultValue}"</pre>')
print ('<pre>print (z.input("f1","defaultValue"))</pre>')
print ('<h4>Selection form</h4>')
print ('<pre> print "&#36{select_form(Selection Form)=o1,o1|o2}"</pre>')
print ('<pre>print(z.select("f2", [("o1","1"), ("o2","2")],2))</pre>')
print ('<h4>Checkbox form</h4>')
print ('<pre> print "&#36{checkbox:checkbox_form=o1,o1|o3}"</pre>')
print ('<pre> print("".join(z.checkbox("f3", [("o1","1"), '
'("o2","2")],["1"])))</pre>')
print ('<h3>Matplotlib graph</h3>')
print ('<div>The interpreter can display matplotlib graph with ')
print ('the function zeppelin_show()</div>')
@ -85,3 +88,21 @@ def zeppelin_show(p, width="0", height="0"):
style += 'height:'+height
print("%html <div style='" + style + "'>" + img.read() + "<div>")
# If py4j is detected, these class will be override
# with the implementation in bootstrap_input.py
class PyZeppelinContext():
errorMsg = "You must install py4j Python module " \
"(pip install py4j) to use Zeppelin dynamic forms features"
def __init__(self, zc):
self.z = zc
def input(self, name, defaultValue=""):
print (self.errorMsg)
def select(self, name, options, defaultValue=""):
print (self.errorMsg)
def checkbox(self, name, options, defaultChecked=[]):
print (self.errorMsg)
z = PyZeppelinContext("")

View file

@ -0,0 +1,51 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from py4j.java_gateway import JavaGateway
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
client = GatewayClient(port=%PORT%)
gateway = JavaGateway(client)
java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
class PyZeppelinContext():
paramOption = gateway.jvm.org.apache.zeppelin.display.Input.ParamOption
javaList = gateway.jvm.java.util.ArrayList
def __init__(self, zc):
self.z = zc
def input(self, name, defaultValue=""):
return self.z.getGui().input(name, defaultValue)
def select(self, name, options, defaultValue=""):
javaOptions = gateway.new_array(self.paramOption, len(options))
i = 0
for tuple in options:
javaOptions[i] = self.paramOption(tuple[0], tuple[1])
i += 1
return self.z.getGui().select(name, defaultValue, javaOptions)
def checkbox(self, name, options, defaultChecked=[]):
javaOptions = gateway.new_array(self.paramOption, len(options))
i = 0
for tuple in options:
javaOptions[i] = self.paramOption(tuple[0], tuple[1])
i += 1
javaDefaultCheck = self.javaList()
for check in defaultChecked:
javaDefaultCheck.append(check)
return self.z.getGui().checkbox(name, javaDefaultCheck, javaOptions)
z = PyZeppelinContext(gateway.entry_point)