mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
add KerberosInterpreter and move kinit loginc there.
This commit is contained in:
parent
856c8716ec
commit
df6645a64b
3 changed files with 117 additions and 82 deletions
|
|
@ -24,15 +24,14 @@ import java.util.List;
|
|||
import java.util.Properties;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.KerberosInterpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
|
|
@ -45,13 +44,12 @@ import org.slf4j.LoggerFactory;
|
|||
/**
|
||||
* Shell interpreter for Zeppelin.
|
||||
*/
|
||||
public class ShellInterpreter extends Interpreter {
|
||||
public class ShellInterpreter extends KerberosInterpreter {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ShellInterpreter.class);
|
||||
private static final String TIMEOUT_PROPERTY = "shell.command.timeout.millisecs";
|
||||
private final boolean isWindows = System.getProperty("os.name").startsWith("Windows");
|
||||
private final String shell = isWindows ? "cmd /c" : "bash -c";
|
||||
ConcurrentHashMap<String, DefaultExecutor> executors;
|
||||
ScheduledExecutorService scheduledExecutorService;
|
||||
|
||||
public ShellInterpreter(Properties property) {
|
||||
super(property);
|
||||
|
|
@ -62,15 +60,13 @@ public class ShellInterpreter extends Interpreter {
|
|||
LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY));
|
||||
executors = new ConcurrentHashMap<>();
|
||||
if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) {
|
||||
scheduledExecutorService = startKerberosLoginThread();
|
||||
startKerberosLoginThread();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (scheduledExecutorService != null) {
|
||||
scheduledExecutorService.shutdown();
|
||||
}
|
||||
shutdownExecutorService();
|
||||
|
||||
for (String executorKey : executors.keySet()) {
|
||||
DefaultExecutor executor = executors.remove(executorKey);
|
||||
|
|
|
|||
|
|
@ -27,12 +27,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang.reflect.FieldUtils;
|
||||
import org.apache.zeppelin.annotation.Experimental;
|
||||
|
|
@ -56,8 +50,6 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public abstract class Interpreter {
|
||||
|
||||
Integer kinitFailCount = 0;
|
||||
|
||||
/**
|
||||
* Opens interpreter. You may want to place your initialize routine here.
|
||||
* open() is called only once
|
||||
|
|
@ -494,70 +486,4 @@ public abstract class Interpreter {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public String getKerberosRefreshInterval() {
|
||||
if (System.getenv("KERBEROS_REFRESH_INTERVAL") == null) {
|
||||
return "1d";
|
||||
} else {
|
||||
return System.getenv("KERBEROS_REFRESH_INTERVAL");
|
||||
}
|
||||
}
|
||||
|
||||
public Integer kinitFailThreshold() {
|
||||
if (System.getenv("KINIT_FAIL_THRESHOLD") == null) {
|
||||
return 5;
|
||||
} else {
|
||||
return new Integer(System.getenv("KINIT_FAIL_THRESHOLD"));
|
||||
}
|
||||
}
|
||||
|
||||
public Long getTimeAsMs(String time) {
|
||||
if (time == null) {
|
||||
logger.error("Cannot convert to time value.", time);
|
||||
time = "1d";
|
||||
}
|
||||
|
||||
Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(time.toLowerCase());
|
||||
if (!m.matches()) {
|
||||
throw new IllegalArgumentException("Invalid time string: " + time);
|
||||
}
|
||||
|
||||
long val = Long.parseLong(m.group(1));
|
||||
String suffix = m.group(2);
|
||||
|
||||
if (suffix != null && !Constants.TIME_SUFFIXES.containsKey(suffix)) {
|
||||
throw new IllegalArgumentException("Invalid suffix: \"" + suffix + "\"");
|
||||
}
|
||||
|
||||
return TimeUnit.MILLISECONDS.convert(val,
|
||||
suffix != null ? Constants.TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
protected abstract boolean runKerberosLogin();
|
||||
|
||||
protected ScheduledExecutorService startKerberosLoginThread() {
|
||||
final ScheduledExecutorService scheduledExecutorService =
|
||||
Executors.newScheduledThreadPool(1);
|
||||
|
||||
scheduledExecutorService.schedule(new Callable() {
|
||||
public Object call() throws Exception {
|
||||
|
||||
if (runKerberosLogin()) {
|
||||
// schedule another kinit run with a fixed delay.
|
||||
scheduledExecutorService
|
||||
.schedule(this, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
// schedule another retry at once or fail the livy server if too many times kinit fail
|
||||
if (kinitFailCount >= kinitFailThreshold()) {
|
||||
close();
|
||||
} else {
|
||||
scheduledExecutorService.submit(this);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS);
|
||||
|
||||
return scheduledExecutorService;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.zeppelin.annotation.ZeppelinApi;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public abstract class KerberosInterpreter extends Interpreter {
|
||||
|
||||
Integer kinitFailCount = 0;
|
||||
protected ScheduledExecutorService scheduledExecutorService;
|
||||
public static Logger logger = LoggerFactory.getLogger(KerberosInterpreter.class);
|
||||
|
||||
public KerberosInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
@ZeppelinApi
|
||||
protected abstract boolean runKerberosLogin();
|
||||
|
||||
public String getKerberosRefreshInterval() {
|
||||
if (System.getenv("KERBEROS_REFRESH_INTERVAL") == null) {
|
||||
return "1d";
|
||||
} else {
|
||||
return System.getenv("KERBEROS_REFRESH_INTERVAL");
|
||||
}
|
||||
}
|
||||
|
||||
public Integer kinitFailThreshold() {
|
||||
if (System.getenv("KINIT_FAIL_THRESHOLD") == null) {
|
||||
return 5;
|
||||
} else {
|
||||
return new Integer(System.getenv("KINIT_FAIL_THRESHOLD"));
|
||||
}
|
||||
}
|
||||
|
||||
public Long getTimeAsMs(String time) {
|
||||
if (time == null) {
|
||||
logger.error("Cannot convert to time value.", time);
|
||||
time = "1d";
|
||||
}
|
||||
|
||||
Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(time.toLowerCase());
|
||||
if (!m.matches()) {
|
||||
throw new IllegalArgumentException("Invalid time string: " + time);
|
||||
}
|
||||
|
||||
long val = Long.parseLong(m.group(1));
|
||||
String suffix = m.group(2);
|
||||
|
||||
if (suffix != null && !Constants.TIME_SUFFIXES.containsKey(suffix)) {
|
||||
throw new IllegalArgumentException("Invalid suffix: \"" + suffix + "\"");
|
||||
}
|
||||
|
||||
return TimeUnit.MILLISECONDS.convert(val,
|
||||
suffix != null ? Constants.TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
protected ScheduledExecutorService startKerberosLoginThread() {
|
||||
scheduledExecutorService = Executors.newScheduledThreadPool(1);
|
||||
|
||||
scheduledExecutorService.schedule(new Callable() {
|
||||
public Object call() throws Exception {
|
||||
|
||||
if (runKerberosLogin()) {
|
||||
// schedule another kinit run with a fixed delay.
|
||||
scheduledExecutorService
|
||||
.schedule(this, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
// schedule another retry at once or close the interpreter if too many times kinit fails
|
||||
if (kinitFailCount >= kinitFailThreshold()) {
|
||||
close();
|
||||
} else {
|
||||
scheduledExecutorService.submit(this);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS);
|
||||
|
||||
return scheduledExecutorService;
|
||||
}
|
||||
|
||||
protected void shutdownExecutorService() {
|
||||
if (scheduledExecutorService != null) {
|
||||
scheduledExecutorService.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in a new issue