mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
create and connect interpreter pod
This commit is contained in:
parent
9f1b7a1691
commit
0dea3836b0
11 changed files with 171 additions and 57 deletions
|
|
@ -5,6 +5,8 @@ metadata:
|
|||
name: {{POD_NAME}}
|
||||
labels:
|
||||
app: {{POD_NAME}}
|
||||
interpreterGroupId: {{INTP_ID}}
|
||||
interpreterSettingName: {{INTP_SETTING}}
|
||||
spec:
|
||||
automountServiceAccountToken: false
|
||||
restartPolicy: Never
|
||||
|
|
@ -12,7 +14,7 @@ spec:
|
|||
containers:
|
||||
- name: {{CONTAINER_NAME}}
|
||||
image: {{CONTAINER_IMAGE}}
|
||||
command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{INTP_NAME}} -r {{INTP_PORT}} -c {{CALLBACK_HOST}} -p {{CALLBACK_PORT}} -i {{INTP_ID}} -l {{INTP_REPO}} -g {{INTP_SETTING}}"]
|
||||
command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{INTP_NAME}} -r {{INTP_PORT}}:{{INTP_PORT}} -c {{CALLBACK_HOST}} -p {{CALLBACK_PORT}} -i {{INTP_ID}} -l {{INTP_REPO}} -g {{INTP_SETTING}}"]
|
||||
env:
|
||||
- name: ZEPPELIN_HOME
|
||||
value: /zeppelin
|
||||
|
|
|
|||
|
|
@ -669,12 +669,16 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
return getString(ConfVars.ZEPPELIN_K8S_MODE);
|
||||
}
|
||||
|
||||
public boolean getK8sPortForward() {
|
||||
return getBoolean(ConfVars.ZEPPELIN_K8S_PORTFORWARD);
|
||||
}
|
||||
|
||||
public String getK8sKubectlCmd() {
|
||||
return getString(ConfVars.ZEPPELIN_K8S_KUBECTL);
|
||||
}
|
||||
|
||||
public String getK8sNamespace() {
|
||||
return getString(ConfVars.ZEPPELIN_K8S_NAMESPACE);
|
||||
public String getK8sContainerImage() {
|
||||
return getString(ConfVars.ZEPPELIN_K8S_CONTAINER_IMAGE);
|
||||
}
|
||||
|
||||
public String getK8sTemplatesDir() {
|
||||
|
|
@ -833,8 +837,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT("zeppelin.cluster.heartbeat.timeout", 9000),
|
||||
|
||||
ZEPPELIN_K8S_MODE("zeppelin.k8s.mode", "auto"), // auto | on | off
|
||||
ZEPPELIN_K8S_PORTFORWARD("zeppelin.k8s.portforward", false), // kubectl port-forward incase of Zeppelin is running outside of kuberentes
|
||||
ZEPPELIN_K8S_KUBECTL("zeppelin.k8s.kubectl", "kubectl"), // kubectl command
|
||||
ZEPPELIN_K8S_NAMESPACE("zeppelin.k8s.namespace", "default"),
|
||||
ZEPPELIN_K8S_CONTAINER_IMAGE("zeppelin.k8s.container.image", "apache/zeppelin:" + Util.getVersion()),
|
||||
ZEPPELIN_K8S_TEMPLATE_DIR("zeppelin.k8s.template.dir", "k8s"),
|
||||
|
||||
ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL("zeppelin.notebook.git.remote.url", ""),
|
||||
|
|
|
|||
|
|
@ -41,11 +41,6 @@
|
|||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.kubernetes</groupId>
|
||||
<artifactId>client-java</artifactId>
|
||||
<version>3.0.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.hubspot.jinjava</groupId>
|
||||
<artifactId>jinjava</artifactId>
|
||||
|
|
@ -55,31 +50,6 @@
|
|||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<relocations>
|
||||
<relocation>
|
||||
<pattern>com.google</pattern>
|
||||
<shadedPattern>org.apache.zeppelin.k8slauncher.com.google</shadedPattern>
|
||||
</relocation>
|
||||
</relocations>
|
||||
<artifactSet>
|
||||
<includes>
|
||||
<include>*:*</include>
|
||||
</includes>
|
||||
</artifactSet>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
</plugin>
|
||||
|
|
|
|||
|
|
@ -5,22 +5,26 @@ import com.google.gson.Gson;
|
|||
import com.google.gson.reflect.TypeToken;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
||||
private static final Logger logger = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
|
||||
private static final int K8S_INTERPRETER_SERVICE_PORT = 12321;
|
||||
private final Kubectl kubectl;
|
||||
private final String interpreterGroupId;
|
||||
private final String interpreterGroupName;
|
||||
private final String interpreterSettingName;
|
||||
private final File specTempaltes;
|
||||
private final String containerImage;
|
||||
private final Properties properties;
|
||||
private final Map<String, String> envs;
|
||||
private final String zeppelinServiceHost;
|
||||
|
|
@ -28,10 +32,17 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
private final int podCreateTimeoutSec = 180;
|
||||
|
||||
private final Gson gson = new Gson();
|
||||
private final String podName;
|
||||
private final boolean portForward;
|
||||
private ExecuteWatchdog portForwardWatchdog;
|
||||
private int podPort = K8S_INTERPRETER_SERVICE_PORT;
|
||||
|
||||
private AtomicBoolean started = new AtomicBoolean(false);
|
||||
|
||||
public K8sRemoteInterpreterProcess(
|
||||
Kubectl kubectl,
|
||||
File specTemplates,
|
||||
String containerImage,
|
||||
String interpreterGroupId,
|
||||
String interpreterGroupName,
|
||||
String interpreterSettingName,
|
||||
|
|
@ -39,10 +50,12 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
Map<String, String> envs,
|
||||
String zeppelinServiceHost,
|
||||
String zeppelinServiceRpcPort,
|
||||
boolean portForward,
|
||||
int connectTimeout) {
|
||||
super(connectTimeout);
|
||||
this.kubectl = kubectl;
|
||||
this.specTempaltes = specTemplates;
|
||||
this.containerImage = containerImage;
|
||||
this.interpreterGroupId = interpreterGroupId;
|
||||
this.interpreterGroupName = interpreterGroupName;
|
||||
this.interpreterSettingName = interpreterSettingName;
|
||||
|
|
@ -50,6 +63,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
this.envs = envs;
|
||||
this.zeppelinServiceHost = zeppelinServiceHost;
|
||||
this.zeppelinServiceRpcPort = zeppelinServiceRpcPort;
|
||||
this.portForward = portForward;
|
||||
this.podName = interpreterGroupName.toLowerCase() + "-" + getRandomString(6);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -58,7 +73,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
* @return
|
||||
*/
|
||||
private String getPodName() {
|
||||
return interpreterGroupId.toLowerCase();
|
||||
return podName;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -71,6 +86,46 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
// create new pod
|
||||
apply(specTempaltes, false);
|
||||
kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", podCreateTimeoutSec);
|
||||
|
||||
if (portForward) {
|
||||
podPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
portForwardWatchdog = kubectl.portForward(
|
||||
String.format("pod/%s", getPodName()),
|
||||
new String[] {
|
||||
String.format("%s:%s", podPort, K8S_INTERPRETER_SERVICE_PORT)
|
||||
});
|
||||
}
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
// wait until interpreter send started message through thrift rpc
|
||||
synchronized (started) {
|
||||
if (!started.get()) {
|
||||
try {
|
||||
started.wait(getConnectTimeout());
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("Remote interpreter is not accessible");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!started.get()) {
|
||||
logger.info(
|
||||
String.format("Interpreter pod creation is time out in %d seconds",
|
||||
getConnectTimeout()/1000));
|
||||
}
|
||||
|
||||
// waits for interpreter thrift rpc server ready
|
||||
while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
|
||||
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
|
||||
break;
|
||||
} else {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -78,27 +133,45 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
// delete pod
|
||||
try {
|
||||
apply(specTempaltes, true);
|
||||
} catch (IOException e) {
|
||||
logger.info("Error on removing interpreter pod", e);
|
||||
}
|
||||
|
||||
try {
|
||||
kubectl.wait(String.format("pod/%s", getPodName()), "delete", 60);
|
||||
} catch (IOException e) {
|
||||
logger.error("Error on removing interpreter pod", e);
|
||||
logger.debug("Error on waiting pod delete", e);
|
||||
}
|
||||
|
||||
|
||||
if (portForwardWatchdog != null) {
|
||||
portForwardWatchdog.destroyProcess();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHost() {
|
||||
return String.format("%s.%s.svc.cluster.local",
|
||||
getPodName(), // service name and pod name is the same
|
||||
kubectl.getNamespace());
|
||||
if (portForward) {
|
||||
return "localhost";
|
||||
} else {
|
||||
return String.format("%s.%s.svc.cluster.local",
|
||||
getPodName(), // service name and pod name is the same
|
||||
kubectl.getNamespace());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPort() {
|
||||
return 12321;
|
||||
return podPort;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
try {
|
||||
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
|
||||
return true;
|
||||
}
|
||||
|
||||
String ret = kubectl.execAndGet(new String[]{
|
||||
"get",
|
||||
String.format("pods/%s", getPodName()),
|
||||
|
|
@ -120,8 +193,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
return false;
|
||||
}
|
||||
|
||||
return "Running".equals(status.get("phase"));
|
||||
} catch (IOException e) {
|
||||
return "Running".equals(status.get("phase")) && started.get();
|
||||
} catch (Exception e) {
|
||||
logger.error("Can't get pod status", e);
|
||||
return false;
|
||||
}
|
||||
|
|
@ -167,7 +240,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
var.put("NAMESPACE", kubectl.getNamespace());
|
||||
var.put("POD_NAME", getPodName());
|
||||
var.put("CONTAINER_NAME", interpreterGroupName.toLowerCase());
|
||||
var.put("CONTAINER_IMAGE", "apache/zeppelin:0.8.0");
|
||||
var.put("CONTAINER_IMAGE", containerImage);
|
||||
var.put("INTP_PORT", "12321"); // interpreter.sh -r
|
||||
var.put("INTP_ID", interpreterGroupId); // interpreter.sh -i
|
||||
var.put("INTP_NAME", interpreterGroupName); // interpreter.sh -d
|
||||
|
|
@ -178,4 +251,27 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
|
|||
var.putAll(Maps.fromProperties(properties)); // interpreter properties override template variables
|
||||
return var;
|
||||
}
|
||||
|
||||
private String getRandomString(int length) {
|
||||
char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
Random random = new Random();
|
||||
for (int i = 0; i < length; i++) {
|
||||
char c = chars[random.nextInt(chars.length)];
|
||||
sb.append(c);
|
||||
}
|
||||
String randomStr = sb.toString();
|
||||
|
||||
return randomStr;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processStarted(int port, String host) {
|
||||
logger.info("Interpreter pod created {}:{}", host, port);
|
||||
synchronized (started) {
|
||||
started.set(true);
|
||||
started.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,14 +25,19 @@ import java.nio.charset.Charset;
|
|||
import java.util.HashMap;
|
||||
|
||||
public class K8sSpecTemplate extends HashMap<String, Object> {
|
||||
private final Jinjava jinja = new Jinjava();
|
||||
|
||||
public String render(File templateFile) throws IOException {
|
||||
String template = FileUtils.readFileToString(templateFile, Charset.defaultCharset());
|
||||
return render(template);
|
||||
}
|
||||
|
||||
public String render(String template) {
|
||||
return jinja.render(template, this);
|
||||
ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
|
||||
Jinjava jinja = new Jinjava();
|
||||
return jinja.render(template, this);
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(oldCl);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -125,6 +125,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
|
|||
return new K8sRemoteInterpreterProcess(
|
||||
kubectl,
|
||||
new File(zConf.getK8sTemplatesDir(), "interpreter"),
|
||||
zConf.getK8sContainerImage(),
|
||||
context.getInterpreterGroupId(),
|
||||
context.getInterpreterSettingGroup(),
|
||||
context.getInterpreterSettingName(),
|
||||
|
|
@ -132,6 +133,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
|
|||
buildEnvFromProperties(context),
|
||||
getZeppelinServiceHost(),
|
||||
getZeppelinServiceRpcPort(),
|
||||
zConf.getK8sPortForward(),
|
||||
connectTimeout);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -21,10 +21,8 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.gson.Gson;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
|
||||
import org.apache.commons.exec.*;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
||||
import java.io.*;
|
||||
|
|
@ -69,6 +67,31 @@ public class Kubectl {
|
|||
String.format("--timeout=%ds", timeoutSec)});
|
||||
}
|
||||
|
||||
public ExecuteWatchdog portForward(String resource, String [] ports) throws IOException {
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
CommandLine cmd = new CommandLine(kubectlCmd);
|
||||
cmd.addArguments("port-forward");
|
||||
cmd.addArguments(resource);
|
||||
cmd.addArguments(ports);
|
||||
|
||||
ExecuteWatchdog watchdog = new ExecuteWatchdog(-1);
|
||||
executor.setWatchdog(watchdog);
|
||||
|
||||
executor.execute(cmd, new ExecuteResultHandler() {
|
||||
@Override
|
||||
public void onProcessComplete(int i) {
|
||||
logger.info("Port-forward stopped");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProcessFailed(ExecuteException e) {
|
||||
logger.debug("port-forward process exit", e);
|
||||
}
|
||||
});
|
||||
|
||||
return watchdog;
|
||||
}
|
||||
|
||||
String execAndGet(String [] args) throws IOException {
|
||||
return execAndGet(args, "");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -48,6 +48,7 @@ import org.apache.zeppelin.resource.Resource;
|
|||
import org.apache.zeppelin.resource.ResourceId;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.resource.ResourceSet;
|
||||
import org.apache.zeppelin.util.ReflectionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -164,8 +165,8 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
|
|||
LOGGER.warn("Interpreter process does not existed yet for InterpreterGroup: " +
|
||||
registerInfo.getInterpreterGroupId());
|
||||
}
|
||||
((RemoteInterpreterManagedProcess) interpreterProcess)
|
||||
.processStarted(registerInfo.port, registerInfo.host);
|
||||
|
||||
interpreterProcess.processStarted(registerInfo.port, registerInfo.host);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -189,7 +189,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
|
||||
}
|
||||
|
||||
// called by RemoteInterpreterServer to notify that RemoteInterpreter Process is started
|
||||
@Override
|
||||
public void processStarted(int port, String host) {
|
||||
this.port = port;
|
||||
this.host = host;
|
||||
|
|
|
|||
|
|
@ -137,4 +137,9 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
|
|||
public interface RemoteFunction<T> {
|
||||
T call(Client client) throws Exception;
|
||||
}
|
||||
|
||||
/**
|
||||
* called by RemoteInterpreterEventServer to notify that RemoteInterpreter Process is started
|
||||
*/
|
||||
public abstract void processStarted(int port, String host);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -88,4 +88,9 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
|
|||
public boolean isRunning() {
|
||||
return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processStarted(int port, String host) {
|
||||
// assume process is externally managed. nothing to do
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue