diff --git a/k8s/interpreter/100-interpreter-pod.yaml b/k8s/interpreter/100-interpreter-pod.yaml index 6268a7f23f..63d228b362 100644 --- a/k8s/interpreter/100-interpreter-pod.yaml +++ b/k8s/interpreter/100-interpreter-pod.yaml @@ -1,23 +1,23 @@ kind: Pod apiVersion: v1 metadata: - namespace: {{NAMESPACE}} - name: {{POD_NAME}} + namespace: {{zeppelin.k8s.namespace}} + name: {{zeppelin.k8s.interpreter.pod.name}} labels: - app: {{POD_NAME}} - interpreterGroupId: {{INTP_ID}} - interpreterSettingName: {{INTP_SETTING}} - {% if OWNER_UID is defined and OWNER_UID %} + app: {{zeppelin.k8s.interpreter.pod.name}} + interpreterGroupId: {{zeppelin.k8s.interpreter.group.id}} + interpreterSettingName: {{zeppelin.k8s.interpreter.setting.name}} + {% if zeppelin.k8s.server.uid is defined %} ownerReferences: - apiVersion: v1 controller: false blockOwnerDeletion: false kind: Pod - name: {{OWNER_NAME}} - uid: {{OWNER_UID}} + name: {{zeppelin.k8s.server.pod.name}} + uid: {{zeppelin.k8s.server.uid}} {% endif %} spec: - {% if SPARK_IMAGE is defined %} + {% if zeppelin.k8s.interpreter.group.name == "spark" %} automountServiceAccountToken: true {% else %} automountServiceAccountToken: false @@ -25,23 +25,21 @@ spec: restartPolicy: Never terminationGracePeriodSeconds: 10 containers: - - name: {{CONTAINER_NAME}} - image: {{CONTAINER_IMAGE}} - command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{INTP_NAME}} -r {{INTP_PORT}}:{{INTP_PORT}} -c {{CALLBACK_HOST}} -p {{CALLBACK_PORT}} -i {{INTP_ID}} -l {{INTP_REPO}} -g {{INTP_SETTING}}"] + - name: {{zeppelin.k8s.interpreter.container.name}} + image: {{zeppelin.k8s.interpreter.container.image}} + command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{zeppelin.k8s.interpreter.group.name}} -r {{zeppelin.k8s.interpreter.rpc.portRange}} -c {{zeppelin.k8s.server.rpc.host}} -p {{zeppelin.k8s.server.rpc.portRange}} -i {{zeppelin.k8s.interpreter.group.id}} -l {{zeppelin.k8s.interpreter.localRepo}} -g {{zeppelin.k8s.interpreter.setting.name}}"] env: - - name: ZEPPELIN_HOME - value: /zeppelin - {% if SPARK_IMAGE is defined %} - - name: SPARK_HOME - value: /spark - - name: SPARK_SUBMIT_OPTIONS - value: {{SPARK_SUBMIT_OPTIONS}} + {% for key, value in zeppelin.k8s.envs.items() %} + - name: {{key}} + value: {{value}} + {% endfor %} + {% if zeppelin.k8s.interpreter.group.name == "spark" %} volumeMounts: - name: spark-home mountPath: /spark initContainers: - name: spark-home-init - image: {{SPARK_IMAGE}} + image: {{zeppelin.k8s.spark.image}} command: ["sh", "-c", "cp -r /opt/spark/* /spark/"] volumeMounts: - name: spark-home @@ -54,38 +52,37 @@ spec: kind: Service apiVersion: v1 metadata: - namespace: {{NAMESPACE}} - name: {{POD_NAME}} # keep Service name the same to Pod name. - {% if OWNER_UID is defined and OWNER_UID %} + namespace: {{zeppelin.k8s.namespace}} + name: {{zeppelin.k8s.interpreter.pod.name}} # keep Service name the same to Pod name. + {% if zeppelin.k8s.server.uid is defined %} ownerReferences: - apiVersion: v1 controller: false blockOwnerDeletion: false kind: Pod - name: {{OWNER_NAME}} - uid: {{OWNER_UID}} + name: {{zeppelin.k8s.server.pod.name}} + uid: {{zeppelin.k8s.server.uid}} {% endif %} spec: clusterIP: None ports: - name: intp port: 12321 - {% if SPARK_IMAGE is defined %} + {% if zeppelin.k8s.interpreter.group.name == "spark" %} - name: spark-driver port: 22321 - name: spark-blockmanager port: 22322 {% endif %} selector: - app: {{POD_NAME}} - -{% if SPARK_IMAGE is defined %} + app: {{zeppelin.k8s.interpreter.pod.name}} +{% if zeppelin.k8s.interpreter.group.name == "spark" %} --- kind: Role apiVersion: rbac.authorization.k8s.io/v1 metadata: - name: {{POD_NAME}} - namespace: {{NAMESPACE}} + name: {{zeppelin.k8s.interpreter.pod.name}} + namespace: {{zeppelin.k8s.namespace}} rules: - apiGroups: [""] resources: ["pods", "services"] @@ -94,12 +91,12 @@ rules: kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: - name: {{POD_NAME}} + name: {{zeppelin.k8s.interpreter.pod.name}} subjects: - kind: ServiceAccount name: default roleRef: kind: Role - name: {{POD_NAME}} + name: {{zeppelin.k8s.interpreter.pod.name}} apiGroup: rbac.authorization.k8s.io {% endif %} \ No newline at end of file diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java index db6bd697d5..c23025e017 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -1,5 +1,6 @@ package org.apache.zeppelin.interpreter.launcher; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; @@ -60,7 +61,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { this.interpreterGroupName = interpreterGroupName; this.interpreterSettingName = interpreterSettingName; this.properties = properties; - this.envs = envs; + this.envs = new HashMap(envs); this.zeppelinServiceHost = zeppelinServiceHost; this.zeppelinServiceRpcPort = zeppelinServiceRpcPort; this.portForward = portForward; @@ -220,7 +221,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { } else if (path.isFile()) { logger.info("Apply " + path.getAbsolutePath()); K8sSpecTemplate specTemplate = new K8sSpecTemplate(); - specTemplate.putAll(getTemplateBindings()); + specTemplate.loadProperties(getTemplateBindings()); String spec = specTemplate.render(path); if (delete) { @@ -233,31 +234,44 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { } } - Map getTemplateBindings() throws IOException { - HashMap var = new HashMap(); - var.put("NAMESPACE", kubectl.getNamespace()); - var.put("POD_NAME", getPodName()); - var.put("CONTAINER_NAME", interpreterGroupName.toLowerCase()); - var.put("CONTAINER_IMAGE", containerImage); - var.put("INTP_PORT", "12321"); // interpreter.sh -r - var.put("INTP_ID", interpreterGroupId); // interpreter.sh -i - var.put("INTP_NAME", interpreterGroupName); // interpreter.sh -d - var.put("CALLBACK_HOST", zeppelinServiceHost); // interpreter.sh -c - var.put("CALLBACK_PORT", zeppelinServiceRpcPort); // interpreter.sh -p - var.put("INTP_SETTING", interpreterSettingName); // interpreter.sh -g - var.put("INTP_REPO", "/tmp/local-repo"); // interpreter.sh -l - var.put("OWNER_UID", ownerUID()); - var.put("OWNER_NAME", ownerName()); + Properties getTemplateBindings() throws IOException { + Properties k8sProperties = new Properties(); - if (isSpark()) { - var.put("SPARK_IMAGE", "spark:2.4.0"); - var.put("SPARK_SUBMIT_OPTIONS", buildSparkSubmitOptions()); + // k8s template properties + k8sProperties.put("zeppelin.k8s.namespace", kubectl.getNamespace()); + k8sProperties.put("zeppelin.k8s.interpreter.pod.name", getPodName()); + k8sProperties.put("zeppelin.k8s.interpreter.container.name", interpreterGroupName.toLowerCase()); + k8sProperties.put("zeppelin.k8s.interpreter.container.image", containerImage); + k8sProperties.put("zeppelin.k8s.interpreter.group.id", interpreterGroupId); + k8sProperties.put("zeppelin.k8s.interpreter.group.name", interpreterGroupName); + k8sProperties.put("zeppelin.k8s.interpreter.setting.name", interpreterSettingName); + k8sProperties.put("zeppelin.k8s.interpreter.localRepo", "/tmp/local-repo"); + k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", "12321:12321"); + k8sProperties.put("zeppelin.k8s.server.rpc.host", zeppelinServiceHost); + k8sProperties.put("zeppelin.k8s.server.rpc.portRange", zeppelinServiceRpcPort); + if (ownerUID() != null && ownerName() != null) { + k8sProperties.put("zeppelin.k8s.server.uid", ownerUID()); + k8sProperties.put("zeppelin.k8s.server.pod.name", ownerName()); } - var.putAll(Maps.fromProperties(properties)); // interpreter properties override template variables - return var; + // environment variables + envs.put("ZEPPELIN_HOME", envs.getOrDefault("ZEPPELIN_HOME", "/zeppelin")); + + if (isSpark()) { + k8sProperties.put("zeppelin.k8s.spark.image", "spark:2.4.0"); + envs.put("SPARK_SUBMIT_OPTIONS", envs.getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions()); + envs.put("SPARK_HOME", envs.getOrDefault("SPARK_HOME", "/spark")); + } + + k8sProperties.put("zeppelin.k8s.envs", envs); + + // interpreter properties overrides the values + k8sProperties.putAll(Maps.fromProperties(properties)); + return k8sProperties; } + + private boolean isSpark() { return "spark".equalsIgnoreCase(interpreterGroupName); } @@ -265,7 +279,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { private String buildSparkSubmitOptions() { StringBuilder options = new StringBuilder(); - options.append("--master k8s://https://kubernetes.default.svc"); + options.append(" --master k8s://https://kubernetes.default.svc"); options.append(" --deploy-mode client"); options.append(" --conf spark.kubernetes.namespace=" + kubectl.getNamespace()); options.append(" --conf spark.executor.instances=1"); diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java index b2fb62b144..2ed2c13be0 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java @@ -23,6 +23,9 @@ import java.io.File; import java.io.IOException; import java.nio.charset.Charset; import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; public class K8sSpecTemplate extends HashMap { public String render(File templateFile) throws IOException { @@ -40,4 +43,36 @@ public class K8sSpecTemplate extends HashMap { Thread.currentThread().setContextClassLoader(oldCl); } } + + public void loadProperties(Properties properties) { + Set> entries = properties.entrySet(); + for (Entry entry : entries) { + String key = (String) entry.getKey(); + Object value = entry.getValue(); + + String[] keySplit = key.split("[.]"); + Map target = this; + for (int i = 0; i < keySplit.length - 1; i++) { + if (!target.containsKey(keySplit[i])) { + HashMap subEntry = new HashMap(); + target.put(keySplit[i], subEntry); + target = subEntry; + } else { + Object subEntry = target.get(keySplit[i]); + if (!(subEntry instanceof Map)) { + HashMap replace = new HashMap(); + replace.put("_", subEntry); + target.put(keySplit[i], replace); + } + target = (Map) target.get(keySplit[i]); + } + } + + if (target.get(keySplit[keySplit.length - 1]) instanceof Map) { + ((Map) target.get(keySplit[keySplit.length - 1])).put("_", value); + } else { + target.put(keySplit[keySplit.length - 1], value); + } + } + } } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java index a1e656e3eb..daf37733b8 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java @@ -16,9 +16,12 @@ */ package org.apache.zeppelin.interpreter.launcher; +import com.google.common.collect.ImmutableMap; import org.junit.Test; -import static org.fusesource.jansi.AnsiRenderer.render; +import java.util.Map; +import java.util.Properties; + import static org.junit.Assert.assertEquals; public class K8sSpecTemplateTest { @@ -34,4 +37,104 @@ public class K8sSpecTemplateTest { // then assertEquals("Hello world", spec); } + + @Test + public void testObject() { + K8sSpecTemplate template = new K8sSpecTemplate(); + template.put("k8s", ImmutableMap.of("key", "world")); + + // when + String spec = template.render("Hello {{k8s.key}}"); + + // then + assertEquals("Hello world", spec); + } + + @Test + public void testIterate() { + // given + K8sSpecTemplate template = new K8sSpecTemplate(); + template.put("dict", ImmutableMap.of( + "k1", "v1", + "k2", "v2" + )); + + // when + String spec = template.render( + "{% for key, value in dict.items() %}" + + "key = {{key}}, value = {{value}}\n" + + "{% endfor %}" + ); + + // then + assertEquals( + "key = k1, value = v1\n" + + "key = k2, value = v2\n", spec); + } + + @Test + public void testLoadProperties() { + // given + K8sSpecTemplate template = new K8sSpecTemplate(); + Properties p = new Properties(); + p.put("k8s.intp.key1", "v1"); + p.put("k8s.intp.key2", "v2"); + p.put("k8s.key3", "v3"); + p.put("key4", "v4"); + + // when + template.loadProperties(p); + + // then + assertEquals("v4", template.get("key4")); + assertEquals("v3", ((Map) template.get("k8s")).get("key3")); + assertEquals("v2", ((Map) ((Map) template.get("k8s")).get("intp")).get("key2")); + assertEquals("v1", ((Map) ((Map) template.get("k8s")).get("intp")).get("key1")); + } + + @Test + public void testLoadPropertyOverrideString() { + // given + K8sSpecTemplate template = new K8sSpecTemplate(); + Properties p = new Properties(); + p.put("k8s", "v1"); + p.put("k8s.key1", "v2"); + + // when + template.loadProperties(p); + + // then + assertEquals("v1", ((Map) template.get("k8s")).get("_")); + assertEquals("v2", ((Map) template.get("k8s")).get("key1")); + } + + @Test + public void testLoadPropertyOverrideDict() { + // given + K8sSpecTemplate template = new K8sSpecTemplate(); + Properties p = new Properties(); + p.put("k8s.key1", "v2"); + p.put("k8s", "v1"); + + // when + template.loadProperties(p); + + // then + assertEquals("v1", ((Map) template.get("k8s")).get("_")); + assertEquals("v2", ((Map) template.get("k8s")).get("key1")); + } + + @Test + public void testLoadPropertyWithMap() { + // given + K8sSpecTemplate template = new K8sSpecTemplate(); + Properties p = new Properties(); + p.put("k8s", ImmutableMap.of("k1", "v1")); + + // when + template.loadProperties(p); + + // then + assertEquals("v1", ((Map) template.get("k8s")).get("k1")); + } }