mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Refactor JobScheduler reload method to use custom timeout
The custom timeout is determined by the value of the RELOAD_MIN_TIMEOUT environment variable, which is set to 5 seconds by default. If the value is not an integer, an error is logged and the timeout defaults to 5 seconds. The reload method now uses the maximum of the custom timeout and twice the length of the SERVER_NAME list to set the timeout for the API call to "/reload". Refactor ApiCaller send_files method to support response option If the response parameter is set to True, the method returns a tuple containing the success status and the response dictionary. Otherwise, it returns only the success status. This change allows callers of the send_files method to access the response data if needed. Update certbot-deploy script to use custom reload timeout
This commit is contained in:
parent
61f8b834eb
commit
8cf603526d
4 changed files with 91 additions and 47 deletions
|
|
@ -37,6 +37,14 @@ try:
|
|||
instances = db.get_instances()
|
||||
services = db.get_non_default_settings(global_only=True, methods=False, with_drafts=True, filtered_settings=("SERVER_NAME"))["SERVER_NAME"].split(" ")
|
||||
|
||||
reload_min_timeout = getenv("RELOAD_MIN_TIMEOUT", "5")
|
||||
|
||||
if not reload_min_timeout.isdigit():
|
||||
LOGGER.error("RELOAD_MIN_TIMEOUT must be an integer, defaulting to 5")
|
||||
reload_min_timeout = 5
|
||||
|
||||
reload_min_timeout = int(reload_min_timeout)
|
||||
|
||||
for instance in instances:
|
||||
endpoint = f"http://{instance['hostname']}:{instance['port']}"
|
||||
host = instance["server_name"]
|
||||
|
|
@ -53,7 +61,7 @@ try:
|
|||
LOGGER.info(
|
||||
f"Successfully sent API request to {api.endpoint}/lets-encrypt/certificates",
|
||||
)
|
||||
sent, err, status, resp = api.request("POST", "/reload", timeout=max(5, 2 * len(services)))
|
||||
sent, err, status, resp = api.request("POST", "/reload", timeout=max(reload_min_timeout, 2 * len(services)))
|
||||
if not sent:
|
||||
status = 1
|
||||
LOGGER.error(f"Can't send API request to {api.endpoint}/reload : {err}")
|
||||
|
|
|
|||
|
|
@ -68,10 +68,13 @@ class ApiCaller:
|
|||
|
||||
return ret, responses
|
||||
|
||||
def send_files(self, path: str, url: str, timeout=(5, 10)) -> bool:
|
||||
def send_files(self, path: str, url: str, timeout=(5, 10), response: bool = False) -> Union[bool, Tuple[bool, Optional[Dict[str, Any]]]]:
|
||||
with BytesIO() as tgz:
|
||||
with tar_open(mode="w:gz", fileobj=tgz, dereference=True, compresslevel=3) as tf:
|
||||
tf.add(path, arcname=".")
|
||||
tgz.seek(0, 0)
|
||||
files = {"archive.tar.gz": tgz}
|
||||
return self.send_to_apis("POST", url, files=files, timeout=timeout)[0]
|
||||
ret = self.send_to_apis("POST", url, files=files, timeout=timeout, response=response)
|
||||
if response:
|
||||
return ret[0], ret[1]
|
||||
return ret[0]
|
||||
|
|
|
|||
|
|
@ -128,7 +128,13 @@ class JobScheduler(ApiCaller):
|
|||
|
||||
def __reload(self) -> bool:
|
||||
self.__logger.info("Reloading nginx...")
|
||||
reload_success = self.send_to_apis("POST", "/reload", timeout=max(5, 2 * len(self.__env["SERVER_NAME"].split(" "))))[0]
|
||||
reload_min_timeout = self.__env.get("RELOAD_MIN_TIMEOUT", "5")
|
||||
|
||||
if not reload_min_timeout.isdigit():
|
||||
self.__logger.error("RELOAD_MIN_TIMEOUT must be an integer, defaulting to 5")
|
||||
reload_min_timeout = 5
|
||||
|
||||
reload_success = self.send_to_apis("POST", "/reload", timeout=max(int(reload_min_timeout), 2 * len(self.__env["SERVER_NAME"].split(" "))))[0]
|
||||
if reload_success:
|
||||
self.__logger.info("Successfully reloaded nginx")
|
||||
return True
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ from stat import S_IEXEC
|
|||
from subprocess import run as subprocess_run, DEVNULL, STDOUT
|
||||
from sys import path as sys_path
|
||||
from tarfile import TarFile, open as tar_open
|
||||
from threading import Event, Thread
|
||||
from threading import Event, Lock, Thread
|
||||
from time import sleep
|
||||
from traceback import format_exc
|
||||
from typing import Any, Dict, List, Literal, Optional, Union
|
||||
|
|
@ -39,6 +39,7 @@ from API import API # type: ignore
|
|||
APPLYING_CHANGES = Event()
|
||||
RUN = True
|
||||
SCHEDULER: Optional[JobScheduler] = None
|
||||
SCHEDULER_LOCK = Lock()
|
||||
|
||||
CACHE_PATH = Path(sep, "var", "cache", "bunkerweb")
|
||||
CACHE_PATH.mkdir(parents=True, exist_ok=True)
|
||||
|
|
@ -93,6 +94,14 @@ HEALTHCHECK_INTERVAL = int(HEALTHCHECK_INTERVAL)
|
|||
HEALTHCHECK_EVENT = Event()
|
||||
HEALTHCHECK_LOGGER = setup_logger("Scheduler.Healthcheck", getenv("CUSTOM_LOG_LEVEL", getenv("LOG_LEVEL", "INFO")))
|
||||
|
||||
RELOAD_MIN_TIMEOUT = getenv("RELOAD_MIN_TIMEOUT", "5")
|
||||
|
||||
if not RELOAD_MIN_TIMEOUT.isdigit():
|
||||
LOGGER.error("RELOAD_MIN_TIMEOUT must be an integer, defaulting to 5")
|
||||
RELOAD_MIN_TIMEOUT = 5
|
||||
|
||||
RELOAD_MIN_TIMEOUT = int(RELOAD_MIN_TIMEOUT)
|
||||
|
||||
SLAVE_MODE = getenv("SLAVE_MODE", "no") == "yes"
|
||||
MASTER_MODE = getenv("MASTER_MODE", "no") == "yes"
|
||||
|
||||
|
|
@ -154,39 +163,49 @@ def stop(status):
|
|||
_exit(status)
|
||||
|
||||
|
||||
def send_nginx_configs(sent_path: Path = CONFIG_PATH):
|
||||
def send_file_to_bunkerweb(file_path: Path, endpoint: str):
|
||||
assert SCHEDULER is not None, "SCHEDULER is not defined"
|
||||
LOGGER.info(f"Sending {sent_path} folder ...")
|
||||
ret = SCHEDULER.send_files(sent_path.as_posix(), "/confs")
|
||||
if not ret:
|
||||
LOGGER.error("Sending nginx configs failed, configuration will not work as expected...")
|
||||
LOGGER.info(f"Sending {file_path} to BunkerWeb instances ...")
|
||||
success, responses = SCHEDULER.send_files(file_path.as_posix(), endpoint, response=True)
|
||||
fails = []
|
||||
|
||||
for db_instance in SCHEDULER.db.get_instances():
|
||||
index = -1
|
||||
with SCHEDULER_LOCK:
|
||||
for i, api in enumerate(SCHEDULER.apis):
|
||||
if api.endpoint == f"http://{db_instance['hostname']}:{db_instance['port']}/":
|
||||
index = i
|
||||
break
|
||||
|
||||
def send_nginx_cache(sent_path: Path = CACHE_PATH):
|
||||
assert SCHEDULER is not None, "SCHEDULER is not defined"
|
||||
LOGGER.info(f"Sending {sent_path} folder ...")
|
||||
if not SCHEDULER.send_files(sent_path.as_posix(), "/cache"):
|
||||
LOGGER.error(f"Error while sending {sent_path} folder")
|
||||
else:
|
||||
LOGGER.info(f"Successfully sent {sent_path} folder")
|
||||
status = responses.get(db_instance["hostname"], {"status": "down"}).get("status", "down")
|
||||
if status == "success":
|
||||
success = True
|
||||
elif index != -1:
|
||||
fails.append(db_instance["hostname"])
|
||||
|
||||
ret = SCHEDULER.db.update_instance(db_instance["hostname"], "up" if status == "success" else "down")
|
||||
if ret:
|
||||
LOGGER.error(f"Couldn't update instance {db_instance['hostname']} status to down in the database: {ret}")
|
||||
|
||||
def send_nginx_custom_configs(sent_path: Path = CUSTOM_CONFIGS_PATH):
|
||||
assert SCHEDULER is not None, "SCHEDULER is not defined"
|
||||
LOGGER.info(f"Sending {sent_path} folder ...")
|
||||
if not SCHEDULER.send_files(sent_path.as_posix(), "/custom_configs"):
|
||||
LOGGER.error(f"Error while sending {sent_path} folder")
|
||||
else:
|
||||
LOGGER.info(f"Successfully sent {sent_path} folder")
|
||||
if status == "success":
|
||||
if index == -1:
|
||||
with SCHEDULER_LOCK:
|
||||
LOGGER.debug(f"Adding {db_instance['hostname']}:{db_instance['port']} to the list of reachable instances")
|
||||
SCHEDULER.apis.append(API(f"http://{db_instance['hostname']}:{db_instance['port']}", db_instance["server_name"]))
|
||||
continue
|
||||
|
||||
with SCHEDULER_LOCK:
|
||||
if api.endpoint == f"http://{db_instance['hostname']}:{db_instance['port']}/":
|
||||
LOGGER.debug(f"Removing {db_instance['hostname']}:{db_instance['port']} from the list of reachable instances")
|
||||
del SCHEDULER.apis[index]
|
||||
|
||||
def send_nginx_external_plugins(sent_path: Path = EXTERNAL_PLUGINS_PATH):
|
||||
assert SCHEDULER is not None, "SCHEDULER is not defined"
|
||||
LOGGER.info(f"Sending {sent_path} folder ...")
|
||||
if not SCHEDULER.send_files(sent_path.as_posix(), "/pro_plugins" if sent_path.as_posix().endswith("/pro/plugins") else "/plugins"):
|
||||
LOGGER.error(f"Error while sending {sent_path} folder")
|
||||
else:
|
||||
LOGGER.info(f"Successfully sent {sent_path} folder")
|
||||
if not success:
|
||||
LOGGER.error(f"Error while sending {file_path} to BunkerWeb instances")
|
||||
return
|
||||
elif not fails:
|
||||
LOGGER.info(f"Successfully sent {file_path} folder to reachable BunkerWeb instances")
|
||||
return
|
||||
LOGGER.warning(f"Error while sending {file_path} to some BunkerWeb instances, removing them from the list of reachable instances: {', '.join(fails)}")
|
||||
|
||||
|
||||
def generate_custom_configs(configs: Optional[List[Dict[str, Any]]] = None, *, original_path: Union[Path, str] = CUSTOM_CONFIGS_PATH):
|
||||
|
|
@ -233,7 +252,7 @@ def generate_custom_configs(configs: Optional[List[Dict[str, Any]]] = None, *, o
|
|||
)
|
||||
|
||||
if SCHEDULER and SCHEDULER.apis:
|
||||
send_nginx_custom_configs(original_path)
|
||||
send_file_to_bunkerweb(original_path, "/custom_configs")
|
||||
|
||||
|
||||
def generate_external_plugins(original_path: Union[Path, str] = EXTERNAL_PLUGINS_PATH):
|
||||
|
|
@ -298,7 +317,7 @@ def generate_external_plugins(original_path: Union[Path, str] = EXTERNAL_PLUGINS
|
|||
|
||||
if SCHEDULER and SCHEDULER.apis:
|
||||
LOGGER.info(f"Sending {'pro ' if pro else ''}external plugins to BunkerWeb")
|
||||
send_nginx_external_plugins(original_path)
|
||||
send_file_to_bunkerweb(original_path, "/pro_plugins" if original_path.as_posix().endswith("/pro/plugins") else "/plugins")
|
||||
|
||||
|
||||
def generate_caches():
|
||||
|
|
@ -379,10 +398,10 @@ def run_in_slave_mode(): # TODO: Refactor this feature
|
|||
env["TZ"] = tz
|
||||
|
||||
# Instantiate scheduler environment
|
||||
SCHEDULER.env = env | {"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice"))}
|
||||
SCHEDULER.env = env | {"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice")), "RELOAD_MIN_TIMEOUT": str(RELOAD_MIN_TIMEOUT)}
|
||||
|
||||
generate_custom_configs()
|
||||
threads = [
|
||||
Thread(target=generate_custom_configs),
|
||||
Thread(target=generate_external_plugins),
|
||||
Thread(target=generate_external_plugins, args=(PRO_PLUGINS_PATH,)),
|
||||
Thread(target=generate_caches),
|
||||
|
|
@ -586,7 +605,7 @@ if __name__ == "__main__":
|
|||
env["TZ"] = tz
|
||||
|
||||
# Instantiate scheduler environment
|
||||
SCHEDULER.env = env | {"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice"))}
|
||||
SCHEDULER.env = env | {"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice")), "RELOAD_MIN_TIMEOUT": str(RELOAD_MIN_TIMEOUT)}
|
||||
|
||||
threads = []
|
||||
|
||||
|
|
@ -641,8 +660,6 @@ if __name__ == "__main__":
|
|||
|
||||
generate_custom_configs(SCHEDULER.db.get_custom_configs())
|
||||
|
||||
threads.append(Thread(target=check_configs_changes))
|
||||
|
||||
def check_plugin_changes(_type: Literal["external", "pro"] = "external"):
|
||||
# Check if any external or pro plugin has been added by the user
|
||||
assert SCHEDULER is not None, "SCHEDULER is not defined"
|
||||
|
|
@ -697,10 +714,11 @@ if __name__ == "__main__":
|
|||
except BaseException as e:
|
||||
LOGGER.error(f"Error while saving {_type} plugins to database: {e}")
|
||||
else:
|
||||
return send_nginx_external_plugins(plugin_path)
|
||||
return send_file_to_bunkerweb(plugin_path, "/pro_plugins" if _type == "pro" else "/plugins")
|
||||
|
||||
generate_external_plugins(plugin_path)
|
||||
|
||||
check_configs_changes()
|
||||
threads.extend([Thread(target=check_plugin_changes, args=("external",)), Thread(target=check_plugin_changes, args=("pro",))])
|
||||
|
||||
for thread in threads:
|
||||
|
|
@ -775,7 +793,13 @@ if __name__ == "__main__":
|
|||
if RUN_JOBS_ONCE:
|
||||
# Only run jobs once
|
||||
if not SCHEDULER.reload(
|
||||
env | {"TZ": getenv("TZ", "UTC"), "LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice"))}, changed_plugins=changed_plugins
|
||||
env
|
||||
| {
|
||||
"TZ": getenv("TZ", "UTC"),
|
||||
"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice")),
|
||||
"RELOAD_MIN_TIMEOUT": str(RELOAD_MIN_TIMEOUT),
|
||||
},
|
||||
changed_plugins=changed_plugins,
|
||||
):
|
||||
LOGGER.error("At least one job in run_once() failed")
|
||||
else:
|
||||
|
|
@ -815,7 +839,7 @@ if __name__ == "__main__":
|
|||
|
||||
if SCHEDULER.apis:
|
||||
# send nginx configs
|
||||
threads.append(Thread(target=send_nginx_configs))
|
||||
threads.append(Thread(target=send_file_to_bunkerweb, args=(CONFIG_PATH, "/confs")))
|
||||
threads[-1].start()
|
||||
|
||||
try:
|
||||
|
|
@ -823,13 +847,15 @@ if __name__ == "__main__":
|
|||
reachable = True
|
||||
if SCHEDULER.apis:
|
||||
# send cache
|
||||
threads.append(Thread(target=send_nginx_cache))
|
||||
threads.append(Thread(target=send_file_to_bunkerweb, args=(CACHE_PATH, "/cache")))
|
||||
threads[-1].start()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
success, responses = SCHEDULER.send_to_apis("POST", "/reload", timeout=max(5, 2 * len(env["SERVER_NAME"].split(" "))), response=True)
|
||||
success, responses = SCHEDULER.send_to_apis(
|
||||
"POST", "/reload", timeout=max(RELOAD_MIN_TIMEOUT, 2 * len(env["SERVER_NAME"].split(" "))), response=True
|
||||
)
|
||||
if not success:
|
||||
reachable = False
|
||||
LOGGER.debug(f"Error while reloading all bunkerweb instances: {responses}")
|
||||
|
|
@ -852,6 +878,7 @@ if __name__ == "__main__":
|
|||
LOGGER.debug(f"Adding {db_instance['hostname']}:{db_instance['port']} to the list of reachable instances")
|
||||
SCHEDULER.apis.append(API(f"http://{db_instance['hostname']}:{db_instance['port']}", db_instance["server_name"]))
|
||||
continue
|
||||
|
||||
for i, api in enumerate(SCHEDULER.apis):
|
||||
if api.endpoint == f"http://{db_instance['hostname']}:{db_instance['port']}/":
|
||||
LOGGER.debug(f"Removing {db_instance['hostname']}:{db_instance['port']} from the list of reachable instances")
|
||||
|
|
@ -883,9 +910,9 @@ if __name__ == "__main__":
|
|||
# Failover to last working configuration
|
||||
if SCHEDULER.apis:
|
||||
tmp_threads = [
|
||||
Thread(target=send_nginx_configs, args=(FAILOVER_PATH.joinpath("config"),)),
|
||||
Thread(target=send_nginx_cache, args=(FAILOVER_PATH.joinpath("cache"),)),
|
||||
Thread(target=send_nginx_custom_configs, args=(FAILOVER_PATH.joinpath("custom_configs"),)),
|
||||
Thread(target=send_file_to_bunkerweb, args=(FAILOVER_PATH.joinpath("config"), "/confs")),
|
||||
Thread(target=send_file_to_bunkerweb, args=(FAILOVER_PATH.joinpath("cache"), "/cache")),
|
||||
Thread(target=send_file_to_bunkerweb, args=(FAILOVER_PATH.joinpath("custom_configs"), "/custom_configs")),
|
||||
]
|
||||
for thread in tmp_threads:
|
||||
thread.start()
|
||||
|
|
@ -893,7 +920,7 @@ if __name__ == "__main__":
|
|||
for thread in tmp_threads:
|
||||
thread.join()
|
||||
|
||||
if not SCHEDULER.send_to_apis("POST", "/reload", timeout=max(5, 2 * len(env["SERVER_NAME"].split(" "))))[0]:
|
||||
if not SCHEDULER.send_to_apis("POST", "/reload", timeout=max(RELOAD_MIN_TIMEOUT, 2 * len(env["SERVER_NAME"].split(" "))))[0]:
|
||||
LOGGER.error("Error while reloading bunkerweb with failover configuration, skipping ...")
|
||||
elif not reachable:
|
||||
LOGGER.warning("No BunkerWeb instance is reachable, skipping failover ...")
|
||||
|
|
|
|||
Loading…
Reference in a new issue