mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
[ZEPPELIN-5330]. Support conda env for python interpreter in yarn mode
This commit is contained in:
parent
628f064fc4
commit
d27f580216
3 changed files with 184 additions and 6 deletions
|
|
@ -77,6 +77,16 @@ Zeppelin supports python language which is very popular in data analytics and ma
|
|||
IPython is only used in <code>%python.ipython</code>.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>zeppelin.yarn.dist.archives</td>
|
||||
<td></td>
|
||||
<td>You can specify conda pack archive files via this property</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>zeppelin.interpreter.conda.env.name</td>
|
||||
<td>environment</td>
|
||||
<td>conda environment name</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
|
||||
|
|
@ -347,7 +357,85 @@ Python interpreter create a variable `z` which represent `ZeppelinContext` for y
|
|||
</tr>
|
||||
</table>
|
||||
|
||||
## Python environments
|
||||
## Run Python in yarn cluster
|
||||
|
||||
Zeppelin supports to run python interpreter in yarn cluster which means the python interpreter runs in the yarn container.
|
||||
This can achieve better multi-tenant for python interpreter especially when you already have a hadoop yarn cluster.
|
||||
|
||||
In order to run python in yarn cluster. You need to do the following steps
|
||||
|
||||
### Step 1 (Optional, but recommended)
|
||||
We would suggest you to use conda pack to create archives of conda environments, and ship it to yarn container. Otherwise python interpreter
|
||||
will use the python executable in PATH of yarn container.
|
||||
|
||||
Here's one example of yml file which could be used to generate a conda environment with python 3 and some useful python libraries.
|
||||
|
||||
* Create yml file for conda environment, write the following into file `env_python_3.yml`
|
||||
|
||||
```text
|
||||
name: python_3
|
||||
channels:
|
||||
- conda-forge
|
||||
- defaults
|
||||
dependencies:
|
||||
- python=3.7
|
||||
- pycodestyle=2.5.0
|
||||
- numpy=1.17.3
|
||||
- pandas=0.25.0
|
||||
- scipy=1.3.1
|
||||
- grpcio=1.22.0
|
||||
- hvplot=0.5.2
|
||||
- protobuf=3.10.0
|
||||
- pandasql=0.7.3
|
||||
- ipython=7.8.0
|
||||
- matplotlib=3.0.3
|
||||
- ipykernel=5.1.2
|
||||
- jupyter_client=5.3.4
|
||||
- bokeh=1.3.4
|
||||
- panel=0.6.0
|
||||
- holoviews=1.12.3
|
||||
- pip
|
||||
- pip:
|
||||
- bkzep==0.6.1
|
||||
|
||||
```
|
||||
|
||||
* Create conda environment via this yml file
|
||||
|
||||
|
||||
```bash
|
||||
|
||||
conda env create -f env_python_3.yml
|
||||
```
|
||||
|
||||
|
||||
* Pack the conda environment
|
||||
|
||||
```bash
|
||||
|
||||
conda pack -n python_3
|
||||
```
|
||||
|
||||
### Step 2
|
||||
|
||||
Specify the following properties to enable yarn mode for python interpreter, and specify the correct python.
|
||||
|
||||
```
|
||||
zeppelin.interpreter.launcher yarn
|
||||
zeppelin.yarn.dist.archives /home/hadoop/python_3.tar.gz#environment
|
||||
zeppelin.python ./environment/bin/python
|
||||
```
|
||||
|
||||
`zeppelin.yarn.dist.archives` is the python conda environment tar which is created in step 1.
|
||||
This tar will be shipped to yarn container and untar in the working directory of yarn container.
|
||||
`environment` is the folder after untar. And you need to specify `zeppelin.python` as `./environment/bin/python` which is the
|
||||
python path of the conda environment in yarn container.
|
||||
|
||||
|
||||
After these setting, when you run python interpreter, Zeppelin will launch the python interpreter in a yarn container.
|
||||
|
||||
|
||||
## Python environments (used for non-yarn mode)
|
||||
|
||||
### Default
|
||||
By default, PythonInterpreter will use python command defined in `zeppelin.python` property to run python process.
|
||||
|
|
|
|||
|
|
@ -19,6 +19,8 @@ package org.apache.zeppelin.jupyter;
|
|||
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
|
@ -45,8 +47,10 @@ import org.apache.zeppelin.interpreter.util.ProcessLauncher;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
|
@ -183,8 +187,43 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
|
|||
return "";
|
||||
}
|
||||
|
||||
private void activateCondaEnv() {
|
||||
String envName = getProperty("zeppelin.interpreter.conda.env.name", "environment");
|
||||
LOGGER.info("Activating conda env: {}", envName);
|
||||
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
|
||||
PumpStreamHandler psh = new PumpStreamHandler(stdout);
|
||||
try {
|
||||
if (!new File(envName).exists()) {
|
||||
LOGGER.info("Skip activating conda env because no environment folder: {}", envName);
|
||||
return;
|
||||
}
|
||||
File scriptFile = Files.createTempFile("zeppelin_jupyter_kernel_", ".sh").toFile();
|
||||
try (FileWriter writer = new FileWriter(scriptFile)) {
|
||||
IOUtils.write(String.format("chmod 777 -R %s\nsource %s/bin/activate\nconda-unpack",
|
||||
envName, envName),
|
||||
writer);
|
||||
}
|
||||
scriptFile.setExecutable(true, false);
|
||||
scriptFile.setReadable(true, false);
|
||||
CommandLine cmd = new CommandLine(scriptFile.getAbsolutePath());
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
executor.setStreamHandler(psh);
|
||||
int exitCode = executor.execute(cmd);
|
||||
if (exitCode != 0) {
|
||||
LOGGER.warn("Fail to activate conda env, {}", stdout.toString());
|
||||
} else {
|
||||
LOGGER.info("Activate conda env successfully");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Fail to activate conda env: {}, exception: {}", stdout.toString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void launchJupyterKernel(int kernelPort)
|
||||
throws IOException {
|
||||
|
||||
activateCondaEnv();
|
||||
|
||||
LOGGER.info("Launching Jupyter Kernel at port: {}", kernelPort);
|
||||
// copy the python scripts to a temp directory, then launch jupyter kernel in that folder
|
||||
this.kernelWorkDir = Files.createTempDirectory(
|
||||
|
|
|
|||
|
|
@ -57,6 +57,8 @@ import java.io.File;
|
|||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
|
@ -101,15 +103,15 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
Map<String, String> envs,
|
||||
int connectTimeout,
|
||||
int connectionPoolSize) {
|
||||
super(connectTimeout, connectionPoolSize, launchContext.getIntpEventServerHost(), launchContext.getIntpEventServerPort());
|
||||
super(connectTimeout,
|
||||
connectionPoolSize,
|
||||
launchContext.getIntpEventServerHost(),
|
||||
launchContext.getIntpEventServerPort());
|
||||
this.zConf = ZeppelinConfiguration.create();
|
||||
this.launchContext = launchContext;
|
||||
this.properties = properties;
|
||||
this.envs = envs;
|
||||
|
||||
yarnClient = YarnClient.createYarnClient();
|
||||
this.hadoopConf = new YarnConfiguration();
|
||||
|
||||
// Add core-site.xml and yarn-site.xml. This is for integration test where using MiniHadoopCluster.
|
||||
if (properties.containsKey("HADOOP_CONF_DIR") &&
|
||||
!org.apache.commons.lang3.StringUtils.isBlank(properties.getProperty("HADOOP_CONF_DIR"))) {
|
||||
|
|
@ -117,12 +119,14 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
if (hadoopConfDir.exists() && hadoopConfDir.isDirectory()) {
|
||||
File coreSite = new File(hadoopConfDir, "core-site.xml");
|
||||
try {
|
||||
LOGGER.info("Adding resource: {}", coreSite.getAbsolutePath());
|
||||
this.hadoopConf.addResource(coreSite.toURI().toURL());
|
||||
} catch (MalformedURLException e) {
|
||||
LOGGER.warn("Fail to add core-site.xml: " + coreSite.getAbsolutePath(), e);
|
||||
}
|
||||
File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
|
||||
try {
|
||||
LOGGER.info("Adding resource: {}", yarnSite.getAbsolutePath());
|
||||
this.hadoopConf.addResource(yarnSite.toURI().toURL());
|
||||
} catch (MalformedURLException e) {
|
||||
LOGGER.warn("Fail to add yarn-site.xml: " + yarnSite.getAbsolutePath(), e);
|
||||
|
|
@ -133,6 +137,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
}
|
||||
}
|
||||
|
||||
yarnClient = YarnClient.createYarnClient();
|
||||
yarnClient.init(this.hadoopConf);
|
||||
yarnClient.start();
|
||||
try {
|
||||
|
|
@ -228,7 +233,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
setQueue(appContext);
|
||||
appContext.setApplicationId(appId);
|
||||
setApplicationName(appContext);
|
||||
appContext.setApplicationType("ZEPPELIN INTERPRETER");
|
||||
appContext.setApplicationType("Zeppelin Interpreter");
|
||||
appContext.setMaxAppAttempts(1);
|
||||
|
||||
ContainerLaunchContext amContainer = setUpAMLaunchContext();
|
||||
|
|
@ -242,12 +247,14 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
|
||||
// Set the resources to localize
|
||||
this.stagingDir = new Path(fs.getHomeDirectory() + "/.zeppelinStaging", appId.toString());
|
||||
LOGGER.info("Use staging directory: {}", this.stagingDir);
|
||||
Map<String, LocalResource> localResources = new HashMap<>();
|
||||
|
||||
File interpreterZip = createInterpreterZip();
|
||||
Path srcPath = localFs.makeQualified(new Path(interpreterZip.toURI()));
|
||||
Path destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
|
||||
addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, "zeppelin");
|
||||
LOGGER.info("Add zeppelin archive: {}", destPath);
|
||||
FileUtils.forceDelete(interpreterZip);
|
||||
|
||||
// TODO(zjffdu) Should not add interpreter specific logic here.
|
||||
|
|
@ -266,6 +273,30 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, "hive_conf");
|
||||
}
|
||||
}
|
||||
|
||||
String yarnDistArchives = launchContext.getProperties().getProperty("zeppelin.yarn.dist.archives");
|
||||
if (org.apache.commons.lang3.StringUtils.isNotBlank(yarnDistArchives)) {
|
||||
for (String localArchive : yarnDistArchives.split(",")) {
|
||||
URI localURI = resolveURI(localArchive);
|
||||
srcPath = localFs.makeQualified(new Path(localURI));
|
||||
destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
|
||||
String linkName = srcPath.getName();
|
||||
if (localURI.getFragment() != null) {
|
||||
linkName = localURI.getFragment();
|
||||
}
|
||||
addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, linkName);
|
||||
}
|
||||
}
|
||||
String yarnDistFiles = launchContext.getProperties().getProperty("zeppelin.yarn.dist.files");
|
||||
if (org.apache.commons.lang3.StringUtils.isNotBlank(yarnDistFiles)) {
|
||||
for (String localFile : yarnDistFiles.split(",")) {
|
||||
srcPath = localFs.makeQualified(new Path(localFile));
|
||||
destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
|
||||
addResource(fs, destPath, localResources, LocalResourceType.FILE, srcPath.getName());
|
||||
LOGGER.info("Add dist file: {}", destPath);
|
||||
}
|
||||
}
|
||||
|
||||
amContainer.setLocalResources(localResources);
|
||||
|
||||
// Setup the command to run the AM
|
||||
|
|
@ -316,6 +347,25 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
return amContainer;
|
||||
}
|
||||
|
||||
private URI resolveURI(String path) {
|
||||
try {
|
||||
URI uri = new URI(path);
|
||||
if (uri.getScheme() != null) {
|
||||
return uri;
|
||||
}
|
||||
// make sure to handle if the path has a fragment (applies to yarn
|
||||
// distributed cache)
|
||||
if (uri.getFragment() != null) {
|
||||
URI absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI();
|
||||
return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(),
|
||||
uri.getFragment());
|
||||
}
|
||||
} catch (URISyntaxException e) {
|
||||
LOGGER.warn("Exception when resolveURI: {}, cause : {}", path, e);
|
||||
}
|
||||
return new File(path).getAbsoluteFile().toURI();
|
||||
}
|
||||
|
||||
/**
|
||||
* Populate the classpath entry in the given environment map with any application
|
||||
* classpath specified through the Hadoop and Yarn configurations.
|
||||
|
|
@ -534,6 +584,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
LocalResourceType resourceType,
|
||||
String link) throws IOException {
|
||||
|
||||
LOGGER.info("Add resource: {}, type: {}, link: {}", destPath, resourceType, link);
|
||||
FileStatus destStatus = fs.getFileStatus(destPath);
|
||||
LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
|
||||
amJarRsrc.setType(resourceType);
|
||||
|
|
|
|||
Loading…
Reference in a new issue