mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
add test
This commit is contained in:
parent
3078bac550
commit
ec09b8b882
4 changed files with 230 additions and 40 deletions
|
|
@ -1,5 +1,6 @@
|
|||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.Gson;
|
||||
|
|
@ -76,7 +77,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
* Get interpreter pod name
|
||||
* @return
|
||||
*/
|
||||
private String getPodName() {
|
||||
@VisibleForTesting
|
||||
String getPodName() {
|
||||
return podName;
|
||||
}
|
||||
|
||||
|
|
@ -237,6 +239,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Properties getTemplateBindings() throws IOException {
|
||||
Properties k8sProperties = new Properties();
|
||||
|
||||
|
|
@ -249,7 +252,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
k8sProperties.put("zeppelin.k8s.interpreter.group.name", interpreterGroupName);
|
||||
k8sProperties.put("zeppelin.k8s.interpreter.setting.name", interpreterSettingName);
|
||||
k8sProperties.put("zeppelin.k8s.interpreter.localRepo", "/tmp/local-repo");
|
||||
k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", "12321:12321");
|
||||
k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", String.format("%d:%d", getPort(), getPort()));
|
||||
k8sProperties.put("zeppelin.k8s.server.rpc.host", zeppelinServiceHost);
|
||||
k8sProperties.put("zeppelin.k8s.server.rpc.portRange", zeppelinServiceRpcPort);
|
||||
if (ownerUID() != null && ownerName() != null) {
|
||||
|
|
@ -282,11 +285,13 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
return k8sProperties;
|
||||
}
|
||||
|
||||
private boolean isSpark() {
|
||||
@VisibleForTesting
|
||||
boolean isSpark() {
|
||||
return "spark".equalsIgnoreCase(interpreterGroupName);
|
||||
}
|
||||
|
||||
private String buildSparkSubmitOptions() {
|
||||
@VisibleForTesting
|
||||
String buildSparkSubmitOptions() {
|
||||
StringBuilder options = new StringBuilder();
|
||||
|
||||
options.append(" --master k8s://https://kubernetes.default.svc");
|
||||
|
|
@ -294,7 +299,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
options.append(" --conf spark.kubernetes.namespace=" + kubectl.getNamespace());
|
||||
options.append(" --conf spark.executor.instances=1");
|
||||
options.append(" --conf spark.driver.pod.name=" + getPodName());
|
||||
options.append(" --conf spark.kubernetes.container.image=spark:2.4.0");
|
||||
options.append(" --conf spark.kubernetes.container.image=" + sparkImage);
|
||||
options.append(" --conf spark.driver.bindAddress=0.0.0.0");
|
||||
options.append(" --conf spark.driver.host=" + getInterpreterPodDnsName());
|
||||
options.append(" --conf spark.driver.port=" + String.format("%d", getSparkDriverPort()));
|
||||
|
|
@ -313,7 +318,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
* See xxx-interpreter-pod.yaml
|
||||
* @return
|
||||
*/
|
||||
private int getSparkDriverPort() {
|
||||
@VisibleForTesting
|
||||
int getSparkDriverPort() {
|
||||
return 22321;
|
||||
}
|
||||
|
||||
|
|
@ -321,7 +327,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
* See xxx-interpreter-pod.yaml
|
||||
* @return
|
||||
*/
|
||||
private int getSparkBlockmanagerPort() {
|
||||
@VisibleForTesting
|
||||
int getSparkBlockmanagerPort() {
|
||||
return 22322;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,6 +23,8 @@ import java.net.UnknownHostException;
|
|||
import java.nio.charset.Charset;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
|
||||
|
|
@ -49,6 +51,12 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
|
|||
kubectl.setNamespace(getNamespace());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage, Kubectl kubectl) {
|
||||
super(zConf, recoveryStorage);
|
||||
this.kubectl = kubectl;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Check if i'm running inside of kubernetes or not.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,196 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class K8sRemoteInterpreterProcessTest {
|
||||
|
||||
@Test
|
||||
public void testGetHostPort() {
|
||||
// given
|
||||
Kubectl kubectl = mock(Kubectl.class);
|
||||
when(kubectl.getNamespace()).thenReturn("default");
|
||||
|
||||
Properties properties = new Properties();
|
||||
HashMap<String, String> envs = new HashMap<String, String>();
|
||||
|
||||
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
|
||||
kubectl,
|
||||
new File(".skip"),
|
||||
"interpreter-container:1.0",
|
||||
"shared_process",
|
||||
"sh",
|
||||
"shell",
|
||||
properties,
|
||||
envs,
|
||||
"zeppelin.server.hostname",
|
||||
"12320",
|
||||
false,
|
||||
"spark-container:1.0",
|
||||
10);
|
||||
|
||||
// when
|
||||
String host = intp.getHost();
|
||||
int port = intp.getPort();
|
||||
|
||||
// then
|
||||
assertEquals(String.format("%s.%s.svc.cluster.local", intp.getPodName(), kubectl.getNamespace()), intp.getHost());
|
||||
assertEquals(12321, intp.getPort());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPredefinedPortNumbers() {
|
||||
// given
|
||||
Kubectl kubectl = mock(Kubectl.class);
|
||||
when(kubectl.getNamespace()).thenReturn("default");
|
||||
|
||||
Properties properties = new Properties();
|
||||
HashMap<String, String> envs = new HashMap<String, String>();
|
||||
|
||||
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
|
||||
kubectl,
|
||||
new File(".skip"),
|
||||
"interpreter-container:1.0",
|
||||
"shared_process",
|
||||
"sh",
|
||||
"shell",
|
||||
properties,
|
||||
envs,
|
||||
"zeppelin.server.hostname",
|
||||
"12320",
|
||||
false,
|
||||
"spark-container:1.0",
|
||||
10);
|
||||
|
||||
|
||||
// following values are hardcoded in k8s/interpreter/100-interpreter.yaml.
|
||||
// when change those values, update the yaml file as well.
|
||||
assertEquals(12321, intp.getPort());
|
||||
assertEquals(22321, intp.getSparkDriverPort());
|
||||
assertEquals(22322, intp.getSparkBlockmanagerPort());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTemplateBindings() throws IOException {
|
||||
// given
|
||||
Kubectl kubectl = mock(Kubectl.class);
|
||||
when(kubectl.getNamespace()).thenReturn("default");
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.put("my.key1", "v1");
|
||||
HashMap<String, String> envs = new HashMap<String, String>();
|
||||
envs.put("MY_ENV1", "V1");
|
||||
|
||||
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
|
||||
kubectl,
|
||||
new File(".skip"),
|
||||
"interpreter-container:1.0",
|
||||
"shared_process",
|
||||
"sh",
|
||||
"shell",
|
||||
properties,
|
||||
envs,
|
||||
"zeppelin.server.hostname",
|
||||
"12320",
|
||||
false,
|
||||
"spark-container:1.0",
|
||||
10);
|
||||
|
||||
// when
|
||||
Properties p = intp.getTemplateBindings();
|
||||
|
||||
// then
|
||||
assertEquals("default", p.get("zeppelin.k8s.namespace"));
|
||||
assertEquals(intp.getPodName(), p.get("zeppelin.k8s.interpreter.pod.name"));
|
||||
assertEquals("sh", p.get("zeppelin.k8s.interpreter.container.name"));
|
||||
assertEquals("interpreter-container:1.0", p.get("zeppelin.k8s.interpreter.container.image"));
|
||||
assertEquals("shared_process", p.get("zeppelin.k8s.interpreter.group.id"));
|
||||
assertEquals("sh", p.get("zeppelin.k8s.interpreter.group.name"));
|
||||
assertEquals("shell", p.get("zeppelin.k8s.interpreter.setting.name"));
|
||||
assertEquals(true , p.containsKey("zeppelin.k8s.interpreter.localRepo"));
|
||||
assertEquals("12321:12321" , p.get("zeppelin.k8s.interpreter.rpc.portRange"));
|
||||
assertEquals("zeppelin.server.hostname" , p.get("zeppelin.k8s.server.rpc.host"));
|
||||
assertEquals("12320" , p.get("zeppelin.k8s.server.rpc.portRange"));
|
||||
assertEquals("v1", p.get("my.key1"));
|
||||
assertEquals("V1", envs.get("MY_ENV1"));
|
||||
|
||||
envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs");
|
||||
assertEquals(true, envs.containsKey("SERVICE_DOMAIN"));
|
||||
assertEquals(true, envs.containsKey("ZEPPELIN_HOME"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTemplateBindingsForSpark() throws IOException {
|
||||
// given
|
||||
Kubectl kubectl = mock(Kubectl.class);
|
||||
when(kubectl.getNamespace()).thenReturn("default");
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.put("my.key1", "v1");
|
||||
HashMap<String, String> envs = new HashMap<String, String>();
|
||||
envs.put("MY_ENV1", "V1");
|
||||
envs.put("SPARK_SUBMIT_OPTIONS", "my options");
|
||||
envs.put("SERVICE_DOMAIN", "mydomain");
|
||||
|
||||
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
|
||||
kubectl,
|
||||
new File(".skip"),
|
||||
"interpreter-container:1.0",
|
||||
"shared_process",
|
||||
"spark",
|
||||
"myspark",
|
||||
properties,
|
||||
envs,
|
||||
"zeppelin.server.hostname",
|
||||
"12320",
|
||||
false,
|
||||
"spark-container:1.0",
|
||||
10);
|
||||
|
||||
// when
|
||||
Properties p = intp.getTemplateBindings();
|
||||
|
||||
// then
|
||||
assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image"));
|
||||
assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl"));
|
||||
|
||||
envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs");
|
||||
assertTrue( envs.containsKey("SPARK_HOME"));
|
||||
|
||||
String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
|
||||
assertTrue(sparkSubmitOptions.startsWith("my options "));
|
||||
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=" + kubectl.getNamespace()));
|
||||
assertTrue(sparkSubmitOptions.contains("spark.driver.pod.name=" + intp.getPodName()));
|
||||
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
|
||||
assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
|
||||
assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
|
||||
assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
|
||||
}
|
||||
}
|
||||
|
|
@ -19,17 +19,18 @@ package org.apache.zeppelin.interpreter.launcher;
|
|||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* In the future, test may use minikube on travis for end-to-end test
|
||||
* https://github.com/LiliC/travis-minikube
|
||||
* https://blog.travis-ci.com/2017-10-26-running-kubernetes-on-travis-ci-with-minikube
|
||||
*/
|
||||
|
|
@ -42,10 +43,13 @@ public class K8sStandardInterpreterLauncherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testTemplate() throws IOException {
|
||||
public void testK8sLauncher() throws IOException {
|
||||
// given
|
||||
Kubectl kubectl = mock(Kubectl.class);
|
||||
when(kubectl.getNamespace()).thenReturn("default");
|
||||
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
|
||||
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null, kubectl);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("ENV_1", "VALUE_1");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
|
|
@ -64,36 +68,11 @@ public class K8sStandardInterpreterLauncherTest {
|
|||
"name",
|
||||
0,
|
||||
"host");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
K8sRemoteInterpreterProcess k8sintp = (K8sRemoteInterpreterProcess) client;
|
||||
k8sintp.start("user");
|
||||
// assertTrue(k8sintp.isRunning());
|
||||
// k8sintp.stop();
|
||||
}
|
||||
|
||||
/*
|
||||
@Test
|
||||
public void testLauncher() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("ENV_1", "VALUE_1");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
option.setUserImpersonate(true);
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
|
||||
// when
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
assertEquals("name", interpreterProcess.getInterpreterSettingName());
|
||||
assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
|
||||
assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
|
||||
assertEquals(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getIntValue(),
|
||||
interpreterProcess.getConnectTimeout());
|
||||
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
|
||||
assertEquals(2, interpreterProcess.getEnv().size());
|
||||
assertEquals("VALUE_1", interpreterProcess.getEnv().get("ENV_1"));
|
||||
assertEquals(true, interpreterProcess.isUserImpersonated());
|
||||
|
||||
// then
|
||||
assertTrue(client instanceof K8sRemoteInterpreterProcess);
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue