call startKerberosLoginThread and shutdownExecutorService in parent class

This commit is contained in:
prabhjyotsingh 2017-06-30 20:56:50 +05:30
parent 57ea80c0c3
commit 7f8b8672b6
3 changed files with 41 additions and 13 deletions

View file

@ -172,6 +172,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
@Override
public void open() {
super.open();
for (String propertyKey : property.stringPropertyNames()) {
logger.debug("propertyKey: {}", propertyKey);
String[] keyValue = propertyKey.split("\\.", 2);
@ -207,13 +208,18 @@ public class JDBCInterpreter extends KerberosInterpreter {
logger.debug("JDBC PropretiesMap: {}", basePropretiesMap);
setMaxLineResults();
}
protected boolean isKerboseEnabled() {
UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property);
if (authType.equals(KERBEROS)) {
startKerberosLoginThread();
return true;
}
return false;
}
private void setMaxLineResults() {
if (basePropretiesMap.containsKey(COMMON_KEY) &&
basePropretiesMap.get(COMMON_KEY).containsKey(MAX_LINE_KEY)) {
@ -283,6 +289,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
@Override
public void close() {
super.close();
try {
initStatementMap();
initConnectionPoolMap();

View file

@ -57,17 +57,14 @@ public class ShellInterpreter extends KerberosInterpreter {
@Override
public void open() {
super.open();
LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY));
executors = new ConcurrentHashMap<>();
if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) {
startKerberosLoginThread();
}
}
@Override
public void close() {
shutdownExecutorService();
super.close();
for (String executorKey : executors.keySet()) {
DefaultExecutor executor = executors.remove(executorKey);
if (executor != null) {
@ -171,4 +168,13 @@ public class ShellInterpreter extends KerberosInterpreter {
return true;
}
@Override
protected boolean isKerboseEnabled() {
if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type")) && getProperty(
"zeppelin.shell.auth.type").equalsIgnoreCase("kerberos")) {
return true;
}
return false;
}
}

View file

@ -38,8 +38,8 @@ import org.slf4j.LoggerFactory;
public abstract class KerberosInterpreter extends Interpreter {
private Integer kinitFailCount = 0;
protected ScheduledExecutorService scheduledExecutorService;
public static Logger logger = LoggerFactory.getLogger(KerberosInterpreter.class);
private ScheduledExecutorService scheduledExecutorService;
private static Logger logger = LoggerFactory.getLogger(KerberosInterpreter.class);
public KerberosInterpreter(Properties property) {
super(property);
@ -48,7 +48,22 @@ public abstract class KerberosInterpreter extends Interpreter {
@ZeppelinApi
protected abstract boolean runKerberosLogin();
public String getKerberosRefreshInterval() {
@ZeppelinApi
protected abstract boolean isKerboseEnabled();
public void open() {
if (isKerboseEnabled()) {
startKerberosLoginThread();
}
}
public void close() {
if (isKerboseEnabled()) {
shutdownExecutorService();
}
}
private String getKerberosRefreshInterval() {
if (System.getenv("KERBEROS_REFRESH_INTERVAL") == null) {
return "1d";
} else {
@ -56,7 +71,7 @@ public abstract class KerberosInterpreter extends Interpreter {
}
}
public Integer kinitFailThreshold() {
private Integer kinitFailThreshold() {
if (System.getenv("KINIT_FAIL_THRESHOLD") == null) {
return 5;
} else {
@ -64,7 +79,7 @@ public abstract class KerberosInterpreter extends Interpreter {
}
}
public Long getTimeAsMs(String time) {
private Long getTimeAsMs(String time) {
if (time == null) {
logger.error("Cannot convert to time value.", time);
time = "1d";
@ -86,7 +101,7 @@ public abstract class KerberosInterpreter extends Interpreter {
suffix != null ? Constants.TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
}
protected ScheduledExecutorService startKerberosLoginThread() {
private ScheduledExecutorService startKerberosLoginThread() {
scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.schedule(new Callable() {
@ -116,7 +131,7 @@ public abstract class KerberosInterpreter extends Interpreter {
return scheduledExecutorService;
}
protected void shutdownExecutorService() {
private void shutdownExecutorService() {
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
}