update python

This commit is contained in:
Jeff Zhang 2021-04-23 14:13:12 +08:00
parent 3c963b6b61
commit d5828d203b
4 changed files with 48 additions and 26 deletions

View file

@ -80,12 +80,12 @@ Zeppelin supports python language which is very popular in data analytics and ma
<tr>
<td>zeppelin.yarn.dist.archives</td>
<td></td>
<td>You can specify conda pack archive files via this property</td>
<td>Comma separated list of archives to be extracted into the working directory of interpreter. e.g. You can specify conda pack archive files via this property in python's yarn mode</td>
</tr>
<tr>
<td>zeppelin.interpreter.conda.env.name</td>
<td>environment</td>
<td>conda environment name</td>
<td>conda environment name, aka the folder name in the working directory of interpreter</td>
</tr>
</table>
@ -362,15 +362,19 @@ Python interpreter create a variable `z` which represent `ZeppelinContext` for y
Zeppelin supports to run python interpreter in yarn cluster which means the python interpreter runs in the yarn container.
This can achieve better multi-tenant for python interpreter especially when you already have a hadoop yarn cluster.
In order to run python in yarn cluster. You need to do the following steps
But there's one critical problem to run python in yarn cluster: how to manage the python environment in yarn container. Because yarn cluster is a distributed cluster environemt
which is composed many nodes, and your python interpreter can start in any node. It is not practical to manage python environment in each nodes.
### Step 1 (Optional, but recommended)
So in order to run python in yarn cluster, we would suggest you to use conda to manage your python environment, and Zeppelin can ship your
codna environment to yarn container, so that each python interpreter can has its own python environment.
### Step 1
We would suggest you to use conda pack to create archives of conda environments, and ship it to yarn container. Otherwise python interpreter
will use the python executable in PATH of yarn container.
Here's one example of yml file which could be used to generate a conda environment with python 3 and some useful python libraries.
* Create yml file for conda environment, write the following into file `env_python_3.yml`
* Create yml file for conda environment, write the following content into file `env_python_3.yml`
```text
name: python_3
@ -402,13 +406,11 @@ dependencies:
* Create conda environment via this yml file
```bash
conda env create -f env_python_3.yml
```
* Pack the conda environment
```bash
@ -418,23 +420,21 @@ conda pack -n python_3
### Step 2
Specify the following properties to enable yarn mode for python interpreter, and specify the correct python.
Specify the following properties to enable yarn mode for python interpreter, and specify the correct python environment.
```
zeppelin.interpreter.launcher yarn
zeppelin.yarn.dist.archives /home/hadoop/python_3.tar.gz#environment
zeppelin.interpreter.conda.env.name environment
zeppelin.python ./environment/bin/python
```
`zeppelin.yarn.dist.archives` is the python conda environment tar which is created in step 1.
This tar will be shipped to yarn container and untar in the working directory of yarn container.
`environment` is the folder after untar. And you need to specify `zeppelin.python` as `./environment/bin/python` which is the
`environment` is the folder name after untar. Also you need to specify `zeppelin.python` as `./environment/bin/python` which is the
python path of the conda environment in yarn container.
After these setting, when you run python interpreter, Zeppelin will launch the python interpreter in a yarn container.
## Python environments (used for non-yarn mode)
### Default

View file

@ -77,8 +77,8 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
// working directory of jupyter kernel
protected File kernelWorkDir;
// python executable file for launching the jupyter kernel
private String pythonExecutable;
private String condaEnv;
protected String pythonExecutable;
protected String condaEnv;
private int kernelLaunchTimeout;
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
@ -115,7 +115,14 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
// JupyterKernelInterpreter might already been opened
return;
}
pythonExecutable = getProperty("zeppelin.python", "python");
String envName = getProperty("zeppelin.interpreter.conda.env.name");
if (StringUtils.isNotBlank(envName)) {
activateCondaEnv(envName);
} else {
pythonExecutable = getProperty("zeppelin.python", "python");
}
LOGGER.info("Python Exec: {}", pythonExecutable);
String checkPrerequisiteResult = checkKernelPrerequisite(pythonExecutable);
if (!StringUtils.isEmpty(checkPrerequisiteResult)) {
@ -188,19 +195,18 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
return "";
}
private void activateCondaEnv() {
String envName = getProperty("zeppelin.interpreter.conda.env.name", "environment");
private void activateCondaEnv(String envName) throws IOException {
LOGGER.info("Activating conda env: {}", envName);
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
PumpStreamHandler psh = new PumpStreamHandler(stdout);
try {
if (!new File(envName).exists()) {
LOGGER.info("Skip activating conda env because no environment folder: {}", envName);
return;
throw new IOException("Fail to activating conda env because no environment folder: " +
envName);
}
File scriptFile = Files.createTempFile("zeppelin_jupyter_kernel_", ".sh").toFile();
try (FileWriter writer = new FileWriter(scriptFile)) {
IOUtils.write(String.format("chmod 777 -R %s\nsource %s/bin/activate\nconda-unpack",
IOUtils.write(String.format("chmod 777 -R %s \nsource %s/bin/activate \nconda-unpack",
envName, envName),
writer);
}
@ -211,21 +217,21 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
executor.setStreamHandler(psh);
int exitCode = executor.execute(cmd);
if (exitCode != 0) {
LOGGER.warn("Fail to activate conda env, {}", stdout.toString());
throw new IOException("Fail to activate conda env, " + stdout.toString());
} else {
LOGGER.info("Activate conda env successfully");
this.condaEnv = envName;
this.pythonExecutable = envName + "/bin/python";
}
} catch (Exception e) {
LOGGER.warn("Fail to activate conda env: {}, exception: {}", stdout.toString(), e);
throw new IOException("Fail to activate conda env: " + envName +
" exception: " + stdout.toString());
}
}
private void launchJupyterKernel(int kernelPort)
throws IOException {
activateCondaEnv();
LOGGER.info("Launching Jupyter Kernel at port: {}", kernelPort);
// copy the python scripts to a temp directory, then launch jupyter kernel in that folder
this.kernelWorkDir = Files.createTempDirectory(
@ -258,7 +264,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
private File buildBootstrapScriptFile(int kernelPort) throws IOException {
StringBuilder builder = new StringBuilder();
if (condaEnv != null) {
builder.append("source activate " + condaEnv + "/bin/activate\n");
builder.append("source " + condaEnv + "/bin/activate\n");
}
builder.append(pythonExecutable);
builder.append(" " + kernelWorkDir.getAbsolutePath() + "/kernel_server.py");

View file

@ -67,7 +67,7 @@ public class NotebookSocket extends WebSocketAdapter {
}
public synchronized void send(String serializeMessage) throws IOException {
connection.getRemote().sendStringByFuture(serializeMessage);
connection.getRemote().sendString(serializeMessage);
}
public String getUser() {

View file

@ -88,6 +88,15 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
setupPropertiesForPySpark(sparkProperties);
setupPropertiesForSparkR(sparkProperties);
String condaEnvName = context.getProperties().getProperty("zeppelin.interpreter.conda.env.name");
if (StringUtils.isNotBlank(condaEnvName)) {
if (!isYarnCluster()) {
throw new IOException("zeppelin.interpreter.conda.env.name only works for yarn-cluster mode");
}
sparkProperties.setProperty("spark.pyspark.python", condaEnvName + "/bin/python");
}
if (isYarnMode() && getDeployMode().equals("cluster")) {
env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
sparkProperties.setProperty("spark.yarn.submit.waitAppCompletion", "false");
@ -397,4 +406,11 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
return getSparkMaster().startsWith("yarn");
}
private boolean isYarnCluster() {
return isYarnMode() && "cluster".equalsIgnoreCase(getDeployMode());
}
private boolean isYarnClient() {
return isYarnMode() && "client".equalsIgnoreCase(getDeployMode());
}
}