create and connect interpreter pod

This commit is contained in:
Lee moon soo 2018-11-22 11:56:59 +09:00
parent 9f1b7a1691
commit 0dea3836b0
11 changed files with 171 additions and 57 deletions

View file

@ -5,6 +5,8 @@ metadata:
name: {{POD_NAME}}
labels:
app: {{POD_NAME}}
interpreterGroupId: {{INTP_ID}}
interpreterSettingName: {{INTP_SETTING}}
spec:
automountServiceAccountToken: false
restartPolicy: Never
@ -12,7 +14,7 @@ spec:
containers:
- name: {{CONTAINER_NAME}}
image: {{CONTAINER_IMAGE}}
command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{INTP_NAME}} -r {{INTP_PORT}} -c {{CALLBACK_HOST}} -p {{CALLBACK_PORT}} -i {{INTP_ID}} -l {{INTP_REPO}} -g {{INTP_SETTING}}"]
command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{INTP_NAME}} -r {{INTP_PORT}}:{{INTP_PORT}} -c {{CALLBACK_HOST}} -p {{CALLBACK_PORT}} -i {{INTP_ID}} -l {{INTP_REPO}} -g {{INTP_SETTING}}"]
env:
- name: ZEPPELIN_HOME
value: /zeppelin

View file

@ -669,12 +669,16 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getString(ConfVars.ZEPPELIN_K8S_MODE);
}
public boolean getK8sPortForward() {
return getBoolean(ConfVars.ZEPPELIN_K8S_PORTFORWARD);
}
public String getK8sKubectlCmd() {
return getString(ConfVars.ZEPPELIN_K8S_KUBECTL);
}
public String getK8sNamespace() {
return getString(ConfVars.ZEPPELIN_K8S_NAMESPACE);
public String getK8sContainerImage() {
return getString(ConfVars.ZEPPELIN_K8S_CONTAINER_IMAGE);
}
public String getK8sTemplatesDir() {
@ -833,8 +837,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT("zeppelin.cluster.heartbeat.timeout", 9000),
ZEPPELIN_K8S_MODE("zeppelin.k8s.mode", "auto"), // auto | on | off
ZEPPELIN_K8S_PORTFORWARD("zeppelin.k8s.portforward", false), // kubectl port-forward incase of Zeppelin is running outside of kuberentes
ZEPPELIN_K8S_KUBECTL("zeppelin.k8s.kubectl", "kubectl"), // kubectl command
ZEPPELIN_K8S_NAMESPACE("zeppelin.k8s.namespace", "default"),
ZEPPELIN_K8S_CONTAINER_IMAGE("zeppelin.k8s.container.image", "apache/zeppelin:" + Util.getVersion()),
ZEPPELIN_K8S_TEMPLATE_DIR("zeppelin.k8s.template.dir", "k8s"),
ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL("zeppelin.notebook.git.remote.url", ""),

View file

@ -41,11 +41,6 @@
</properties>
<dependencies>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.hubspot.jinjava</groupId>
<artifactId>jinjava</artifactId>
@ -55,31 +50,6 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.apache.zeppelin.k8slauncher.com.google</shadedPattern>
</relocation>
</relocations>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>

View file

@ -5,22 +5,26 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
private static final Logger logger = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
private static final int K8S_INTERPRETER_SERVICE_PORT = 12321;
private final Kubectl kubectl;
private final String interpreterGroupId;
private final String interpreterGroupName;
private final String interpreterSettingName;
private final File specTempaltes;
private final String containerImage;
private final Properties properties;
private final Map<String, String> envs;
private final String zeppelinServiceHost;
@ -28,10 +32,17 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
private final int podCreateTimeoutSec = 180;
private final Gson gson = new Gson();
private final String podName;
private final boolean portForward;
private ExecuteWatchdog portForwardWatchdog;
private int podPort = K8S_INTERPRETER_SERVICE_PORT;
private AtomicBoolean started = new AtomicBoolean(false);
public K8sRemoteInterpreterProcess(
Kubectl kubectl,
File specTemplates,
String containerImage,
String interpreterGroupId,
String interpreterGroupName,
String interpreterSettingName,
@ -39,10 +50,12 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
Map<String, String> envs,
String zeppelinServiceHost,
String zeppelinServiceRpcPort,
boolean portForward,
int connectTimeout) {
super(connectTimeout);
this.kubectl = kubectl;
this.specTempaltes = specTemplates;
this.containerImage = containerImage;
this.interpreterGroupId = interpreterGroupId;
this.interpreterGroupName = interpreterGroupName;
this.interpreterSettingName = interpreterSettingName;
@ -50,6 +63,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
this.envs = envs;
this.zeppelinServiceHost = zeppelinServiceHost;
this.zeppelinServiceRpcPort = zeppelinServiceRpcPort;
this.portForward = portForward;
this.podName = interpreterGroupName.toLowerCase() + "-" + getRandomString(6);
}
@ -58,7 +73,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
* @return
*/
private String getPodName() {
return interpreterGroupId.toLowerCase();
return podName;
}
@Override
@ -71,6 +86,46 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
// create new pod
apply(specTempaltes, false);
kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", podCreateTimeoutSec);
if (portForward) {
podPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
portForwardWatchdog = kubectl.portForward(
String.format("pod/%s", getPodName()),
new String[] {
String.format("%s:%s", podPort, K8S_INTERPRETER_SERVICE_PORT)
});
}
long startTime = System.currentTimeMillis();
// wait until interpreter send started message through thrift rpc
synchronized (started) {
if (!started.get()) {
try {
started.wait(getConnectTimeout());
} catch (InterruptedException e) {
logger.error("Remote interpreter is not accessible");
}
}
}
if (!started.get()) {
logger.info(
String.format("Interpreter pod creation is time out in %d seconds",
getConnectTimeout()/1000));
}
// waits for interpreter thrift rpc server ready
while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
break;
} else {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
@Override
@ -78,27 +133,45 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
// delete pod
try {
apply(specTempaltes, true);
} catch (IOException e) {
logger.info("Error on removing interpreter pod", e);
}
try {
kubectl.wait(String.format("pod/%s", getPodName()), "delete", 60);
} catch (IOException e) {
logger.error("Error on removing interpreter pod", e);
logger.debug("Error on waiting pod delete", e);
}
if (portForwardWatchdog != null) {
portForwardWatchdog.destroyProcess();
}
}
@Override
public String getHost() {
return String.format("%s.%s.svc.cluster.local",
getPodName(), // service name and pod name is the same
kubectl.getNamespace());
if (portForward) {
return "localhost";
} else {
return String.format("%s.%s.svc.cluster.local",
getPodName(), // service name and pod name is the same
kubectl.getNamespace());
}
}
@Override
public int getPort() {
return 12321;
return podPort;
}
@Override
public boolean isRunning() {
try {
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
return true;
}
String ret = kubectl.execAndGet(new String[]{
"get",
String.format("pods/%s", getPodName()),
@ -120,8 +193,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
return false;
}
return "Running".equals(status.get("phase"));
} catch (IOException e) {
return "Running".equals(status.get("phase")) && started.get();
} catch (Exception e) {
logger.error("Can't get pod status", e);
return false;
}
@ -167,7 +240,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
var.put("NAMESPACE", kubectl.getNamespace());
var.put("POD_NAME", getPodName());
var.put("CONTAINER_NAME", interpreterGroupName.toLowerCase());
var.put("CONTAINER_IMAGE", "apache/zeppelin:0.8.0");
var.put("CONTAINER_IMAGE", containerImage);
var.put("INTP_PORT", "12321"); // interpreter.sh -r
var.put("INTP_ID", interpreterGroupId); // interpreter.sh -i
var.put("INTP_NAME", interpreterGroupName); // interpreter.sh -d
@ -178,4 +251,27 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
var.putAll(Maps.fromProperties(properties)); // interpreter properties override template variables
return var;
}
private String getRandomString(int length) {
char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
StringBuilder sb = new StringBuilder();
Random random = new Random();
for (int i = 0; i < length; i++) {
char c = chars[random.nextInt(chars.length)];
sb.append(c);
}
String randomStr = sb.toString();
return randomStr;
}
@Override
public void processStarted(int port, String host) {
logger.info("Interpreter pod created {}:{}", host, port);
synchronized (started) {
started.set(true);
started.notify();
}
}
}

View file

@ -25,14 +25,19 @@ import java.nio.charset.Charset;
import java.util.HashMap;
public class K8sSpecTemplate extends HashMap<String, Object> {
private final Jinjava jinja = new Jinjava();
public String render(File templateFile) throws IOException {
String template = FileUtils.readFileToString(templateFile, Charset.defaultCharset());
return render(template);
}
public String render(String template) {
return jinja.render(template, this);
ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
Jinjava jinja = new Jinjava();
return jinja.render(template, this);
} finally {
Thread.currentThread().setContextClassLoader(oldCl);
}
}
}

View file

@ -125,6 +125,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
return new K8sRemoteInterpreterProcess(
kubectl,
new File(zConf.getK8sTemplatesDir(), "interpreter"),
zConf.getK8sContainerImage(),
context.getInterpreterGroupId(),
context.getInterpreterSettingGroup(),
context.getInterpreterSettingName(),
@ -132,6 +133,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
buildEnvFromProperties(context),
getZeppelinServiceHost(),
getZeppelinServiceRpcPort(),
zConf.getK8sPortForward(),
connectTimeout);
}

View file

@ -21,10 +21,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.*;
import org.apache.commons.io.IOUtils;
import java.io.*;
@ -69,6 +67,31 @@ public class Kubectl {
String.format("--timeout=%ds", timeoutSec)});
}
public ExecuteWatchdog portForward(String resource, String [] ports) throws IOException {
DefaultExecutor executor = new DefaultExecutor();
CommandLine cmd = new CommandLine(kubectlCmd);
cmd.addArguments("port-forward");
cmd.addArguments(resource);
cmd.addArguments(ports);
ExecuteWatchdog watchdog = new ExecuteWatchdog(-1);
executor.setWatchdog(watchdog);
executor.execute(cmd, new ExecuteResultHandler() {
@Override
public void onProcessComplete(int i) {
logger.info("Port-forward stopped");
}
@Override
public void onProcessFailed(ExecuteException e) {
logger.debug("port-forward process exit", e);
}
});
return watchdog;
}
String execAndGet(String [] args) throws IOException {
return execAndGet(args, "");
}

View file

@ -48,6 +48,7 @@ import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourceId;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.ResourceSet;
import org.apache.zeppelin.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -164,8 +165,8 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
LOGGER.warn("Interpreter process does not existed yet for InterpreterGroup: " +
registerInfo.getInterpreterGroupId());
}
((RemoteInterpreterManagedProcess) interpreterProcess)
.processStarted(registerInfo.port, registerInfo.host);
interpreterProcess.processStarted(registerInfo.port, registerInfo.host);
}
@Override

View file

@ -189,7 +189,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
}
// called by RemoteInterpreterServer to notify that RemoteInterpreter Process is started
@Override
public void processStarted(int port, String host) {
this.port = port;
this.host = host;

View file

@ -137,4 +137,9 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
public interface RemoteFunction<T> {
T call(Client client) throws Exception;
}
/**
* called by RemoteInterpreterEventServer to notify that RemoteInterpreter Process is started
*/
public abstract void processStarted(int port, String host);
}

View file

@ -88,4 +88,9 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
public boolean isRunning() {
return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
}
@Override
public void processStarted(int port, String host) {
// assume process is externally managed. nothing to do
}
}