renew token periodically

This commit is contained in:
Prabhjyot Singh 2017-06-15 18:48:13 +05:30
parent ee741e483a
commit 856c8716ec
3 changed files with 130 additions and 41 deletions

View file

@ -24,6 +24,7 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
@ -50,7 +51,7 @@ public class ShellInterpreter extends Interpreter {
private final boolean isWindows = System.getProperty("os.name").startsWith("Windows");
private final String shell = isWindows ? "cmd /c" : "bash -c";
ConcurrentHashMap<String, DefaultExecutor> executors;
private Boolean isGSSInit = false;
ScheduledExecutorService scheduledExecutorService;
public ShellInterpreter(Properties property) {
super(property);
@ -61,13 +62,23 @@ public class ShellInterpreter extends Interpreter {
LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY));
executors = new ConcurrentHashMap<>();
if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) {
ShellSecurityImpl.createSecureConfiguration(getProperty(), shell);
isGSSInit = true;
scheduledExecutorService = startKerberosLoginThread();
}
}
@Override
public void close() {}
public void close() {
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
}
for (String executorKey : executors.keySet()) {
DefaultExecutor executor = executors.remove(executorKey);
if (executor != null) {
executor.getWatchdog().destroyProcess();
}
}
}
@Override
@ -95,32 +106,18 @@ public class ShellInterpreter extends Interpreter {
+ " return with exit value: " + exitVal);
return new InterpreterResult(Code.SUCCESS, outStream.toString());
} catch (ExecuteException e) {
if (isGSSInit &&
contextInterpreter.out != null &&
StringUtils.containsIgnoreCase(contextInterpreter.out.getCurrentOutput().toString(),
"GSSException")) {
isGSSInit = false;
if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) {
ShellSecurityImpl.createSecureConfiguration(getProperty(), shell);
isGSSInit = true;
}
appendSessionExpire(contextInterpreter);
return interpret(cmd, contextInterpreter);
} else {
int exitValue = e.getExitValue();
LOGGER.error("Can not run " + cmd, e);
Code code = Code.ERROR;
String message = outStream.toString();
if (exitValue == 143) {
code = Code.INCOMPLETE;
message += "Paragraph received a SIGTERM\n";
LOGGER.info("The paragraph " + contextInterpreter.getParagraphId()
+ " stopped executing: " + message);
}
message += "ExitValue: " + exitValue;
return new InterpreterResult(code, message);
int exitValue = e.getExitValue();
LOGGER.error("Can not run " + cmd, e);
Code code = Code.ERROR;
String message = outStream.toString();
if (exitValue == 143) {
code = Code.INCOMPLETE;
message += "Paragraph received a SIGTERM\n";
LOGGER.info("The paragraph " + contextInterpreter.getParagraphId()
+ " stopped executing: " + message);
}
message += "ExitValue: " + exitValue;
return new InterpreterResult(code, message);
} catch (IOException e) {
LOGGER.error("Can not run " + cmd, e);
return new InterpreterResult(Code.ERROR, e.getMessage());
@ -129,18 +126,6 @@ public class ShellInterpreter extends Interpreter {
}
}
private void appendSessionExpire(InterpreterContext contextInterpreter) {
String appInfoHtml = "\n\n" +
"Previous Keytab session is expired, new Keytab session is created. " +
"Will execute this paragraph again!" +
"\n\n";
try {
contextInterpreter.out.getCurrentOutput().write(appInfoHtml);
} catch (IOException eIOException) {
LOGGER.error("Error writing to currentOutput", eIOException);
}
}
@Override
public void cancel(InterpreterContext context) {
DefaultExecutor executor = executors.remove(context.getParagraphId());
@ -171,4 +156,16 @@ public class ShellInterpreter extends Interpreter {
return null;
}
@Override
protected boolean runKerberosLogin() {
try {
ShellSecurityImpl.createSecureConfiguration(getProperty(), shell);
} catch (Exception e) {
return false;
}
return true;
}
}

View file

@ -16,6 +16,11 @@
*/
package org.apache.zeppelin.interpreter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Interpreter related constants
*
@ -32,4 +37,17 @@ public class Constants {
public static final int ZEPPELIN_INTERPRETER_OUTPUT_LIMIT = 1024 * 100;
public static final Map<String, TimeUnit> TIME_SUFFIXES;
static {
TIME_SUFFIXES = new HashMap<>();
TIME_SUFFIXES.put("us", TimeUnit.MICROSECONDS);
TIME_SUFFIXES.put("ms", TimeUnit.MILLISECONDS);
TIME_SUFFIXES.put("s", TimeUnit.SECONDS);
TIME_SUFFIXES.put("m", TimeUnit.MINUTES);
TIME_SUFFIXES.put("min", TimeUnit.MINUTES);
TIME_SUFFIXES.put("h", TimeUnit.HOURS);
TIME_SUFFIXES.put("d", TimeUnit.DAYS);
}
}

View file

@ -27,6 +27,12 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.zeppelin.annotation.Experimental;
@ -50,6 +56,8 @@ import org.slf4j.LoggerFactory;
*/
public abstract class Interpreter {
Integer kinitFailCount = 0;
/**
* Opens interpreter. You may want to place your initialize routine here.
* open() is called only once
@ -486,4 +494,70 @@ public abstract class Interpreter {
}
return null;
}
public String getKerberosRefreshInterval() {
if (System.getenv("KERBEROS_REFRESH_INTERVAL") == null) {
return "1d";
} else {
return System.getenv("KERBEROS_REFRESH_INTERVAL");
}
}
public Integer kinitFailThreshold() {
if (System.getenv("KINIT_FAIL_THRESHOLD") == null) {
return 5;
} else {
return new Integer(System.getenv("KINIT_FAIL_THRESHOLD"));
}
}
public Long getTimeAsMs(String time) {
if (time == null) {
logger.error("Cannot convert to time value.", time);
time = "1d";
}
Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(time.toLowerCase());
if (!m.matches()) {
throw new IllegalArgumentException("Invalid time string: " + time);
}
long val = Long.parseLong(m.group(1));
String suffix = m.group(2);
if (suffix != null && !Constants.TIME_SUFFIXES.containsKey(suffix)) {
throw new IllegalArgumentException("Invalid suffix: \"" + suffix + "\"");
}
return TimeUnit.MILLISECONDS.convert(val,
suffix != null ? Constants.TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
}
protected abstract boolean runKerberosLogin();
protected ScheduledExecutorService startKerberosLoginThread() {
final ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(1);
scheduledExecutorService.schedule(new Callable() {
public Object call() throws Exception {
if (runKerberosLogin()) {
// schedule another kinit run with a fixed delay.
scheduledExecutorService
.schedule(this, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS);
} else {
// schedule another retry at once or fail the livy server if too many times kinit fail
if (kinitFailCount >= kinitFailThreshold()) {
close();
} else {
scheduledExecutorService.submit(this);
}
}
return null;
}
}, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS);
return scheduledExecutorService;
}
}