diff --git a/src/ui/main.py b/src/ui/main.py index 27e133260..4f472c8e5 100755 --- a/src/ui/main.py +++ b/src/ui/main.py @@ -8,6 +8,7 @@ from secrets import choice from string import ascii_letters, digits from sys import path as sys_path, modules as sys_modules from pathlib import Path +from typing import Union for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("api",), ("db",))]: if deps_path not in sys_path: @@ -53,7 +54,8 @@ from utils import check_settings, get_b64encoded_qr_image, path_to_dict, get_rem from Database import Database # type: ignore from logging import getLogger -# REPLACE BY REAL VALUES + +# REPLACE BY REAL VALUES AFTER PRO_VERSION = False PRO_PLUGINS_LIST = [{"name": "metrics pro", "id": "metricspro", "type": "pro"}, {"name": "prometheus", "id": "prometheus", "type": "pro"}, {"name": "emergency", "id": "emergency", "type": "pro"}] @@ -289,6 +291,41 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b app.config["RELOADING"] = False +# UTILS + + +def is_request_form(url_name: str, next: bool = False): + if not request.form: + flash("Missing form data.", "error") + return redirect(url_for(url_name)) + + +def is_request_params(params: list, url_name: str, next: bool = False): + for param in params: + if param not in request.form: + flash(f"Missing {param} parameter.", "error") + if next: + return redirect(url_for("loading", next=url_for(url_name))) + + return redirect(url_for(url_name)) + + +def redirect_flash_error(message: str, url_name: str, next: bool = False, log: Union[bool, str] = False): + + flash(message, "error") + + if log == "error": + app.logger.error(message) + + if log == "exception": + app.logger.exception(message) + + if next: + return redirect(url_for("loading", next=url_for(url_name))) + + return redirect(url_for(url_name)) + + @app.before_request def generate_nonce(): app.config["SCRIPT_NONCE"] = sha256(urandom(32)).hexdigest() @@ -403,53 +440,37 @@ def setup(): return redirect(url_for("login"), 301) if request.method == "POST": - if not request.form: - flash("Missing form data.", "error") - return redirect(url_for("setup")) + + is_request_form("setup") if not any(key in request.form for key in ("admin_username", "admin_password", "admin_password_check", "server_name", "ui_host", "ui_url")): - flash("Missing either admin_username, admin_password, admin_password_check, server_name, ui_host, ui_url or auto_lets_encrypt parameter.", "error") - return redirect(url_for("setup")) - - error = False + return redirect_flash_error("Missing either admin_username, admin_password, admin_password_check, server_name, ui_host, ui_url or auto_lets_encrypt parameter.", "setup") if len(request.form["admin_username"]) > 256: - flash("The admin username is too long. It must be less than 256 characters.", "error") - error = True + return redirect_flash_error("The admin username is too long. It must be less than 256 characters.", "setup") if request.form["admin_password"] != request.form["admin_password_check"]: - flash("The passwords do not match.", "error") - error = True + return redirect_flash_error("The passwords do not match.", "setup") if not USER_PASSWORD_RX.match(request.form["admin_password"]): - flash("The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).", "error") - error = True + return redirect_flash_error("The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).", "setup") server_names = db_config["SERVER_NAME"].split(" ") if request.form["server_name"] in server_names: - flash(f"The hostname {request.form['server_name']} is already in use.", "error") - error = True + return redirect_flash_error(f"The hostname {request.form['server_name']} is already in use.", "setup") else: for server_name in server_names: if request.form["server_name"] in db_config.get(f"{server_name}_SERVER_NAME", "").split(" "): - flash(f"The hostname {request.form['server_name']} is already in use.", "error") - error = True - break + return redirect_flash_error(f"The hostname {request.form['server_name']} is already in use.", "setup") if not REVERSE_PROXY_PATH.match(request.form["ui_host"]): - flash("The hostname is not valid.", "error") - error = True - - if error: - return redirect(url_for("setup")) + return redirect_flash_error("The hostname is not valid.", "setup") app.config["USER"] = User(request.form["admin_username"], request.form["admin_password"], method="ui") ret = db.create_ui_user(request.form["admin_username"], app.config["USER"].password_hash, method="ui") if ret: - app.logger.error(f"Couldn't create the admin user in the database: {ret}") - flash(f"Couldn't create the admin user in the database: {ret}", "error") - return redirect(url_for("setup")) + return redirect_flash_error(f"Couldn't create the admin user in the database: {ret}", "setup", False, "error") flash("The admin user was created successfully", "success") @@ -490,17 +511,13 @@ def setup(): @login_required def totp(): if request.method == "POST": - if not request.form: - flash("Missing form data.", "error") - return redirect(url_for("totp")) - if "totp_token" not in request.form: - flash("Missing token parameter.", "error") - return redirect(url_for("totp")) + is_request_form("totp") + + is_request_params(["totp_token"], "totp") if not current_user.check_otp(request.form["totp_token"]): - flash("The token is invalid.", "error") - return redirect(url_for("totp")) + return redirect_flash_error("The token is invalid.", "totp") session["totp_validated"] = True redirect(url_for("loading", next=request.form.get("next") or url_for("home"))) @@ -575,16 +592,12 @@ def home(): def account(): if request.method == "POST": # Check form data validity - if not request.form: - flash("Missing form data.", "error") - return redirect(url_for("account")) - elif "operation" not in request.form: - flash("Missing operation parameter.", "error") - return redirect(url_for("account")) + is_request_form("account") - if "curr_password" not in request.form or not current_user.check_password(request.form["curr_password"]): - flash(f"The current password is incorrect. ({request.form['operation']})", "error") - return redirect(url_for("account")) + is_request_params(["operation", "curr_password"], "account") + + if not current_user.check_password(request.form["curr_password"]): + return redirect_flash_error(f"The current password is incorrect. ({request.form['operation']})", "account") username = current_user.get_id() password = request.form["curr_password"] @@ -592,61 +605,49 @@ def account(): secret_token = current_user.secret_token if request.form["operation"] == "username": - if "admin_username" not in request.form: - flash("Missing admin_username parameter. (username)", "error") - return redirect(url_for("account")) - elif len(request.form["admin_username"]) > 256: - flash("The admin username is too long. It must be less than 256 characters. (username)", "error") - return redirect(url_for("account")) + is_request_params(["admin_username"], "account") + + if len(request.form["admin_username"]) > 256: + return redirect_flash_error("The admin username is too long. It must be less than 256 characters. (username)", "account") username = request.form["admin_username"] session.clear() logout_user() elif request.form["operation"] == "password": - if "admin_password" not in request.form: - flash("Missing admin_password parameter. (password)", "error") - return redirect(url_for("account")) - elif request.form.get("admin_password"): - if not request.form.get("admin_password_check"): - flash("Missing admin_password_check parameter. (password)", "error") - return redirect(url_for("account")) - elif request.form["admin_password"] != request.form["admin_password_check"]: - flash("The passwords does not match. (password)", "error") - return redirect(url_for("account")) - elif not USER_PASSWORD_RX.match(request.form["admin_password"]): - flash("The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)", "error") - return redirect(url_for("account")) - elif request.form.get("admin_password_check"): - flash("Missing admin_password parameter. (password)", "error") - return redirect(url_for("account")) + + is_request_params(["admin_password", "admin_password_check"], "account") + + if request.form["admin_password"] != request.form["admin_password_check"]: + return redirect_flash_error("The passwords do not match. (password)", "account") + + if not USER_PASSWORD_RX.match(request.form["admin_password"]): + return redirect_flash_error( + "The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)", "account" + ) password = request.form["admin_password"] session.clear() logout_user() elif request.form["operation"] == "totp": - if "totp_token" not in request.form: - flash("Missing totp_token parameter. (totp)", "error") - return redirect(url_for("account")) - elif not current_user.check_otp(request.form["totp_token"], secret=app.config["CURRENT_TOTP_TOKEN"]): - flash("The totp token is invalid. (totp)", "error") - return redirect(url_for("account")) + + is_request_params(["totp_token"], "account") + + if not current_user.check_otp(request.form["totp_token"], secret=app.config["CURRENT_TOTP_TOKEN"]): + return redirect_flash_error("The totp token is invalid. (totp)", "account") session["totp_validated"] = not current_user.is_two_factor_enabled is_two_factor_enabled = session["totp_validated"] secret_token = None if current_user.is_two_factor_enabled else app.config["CURRENT_TOTP_TOKEN"] app.config["CURRENT_TOTP_TOKEN"] = None else: - flash("Invalid operation parameter.", "error") - return redirect(url_for("account")) + return redirect_flash_error("Invalid operation parameter.", "account") user = User(username, password, is_two_factor_enabled=is_two_factor_enabled, secret_token=secret_token, method=current_user.method) ret = db.update_ui_user(username, user.password_hash, is_two_factor_enabled, secret_token, current_user.method if request.form["operation"] == "totp" else "ui") if ret: - app.logger.error(f"Couldn't update the admin user in the database: {ret}") - flash(f"Couldn't update the admin user in the database: {ret}", "error") - return redirect(url_for("account")) + return redirect_flash_error(f"Couldn't update the admin user in the database: {ret}", "account", False, "error") flash( f"The {request.form['operation']} has been successfully updated." if request.form["operation"] != "totp" else f"The two-factor authentication was successfully {'disabled' if current_user.is_two_factor_enabled else 'enabled'}.", @@ -677,20 +678,17 @@ def account(): def instances(): # Manage instances if request.method == "POST": + + is_request_params(["operation", "INSTANCE_ID"], "instances", True) + # Check operation - if "operation" not in request.form or request.form["operation"] not in ( + if request.form["operation"] not in ( "reload", "start", "stop", "restart", ): - flash("Missing operation parameter on /instances.", "error") - return redirect(url_for("loading", next=url_for("instances"))) - - # Check that all fields are present - if "INSTANCE_ID" not in request.form: - flash("Missing INSTANCE_ID parameter.", "error") - return redirect(url_for("loading", next=url_for("instances"))) + return redirect_flash_error("Missing operation parameter on /instances.", "instances") app.config["RELOADING"] = True app.config["LAST_RELOAD"] = time() @@ -718,13 +716,15 @@ def instances(): @login_required def services(): if request.method == "POST": + + is_request_params(["operation", "is_draft"], "services", True) + # Check operation - if "operation" not in request.form or request.form["operation"] not in ("new", "edit", "delete"): - flash("Missing operation parameter on /services.", "error") - return redirect(url_for("loading", next=url_for("services"))) - elif "is_draft" not in request.form or request.form["is_draft"] not in ("yes", "no"): - flash("Missing is_draft parameter on /services.", "error") - return redirect(url_for("loading", next=url_for("services"))) + if request.form["operation"] not in ("new", "edit", "delete"): + return redirect_flash_error("Missing operation parameter on /services.", "services") + + if request.form["is_draft"] not in ("yes", "no"): + return redirect_flash_error("Missing is_draft parameter on /services.", "services") # Check variables variables = deepcopy(request.form.to_dict()) @@ -732,8 +732,7 @@ def services(): is_draft = variables.pop("is_draft") == "yes" if "OLD_SERVER_NAME" not in request.form and request.form["operation"] == "edit": - flash("Missing OLD_SERVER_NAME parameter.", "error") - return redirect(url_for("loading", next=url_for("services"))) + return redirect_flash_error("Missing OLD_SERVER_NAME parameter.", "services", True) if "SERVER_NAME" not in variables: variables["SERVER_NAME"] = variables["OLD_SERVER_NAME"] @@ -761,22 +760,20 @@ def services(): del variables[variable] if was_draft == is_draft and request.form["operation"] == "edit" and len(variables) == 1 and "SERVER_NAME" in variables and variables["SERVER_NAME"] == request.form.get("OLD_SERVER_NAME", ""): - flash("The service was not edited because no values were changed.", "error") - return redirect(url_for("loading", next=url_for("services"))) + return redirect_flash_error("The service was not edited because no values were changed.", "services", True) + elif request.form["operation"] == "new" and not variables: - flash("The service was not created because all values had the default value.", "error") - return redirect(url_for("loading", next=url_for("services"))) + return redirect_flash_error("The service was not created because all values had the default value.", "services", True) error = app.config["CONFIG"].check_variables(variables) if error: - return redirect(url_for("loading", next=url_for("services"))) + return redirect_flash_error("The config variable checks returned error", "services", True) # Delete elif request.form["operation"] == "delete": - if "SERVER_NAME" not in request.form: - flash("Missing SERVER_NAME parameter.", "error") - return redirect(url_for("loading", next=url_for("services"))) + + is_request_params(["SERVER_NAME"], "services", True) error = app.config["CONFIG"].check_variables({"SERVER_NAME": request.form["SERVER_NAME"]}) @@ -860,13 +857,12 @@ def global_config(): del variables[variable] if not variables: - flash("The global configuration was not edited because no values were changed.") - return redirect(url_for("loading", next=url_for("global_config"))) + return redirect_flash_error("The global configuration was not edited because all values had the default value.", "global_config", True) error = app.config["CONFIG"].check_variables(variables, True) if error: - return redirect(url_for("loading", next=url_for("global_config"))) + return redirect_flash_error("The global configuration variable checks returned error", "global_config", True) # Reload instances app.config["RELOADING"] = True @@ -898,14 +894,15 @@ def configs(): if request.method == "POST": operation = "" + is_request_params(["operation"], "configs", True) + # Check operation - if "operation" not in request.form or request.form["operation"] not in ( + if request.form["operation"] not in ( "new", "edit", "delete", ): - flash("Missing operation parameter on /configs.", "error") - return redirect(url_for("loading", next=url_for("configs"))) + return redirect_flash_error("Missing operation parameter on /configs.", "configs", True) # Check variables variables = deepcopy(request.form.to_dict()) @@ -914,16 +911,15 @@ def configs(): operation = app.config["CONFIGFILES"].check_path(variables["path"]) if operation: - flash(operation, "error") - return redirect(url_for("loading", next=url_for("configs"))) + return redirect_flash_error(operation, "configs", True) if request.form["operation"] in ("new", "edit"): if not app.config["CONFIGFILES"].check_name(variables["name"]): - flash( + return redirect_flash_error( f"Invalid {variables['type']} name. (Can only contain numbers, letters, underscores, dots and hyphens (min 4 characters and max 64))", - "error", + "configs", + True, ) - return redirect(url_for("loading", next=url_for("configs"))) if variables["type"] == "file": variables["name"] = f"{variables['name']}.conf" @@ -956,8 +952,8 @@ def configs(): ) if error: - flash(operation, "error") - return redirect(url_for("loading", next=url_for("configs"))) + return redirect_flash_error(operation, "configs", True) + else: operation, error = app.config["CONFIGFILES"].delete_path(variables["path"]) @@ -968,8 +964,9 @@ def configs(): flash(operation) error = app.config["CONFIGFILES"].save_configs() + if error: - flash("Couldn't save custom configs to database", "error") + return redirect_flash_error("Couldn't save custom configs to disk", "configs", True) return redirect(url_for("loading", next=url_for("configs"))) @@ -1000,8 +997,7 @@ def plugins(): del variables["csrf_token"] if variables["type"] in ("core", "pro"): - flash(f"Can't delete {variables['type']} plugin {variables['name']}", "error") - return redirect(url_for("loading", next=url_for("plugins"))) + return redirect_flash_error(f"Can't delete {variables['type']} plugin {variables['name']}", "plugins", True) plugins = app.config["CONFIG"].get_plugins() for x, plugin in enumerate(deepcopy(plugins)): @@ -1014,8 +1010,7 @@ def plugins(): flash(f"Deleted plugin {variables['name']} successfully") else: if not tmp_ui_path.exists() or not listdir(str(tmp_ui_path)): - flash("Please upload new plugins to reload plugins", "error") - return redirect(url_for("loading", next=url_for("plugins"))) + return redirect_flash_error("Please upload new plugins to reload plugins", "plugins", True) errors = 0 files_count = 0 @@ -1334,6 +1329,9 @@ def custom_plugin(plugin: str): is_metrics_on = True break + # Check if the plugin is used + + # Here we have specific cases for some plugins # {plugin_id: [[setting_name, setting_false], ...]} specific_cases = { "limit": [["USE_LIMIT_REQ", "no"], ["USE_LIMIT_CONN", "no"]], @@ -1803,25 +1801,15 @@ def bans(): if request.method == "POST": # Check variables - if not request.form: - flash("Missing form data.", "error") - return redirect(url_for("bans")) + is_request_form("bans") - if "operation" not in request.form: - flash("Operation unknown", "error") - return redirect(url_for("bans")) - - if "data" not in request.form: - flash("No data to proceed", "error") - return redirect(url_for("bans")) + is_request_params(["operation", "data"], "bans") try: data = json_loads(request.form["data"]) assert isinstance(data, list) except BaseException: - app.logger.exception(f"Couldn't load data: {request.form['data']}") - flash("Data must be a list of dict", "error") - return redirect(url_for("bans")) + return redirect_flash_error("Data must be a list of dict", "bans", False, "exception") if request.form["operation"] == "unban": for unban in data: @@ -1872,8 +1860,7 @@ def bans(): else: flash(f"Successfully banned {ban['ip']}") else: - flash("Operation unknown", "error") - return redirect(url_for("bans")) + return redirect_flash_error("Operation unknown", "bans") return redirect(url_for("loading", next=url_for("bans"), message="Update bans"))