mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
call startKerberosLoginThread and shutdownExecutorService in parent class
This commit is contained in:
parent
57ea80c0c3
commit
7f8b8672b6
3 changed files with 41 additions and 13 deletions
|
|
@ -172,6 +172,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
|
|||
|
||||
@Override
|
||||
public void open() {
|
||||
super.open();
|
||||
for (String propertyKey : property.stringPropertyNames()) {
|
||||
logger.debug("propertyKey: {}", propertyKey);
|
||||
String[] keyValue = propertyKey.split("\\.", 2);
|
||||
|
|
@ -207,13 +208,18 @@ public class JDBCInterpreter extends KerberosInterpreter {
|
|||
logger.debug("JDBC PropretiesMap: {}", basePropretiesMap);
|
||||
|
||||
setMaxLineResults();
|
||||
}
|
||||
|
||||
|
||||
protected boolean isKerboseEnabled() {
|
||||
UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property);
|
||||
if (authType.equals(KERBEROS)) {
|
||||
startKerberosLoginThread();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
private void setMaxLineResults() {
|
||||
if (basePropretiesMap.containsKey(COMMON_KEY) &&
|
||||
basePropretiesMap.get(COMMON_KEY).containsKey(MAX_LINE_KEY)) {
|
||||
|
|
@ -283,6 +289,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
super.close();
|
||||
try {
|
||||
initStatementMap();
|
||||
initConnectionPoolMap();
|
||||
|
|
|
|||
|
|
@ -57,17 +57,14 @@ public class ShellInterpreter extends KerberosInterpreter {
|
|||
|
||||
@Override
|
||||
public void open() {
|
||||
super.open();
|
||||
LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY));
|
||||
executors = new ConcurrentHashMap<>();
|
||||
if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) {
|
||||
startKerberosLoginThread();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
shutdownExecutorService();
|
||||
|
||||
super.close();
|
||||
for (String executorKey : executors.keySet()) {
|
||||
DefaultExecutor executor = executors.remove(executorKey);
|
||||
if (executor != null) {
|
||||
|
|
@ -171,4 +168,13 @@ public class ShellInterpreter extends KerberosInterpreter {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isKerboseEnabled() {
|
||||
if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type")) && getProperty(
|
||||
"zeppelin.shell.auth.type").equalsIgnoreCase("kerberos")) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,8 +38,8 @@ import org.slf4j.LoggerFactory;
|
|||
public abstract class KerberosInterpreter extends Interpreter {
|
||||
|
||||
private Integer kinitFailCount = 0;
|
||||
protected ScheduledExecutorService scheduledExecutorService;
|
||||
public static Logger logger = LoggerFactory.getLogger(KerberosInterpreter.class);
|
||||
private ScheduledExecutorService scheduledExecutorService;
|
||||
private static Logger logger = LoggerFactory.getLogger(KerberosInterpreter.class);
|
||||
|
||||
public KerberosInterpreter(Properties property) {
|
||||
super(property);
|
||||
|
|
@ -48,7 +48,22 @@ public abstract class KerberosInterpreter extends Interpreter {
|
|||
@ZeppelinApi
|
||||
protected abstract boolean runKerberosLogin();
|
||||
|
||||
public String getKerberosRefreshInterval() {
|
||||
@ZeppelinApi
|
||||
protected abstract boolean isKerboseEnabled();
|
||||
|
||||
public void open() {
|
||||
if (isKerboseEnabled()) {
|
||||
startKerberosLoginThread();
|
||||
}
|
||||
}
|
||||
|
||||
public void close() {
|
||||
if (isKerboseEnabled()) {
|
||||
shutdownExecutorService();
|
||||
}
|
||||
}
|
||||
|
||||
private String getKerberosRefreshInterval() {
|
||||
if (System.getenv("KERBEROS_REFRESH_INTERVAL") == null) {
|
||||
return "1d";
|
||||
} else {
|
||||
|
|
@ -56,7 +71,7 @@ public abstract class KerberosInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
public Integer kinitFailThreshold() {
|
||||
private Integer kinitFailThreshold() {
|
||||
if (System.getenv("KINIT_FAIL_THRESHOLD") == null) {
|
||||
return 5;
|
||||
} else {
|
||||
|
|
@ -64,7 +79,7 @@ public abstract class KerberosInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
public Long getTimeAsMs(String time) {
|
||||
private Long getTimeAsMs(String time) {
|
||||
if (time == null) {
|
||||
logger.error("Cannot convert to time value.", time);
|
||||
time = "1d";
|
||||
|
|
@ -86,7 +101,7 @@ public abstract class KerberosInterpreter extends Interpreter {
|
|||
suffix != null ? Constants.TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
protected ScheduledExecutorService startKerberosLoginThread() {
|
||||
private ScheduledExecutorService startKerberosLoginThread() {
|
||||
scheduledExecutorService = Executors.newScheduledThreadPool(1);
|
||||
|
||||
scheduledExecutorService.schedule(new Callable() {
|
||||
|
|
@ -116,7 +131,7 @@ public abstract class KerberosInterpreter extends Interpreter {
|
|||
return scheduledExecutorService;
|
||||
}
|
||||
|
||||
protected void shutdownExecutorService() {
|
||||
private void shutdownExecutorService() {
|
||||
if (scheduledExecutorService != null) {
|
||||
scheduledExecutorService.shutdown();
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue