mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
load properties and environment variables
This commit is contained in:
parent
b0e2c36c68
commit
0d472ea522
4 changed files with 206 additions and 57 deletions
|
|
@ -1,23 +1,23 @@
|
|||
kind: Pod
|
||||
apiVersion: v1
|
||||
metadata:
|
||||
namespace: {{NAMESPACE}}
|
||||
name: {{POD_NAME}}
|
||||
namespace: {{zeppelin.k8s.namespace}}
|
||||
name: {{zeppelin.k8s.interpreter.pod.name}}
|
||||
labels:
|
||||
app: {{POD_NAME}}
|
||||
interpreterGroupId: {{INTP_ID}}
|
||||
interpreterSettingName: {{INTP_SETTING}}
|
||||
{% if OWNER_UID is defined and OWNER_UID %}
|
||||
app: {{zeppelin.k8s.interpreter.pod.name}}
|
||||
interpreterGroupId: {{zeppelin.k8s.interpreter.group.id}}
|
||||
interpreterSettingName: {{zeppelin.k8s.interpreter.setting.name}}
|
||||
{% if zeppelin.k8s.server.uid is defined %}
|
||||
ownerReferences:
|
||||
- apiVersion: v1
|
||||
controller: false
|
||||
blockOwnerDeletion: false
|
||||
kind: Pod
|
||||
name: {{OWNER_NAME}}
|
||||
uid: {{OWNER_UID}}
|
||||
name: {{zeppelin.k8s.server.pod.name}}
|
||||
uid: {{zeppelin.k8s.server.uid}}
|
||||
{% endif %}
|
||||
spec:
|
||||
{% if SPARK_IMAGE is defined %}
|
||||
{% if zeppelin.k8s.interpreter.group.name == "spark" %}
|
||||
automountServiceAccountToken: true
|
||||
{% else %}
|
||||
automountServiceAccountToken: false
|
||||
|
|
@ -25,23 +25,21 @@ spec:
|
|||
restartPolicy: Never
|
||||
terminationGracePeriodSeconds: 10
|
||||
containers:
|
||||
- name: {{CONTAINER_NAME}}
|
||||
image: {{CONTAINER_IMAGE}}
|
||||
command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{INTP_NAME}} -r {{INTP_PORT}}:{{INTP_PORT}} -c {{CALLBACK_HOST}} -p {{CALLBACK_PORT}} -i {{INTP_ID}} -l {{INTP_REPO}} -g {{INTP_SETTING}}"]
|
||||
- name: {{zeppelin.k8s.interpreter.container.name}}
|
||||
image: {{zeppelin.k8s.interpreter.container.image}}
|
||||
command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{zeppelin.k8s.interpreter.group.name}} -r {{zeppelin.k8s.interpreter.rpc.portRange}} -c {{zeppelin.k8s.server.rpc.host}} -p {{zeppelin.k8s.server.rpc.portRange}} -i {{zeppelin.k8s.interpreter.group.id}} -l {{zeppelin.k8s.interpreter.localRepo}} -g {{zeppelin.k8s.interpreter.setting.name}}"]
|
||||
env:
|
||||
- name: ZEPPELIN_HOME
|
||||
value: /zeppelin
|
||||
{% if SPARK_IMAGE is defined %}
|
||||
- name: SPARK_HOME
|
||||
value: /spark
|
||||
- name: SPARK_SUBMIT_OPTIONS
|
||||
value: {{SPARK_SUBMIT_OPTIONS}}
|
||||
{% for key, value in zeppelin.k8s.envs.items() %}
|
||||
- name: {{key}}
|
||||
value: {{value}}
|
||||
{% endfor %}
|
||||
{% if zeppelin.k8s.interpreter.group.name == "spark" %}
|
||||
volumeMounts:
|
||||
- name: spark-home
|
||||
mountPath: /spark
|
||||
initContainers:
|
||||
- name: spark-home-init
|
||||
image: {{SPARK_IMAGE}}
|
||||
image: {{zeppelin.k8s.spark.image}}
|
||||
command: ["sh", "-c", "cp -r /opt/spark/* /spark/"]
|
||||
volumeMounts:
|
||||
- name: spark-home
|
||||
|
|
@ -54,38 +52,37 @@ spec:
|
|||
kind: Service
|
||||
apiVersion: v1
|
||||
metadata:
|
||||
namespace: {{NAMESPACE}}
|
||||
name: {{POD_NAME}} # keep Service name the same to Pod name.
|
||||
{% if OWNER_UID is defined and OWNER_UID %}
|
||||
namespace: {{zeppelin.k8s.namespace}}
|
||||
name: {{zeppelin.k8s.interpreter.pod.name}} # keep Service name the same to Pod name.
|
||||
{% if zeppelin.k8s.server.uid is defined %}
|
||||
ownerReferences:
|
||||
- apiVersion: v1
|
||||
controller: false
|
||||
blockOwnerDeletion: false
|
||||
kind: Pod
|
||||
name: {{OWNER_NAME}}
|
||||
uid: {{OWNER_UID}}
|
||||
name: {{zeppelin.k8s.server.pod.name}}
|
||||
uid: {{zeppelin.k8s.server.uid}}
|
||||
{% endif %}
|
||||
spec:
|
||||
clusterIP: None
|
||||
ports:
|
||||
- name: intp
|
||||
port: 12321
|
||||
{% if SPARK_IMAGE is defined %}
|
||||
{% if zeppelin.k8s.interpreter.group.name == "spark" %}
|
||||
- name: spark-driver
|
||||
port: 22321
|
||||
- name: spark-blockmanager
|
||||
port: 22322
|
||||
{% endif %}
|
||||
selector:
|
||||
app: {{POD_NAME}}
|
||||
|
||||
{% if SPARK_IMAGE is defined %}
|
||||
app: {{zeppelin.k8s.interpreter.pod.name}}
|
||||
{% if zeppelin.k8s.interpreter.group.name == "spark" %}
|
||||
---
|
||||
kind: Role
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
metadata:
|
||||
name: {{POD_NAME}}
|
||||
namespace: {{NAMESPACE}}
|
||||
name: {{zeppelin.k8s.interpreter.pod.name}}
|
||||
namespace: {{zeppelin.k8s.namespace}}
|
||||
rules:
|
||||
- apiGroups: [""]
|
||||
resources: ["pods", "services"]
|
||||
|
|
@ -94,12 +91,12 @@ rules:
|
|||
kind: RoleBinding
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
metadata:
|
||||
name: {{POD_NAME}}
|
||||
name: {{zeppelin.k8s.interpreter.pod.name}}
|
||||
subjects:
|
||||
- kind: ServiceAccount
|
||||
name: default
|
||||
roleRef:
|
||||
kind: Role
|
||||
name: {{POD_NAME}}
|
||||
name: {{zeppelin.k8s.interpreter.pod.name}}
|
||||
apiGroup: rbac.authorization.k8s.io
|
||||
{% endif %}
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
|
|
@ -60,7 +61,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
this.interpreterGroupName = interpreterGroupName;
|
||||
this.interpreterSettingName = interpreterSettingName;
|
||||
this.properties = properties;
|
||||
this.envs = envs;
|
||||
this.envs = new HashMap(envs);
|
||||
this.zeppelinServiceHost = zeppelinServiceHost;
|
||||
this.zeppelinServiceRpcPort = zeppelinServiceRpcPort;
|
||||
this.portForward = portForward;
|
||||
|
|
@ -220,7 +221,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
} else if (path.isFile()) {
|
||||
logger.info("Apply " + path.getAbsolutePath());
|
||||
K8sSpecTemplate specTemplate = new K8sSpecTemplate();
|
||||
specTemplate.putAll(getTemplateBindings());
|
||||
specTemplate.loadProperties(getTemplateBindings());
|
||||
|
||||
String spec = specTemplate.render(path);
|
||||
if (delete) {
|
||||
|
|
@ -233,31 +234,44 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
}
|
||||
}
|
||||
|
||||
Map<String, Object> getTemplateBindings() throws IOException {
|
||||
HashMap<String, Object> var = new HashMap<String, Object>();
|
||||
var.put("NAMESPACE", kubectl.getNamespace());
|
||||
var.put("POD_NAME", getPodName());
|
||||
var.put("CONTAINER_NAME", interpreterGroupName.toLowerCase());
|
||||
var.put("CONTAINER_IMAGE", containerImage);
|
||||
var.put("INTP_PORT", "12321"); // interpreter.sh -r
|
||||
var.put("INTP_ID", interpreterGroupId); // interpreter.sh -i
|
||||
var.put("INTP_NAME", interpreterGroupName); // interpreter.sh -d
|
||||
var.put("CALLBACK_HOST", zeppelinServiceHost); // interpreter.sh -c
|
||||
var.put("CALLBACK_PORT", zeppelinServiceRpcPort); // interpreter.sh -p
|
||||
var.put("INTP_SETTING", interpreterSettingName); // interpreter.sh -g
|
||||
var.put("INTP_REPO", "/tmp/local-repo"); // interpreter.sh -l
|
||||
var.put("OWNER_UID", ownerUID());
|
||||
var.put("OWNER_NAME", ownerName());
|
||||
Properties getTemplateBindings() throws IOException {
|
||||
Properties k8sProperties = new Properties();
|
||||
|
||||
if (isSpark()) {
|
||||
var.put("SPARK_IMAGE", "spark:2.4.0");
|
||||
var.put("SPARK_SUBMIT_OPTIONS", buildSparkSubmitOptions());
|
||||
// k8s template properties
|
||||
k8sProperties.put("zeppelin.k8s.namespace", kubectl.getNamespace());
|
||||
k8sProperties.put("zeppelin.k8s.interpreter.pod.name", getPodName());
|
||||
k8sProperties.put("zeppelin.k8s.interpreter.container.name", interpreterGroupName.toLowerCase());
|
||||
k8sProperties.put("zeppelin.k8s.interpreter.container.image", containerImage);
|
||||
k8sProperties.put("zeppelin.k8s.interpreter.group.id", interpreterGroupId);
|
||||
k8sProperties.put("zeppelin.k8s.interpreter.group.name", interpreterGroupName);
|
||||
k8sProperties.put("zeppelin.k8s.interpreter.setting.name", interpreterSettingName);
|
||||
k8sProperties.put("zeppelin.k8s.interpreter.localRepo", "/tmp/local-repo");
|
||||
k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", "12321:12321");
|
||||
k8sProperties.put("zeppelin.k8s.server.rpc.host", zeppelinServiceHost);
|
||||
k8sProperties.put("zeppelin.k8s.server.rpc.portRange", zeppelinServiceRpcPort);
|
||||
if (ownerUID() != null && ownerName() != null) {
|
||||
k8sProperties.put("zeppelin.k8s.server.uid", ownerUID());
|
||||
k8sProperties.put("zeppelin.k8s.server.pod.name", ownerName());
|
||||
}
|
||||
|
||||
var.putAll(Maps.fromProperties(properties)); // interpreter properties override template variables
|
||||
return var;
|
||||
// environment variables
|
||||
envs.put("ZEPPELIN_HOME", envs.getOrDefault("ZEPPELIN_HOME", "/zeppelin"));
|
||||
|
||||
if (isSpark()) {
|
||||
k8sProperties.put("zeppelin.k8s.spark.image", "spark:2.4.0");
|
||||
envs.put("SPARK_SUBMIT_OPTIONS", envs.getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions());
|
||||
envs.put("SPARK_HOME", envs.getOrDefault("SPARK_HOME", "/spark"));
|
||||
}
|
||||
|
||||
k8sProperties.put("zeppelin.k8s.envs", envs);
|
||||
|
||||
// interpreter properties overrides the values
|
||||
k8sProperties.putAll(Maps.fromProperties(properties));
|
||||
return k8sProperties;
|
||||
}
|
||||
|
||||
|
||||
|
||||
private boolean isSpark() {
|
||||
return "spark".equalsIgnoreCase(interpreterGroupName);
|
||||
}
|
||||
|
|
@ -265,7 +279,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
private String buildSparkSubmitOptions() {
|
||||
StringBuilder options = new StringBuilder();
|
||||
|
||||
options.append("--master k8s://https://kubernetes.default.svc");
|
||||
options.append(" --master k8s://https://kubernetes.default.svc");
|
||||
options.append(" --deploy-mode client");
|
||||
options.append(" --conf spark.kubernetes.namespace=" + kubectl.getNamespace());
|
||||
options.append(" --conf spark.executor.instances=1");
|
||||
|
|
|
|||
|
|
@ -23,6 +23,9 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
public class K8sSpecTemplate extends HashMap<String, Object> {
|
||||
public String render(File templateFile) throws IOException {
|
||||
|
|
@ -40,4 +43,36 @@ public class K8sSpecTemplate extends HashMap<String, Object> {
|
|||
Thread.currentThread().setContextClassLoader(oldCl);
|
||||
}
|
||||
}
|
||||
|
||||
public void loadProperties(Properties properties) {
|
||||
Set<Entry<Object, Object>> entries = properties.entrySet();
|
||||
for (Entry entry : entries) {
|
||||
String key = (String) entry.getKey();
|
||||
Object value = entry.getValue();
|
||||
|
||||
String[] keySplit = key.split("[.]");
|
||||
Map<String, Object> target = this;
|
||||
for (int i = 0; i < keySplit.length - 1; i++) {
|
||||
if (!target.containsKey(keySplit[i])) {
|
||||
HashMap subEntry = new HashMap();
|
||||
target.put(keySplit[i], subEntry);
|
||||
target = subEntry;
|
||||
} else {
|
||||
Object subEntry = target.get(keySplit[i]);
|
||||
if (!(subEntry instanceof Map)) {
|
||||
HashMap replace = new HashMap();
|
||||
replace.put("_", subEntry);
|
||||
target.put(keySplit[i], replace);
|
||||
}
|
||||
target = (Map<String, Object>) target.get(keySplit[i]);
|
||||
}
|
||||
}
|
||||
|
||||
if (target.get(keySplit[keySplit.length - 1]) instanceof Map) {
|
||||
((Map) target.get(keySplit[keySplit.length - 1])).put("_", value);
|
||||
} else {
|
||||
target.put(keySplit[keySplit.length - 1], value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,9 +16,12 @@
|
|||
*/
|
||||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.fusesource.jansi.AnsiRenderer.render;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class K8sSpecTemplateTest {
|
||||
|
|
@ -34,4 +37,104 @@ public class K8sSpecTemplateTest {
|
|||
// then
|
||||
assertEquals("Hello world", spec);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testObject() {
|
||||
K8sSpecTemplate template = new K8sSpecTemplate();
|
||||
template.put("k8s", ImmutableMap.of("key", "world"));
|
||||
|
||||
// when
|
||||
String spec = template.render("Hello {{k8s.key}}");
|
||||
|
||||
// then
|
||||
assertEquals("Hello world", spec);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIterate() {
|
||||
// given
|
||||
K8sSpecTemplate template = new K8sSpecTemplate();
|
||||
template.put("dict", ImmutableMap.of(
|
||||
"k1", "v1",
|
||||
"k2", "v2"
|
||||
));
|
||||
|
||||
// when
|
||||
String spec = template.render(
|
||||
"{% for key, value in dict.items() %}" +
|
||||
"key = {{key}}, value = {{value}}\n" +
|
||||
"{% endfor %}"
|
||||
);
|
||||
|
||||
// then
|
||||
assertEquals(
|
||||
"key = k1, value = v1\n" +
|
||||
"key = k2, value = v2\n", spec);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadProperties() {
|
||||
// given
|
||||
K8sSpecTemplate template = new K8sSpecTemplate();
|
||||
Properties p = new Properties();
|
||||
p.put("k8s.intp.key1", "v1");
|
||||
p.put("k8s.intp.key2", "v2");
|
||||
p.put("k8s.key3", "v3");
|
||||
p.put("key4", "v4");
|
||||
|
||||
// when
|
||||
template.loadProperties(p);
|
||||
|
||||
// then
|
||||
assertEquals("v4", template.get("key4"));
|
||||
assertEquals("v3", ((Map) template.get("k8s")).get("key3"));
|
||||
assertEquals("v2", ((Map) ((Map) template.get("k8s")).get("intp")).get("key2"));
|
||||
assertEquals("v1", ((Map) ((Map) template.get("k8s")).get("intp")).get("key1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadPropertyOverrideString() {
|
||||
// given
|
||||
K8sSpecTemplate template = new K8sSpecTemplate();
|
||||
Properties p = new Properties();
|
||||
p.put("k8s", "v1");
|
||||
p.put("k8s.key1", "v2");
|
||||
|
||||
// when
|
||||
template.loadProperties(p);
|
||||
|
||||
// then
|
||||
assertEquals("v1", ((Map) template.get("k8s")).get("_"));
|
||||
assertEquals("v2", ((Map) template.get("k8s")).get("key1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadPropertyOverrideDict() {
|
||||
// given
|
||||
K8sSpecTemplate template = new K8sSpecTemplate();
|
||||
Properties p = new Properties();
|
||||
p.put("k8s.key1", "v2");
|
||||
p.put("k8s", "v1");
|
||||
|
||||
// when
|
||||
template.loadProperties(p);
|
||||
|
||||
// then
|
||||
assertEquals("v1", ((Map) template.get("k8s")).get("_"));
|
||||
assertEquals("v2", ((Map) template.get("k8s")).get("key1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadPropertyWithMap() {
|
||||
// given
|
||||
K8sSpecTemplate template = new K8sSpecTemplate();
|
||||
Properties p = new Properties();
|
||||
p.put("k8s", ImmutableMap.of("k1", "v1"));
|
||||
|
||||
// when
|
||||
template.loadProperties(p);
|
||||
|
||||
// then
|
||||
assertEquals("v1", ((Map) template.get("k8s")).get("k1"));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue