Run python process in docker container

This commit is contained in:
Lee moon soo 2016-11-21 11:53:31 -08:00
parent 4dd36bfa31
commit 41c09d9e47
12 changed files with 370 additions and 40 deletions

View file

@ -190,7 +190,7 @@
<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.python.PythonInterpreterPandasSql,org.apache.zeppelin.python.PythonCondaInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter,org.apache.zeppelin.pig.PigQueryInterpreter,org.apache.zeppelin.scio.ScioInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.python.PythonInterpreterPandasSql,org.apache.zeppelin.python.PythonCondaInterpreter,org.apache.zeppelin.python.PythonDockerInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter,org.apache.zeppelin.pig.PigQueryInterpreter,org.apache.zeppelin.scio.ScioInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>

View file

@ -95,7 +95,7 @@ Congratulations, you have successfully installed Apache Zeppelin! Here are few s
* Check [JDBC Interpreter](../interpreter/jdbc.html) to learn how to configure and use multiple JDBC data sources.
#### Zeppelin with Python ...
* Check [Python interpreter](../interpreter/python.html) to know more about Matplotlib, Pandas, Conda integration.
* Check [Python interpreter](../interpreter/python.html) to know more about Matplotlib, Pandas, Conda/Docker environment integration.
#### Multi-user environment ...

View file

@ -56,10 +56,13 @@ The interpreter can only work if you already have python installed (the interpre
To access the help, type **help()**
## Python modules
## Python environments
### Default
By default, PythonInterpreter will use python command defined in `zeppelin.python` property to run python process.
The interpreter can use all modules already installed (with pip, easy_install...)
## Conda
### Conda
[Conda](http://conda.pydata.org/) is a package management system and environment management system for python.
`%python.conda` interpreter lets you change between environments.
@ -83,6 +86,32 @@ Deactivate
%python.conda deactivate
```
### Docker
The `%python.docker` interpreter lets PythonInterpreter create its python process in a specified docker container.
#### Usage
Activate an environment
```
%python.docker activate [Repository]
%python.docker activate [Repository:Tag]
%python.docker activate [Image Id]
```
Deactivate
```
%python.docker deactivate
```
Example
```
# activate latest tensorflow image as a python environment
%python.docker activate gcr.io/tensorflow/tensorflow:latest
```
## Using Zeppelin Dynamic Forms
You can leverage [Zeppelin Dynamic Form]({{BASE_PATH}}/manual/dynamicform.html) inside your Python code.

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.python;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,7 +37,6 @@ public class PythonCondaInterpreter extends Interpreter {
Pattern activatePattern = Pattern.compile("activate\\s*(.*)");
Pattern deactivatePattern = Pattern.compile("deactivate");
Pattern helpPattern = Pattern.compile("help");
String pythonCommand = null;
public PythonCondaInterpreter(Properties property) {
super(property);
@ -66,11 +66,11 @@ public class PythonCondaInterpreter extends Interpreter {
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
} else if (activateMatcher.matches()) {
String envName = activateMatcher.group(1);
pythonCommand = "conda run -n " + envName + " \"python -iu\"";
setPythonCommand("conda run -n " + envName + " \"python -iu\"");
restartPythonProcess();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "\"" + envName + "\" activated");
} else if (deactivateMatcher.matches()) {
pythonCommand = null;
setPythonCommand(null);
restartPythonProcess();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Deactivated");
} else if (helpMatcher.matches()) {
@ -81,6 +81,11 @@ public class PythonCondaInterpreter extends Interpreter {
}
}
/**
 * Forwards the given command to the PythonInterpreter of the same session.
 *
 * @param cmd command the python interpreter should use to launch its process;
 *            the deactivate branch of this class passes {@code null}
 */
public void setPythonCommand(String cmd) {
  getPythonInterpreter().setPythonCommand(cmd);
}
private void restartPythonProcess() {
PythonInterpreter python = getPythonInterpreter();
python.close();
@ -106,10 +111,6 @@ public class PythonCondaInterpreter extends Interpreter {
return python;
}
public String getPythonCommand() {
return pythonCommand;
}
private void listEnv(InterpreterOutput out) {
StringBuilder sb = createStringBuilder();
try {
@ -149,7 +150,7 @@ public class PythonCondaInterpreter extends Interpreter {
private void printUsage(InterpreterOutput out) {
try {
out.setType(InterpreterResult.Type.HTML);
out.writeResource("output_templates/usage.html");
out.writeResource("output_templates/conda_usage.html");
} catch (IOException e) {
logger.error("Can't print usage", e);
}
@ -170,6 +171,21 @@ public class PythonCondaInterpreter extends Interpreter {
return 0;
}
/**
 * Shares the scheduler of the python interpreter, so that %python.conda
 * paragraphs and %python paragraphs run sequentially.
 *
 * @return the python interpreter's scheduler, or {@code null} when no python
 *         interpreter could be resolved
 */
@Override
public Scheduler getScheduler() {
  PythonInterpreter target = getPythonInterpreter();
  if (target == null) {
    return null;
  }
  return target.getScheduler();
}
protected int runCommand(StringBuilder sb, String ... command)
throws IOException, InterruptedException {
ProcessBuilder builder = new ProcessBuilder(command);

View file

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Helps run python interpreter on a docker container
*/
public class PythonDockerInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(PythonDockerInterpreter.class);
Pattern activatePattern = Pattern.compile("activate\\s*(.*)");
Pattern deactivatePattern = Pattern.compile("deactivate");
Pattern helpPattern = Pattern.compile("help");
public PythonDockerInterpreter(Properties property) {
super(property);
}
@Override
public void open() {
}
@Override
public void close() {
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
InterpreterOutput out = context.out;
Matcher activateMatcher = activatePattern.matcher(st);
Matcher deactivateMatcher = deactivatePattern.matcher(st);
Matcher helpMatcher = helpPattern.matcher(st);
if (st == null || st.isEmpty() || helpMatcher.matches()) {
printUsage(out);
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
} else if (activateMatcher.matches()) {
String image = activateMatcher.group(1);
pull(out, image);
setPythonCommand("docker run -i --rm " + image + " python -iu");
restartPythonProcess();
out.clear();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "\"" + image + "\" activated");
} else if (deactivateMatcher.matches()) {
setPythonCommand(null);
restartPythonProcess();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Deactivated");
} else {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Not supported command: " + st);
}
}
public void setPythonCommand(String cmd) {
PythonInterpreter python = getPythonInterpreter();
python.setPythonCommand(cmd);
}
private void printUsage(InterpreterOutput out) {
try {
out.setType(InterpreterResult.Type.HTML);
out.writeResource("output_templates/docker_usage.html");
} catch (IOException e) {
logger.error("Can't print usage", e);
}
}
@Override
public void cancel(InterpreterContext context) {
}
@Override
public FormType getFormType() {
return FormType.NONE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
/**
* Use python interpreter's scheduler.
* To make sure %python.docker paragraph and %python paragraph runs sequentially
*/
@Override
public Scheduler getScheduler() {
PythonInterpreter pythonInterpreter = getPythonInterpreter();
if (pythonInterpreter != null) {
return pythonInterpreter.getScheduler();
} else {
return null;
}
}
private void restartPythonProcess() {
PythonInterpreter python = getPythonInterpreter();
python.close();
python.open();
}
protected PythonInterpreter getPythonInterpreter() {
LazyOpenInterpreter lazy = null;
PythonInterpreter python = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
python = (PythonInterpreter) p;
if (lazy != null) {
lazy.open();
}
return python;
}
public boolean pull(InterpreterOutput out, String image) {
int exit = 0;
try {
exit = runCommand(out, "docker", "pull", image);
} catch (IOException | InterruptedException e) {
logger.error(e.getMessage(), e);
throw new InterpreterException(e);
}
return exit == 0;
}
protected int runCommand(InterpreterOutput out, String ... command)
throws IOException, InterruptedException {
ProcessBuilder builder = new ProcessBuilder(command);
builder.redirectErrorStream(true);
Process process = builder.start();
InputStream stdout = process.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(stdout));
String line;
while ((line = br.readLine()) != null) {
out.write(line + "\n");
}
int r = process.waitFor(); // Let the process finish.
return r;
}
}

View file

@ -60,6 +60,7 @@ public class PythonInterpreter extends Interpreter {
private int maxResult;
PythonProcess process = null;
private String pythonCommand = null;
public PythonInterpreter(Properties property) {
super(property);
@ -199,10 +200,9 @@ public class PythonInterpreter extends Interpreter {
public PythonProcess getPythonProcess() {
if (process == null) {
PythonCondaInterpreter conda = getCondaInterpreter();
String binPath = getProperty(ZEPPELIN_PYTHON);
if (conda != null && conda.getPythonCommand() != null) {
binPath = conda.getPythonCommand();
if (pythonCommand != null) {
binPath = pythonCommand;
}
return new PythonProcess(binPath);
} else {
@ -210,6 +210,14 @@ public class PythonInterpreter extends Interpreter {
}
}
/**
 * Stores the command to launch the python process with.
 *
 * @param cmd command to remember; may be {@code null}
 */
public void setPythonCommand(String cmd) {
  this.pythonCommand = cmd;
}
/**
 * @return the command last stored through {@code setPythonCommand}, or {@code null}
 *         when none has been set
 */
public String getPythonCommand() {
  return this.pythonCommand;
}
private Job getRunningJob(String paragraphId) {
Job foundJob = null;
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
@ -284,25 +292,4 @@ public class PythonInterpreter extends Interpreter {
public int getMaxResult() {
return maxResult;
}
private PythonCondaInterpreter getCondaInterpreter() {
LazyOpenInterpreter lazy = null;
PythonCondaInterpreter conda = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(
PythonCondaInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
conda = (PythonCondaInterpreter) p;
if (lazy != null) {
lazy.open();
}
return conda;
}
}

View file

@ -0,0 +1,27 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<h4>Usage</h4>
<div>
Activate a docker environment (python interpreter will be restarted)
<pre>%python.docker activate [Repository]
%python.docker activate [Repository:Tag]
%python.docker activate [Image Id]</pre>
</div>
<div>
Deactivate
<pre>%python.docker deactivate</pre>
</div>
<div>
Example
<pre># Run python interpreter with latest tensorflow image
%python.docker activate gcr.io/tensorflow/tensorflow:latest</pre>
</div>

View file

@ -77,7 +77,7 @@ public class PythonCondaInterpreterTest implements InterpreterOutputListener {
conda.interpret("activate env", context);
verify(python, times(1)).open();
verify(python, times(1)).close();
assertEquals("conda run -n env \"python -iu\"", conda.getPythonCommand());
verify(python).setPythonCommand("conda run -n env \"python -iu\"");
}
@Test
@ -86,7 +86,7 @@ public class PythonCondaInterpreterTest implements InterpreterOutputListener {
conda.interpret("deactivate", context);
verify(python, times(1)).open();
verify(python, times(1)).close();
assertEquals(null, conda.getPythonCommand());
verify(python).setPythonCommand(null);
}
private InterpreterContext getInterpreterContext() {

View file

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
/**
 * Tests for {@link PythonDockerInterpreter}. The docker interpreter is spied so that
 * {@code pull} and {@code getPythonInterpreter} are stubbed, and the python interpreter
 * is a mock whose interactions are verified.
 */
public class PythonDockerInterpreterTest implements InterpreterOutputListener {
  private PythonDockerInterpreter docker;
  private PythonInterpreter python;

  /** Wires a mocked python interpreter and a spied docker interpreter into one group. */
  @Before
  public void setUp() {
    python = mock(PythonInterpreter.class);
    docker = spy(new PythonDockerInterpreter(new Properties()));

    InterpreterGroup interpreterGroup = new InterpreterGroup();
    interpreterGroup.put("note", Arrays.asList(python, docker));
    python.setInterpreterGroup(interpreterGroup);
    docker.setInterpreterGroup(interpreterGroup);

    // Stub out the parts that would touch a real docker daemon or session lookup.
    doReturn(true).when(docker).pull(any(InterpreterOutput.class), anyString());
    doReturn(python).when(docker).getPythonInterpreter();
  }

  /** "activate" pulls the image, sets the docker command and restarts python once. */
  @Test
  public void testActivateEnv() {
    docker.interpret("activate env", getInterpreterContext());

    verify(python).open();
    verify(python).close();
    verify(docker).pull(any(InterpreterOutput.class), anyString());
    verify(python).setPythonCommand("docker run -i --rm env python -iu");
  }

  /** "deactivate" resets the command to null and restarts python once. */
  @Test
  public void testDeactivate() {
    docker.interpret("deactivate", getInterpreterContext());

    verify(python).open();
    verify(python).close();
    verify(python).setPythonCommand(null);
  }

  /** Builds a minimal context whose output reports back to this listener. */
  private InterpreterContext getInterpreterContext() {
    InterpreterOutput output = new InterpreterOutput(this);
    AuthenticationInfo authInfo = new AuthenticationInfo();
    HashMap<String, Object> config = new HashMap<String, Object>();
    GUI gui = new GUI();
    return new InterpreterContext(
        "noteId",
        "paragraphId",
        "paragraphTitle",
        "paragraphText",
        authInfo,
        config,
        gui,
        null,
        null,
        null,
        output);
  }

  /** Output callbacks are irrelevant for these tests. */
  @Override
  public void onAppend(InterpreterOutput out, byte[] line) {
  }

  @Override
  public void onUpdate(InterpreterOutput out, byte[] output) {
  }
}

View file

@ -538,6 +538,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.python.PythonInterpreter,"
+ "org.apache.zeppelin.python.PythonInterpreterPandasSql,"
+ "org.apache.zeppelin.python.PythonCondaInterpreter,"
+ "org.apache.zeppelin.python.PythonDockerInterpreter,"
+ "org.apache.zeppelin.ignite.IgniteInterpreter,"
+ "org.apache.zeppelin.ignite.IgniteSqlInterpreter,"
+ "org.apache.zeppelin.lens.LensInterpreter,"

View file

@ -58,8 +58,7 @@ public class ParagraphTest {
assertEquals("md", Paragraph.getRequiredReplName(text));
assertEquals("", Paragraph.getScriptBody(text));
}
@Test
public void replSingleCharName() {
String text = "%r a";