mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
add k8s-standard launcher module
This commit is contained in:
parent
09dc9fcae5
commit
d2f3d5b7e1
8 changed files with 728 additions and 0 deletions
|
|
@ -665,6 +665,14 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT);
|
||||
}
|
||||
|
||||
public String getK8sNamespace() {
|
||||
return getString(ConfVars.ZEPPELIN_K8S_NAMESPACE);
|
||||
}
|
||||
|
||||
public String getK8sTemplatesDir() {
|
||||
return getRelativeDir(ConfVars.ZEPPELIN_K8S_TEMPLATE_DIR);
|
||||
}
|
||||
|
||||
public Map<String, String> dumpConfigurations(Predicate<String> predicate) {
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
|
||||
|
|
@ -816,6 +824,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL("zeppelin.cluster.heartbeat.interval", 3000),
|
||||
ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT("zeppelin.cluster.heartbeat.timeout", 9000),
|
||||
|
||||
ZEPPELIN_K8S_NAMESPACE("zeppelin.k8s.namespace", "default"),
|
||||
ZEPPELIN_K8S_TEMPLATE_DIR("zeppelin.k8s.template.dir", "k8s"),
|
||||
|
||||
ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL("zeppelin.notebook.git.remote.url", ""),
|
||||
ZEPPELIN_NOTEBOOK_GIT_REMOTE_USERNAME("zeppelin.notebook.git.remote.username", "token"),
|
||||
ZEPPELIN_NOTEBOOK_GIT_REMOTE_ACCESS_TOKEN("zeppelin.notebook.git.remote.access-token", ""),
|
||||
|
|
|
|||
97
zeppelin-plugins/launcher/k8s-standard/pom.xml
Normal file
97
zeppelin-plugins/launcher/k8s-standard/pom.xml
Normal file
|
|
@ -0,0 +1,97 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
~ contributor license agreements. See the NOTICE file distributed with
|
||||
~ this work for additional information regarding copyright ownership.
|
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
~ (the "License"); you may not use this file except in compliance with
|
||||
~ the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>zengine-plugins-parent</artifactId>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<version>0.9.0-SNAPSHOT</version>
|
||||
<relativePath>../../../zeppelin-plugins</relativePath>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>launcher-k8s-standard</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.9.0-SNAPSHOT</version>
|
||||
<name>Zeppelin: Plugin Kubernetes StandardLauncher</name>
|
||||
<description>Launcher implementation to run interpreters on Kubernetes</description>
|
||||
|
||||
<properties>
|
||||
<plugin.name>Launcher/StandardInterpreterLauncher</plugin.name>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.kubernetes</groupId>
|
||||
<artifactId>client-java</artifactId>
|
||||
<version>3.0.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.hubspot.jinjava</groupId>
|
||||
<artifactId>jinjava</artifactId>
|
||||
<version>2.0.11-java7</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<relocations>
|
||||
<relocation>
|
||||
<pattern>com.google</pattern>
|
||||
<shadedPattern>org.apache.zeppelin.k8slauncher.com.google</shadedPattern>
|
||||
</relocation>
|
||||
</relocations>
|
||||
<artifactSet>
|
||||
<includes>
|
||||
<include>*:*</include>
|
||||
</includes>
|
||||
</artifactSet>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce</id>
|
||||
<phase>none</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
@ -0,0 +1,130 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import com.squareup.okhttp.Call;
|
||||
import io.kubernetes.client.ApiException;
|
||||
import io.kubernetes.client.util.Watch;
|
||||
import java.io.IOException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class AsyncWatcher<T> implements Runnable {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWatcher.class);
|
||||
private final WatchStopCondition<T> stopCondition;
|
||||
private final int timeoutSec;
|
||||
private final WatchCall call;
|
||||
private final Thread t;
|
||||
boolean stopConditionMatched = false;
|
||||
private long startTime;
|
||||
private Exception e;
|
||||
|
||||
public AsyncWatcher(WatchCall call, WatchStopCondition<T> stopCondition, int timeoutSec) throws ApiException {
|
||||
this.call = call;
|
||||
this.stopCondition = stopCondition;
|
||||
this.timeoutSec = timeoutSec;
|
||||
this.t = new Thread(this);
|
||||
t.start();
|
||||
}
|
||||
|
||||
public void run() {
|
||||
startTime = System.currentTimeMillis();
|
||||
String resourceVersion = "0";
|
||||
|
||||
while (System.currentTimeMillis() - startTime < timeoutSec * 1000) {
|
||||
try {
|
||||
resourceVersion = watchVersion(resourceVersion);
|
||||
if (stopConditionMatched) {
|
||||
break;
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
if (e.getCause() instanceof SocketTimeoutException) {
|
||||
// see https://github.com/kubernetes-client/java/issues/259
|
||||
continue;
|
||||
} else {
|
||||
this.e = e;
|
||||
LOGGER.error("Runtime exception on watch resource", e);
|
||||
break;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
this.e = e;
|
||||
LOGGER.error("Exception on watch resource", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String watchVersion(String resourceVersion) throws ApiException {
|
||||
String newResourceVersion = resourceVersion;
|
||||
Watch<T> watch = Watch.createWatch(
|
||||
K8sStandardInterpreterLauncher.client,
|
||||
call.list(resourceVersion),
|
||||
new TypeToken<Watch.Response<T>>() {
|
||||
}.getType());
|
||||
|
||||
try {
|
||||
for (Watch.Response<T> item : watch) {
|
||||
newResourceVersion = getResourceVersion(item);
|
||||
if (stopCondition.shouldStop(item)) {
|
||||
stopConditionMatched = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
watch.close();
|
||||
} catch (IOException e) {
|
||||
// error on close watch.
|
||||
}
|
||||
return newResourceVersion;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private String getResourceVersion(Watch.Response<T> item) {
|
||||
if (item instanceof Map) {
|
||||
Object metadata = ((Map) item).get("metadata");
|
||||
if (metadata != null && metadata instanceof Map) {
|
||||
return (String) ((Map) metadata).get("resourceVersion");
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isStopConditionMatched() {
|
||||
return stopConditionMatched;
|
||||
}
|
||||
|
||||
public boolean isWatching() {
|
||||
return t.isAlive();
|
||||
}
|
||||
|
||||
public void await() {
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error("Watcher interrupted", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,389 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.JsonSyntaxException;
|
||||
import com.hubspot.jinjava.Jinjava;
|
||||
import com.squareup.okhttp.Call;
|
||||
import io.kubernetes.client.ApiClient;
|
||||
import io.kubernetes.client.ApiException;
|
||||
import io.kubernetes.client.Configuration;
|
||||
import io.kubernetes.client.apis.CoreV1Api;
|
||||
import io.kubernetes.client.models.V1ConfigMap;
|
||||
import io.kubernetes.client.models.V1Pod;
|
||||
import io.kubernetes.client.models.V1PodStatus;
|
||||
import io.kubernetes.client.models.V1Service;
|
||||
import io.kubernetes.client.util.Config;
|
||||
import io.kubernetes.client.util.Watch;
|
||||
import io.kubernetes.client.util.Yaml;
|
||||
import java.io.File;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterRunner;
|
||||
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Interpreter Launcher which use shell script to launch the interpreter process.
|
||||
*/
|
||||
public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
|
||||
private static final String pretty = "true";
|
||||
private static Integer apiTimeoutSec = new Integer(120);
|
||||
private InterpreterLaunchContext context;
|
||||
|
||||
Jinjava jinja = new Jinjava();
|
||||
|
||||
static ApiClient client;
|
||||
static {
|
||||
try {
|
||||
client = Config.defaultClient();
|
||||
Configuration.setDefaultApiClient(client);
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Can't get kubernetes client configuration", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
|
||||
super(zConf, recoveryStorage);
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply spec file(s) in the path.
|
||||
* @param path
|
||||
*/
|
||||
void apply(File path) throws IOException {
|
||||
if (path.getName().startsWith(".") || path.isHidden()) {
|
||||
LOGGER.info("Skip {}", path.getAbsolutePath());
|
||||
}
|
||||
|
||||
if (path.isDirectory()) {
|
||||
File[] files = path.listFiles();
|
||||
Arrays.sort(files);
|
||||
for (File f : files) {
|
||||
apply(f);
|
||||
}
|
||||
} else if (path.isFile()) {
|
||||
LOGGER.info("Apply {}", path.getAbsolutePath());
|
||||
List<Object> yamls = Yaml.loadAll(
|
||||
jinja.render(
|
||||
readFile(path.getAbsolutePath(), Charset.defaultCharset()),
|
||||
getTemplateBindings()));
|
||||
if (yamls != null) {
|
||||
for (Object spec : yamls) {
|
||||
try {
|
||||
applySpec(spec);
|
||||
} catch (ApiException e) {
|
||||
LOGGER.error(e.getResponseBody());
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOGGER.error("Can't apply {}", path.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Map<String, Object> getTemplateBindings() throws IOException {
|
||||
HashMap<String, Object> var = new HashMap<String, Object>();
|
||||
var.put("NAMESPACE", getNamespace());
|
||||
var.put("POD_NAME", context.getInterpreterGroupId().toLowerCase());
|
||||
var.put("CONTAINER_NAME", context.getInterpreterSettingGroup().toLowerCase());
|
||||
var.put("CONTAINER_IMAGE", "apache/zeppelin:0.8.0");
|
||||
var.put("INTP_PORT", "12321"); // interpreter.sh -r
|
||||
var.put("INTP_NAME", context.getInterpreterSettingGroup()); // interpreter.sh -d
|
||||
var.put("CALLBACK_HOST", getZeppelinServiceHost()); // interpreter.sh -c
|
||||
var.put("CALLBACK_PORT", getZeppelinServiceRpcPort()); // interpreter.sh -p
|
||||
var.put("INTP_SETTING", context.getInterpreterSettingName()); // interpreter.sh -g
|
||||
var.put("INTP_REPO", "/tmp/local-repo"); // interpreter.sh -l
|
||||
var.putAll(Maps.fromProperties(properties)); // interpreter properties override template variables
|
||||
return var;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if i'm running inside of kubernetes or not.
|
||||
* @return
|
||||
*/
|
||||
boolean isRunningOnKubernetes() {
|
||||
if (new File("/var/run/secrets/kubernetes.io").exists()) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current namespace
|
||||
* @throws IOException
|
||||
*/
|
||||
String getNamespace() throws IOException {
|
||||
if (isRunningOnKubernetes()) {
|
||||
return readFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace", Charset.defaultCharset()).trim();
|
||||
} else {
|
||||
return "default";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get hostname. It should be the same to Service name (and Pod name) of the Kubernetes
|
||||
* @return
|
||||
*/
|
||||
String getHostname() {
|
||||
try {
|
||||
return InetAddress.getLocalHost().getHostName();
|
||||
} catch (UnknownHostException e) {
|
||||
return "localhost";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get Zeppelin server host dns.
|
||||
* return <hostname>.<namespace>.svc.cluster.local
|
||||
* @throws IOException
|
||||
*/
|
||||
private String getZeppelinServiceHost() throws IOException {
|
||||
if (isRunningOnKubernetes()) {
|
||||
return String.format("%s.%s.svc.cluster.local",
|
||||
getHostname(), // service name and pod name should be the same
|
||||
getNamespace());
|
||||
} else {
|
||||
return context.getZeppelinServerHost();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get Zeppelin server rpc port
|
||||
* Read env variable "<HOSTNAME>_SERVICE_PORT_RPC"
|
||||
*/
|
||||
private String getZeppelinServiceRpcPort() {
|
||||
String envServicePort = System.getenv(
|
||||
String.format("%s_SERVICE_PORT_RPC", getHostname().replaceAll("[-.]", "_").toUpperCase()));
|
||||
if (envServicePort != null) {
|
||||
return envServicePort;
|
||||
} else {
|
||||
return Integer.toString(context.getZeppelinServerRPCPort());
|
||||
}
|
||||
}
|
||||
|
||||
AsyncWatcher<Map<String, Object>> w = null;
|
||||
String name;
|
||||
|
||||
// apply single spec
|
||||
AsyncWatcher applySpec(Object spec) throws ApiException, IOException {
|
||||
if (spec instanceof V1Pod) {
|
||||
V1Pod pod = (V1Pod) spec;
|
||||
CoreV1Api api = new CoreV1Api();
|
||||
String namespace = pod.getMetadata().getNamespace();
|
||||
name = pod.getMetadata().getName();
|
||||
|
||||
AsyncWatcher<Map<String, Object>> w =
|
||||
new AsyncWatcher<Map<String, Object>>(
|
||||
new WatchCall() {
|
||||
@Override
|
||||
public Call list(String resourceVersion) throws ApiException {
|
||||
return api.listNamespacedPodCall(
|
||||
namespace,
|
||||
pretty,
|
||||
null,
|
||||
String.format("metadata.name=%s", name),
|
||||
Boolean.TRUE,
|
||||
null,
|
||||
10,
|
||||
resourceVersion,
|
||||
apiTimeoutSec,
|
||||
true,
|
||||
null,
|
||||
null);
|
||||
}
|
||||
},
|
||||
new WatchStopCondition<Map<String, Object>>() {
|
||||
@Override
|
||||
public boolean shouldStop(Watch.Response<Map<String, Object>> item) {
|
||||
// pod phase https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase
|
||||
Object status = item.object.get("status");
|
||||
if (status == null) {
|
||||
return false;
|
||||
}
|
||||
System.out.println("Status = " + status);
|
||||
switch (((Map<String, String>)status).get("phase")) {
|
||||
case "Succeeded":
|
||||
case "Failed":
|
||||
case "Running":
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
},
|
||||
apiTimeoutSec);
|
||||
|
||||
api.createNamespacedPod(namespace, pod, pretty);
|
||||
} else if (spec instanceof V1Service) {
|
||||
V1Service service = (V1Service) spec;
|
||||
CoreV1Api api = new CoreV1Api();
|
||||
String namespace = service.getMetadata().getNamespace();
|
||||
String name = service.getMetadata().getName();
|
||||
|
||||
AsyncWatcher<Map<String, Object>> w =
|
||||
new AsyncWatcher<Map<String, Object>>(
|
||||
new WatchCall() {
|
||||
@Override
|
||||
public Call list(String resourceVersion) throws ApiException {
|
||||
return api.listNamespacedServiceCall(
|
||||
namespace,
|
||||
pretty,
|
||||
null,
|
||||
String.format("metadata.name=%s", name),
|
||||
Boolean.TRUE,
|
||||
null,
|
||||
10,
|
||||
resourceVersion,
|
||||
apiTimeoutSec,
|
||||
true,
|
||||
null,
|
||||
null);
|
||||
}
|
||||
},
|
||||
new WatchStopCondition<Map<String, Object>>() {
|
||||
@Override
|
||||
public boolean shouldStop(Watch.Response<Map<String, Object>> item) {
|
||||
return "ADDED".equals(item.type);
|
||||
}
|
||||
},
|
||||
apiTimeoutSec);
|
||||
|
||||
try {
|
||||
api.createNamespacedService(namespace, service, pretty);
|
||||
} catch (JsonSyntaxException e) {
|
||||
// the API return is sometimes not a json message. That cause exception although creation of service actually success.
|
||||
// So need to ignore JsonSyntaxException here.
|
||||
}
|
||||
} else if (spec instanceof V1ConfigMap) {
|
||||
V1ConfigMap configMap = (V1ConfigMap) spec;
|
||||
CoreV1Api api = new CoreV1Api();
|
||||
String namespace = configMap.getMetadata().getNamespace();
|
||||
String name = configMap.getMetadata().getName();
|
||||
|
||||
AsyncWatcher<Map<String, Object>> w =
|
||||
new AsyncWatcher<Map<String, Object>>(
|
||||
new WatchCall() {
|
||||
@Override
|
||||
public Call list(String resourceVersion) throws ApiException {
|
||||
return api.listNamespacedConfigMapCall(
|
||||
namespace,
|
||||
pretty,
|
||||
null,
|
||||
String.format("metadata.name=%s", name),
|
||||
Boolean.TRUE,
|
||||
null,
|
||||
10,
|
||||
resourceVersion,
|
||||
apiTimeoutSec,
|
||||
true,
|
||||
null,
|
||||
null);
|
||||
}
|
||||
},
|
||||
new WatchStopCondition<Map<String, Object>>() {
|
||||
@Override
|
||||
public boolean shouldStop(Watch.Response<Map<String, Object>> item) {
|
||||
return "ADDED".equals(item.type);
|
||||
}
|
||||
},
|
||||
apiTimeoutSec);
|
||||
|
||||
try {
|
||||
api.createNamespacedConfigMap(namespace, configMap, pretty);
|
||||
} catch (JsonSyntaxException e) {
|
||||
// the API return is sometimes not a json message. That cause exception although creation of service actually success.
|
||||
// So need to ignore JsonSyntaxException here.
|
||||
}
|
||||
}
|
||||
|
||||
if (w != null) {
|
||||
w.await();
|
||||
|
||||
if (!w.isStopConditionMatched()) {
|
||||
throw new IOException("Timeout on applying " + name);
|
||||
}
|
||||
|
||||
return w;
|
||||
} else {
|
||||
throw new IOException("Unsupported spec " + spec.getClass().getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
|
||||
LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
|
||||
this.context = context;
|
||||
this.properties = context.getProperties();
|
||||
InterpreterOption option = context.getOption();
|
||||
InterpreterRunner runner = context.getRunner();
|
||||
String groupName = context.getInterpreterSettingGroup();
|
||||
String name = context.getInterpreterSettingName();
|
||||
int connectTimeout = getConnectTimeout();
|
||||
|
||||
// create new pod
|
||||
apply(new File(zConf.getK8sTemplatesDir(), "interpreter"));
|
||||
return null;
|
||||
}
|
||||
|
||||
protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
|
||||
Map<String, String> env = new HashMap<>();
|
||||
for (Object key : context.getProperties().keySet()) {
|
||||
if (RemoteInterpreterUtils.isEnvString((String) key)) {
|
||||
env.put((String) key, context.getProperties().getProperty((String) key));
|
||||
}
|
||||
// TODO(zjffdu) move this to FlinkInterpreterLauncher
|
||||
if (key.toString().equals("FLINK_HOME")) {
|
||||
String flinkHome = context.getProperties().get(key).toString();
|
||||
env.put("FLINK_CONF_DIR", flinkHome + "/conf");
|
||||
env.put("FLINK_LIB_DIR", flinkHome + "/lib");
|
||||
}
|
||||
}
|
||||
env.put("INTERPRETER_GROUP_ID", context.getInterpreterGroupId());
|
||||
return env;
|
||||
}
|
||||
|
||||
String readFile(String path, Charset encoding) throws IOException {
|
||||
byte[] encoded = Files.readAllBytes(Paths.get(path));
|
||||
return new String(encoded, encoding);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import com.squareup.okhttp.Call;
|
||||
import io.kubernetes.client.ApiException;
|
||||
|
||||
public interface WatchCall {
|
||||
Call list(String resourceVersion) throws ApiException;
|
||||
}
|
||||
|
|
@ -0,0 +1,7 @@
|
|||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import io.kubernetes.client.util.Watch;
|
||||
|
||||
public interface WatchStopCondition<T> {
|
||||
boolean shouldStop(Watch.Response<T> item);
|
||||
}
|
||||
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* https://github.com/LiliC/travis-minikube
|
||||
* https://blog.travis-ci.com/2017-10-26-running-kubernetes-on-travis-ci-with-minikube
|
||||
*/
|
||||
public class K8sStandardInterpreterLauncherTest {
|
||||
@Before
|
||||
public void setUp() {
|
||||
for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) {
|
||||
System.clearProperty(confVar.getVarName());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTemplate() throws IOException {
|
||||
// given
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("ENV_1", "VALUE_1");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
properties.setProperty("CALLBACK_HOST", "zeppelin-server.default.svc.cluster.local");
|
||||
properties.setProperty("CALLBACK_PORT", "12320");
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
option.setUserImpersonate(true);
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "sh", "name", 0, "host");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
}
|
||||
|
||||
/*
|
||||
@Test
|
||||
public void testLauncher() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("ENV_1", "VALUE_1");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
option.setUserImpersonate(true);
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
assertEquals("name", interpreterProcess.getInterpreterSettingName());
|
||||
assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
|
||||
assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
|
||||
assertEquals(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getIntValue(),
|
||||
interpreterProcess.getConnectTimeout());
|
||||
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
|
||||
assertEquals(2, interpreterProcess.getEnv().size());
|
||||
assertEquals("VALUE_1", interpreterProcess.getEnv().get("ENV_1"));
|
||||
assertEquals(true, interpreterProcess.isUserImpersonated());
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
|
@ -47,6 +47,7 @@
|
|||
<module>notebookrepo/filesystem</module>
|
||||
|
||||
<module>launcher/standard</module>
|
||||
<module>launcher/k8s-standard</module>
|
||||
<module>launcher/spark</module>
|
||||
</modules>
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue