add k8s-standard launcher module

This commit is contained in:
Lee moon soo 2018-11-05 09:43:19 -08:00 committed by Lee moon soo
parent 09dc9fcae5
commit d2f3d5b7e1
8 changed files with 728 additions and 0 deletions

View file

@ -665,6 +665,14 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT);
}
public String getK8sNamespace() {
return getString(ConfVars.ZEPPELIN_K8S_NAMESPACE);
}
public String getK8sTemplatesDir() {
return getRelativeDir(ConfVars.ZEPPELIN_K8S_TEMPLATE_DIR);
}
public Map<String, String> dumpConfigurations(Predicate<String> predicate) {
Map<String, String> properties = new HashMap<>();
@ -816,6 +824,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL("zeppelin.cluster.heartbeat.interval", 3000),
ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT("zeppelin.cluster.heartbeat.timeout", 9000),
ZEPPELIN_K8S_NAMESPACE("zeppelin.k8s.namespace", "default"),
ZEPPELIN_K8S_TEMPLATE_DIR("zeppelin.k8s.template.dir", "k8s"),
ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL("zeppelin.notebook.git.remote.url", ""),
ZEPPELIN_NOTEBOOK_GIT_REMOTE_USERNAME("zeppelin.notebook.git.remote.username", "token"),
ZEPPELIN_NOTEBOOK_GIT_REMOTE_ACCESS_TOKEN("zeppelin.notebook.git.remote.access-token", ""),

View file

@ -0,0 +1,97 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zengine-plugins-parent</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.9.0-SNAPSHOT</version>
<relativePath>../../../zeppelin-plugins</relativePath>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>launcher-k8s-standard</artifactId>
<packaging>jar</packaging>
<version>0.9.0-SNAPSHOT</version>
<name>Zeppelin: Plugin Kubernetes StandardLauncher</name>
<description>Launcher implementation to run interpreters on Kubernetes</description>
<properties>
<plugin.name>Launcher/StandardInterpreterLauncher</plugin.name>
</properties>
<dependencies>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.hubspot.jinjava</groupId>
<artifactId>jinjava</artifactId>
<version>2.0.11-java7</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.apache.zeppelin.k8slauncher.com.google</shadedPattern>
</relocation>
</relocations>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import com.google.gson.reflect.TypeToken;
import com.squareup.okhttp.Call;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Date;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AsyncWatcher<T> implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWatcher.class);
private final WatchStopCondition<T> stopCondition;
private final int timeoutSec;
private final WatchCall call;
private final Thread t;
boolean stopConditionMatched = false;
private long startTime;
private Exception e;
public AsyncWatcher(WatchCall call, WatchStopCondition<T> stopCondition, int timeoutSec) throws ApiException {
this.call = call;
this.stopCondition = stopCondition;
this.timeoutSec = timeoutSec;
this.t = new Thread(this);
t.start();
}
public void run() {
startTime = System.currentTimeMillis();
String resourceVersion = "0";
while (System.currentTimeMillis() - startTime < timeoutSec * 1000) {
try {
resourceVersion = watchVersion(resourceVersion);
if (stopConditionMatched) {
break;
}
} catch (RuntimeException e) {
if (e.getCause() instanceof SocketTimeoutException) {
// see https://github.com/kubernetes-client/java/issues/259
continue;
} else {
this.e = e;
LOGGER.error("Runtime exception on watch resource", e);
break;
}
} catch (Exception e) {
this.e = e;
LOGGER.error("Exception on watch resource", e);
break;
}
}
}
private String watchVersion(String resourceVersion) throws ApiException {
String newResourceVersion = resourceVersion;
Watch<T> watch = Watch.createWatch(
K8sStandardInterpreterLauncher.client,
call.list(resourceVersion),
new TypeToken<Watch.Response<T>>() {
}.getType());
try {
for (Watch.Response<T> item : watch) {
newResourceVersion = getResourceVersion(item);
if (stopCondition.shouldStop(item)) {
stopConditionMatched = true;
break;
}
}
} finally {
try {
watch.close();
} catch (IOException e) {
// error on close watch.
}
return newResourceVersion;
}
}
private String getResourceVersion(Watch.Response<T> item) {
if (item instanceof Map) {
Object metadata = ((Map) item).get("metadata");
if (metadata != null && metadata instanceof Map) {
return (String) ((Map) metadata).get("resourceVersion");
} else {
return null;
}
} else {
return null;
}
}
public boolean isStopConditionMatched() {
return stopConditionMatched;
}
public boolean isWatching() {
return t.isAlive();
}
public void await() {
try {
t.join();
} catch (InterruptedException e) {
LOGGER.error("Watcher interrupted", e);
}
}
}

View file

@ -0,0 +1,389 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.gson.JsonSyntaxException;
import com.hubspot.jinjava.Jinjava;
import com.squareup.okhttp.Call;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1ConfigMap;
import io.kubernetes.client.models.V1Pod;
import io.kubernetes.client.models.V1PodStatus;
import io.kubernetes.client.models.V1Service;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.Yaml;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterRunner;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Interpreter Launcher which use shell script to launch the interpreter process.
*/
public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
private static final String pretty = "true";
private static Integer apiTimeoutSec = new Integer(120);
private InterpreterLaunchContext context;
Jinjava jinja = new Jinjava();
static ApiClient client;
static {
try {
client = Config.defaultClient();
Configuration.setDefaultApiClient(client);
} catch (IOException e) {
LOGGER.error("Can't get kubernetes client configuration", e);
}
}
public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
super(zConf, recoveryStorage);
}
/**
* Apply spec file(s) in the path.
* @param path
*/
void apply(File path) throws IOException {
if (path.getName().startsWith(".") || path.isHidden()) {
LOGGER.info("Skip {}", path.getAbsolutePath());
}
if (path.isDirectory()) {
File[] files = path.listFiles();
Arrays.sort(files);
for (File f : files) {
apply(f);
}
} else if (path.isFile()) {
LOGGER.info("Apply {}", path.getAbsolutePath());
List<Object> yamls = Yaml.loadAll(
jinja.render(
readFile(path.getAbsolutePath(), Charset.defaultCharset()),
getTemplateBindings()));
if (yamls != null) {
for (Object spec : yamls) {
try {
applySpec(spec);
} catch (ApiException e) {
LOGGER.error(e.getResponseBody());
throw new IOException(e);
}
}
}
} else {
LOGGER.error("Can't apply {}", path.getAbsolutePath());
}
}
Map<String, Object> getTemplateBindings() throws IOException {
HashMap<String, Object> var = new HashMap<String, Object>();
var.put("NAMESPACE", getNamespace());
var.put("POD_NAME", context.getInterpreterGroupId().toLowerCase());
var.put("CONTAINER_NAME", context.getInterpreterSettingGroup().toLowerCase());
var.put("CONTAINER_IMAGE", "apache/zeppelin:0.8.0");
var.put("INTP_PORT", "12321"); // interpreter.sh -r
var.put("INTP_NAME", context.getInterpreterSettingGroup()); // interpreter.sh -d
var.put("CALLBACK_HOST", getZeppelinServiceHost()); // interpreter.sh -c
var.put("CALLBACK_PORT", getZeppelinServiceRpcPort()); // interpreter.sh -p
var.put("INTP_SETTING", context.getInterpreterSettingName()); // interpreter.sh -g
var.put("INTP_REPO", "/tmp/local-repo"); // interpreter.sh -l
var.putAll(Maps.fromProperties(properties)); // interpreter properties override template variables
return var;
}
/**
* Check if i'm running inside of kubernetes or not.
* @return
*/
boolean isRunningOnKubernetes() {
if (new File("/var/run/secrets/kubernetes.io").exists()) {
return true;
} else {
return false;
}
}
/**
* Get current namespace
* @throws IOException
*/
String getNamespace() throws IOException {
if (isRunningOnKubernetes()) {
return readFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace", Charset.defaultCharset()).trim();
} else {
return "default";
}
}
/**
* Get hostname. It should be the same to Service name (and Pod name) of the Kubernetes
* @return
*/
String getHostname() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
return "localhost";
}
}
/**
* get Zeppelin server host dns.
* return <hostname>.<namespace>.svc.cluster.local
* @throws IOException
*/
private String getZeppelinServiceHost() throws IOException {
if (isRunningOnKubernetes()) {
return String.format("%s.%s.svc.cluster.local",
getHostname(), // service name and pod name should be the same
getNamespace());
} else {
return context.getZeppelinServerHost();
}
}
/**
* get Zeppelin server rpc port
* Read env variable "<HOSTNAME>_SERVICE_PORT_RPC"
*/
private String getZeppelinServiceRpcPort() {
String envServicePort = System.getenv(
String.format("%s_SERVICE_PORT_RPC", getHostname().replaceAll("[-.]", "_").toUpperCase()));
if (envServicePort != null) {
return envServicePort;
} else {
return Integer.toString(context.getZeppelinServerRPCPort());
}
}
AsyncWatcher<Map<String, Object>> w = null;
String name;
// apply single spec
AsyncWatcher applySpec(Object spec) throws ApiException, IOException {
if (spec instanceof V1Pod) {
V1Pod pod = (V1Pod) spec;
CoreV1Api api = new CoreV1Api();
String namespace = pod.getMetadata().getNamespace();
name = pod.getMetadata().getName();
AsyncWatcher<Map<String, Object>> w =
new AsyncWatcher<Map<String, Object>>(
new WatchCall() {
@Override
public Call list(String resourceVersion) throws ApiException {
return api.listNamespacedPodCall(
namespace,
pretty,
null,
String.format("metadata.name=%s", name),
Boolean.TRUE,
null,
10,
resourceVersion,
apiTimeoutSec,
true,
null,
null);
}
},
new WatchStopCondition<Map<String, Object>>() {
@Override
public boolean shouldStop(Watch.Response<Map<String, Object>> item) {
// pod phase https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase
Object status = item.object.get("status");
if (status == null) {
return false;
}
System.out.println("Status = " + status);
switch (((Map<String, String>)status).get("phase")) {
case "Succeeded":
case "Failed":
case "Running":
return true;
default:
return false;
}
}
},
apiTimeoutSec);
api.createNamespacedPod(namespace, pod, pretty);
} else if (spec instanceof V1Service) {
V1Service service = (V1Service) spec;
CoreV1Api api = new CoreV1Api();
String namespace = service.getMetadata().getNamespace();
String name = service.getMetadata().getName();
AsyncWatcher<Map<String, Object>> w =
new AsyncWatcher<Map<String, Object>>(
new WatchCall() {
@Override
public Call list(String resourceVersion) throws ApiException {
return api.listNamespacedServiceCall(
namespace,
pretty,
null,
String.format("metadata.name=%s", name),
Boolean.TRUE,
null,
10,
resourceVersion,
apiTimeoutSec,
true,
null,
null);
}
},
new WatchStopCondition<Map<String, Object>>() {
@Override
public boolean shouldStop(Watch.Response<Map<String, Object>> item) {
return "ADDED".equals(item.type);
}
},
apiTimeoutSec);
try {
api.createNamespacedService(namespace, service, pretty);
} catch (JsonSyntaxException e) {
// the API return is sometimes not a json message. That cause exception although creation of service actually success.
// So need to ignore JsonSyntaxException here.
}
} else if (spec instanceof V1ConfigMap) {
V1ConfigMap configMap = (V1ConfigMap) spec;
CoreV1Api api = new CoreV1Api();
String namespace = configMap.getMetadata().getNamespace();
String name = configMap.getMetadata().getName();
AsyncWatcher<Map<String, Object>> w =
new AsyncWatcher<Map<String, Object>>(
new WatchCall() {
@Override
public Call list(String resourceVersion) throws ApiException {
return api.listNamespacedConfigMapCall(
namespace,
pretty,
null,
String.format("metadata.name=%s", name),
Boolean.TRUE,
null,
10,
resourceVersion,
apiTimeoutSec,
true,
null,
null);
}
},
new WatchStopCondition<Map<String, Object>>() {
@Override
public boolean shouldStop(Watch.Response<Map<String, Object>> item) {
return "ADDED".equals(item.type);
}
},
apiTimeoutSec);
try {
api.createNamespacedConfigMap(namespace, configMap, pretty);
} catch (JsonSyntaxException e) {
// the API return is sometimes not a json message. That cause exception although creation of service actually success.
// So need to ignore JsonSyntaxException here.
}
}
if (w != null) {
w.await();
if (!w.isStopConditionMatched()) {
throw new IOException("Timeout on applying " + name);
}
return w;
} else {
throw new IOException("Unsupported spec " + spec.getClass().getName());
}
}
@Override
public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
this.context = context;
this.properties = context.getProperties();
InterpreterOption option = context.getOption();
InterpreterRunner runner = context.getRunner();
String groupName = context.getInterpreterSettingGroup();
String name = context.getInterpreterSettingName();
int connectTimeout = getConnectTimeout();
// create new pod
apply(new File(zConf.getK8sTemplatesDir(), "interpreter"));
return null;
}
protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
Map<String, String> env = new HashMap<>();
for (Object key : context.getProperties().keySet()) {
if (RemoteInterpreterUtils.isEnvString((String) key)) {
env.put((String) key, context.getProperties().getProperty((String) key));
}
// TODO(zjffdu) move this to FlinkInterpreterLauncher
if (key.toString().equals("FLINK_HOME")) {
String flinkHome = context.getProperties().get(key).toString();
env.put("FLINK_CONF_DIR", flinkHome + "/conf");
env.put("FLINK_LIB_DIR", flinkHome + "/lib");
}
}
env.put("INTERPRETER_GROUP_ID", context.getInterpreterGroupId());
return env;
}
String readFile(String path, Charset encoding) throws IOException {
byte[] encoded = Files.readAllBytes(Paths.get(path));
return new String(encoded, encoding);
}
}

View file

@ -0,0 +1,8 @@
package org.apache.zeppelin.interpreter.launcher;
import com.squareup.okhttp.Call;
import io.kubernetes.client.ApiException;
public interface WatchCall {
Call list(String resourceVersion) throws ApiException;
}

View file

@ -0,0 +1,7 @@
package org.apache.zeppelin.interpreter.launcher;
import io.kubernetes.client.util.Watch;
public interface WatchStopCondition<T> {
boolean shouldStop(Watch.Response<T> item);
}

View file

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* https://github.com/LiliC/travis-minikube
* https://blog.travis-ci.com/2017-10-26-running-kubernetes-on-travis-ci-with-minikube
*/
public class K8sStandardInterpreterLauncherTest {
@Before
public void setUp() {
for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) {
System.clearProperty(confVar.getVarName());
}
}
@Test
public void testTemplate() throws IOException {
// given
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("ENV_1", "VALUE_1");
properties.setProperty("property_1", "value_1");
properties.setProperty("CALLBACK_HOST", "zeppelin-server.default.svc.cluster.local");
properties.setProperty("CALLBACK_PORT", "12320");
InterpreterOption option = new InterpreterOption();
option.setUserImpersonate(true);
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "sh", "name", 0, "host");
InterpreterClient client = launcher.launch(context);
}
/*
@Test
public void testLauncher() throws IOException {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("ENV_1", "VALUE_1");
properties.setProperty("property_1", "value_1");
InterpreterOption option = new InterpreterOption();
option.setUserImpersonate(true);
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
assertEquals("name", interpreterProcess.getInterpreterSettingName());
assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
assertEquals(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getIntValue(),
interpreterProcess.getConnectTimeout());
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertEquals(2, interpreterProcess.getEnv().size());
assertEquals("VALUE_1", interpreterProcess.getEnv().get("ENV_1"));
assertEquals(true, interpreterProcess.isUserImpersonated());
}
*/
}

View file

@ -47,6 +47,7 @@
<module>notebookrepo/filesystem</module>
<module>launcher/standard</module>
<module>launcher/k8s-standard</module>
<module>launcher/spark</module>
</modules>