mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
renew token periodically
This commit is contained in:
parent
ee741e483a
commit
856c8716ec
3 changed files with 130 additions and 41 deletions
|
|
@ -24,6 +24,7 @@ import java.util.List;
|
|||
import java.util.Properties;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
|
|
@ -50,7 +51,7 @@ public class ShellInterpreter extends Interpreter {
|
|||
private final boolean isWindows = System.getProperty("os.name").startsWith("Windows");
|
||||
private final String shell = isWindows ? "cmd /c" : "bash -c";
|
||||
ConcurrentHashMap<String, DefaultExecutor> executors;
|
||||
private Boolean isGSSInit = false;
|
||||
ScheduledExecutorService scheduledExecutorService;
|
||||
|
||||
public ShellInterpreter(Properties property) {
|
||||
super(property);
|
||||
|
|
@ -61,13 +62,23 @@ public class ShellInterpreter extends Interpreter {
|
|||
LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY));
|
||||
executors = new ConcurrentHashMap<>();
|
||||
if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) {
|
||||
ShellSecurityImpl.createSecureConfiguration(getProperty(), shell);
|
||||
isGSSInit = true;
|
||||
scheduledExecutorService = startKerberosLoginThread();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {}
|
||||
public void close() {
|
||||
if (scheduledExecutorService != null) {
|
||||
scheduledExecutorService.shutdown();
|
||||
}
|
||||
|
||||
for (String executorKey : executors.keySet()) {
|
||||
DefaultExecutor executor = executors.remove(executorKey);
|
||||
if (executor != null) {
|
||||
executor.getWatchdog().destroyProcess();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
|
|
@ -95,32 +106,18 @@ public class ShellInterpreter extends Interpreter {
|
|||
+ " return with exit value: " + exitVal);
|
||||
return new InterpreterResult(Code.SUCCESS, outStream.toString());
|
||||
} catch (ExecuteException e) {
|
||||
if (isGSSInit &&
|
||||
contextInterpreter.out != null &&
|
||||
StringUtils.containsIgnoreCase(contextInterpreter.out.getCurrentOutput().toString(),
|
||||
"GSSException")) {
|
||||
isGSSInit = false;
|
||||
if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) {
|
||||
ShellSecurityImpl.createSecureConfiguration(getProperty(), shell);
|
||||
isGSSInit = true;
|
||||
}
|
||||
|
||||
appendSessionExpire(contextInterpreter);
|
||||
return interpret(cmd, contextInterpreter);
|
||||
} else {
|
||||
int exitValue = e.getExitValue();
|
||||
LOGGER.error("Can not run " + cmd, e);
|
||||
Code code = Code.ERROR;
|
||||
String message = outStream.toString();
|
||||
if (exitValue == 143) {
|
||||
code = Code.INCOMPLETE;
|
||||
message += "Paragraph received a SIGTERM\n";
|
||||
LOGGER.info("The paragraph " + contextInterpreter.getParagraphId()
|
||||
+ " stopped executing: " + message);
|
||||
}
|
||||
message += "ExitValue: " + exitValue;
|
||||
return new InterpreterResult(code, message);
|
||||
int exitValue = e.getExitValue();
|
||||
LOGGER.error("Can not run " + cmd, e);
|
||||
Code code = Code.ERROR;
|
||||
String message = outStream.toString();
|
||||
if (exitValue == 143) {
|
||||
code = Code.INCOMPLETE;
|
||||
message += "Paragraph received a SIGTERM\n";
|
||||
LOGGER.info("The paragraph " + contextInterpreter.getParagraphId()
|
||||
+ " stopped executing: " + message);
|
||||
}
|
||||
message += "ExitValue: " + exitValue;
|
||||
return new InterpreterResult(code, message);
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Can not run " + cmd, e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
|
|
@ -129,18 +126,6 @@ public class ShellInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
private void appendSessionExpire(InterpreterContext contextInterpreter) {
|
||||
String appInfoHtml = "\n\n" +
|
||||
"Previous Keytab session is expired, new Keytab session is created. " +
|
||||
"Will execute this paragraph again!" +
|
||||
"\n\n";
|
||||
try {
|
||||
contextInterpreter.out.getCurrentOutput().write(appInfoHtml);
|
||||
} catch (IOException eIOException) {
|
||||
LOGGER.error("Error writing to currentOutput", eIOException);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
DefaultExecutor executor = executors.remove(context.getParagraphId());
|
||||
|
|
@ -171,4 +156,16 @@ public class ShellInterpreter extends Interpreter {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean runKerberosLogin() {
|
||||
try {
|
||||
ShellSecurityImpl.createSecureConfiguration(getProperty(), shell);
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,11 @@
|
|||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Interpreter related constants
|
||||
*
|
||||
|
|
@ -32,4 +37,17 @@ public class Constants {
|
|||
|
||||
public static final int ZEPPELIN_INTERPRETER_OUTPUT_LIMIT = 1024 * 100;
|
||||
|
||||
public static final Map<String, TimeUnit> TIME_SUFFIXES;
|
||||
|
||||
static {
|
||||
TIME_SUFFIXES = new HashMap<>();
|
||||
TIME_SUFFIXES.put("us", TimeUnit.MICROSECONDS);
|
||||
TIME_SUFFIXES.put("ms", TimeUnit.MILLISECONDS);
|
||||
TIME_SUFFIXES.put("s", TimeUnit.SECONDS);
|
||||
TIME_SUFFIXES.put("m", TimeUnit.MINUTES);
|
||||
TIME_SUFFIXES.put("min", TimeUnit.MINUTES);
|
||||
TIME_SUFFIXES.put("h", TimeUnit.HOURS);
|
||||
TIME_SUFFIXES.put("d", TimeUnit.DAYS);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,6 +27,12 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang.reflect.FieldUtils;
|
||||
import org.apache.zeppelin.annotation.Experimental;
|
||||
|
|
@ -50,6 +56,8 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public abstract class Interpreter {
|
||||
|
||||
Integer kinitFailCount = 0;
|
||||
|
||||
/**
|
||||
* Opens interpreter. You may want to place your initialize routine here.
|
||||
* open() is called only once
|
||||
|
|
@ -486,4 +494,70 @@ public abstract class Interpreter {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public String getKerberosRefreshInterval() {
|
||||
if (System.getenv("KERBEROS_REFRESH_INTERVAL") == null) {
|
||||
return "1d";
|
||||
} else {
|
||||
return System.getenv("KERBEROS_REFRESH_INTERVAL");
|
||||
}
|
||||
}
|
||||
|
||||
public Integer kinitFailThreshold() {
|
||||
if (System.getenv("KINIT_FAIL_THRESHOLD") == null) {
|
||||
return 5;
|
||||
} else {
|
||||
return new Integer(System.getenv("KINIT_FAIL_THRESHOLD"));
|
||||
}
|
||||
}
|
||||
|
||||
public Long getTimeAsMs(String time) {
|
||||
if (time == null) {
|
||||
logger.error("Cannot convert to time value.", time);
|
||||
time = "1d";
|
||||
}
|
||||
|
||||
Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(time.toLowerCase());
|
||||
if (!m.matches()) {
|
||||
throw new IllegalArgumentException("Invalid time string: " + time);
|
||||
}
|
||||
|
||||
long val = Long.parseLong(m.group(1));
|
||||
String suffix = m.group(2);
|
||||
|
||||
if (suffix != null && !Constants.TIME_SUFFIXES.containsKey(suffix)) {
|
||||
throw new IllegalArgumentException("Invalid suffix: \"" + suffix + "\"");
|
||||
}
|
||||
|
||||
return TimeUnit.MILLISECONDS.convert(val,
|
||||
suffix != null ? Constants.TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
protected abstract boolean runKerberosLogin();
|
||||
|
||||
protected ScheduledExecutorService startKerberosLoginThread() {
|
||||
final ScheduledExecutorService scheduledExecutorService =
|
||||
Executors.newScheduledThreadPool(1);
|
||||
|
||||
scheduledExecutorService.schedule(new Callable() {
|
||||
public Object call() throws Exception {
|
||||
|
||||
if (runKerberosLogin()) {
|
||||
// schedule another kinit run with a fixed delay.
|
||||
scheduledExecutorService
|
||||
.schedule(this, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
// schedule another retry at once or fail the livy server if too many times kinit fail
|
||||
if (kinitFailCount >= kinitFailThreshold()) {
|
||||
close();
|
||||
} else {
|
||||
scheduledExecutorService.submit(this);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS);
|
||||
|
||||
return scheduledExecutorService;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue