[ZEPPELIN-5330]. Support conda env for python interpreter in yarn mode

This commit is contained in:
Jeff Zhang 2021-04-16 17:52:31 +08:00
parent 628f064fc4
commit d27f580216
3 changed files with 184 additions and 6 deletions

View file

@ -77,6 +77,16 @@ Zeppelin supports python language which is very popular in data analytics and ma
IPython is only used in <code>%python.ipython</code>.
</td>
</tr>
<tr>
<td>zeppelin.yarn.dist.archives</td>
<td></td>
<td>You can specify conda pack archive files via this property</td>
</tr>
<tr>
<td>zeppelin.interpreter.conda.env.name</td>
<td>environment</td>
<td>conda environment name</td>
</tr>
</table>
@ -347,7 +357,85 @@ Python interpreter create a variable `z` which represent `ZeppelinContext` for y
</tr>
</table>
## Python environments
## Run Python in yarn cluster
Zeppelin supports to run python interpreter in yarn cluster which means the python interpreter runs in the yarn container.
This can achieve better multi-tenant for python interpreter especially when you already have a hadoop yarn cluster.
In order to run python in yarn cluster. You need to do the following steps
### Step 1 (Optional, but recommended)
We would suggest you to use conda pack to create archives of conda environments, and ship it to yarn container. Otherwise python interpreter
will use the python executable in PATH of yarn container.
Here's one example of yml file which could be used to generate a conda environment with python 3 and some useful python libraries.
* Create yml file for conda environment, write the following into file `env_python_3.yml`
```text
name: python_3
channels:
- conda-forge
- defaults
dependencies:
- python=3.7
- pycodestyle=2.5.0
- numpy=1.17.3
- pandas=0.25.0
- scipy=1.3.1
- grpcio=1.22.0
- hvplot=0.5.2
- protobuf=3.10.0
- pandasql=0.7.3
- ipython=7.8.0
- matplotlib=3.0.3
- ipykernel=5.1.2
- jupyter_client=5.3.4
- bokeh=1.3.4
- panel=0.6.0
- holoviews=1.12.3
- pip
- pip:
- bkzep==0.6.1
```
* Create conda environment via this yml file
```bash
conda env create -f env_python_3.yml
```
* Pack the conda environment
```bash
conda pack -n python_3
```
### Step 2
Specify the following properties to enable yarn mode for python interpreter, and specify the correct python.
```
zeppelin.interpreter.launcher yarn
zeppelin.yarn.dist.archives /home/hadoop/python_3.tar.gz#environment
zeppelin.python ./environment/bin/python
```
`zeppelin.yarn.dist.archives` is the python conda environment tar which is created in step 1.
This tar will be shipped to yarn container and untar in the working directory of yarn container.
`environment` is the folder after untar. And you need to specify `zeppelin.python` as `./environment/bin/python` which is the
python path of the conda environment in yarn container.
After these setting, when you run python interpreter, Zeppelin will launch the python interpreter in a yarn container.
## Python environments (used for non-yarn mode)
### Default
By default, PythonInterpreter will use python command defined in `zeppelin.python` property to run python process.

View file

@ -19,6 +19,8 @@ package org.apache.zeppelin.jupyter;
import io.grpc.ManagedChannelBuilder;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@ -45,8 +47,10 @@ import org.apache.zeppelin.interpreter.util.ProcessLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
@ -183,8 +187,43 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
return "";
}
private void activateCondaEnv() {
String envName = getProperty("zeppelin.interpreter.conda.env.name", "environment");
LOGGER.info("Activating conda env: {}", envName);
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
PumpStreamHandler psh = new PumpStreamHandler(stdout);
try {
if (!new File(envName).exists()) {
LOGGER.info("Skip activating conda env because no environment folder: {}", envName);
return;
}
File scriptFile = Files.createTempFile("zeppelin_jupyter_kernel_", ".sh").toFile();
try (FileWriter writer = new FileWriter(scriptFile)) {
IOUtils.write(String.format("chmod 777 -R %s\nsource %s/bin/activate\nconda-unpack",
envName, envName),
writer);
}
scriptFile.setExecutable(true, false);
scriptFile.setReadable(true, false);
CommandLine cmd = new CommandLine(scriptFile.getAbsolutePath());
DefaultExecutor executor = new DefaultExecutor();
executor.setStreamHandler(psh);
int exitCode = executor.execute(cmd);
if (exitCode != 0) {
LOGGER.warn("Fail to activate conda env, {}", stdout.toString());
} else {
LOGGER.info("Activate conda env successfully");
}
} catch (Exception e) {
LOGGER.warn("Fail to activate conda env: {}, exception: {}", stdout.toString(), e);
}
}
private void launchJupyterKernel(int kernelPort)
throws IOException {
activateCondaEnv();
LOGGER.info("Launching Jupyter Kernel at port: {}", kernelPort);
// copy the python scripts to a temp directory, then launch jupyter kernel in that folder
this.kernelWorkDir = Files.createTempDirectory(

View file

@ -57,6 +57,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -101,15 +103,15 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
Map<String, String> envs,
int connectTimeout,
int connectionPoolSize) {
super(connectTimeout, connectionPoolSize, launchContext.getIntpEventServerHost(), launchContext.getIntpEventServerPort());
super(connectTimeout,
connectionPoolSize,
launchContext.getIntpEventServerHost(),
launchContext.getIntpEventServerPort());
this.zConf = ZeppelinConfiguration.create();
this.launchContext = launchContext;
this.properties = properties;
this.envs = envs;
yarnClient = YarnClient.createYarnClient();
this.hadoopConf = new YarnConfiguration();
// Add core-site.xml and yarn-site.xml. This is for integration test where using MiniHadoopCluster.
if (properties.containsKey("HADOOP_CONF_DIR") &&
!org.apache.commons.lang3.StringUtils.isBlank(properties.getProperty("HADOOP_CONF_DIR"))) {
@ -117,12 +119,14 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
if (hadoopConfDir.exists() && hadoopConfDir.isDirectory()) {
File coreSite = new File(hadoopConfDir, "core-site.xml");
try {
LOGGER.info("Adding resource: {}", coreSite.getAbsolutePath());
this.hadoopConf.addResource(coreSite.toURI().toURL());
} catch (MalformedURLException e) {
LOGGER.warn("Fail to add core-site.xml: " + coreSite.getAbsolutePath(), e);
}
File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
try {
LOGGER.info("Adding resource: {}", yarnSite.getAbsolutePath());
this.hadoopConf.addResource(yarnSite.toURI().toURL());
} catch (MalformedURLException e) {
LOGGER.warn("Fail to add yarn-site.xml: " + yarnSite.getAbsolutePath(), e);
@ -133,6 +137,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
}
}
yarnClient = YarnClient.createYarnClient();
yarnClient.init(this.hadoopConf);
yarnClient.start();
try {
@ -228,7 +233,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
setQueue(appContext);
appContext.setApplicationId(appId);
setApplicationName(appContext);
appContext.setApplicationType("ZEPPELIN INTERPRETER");
appContext.setApplicationType("Zeppelin Interpreter");
appContext.setMaxAppAttempts(1);
ContainerLaunchContext amContainer = setUpAMLaunchContext();
@ -242,12 +247,14 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
// Set the resources to localize
this.stagingDir = new Path(fs.getHomeDirectory() + "/.zeppelinStaging", appId.toString());
LOGGER.info("Use staging directory: {}", this.stagingDir);
Map<String, LocalResource> localResources = new HashMap<>();
File interpreterZip = createInterpreterZip();
Path srcPath = localFs.makeQualified(new Path(interpreterZip.toURI()));
Path destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, "zeppelin");
LOGGER.info("Add zeppelin archive: {}", destPath);
FileUtils.forceDelete(interpreterZip);
// TODO(zjffdu) Should not add interpreter specific logic here.
@ -266,6 +273,30 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, "hive_conf");
}
}
String yarnDistArchives = launchContext.getProperties().getProperty("zeppelin.yarn.dist.archives");
if (org.apache.commons.lang3.StringUtils.isNotBlank(yarnDistArchives)) {
for (String localArchive : yarnDistArchives.split(",")) {
URI localURI = resolveURI(localArchive);
srcPath = localFs.makeQualified(new Path(localURI));
destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
String linkName = srcPath.getName();
if (localURI.getFragment() != null) {
linkName = localURI.getFragment();
}
addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, linkName);
}
}
String yarnDistFiles = launchContext.getProperties().getProperty("zeppelin.yarn.dist.files");
if (org.apache.commons.lang3.StringUtils.isNotBlank(yarnDistFiles)) {
for (String localFile : yarnDistFiles.split(",")) {
srcPath = localFs.makeQualified(new Path(localFile));
destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
addResource(fs, destPath, localResources, LocalResourceType.FILE, srcPath.getName());
LOGGER.info("Add dist file: {}", destPath);
}
}
amContainer.setLocalResources(localResources);
// Setup the command to run the AM
@ -316,6 +347,25 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
return amContainer;
}
private URI resolveURI(String path) {
try {
URI uri = new URI(path);
if (uri.getScheme() != null) {
return uri;
}
// make sure to handle if the path has a fragment (applies to yarn
// distributed cache)
if (uri.getFragment() != null) {
URI absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI();
return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(),
uri.getFragment());
}
} catch (URISyntaxException e) {
LOGGER.warn("Exception when resolveURI: {}, cause : {}", path, e);
}
return new File(path).getAbsoluteFile().toURI();
}
/**
* Populate the classpath entry in the given environment map with any application
* classpath specified through the Hadoop and Yarn configurations.
@ -534,6 +584,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
LocalResourceType resourceType,
String link) throws IOException {
LOGGER.info("Add resource: {}, type: {}, link: {}", destPath, resourceType, link);
FileStatus destStatus = fs.getFileStatus(destPath);
LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
amJarRsrc.setType(resourceType);