This commit is contained in:
Lee moon soo 2018-11-24 16:15:28 +09:00
parent 3078bac550
commit ec09b8b882
4 changed files with 230 additions and 40 deletions

View file

@ -1,5 +1,6 @@
package org.apache.zeppelin.interpreter.launcher;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
@ -76,7 +77,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
* Get interpreter pod name
* @return
*/
private String getPodName() {
@VisibleForTesting
String getPodName() {
return podName;
}
@ -237,6 +239,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
}
}
@VisibleForTesting
Properties getTemplateBindings() throws IOException {
Properties k8sProperties = new Properties();
@ -249,7 +252,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
k8sProperties.put("zeppelin.k8s.interpreter.group.name", interpreterGroupName);
k8sProperties.put("zeppelin.k8s.interpreter.setting.name", interpreterSettingName);
k8sProperties.put("zeppelin.k8s.interpreter.localRepo", "/tmp/local-repo");
k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", "12321:12321");
k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", String.format("%d:%d", getPort(), getPort()));
k8sProperties.put("zeppelin.k8s.server.rpc.host", zeppelinServiceHost);
k8sProperties.put("zeppelin.k8s.server.rpc.portRange", zeppelinServiceRpcPort);
if (ownerUID() != null && ownerName() != null) {
@ -282,11 +285,13 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
return k8sProperties;
}
private boolean isSpark() {
@VisibleForTesting
boolean isSpark() {
return "spark".equalsIgnoreCase(interpreterGroupName);
}
private String buildSparkSubmitOptions() {
@VisibleForTesting
String buildSparkSubmitOptions() {
StringBuilder options = new StringBuilder();
options.append(" --master k8s://https://kubernetes.default.svc");
@ -294,7 +299,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
options.append(" --conf spark.kubernetes.namespace=" + kubectl.getNamespace());
options.append(" --conf spark.executor.instances=1");
options.append(" --conf spark.driver.pod.name=" + getPodName());
options.append(" --conf spark.kubernetes.container.image=spark:2.4.0");
options.append(" --conf spark.kubernetes.container.image=" + sparkImage);
options.append(" --conf spark.driver.bindAddress=0.0.0.0");
options.append(" --conf spark.driver.host=" + getInterpreterPodDnsName());
options.append(" --conf spark.driver.port=" + String.format("%d", getSparkDriverPort()));
@ -313,7 +318,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
* See xxx-interpreter-pod.yaml
* @return
*/
private int getSparkDriverPort() {
@VisibleForTesting
int getSparkDriverPort() {
return 22321;
}
@ -321,7 +327,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
* See xxx-interpreter-pod.yaml
* @return
*/
private int getSparkBlockmanagerPort() {
@VisibleForTesting
int getSparkBlockmanagerPort() {
return 22322;
}

View file

@ -23,6 +23,8 @@ import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import com.google.common.annotations.VisibleForTesting;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
@ -49,6 +51,12 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
kubectl.setNamespace(getNamespace());
}
@VisibleForTesting
K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage, Kubectl kubectl) {
super(zConf, recoveryStorage);
this.kubectl = kubectl;
}
/**
* Check if i'm running inside of kubernetes or not.

View file

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class K8sRemoteInterpreterProcessTest {
@Test
public void testGetHostPort() {
// given
Kubectl kubectl = mock(Kubectl.class);
when(kubectl.getNamespace()).thenReturn("default");
Properties properties = new Properties();
HashMap<String, String> envs = new HashMap<String, String>();
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
kubectl,
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
"sh",
"shell",
properties,
envs,
"zeppelin.server.hostname",
"12320",
false,
"spark-container:1.0",
10);
// when
String host = intp.getHost();
int port = intp.getPort();
// then
assertEquals(String.format("%s.%s.svc.cluster.local", intp.getPodName(), kubectl.getNamespace()), intp.getHost());
assertEquals(12321, intp.getPort());
}
@Test
public void testPredefinedPortNumbers() {
// given
Kubectl kubectl = mock(Kubectl.class);
when(kubectl.getNamespace()).thenReturn("default");
Properties properties = new Properties();
HashMap<String, String> envs = new HashMap<String, String>();
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
kubectl,
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
"sh",
"shell",
properties,
envs,
"zeppelin.server.hostname",
"12320",
false,
"spark-container:1.0",
10);
// following values are hardcoded in k8s/interpreter/100-interpreter.yaml.
// when change those values, update the yaml file as well.
assertEquals(12321, intp.getPort());
assertEquals(22321, intp.getSparkDriverPort());
assertEquals(22322, intp.getSparkBlockmanagerPort());
}
@Test
public void testGetTemplateBindings() throws IOException {
// given
Kubectl kubectl = mock(Kubectl.class);
when(kubectl.getNamespace()).thenReturn("default");
Properties properties = new Properties();
properties.put("my.key1", "v1");
HashMap<String, String> envs = new HashMap<String, String>();
envs.put("MY_ENV1", "V1");
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
kubectl,
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
"sh",
"shell",
properties,
envs,
"zeppelin.server.hostname",
"12320",
false,
"spark-container:1.0",
10);
// when
Properties p = intp.getTemplateBindings();
// then
assertEquals("default", p.get("zeppelin.k8s.namespace"));
assertEquals(intp.getPodName(), p.get("zeppelin.k8s.interpreter.pod.name"));
assertEquals("sh", p.get("zeppelin.k8s.interpreter.container.name"));
assertEquals("interpreter-container:1.0", p.get("zeppelin.k8s.interpreter.container.image"));
assertEquals("shared_process", p.get("zeppelin.k8s.interpreter.group.id"));
assertEquals("sh", p.get("zeppelin.k8s.interpreter.group.name"));
assertEquals("shell", p.get("zeppelin.k8s.interpreter.setting.name"));
assertEquals(true , p.containsKey("zeppelin.k8s.interpreter.localRepo"));
assertEquals("12321:12321" , p.get("zeppelin.k8s.interpreter.rpc.portRange"));
assertEquals("zeppelin.server.hostname" , p.get("zeppelin.k8s.server.rpc.host"));
assertEquals("12320" , p.get("zeppelin.k8s.server.rpc.portRange"));
assertEquals("v1", p.get("my.key1"));
assertEquals("V1", envs.get("MY_ENV1"));
envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs");
assertEquals(true, envs.containsKey("SERVICE_DOMAIN"));
assertEquals(true, envs.containsKey("ZEPPELIN_HOME"));
}
@Test
public void testGetTemplateBindingsForSpark() throws IOException {
// given
Kubectl kubectl = mock(Kubectl.class);
when(kubectl.getNamespace()).thenReturn("default");
Properties properties = new Properties();
properties.put("my.key1", "v1");
HashMap<String, String> envs = new HashMap<String, String>();
envs.put("MY_ENV1", "V1");
envs.put("SPARK_SUBMIT_OPTIONS", "my options");
envs.put("SERVICE_DOMAIN", "mydomain");
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
kubectl,
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
"spark",
"myspark",
properties,
envs,
"zeppelin.server.hostname",
"12320",
false,
"spark-container:1.0",
10);
// when
Properties p = intp.getTemplateBindings();
// then
assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image"));
assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl"));
envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs");
assertTrue( envs.containsKey("SPARK_HOME"));
String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
assertTrue(sparkSubmitOptions.startsWith("my options "));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=" + kubectl.getNamespace()));
assertTrue(sparkSubmitOptions.contains("spark.driver.pod.name=" + intp.getPodName()));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
}
}

View file

@ -19,17 +19,18 @@ package org.apache.zeppelin.interpreter.launcher;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* In the future, test may use minikube on travis for end-to-end test
* https://github.com/LiliC/travis-minikube
* https://blog.travis-ci.com/2017-10-26-running-kubernetes-on-travis-ci-with-minikube
*/
@ -42,10 +43,13 @@ public class K8sStandardInterpreterLauncherTest {
}
@Test
public void testTemplate() throws IOException {
public void testK8sLauncher() throws IOException {
// given
Kubectl kubectl = mock(Kubectl.class);
when(kubectl.getNamespace()).thenReturn("default");
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null, kubectl);
Properties properties = new Properties();
properties.setProperty("ENV_1", "VALUE_1");
properties.setProperty("property_1", "value_1");
@ -64,36 +68,11 @@ public class K8sStandardInterpreterLauncherTest {
"name",
0,
"host");
InterpreterClient client = launcher.launch(context);
K8sRemoteInterpreterProcess k8sintp = (K8sRemoteInterpreterProcess) client;
k8sintp.start("user");
// assertTrue(k8sintp.isRunning());
// k8sintp.stop();
}
/*
@Test
public void testLauncher() throws IOException {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("ENV_1", "VALUE_1");
properties.setProperty("property_1", "value_1");
InterpreterOption option = new InterpreterOption();
option.setUserImpersonate(true);
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
// when
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
assertEquals("name", interpreterProcess.getInterpreterSettingName());
assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
assertEquals(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getIntValue(),
interpreterProcess.getConnectTimeout());
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertEquals(2, interpreterProcess.getEnv().size());
assertEquals("VALUE_1", interpreterProcess.getEnv().get("ENV_1"));
assertEquals(true, interpreterProcess.isUserImpersonated());
// then
assertTrue(client instanceof K8sRemoteInterpreterProcess);
}
*/
}