load properties and environment variables

This commit is contained in:
Lee moon soo 2018-11-23 12:15:21 +09:00
parent b0e2c36c68
commit 0d472ea522
4 changed files with 206 additions and 57 deletions

View file

@ -1,23 +1,23 @@
kind: Pod
apiVersion: v1
metadata:
namespace: {{NAMESPACE}}
name: {{POD_NAME}}
namespace: {{zeppelin.k8s.namespace}}
name: {{zeppelin.k8s.interpreter.pod.name}}
labels:
app: {{POD_NAME}}
interpreterGroupId: {{INTP_ID}}
interpreterSettingName: {{INTP_SETTING}}
{% if OWNER_UID is defined and OWNER_UID %}
app: {{zeppelin.k8s.interpreter.pod.name}}
interpreterGroupId: {{zeppelin.k8s.interpreter.group.id}}
interpreterSettingName: {{zeppelin.k8s.interpreter.setting.name}}
{% if zeppelin.k8s.server.uid is defined %}
ownerReferences:
- apiVersion: v1
controller: false
blockOwnerDeletion: false
kind: Pod
name: {{OWNER_NAME}}
uid: {{OWNER_UID}}
name: {{zeppelin.k8s.server.pod.name}}
uid: {{zeppelin.k8s.server.uid}}
{% endif %}
spec:
{% if SPARK_IMAGE is defined %}
{% if zeppelin.k8s.interpreter.group.name == "spark" %}
automountServiceAccountToken: true
{% else %}
automountServiceAccountToken: false
@ -25,23 +25,21 @@ spec:
restartPolicy: Never
terminationGracePeriodSeconds: 10
containers:
- name: {{CONTAINER_NAME}}
image: {{CONTAINER_IMAGE}}
command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{INTP_NAME}} -r {{INTP_PORT}}:{{INTP_PORT}} -c {{CALLBACK_HOST}} -p {{CALLBACK_PORT}} -i {{INTP_ID}} -l {{INTP_REPO}} -g {{INTP_SETTING}}"]
- name: {{zeppelin.k8s.interpreter.container.name}}
image: {{zeppelin.k8s.interpreter.container.image}}
command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{zeppelin.k8s.interpreter.group.name}} -r {{zeppelin.k8s.interpreter.rpc.portRange}} -c {{zeppelin.k8s.server.rpc.host}} -p {{zeppelin.k8s.server.rpc.portRange}} -i {{zeppelin.k8s.interpreter.group.id}} -l {{zeppelin.k8s.interpreter.localRepo}} -g {{zeppelin.k8s.interpreter.setting.name}}"]
env:
- name: ZEPPELIN_HOME
value: /zeppelin
{% if SPARK_IMAGE is defined %}
- name: SPARK_HOME
value: /spark
- name: SPARK_SUBMIT_OPTIONS
value: {{SPARK_SUBMIT_OPTIONS}}
{% for key, value in zeppelin.k8s.envs.items() %}
- name: {{key}}
value: {{value}}
{% endfor %}
{% if zeppelin.k8s.interpreter.group.name == "spark" %}
volumeMounts:
- name: spark-home
mountPath: /spark
initContainers:
- name: spark-home-init
image: {{SPARK_IMAGE}}
image: {{zeppelin.k8s.spark.image}}
command: ["sh", "-c", "cp -r /opt/spark/* /spark/"]
volumeMounts:
- name: spark-home
@ -54,38 +52,37 @@ spec:
kind: Service
apiVersion: v1
metadata:
namespace: {{NAMESPACE}}
name: {{POD_NAME}} # keep Service name the same to Pod name.
{% if OWNER_UID is defined and OWNER_UID %}
namespace: {{zeppelin.k8s.namespace}}
name: {{zeppelin.k8s.interpreter.pod.name}} # keep Service name the same to Pod name.
{% if zeppelin.k8s.server.uid is defined %}
ownerReferences:
- apiVersion: v1
controller: false
blockOwnerDeletion: false
kind: Pod
name: {{OWNER_NAME}}
uid: {{OWNER_UID}}
name: {{zeppelin.k8s.server.pod.name}}
uid: {{zeppelin.k8s.server.uid}}
{% endif %}
spec:
clusterIP: None
ports:
- name: intp
port: 12321
{% if SPARK_IMAGE is defined %}
{% if zeppelin.k8s.interpreter.group.name == "spark" %}
- name: spark-driver
port: 22321
- name: spark-blockmanager
port: 22322
{% endif %}
selector:
app: {{POD_NAME}}
{% if SPARK_IMAGE is defined %}
app: {{zeppelin.k8s.interpreter.pod.name}}
{% if zeppelin.k8s.interpreter.group.name == "spark" %}
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: {{POD_NAME}}
namespace: {{NAMESPACE}}
name: {{zeppelin.k8s.interpreter.pod.name}}
namespace: {{zeppelin.k8s.namespace}}
rules:
- apiGroups: [""]
resources: ["pods", "services"]
@ -94,12 +91,12 @@ rules:
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: {{POD_NAME}}
name: {{zeppelin.k8s.interpreter.pod.name}}
subjects:
- kind: ServiceAccount
name: default
roleRef:
kind: Role
name: {{POD_NAME}}
name: {{zeppelin.k8s.interpreter.pod.name}}
apiGroup: rbac.authorization.k8s.io
{% endif %}

View file

@ -1,5 +1,6 @@
package org.apache.zeppelin.interpreter.launcher;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
@ -60,7 +61,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
this.interpreterGroupName = interpreterGroupName;
this.interpreterSettingName = interpreterSettingName;
this.properties = properties;
this.envs = envs;
this.envs = new HashMap(envs);
this.zeppelinServiceHost = zeppelinServiceHost;
this.zeppelinServiceRpcPort = zeppelinServiceRpcPort;
this.portForward = portForward;
@ -220,7 +221,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
} else if (path.isFile()) {
logger.info("Apply " + path.getAbsolutePath());
K8sSpecTemplate specTemplate = new K8sSpecTemplate();
specTemplate.putAll(getTemplateBindings());
specTemplate.loadProperties(getTemplateBindings());
String spec = specTemplate.render(path);
if (delete) {
@ -233,31 +234,44 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
}
}
Map<String, Object> getTemplateBindings() throws IOException {
HashMap<String, Object> var = new HashMap<String, Object>();
var.put("NAMESPACE", kubectl.getNamespace());
var.put("POD_NAME", getPodName());
var.put("CONTAINER_NAME", interpreterGroupName.toLowerCase());
var.put("CONTAINER_IMAGE", containerImage);
var.put("INTP_PORT", "12321"); // interpreter.sh -r
var.put("INTP_ID", interpreterGroupId); // interpreter.sh -i
var.put("INTP_NAME", interpreterGroupName); // interpreter.sh -d
var.put("CALLBACK_HOST", zeppelinServiceHost); // interpreter.sh -c
var.put("CALLBACK_PORT", zeppelinServiceRpcPort); // interpreter.sh -p
var.put("INTP_SETTING", interpreterSettingName); // interpreter.sh -g
var.put("INTP_REPO", "/tmp/local-repo"); // interpreter.sh -l
var.put("OWNER_UID", ownerUID());
var.put("OWNER_NAME", ownerName());
Properties getTemplateBindings() throws IOException {
Properties k8sProperties = new Properties();
if (isSpark()) {
var.put("SPARK_IMAGE", "spark:2.4.0");
var.put("SPARK_SUBMIT_OPTIONS", buildSparkSubmitOptions());
// k8s template properties
k8sProperties.put("zeppelin.k8s.namespace", kubectl.getNamespace());
k8sProperties.put("zeppelin.k8s.interpreter.pod.name", getPodName());
k8sProperties.put("zeppelin.k8s.interpreter.container.name", interpreterGroupName.toLowerCase());
k8sProperties.put("zeppelin.k8s.interpreter.container.image", containerImage);
k8sProperties.put("zeppelin.k8s.interpreter.group.id", interpreterGroupId);
k8sProperties.put("zeppelin.k8s.interpreter.group.name", interpreterGroupName);
k8sProperties.put("zeppelin.k8s.interpreter.setting.name", interpreterSettingName);
k8sProperties.put("zeppelin.k8s.interpreter.localRepo", "/tmp/local-repo");
k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", "12321:12321");
k8sProperties.put("zeppelin.k8s.server.rpc.host", zeppelinServiceHost);
k8sProperties.put("zeppelin.k8s.server.rpc.portRange", zeppelinServiceRpcPort);
if (ownerUID() != null && ownerName() != null) {
k8sProperties.put("zeppelin.k8s.server.uid", ownerUID());
k8sProperties.put("zeppelin.k8s.server.pod.name", ownerName());
}
var.putAll(Maps.fromProperties(properties)); // interpreter properties override template variables
return var;
// environment variables
envs.put("ZEPPELIN_HOME", envs.getOrDefault("ZEPPELIN_HOME", "/zeppelin"));
if (isSpark()) {
k8sProperties.put("zeppelin.k8s.spark.image", "spark:2.4.0");
envs.put("SPARK_SUBMIT_OPTIONS", envs.getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions());
envs.put("SPARK_HOME", envs.getOrDefault("SPARK_HOME", "/spark"));
}
k8sProperties.put("zeppelin.k8s.envs", envs);
// interpreter properties overrides the values
k8sProperties.putAll(Maps.fromProperties(properties));
return k8sProperties;
}
private boolean isSpark() {
return "spark".equalsIgnoreCase(interpreterGroupName);
}
@ -265,7 +279,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
private String buildSparkSubmitOptions() {
StringBuilder options = new StringBuilder();
options.append("--master k8s://https://kubernetes.default.svc");
options.append(" --master k8s://https://kubernetes.default.svc");
options.append(" --deploy-mode client");
options.append(" --conf spark.kubernetes.namespace=" + kubectl.getNamespace());
options.append(" --conf spark.executor.instances=1");

View file

@ -23,6 +23,9 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class K8sSpecTemplate extends HashMap<String, Object> {
public String render(File templateFile) throws IOException {
@ -40,4 +43,36 @@ public class K8sSpecTemplate extends HashMap<String, Object> {
Thread.currentThread().setContextClassLoader(oldCl);
}
}
public void loadProperties(Properties properties) {
Set<Entry<Object, Object>> entries = properties.entrySet();
for (Entry entry : entries) {
String key = (String) entry.getKey();
Object value = entry.getValue();
String[] keySplit = key.split("[.]");
Map<String, Object> target = this;
for (int i = 0; i < keySplit.length - 1; i++) {
if (!target.containsKey(keySplit[i])) {
HashMap subEntry = new HashMap();
target.put(keySplit[i], subEntry);
target = subEntry;
} else {
Object subEntry = target.get(keySplit[i]);
if (!(subEntry instanceof Map)) {
HashMap replace = new HashMap();
replace.put("_", subEntry);
target.put(keySplit[i], replace);
}
target = (Map<String, Object>) target.get(keySplit[i]);
}
}
if (target.get(keySplit[keySplit.length - 1]) instanceof Map) {
((Map) target.get(keySplit[keySplit.length - 1])).put("_", value);
} else {
target.put(keySplit[keySplit.length - 1], value);
}
}
}
}

View file

@ -16,9 +16,12 @@
*/
package org.apache.zeppelin.interpreter.launcher;
import com.google.common.collect.ImmutableMap;
import org.junit.Test;
import static org.fusesource.jansi.AnsiRenderer.render;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
public class K8sSpecTemplateTest {
@ -34,4 +37,104 @@ public class K8sSpecTemplateTest {
// then
assertEquals("Hello world", spec);
}
@Test
public void testObject() {
K8sSpecTemplate template = new K8sSpecTemplate();
template.put("k8s", ImmutableMap.of("key", "world"));
// when
String spec = template.render("Hello {{k8s.key}}");
// then
assertEquals("Hello world", spec);
}
@Test
public void testIterate() {
// given
K8sSpecTemplate template = new K8sSpecTemplate();
template.put("dict", ImmutableMap.of(
"k1", "v1",
"k2", "v2"
));
// when
String spec = template.render(
"{% for key, value in dict.items() %}" +
"key = {{key}}, value = {{value}}\n" +
"{% endfor %}"
);
// then
assertEquals(
"key = k1, value = v1\n" +
"key = k2, value = v2\n", spec);
}
@Test
public void testLoadProperties() {
// given
K8sSpecTemplate template = new K8sSpecTemplate();
Properties p = new Properties();
p.put("k8s.intp.key1", "v1");
p.put("k8s.intp.key2", "v2");
p.put("k8s.key3", "v3");
p.put("key4", "v4");
// when
template.loadProperties(p);
// then
assertEquals("v4", template.get("key4"));
assertEquals("v3", ((Map) template.get("k8s")).get("key3"));
assertEquals("v2", ((Map) ((Map) template.get("k8s")).get("intp")).get("key2"));
assertEquals("v1", ((Map) ((Map) template.get("k8s")).get("intp")).get("key1"));
}
@Test
public void testLoadPropertyOverrideString() {
// given
K8sSpecTemplate template = new K8sSpecTemplate();
Properties p = new Properties();
p.put("k8s", "v1");
p.put("k8s.key1", "v2");
// when
template.loadProperties(p);
// then
assertEquals("v1", ((Map) template.get("k8s")).get("_"));
assertEquals("v2", ((Map) template.get("k8s")).get("key1"));
}
@Test
public void testLoadPropertyOverrideDict() {
// given
K8sSpecTemplate template = new K8sSpecTemplate();
Properties p = new Properties();
p.put("k8s.key1", "v2");
p.put("k8s", "v1");
// when
template.loadProperties(p);
// then
assertEquals("v1", ((Map) template.get("k8s")).get("_"));
assertEquals("v2", ((Map) template.get("k8s")).get("key1"));
}
@Test
public void testLoadPropertyWithMap() {
// given
K8sSpecTemplate template = new K8sSpecTemplate();
Properties p = new Properties();
p.put("k8s", ImmutableMap.of("k1", "v1"));
// when
template.loadProperties(p);
// then
assertEquals("v1", ((Map) template.get("k8s")).get("k1"));
}
}