address comments

This commit is contained in:
Jeff Zhang 2021-06-02 11:51:32 +08:00
parent f7e6b13d4d
commit e4a6c7146a
4 changed files with 67 additions and 49 deletions

View file

@ -199,33 +199,29 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
LOGGER.info("Activating conda env: {}", envName);
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
PumpStreamHandler psh = new PumpStreamHandler(stdout);
try {
if (!new File(envName).exists()) {
throw new IOException("Fail to activating conda env because no environment folder: " +
envName);
}
File scriptFile = Files.createTempFile("zeppelin_jupyter_kernel_", ".sh").toFile();
try (FileWriter writer = new FileWriter(scriptFile)) {
IOUtils.write(String.format("chmod 777 -R %s \nsource %s/bin/activate \nconda-unpack",
envName, envName),
writer);
}
scriptFile.setExecutable(true, false);
scriptFile.setReadable(true, false);
CommandLine cmd = new CommandLine(scriptFile.getAbsolutePath());
DefaultExecutor executor = new DefaultExecutor();
executor.setStreamHandler(psh);
int exitCode = executor.execute(cmd);
if (exitCode != 0) {
throw new IOException("Fail to activate conda env, " + stdout.toString());
} else {
LOGGER.info("Activate conda env successfully");
this.condaEnv = envName;
this.pythonExecutable = envName + "/bin/python";
}
} catch (Exception e) {
throw new IOException("Fail to activate conda env: " + envName +
" exception: " + stdout.toString());
if (!new File(envName).exists()) {
throw new IOException("Fail to activating conda env because no environment folder: " +
envName);
}
File scriptFile = Files.createTempFile("zeppelin_jupyter_kernel_", ".sh").toFile();
try (FileWriter writer = new FileWriter(scriptFile)) {
IOUtils.write(String.format("chmod 777 -R %s \nsource %s/bin/activate \nconda-unpack",
envName, envName),
writer);
}
scriptFile.setExecutable(true, false);
scriptFile.setReadable(true, false);
CommandLine cmd = new CommandLine(scriptFile.getAbsolutePath());
DefaultExecutor executor = new DefaultExecutor();
executor.setStreamHandler(psh);
int exitCode = executor.execute(cmd);
if (exitCode != 0) {
throw new IOException("Fail to activate conda env, " + stdout.toString());
} else {
LOGGER.info("Activate conda env successfully");
this.condaEnv = envName;
this.pythonExecutable = envName + "/bin/python";
}
}

View file

@ -24,9 +24,9 @@ import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
public class URIUtil {
public class YarnLauncherUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(URIUtil.class);
private static final Logger LOGGER = LoggerFactory.getLogger(YarnLauncherUtil.class);
public static URI resolveURI(String path) {
try {

View file

@ -277,7 +277,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
String yarnDistArchives = launchContext.getProperties().getProperty("zeppelin.yarn.dist.archives");
if (StringUtils.isNotBlank(yarnDistArchives)) {
for (String localArchive : yarnDistArchives.split(",")) {
URI localURI = resolveURI(localArchive);
URI localURI = YarnLauncherUtil.resolveURI(localArchive);
srcPath = localFs.makeQualified(new Path(localURI));
destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
String linkName = srcPath.getName();
@ -347,25 +347,6 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
return amContainer;
}
private URI resolveURI(String path) {
try {
URI uri = new URI(path);
if (uri.getScheme() != null) {
return uri;
}
// make sure to handle if the path has a fragment (applies to yarn
// distributed cache)
if (uri.getFragment() != null) {
URI absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI();
return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(),
uri.getFragment());
}
} catch (URISyntaxException e) {
LOGGER.warn("Exception when resolveURI: {}, cause : {}", path, e);
}
return new File(path).getAbsoluteFile().toURI();
}
/**
* Populate the classpath entry in the given environment map with any application
* classpath specified through the Hadoop and Yarn configurations.

View file

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import org.junit.Test;
import java.net.URI;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class YarnLauncherUtilTest {
@Test
public void testURIUtil() {
URI uri = YarnLauncherUtil.resolveURI("/tmp/env_1");
assertEquals("file", uri.getScheme());
assertEquals("/tmp/env_1", uri.getPath());
assertNull(uri.getFragment());
uri = YarnLauncherUtil.resolveURI("/tmp/env_1#env");
assertEquals("file", uri.getScheme());
assertEquals("/tmp/env_1", uri.getPath());
assertEquals("env", uri.getFragment());
}
}