Add failover logic to scheduler and web UI

This commit is contained in:
Théophile Diot 2024-06-12 17:42:34 +02:00
parent ffd0e6a5ea
commit 0d3ae939c0
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
7 changed files with 203 additions and 68 deletions

View file

@ -0,0 +1,25 @@
#!/usr/bin/env python3
from os import getenv, sep
from os.path import join
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",))]:
if deps_path not in sys_path:
sys_path.append(deps_path)
from jobs import Job # type: ignore
from logger import setup_logger # type: ignore
LOGGER = setup_logger("FAILOVER-BACKUP", getenv("LOG_LEVEL", "INFO"))
status = 0
try:
# Restoring the backup failover configuration
JOB = Job(LOGGER)
except:
status = 2
LOGGER.error(f"Exception while running failover-backup.py :\n{format_exc()}")
sys_exit(status)

View file

@ -23,6 +23,12 @@
"file": "update-check.py",
"every": "day",
"reload": false
},
{
"name": "failover-backup",
"file": "failover-backup.py",
"every": "once",
"reload": false
}
]
}

View file

@ -400,6 +400,25 @@ class Database:
except (ProgrammingError, OperationalError):
return False
def set_failover(self, value: bool = True) -> str:
"""Set the failover value"""
with self.__db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
try:
metadata = session.query(Metadata).get(1)
if not metadata:
return "The metadata are not set yet, try again"
metadata.failover = value
session.commit()
except BaseException as e:
return str(e)
return ""
def initialize_db(self, version: str, integration: str = "Unknown") -> str:
"""Initialize the database"""
with self.__db_session() as session:
@ -428,7 +447,7 @@ class Database:
return ""
def get_metadata(self) -> Dict[str, str]:
def get_metadata(self) -> Dict[str, Any]:
"""Get the metadata from the database"""
data = {
"version": "1.5.8",
@ -440,6 +459,7 @@ class Database:
"pro_overlapped": False,
"pro_status": "invalid",
"last_pro_check": None,
"failover": False,
"default": True,
}
with self.__db_session() as session:
@ -459,6 +479,7 @@ class Database:
Metadata.pro_overlapped,
Metadata.pro_status,
Metadata.last_pro_check,
Metadata.failover,
)
.filter_by(id=1)
.first()
@ -474,6 +495,7 @@ class Database:
"pro_overlapped": metadata.pro_overlapped,
"pro_status": metadata.pro_status,
"last_pro_check": metadata.last_pro_check,
"failover": metadata.failover,
"default": False,
}
)

View file

@ -250,5 +250,6 @@ class Metadata(Base):
last_pro_plugins_change = Column(DateTime, nullable=True)
instances_changed = Column(Boolean, default=False, nullable=True)
last_instances_change = Column(DateTime, nullable=True)
failover = Column(Boolean, default=None, nullable=True)
integration = Column(INTEGRATIONS_ENUM, default="Unknown", nullable=False)
version = Column(String(32), default="1.5.8", nullable=False)

View file

@ -9,7 +9,7 @@ from json import load as json_load
from os import _exit, environ, getenv, getpid, sep
from os.path import join
from pathlib import Path
from shutil import copy, rmtree
from shutil import copy, rmtree, copytree
from signal import SIGINT, SIGTERM, signal, SIGHUP
from stat import S_IEXEC
from subprocess import run as subprocess_run, DEVNULL, STDOUT, PIPE
@ -30,13 +30,14 @@ from common_utils import bytes_hash, dict_to_frozenset, get_integration # type:
from logger import setup_logger # type: ignore
from Database import Database # type: ignore
from JobScheduler import JobScheduler
from API import API
from jobs import Job # type: ignore
from API import API # type: ignore
RUN = True
SCHEDULER: Optional[JobScheduler] = None
CACHE_PATH = join(sep, "var", "cache", "bunkerweb")
Path(CACHE_PATH).mkdir(parents=True, exist_ok=True)
CACHE_PATH = Path(sep, "var", "cache", "bunkerweb")
CACHE_PATH.mkdir(parents=True, exist_ok=True)
CUSTOM_CONFIGS_PATH = Path(sep, "etc", "bunkerweb", "configs")
CUSTOM_CONFIGS_PATH.mkdir(parents=True, exist_ok=True)
@ -54,6 +55,8 @@ CUSTOM_CONFIGS_DIRS = (
for custom_config_dir in CUSTOM_CONFIGS_DIRS:
CUSTOM_CONFIGS_PATH.joinpath(custom_config_dir).mkdir(parents=True, exist_ok=True)
CONFIG_PATH = Path(sep, "etc", "nginx")
EXTERNAL_PLUGINS_PATH = Path(sep, "etc", "bunkerweb", "plugins")
EXTERNAL_PLUGINS_PATH.mkdir(parents=True, exist_ok=True)
@ -63,6 +66,9 @@ PRO_PLUGINS_PATH.mkdir(parents=True, exist_ok=True)
TMP_PATH = Path(sep, "var", "tmp", "bunkerweb")
TMP_PATH.mkdir(parents=True, exist_ok=True)
FAILOVER_PATH = TMP_PATH.joinpath("failover")
FAILOVER_PATH.mkdir(parents=True, exist_ok=True)
HEALTHY_PATH = TMP_PATH.joinpath("scheduler.healthy")
SCHEDULER_TMP_ENV_PATH = TMP_PATH.joinpath("scheduler.env")
@ -124,6 +130,56 @@ def stop(status):
_exit(status)
def send_nginx_configs(sent_path: Path = CONFIG_PATH):
assert SCHEDULER is not None, "SCHEDULER is not defined"
logger.info(f"Sending {sent_path} folder ...")
ret = SCHEDULER.send_files(sent_path.as_posix(), "/confs")
if not ret:
logger.error("Sending nginx configs failed, configuration will not work as expected...")
def send_nginx_cache(sent_path: Path = CACHE_PATH):
assert SCHEDULER is not None, "SCHEDULER is not defined"
logger.info(f"Sending {sent_path} folder ...")
if not SCHEDULER.send_files(sent_path.as_posix(), "/cache"):
logger.error(f"Error while sending {sent_path} folder")
else:
logger.info(f"Successfully sent {sent_path} folder")
def send_nginx_custom_configs(sent_path: Path = CUSTOM_CONFIGS_PATH):
assert SCHEDULER is not None, "SCHEDULER is not defined"
logger.info(f"Sending {sent_path} folder ...")
if not SCHEDULER.send_files(sent_path.as_posix(), "/custom_configs"):
logger.error(f"Error while sending {sent_path} folder")
else:
logger.info(f"Successfully sent {sent_path} folder")
def listen_for_instances_reload():
from docker import DockerClient
global SCHEDULER
assert SCHEDULER is not None, "SCHEDULER is not defined"
docker_client = DockerClient(base_url=getenv("DOCKER_HOST", "unix:///var/run/docker.sock"))
for event in docker_client.events(decode=True, filters={"type": "container", "label": "bunkerweb.INSTANCE"}):
if event["Action"] in ("start", "die"):
logger.info(f"🐋 Detected {event['Action']} event on container {event['Actor']['Attributes']['name']}")
SCHEDULER.auto_setup()
try:
ret = SCHEDULER.db.update_instances([api_to_instance(api) for api in SCHEDULER.apis], changed=event["Action"] == "die")
if ret:
logger.error(f"Error while updating instances after {event['Action']} event: {ret}")
continue
if event["Action"] == "start":
ret = SCHEDULER.db.checked_changes(value=True)
if ret:
logger.error(f"Error while setting changes to checked in the database after {event['Action']} event: {ret}")
except BaseException as e:
logger.error(f"Error while updating instances after {event['Action']} event: {e}")
def generate_custom_configs(configs: Optional[List[Dict[str, Any]]] = None, *, original_path: Union[Path, str] = CUSTOM_CONFIGS_PATH):
if not isinstance(original_path, Path):
original_path = Path(original_path)
@ -168,11 +224,7 @@ def generate_custom_configs(configs: Optional[List[Dict[str, Any]]] = None, *, o
)
if SCHEDULER and SCHEDULER.apis:
logger.info("Sending custom configs to BunkerWeb")
ret = SCHEDULER.send_files(original_path, "/custom_configs")
if not ret:
logger.error("Sending custom configs failed, configuration will not work as expected...")
send_nginx_custom_configs(original_path)
def generate_external_plugins(plugins: Optional[List[Dict[str, Any]]] = None, *, original_path: Union[Path, str] = EXTERNAL_PLUGINS_PATH):
@ -351,7 +403,7 @@ def run_in_slave_mode():
"--templates",
join(sep, "usr", "share", "bunkerweb", "confs"),
"--output",
join(sep, "etc", "nginx"),
CONFIG_PATH.as_posix(),
"--variables",
str(SCHEDULER_TMP_ENV_PATH),
],
@ -387,11 +439,13 @@ if __name__ == "__main__":
INTEGRATION = get_integration()
tmp_variables_path = Path(args.variables or join(sep, "var", "tmp", "bunkerweb", "variables.env"))
nginx_variables_path = Path(sep, "etc", "nginx", "variables.env")
nginx_variables_path = CONFIG_PATH.joinpath("variables.env")
dotenv_env = dotenv_values(str(tmp_variables_path))
SCHEDULER = JobScheduler(environ, logger, INTEGRATION, db=Database(logger, sqlalchemy_string=dotenv_env.get("DATABASE_URI", getenv("DATABASE_URI", None)))) # type: ignore
JOB = Job(logger, SCHEDULER.db)
if SLAVE_MODE:
run_in_slave_mode()
stop(1)
@ -624,41 +678,6 @@ if __name__ == "__main__":
RUN_JOBS_ONCE = True
CHANGES = []
def send_nginx_configs():
logger.info(f"Sending {join(sep, 'etc', 'nginx')} folder ...")
ret = SCHEDULER.send_files(join(sep, "etc", "nginx"), "/confs")
if not ret:
logger.error("Sending nginx configs failed, configuration will not work as expected...")
def send_nginx_cache():
logger.info(f"Sending {CACHE_PATH} folder ...")
if not SCHEDULER.send_files(CACHE_PATH, "/cache"):
logger.error(f"Error while sending {CACHE_PATH} folder")
else:
logger.info(f"Successfully sent {CACHE_PATH} folder")
def listen_for_instances_reload():
from docker import DockerClient
global SCHEDULER
docker_client = DockerClient(base_url=getenv("DOCKER_HOST", "unix:///var/run/docker.sock"))
for event in docker_client.events(decode=True, filters={"type": "container", "label": "bunkerweb.INSTANCE"}):
if event["Action"] in ("start", "die"):
logger.info(f"🐋 Detected {event['Action']} event on container {event['Actor']['Attributes']['name']}")
SCHEDULER.auto_setup()
try:
ret = SCHEDULER.db.update_instances([api_to_instance(api) for api in SCHEDULER.apis], changed=event["Action"] == "die")
if ret:
logger.error(f"Error while updating instances after {event['Action']} event: {ret}")
continue
if event["Action"] == "start":
ret = SCHEDULER.db.checked_changes(value=True)
if ret:
logger.error(f"Error while setting changes to checked in the database after {event['Action']} event: {ret}")
except BaseException as e:
logger.error(f"Error while updating instances after {event['Action']} event: {e}")
if INTEGRATION == "Docker" and not override_instances:
Thread(target=listen_for_instances_reload, name="listen_for_instances_reload").start()
@ -692,7 +711,7 @@ if __name__ == "__main__":
"--templates",
join(sep, "usr", "share", "bunkerweb", "confs"),
"--output",
join(sep, "etc", "nginx"),
CONFIG_PATH.as_posix(),
"--variables",
str(SCHEDULER_TMP_ENV_PATH),
]
@ -716,6 +735,7 @@ if __name__ == "__main__":
logger.warning("No BunkerWeb instance found, skipping nginx configs sending ...")
try:
failed = False
if SCHEDULER.apis:
# send cache
thread = Thread(target=send_nginx_cache)
@ -725,10 +745,7 @@ if __name__ == "__main__":
for thread in threads:
thread.join()
if SCHEDULER.send_to_apis("POST", "/reload"):
logger.info("Successfully reloaded nginx")
else:
logger.error("Error while reloading nginx")
failed = not SCHEDULER.send_to_apis("POST", "/reload")
elif INTEGRATION == "Linux":
# Reload nginx
logger.info("Reloading nginx ...")
@ -740,14 +757,47 @@ if __name__ == "__main__":
check=False,
stdout=PIPE,
)
if proc.returncode == 0:
logger.info("Successfully sent reload signal to nginx")
else:
logger.error(
f"Error while reloading nginx - returncode: {proc.returncode} - error: {proc.stdout.decode('utf-8') if proc.stdout else 'no output'}"
)
failed = proc.returncode != 0
else:
logger.warning("No BunkerWeb instance found, skipping nginx reload ...")
logger.warning("No BunkerWeb instance found, skipping bunkerweb reload ...")
try:
SCHEDULER.db.set_failover(failed)
except BaseException as e:
logger.error(f"Error while setting failover to true in the database: {e}")
if failed:
logger.error("Error while reloading bunkerweb, failing over to last working configuration ...")
if (
not FAILOVER_PATH.joinpath("config").is_dir()
or not FAILOVER_PATH.joinpath("custom_configs").is_dir()
or not FAILOVER_PATH.joinpath("cache").is_dir()
):
logger.error("No failover configuration found, ignoring failover ...")
else:
# Failover to last working configuration
if SCHEDULER.apis:
tmp_threads = [
Thread(target=send_nginx_configs, args=(FAILOVER_PATH.joinpath("config"),)),
Thread(target=send_nginx_cache, args=(FAILOVER_PATH.joinpath("cache"),)),
Thread(target=send_nginx_custom_configs, args=(FAILOVER_PATH.joinpath("custom_configs"),)),
]
for thread in tmp_threads:
thread.start()
for thread in tmp_threads:
thread.join()
SCHEDULER.send_to_apis("POST", "/reload")
else:
logger.info("Successfully reloaded bunkerweb")
# Update the failover path with the working configuration
rmtree(FAILOVER_PATH, ignore_errors=True)
FAILOVER_PATH.mkdir(parents=True, exist_ok=True)
copytree(CONFIG_PATH, FAILOVER_PATH.joinpath("config"))
copytree(CUSTOM_CONFIGS_PATH, FAILOVER_PATH.joinpath("custom_configs"))
copytree(CACHE_PATH, FAILOVER_PATH.joinpath("cache"))
Thread(target=JOB.cache_dir, args=(FAILOVER_PATH,), kwargs={"job_name": "failover-backup"}).start()
except:
logger.error(f"Exception while reloading after running jobs once scheduling : {format_exc()}")

View file

@ -232,7 +232,7 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
elif operation == "restart":
operation = app.config["INSTANCES"].restart_instance(args[0])
elif not error:
operation = "The scheduler will be in charge of reloading the instances."
operation = "The scheduler will be in charge of applying the changes."
if operation:
if isinstance(operation, list):
@ -380,13 +380,24 @@ def inject_variables():
ui_data = get_ui_data()
metadata = app.config["DB"].get_metadata()
curr_changes = app.config["DB"].check_changes()
changes_ongoing = any(app.config["DB"].check_changes().values())
if ui_data.get("PRO_LOADING") and not any(curr_changes.values()):
if ui_data.get("PRO_LOADING") and not changes_ongoing:
ui_data["PRO_LOADING"] = False
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
if not changes_ongoing and metadata["failover"]:
flash(
"The last changes could not be applied because it creates a configuration error on NGINX, please check the logs for more information. The configured fell back to the last working one.",
"error",
)
elif not changes_ongoing and not metadata["failover"] and ui_data.get("CONFIG_CHANGED", False):
flash("The last changes have been applied successfully.", "success")
ui_data["CONFIG_CHANGED"] = False
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
# check that is value is in tuple
return dict(
script_nonce=app.config["SCRIPT_NONCE"],
@ -769,6 +780,7 @@ def account():
ui_data = get_ui_data()
ui_data["PRO_LOADING"] = True
ui_data["CONFIG_CHANGED"] = True
if any(curr_changes.values()):
ui_data["RELOADING"] = True
@ -1056,17 +1068,20 @@ def services():
threaded=threaded,
)
ui_data = get_ui_data()
if any(curr_changes.values()):
ui_data = get_ui_data()
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
Thread(target=update_services, args=(True,)).start()
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
else:
update_services()
ui_data["CONFIG_CHANGED"] = True
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
message = ""
if request.form["operation"] == "new":
@ -1189,6 +1204,8 @@ def global_config():
else:
update_global_config()
ui_data["CONFIG_CHANGED"] = True
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
@ -1312,6 +1329,11 @@ def configs():
app.logger.error(f"Could not save custom configs: {error}")
return redirect_flash_error("Couldn't save custom configs", "configs", True)
ui_data = get_ui_data()
ui_data["CONFIG_CHANGED"] = True
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
flash(operation)
return redirect(url_for("loading", next=url_for("configs")))
@ -1953,7 +1975,11 @@ def logs_linux():
error_type = (
"error"
if "[error]" in log_lower or "[crit]" in log_lower or "[alert]" in log_lower or "" in log_lower
else (("warn" if "[warn]" in log_lower or "⚠️" in log_lower else ("info" if "[info]" in log_lower or "" in log_lower else "message")))
else (
"emerg"
if "[emerg]" in log_lower
else ("warn" if "[warn]" in log_lower or "⚠️" in log_lower else ("info" if "[info]" in log_lower or "" in log_lower else "message"))
)
)
logs.append(
@ -2079,7 +2105,11 @@ def logs_container(container_id):
"type": (
"error"
if "[error]" in log_lower or "[crit]" in log_lower or "[alert]" in log_lower or "" in log_lower
else ("warn" if "[warn]" in log_lower or "⚠️" in log_lower else ("info" if "[info]" in log_lower or "" in log_lower else "message"))
else (
"emerg"
if "[emerg]" in log_lower
else ("warn" if "[warn]" in log_lower or "⚠️" in log_lower else ("info" if "[info]" in log_lower or "" in log_lower else "message"))
)
),
}
)

View file

@ -159,6 +159,7 @@
"values": [
"all",
"message",
"emerg",
"error",
"warn",
"info",