mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
K8sRemoteInterpreterProcess
This commit is contained in:
parent
07489f76df
commit
5f602a65ef
11 changed files with 334 additions and 531 deletions
30
k8s/interpreter/100-interpreter-pod.yaml
Normal file
30
k8s/interpreter/100-interpreter-pod.yaml
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
kind: Pod
|
||||
apiVersion: v1
|
||||
metadata:
|
||||
namespace: {{NAMESPACE}}
|
||||
name: {{POD_NAME}}
|
||||
labels:
|
||||
app: {{POD_NAME}}
|
||||
spec:
|
||||
automountServiceAccountToken: false
|
||||
restartPolicy: Never
|
||||
terminationGracePeriodSeconds: 10
|
||||
containers:
|
||||
- name: {{CONTAINER_NAME}}
|
||||
image: {{CONTAINER_IMAGE}}
|
||||
command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{INTP_NAME}} -r {{INTP_PORT}}:{{INTP_PORT}} -c {{CALLBACK_HOST}} -p {{CALLBACK_PORT}} -i {{INTP_ID}} -l {{INTP_REPO}} -g {{INTP_SETTING}}"]
|
||||
env:
|
||||
- name: ZEPPELIN_HOME
|
||||
value: /zeppelin
|
||||
---
|
||||
kind: Service
|
||||
apiVersion: v1
|
||||
metadata:
|
||||
namespace: {{NAMESPACE}}
|
||||
name: {{POD_NAME}} # keep Service name the same to Pod name.
|
||||
spec:
|
||||
ports:
|
||||
- name: intp
|
||||
port: 12321
|
||||
selector:
|
||||
app: {{POD_NAME}}
|
||||
32
k8s/zeppelin.yaml
Normal file
32
k8s/zeppelin.yaml
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
kind: Pod
|
||||
apiVersion: v1
|
||||
metadata:
|
||||
namespace: default
|
||||
name: zeppelin-server
|
||||
labels:
|
||||
app: zeppelin-server
|
||||
spec:
|
||||
automountServiceAccountToken: true
|
||||
containers:
|
||||
- name: zeppelin-server
|
||||
image: apache/zeppelin:0.8.0
|
||||
command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/zeppelin.sh"]
|
||||
env:
|
||||
- name: ZEPPELIN_HOME
|
||||
value: /zeppelin
|
||||
- name: ZEPPELIN_SERVER_RPC_PORTRANGE
|
||||
value: 12320:12320
|
||||
---
|
||||
kind: Service
|
||||
apiVersion: v1
|
||||
metadata:
|
||||
namespace: default
|
||||
name: zeppelin-server # keep Service name the same to Pod name.
|
||||
spec:
|
||||
ports:
|
||||
- name: http
|
||||
port: 8080
|
||||
- name: rpc # port name is referenced in the code. So it shouldn't be changed.
|
||||
port: 12320
|
||||
selector:
|
||||
app: zeppelin-server
|
||||
|
|
@ -1,130 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import com.squareup.okhttp.Call;
|
||||
import io.kubernetes.client.ApiException;
|
||||
import io.kubernetes.client.util.Watch;
|
||||
import java.io.IOException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class AsyncWatcher<T> implements Runnable {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWatcher.class);
|
||||
private final WatchStopCondition<T> stopCondition;
|
||||
private final int timeoutSec;
|
||||
private final WatchCall call;
|
||||
private final Thread t;
|
||||
boolean stopConditionMatched = false;
|
||||
private long startTime;
|
||||
private Exception e;
|
||||
|
||||
public AsyncWatcher(WatchCall call, WatchStopCondition<T> stopCondition, int timeoutSec) throws ApiException {
|
||||
this.call = call;
|
||||
this.stopCondition = stopCondition;
|
||||
this.timeoutSec = timeoutSec;
|
||||
this.t = new Thread(this);
|
||||
t.start();
|
||||
}
|
||||
|
||||
public void run() {
|
||||
startTime = System.currentTimeMillis();
|
||||
String resourceVersion = "0";
|
||||
|
||||
while (System.currentTimeMillis() - startTime < timeoutSec * 1000) {
|
||||
try {
|
||||
resourceVersion = watchVersion(resourceVersion);
|
||||
if (stopConditionMatched) {
|
||||
break;
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
if (e.getCause() instanceof SocketTimeoutException) {
|
||||
// see https://github.com/kubernetes-client/java/issues/259
|
||||
continue;
|
||||
} else {
|
||||
this.e = e;
|
||||
LOGGER.error("Runtime exception on watch resource", e);
|
||||
break;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
this.e = e;
|
||||
LOGGER.error("Exception on watch resource", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String watchVersion(String resourceVersion) throws ApiException {
|
||||
String newResourceVersion = resourceVersion;
|
||||
Watch<T> watch = Watch.createWatch(
|
||||
K8sStandardInterpreterLauncher.client,
|
||||
call.list(resourceVersion),
|
||||
new TypeToken<Watch.Response<T>>() {
|
||||
}.getType());
|
||||
|
||||
try {
|
||||
for (Watch.Response<T> item : watch) {
|
||||
newResourceVersion = getResourceVersion(item);
|
||||
if (stopCondition.shouldStop(item)) {
|
||||
stopConditionMatched = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
watch.close();
|
||||
} catch (IOException e) {
|
||||
// error on close watch.
|
||||
}
|
||||
return newResourceVersion;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private String getResourceVersion(Watch.Response<T> item) {
|
||||
if (item instanceof Map) {
|
||||
Object metadata = ((Map) item).get("metadata");
|
||||
if (metadata != null && metadata instanceof Map) {
|
||||
return (String) ((Map) metadata).get("resourceVersion");
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isStopConditionMatched() {
|
||||
return stopConditionMatched;
|
||||
}
|
||||
|
||||
public boolean isWatching() {
|
||||
return t.isAlive();
|
||||
}
|
||||
|
||||
public void await() {
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error("Watcher interrupted", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,181 @@
|
|||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
||||
private static final Logger logger = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
|
||||
private final Kubectl kubectl;
|
||||
private final String interpreterGroupId;
|
||||
private final String interpreterGroupName;
|
||||
private final String interpreterSettingName;
|
||||
private final File specTempaltes;
|
||||
private final Properties properties;
|
||||
private final Map<String, String> envs;
|
||||
private final String zeppelinServiceHost;
|
||||
private final String zeppelinServiceRpcPort;
|
||||
private final int podCreateTimeoutSec = 180;
|
||||
|
||||
private final Gson gson = new Gson();
|
||||
|
||||
public K8sRemoteInterpreterProcess(
|
||||
Kubectl kubectl,
|
||||
File specTemplates,
|
||||
String interpreterGroupId,
|
||||
String interpreterGroupName,
|
||||
String interpreterSettingName,
|
||||
Properties properties,
|
||||
Map<String, String> envs,
|
||||
String zeppelinServiceHost,
|
||||
String zeppelinServiceRpcPort,
|
||||
int connectTimeout) {
|
||||
super(connectTimeout);
|
||||
this.kubectl = kubectl;
|
||||
this.specTempaltes = specTemplates;
|
||||
this.interpreterGroupId = interpreterGroupId;
|
||||
this.interpreterGroupName = interpreterGroupName;
|
||||
this.interpreterSettingName = interpreterSettingName;
|
||||
this.properties = properties;
|
||||
this.envs = envs;
|
||||
this.zeppelinServiceHost = zeppelinServiceHost;
|
||||
this.zeppelinServiceRpcPort = zeppelinServiceRpcPort;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get interpreter pod name
|
||||
* @return
|
||||
*/
|
||||
private String getPodName() {
|
||||
return interpreterGroupId.toLowerCase();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getInterpreterSettingName() {
|
||||
return interpreterSettingName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(String userName) throws IOException {
|
||||
// create new pod
|
||||
apply(specTempaltes, false);
|
||||
kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", podCreateTimeoutSec);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
// delete pod
|
||||
try {
|
||||
apply(specTempaltes, true);
|
||||
kubectl.wait(String.format("pod/%s", getPodName()), "delete", 60);
|
||||
} catch (IOException e) {
|
||||
logger.error("Error on removing interpreter pod", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHost() {
|
||||
return String.format("%s.%s.svc.cluster.local",
|
||||
getPodName(), // service name and pod name is the same
|
||||
kubectl.getNamespace());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPort() {
|
||||
return 12321;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
try {
|
||||
String ret = kubectl.execAndGet(new String[]{
|
||||
"get",
|
||||
String.format("pods/%s", getPodName()),
|
||||
"-o",
|
||||
"json"
|
||||
});
|
||||
|
||||
if (ret == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Map<String, Object> pod = gson.fromJson(ret, new TypeToken<Map<String, Object>>() {}.getType());
|
||||
if (pod == null || !pod.containsKey("status")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Map<String, Object> status = (Map<String, Object>) pod.get("status");
|
||||
if (status == null || !status.containsKey("phase")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return "Running".equals(status.get("phase"));
|
||||
} catch (IOException e) {
|
||||
logger.error("Can't get pod status", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply spec file(s) in the path.
|
||||
* @param path
|
||||
*/
|
||||
void apply(File path, boolean delete) throws IOException {
|
||||
if (path.getName().startsWith(".") || path.isHidden() || path.getName().endsWith("~")) {
|
||||
logger.info("Skip " + path.getAbsolutePath());
|
||||
}
|
||||
|
||||
if (path.isDirectory()) {
|
||||
File[] files = path.listFiles();
|
||||
Arrays.sort(files);
|
||||
if (delete) {
|
||||
ArrayUtils.reverse(files);
|
||||
}
|
||||
|
||||
for (File f : files) {
|
||||
apply(f, delete);
|
||||
}
|
||||
} else if (path.isFile()) {
|
||||
logger.info("Apply " + path.getAbsolutePath());
|
||||
K8sSpecTemplate specTemplate = new K8sSpecTemplate();
|
||||
specTemplate.putAll(getTemplateBindings());
|
||||
|
||||
String spec = specTemplate.render(path);
|
||||
if (delete) {
|
||||
kubectl.delete(spec);
|
||||
} else {
|
||||
kubectl.apply(spec);
|
||||
}
|
||||
} else {
|
||||
logger.error("Can't apply " + path.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, Object> getTemplateBindings() throws IOException {
|
||||
HashMap<String, Object> var = new HashMap<String, Object>();
|
||||
var.put("NAMESPACE", kubectl.getNamespace());
|
||||
var.put("POD_NAME", getPodName());
|
||||
var.put("CONTAINER_NAME", interpreterGroupName.toLowerCase());
|
||||
var.put("CONTAINER_IMAGE", "apache/zeppelin:0.8.0");
|
||||
var.put("INTP_PORT", "12321"); // interpreter.sh -r
|
||||
var.put("INTP_ID", interpreterGroupId); // interpreter.sh -i
|
||||
var.put("INTP_NAME", interpreterGroupName); // interpreter.sh -d
|
||||
var.put("CALLBACK_HOST", zeppelinServiceHost); // interpreter.sh -c
|
||||
var.put("CALLBACK_PORT", zeppelinServiceRpcPort); // interpreter.sh -p
|
||||
var.put("INTP_SETTING", interpreterSettingName); // interpreter.sh -g
|
||||
var.put("INTP_REPO", "/tmp/local-repo"); // interpreter.sh -l
|
||||
var.putAll(Maps.fromProperties(properties)); // interpreter properties override template variables
|
||||
return var;
|
||||
}
|
||||
}
|
||||
|
|
@ -15,34 +15,15 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.JsonSyntaxException;
|
||||
import com.hubspot.jinjava.Jinjava;
|
||||
import com.squareup.okhttp.Call;
|
||||
import io.kubernetes.client.ApiClient;
|
||||
import io.kubernetes.client.ApiException;
|
||||
import io.kubernetes.client.Configuration;
|
||||
import io.kubernetes.client.apis.CoreV1Api;
|
||||
import io.kubernetes.client.models.V1ConfigMap;
|
||||
import io.kubernetes.client.models.V1Pod;
|
||||
import io.kubernetes.client.models.V1Service;
|
||||
import io.kubernetes.client.util.Config;
|
||||
import io.kubernetes.client.util.Watch;
|
||||
import io.kubernetes.client.util.Yaml;
|
||||
import java.io.File;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterRunner;
|
||||
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -58,80 +39,16 @@ import java.util.Map;
|
|||
public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
|
||||
private static final String pretty = "true";
|
||||
private static Integer apiTimeoutSec = new Integer(120);
|
||||
private final Kubectl kubectl;
|
||||
private InterpreterLaunchContext context;
|
||||
|
||||
Jinjava jinja = new Jinjava();
|
||||
|
||||
static ApiClient client;
|
||||
static {
|
||||
try {
|
||||
client = Config.defaultClient();
|
||||
Configuration.setDefaultApiClient(client);
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Can't get kubernetes client configuration", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
|
||||
public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) throws IOException {
|
||||
super(zConf, recoveryStorage);
|
||||
kubectl = new Kubectl(zConf.getK8sKubectlCmd());
|
||||
kubectl.setNamespace(getNamespace());
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply spec file(s) in the path.
|
||||
* @param path
|
||||
*/
|
||||
void apply(File path) throws IOException {
|
||||
if (path.getName().startsWith(".") || path.isHidden()) {
|
||||
LOGGER.info("Skip {}", path.getAbsolutePath());
|
||||
}
|
||||
|
||||
if (path.isDirectory()) {
|
||||
File[] files = path.listFiles();
|
||||
Arrays.sort(files);
|
||||
for (File f : files) {
|
||||
apply(f);
|
||||
}
|
||||
} else if (path.isFile()) {
|
||||
LOGGER.info("Apply {}", path.getAbsolutePath());
|
||||
List<Object> yamls = Yaml.loadAll(
|
||||
jinja.render(
|
||||
readFile(path.getAbsolutePath(), Charset.defaultCharset()),
|
||||
getTemplateBindings()));
|
||||
if (yamls != null) {
|
||||
for (Object spec : yamls) {
|
||||
try {
|
||||
applySpec(spec);
|
||||
} catch (ApiException e) {
|
||||
LOGGER.error(e.getResponseBody());
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOGGER.error("Can't apply {}", path.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Map<String, Object> getTemplateBindings() throws IOException {
|
||||
HashMap<String, Object> var = new HashMap<String, Object>();
|
||||
var.put("NAMESPACE", getNamespace());
|
||||
var.put("POD_NAME", context.getInterpreterGroupId().toLowerCase());
|
||||
var.put("CONTAINER_NAME", context.getInterpreterSettingGroup().toLowerCase());
|
||||
var.put("CONTAINER_IMAGE", "apache/zeppelin:0.8.0");
|
||||
var.put("INTP_PORT", "12321"); // interpreter.sh -r
|
||||
var.put("INTP_NAME", context.getInterpreterSettingGroup()); // interpreter.sh -d
|
||||
var.put("CALLBACK_HOST", getZeppelinServiceHost()); // interpreter.sh -c
|
||||
var.put("CALLBACK_PORT", getZeppelinServiceRpcPort()); // interpreter.sh -p
|
||||
var.put("INTP_SETTING", context.getInterpreterSettingName()); // interpreter.sh -g
|
||||
var.put("INTP_REPO", "/tmp/local-repo"); // interpreter.sh -l
|
||||
var.putAll(Maps.fromProperties(properties)); // interpreter properties override template variables
|
||||
return var;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if i'm running inside of kubernetes or not.
|
||||
|
|
@ -198,169 +115,24 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
|
|||
}
|
||||
}
|
||||
|
||||
AsyncWatcher<Map<String, Object>> w = null;
|
||||
String name;
|
||||
|
||||
// apply single spec
|
||||
AsyncWatcher applySpec(Object spec) throws ApiException, IOException {
|
||||
if (spec instanceof V1Pod) {
|
||||
V1Pod pod = (V1Pod) spec;
|
||||
CoreV1Api api = new CoreV1Api();
|
||||
String namespace = pod.getMetadata().getNamespace();
|
||||
name = pod.getMetadata().getName();
|
||||
|
||||
AsyncWatcher<Map<String, Object>> w =
|
||||
new AsyncWatcher<Map<String, Object>>(
|
||||
new WatchCall() {
|
||||
@Override
|
||||
public Call list(String resourceVersion) throws ApiException {
|
||||
return api.listNamespacedPodCall(
|
||||
namespace,
|
||||
pretty,
|
||||
null,
|
||||
String.format("metadata.name=%s", name),
|
||||
Boolean.TRUE,
|
||||
null,
|
||||
10,
|
||||
resourceVersion,
|
||||
apiTimeoutSec,
|
||||
true,
|
||||
null,
|
||||
null);
|
||||
}
|
||||
},
|
||||
new WatchStopCondition<Map<String, Object>>() {
|
||||
@Override
|
||||
public boolean shouldStop(Watch.Response<Map<String, Object>> item) {
|
||||
// pod phase https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase
|
||||
Object status = item.object.get("status");
|
||||
if (status == null) {
|
||||
return false;
|
||||
}
|
||||
System.out.println("Status = " + status);
|
||||
switch (((Map<String, String>)status).get("phase")) {
|
||||
case "Succeeded":
|
||||
case "Failed":
|
||||
case "Running":
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
},
|
||||
apiTimeoutSec);
|
||||
|
||||
api.createNamespacedPod(namespace, pod, pretty);
|
||||
} else if (spec instanceof V1Service) {
|
||||
V1Service service = (V1Service) spec;
|
||||
CoreV1Api api = new CoreV1Api();
|
||||
String namespace = service.getMetadata().getNamespace();
|
||||
String name = service.getMetadata().getName();
|
||||
|
||||
AsyncWatcher<Map<String, Object>> w =
|
||||
new AsyncWatcher<Map<String, Object>>(
|
||||
new WatchCall() {
|
||||
@Override
|
||||
public Call list(String resourceVersion) throws ApiException {
|
||||
return api.listNamespacedServiceCall(
|
||||
namespace,
|
||||
pretty,
|
||||
null,
|
||||
String.format("metadata.name=%s", name),
|
||||
Boolean.TRUE,
|
||||
null,
|
||||
10,
|
||||
resourceVersion,
|
||||
apiTimeoutSec,
|
||||
true,
|
||||
null,
|
||||
null);
|
||||
}
|
||||
},
|
||||
new WatchStopCondition<Map<String, Object>>() {
|
||||
@Override
|
||||
public boolean shouldStop(Watch.Response<Map<String, Object>> item) {
|
||||
return "ADDED".equals(item.type);
|
||||
}
|
||||
},
|
||||
apiTimeoutSec);
|
||||
|
||||
try {
|
||||
api.createNamespacedService(namespace, service, pretty);
|
||||
} catch (JsonSyntaxException e) {
|
||||
// the API return is sometimes not a json message. That cause exception although creation of service actually success.
|
||||
// So need to ignore JsonSyntaxException here.
|
||||
}
|
||||
} else if (spec instanceof V1ConfigMap) {
|
||||
V1ConfigMap configMap = (V1ConfigMap) spec;
|
||||
CoreV1Api api = new CoreV1Api();
|
||||
String namespace = configMap.getMetadata().getNamespace();
|
||||
String name = configMap.getMetadata().getName();
|
||||
|
||||
AsyncWatcher<Map<String, Object>> w =
|
||||
new AsyncWatcher<Map<String, Object>>(
|
||||
new WatchCall() {
|
||||
@Override
|
||||
public Call list(String resourceVersion) throws ApiException {
|
||||
return api.listNamespacedConfigMapCall(
|
||||
namespace,
|
||||
pretty,
|
||||
null,
|
||||
String.format("metadata.name=%s", name),
|
||||
Boolean.TRUE,
|
||||
null,
|
||||
10,
|
||||
resourceVersion,
|
||||
apiTimeoutSec,
|
||||
true,
|
||||
null,
|
||||
null);
|
||||
}
|
||||
},
|
||||
new WatchStopCondition<Map<String, Object>>() {
|
||||
@Override
|
||||
public boolean shouldStop(Watch.Response<Map<String, Object>> item) {
|
||||
return "ADDED".equals(item.type);
|
||||
}
|
||||
},
|
||||
apiTimeoutSec);
|
||||
|
||||
try {
|
||||
api.createNamespacedConfigMap(namespace, configMap, pretty);
|
||||
} catch (JsonSyntaxException e) {
|
||||
// the API return is sometimes not a json message. That cause exception although creation of service actually success.
|
||||
// So need to ignore JsonSyntaxException here.
|
||||
}
|
||||
}
|
||||
|
||||
if (w != null) {
|
||||
w.await();
|
||||
|
||||
if (!w.isStopConditionMatched()) {
|
||||
throw new IOException("Timeout on applying " + name);
|
||||
}
|
||||
|
||||
return w;
|
||||
} else {
|
||||
throw new IOException("Unsupported spec " + spec.getClass().getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
|
||||
LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
|
||||
this.context = context;
|
||||
this.properties = context.getProperties();
|
||||
InterpreterOption option = context.getOption();
|
||||
InterpreterRunner runner = context.getRunner();
|
||||
String groupName = context.getInterpreterSettingGroup();
|
||||
String name = context.getInterpreterSettingName();
|
||||
int connectTimeout = getConnectTimeout();
|
||||
|
||||
// create new pod
|
||||
apply(new File(zConf.getK8sTemplatesDir(), "interpreter"));
|
||||
return null;
|
||||
return new K8sRemoteInterpreterProcess(
|
||||
kubectl,
|
||||
new File(zConf.getK8sTemplatesDir(), "interpreter"),
|
||||
context.getInterpreterGroupId(),
|
||||
context.getInterpreterSettingGroup(),
|
||||
context.getInterpreterSettingName(),
|
||||
properties,
|
||||
buildEnvFromProperties(context),
|
||||
getZeppelinServiceHost(),
|
||||
getZeppelinServiceRpcPort(),
|
||||
connectTimeout);
|
||||
}
|
||||
|
||||
protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
|
||||
|
|
|
|||
|
|
@ -19,7 +19,8 @@ package org.apache.zeppelin.interpreter.launcher;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
|
|
@ -27,46 +28,83 @@ import org.apache.commons.exec.PumpStreamHandler;
|
|||
import org.apache.commons.io.IOUtils;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.Map;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class Kubectl {
|
||||
private final Logger logger = LoggerFactory.getLogger(Kubectl.class);
|
||||
private final String kubectlCmd;
|
||||
private final Gson gson = new Gson();
|
||||
private String namespace;
|
||||
|
||||
public Kubectl(String kubectlCmd) {
|
||||
this.kubectlCmd = kubectlCmd;
|
||||
}
|
||||
|
||||
public Map<String, Object> apply(String spec) throws IOException {
|
||||
return execAndGetJson(new String[]{"apply", "-o", "json", "-f", "-"}, spec);
|
||||
/**
|
||||
* Override namespace. Otherwise use namespace provided in schema
|
||||
* @param namespace
|
||||
*/
|
||||
public void setNamespace(String namespace) {
|
||||
this.namespace = namespace;
|
||||
}
|
||||
|
||||
public Map<String, Object> delete(String spec) throws IOException {
|
||||
return execAndGetJson(new String[]{"delete", "-o", "json", "-f", "-"}, spec);
|
||||
public String getNamespace() {
|
||||
return namespace;
|
||||
}
|
||||
|
||||
Map<String, Object> execAndGetJson(String [] args) throws IOException {
|
||||
return execAndGetJson(args, "");
|
||||
public String apply(String spec) throws IOException {
|
||||
return execAndGet(new String[]{"apply", "-f", "-"}, spec);
|
||||
}
|
||||
|
||||
public String delete(String spec) throws IOException {
|
||||
return execAndGet(new String[]{"delete", "-f", "-"}, spec);
|
||||
}
|
||||
|
||||
public String wait(String resource, String waitFor, int timeoutSec) throws IOException {
|
||||
return execAndGet(new String[]{
|
||||
"wait",
|
||||
resource,
|
||||
String.format("--for=%s", waitFor),
|
||||
String.format("--timeout=%ds", timeoutSec)});
|
||||
}
|
||||
|
||||
String execAndGet(String [] args) throws IOException {
|
||||
return execAndGet(args, "");
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Map<String, Object> execAndGetJson(String [] args, String stdin) throws IOException {
|
||||
String execAndGet(String [] args, String stdin) throws IOException {
|
||||
InputStream ins = IOUtils.toInputStream(stdin);
|
||||
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
|
||||
ByteArrayOutputStream stderr = new ByteArrayOutputStream();
|
||||
ArrayList<String> argsToOverride = new ArrayList<>(Arrays.asList(args));
|
||||
|
||||
int exitCode = execute(
|
||||
args,
|
||||
ins,
|
||||
stdout,
|
||||
stderr
|
||||
);
|
||||
// set namespace
|
||||
if (namespace != null) {
|
||||
argsToOverride.add("--namespace=" + namespace);
|
||||
}
|
||||
|
||||
if (exitCode == 0) {
|
||||
String output = new String(stdout.toByteArray());
|
||||
return gson.fromJson(output, new TypeToken<Map<String, Object>>() {}.getType());
|
||||
} else {
|
||||
throw new IOException(String.format("non zero return code (%d)", exitCode));
|
||||
logger.info("kubectl " + argsToOverride + "\n" + stdin);
|
||||
|
||||
try {
|
||||
int exitCode = execute(
|
||||
argsToOverride.toArray(new String[0]),
|
||||
ins,
|
||||
stdout,
|
||||
stderr
|
||||
);
|
||||
|
||||
if (exitCode == 0) {
|
||||
String output = new String(stdout.toByteArray());
|
||||
return output;
|
||||
} else {
|
||||
String output = new String(stderr.toByteArray());
|
||||
throw new IOException(String.format("non zero return code (%d). %s", exitCode, output));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
String output = new String(stderr.toByteArray());
|
||||
throw new IOException(output, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,105 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public abstract class KubectlPollingWait implements Runnable {
|
||||
private final Logger logger = LoggerFactory.getLogger(KubectlPollingWait.class);
|
||||
private final Kubectl kubectl;
|
||||
private final int timeoutSec;
|
||||
private final int intervalSec;
|
||||
private final String[] args;
|
||||
private final ScheduledFuture<?> t;
|
||||
private Exception exception;
|
||||
private boolean stopConditionMatched = false;
|
||||
private Map<String, Object> lastResult;
|
||||
|
||||
@VisibleForTesting
|
||||
KubectlPollingWait(Kubectl kubectl, String [] args, int intervalSec, int timeoutSec) {
|
||||
this.kubectl = kubectl;
|
||||
this.args = args;
|
||||
this.timeoutSec = timeoutSec;
|
||||
this.intervalSec = intervalSec;
|
||||
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
|
||||
t = executor.schedule(this, 0, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
long start = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - start < timeoutSec) {
|
||||
try {
|
||||
Map<String, Object> ret = kubectl.execAndGetJson(args);
|
||||
lastResult = ret;
|
||||
if (shouldStop(ret)) {
|
||||
stopConditionMatched = true;
|
||||
break;
|
||||
} else {
|
||||
Thread.sleep(intervalSec * 1000);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Error", e);
|
||||
exception = e;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
this.notify();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract boolean shouldStop(Map<String, Object> ret);
|
||||
|
||||
public boolean isDone() {
|
||||
return t != null && t.isDone() && stopConditionMatched;
|
||||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
return !isDone();
|
||||
}
|
||||
|
||||
public Exception getException() {
|
||||
return exception;
|
||||
}
|
||||
|
||||
public Map<String, Object> getLastResult() {
|
||||
return lastResult;
|
||||
}
|
||||
|
||||
public void await() {
|
||||
while (isRunning()) {
|
||||
synchronized (this) {
|
||||
try {
|
||||
this.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,8 +0,0 @@
|
|||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import com.squareup.okhttp.Call;
|
||||
import io.kubernetes.client.ApiException;
|
||||
|
||||
public interface WatchCall {
|
||||
Call list(String resourceVersion) throws ApiException;
|
||||
}
|
||||
|
|
@ -1,7 +0,0 @@
|
|||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import io.kubernetes.client.util.Watch;
|
||||
|
||||
public interface WatchStopCondition<T> {
|
||||
boolean shouldStop(Watch.Response<T> item);
|
||||
}
|
||||
|
|
@ -53,8 +53,22 @@ public class K8sStandardInterpreterLauncherTest {
|
|||
properties.setProperty("CALLBACK_PORT", "12320");
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
option.setUserImpersonate(true);
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "sh", "name", 0, "host");
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(
|
||||
properties,
|
||||
option,
|
||||
null,
|
||||
"user1",
|
||||
"intpGroupId",
|
||||
"groupId",
|
||||
"sh",
|
||||
"name",
|
||||
0,
|
||||
"host");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
K8sRemoteInterpreterProcess k8sintp = (K8sRemoteInterpreterProcess) client;
|
||||
k8sintp.start("user");
|
||||
// assertTrue(k8sintp.isRunning());
|
||||
// k8sintp.stop();
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import org.junit.Test;
|
|||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
|
@ -92,28 +91,15 @@ public class KubectlTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testExecSpecAndGetJson() throws IOException {
|
||||
public void testExecSpecAndGet() throws IOException {
|
||||
// given
|
||||
Kubectl kubectl = new Kubectl("cat");
|
||||
String spec = "{'k1': 'v1', 'k2': 2}";
|
||||
|
||||
// when
|
||||
Map<String, Object> result = kubectl.execAndGetJson(new String[]{}, spec);
|
||||
String result = kubectl.execAndGet(new String[]{}, spec);
|
||||
|
||||
// then
|
||||
assertEquals("v1", result.get("k1"));
|
||||
assertEquals(2.0, result.get("k2"));
|
||||
}
|
||||
|
||||
@Test(expected = com.google.gson.JsonSyntaxException.class)
|
||||
public void testExecSpecAndGetJsonInvalidOutput() throws IOException {
|
||||
// given
|
||||
Kubectl kubectl = new Kubectl("cat");
|
||||
String spec = "Not a json format";
|
||||
|
||||
// when
|
||||
Map<String, Object> result = kubectl.execAndGetJson(new String[]{}, spec);
|
||||
|
||||
// then throw JsonSyntaxException
|
||||
assertEquals(spec, result);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue