configure spark on kubernetes

This commit is contained in:
Lee moon soo 2018-11-23 06:12:14 +09:00
parent 263d859d42
commit 7a87367561
2 changed files with 104 additions and 3 deletions

View file

@ -17,7 +17,11 @@ metadata:
uid: {{OWNER_UID}}
{% endif %}
spec:
{% if SPARK_IMAGE is defined %}
automountServiceAccountToken: true
{% else %}
automountServiceAccountToken: false
{% endif %}
restartPolicy: Never
terminationGracePeriodSeconds: 10
containers:
@ -27,6 +31,25 @@ spec:
env:
- name: ZEPPELIN_HOME
value: /zeppelin
{% if SPARK_IMAGE is defined %}
- name: SPARK_HOME
value: /spark
- name: SPARK_SUBMIT_OPTIONS
value: {{SPARK_SUBMIT_OPTIONS}}
volumeMounts:
- name: spark-home
mountPath: /spark
initContainers:
- name: spark-home-init
image: {{SPARK_IMAGE}}
command: ["sh", "-c", "cp -r /opt/spark/* /spark/"]
volumeMounts:
- name: spark-home
mountPath: /spark
volumes:
- name: spark-home
emptyDir: {}
{% endif %}
---
kind: Service
apiVersion: v1
@ -47,5 +70,36 @@ spec:
ports:
- name: intp
port: 12321
{% if SPARK_IMAGE is defined %}
- name: spark-driver
port: 22321
- name: spark-blockmanager
port: 22322
{% endif %}
selector:
app: {{POD_NAME}}
{% if SPARK_IMAGE is defined %}
---
# Role named after the interpreter pod, created in the namespace given by the
# NAMESPACE template variable. This document sits inside the
# "if SPARK_IMAGE is defined" template branch, so it is rendered only for Spark.
# It allows create/get/update/list/delete/watch on pods and services of the
# core ("") API group.
# NOTE(review): presumably required so the Spark driver running in the
# interpreter pod can manage executor pods - confirm against the Spark docs.
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: {{POD_NAME}}
namespace: {{NAMESPACE}}
rules:
- apiGroups: [""]
resources: ["pods", "services"]
verbs: ["create", "get", "update", "list", "delete", "watch" ]
---
# Binds the Role that carries the interpreter pod's name to the ServiceAccount
# called "default". Rendered only inside the "if SPARK_IMAGE is defined" branch.
# NOTE(review): unlike the Role, this document sets no metadata.namespace and the
# subject sets no namespace either - confirm that the manifest is always applied
# to the same namespace the Role is created in.
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: {{POD_NAME}}
subjects:
- kind: ServiceAccount
name: default
roleRef:
kind: Role
name: {{POD_NAME}}
apiGroup: rbac.authorization.k8s.io
{% endif %}

View file

@ -154,9 +154,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
if (portForward) {
return "localhost";
} else {
return String.format("%s.%s.svc.cluster.local",
getPodName(), // service name and pod name is the same
kubectl.getNamespace());
return getInterpreterPodDnsName();
}
}
@ -250,10 +248,59 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
var.put("INTP_REPO", "/tmp/local-repo"); // interpreter.sh -l
var.put("OWNER_UID", ownerUID());
var.put("OWNER_NAME", ownerName());
if (isSpark()) {
var.put("SPARK_IMAGE", "spark:2.4.0");
var.put("SPARK_SUBMIT_OPTIONS", buildSparkSubmitOptions());
}
var.putAll(Maps.fromProperties(properties)); // interpreter properties override template variables
return var;
}
/**
 * Tells whether this process belongs to the Spark interpreter group.
 *
 * @return {@code true} when {@code interpreterGroupName} equals "spark", ignoring case;
 *     {@code false} otherwise, including when the name is {@code null}
 */
private boolean isSpark() {
  final String groupName = interpreterGroupName;
  return groupName != null && groupName.equalsIgnoreCase("spark");
}
/**
 * Builds the spark-submit argument string for the interpreter pod.
 *
 * <p>The options select the in-cluster Kubernetes API server as master, client deploy mode and a
 * single executor. The driver binds to all interfaces and is advertised under the interpreter
 * pod's DNS name, with fixed driver and block manager ports.
 *
 * <p>NOTE(review): the executor image is the literal "spark:2.4.0". The same literal is used for
 * the SPARK_IMAGE template variable elsewhere in this class, and that variable can be overridden
 * by interpreter properties while this option cannot - confirm whether the two must stay in sync.
 *
 * @return the space-separated spark-submit options
 */
private String buildSparkSubmitOptions() {
  StringBuilder options = new StringBuilder();
  options.append("--master k8s://https://kubernetes.default.svc");
  options.append(" --deploy-mode client");
  options.append(" --conf spark.executor.instances=1");
  options.append(" --conf spark.driver.pod.name=").append(getPodName());
  options.append(" --conf spark.kubernetes.container.image=spark:2.4.0");
  options.append(" --conf spark.driver.bindAddress=0.0.0.0");
  options.append(" --conf spark.driver.host=").append(getInterpreterPodDnsName());
  // Ports are appended as plain ints. String.format("%d", ...) formats with the default
  // locale, which may emit non-ASCII digits and produce a port Spark cannot parse.
  options.append(" --conf spark.driver.port=").append(getSparkDriverPort());
  options.append(" --conf spark.blockManager.port=").append(getSparkBlockmanagerPort());
  return options.toString();
}
/**
 * Composes the cluster-internal DNS name under which the interpreter is addressed.
 *
 * @return {@code <pod name>.<namespace>.svc.cluster.local}
 */
private String getInterpreterPodDnsName() {
  // The service is named after the pod, so the pod name doubles as the service name.
  return getPodName() + "." + kubectl.getNamespace() + ".svc.cluster.local";
}
/**
 * Fixed port used for the Spark driver.
 *
 * <p>Must stay equal to the "spark-driver" service port declared in the interpreter pod
 * template (xxx-interpreter-pod.yaml).
 *
 * @return the Spark driver port, 22321
 */
private int getSparkDriverPort() {
  final int sparkDriverPort = 22321;
  return sparkDriverPort;
}
/**
 * Fixed port used for the Spark block manager.
 *
 * <p>Must stay equal to the "spark-blockmanager" service port declared in the interpreter pod
 * template (xxx-interpreter-pod.yaml).
 *
 * @return the Spark block manager port, 22322
 */
private int getSparkBlockmanagerPort() {
  final int sparkBlockManagerPort = 22322;
  return sparkBlockManagerPort;
}
/**
* Get UID of owner (zeppelin-server pod) for garbage collection
* https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/