chore: Add common_utils module for integration detection in bwcli

This commit is contained in:
Théophile Diot 2024-06-04 18:35:49 +01:00
parent bb1ff3bf8b
commit b74df3e719
No known key found for this signature in database
GPG key ID: FA995104A0BA376A

View file

@ -20,6 +20,7 @@ for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in ((
from API import API # type: ignore
from ApiCaller import ApiCaller # type: ignore
from common_utils import get_integration # type: ignore
from logger import setup_logger # type: ignore
@ -71,7 +72,7 @@ class CLI(ApiCaller):
assert isinstance(self.__variables, dict), "Failed to get variables from database"
self.__integration = self.__detect_integration()
self.__integration = get_integration()
self.__use_redis = self.__get_variable("USE_REDIS", "no") == "yes"
self.__redis = None
if self.__use_redis:
@ -179,7 +180,7 @@ class CLI(ApiCaller):
self.__logger.error("USE_REDIS is set to yes but REDIS_HOST or REDIS_SENTINEL_HOSTS is not set, disabling redis")
self.__use_redis = False
if self.__integration == "linux":
if self.__integration == "Linux":
super().__init__(
[
API(
@ -195,25 +196,6 @@ class CLI(ApiCaller):
def __get_variable(self, variable: str, default: Optional[Any] = None) -> Optional[str]:
return getenv(variable, self.__variables.get(variable, default))
def __detect_integration(self) -> str:
if Path(sep, "usr", "sbin", "nginx").exists():
return "linux"
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
os_release_path = Path(sep, "etc", "os-release")
if self.__get_variable("KUBERNETES_MODE", "no").lower() == "yes": # type: ignore
return "kubernetes"
elif self.__get_variable("SWARM_MODE", "no").lower() == "yes": # type: ignore
return "swarm"
elif self.__get_variable("AUTOCONF_MODE", "no").lower() == "yes": # type: ignore
return "autoconf"
elif integration_path.is_file():
return integration_path.read_text(encoding="utf-8").strip().lower()
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
return "docker"
return "linux"
def unban(self, ip: str) -> Tuple[bool, str]:
if self.__redis:
try: