K8sRemoteInterpreterProcess

This commit is contained in:
Lee moon soo 2018-11-20 12:38:00 +09:00 committed by Lee moon soo
parent 07489f76df
commit 5f602a65ef
11 changed files with 334 additions and 531 deletions

View file

@ -0,0 +1,30 @@
kind: Pod
apiVersion: v1
metadata:
namespace: {{NAMESPACE}}
name: {{POD_NAME}}
labels:
app: {{POD_NAME}}
spec:
automountServiceAccountToken: false
restartPolicy: Never
terminationGracePeriodSeconds: 10
containers:
- name: {{CONTAINER_NAME}}
image: {{CONTAINER_IMAGE}}
command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{INTP_NAME}} -r {{INTP_PORT}}:{{INTP_PORT}} -c {{CALLBACK_HOST}} -p {{CALLBACK_PORT}} -i {{INTP_ID}} -l {{INTP_REPO}} -g {{INTP_SETTING}}"]
env:
- name: ZEPPELIN_HOME
value: /zeppelin
---
kind: Service
apiVersion: v1
metadata:
namespace: {{NAMESPACE}}
name: {{POD_NAME}} # keep Service name the same to Pod name.
spec:
ports:
- name: intp
port: 12321
selector:
app: {{POD_NAME}}

32
k8s/zeppelin.yaml Normal file
View file

@ -0,0 +1,32 @@
kind: Pod
apiVersion: v1
metadata:
namespace: default
name: zeppelin-server
labels:
app: zeppelin-server
spec:
automountServiceAccountToken: true
containers:
- name: zeppelin-server
image: apache/zeppelin:0.8.0
command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/zeppelin.sh"]
env:
- name: ZEPPELIN_HOME
value: /zeppelin
- name: ZEPPELIN_SERVER_RPC_PORTRANGE
value: 12320:12320
---
kind: Service
apiVersion: v1
metadata:
namespace: default
name: zeppelin-server # keep Service name the same to Pod name.
spec:
ports:
- name: http
port: 8080
- name: rpc # port name is referenced in the code. So it shouldn't be changed.
port: 12320
selector:
app: zeppelin-server

View file

@ -1,130 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import com.google.gson.reflect.TypeToken;
import com.squareup.okhttp.Call;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Date;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AsyncWatcher<T> implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWatcher.class);
private final WatchStopCondition<T> stopCondition;
private final int timeoutSec;
private final WatchCall call;
private final Thread t;
boolean stopConditionMatched = false;
private long startTime;
private Exception e;
public AsyncWatcher(WatchCall call, WatchStopCondition<T> stopCondition, int timeoutSec) throws ApiException {
this.call = call;
this.stopCondition = stopCondition;
this.timeoutSec = timeoutSec;
this.t = new Thread(this);
t.start();
}
public void run() {
startTime = System.currentTimeMillis();
String resourceVersion = "0";
while (System.currentTimeMillis() - startTime < timeoutSec * 1000) {
try {
resourceVersion = watchVersion(resourceVersion);
if (stopConditionMatched) {
break;
}
} catch (RuntimeException e) {
if (e.getCause() instanceof SocketTimeoutException) {
// see https://github.com/kubernetes-client/java/issues/259
continue;
} else {
this.e = e;
LOGGER.error("Runtime exception on watch resource", e);
break;
}
} catch (Exception e) {
this.e = e;
LOGGER.error("Exception on watch resource", e);
break;
}
}
}
private String watchVersion(String resourceVersion) throws ApiException {
String newResourceVersion = resourceVersion;
Watch<T> watch = Watch.createWatch(
K8sStandardInterpreterLauncher.client,
call.list(resourceVersion),
new TypeToken<Watch.Response<T>>() {
}.getType());
try {
for (Watch.Response<T> item : watch) {
newResourceVersion = getResourceVersion(item);
if (stopCondition.shouldStop(item)) {
stopConditionMatched = true;
break;
}
}
} finally {
try {
watch.close();
} catch (IOException e) {
// error on close watch.
}
return newResourceVersion;
}
}
private String getResourceVersion(Watch.Response<T> item) {
if (item instanceof Map) {
Object metadata = ((Map) item).get("metadata");
if (metadata != null && metadata instanceof Map) {
return (String) ((Map) metadata).get("resourceVersion");
} else {
return null;
}
} else {
return null;
}
}
public boolean isStopConditionMatched() {
return stopConditionMatched;
}
public boolean isWatching() {
return t.isAlive();
}
public void await() {
try {
t.join();
} catch (InterruptedException e) {
LOGGER.error("Watcher interrupted", e);
}
}
}

View file

@ -0,0 +1,181 @@
package org.apache.zeppelin.interpreter.launcher;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
private static final Logger logger = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
private final Kubectl kubectl;
private final String interpreterGroupId;
private final String interpreterGroupName;
private final String interpreterSettingName;
private final File specTempaltes;
private final Properties properties;
private final Map<String, String> envs;
private final String zeppelinServiceHost;
private final String zeppelinServiceRpcPort;
private final int podCreateTimeoutSec = 180;
private final Gson gson = new Gson();
public K8sRemoteInterpreterProcess(
Kubectl kubectl,
File specTemplates,
String interpreterGroupId,
String interpreterGroupName,
String interpreterSettingName,
Properties properties,
Map<String, String> envs,
String zeppelinServiceHost,
String zeppelinServiceRpcPort,
int connectTimeout) {
super(connectTimeout);
this.kubectl = kubectl;
this.specTempaltes = specTemplates;
this.interpreterGroupId = interpreterGroupId;
this.interpreterGroupName = interpreterGroupName;
this.interpreterSettingName = interpreterSettingName;
this.properties = properties;
this.envs = envs;
this.zeppelinServiceHost = zeppelinServiceHost;
this.zeppelinServiceRpcPort = zeppelinServiceRpcPort;
}
/**
* Get interpreter pod name
* @return
*/
private String getPodName() {
return interpreterGroupId.toLowerCase();
}
@Override
public String getInterpreterSettingName() {
return interpreterSettingName;
}
@Override
public void start(String userName) throws IOException {
// create new pod
apply(specTempaltes, false);
kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", podCreateTimeoutSec);
}
@Override
public void stop() {
// delete pod
try {
apply(specTempaltes, true);
kubectl.wait(String.format("pod/%s", getPodName()), "delete", 60);
} catch (IOException e) {
logger.error("Error on removing interpreter pod", e);
}
}
@Override
public String getHost() {
return String.format("%s.%s.svc.cluster.local",
getPodName(), // service name and pod name is the same
kubectl.getNamespace());
}
@Override
public int getPort() {
return 12321;
}
@Override
public boolean isRunning() {
try {
String ret = kubectl.execAndGet(new String[]{
"get",
String.format("pods/%s", getPodName()),
"-o",
"json"
});
if (ret == null) {
return false;
}
Map<String, Object> pod = gson.fromJson(ret, new TypeToken<Map<String, Object>>() {}.getType());
if (pod == null || !pod.containsKey("status")) {
return false;
}
Map<String, Object> status = (Map<String, Object>) pod.get("status");
if (status == null || !status.containsKey("phase")) {
return false;
}
return "Running".equals(status.get("phase"));
} catch (IOException e) {
logger.error("Can't get pod status", e);
return false;
}
}
/**
* Apply spec file(s) in the path.
* @param path
*/
void apply(File path, boolean delete) throws IOException {
if (path.getName().startsWith(".") || path.isHidden() || path.getName().endsWith("~")) {
logger.info("Skip " + path.getAbsolutePath());
}
if (path.isDirectory()) {
File[] files = path.listFiles();
Arrays.sort(files);
if (delete) {
ArrayUtils.reverse(files);
}
for (File f : files) {
apply(f, delete);
}
} else if (path.isFile()) {
logger.info("Apply " + path.getAbsolutePath());
K8sSpecTemplate specTemplate = new K8sSpecTemplate();
specTemplate.putAll(getTemplateBindings());
String spec = specTemplate.render(path);
if (delete) {
kubectl.delete(spec);
} else {
kubectl.apply(spec);
}
} else {
logger.error("Can't apply " + path.getAbsolutePath());
}
}
Map<String, Object> getTemplateBindings() throws IOException {
HashMap<String, Object> var = new HashMap<String, Object>();
var.put("NAMESPACE", kubectl.getNamespace());
var.put("POD_NAME", getPodName());
var.put("CONTAINER_NAME", interpreterGroupName.toLowerCase());
var.put("CONTAINER_IMAGE", "apache/zeppelin:0.8.0");
var.put("INTP_PORT", "12321"); // interpreter.sh -r
var.put("INTP_ID", interpreterGroupId); // interpreter.sh -i
var.put("INTP_NAME", interpreterGroupName); // interpreter.sh -d
var.put("CALLBACK_HOST", zeppelinServiceHost); // interpreter.sh -c
var.put("CALLBACK_PORT", zeppelinServiceRpcPort); // interpreter.sh -p
var.put("INTP_SETTING", interpreterSettingName); // interpreter.sh -g
var.put("INTP_REPO", "/tmp/local-repo"); // interpreter.sh -l
var.putAll(Maps.fromProperties(properties)); // interpreter properties override template variables
return var;
}
}

View file

@ -15,34 +15,15 @@
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import com.google.common.collect.Maps;
import com.google.gson.JsonSyntaxException;
import com.hubspot.jinjava.Jinjava;
import com.squareup.okhttp.Call;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1ConfigMap;
import io.kubernetes.client.models.V1Pod;
import io.kubernetes.client.models.V1Service;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.Yaml;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterRunner;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
@ -58,80 +39,16 @@ import java.util.Map;
public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
private static final String pretty = "true";
private static Integer apiTimeoutSec = new Integer(120);
private final Kubectl kubectl;
private InterpreterLaunchContext context;
Jinjava jinja = new Jinjava();
static ApiClient client;
static {
try {
client = Config.defaultClient();
Configuration.setDefaultApiClient(client);
} catch (IOException e) {
LOGGER.error("Can't get kubernetes client configuration", e);
}
}
public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) throws IOException {
super(zConf, recoveryStorage);
kubectl = new Kubectl(zConf.getK8sKubectlCmd());
kubectl.setNamespace(getNamespace());
}
/**
* Apply spec file(s) in the path.
* @param path
*/
void apply(File path) throws IOException {
if (path.getName().startsWith(".") || path.isHidden()) {
LOGGER.info("Skip {}", path.getAbsolutePath());
}
if (path.isDirectory()) {
File[] files = path.listFiles();
Arrays.sort(files);
for (File f : files) {
apply(f);
}
} else if (path.isFile()) {
LOGGER.info("Apply {}", path.getAbsolutePath());
List<Object> yamls = Yaml.loadAll(
jinja.render(
readFile(path.getAbsolutePath(), Charset.defaultCharset()),
getTemplateBindings()));
if (yamls != null) {
for (Object spec : yamls) {
try {
applySpec(spec);
} catch (ApiException e) {
LOGGER.error(e.getResponseBody());
throw new IOException(e);
}
}
}
} else {
LOGGER.error("Can't apply {}", path.getAbsolutePath());
}
}
Map<String, Object> getTemplateBindings() throws IOException {
HashMap<String, Object> var = new HashMap<String, Object>();
var.put("NAMESPACE", getNamespace());
var.put("POD_NAME", context.getInterpreterGroupId().toLowerCase());
var.put("CONTAINER_NAME", context.getInterpreterSettingGroup().toLowerCase());
var.put("CONTAINER_IMAGE", "apache/zeppelin:0.8.0");
var.put("INTP_PORT", "12321"); // interpreter.sh -r
var.put("INTP_NAME", context.getInterpreterSettingGroup()); // interpreter.sh -d
var.put("CALLBACK_HOST", getZeppelinServiceHost()); // interpreter.sh -c
var.put("CALLBACK_PORT", getZeppelinServiceRpcPort()); // interpreter.sh -p
var.put("INTP_SETTING", context.getInterpreterSettingName()); // interpreter.sh -g
var.put("INTP_REPO", "/tmp/local-repo"); // interpreter.sh -l
var.putAll(Maps.fromProperties(properties)); // interpreter properties override template variables
return var;
}
/**
* Check if i'm running inside of kubernetes or not.
@ -198,169 +115,24 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
}
}
AsyncWatcher<Map<String, Object>> w = null;
String name;
// apply single spec
AsyncWatcher applySpec(Object spec) throws ApiException, IOException {
if (spec instanceof V1Pod) {
V1Pod pod = (V1Pod) spec;
CoreV1Api api = new CoreV1Api();
String namespace = pod.getMetadata().getNamespace();
name = pod.getMetadata().getName();
AsyncWatcher<Map<String, Object>> w =
new AsyncWatcher<Map<String, Object>>(
new WatchCall() {
@Override
public Call list(String resourceVersion) throws ApiException {
return api.listNamespacedPodCall(
namespace,
pretty,
null,
String.format("metadata.name=%s", name),
Boolean.TRUE,
null,
10,
resourceVersion,
apiTimeoutSec,
true,
null,
null);
}
},
new WatchStopCondition<Map<String, Object>>() {
@Override
public boolean shouldStop(Watch.Response<Map<String, Object>> item) {
// pod phase https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase
Object status = item.object.get("status");
if (status == null) {
return false;
}
System.out.println("Status = " + status);
switch (((Map<String, String>)status).get("phase")) {
case "Succeeded":
case "Failed":
case "Running":
return true;
default:
return false;
}
}
},
apiTimeoutSec);
api.createNamespacedPod(namespace, pod, pretty);
} else if (spec instanceof V1Service) {
V1Service service = (V1Service) spec;
CoreV1Api api = new CoreV1Api();
String namespace = service.getMetadata().getNamespace();
String name = service.getMetadata().getName();
AsyncWatcher<Map<String, Object>> w =
new AsyncWatcher<Map<String, Object>>(
new WatchCall() {
@Override
public Call list(String resourceVersion) throws ApiException {
return api.listNamespacedServiceCall(
namespace,
pretty,
null,
String.format("metadata.name=%s", name),
Boolean.TRUE,
null,
10,
resourceVersion,
apiTimeoutSec,
true,
null,
null);
}
},
new WatchStopCondition<Map<String, Object>>() {
@Override
public boolean shouldStop(Watch.Response<Map<String, Object>> item) {
return "ADDED".equals(item.type);
}
},
apiTimeoutSec);
try {
api.createNamespacedService(namespace, service, pretty);
} catch (JsonSyntaxException e) {
// the API return is sometimes not a json message. That cause exception although creation of service actually success.
// So need to ignore JsonSyntaxException here.
}
} else if (spec instanceof V1ConfigMap) {
V1ConfigMap configMap = (V1ConfigMap) spec;
CoreV1Api api = new CoreV1Api();
String namespace = configMap.getMetadata().getNamespace();
String name = configMap.getMetadata().getName();
AsyncWatcher<Map<String, Object>> w =
new AsyncWatcher<Map<String, Object>>(
new WatchCall() {
@Override
public Call list(String resourceVersion) throws ApiException {
return api.listNamespacedConfigMapCall(
namespace,
pretty,
null,
String.format("metadata.name=%s", name),
Boolean.TRUE,
null,
10,
resourceVersion,
apiTimeoutSec,
true,
null,
null);
}
},
new WatchStopCondition<Map<String, Object>>() {
@Override
public boolean shouldStop(Watch.Response<Map<String, Object>> item) {
return "ADDED".equals(item.type);
}
},
apiTimeoutSec);
try {
api.createNamespacedConfigMap(namespace, configMap, pretty);
} catch (JsonSyntaxException e) {
// the API return is sometimes not a json message. That cause exception although creation of service actually success.
// So need to ignore JsonSyntaxException here.
}
}
if (w != null) {
w.await();
if (!w.isStopConditionMatched()) {
throw new IOException("Timeout on applying " + name);
}
return w;
} else {
throw new IOException("Unsupported spec " + spec.getClass().getName());
}
}
@Override
public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
this.context = context;
this.properties = context.getProperties();
InterpreterOption option = context.getOption();
InterpreterRunner runner = context.getRunner();
String groupName = context.getInterpreterSettingGroup();
String name = context.getInterpreterSettingName();
int connectTimeout = getConnectTimeout();
// create new pod
apply(new File(zConf.getK8sTemplatesDir(), "interpreter"));
return null;
return new K8sRemoteInterpreterProcess(
kubectl,
new File(zConf.getK8sTemplatesDir(), "interpreter"),
context.getInterpreterGroupId(),
context.getInterpreterSettingGroup(),
context.getInterpreterSettingName(),
properties,
buildEnvFromProperties(context),
getZeppelinServiceHost(),
getZeppelinServiceRpcPort(),
connectTimeout);
}
protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {

View file

@ -19,7 +19,8 @@ package org.apache.zeppelin.interpreter.launcher;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
@ -27,46 +28,83 @@ import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.io.IOUtils;
import java.io.*;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Kubectl {
private final Logger logger = LoggerFactory.getLogger(Kubectl.class);
private final String kubectlCmd;
private final Gson gson = new Gson();
private String namespace;
public Kubectl(String kubectlCmd) {
this.kubectlCmd = kubectlCmd;
}
public Map<String, Object> apply(String spec) throws IOException {
return execAndGetJson(new String[]{"apply", "-o", "json", "-f", "-"}, spec);
/**
* Override namespace. Otherwise use namespace provided in schema
* @param namespace
*/
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public Map<String, Object> delete(String spec) throws IOException {
return execAndGetJson(new String[]{"delete", "-o", "json", "-f", "-"}, spec);
public String getNamespace() {
return namespace;
}
Map<String, Object> execAndGetJson(String [] args) throws IOException {
return execAndGetJson(args, "");
public String apply(String spec) throws IOException {
return execAndGet(new String[]{"apply", "-f", "-"}, spec);
}
public String delete(String spec) throws IOException {
return execAndGet(new String[]{"delete", "-f", "-"}, spec);
}
public String wait(String resource, String waitFor, int timeoutSec) throws IOException {
return execAndGet(new String[]{
"wait",
resource,
String.format("--for=%s", waitFor),
String.format("--timeout=%ds", timeoutSec)});
}
String execAndGet(String [] args) throws IOException {
return execAndGet(args, "");
}
@VisibleForTesting
Map<String, Object> execAndGetJson(String [] args, String stdin) throws IOException {
String execAndGet(String [] args, String stdin) throws IOException {
InputStream ins = IOUtils.toInputStream(stdin);
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
ByteArrayOutputStream stderr = new ByteArrayOutputStream();
ArrayList<String> argsToOverride = new ArrayList<>(Arrays.asList(args));
int exitCode = execute(
args,
ins,
stdout,
stderr
);
// set namespace
if (namespace != null) {
argsToOverride.add("--namespace=" + namespace);
}
if (exitCode == 0) {
String output = new String(stdout.toByteArray());
return gson.fromJson(output, new TypeToken<Map<String, Object>>() {}.getType());
} else {
throw new IOException(String.format("non zero return code (%d)", exitCode));
logger.info("kubectl " + argsToOverride + "\n" + stdin);
try {
int exitCode = execute(
argsToOverride.toArray(new String[0]),
ins,
stdout,
stderr
);
if (exitCode == 0) {
String output = new String(stdout.toByteArray());
return output;
} else {
String output = new String(stderr.toByteArray());
throw new IOException(String.format("non zero return code (%d). %s", exitCode, output));
}
} catch (Exception e) {
String output = new String(stderr.toByteArray());
throw new IOException(output, e);
}
}

View file

@ -1,105 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public abstract class KubectlPollingWait implements Runnable {
private final Logger logger = LoggerFactory.getLogger(KubectlPollingWait.class);
private final Kubectl kubectl;
private final int timeoutSec;
private final int intervalSec;
private final String[] args;
private final ScheduledFuture<?> t;
private Exception exception;
private boolean stopConditionMatched = false;
private Map<String, Object> lastResult;
@VisibleForTesting
KubectlPollingWait(Kubectl kubectl, String [] args, int intervalSec, int timeoutSec) {
this.kubectl = kubectl;
this.args = args;
this.timeoutSec = timeoutSec;
this.intervalSec = intervalSec;
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
t = executor.schedule(this, 0, TimeUnit.SECONDS);
}
@Override
public void run() {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeoutSec) {
try {
Map<String, Object> ret = kubectl.execAndGetJson(args);
lastResult = ret;
if (shouldStop(ret)) {
stopConditionMatched = true;
break;
} else {
Thread.sleep(intervalSec * 1000);
}
} catch (Exception e) {
logger.error("Error", e);
exception = e;
break;
}
}
synchronized (this) {
this.notify();
}
}
protected abstract boolean shouldStop(Map<String, Object> ret);
public boolean isDone() {
return t != null && t.isDone() && stopConditionMatched;
}
public boolean isRunning() {
return !isDone();
}
public Exception getException() {
return exception;
}
public Map<String, Object> getLastResult() {
return lastResult;
}
public void await() {
while (isRunning()) {
synchronized (this) {
try {
this.wait(1000);
} catch (InterruptedException e) {
// ignore
}
}
}
}
}

View file

@ -1,8 +0,0 @@
package org.apache.zeppelin.interpreter.launcher;
import com.squareup.okhttp.Call;
import io.kubernetes.client.ApiException;
public interface WatchCall {
Call list(String resourceVersion) throws ApiException;
}

View file

@ -1,7 +0,0 @@
package org.apache.zeppelin.interpreter.launcher;
import io.kubernetes.client.util.Watch;
public interface WatchStopCondition<T> {
boolean shouldStop(Watch.Response<T> item);
}

View file

@ -53,8 +53,22 @@ public class K8sStandardInterpreterLauncherTest {
properties.setProperty("CALLBACK_PORT", "12320");
InterpreterOption option = new InterpreterOption();
option.setUserImpersonate(true);
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "sh", "name", 0, "host");
InterpreterLaunchContext context = new InterpreterLaunchContext(
properties,
option,
null,
"user1",
"intpGroupId",
"groupId",
"sh",
"name",
0,
"host");
InterpreterClient client = launcher.launch(context);
K8sRemoteInterpreterProcess k8sintp = (K8sRemoteInterpreterProcess) client;
k8sintp.start("user");
// assertTrue(k8sintp.isRunning());
// k8sintp.stop();
}
/*

View file

@ -22,7 +22,6 @@ import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -92,28 +91,15 @@ public class KubectlTest {
}
@Test
public void testExecSpecAndGetJson() throws IOException {
public void testExecSpecAndGet() throws IOException {
// given
Kubectl kubectl = new Kubectl("cat");
String spec = "{'k1': 'v1', 'k2': 2}";
// when
Map<String, Object> result = kubectl.execAndGetJson(new String[]{}, spec);
String result = kubectl.execAndGet(new String[]{}, spec);
// then
assertEquals("v1", result.get("k1"));
assertEquals(2.0, result.get("k2"));
}
@Test(expected = com.google.gson.JsonSyntaxException.class)
public void testExecSpecAndGetJsonInvalidOutput() throws IOException {
// given
Kubectl kubectl = new Kubectl("cat");
String spec = "Not a json format";
// when
Map<String, Object> result = kubectl.execAndGetJson(new String[]{}, spec);
// then throw JsonSyntaxException
assertEquals(spec, result);
}
}