Refactor autoconf config saving logic to avoid overriding configs set elsewhere

This commit is contained in:
Théophile Diot 2024-06-03 11:38:40 +01:00
parent f598bd789a
commit dc5f347fc6
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
9 changed files with 177 additions and 282 deletions

View file

@ -4,13 +4,13 @@ from contextlib import suppress
from os import getenv
from time import sleep
from copy import deepcopy
from typing import Any, Dict, List, Optional
from ConfigCaller import ConfigCaller # type: ignore
from Database import Database # type: ignore
from logger import setup_logger # type: ignore
class Config(ConfigCaller):
class Config:
def __init__(self):
super().__init__()
self.__logger = setup_logger("Config", getenv("LOG_LEVEL", "INFO"))
@ -40,48 +40,64 @@ class Config(ConfigCaller):
self._settings.update(plugin["settings"])
def __get_full_env(self) -> dict:
env_instances = {"SERVER_NAME": ""}
for instance in self.__instances:
for variable, value in instance["env"].items():
env_instances[variable] = value
env_services = {}
config = {"SERVER_NAME": "", "MULTISITE": "yes"}
for service in self.__services:
server_name = service["SERVER_NAME"].split(" ")[0]
if not server_name:
continue
for variable, value in service.items():
env_services[f"{server_name}_{variable}"] = value
env_instances["SERVER_NAME"] += f" {server_name}"
env_instances["SERVER_NAME"] = env_instances["SERVER_NAME"].strip()
return self._full_env(env_instances, env_services)
if self._db.is_setting(variable, multisite=True):
config[f"{server_name}_{variable}"] = value
config["SERVER_NAME"] += f" {server_name}"
config["SERVER_NAME"] = config["SERVER_NAME"].strip()
return config
def update_needed(self, instances, services, configs={}) -> bool:
def update_needed(self, instances: List[Dict[str, Any]], services: List[Dict[str, str]], configs: Optional[Dict[str, Dict[str, bytes]]] = None) -> bool:
if instances != self.__instances:
return True
elif services != self.__services:
return True
elif configs != self.__configs:
elif (configs or {}) != self.__configs:
return True
return False
def wait_applying(self):
def have_to_wait(self) -> bool:
curr_changes = self._db.check_changes()
return isinstance(curr_changes, str) or any(curr_changes.values())
def wait_applying(self, startup: bool = False):
i = 0
while i < 10:
curr_changes = self._db.check_changes()
if isinstance(curr_changes, str):
self.__logger.error(f"An error occurred when checking for changes in the database : {curr_changes}")
if not startup:
self.__logger.error(f"An error occurred when checking for changes in the database : {curr_changes}")
elif not any(curr_changes.values()):
break
else:
self.__logger.warning(
"Scheduler is already applying a configuration, retrying in 5 seconds ...",
)
self.__logger.warning("Scheduler is already applying a configuration, retrying in 5 seconds ...")
i += 1
sleep(5)
if i >= 10:
if i >= 60:
raise Exception("Too many retries while waiting for scheduler to apply configuration...")
def apply(self, instances, services, configs={}, first=False) -> bool:
def apply(
self, instances: List[Dict[str, Any]], services: List[Dict[str, str]], configs: Optional[Dict[str, Dict[str, bytes]]] = None, first: bool = False
) -> bool:
success = True
err = self._try_database_readonly()
if err:
return False
while not self._db.is_initialized():
self.__logger.warning("Database is not initialized, retrying in 5 seconds ...")
sleep(5)
self.wait_applying()
configs = configs or {}
changes = []
if instances != self.__instances or first:
self.__instances = instances
@ -112,33 +128,10 @@ class Config(ConfigCaller):
custom_configs.append(
{
"value": data,
"exploded": [
site,
config_type,
name.replace(".conf", ""),
],
"exploded": [site, config_type, name.replace(".conf", "")],
}
)
err = self._try_database_readonly()
if err:
return False
while not self._db.is_initialized():
self.__logger.warning("Database is not initialized, retrying in 5 seconds ...")
sleep(5)
# wait until changes are applied
while True:
curr_changes = self._db.check_changes()
if isinstance(curr_changes, str):
self.__logger.error(f"An error occurred when checking for changes in the database : {curr_changes}")
elif not any(curr_changes.values()):
break
else:
self.__logger.warning("Scheduler is already applying a configuration, retrying in 5 seconds ...")
sleep(5)
# update instances in database
if "instances" in changes:
err = self._db.update_instances(self.__instances, changed=False)

View file

@ -66,25 +66,21 @@ class Controller(Config):
def _to_services(self, controller_service):
pass
@abstractmethod
def _get_static_services(self):
pass
def _set_autoconf_load_db(self):
if not self._loaded:
ret = self._db.set_autoconf_load(True)
if ret:
self._logger.warning(
f"Can't set autoconf loaded metadata to true in database: {ret}",
)
self._logger.warning(f"Can't set autoconf loaded metadata to true in database: {ret}")
else:
self._loaded = True
def get_services(self):
while not self._get_controller_services():
sleep(1)
services = []
for controller_service in self._get_controller_services():
services.extend(self._to_services(controller_service))
services.extend(self._get_static_services())
return services
@abstractmethod

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python3
from time import sleep
from typing import Any, Dict, List
from docker import DockerClient
from re import compile as re_compile
@ -30,8 +31,7 @@ class DockerController(Controller):
for env in controller_instance.attrs["Config"]["Env"]:
variable = env.split("=")[0]
value = env.replace(f"{variable}=", "", 1)
if self._is_setting(variable):
instance["env"][variable] = value
instance["env"][variable] = value
return [instance]
def _to_services(self, controller_service) -> List[dict]:
@ -39,35 +39,9 @@ class DockerController(Controller):
for variable, value in controller_service.labels.items():
if not variable.startswith("bunkerweb."):
continue
real_variable = variable.replace("bunkerweb.", "", 1)
if not self._is_setting_context(real_variable, "multisite"):
continue
service[real_variable] = value
service[variable.replace("bunkerweb.", "", 1)] = value
return [service]
def _get_static_services(self) -> List[dict]:
services = []
variables = {}
for instance in self.__client.containers.list(filters={"label": "bunkerweb.INSTANCE"}):
if not instance.attrs or not instance.attrs.get("Config", {}).get("Env"):
continue
for env in instance.attrs["Config"]["Env"]:
variable = env.split("=")[0]
value = env.replace(f"{variable}=", "", 1)
variables[variable] = value
if "SERVER_NAME" in variables and variables["SERVER_NAME"].strip():
for server_name in variables["SERVER_NAME"].strip().split(" "):
service = {"SERVER_NAME": server_name}
for variable, value in variables.items():
prefix = variable.split("_")[0]
real_variable = variable.replace(f"{prefix}_", "", 1)
if prefix == server_name and self._is_setting_context(real_variable, "multisite"):
service[real_variable] = value
services.append(service)
return services
def get_configs(self) -> Dict[str, Dict[str, Any]]:
configs = {config_type: {} for config_type in self._supported_config_types}
# get site configs from labels
@ -85,9 +59,7 @@ class DockerController(Controller):
# check if server_name exists
if not self._is_service_present(server_name):
self._logger.warning(
f"Ignoring config because {server_name} doesn't exist",
)
self._logger.warning(f"Ignoring config because {server_name} doesn't exist")
continue
for variable, value in labels.items():
@ -117,24 +89,34 @@ class DockerController(Controller):
def process_events(self):
self._set_autoconf_load_db()
for event in self.__client.events(decode=True, filters={"type": "container"}):
applied = False
try:
if not self.__process_event(event):
continue
self.wait_applying()
self._update_settings()
self._instances = self.get_instances()
self._services = self.get_services()
self._configs = self.get_configs()
if not self.update_needed(self._instances, self._services, configs=self._configs):
continue
self._logger.info("Caught Docker event, deploying new configuration ...")
if not self.apply_config():
self._logger.error("Error while deploying new configuration")
else:
self._logger.info(
"Successfully deployed new configuration 🚀",
)
to_apply = False
while not applied:
waiting = self.have_to_wait()
self._update_settings()
self._instances = self.get_instances()
self._services = self.get_services()
self._configs = self.get_configs()
self._set_autoconf_load_db()
if not to_apply and not self.update_needed(self._instances, self._services, configs=self._configs):
applied = True
continue
to_apply = True
if waiting:
sleep(1)
continue
self._logger.info("Caught Docker event, deploying new configuration ...")
if not self.apply_config():
self._logger.error("Error while deploying new configuration")
else:
self._logger.info("Successfully deployed new configuration 🚀")
self._set_autoconf_load_db()
applied = True
except:
self._logger.error(f"Exception while processing events :\n{format_exc()}")

View file

@ -55,9 +55,7 @@ class IngressController(Controller):
) in controller_service.metadata.annotations.items():
if not annotation.startswith("bunkerweb.io/"):
continue
variable = annotation.replace("bunkerweb.io/", "", 1)
if self._is_setting(variable):
instance["env"][variable] = value
instance["env"][annotation.replace("bunkerweb.io/", "", 1)] = value
return [instance]
def _get_controller_services(self) -> list:
@ -139,9 +137,7 @@ class IngressController(Controller):
server_name = service["SERVER_NAME"].strip().split(" ")[0]
if not variable.startswith(f"{server_name}_"):
continue
variable = variable.replace(f"{server_name}_", "", 1)
if self._is_setting_context(variable, "multisite"):
service[variable] = value
service[variable.replace(f"{server_name}_", "", 1)] = value
# parse tls
if controller_service.spec.tls:
@ -175,34 +171,6 @@ class IngressController(Controller):
service["CUSTOM_SSL_KEY_DATA"] = secret_tls.data["tls.key"]
return services
def _get_static_services(self) -> List[dict]:
services = []
variables = {}
for instance in self.__corev1.list_pod_for_all_namespaces(watch=False).items:
if not instance.metadata.annotations or "bunkerweb.io/INSTANCE" not in instance.metadata.annotations:
continue
pod = None
for container in instance.spec.containers:
if container.name == "bunkerweb":
pod = container
break
if not pod:
continue
variables = {env.name: env.value or "" for env in pod.env}
if "SERVER_NAME" in variables and variables["SERVER_NAME"].strip():
for server_name in variables["SERVER_NAME"].strip().split(" "):
service = {"SERVER_NAME": server_name}
for variable, value in variables.items():
prefix = variable.split("_")[0]
real_variable = variable.replace(f"{prefix}_", "", 1)
if prefix == server_name and self._is_setting_context(real_variable, "multisite"):
service[real_variable] = value
services.append(service)
return services
def get_configs(self) -> dict:
configs = {config_type: {} for config_type in self._supported_config_types}
for configmap in self.__corev1.list_config_map_for_all_namespaces(watch=False).items:
@ -270,42 +238,49 @@ class IngressController(Controller):
while True:
locked = False
error = False
applied = False
try:
for event in w.stream(what):
applied = False
self.__internal_lock.acquire()
locked = True
if not self.__process_event(event):
self.__internal_lock.release()
locked = False
continue
self.wait_applying()
self._update_settings()
self._instances = self.get_instances()
self._services = self.get_services()
self._configs = self.get_configs()
if not self.update_needed(self._instances, self._services, configs=self._configs):
self.__internal_lock.release()
locked = False
continue
self._logger.info(
f"Caught kubernetes event ({watch_type}), deploying new configuration ...",
)
try:
ret = self.apply_config()
if not ret:
self._logger.error(
"Error while deploying new configuration ...",
)
else:
self._logger.info(
"Successfully deployed new configuration 🚀",
)
self._set_autoconf_load_db()
except:
self._logger.error(
f"Exception while deploying new configuration :\n{format_exc()}",
)
to_apply = False
while not applied:
waiting = self.have_to_wait()
self._update_settings()
self._instances = self.get_instances()
self._services = self.get_services()
self._configs = self.get_configs()
if not to_apply and not self.update_needed(self._instances, self._services, configs=self._configs):
self.__internal_lock.release()
locked = False
applied = True
continue
to_apply = True
if waiting:
sleep(1)
continue
self._logger.info(f"Caught kubernetes event ({watch_type}), deploying new configuration ...")
try:
ret = self.apply_config()
if not ret:
self._logger.error("Error while deploying new configuration ...")
else:
self._logger.info("Successfully deployed new configuration 🚀")
self._set_autoconf_load_db()
except:
self._logger.error(f"Exception while deploying new configuration :\n{format_exc()}")
applied = True
self.__internal_lock.release()
locked = False
except ApiException as e:

View file

@ -35,8 +35,7 @@ class SwarmController(Controller):
for env in controller_instance.attrs["Spec"]["TaskTemplate"]["ContainerSpec"]["Env"]:
variable = env.split("=")[0]
value = env.replace(f"{variable}=", "", 1)
if self._is_setting(variable):
instance_env[variable] = value
instance_env[variable] = value
for task in controller_instance.tasks():
if task["DesiredState"] != "running":
@ -57,35 +56,9 @@ class SwarmController(Controller):
for variable, value in controller_service.attrs["Spec"]["Labels"].items():
if not variable.startswith("bunkerweb."):
continue
real_variable = variable.replace("bunkerweb.", "", 1)
if not self._is_setting_context(real_variable, "multisite"):
continue
service[real_variable] = value
service[variable.replace("bunkerweb.", "", 1)] = value
return [service]
def _get_static_services(self) -> List[dict]:
services = []
variables = {}
for instance in self.__client.services.list(filters={"label": "bunkerweb.INSTANCE"}):
if not instance.attrs or not instance.attrs.get("Spec", {}).get("TaskTemplate", {}).get("ContainerSpec", {}).get("Env"):
continue
for env in instance.attrs["Spec"]["TaskTemplate"]["ContainerSpec"]["Env"]:
variable = env.split("=")[0]
value = env.replace(f"{variable}=", "", 1)
variables[variable] = value
if "SERVER_NAME" in variables and variables["SERVER_NAME"].strip():
for server_name in variables["SERVER_NAME"].strip().split(" "):
service = {}
service["SERVER_NAME"] = server_name
for variable, value in variables.items():
prefix = variable.split("_")[0]
real_variable = variable.replace(f"{prefix}_", "", 1)
if prefix == server_name and self._is_setting_context(real_variable, "multisite"):
service[real_variable] = value
services.append(service)
return services
def get_configs(self) -> Dict[str, Dict[str, Any]]:
self.__swarm_configs = []
configs = {}
@ -148,36 +121,51 @@ class SwarmController(Controller):
while True:
locked = False
error = False
applied = False
try:
for event in self.__client.events(decode=True, filters={"type": event_type}):
applied = False
self.__internal_lock.acquire()
locked = True
if not self.__process_event(event):
self.__internal_lock.release()
locked = False
continue
try:
self.wait_applying()
self._update_settings()
self._instances = self.get_instances()
self._services = self.get_services()
self._configs = self.get_configs()
if not self.update_needed(self._instances, self._services, configs=self._configs):
self.__internal_lock.release()
locked = False
continue
self._logger.info(f"Caught Swarm event ({event_type}), deploying new configuration ...")
if not self.apply_config():
self._logger.error("Error while deploying new configuration")
else:
self._logger.info(
"Successfully deployed new configuration 🚀",
)
self._set_autoconf_load_db()
except:
to_apply = False
while not applied:
waiting = self.have_to_wait()
self._update_settings()
self._instances = self.get_instances()
self._services = self.get_services()
self._configs = self.get_configs()
if not to_apply and not self.update_needed(self._instances, self._services, configs=self._configs):
self.__internal_lock.release()
locked = False
applied = True
continue
to_apply = True
if waiting:
sleep(1)
continue
self._logger.info(f"Caught Swarm event ({event_type}), deploying new configuration ...")
if not self.apply_config():
self._logger.error("Error while deploying new configuration")
else:
self._logger.info(
"Successfully deployed new configuration 🚀",
)
self._set_autoconf_load_db()
applied = True
except BaseException:
self._logger.error(f"Exception while processing Swarm event ({event_type}) :\n{format_exc()}")
self.__internal_lock.release()
locked = False
finally:
self.__internal_lock.release()
locked = False
except:
self._logger.error(
f"Exception while reading Swarm event ({event_type}) :\n{format_exc()}",

View file

@ -59,11 +59,7 @@ try:
logger.info(f"Instance #{i} : {instance['name']}")
i += 1
# Run first configuration
ret = controller.apply_config()
if not ret:
logger.error("Error while applying initial configuration")
_exit(1)
controller.wait_applying(True)
# Process events
Path(sep, "var", "tmp", "bunkerweb", "autoconf.healthy").write_text("ok")

View file

@ -390,6 +390,16 @@ class Database:
except (ProgrammingError, OperationalError, DatabaseError):
return False
def is_setting(self, setting: str, *, multisite: bool = False) -> bool:
"""Check if the setting exists in the database and optionally if it's multisite"""
with self.__db_session() as session:
try:
if multisite:
return session.query(Settings).filter_by(id=setting, context="multisite").first() is not None
return session.query(Settings).filter_by(name=setting).first() is not None
except (ProgrammingError, OperationalError):
return False
def initialize_db(self, version: str, integration: str = "Unknown") -> str:
"""Initialize the database"""
with self.__db_session() as session:
@ -1167,7 +1177,11 @@ class Database:
services = config.get("SERVER_NAME", [])
if isinstance(services, str):
services = services.split(" ")
services = services.strip().split(" ")
for i, service in enumerate(services):
if not service:
services.pop(i)
if db_services:
missing_ids = [service.id for service in db_services if service.method == method and service.id not in services]
@ -1247,7 +1261,10 @@ class Database:
changed_plugins.add(setting.plugin_id)
to_put.append(Services_settings(service_id=server_name, setting_id=key, value=value, suffix=suffix, method=method))
elif method in (service_setting.method, "autoconf") and service_setting.value != value:
elif (
method == service_setting.method
or (service_setting.method not in ("scheduler", "autoconf") and method in ("scheduler", "autoconf"))
) and service_setting.value != value:
changed_plugins.add(setting.plugin_id)
query = session.query(Services_settings).filter(
Services_settings.service_id == server_name,
@ -1277,7 +1294,9 @@ class Database:
changed_plugins.add(setting.plugin_id)
to_put.append(Global_values(setting_id=key, value=value, suffix=suffix, method=method))
elif method in (global_value.method, "autoconf") and global_value.value != value:
elif (
method == global_value.method or (global_value.method not in ("scheduler", "autoconf") and method in ("scheduler", "autoconf"))
) and global_value.value != value:
changed_plugins.add(setting.plugin_id)
query = session.query(Global_values).filter(Global_values.setting_id == key, Global_values.suffix == suffix)
@ -1320,14 +1339,16 @@ class Database:
changed_plugins.add(setting.plugin_id)
to_put.append(Global_values(setting_id=key, value=value, suffix=suffix, method=method))
elif global_value.method == method and value != global_value.value:
elif (
method == global_value.method or (global_value.method not in ("scheduler", "autoconf") and method in ("scheduler", "autoconf"))
) and value != global_value.value:
changed_plugins.add(setting.plugin_id)
query = session.query(Global_values).filter(Global_values.setting_id == key, Global_values.suffix == suffix)
if value == setting.default:
query.delete()
continue
query.update({Global_values.value: value})
query.update({Global_values.value: value, Global_values.method: method})
if changed_services:
changed_plugins = set(plugin.id for plugin in session.query(Plugins).with_entities(Plugins.id).all())
@ -1423,12 +1444,12 @@ class Database:
if not custom_conf:
to_put.append(Custom_configs(**custom_config))
elif custom_config["checksum"] != custom_conf.checksum and method in (custom_conf.method, "autoconf"):
elif custom_config["checksum"] != custom_conf.checksum and (
method == custom_conf.method or (custom_conf.method not in ("scheduler", "autoconf") and method in ("scheduler", "autoconf"))
):
custom_conf.data = custom_config["data"]
custom_conf.checksum = custom_config["checksum"]
if method == "autoconf":
custom_conf.method = method
custom_conf.method = method
if changed:
with suppress(ProgrammingError, OperationalError):
metadata = session.query(Metadata).get(1)

View file

@ -86,6 +86,9 @@ class Configurator:
return {}
servers = {}
for server_name in self.__variables["SERVER_NAME"].strip().split(" "):
if not server_name:
continue
if not re_search(self.__settings["SERVER_NAME"]["regex"], server_name):
self.__logger.warning(f"Ignoring server name {server_name} because regex is not valid")
continue

View file

@ -1,59 +0,0 @@
#!/usr/bin/env python3
from glob import glob
from json import JSONDecodeError, loads
from os import sep
from os.path import join
from pathlib import Path
from re import match
from traceback import format_exc
from typing import Any, Dict, Literal, Union
from logger import setup_logger
class ConfigCaller:
def __init__(self):
self.__logger = setup_logger("Config", "INFO")
self._settings = loads(Path(sep, "usr", "share", "bunkerweb", "settings.json").read_text(encoding="utf-8"))
for plugin in glob(join(sep, "usr", "share", "bunkerweb", "core", "*", "plugin.json")) + glob(
join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json")
):
try:
self._settings.update(loads(Path(plugin).read_text(encoding="utf-8"))["settings"])
except KeyError:
self.__logger.error(
f'Error while loading plugin metadata file at {plugin} : missing "settings" key',
)
except JSONDecodeError:
self.__logger.error(
f"Exception while loading plugin metadata file at {plugin} :\n{format_exc()}",
)
def _is_setting(self, setting) -> bool:
return setting in self._settings
def _is_setting_context(self, setting: str, context: Union[Literal["global"], Literal["multisite"]]) -> bool:
if self._is_setting(setting):
return self._settings[setting]["context"] == context
elif match(r"^.+_\d+$", setting):
multiple_setting = "_".join(setting.split("_")[:-1])
return (
self._is_setting(multiple_setting) and self._settings[multiple_setting]["context"] == context and "multiple" in self._settings[multiple_setting]
)
return False
def _full_env(self, env_instances: Dict[str, Any], env_services: Dict[str, Any]) -> Dict[str, Any]:
full_env = {}
# Fill with default values
for k, v in self._settings.items():
full_env[k] = v["default"]
# Replace with instances values
for k, v in env_instances.items():
full_env[k] = v
if not self._is_setting_context(k, "global") and env_instances.get("MULTISITE", "no") == "yes" and env_instances.get("SERVER_NAME", "") != "":
for server_name in env_instances["SERVER_NAME"].split(" "):
full_env[f"{server_name}_{k}"] = v
# Replace with services values
full_env = full_env | env_services
return full_env