mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
[ZEPPELIN-502] Input form, change from simple input form to native (pyspark syntax)
This commit is contained in:
parent
60d2956d2e
commit
02d1320bd3
5 changed files with 193 additions and 39 deletions
|
|
@ -42,7 +42,14 @@ You can leverage [Zeppelin Dynamic Form]({{BASE_PATH}}/manual/dynamicform.html)
|
|||
Example :
|
||||
```bash
|
||||
%python
|
||||
print "${input_form(name)=defaultValue}"
|
||||
# Input fom
|
||||
print (z.input("f1","defaultValue"))
|
||||
|
||||
# Select fom
|
||||
print (z.select("f1",[("o1","1"),("o2","2")],"2"))
|
||||
|
||||
#Checkbox form
|
||||
print("".join(z.checkbox("f3", [("o1","1"), ("o2","2")],["1"])))
|
||||
```
|
||||
|
||||
## Matplotlib integration
|
||||
|
|
|
|||
|
|
@ -33,6 +33,10 @@
|
|||
<name>Zeppelin: Python interpreter</name>
|
||||
<url>http://zeppelin.incubator.apache.org</url>
|
||||
|
||||
<properties>
|
||||
<py4j.version>0.9.2</py4j.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
|
|
@ -46,7 +50,14 @@
|
|||
<artifactId>commons-exec</artifactId>
|
||||
<version>1.3</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>net.sf.py4j</groupId>
|
||||
<artifactId>py4j</artifactId>
|
||||
<version>${py4j.version}</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
|
|
|
|||
|
|
@ -17,19 +17,19 @@
|
|||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import py4j.GatewayServer;
|
||||
|
||||
import java.io.*;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.ServerSocket;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
|
@ -40,16 +40,22 @@ import java.util.Properties;
|
|||
public class PythonInterpreter extends Interpreter {
|
||||
Logger logger = LoggerFactory.getLogger(PythonInterpreter.class);
|
||||
|
||||
public static final String BOOTSTRAP_PY = "/bootstrap.py";
|
||||
public static final String BOOTSTRAP_INPUT_PY = "/bootstrap_input.py";
|
||||
public static final String PYTHON_PATH = "python.path";
|
||||
public static final String DEFAULT_PYTHON_PATH = "/usr/bin/python";
|
||||
private String pythonPath;
|
||||
|
||||
private Integer port;
|
||||
private GatewayServer gatewayServer;
|
||||
InputStream stdout;
|
||||
OutputStream stdin;
|
||||
BufferedWriter writer;
|
||||
BufferedReader reader;
|
||||
Process process = null;
|
||||
private long pythonPid;
|
||||
private Boolean py4J = false;
|
||||
private InterpreterContext context;
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
|
|
@ -64,6 +70,7 @@ public class PythonInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
|
||||
|
||||
public PythonInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
|
@ -95,11 +102,29 @@ public class PythonInterpreter extends Interpreter {
|
|||
writer = new BufferedWriter(new OutputStreamWriter(stdin));
|
||||
reader = new BufferedReader(new InputStreamReader(stdout));
|
||||
|
||||
|
||||
try {
|
||||
bootStrapInterpreter();
|
||||
logger.info("Bootstrap interpreter with " + BOOTSTRAP_PY);
|
||||
bootStrapInterpreter(BOOTSTRAP_PY);
|
||||
} catch (IOException e) {
|
||||
logger.error("Can't execute bootstrap.py to initiate python process", e);
|
||||
logger.error("Can't execute " + BOOTSTRAP_PY + " to initiate python process", e);
|
||||
}
|
||||
|
||||
if (py4J = isPy4jInstalled()){
|
||||
port = findRandomOpenPortOnAllLocalInterfaces();
|
||||
logger.info("Py4j gateway port : " + port);
|
||||
try {
|
||||
gatewayServer = new GatewayServer(this, port);
|
||||
gatewayServer.start();
|
||||
logger.info("Bootstrap inputs with " + BOOTSTRAP_INPUT_PY);
|
||||
bootStrapInterpreter(BOOTSTRAP_INPUT_PY);
|
||||
} catch (IOException e) {
|
||||
logger.error("Can't execute " + BOOTSTRAP_INPUT_PY + " to " +
|
||||
"initialize Zeppelin inputs in python process", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -121,36 +146,15 @@ public class PythonInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
try {
|
||||
logger.info("Sending : \n " + cmd);
|
||||
writer.write(cmd + "\n\n");
|
||||
writer.write("print (\"*!?flush reader!?*\")\n\n");
|
||||
writer.flush();
|
||||
} catch (IOException e) {
|
||||
logger.error("Error when sending commands to python process stdin", e);
|
||||
}
|
||||
|
||||
String output = "";
|
||||
String line;
|
||||
this.context = contextInterpreter;
|
||||
|
||||
try {
|
||||
while (!(line = reader.readLine ()).contains("*!?flush reader!?*")){
|
||||
logger.info("Readed line from python shell : " + line);
|
||||
if (line.equals("...")) {
|
||||
logger.info("Syntax error ! ");
|
||||
output += "Syntax error ! ";
|
||||
break;
|
||||
}
|
||||
output += "\r" + line + "\n";
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
logger.error("Error when sending commands to python process stdout", e);
|
||||
}
|
||||
String output = sendCommandToPython(cmd);
|
||||
return new InterpreterResult(Code.SUCCESS, output.replaceAll(">>>", "")
|
||||
.replaceAll("\\.\\.\\.", "").trim());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
if (pythonPid > -1) {
|
||||
|
|
@ -158,7 +162,7 @@ public class PythonInterpreter extends Interpreter {
|
|||
logger.info("Sending SIGINT signal to PID : " + pythonPid);
|
||||
Runtime.getRuntime().exec("kill -SIGINT " + pythonPid);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
logger.error("Can't send SiGINT to PID : " + pythonPid, e);
|
||||
}
|
||||
}
|
||||
else {
|
||||
|
|
@ -168,7 +172,7 @@ public class PythonInterpreter extends Interpreter {
|
|||
}
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
return FormType.NATIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -199,11 +203,43 @@ public class PythonInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
|
||||
private void bootStrapInterpreter() throws IOException {
|
||||
private String sendCommandToPython(String cmd) {
|
||||
try {
|
||||
logger.info("Sending : \n " + cmd);
|
||||
writer.write(cmd + "\n\n");
|
||||
writer.write("print (\"*!?flush reader!?*\")\n\n");
|
||||
writer.flush();
|
||||
} catch (IOException e) {
|
||||
logger.error("Error when sending commands to python process stdin", e);
|
||||
}
|
||||
|
||||
String output = "";
|
||||
String line;
|
||||
|
||||
try {
|
||||
while (!(line = reader.readLine ()).contains("*!?flush reader!?*")){
|
||||
logger.info("Readed line from python shell : " + line);
|
||||
if (line.equals("...")) {
|
||||
logger.info("Syntax error ! ");
|
||||
output += "Syntax error ! ";
|
||||
break;
|
||||
}
|
||||
|
||||
output += "\r" + line + "\n";
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
logger.error("Error when reading from python process stdout", e);
|
||||
}
|
||||
return output;
|
||||
}
|
||||
|
||||
|
||||
private void bootStrapInterpreter(String file) throws IOException {
|
||||
|
||||
BufferedReader bootstrapReader = new BufferedReader(
|
||||
new InputStreamReader(
|
||||
PythonInterpreter.class.getResourceAsStream("/bootstrap.py")));
|
||||
PythonInterpreter.class.getResourceAsStream(file)));
|
||||
String line = null;
|
||||
String bootstrapCode = "";
|
||||
while ((line = bootstrapReader.readLine()) != null)
|
||||
|
|
@ -211,6 +247,8 @@ public class PythonInterpreter extends Interpreter {
|
|||
bootstrapCode += line + "\n";
|
||||
}
|
||||
|
||||
if (py4J && port != null && port != -1)
|
||||
bootstrapCode = bootstrapCode.replaceAll("\\%PORT\\%", port.toString());
|
||||
logger.info("Bootstrap python interpreter with \n " + bootstrapCode);
|
||||
writer.write(bootstrapCode);
|
||||
writer.flush();
|
||||
|
|
@ -234,4 +272,30 @@ public class PythonInterpreter extends Interpreter {
|
|||
return pid;
|
||||
}
|
||||
|
||||
public GUI getGui(){
|
||||
|
||||
return context.getGui();
|
||||
|
||||
}
|
||||
|
||||
private Boolean isPy4jInstalled(){
|
||||
|
||||
String output = sendCommandToPython("\n\nimport py4j\n");
|
||||
if (output.contains("ImportError"))
|
||||
return false;
|
||||
else return true;
|
||||
|
||||
}
|
||||
|
||||
private int findRandomOpenPortOnAllLocalInterfaces() {
|
||||
Integer port = -1;
|
||||
try (ServerSocket socket = new ServerSocket(0);) {
|
||||
port = socket.getLocalPort();
|
||||
socket.close();
|
||||
} catch (IOException e) {
|
||||
logger.error("Can't find an open port", e);
|
||||
}
|
||||
return port;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,12 +46,15 @@ def help():
|
|||
print ('<p>The interpreter can use all modules already installed ')
|
||||
print ('(with pip, easy_install, etc)</p>')
|
||||
print ('<h3>Forms</h3>')
|
||||
print ('You must install py4j in order to use '
|
||||
'the form feature (pip install py4j)')
|
||||
print ('<h4>Input form</h4>')
|
||||
print ('<pre> print "${input_form(name)=defaultValue}"</pre>')
|
||||
print ('<pre>print (z.input("f1","defaultValue"))</pre>')
|
||||
print ('<h4>Selection form</h4>')
|
||||
print ('<pre> print "${select_form(Selection Form)=o1,o1|o2}"</pre>')
|
||||
print ('<pre>print(z.select("f2", [("o1","1"), ("o2","2")],2))</pre>')
|
||||
print ('<h4>Checkbox form</h4>')
|
||||
print ('<pre> print "${checkbox:checkbox_form=o1,o1|o3}"</pre>')
|
||||
print ('<pre> print("".join(z.checkbox("f3", [("o1","1"), '
|
||||
'("o2","2")],["1"])))</pre>')
|
||||
print ('<h3>Matplotlib graph</h3>')
|
||||
print ('<div>The interpreter can display matplotlib graph with ')
|
||||
print ('the function zeppelin_show()</div>')
|
||||
|
|
@ -85,3 +88,21 @@ def zeppelin_show(p, width="0", height="0"):
|
|||
style += 'height:'+height
|
||||
print("%html <div style='" + style + "'>" + img.read() + "<div>")
|
||||
|
||||
|
||||
# If py4j is detected, these class will be override
|
||||
# with the implementation in bootstrap_input.py
|
||||
|
||||
class PyZeppelinContext():
|
||||
errorMsg = "You must install py4j Python module " \
|
||||
"(pip install py4j) to use Zeppelin dynamic forms features"
|
||||
def __init__(self, zc):
|
||||
self.z = zc
|
||||
def input(self, name, defaultValue=""):
|
||||
print (self.errorMsg)
|
||||
def select(self, name, options, defaultValue=""):
|
||||
print (self.errorMsg)
|
||||
def checkbox(self, name, options, defaultChecked=[]):
|
||||
print (self.errorMsg)
|
||||
|
||||
z = PyZeppelinContext("")
|
||||
|
||||
|
|
|
|||
51
python/src/main/resources/bootstrap_input.py
Normal file
51
python/src/main/resources/bootstrap_input.py
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from py4j.java_gateway import JavaGateway
|
||||
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
|
||||
|
||||
client = GatewayClient(port=%PORT%)
|
||||
gateway = JavaGateway(client)
|
||||
java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
|
||||
|
||||
|
||||
class PyZeppelinContext():
|
||||
paramOption = gateway.jvm.org.apache.zeppelin.display.Input.ParamOption
|
||||
javaList = gateway.jvm.java.util.ArrayList
|
||||
def __init__(self, zc):
|
||||
self.z = zc
|
||||
def input(self, name, defaultValue=""):
|
||||
return self.z.getGui().input(name, defaultValue)
|
||||
def select(self, name, options, defaultValue=""):
|
||||
javaOptions = gateway.new_array(self.paramOption, len(options))
|
||||
i = 0
|
||||
for tuple in options:
|
||||
javaOptions[i] = self.paramOption(tuple[0], tuple[1])
|
||||
i += 1
|
||||
return self.z.getGui().select(name, defaultValue, javaOptions)
|
||||
def checkbox(self, name, options, defaultChecked=[]):
|
||||
javaOptions = gateway.new_array(self.paramOption, len(options))
|
||||
i = 0
|
||||
for tuple in options:
|
||||
javaOptions[i] = self.paramOption(tuple[0], tuple[1])
|
||||
i += 1
|
||||
javaDefaultCheck = self.javaList()
|
||||
for check in defaultChecked:
|
||||
javaDefaultCheck.append(check)
|
||||
return self.z.getGui().checkbox(name, javaDefaultCheck, javaOptions)
|
||||
|
||||
|
||||
z = PyZeppelinContext(gateway.entry_point)
|
||||
Loading…
Reference in a new issue