Refactor JobScheduler reload method to use custom timeout

The minimum reload timeout is read from the RELOAD_MIN_TIMEOUT environment variable, which defaults to 5 seconds. If the value is not a non-negative integer (digits only), an error is logged and the minimum falls back to 5 seconds. The reload method now sets the timeout for the API call to "/reload" to the larger of this minimum and twice the number of entries in SERVER_NAME.

Refactor ApiCaller send_files method to support response option

If the response parameter is set to True, the method returns a tuple containing the success status and the response dictionary. Otherwise, it returns only the success status. The parameter defaults to False, so existing callers keep receiving a plain boolean, while new callers of the send_files method can access the response data if needed.

Update certbot-deploy script to use custom reload timeout
This commit is contained in:
Théophile Diot 2024-10-24 16:27:07 +02:00
parent 61f8b834eb
commit 8cf603526d
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
4 changed files with 91 additions and 47 deletions

View file

@ -37,6 +37,14 @@ try:
instances = db.get_instances()
services = db.get_non_default_settings(global_only=True, methods=False, with_drafts=True, filtered_settings=("SERVER_NAME"))["SERVER_NAME"].split(" ")
reload_min_timeout = getenv("RELOAD_MIN_TIMEOUT", "5")
if not reload_min_timeout.isdigit():
LOGGER.error("RELOAD_MIN_TIMEOUT must be an integer, defaulting to 5")
reload_min_timeout = 5
reload_min_timeout = int(reload_min_timeout)
for instance in instances:
endpoint = f"http://{instance['hostname']}:{instance['port']}"
host = instance["server_name"]
@ -53,7 +61,7 @@ try:
LOGGER.info(
f"Successfully sent API request to {api.endpoint}/lets-encrypt/certificates",
)
sent, err, status, resp = api.request("POST", "/reload", timeout=max(5, 2 * len(services)))
sent, err, status, resp = api.request("POST", "/reload", timeout=max(reload_min_timeout, 2 * len(services)))
if not sent:
status = 1
LOGGER.error(f"Can't send API request to {api.endpoint}/reload : {err}")

View file

@ -68,10 +68,13 @@ class ApiCaller:
return ret, responses
def send_files(self, path: str, url: str, timeout=(5, 10)) -> bool:
def send_files(self, path: str, url: str, timeout=(5, 10), response: bool = False) -> Union[bool, Tuple[bool, Optional[Dict[str, Any]]]]:
with BytesIO() as tgz:
with tar_open(mode="w:gz", fileobj=tgz, dereference=True, compresslevel=3) as tf:
tf.add(path, arcname=".")
tgz.seek(0, 0)
files = {"archive.tar.gz": tgz}
return self.send_to_apis("POST", url, files=files, timeout=timeout)[0]
ret = self.send_to_apis("POST", url, files=files, timeout=timeout, response=response)
if response:
return ret[0], ret[1]
return ret[0]

View file

@ -128,7 +128,13 @@ class JobScheduler(ApiCaller):
def __reload(self) -> bool:
self.__logger.info("Reloading nginx...")
reload_success = self.send_to_apis("POST", "/reload", timeout=max(5, 2 * len(self.__env["SERVER_NAME"].split(" "))))[0]
reload_min_timeout = self.__env.get("RELOAD_MIN_TIMEOUT", "5")
if not reload_min_timeout.isdigit():
self.__logger.error("RELOAD_MIN_TIMEOUT must be an integer, defaulting to 5")
reload_min_timeout = 5
reload_success = self.send_to_apis("POST", "/reload", timeout=max(int(reload_min_timeout), 2 * len(self.__env["SERVER_NAME"].split(" "))))[0]
if reload_success:
self.__logger.info("Successfully reloaded nginx")
return True

View file

@ -15,7 +15,7 @@ from stat import S_IEXEC
from subprocess import run as subprocess_run, DEVNULL, STDOUT
from sys import path as sys_path
from tarfile import TarFile, open as tar_open
from threading import Event, Thread
from threading import Event, Lock, Thread
from time import sleep
from traceback import format_exc
from typing import Any, Dict, List, Literal, Optional, Union
@ -39,6 +39,7 @@ from API import API # type: ignore
APPLYING_CHANGES = Event()
RUN = True
SCHEDULER: Optional[JobScheduler] = None
SCHEDULER_LOCK = Lock()
CACHE_PATH = Path(sep, "var", "cache", "bunkerweb")
CACHE_PATH.mkdir(parents=True, exist_ok=True)
@ -93,6 +94,14 @@ HEALTHCHECK_INTERVAL = int(HEALTHCHECK_INTERVAL)
HEALTHCHECK_EVENT = Event()
HEALTHCHECK_LOGGER = setup_logger("Scheduler.Healthcheck", getenv("CUSTOM_LOG_LEVEL", getenv("LOG_LEVEL", "INFO")))
RELOAD_MIN_TIMEOUT = getenv("RELOAD_MIN_TIMEOUT", "5")
if not RELOAD_MIN_TIMEOUT.isdigit():
LOGGER.error("RELOAD_MIN_TIMEOUT must be an integer, defaulting to 5")
RELOAD_MIN_TIMEOUT = 5
RELOAD_MIN_TIMEOUT = int(RELOAD_MIN_TIMEOUT)
SLAVE_MODE = getenv("SLAVE_MODE", "no") == "yes"
MASTER_MODE = getenv("MASTER_MODE", "no") == "yes"
@ -154,39 +163,49 @@ def stop(status):
_exit(status)
def send_nginx_configs(sent_path: Path = CONFIG_PATH):
def send_file_to_bunkerweb(file_path: Path, endpoint: str):
assert SCHEDULER is not None, "SCHEDULER is not defined"
LOGGER.info(f"Sending {sent_path} folder ...")
ret = SCHEDULER.send_files(sent_path.as_posix(), "/confs")
if not ret:
LOGGER.error("Sending nginx configs failed, configuration will not work as expected...")
LOGGER.info(f"Sending {file_path} to BunkerWeb instances ...")
success, responses = SCHEDULER.send_files(file_path.as_posix(), endpoint, response=True)
fails = []
for db_instance in SCHEDULER.db.get_instances():
index = -1
with SCHEDULER_LOCK:
for i, api in enumerate(SCHEDULER.apis):
if api.endpoint == f"http://{db_instance['hostname']}:{db_instance['port']}/":
index = i
break
def send_nginx_cache(sent_path: Path = CACHE_PATH):
assert SCHEDULER is not None, "SCHEDULER is not defined"
LOGGER.info(f"Sending {sent_path} folder ...")
if not SCHEDULER.send_files(sent_path.as_posix(), "/cache"):
LOGGER.error(f"Error while sending {sent_path} folder")
else:
LOGGER.info(f"Successfully sent {sent_path} folder")
status = responses.get(db_instance["hostname"], {"status": "down"}).get("status", "down")
if status == "success":
success = True
elif index != -1:
fails.append(db_instance["hostname"])
ret = SCHEDULER.db.update_instance(db_instance["hostname"], "up" if status == "success" else "down")
if ret:
LOGGER.error(f"Couldn't update instance {db_instance['hostname']} status to down in the database: {ret}")
def send_nginx_custom_configs(sent_path: Path = CUSTOM_CONFIGS_PATH):
assert SCHEDULER is not None, "SCHEDULER is not defined"
LOGGER.info(f"Sending {sent_path} folder ...")
if not SCHEDULER.send_files(sent_path.as_posix(), "/custom_configs"):
LOGGER.error(f"Error while sending {sent_path} folder")
else:
LOGGER.info(f"Successfully sent {sent_path} folder")
if status == "success":
if index == -1:
with SCHEDULER_LOCK:
LOGGER.debug(f"Adding {db_instance['hostname']}:{db_instance['port']} to the list of reachable instances")
SCHEDULER.apis.append(API(f"http://{db_instance['hostname']}:{db_instance['port']}", db_instance["server_name"]))
continue
with SCHEDULER_LOCK:
if api.endpoint == f"http://{db_instance['hostname']}:{db_instance['port']}/":
LOGGER.debug(f"Removing {db_instance['hostname']}:{db_instance['port']} from the list of reachable instances")
del SCHEDULER.apis[index]
def send_nginx_external_plugins(sent_path: Path = EXTERNAL_PLUGINS_PATH):
assert SCHEDULER is not None, "SCHEDULER is not defined"
LOGGER.info(f"Sending {sent_path} folder ...")
if not SCHEDULER.send_files(sent_path.as_posix(), "/pro_plugins" if sent_path.as_posix().endswith("/pro/plugins") else "/plugins"):
LOGGER.error(f"Error while sending {sent_path} folder")
else:
LOGGER.info(f"Successfully sent {sent_path} folder")
if not success:
LOGGER.error(f"Error while sending {file_path} to BunkerWeb instances")
return
elif not fails:
LOGGER.info(f"Successfully sent {file_path} folder to reachable BunkerWeb instances")
return
LOGGER.warning(f"Error while sending {file_path} to some BunkerWeb instances, removing them from the list of reachable instances: {', '.join(fails)}")
def generate_custom_configs(configs: Optional[List[Dict[str, Any]]] = None, *, original_path: Union[Path, str] = CUSTOM_CONFIGS_PATH):
@ -233,7 +252,7 @@ def generate_custom_configs(configs: Optional[List[Dict[str, Any]]] = None, *, o
)
if SCHEDULER and SCHEDULER.apis:
send_nginx_custom_configs(original_path)
send_file_to_bunkerweb(original_path, "/custom_configs")
def generate_external_plugins(original_path: Union[Path, str] = EXTERNAL_PLUGINS_PATH):
@ -298,7 +317,7 @@ def generate_external_plugins(original_path: Union[Path, str] = EXTERNAL_PLUGINS
if SCHEDULER and SCHEDULER.apis:
LOGGER.info(f"Sending {'pro ' if pro else ''}external plugins to BunkerWeb")
send_nginx_external_plugins(original_path)
send_file_to_bunkerweb(original_path, "/pro_plugins" if original_path.as_posix().endswith("/pro/plugins") else "/plugins")
def generate_caches():
@ -379,10 +398,10 @@ def run_in_slave_mode(): # TODO: Refactor this feature
env["TZ"] = tz
# Instantiate scheduler environment
SCHEDULER.env = env | {"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice"))}
SCHEDULER.env = env | {"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice")), "RELOAD_MIN_TIMEOUT": str(RELOAD_MIN_TIMEOUT)}
generate_custom_configs()
threads = [
Thread(target=generate_custom_configs),
Thread(target=generate_external_plugins),
Thread(target=generate_external_plugins, args=(PRO_PLUGINS_PATH,)),
Thread(target=generate_caches),
@ -586,7 +605,7 @@ if __name__ == "__main__":
env["TZ"] = tz
# Instantiate scheduler environment
SCHEDULER.env = env | {"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice"))}
SCHEDULER.env = env | {"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice")), "RELOAD_MIN_TIMEOUT": str(RELOAD_MIN_TIMEOUT)}
threads = []
@ -641,8 +660,6 @@ if __name__ == "__main__":
generate_custom_configs(SCHEDULER.db.get_custom_configs())
threads.append(Thread(target=check_configs_changes))
def check_plugin_changes(_type: Literal["external", "pro"] = "external"):
# Check if any external or pro plugin has been added by the user
assert SCHEDULER is not None, "SCHEDULER is not defined"
@ -697,10 +714,11 @@ if __name__ == "__main__":
except BaseException as e:
LOGGER.error(f"Error while saving {_type} plugins to database: {e}")
else:
return send_nginx_external_plugins(plugin_path)
return send_file_to_bunkerweb(plugin_path, "/pro_plugins" if _type == "pro" else "/plugins")
generate_external_plugins(plugin_path)
check_configs_changes()
threads.extend([Thread(target=check_plugin_changes, args=("external",)), Thread(target=check_plugin_changes, args=("pro",))])
for thread in threads:
@ -775,7 +793,13 @@ if __name__ == "__main__":
if RUN_JOBS_ONCE:
# Only run jobs once
if not SCHEDULER.reload(
env | {"TZ": getenv("TZ", "UTC"), "LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice"))}, changed_plugins=changed_plugins
env
| {
"TZ": getenv("TZ", "UTC"),
"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice")),
"RELOAD_MIN_TIMEOUT": str(RELOAD_MIN_TIMEOUT),
},
changed_plugins=changed_plugins,
):
LOGGER.error("At least one job in run_once() failed")
else:
@ -815,7 +839,7 @@ if __name__ == "__main__":
if SCHEDULER.apis:
# send nginx configs
threads.append(Thread(target=send_nginx_configs))
threads.append(Thread(target=send_file_to_bunkerweb, args=(CONFIG_PATH, "/confs")))
threads[-1].start()
try:
@ -823,13 +847,15 @@ if __name__ == "__main__":
reachable = True
if SCHEDULER.apis:
# send cache
threads.append(Thread(target=send_nginx_cache))
threads.append(Thread(target=send_file_to_bunkerweb, args=(CACHE_PATH, "/cache")))
threads[-1].start()
for thread in threads:
thread.join()
success, responses = SCHEDULER.send_to_apis("POST", "/reload", timeout=max(5, 2 * len(env["SERVER_NAME"].split(" "))), response=True)
success, responses = SCHEDULER.send_to_apis(
"POST", "/reload", timeout=max(RELOAD_MIN_TIMEOUT, 2 * len(env["SERVER_NAME"].split(" "))), response=True
)
if not success:
reachable = False
LOGGER.debug(f"Error while reloading all bunkerweb instances: {responses}")
@ -852,6 +878,7 @@ if __name__ == "__main__":
LOGGER.debug(f"Adding {db_instance['hostname']}:{db_instance['port']} to the list of reachable instances")
SCHEDULER.apis.append(API(f"http://{db_instance['hostname']}:{db_instance['port']}", db_instance["server_name"]))
continue
for i, api in enumerate(SCHEDULER.apis):
if api.endpoint == f"http://{db_instance['hostname']}:{db_instance['port']}/":
LOGGER.debug(f"Removing {db_instance['hostname']}:{db_instance['port']} from the list of reachable instances")
@ -883,9 +910,9 @@ if __name__ == "__main__":
# Failover to last working configuration
if SCHEDULER.apis:
tmp_threads = [
Thread(target=send_nginx_configs, args=(FAILOVER_PATH.joinpath("config"),)),
Thread(target=send_nginx_cache, args=(FAILOVER_PATH.joinpath("cache"),)),
Thread(target=send_nginx_custom_configs, args=(FAILOVER_PATH.joinpath("custom_configs"),)),
Thread(target=send_file_to_bunkerweb, args=(FAILOVER_PATH.joinpath("config"), "/confs")),
Thread(target=send_file_to_bunkerweb, args=(FAILOVER_PATH.joinpath("cache"), "/cache")),
Thread(target=send_file_to_bunkerweb, args=(FAILOVER_PATH.joinpath("custom_configs"), "/custom_configs")),
]
for thread in tmp_threads:
thread.start()
@ -893,7 +920,7 @@ if __name__ == "__main__":
for thread in tmp_threads:
thread.join()
if not SCHEDULER.send_to_apis("POST", "/reload", timeout=max(5, 2 * len(env["SERVER_NAME"].split(" "))))[0]:
if not SCHEDULER.send_to_apis("POST", "/reload", timeout=max(RELOAD_MIN_TIMEOUT, 2 * len(env["SERVER_NAME"].split(" "))))[0]:
LOGGER.error("Error while reloading bunkerweb with failover configuration, skipping ...")
elif not reachable:
LOGGER.warning("No BunkerWeb instance is reachable, skipping failover ...")