From 2d3243ba539fc9181e781fac6374e63ca0ecbde3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophile=20Diot?= Date: Fri, 22 Mar 2024 12:07:40 +0000 Subject: [PATCH] Only use the database for custom configs CRUD in the web UI --- src/common/db/Database.py | 96 ++++++++++++----------- src/ui/main.py | 114 +++++++++++++++------------ src/ui/src/ConfigFiles.py | 157 +------------------------------------- 3 files changed, 114 insertions(+), 253 deletions(-) diff --git a/src/common/db/Database.py b/src/common/db/Database.py index d3714067e..b70aa967b 100644 --- a/src/common/db/Database.py +++ b/src/common/db/Database.py @@ -962,7 +962,7 @@ class Database: def save_custom_configs( self, - custom_configs: List[Dict[str, Tuple[str, List[str]]]], + custom_configs: List[Dict[str, Union[str, bytes, Tuple[str, List[str]]]]], method: str, changed: Optional[bool] = True, ) -> str: @@ -975,59 +975,57 @@ class Database: to_put = [] endl = "\n" for custom_config in custom_configs: - config = { - "data": custom_config["value"].encode("utf-8") if isinstance(custom_config["value"], str) else custom_config["value"], - "method": method, + if method != "ui": + config = { + "data": custom_config["value"], + "method": method, + } + assert isinstance(custom_config["exploded"], tuple) and len(custom_config["exploded"]) == 3, "Invalid exploded custom config" + + if custom_config["exploded"][0]: + if not session.query(Services).with_entities(Services.id).filter_by(id=custom_config["exploded"][0]).first(): + message += f"{endl if message else ''}Service {custom_config['exploded'][0]} not found, please check your config" + + config.update( + { + "service_id": custom_config["exploded"][0], + "type": custom_config["exploded"][1], + "name": custom_config["exploded"][2], + } + ) + else: + config.update( + { + "type": custom_config["exploded"][1], + "name": custom_config["exploded"][2], + } + ) + + custom_config = config + + custom_config["type"] = custom_config["type"].replace("-", "_").lower() # type: ignore + custom_config["data"] = custom_config["data"].encode("utf-8") if isinstance(custom_config["data"], str) else custom_config["data"] + custom_config["checksum"] = sha256(custom_config["data"]).hexdigest() # type: ignore + + service_id = custom_config.get("service_id", None) or None + filters = { + "type": custom_config["type"], + "name": custom_config["name"], } - config["checksum"] = sha256(config["data"]).hexdigest() - if custom_config["exploded"][0]: - if not session.query(Services).with_entities(Services.id).filter_by(id=custom_config["exploded"][0]).first(): - message += f"{endl if message else ''}Service {custom_config['exploded'][0]} not found, please check your config" + if service_id: + filters["service_id"] = service_id - config.update( - { - "service_id": custom_config["exploded"][0], - "type": custom_config["exploded"][1].replace("-", "_").lower(), - "name": custom_config["exploded"][2], - } - ) - else: - config.update( - { - "type": custom_config["exploded"][1].replace("-", "_").lower(), - "name": custom_config["exploded"][2], - } - ) - - custom_conf = ( - session.query(Custom_configs) - .with_entities(Custom_configs.checksum, Custom_configs.method) - .filter_by( - service_id=config.get("service_id", None), - type=config["type"], - name=config["name"], - ) - .first() - ) + custom_conf = session.query(Custom_configs).with_entities(Custom_configs.checksum, Custom_configs.method).filter_by(**filters).first() if not custom_conf: - to_put.append(Custom_configs(**config)) - elif config["checksum"] != custom_conf.checksum and method in ( - custom_conf.method, - "autoconf", - ): - session.query(Custom_configs).filter( - Custom_configs.service_id == config.get("service_id", None), - Custom_configs.type == config["type"], - Custom_configs.name == config["name"], - ).update( - { - Custom_configs.data: config["data"], - Custom_configs.checksum: config["checksum"], - } - | ({Custom_configs.method: "autoconf"} if method == "autoconf" else {}) - ) + to_put.append(Custom_configs(**custom_config)) + elif custom_config["checksum"] != custom_conf.checksum and method in (custom_conf.method, "autoconf"): + custom_conf.data = custom_config["data"] + custom_conf.checksum = custom_config["checksum"] + + if method == "autoconf": + custom_conf.method = method if changed: with suppress(ProgrammingError, OperationalError): metadata = session.query(Metadata).get(1) diff --git a/src/ui/main.py b/src/ui/main.py index 2862d5cce..84e105283 100755 --- a/src/ui/main.py +++ b/src/ui/main.py @@ -195,7 +195,7 @@ try: DEBUG=True, INSTANCES=Instances(docker_client, kubernetes_client, INTEGRATION), CONFIG=Config(db), - CONFIGFILES=ConfigFiles(app.logger, db), + CONFIGFILES=ConfigFiles(), WTF_CSRF_SSL_STRICT=False, USER=USER, SEND_FILE_MAX_AGE_DEFAULT=86400, @@ -255,14 +255,8 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b app.config["TO_FLASH"].append({"content": operation, "type": "success"}) if (was_draft != is_draft or not is_draft) and (moved or deleted): - changes = ["config", "custom_configs"] - error = app.config["CONFIGFILES"].save_configs(check_changes=False) - if error: - app.config["TO_FLASH"].append({"content": error, "type": "error"}) - changes.pop() - # update changes in db - ret = db.checked_changes(changes, value=True) + ret = db.checked_changes(["config", "custom_configs"], value=True) if ret: app.logger.error(f"Couldn't set the changes to checked in the database: {ret}") app.config["TO_FLASH"].append( @@ -1069,6 +1063,8 @@ def global_config(): @app.route("/configs", methods=["GET", "POST"]) @login_required def configs(): + db_configs = db.get_custom_configs() + if request.method == "POST": operation = "" @@ -1080,74 +1076,92 @@ def configs(): "edit", "delete", ): - return redirect_flash_error("Missing operation parameter on /configs.", "configs", True) + return redirect_flash_error("Operation parameter is invalid on /configs.", "configs", True) # Check variables variables = deepcopy(request.form.to_dict()) del variables["csrf_token"] + if variables["type"] != "file": + return redirect_flash_error("Invalid type parameter on /configs.", "configs", True) + operation = app.config["CONFIGFILES"].check_path(variables["path"]) if operation: return redirect_flash_error(operation, "configs", True) + old_name = variables.get("old_name", "").replace(".conf", "") + name = variables.get("name", old_name).replace(".conf", "") + path_exploded = variables["path"].split(sep) + service_id = (path_exploded[5] if len(path_exploded) > 6 else None) or None + root_dir = path_exploded[4].replace("-", "_").lower() + + if not old_name and not name: + return redirect_flash_error("Missing name parameter on /configs.", "configs", True) + + index = -1 + for i, db_config in enumerate(db_configs): + if db_config["type"] == root_dir and db_config["name"] == name and db_config["service_id"] == service_id: + if request.form["operation"] == "new": + return redirect_flash_error(f"Config {name} already exists{f' for service {service_id}' if service_id else ''}", "configs", True) + elif db_config["method"] not in ("ui", "manual"): + return redirect_flash_error( + f"Can't edit config {name}{f' for service {service_id}' if service_id else ''} because it was not created by the UI or manually", + "configs", + True, + ) + index = i + break + # New or edit a config if request.form["operation"] in ("new", "edit"): - if not app.config["CONFIGFILES"].check_name(variables["name"]): + if not app.config["CONFIGFILES"].check_name(name): return redirect_flash_error( f"Invalid {variables['type']} name. (Can only contain numbers, letters, underscores, dots and hyphens (min 4 characters and max 64))", "configs", True, ) - if variables["type"] == "file": - variables["name"] = f"{variables['name']}.conf" + content = BeautifulSoup(variables["content"], "html.parser").get_text() - if "old_name" in variables: - variables["old_name"] = f"{variables['old_name']}.conf" + if request.form["operation"] == "new": + db_configs.append({"type": root_dir, "name": name, "service_id": service_id, "data": content, "method": "ui"}) + operation = f"Created config {name}{f' for service {service_id}' if service_id else ''}" + elif request.form["operation"] == "edit": + if index == -1: + return redirect_flash_error( + f"Can't edit config {name}{f' for service {service_id}' if service_id else ''} because it doesn't exist", "configs", True + ) - variables["content"] = BeautifulSoup(variables["content"], "html.parser").get_text() + if old_name != name: + db_configs[index]["name"] = name + elif db_configs[index]["data"] == content: + return redirect_flash_error( + f"Config {name} was not edited because no values were changed{f' for service {service_id}' if service_id else ''}", + "configs", + True, + ) - error = False - - if request.form["operation"] == "new" and variables["type"] == "folder": - operation, error = app.config["CONFIGFILES"].create_folder(variables["path"], variables["name"]) - - if request.form["operation"] == "new" and variables["type"] == "file": - operation, error = app.config["CONFIGFILES"].create_file(variables["path"], variables["name"], variables["content"]) - - if request.form["operation"] == "edit" and variables["type"] == "file": - operation, error = app.config["CONFIGFILES"].edit_file( - variables["path"], - variables["name"], - variables.get("old_name", variables["name"]), - variables["content"], - ) - - if request.form["operation"] == "edit" and variables["type"] == "folder": - operation, error = app.config["CONFIGFILES"].edit_folder( - variables["path"], - variables["name"], - variables.get("old_name", variables["name"]), - ) - - if error: - return redirect_flash_error(operation, "configs", True) + db_configs[index]["data"] = content + operation = f"Edited config {name}{f' for service {service_id}' if service_id else ''}" # Delete a config - if request.form["operation"] == "delete": - operation, error = app.config["CONFIGFILES"].delete_path(variables["path"]) + elif request.form["operation"] == "delete": + if index == -1: + return redirect_flash_error( + f"Can't delete config {name}{f' for service {service_id}' if service_id else ''} because it doesn't exist", "configs", True + ) - if error: - return redirect_flash_error(operation, "configs", True) + del db_configs[index] + operation = f"Deleted config {name}{f' for service {service_id}' if service_id else ''}" + + error = db.save_custom_configs([config for config in db_configs if config["method"] == "ui"], "ui") + if error: + app.logger.error(f"Could not save custom configs: {error}") + return redirect_flash_error("Couldn't save custom configs", "configs", True) flash(operation) - error = app.config["CONFIGFILES"].save_configs() - - if error: - return redirect_flash_error("Couldn't save custom configs to disk", "configs", True) - return redirect(url_for("loading", next=url_for("configs"))) return render_template( @@ -1155,7 +1169,7 @@ def configs(): folders=[ path_to_dict( join(sep, "etc", "bunkerweb", "configs"), - db_data=db.get_custom_configs(), + db_data=db_configs, services=app.config["CONFIG"].get_config(methods=False).get("SERVER_NAME", "").split(" "), ) ], diff --git a/src/ui/src/ConfigFiles.py b/src/ui/src/ConfigFiles.py index af5f2e778..dedb00c6b 100644 --- a/src/ui/src/ConfigFiles.py +++ b/src/ui/src/ConfigFiles.py @@ -1,84 +1,18 @@ #!/usr/bin/env python3 -from glob import glob -from os import listdir, replace, sep, walk -from os.path import basename, dirname, join +from os import sep +from os.path import join from pathlib import Path from re import compile as re_compile -from shutil import rmtree, move as shutil_move -from typing import Any, Dict, List, Tuple from utils import path_to_dict -def generate_custom_configs( - custom_configs: List[Dict[str, Any]], - *, - original_path: Path = Path(sep, "etc", "bunkerweb", "configs"), -): - original_path.mkdir(parents=True, exist_ok=True) - for custom_config in custom_configs: - tmp_path = original_path.joinpath(custom_config["type"].replace("_", "-")) - if custom_config["service_id"]: - tmp_path = tmp_path.joinpath(custom_config["service_id"]) - tmp_path = tmp_path.joinpath(f"{custom_config['name']}.conf") - tmp_path.parent.mkdir(parents=True, exist_ok=True) - tmp_path.write_bytes(custom_config["data"]) - - class ConfigFiles: - def __init__(self, logger, db): + def __init__(self): self.__name_regex = re_compile(r"^[\w.-]{4,64}$") self.__root_dirs = [child["name"] for child in path_to_dict(join(sep, "etc", "bunkerweb", "configs"))["children"]] self.__file_creation_blacklist = ["http", "stream"] - self.__logger = logger - self.__db = db - - if not Path(sep, "usr", "sbin", "nginx").is_file(): - custom_configs = self.__db.get_custom_configs() - - if custom_configs: - self.__logger.info("Refreshing custom configs ...") - # Remove old custom configs files - for file in glob(join(sep, "etc", "bunkerweb", "configs", "*", "*")): - file = Path(file) - if file.is_symlink() or file.is_file(): - file.unlink() - elif file.is_dir(): - rmtree(str(file), ignore_errors=True) - - generate_custom_configs(custom_configs) - self.__logger.info("Custom configs refreshed successfully") - - def save_configs(self, *, check_changes: bool = True) -> str: - custom_configs = [] - configs_path = join(sep, "etc", "bunkerweb", "configs") - root_dirs = listdir(configs_path) - for root, dirs, files in walk(configs_path): - if files or (dirs and basename(root) not in root_dirs): - path_exploded = root.split("/") - for file in files: - # root_dirs is index 4 on path exploded - # in case this is a service config, index 5 is the service id and index 6 is the config name - # else index 5 is the config name - service_id = path_exploded[5] if len(path_exploded) >= 6 else None - root_dir = path_exploded[4] - path_result = (service_id, root_dir, file.replace(".conf", "")) - with open(join(root, file), "r", encoding="utf-8") as f: - custom_configs.append( - { - "value": f.read(), - "exploded": path_result, - } - ) - - print("custom config", custom_configs, flush=True) - err = self.__db.save_custom_configs(custom_configs, "ui", changed=check_changes) - if err: - self.__logger.error(f"Could not save custom configs: {err}") - return "Couldn't save custom configs to database" - - return "" def check_name(self, name: str) -> bool: return self.__name_regex.match(name) is not None @@ -104,88 +38,3 @@ class ConfigFiles: return f"{join(root_path, root_dir, '/'.join(dirs.split('/')[0:-x]))} doesn't exist" return "" - - def delete_path(self, path: str) -> Tuple[str, int]: - try: - path: Path = Path(path) - if path.is_file(): - path.unlink() - elif path.is_dir(): - rmtree(path, ignore_errors=False) - else: - path = Path(f"{path}.conf") - if path.is_file(): - path.unlink() - else: - rmtree(path, ignore_errors=False) - except OSError: - return f"Could not delete {path}", 1 - - return f"{path} was successfully deleted", 0 - - def create_folder(self, path: str, name: str) -> Tuple[str, int]: - folder_path = join(path, name) if not path.endswith(name) else path - try: - Path(folder_path).mkdir(parents=True) - except OSError: - return f"Could not create {folder_path}", 1 - - return f"The folder {folder_path} was successfully created", 0 - - def create_file(self, path: str, name: str, content: str) -> Tuple[str, int]: - file_path = Path(path, name) - file_path.parent.mkdir(exist_ok=True) - file_path.write_text(content, encoding="utf-8") - return f"The file {file_path} was successfully created", 0 - - def edit_folder(self, path: str, name: str, old_name: str) -> Tuple[str, int]: - new_folder_path = join(dirname(path), name) - old_folder_path = join(dirname(path), old_name) - - if old_folder_path == new_folder_path: - return ( - f"{old_folder_path} was not renamed because the name didn't change", - 0, - ) - - try: - shutil_move(old_folder_path, new_folder_path) - except OSError: - return f"Could not move {old_folder_path}", 1 - - return ( - f"The folder {old_folder_path} was successfully renamed to {new_folder_path}", - 0, - ) - - def edit_file(self, path: str, name: str, old_name: str, content: str) -> Tuple[str, int]: - new_path = join(dirname(path), name) - old_path = join(dirname(path), old_name) - - try: - file_content = Path(old_path).read_text(encoding="utf-8") - except FileNotFoundError: - return f"Could not find {old_path}", 1 - - if old_path == new_path and file_content == content: - return ( - f"{old_path} was not edited because the content and the name didn't change", - 0, - ) - elif file_content == content: - try: - replace(path, new_path) - return f"{old_path} was successfully renamed to {new_path}", 0 - except OSError: - return f"Could not rename {old_path} into {new_path}", 1 - elif old_path == new_path: - new_path = old_path - else: - try: - Path(old_path).unlink() - except OSError: - return f"Could not remove {old_path}", 1 - - Path(new_path).write_text(content, encoding="utf-8") - - return f"The file {old_path} was successfully edited", 0