mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
chore: Add common_utils module for integration detection in bwcli
This commit is contained in:
parent
bb1ff3bf8b
commit
b74df3e719
1 changed files with 3 additions and 21 deletions
|
|
@ -20,6 +20,7 @@ for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in ((
|
|||
|
||||
from API import API # type: ignore
|
||||
from ApiCaller import ApiCaller # type: ignore
|
||||
from common_utils import get_integration # type: ignore
|
||||
from logger import setup_logger # type: ignore
|
||||
|
||||
|
||||
|
|
@ -71,7 +72,7 @@ class CLI(ApiCaller):
|
|||
|
||||
assert isinstance(self.__variables, dict), "Failed to get variables from database"
|
||||
|
||||
self.__integration = self.__detect_integration()
|
||||
self.__integration = get_integration()
|
||||
self.__use_redis = self.__get_variable("USE_REDIS", "no") == "yes"
|
||||
self.__redis = None
|
||||
if self.__use_redis:
|
||||
|
|
@ -179,7 +180,7 @@ class CLI(ApiCaller):
|
|||
self.__logger.error("USE_REDIS is set to yes but REDIS_HOST or REDIS_SENTINEL_HOSTS is not set, disabling redis")
|
||||
self.__use_redis = False
|
||||
|
||||
if self.__integration == "linux":
|
||||
if self.__integration == "Linux":
|
||||
super().__init__(
|
||||
[
|
||||
API(
|
||||
|
|
@ -195,25 +196,6 @@ class CLI(ApiCaller):
|
|||
def __get_variable(self, variable: str, default: Optional[Any] = None) -> Optional[str]:
|
||||
return getenv(variable, self.__variables.get(variable, default))
|
||||
|
||||
def __detect_integration(self) -> str:
|
||||
if Path(sep, "usr", "sbin", "nginx").exists():
|
||||
return "linux"
|
||||
|
||||
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
|
||||
os_release_path = Path(sep, "etc", "os-release")
|
||||
if self.__get_variable("KUBERNETES_MODE", "no").lower() == "yes": # type: ignore
|
||||
return "kubernetes"
|
||||
elif self.__get_variable("SWARM_MODE", "no").lower() == "yes": # type: ignore
|
||||
return "swarm"
|
||||
elif self.__get_variable("AUTOCONF_MODE", "no").lower() == "yes": # type: ignore
|
||||
return "autoconf"
|
||||
elif integration_path.is_file():
|
||||
return integration_path.read_text(encoding="utf-8").strip().lower()
|
||||
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
|
||||
return "docker"
|
||||
|
||||
return "linux"
|
||||
|
||||
def unban(self, ip: str) -> Tuple[bool, str]:
|
||||
if self.__redis:
|
||||
try:
|
||||
|
|
|
|||
Loading…
Reference in a new issue