refactor main.py

This commit is contained in:
Jordan Blasenhauer 2024-02-22 15:28:10 +01:00
parent 3b2a42f657
commit 4e2c0a1908

View file

@ -8,6 +8,7 @@ from secrets import choice
from string import ascii_letters, digits
from sys import path as sys_path, modules as sys_modules
from pathlib import Path
from typing import Union
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("api",), ("db",))]:
if deps_path not in sys_path:
@ -53,7 +54,8 @@ from utils import check_settings, get_b64encoded_qr_image, path_to_dict, get_rem
from Database import Database # type: ignore
from logging import getLogger
# REPLACE BY REAL VALUES
# REPLACE BY REAL VALUES AFTER
PRO_VERSION = False
PRO_PLUGINS_LIST = [{"name": "metrics pro", "id": "metricspro", "type": "pro"}, {"name": "prometheus", "id": "prometheus", "type": "pro"}, {"name": "emergency", "id": "emergency", "type": "pro"}]
@ -289,6 +291,41 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
app.config["RELOADING"] = False
# UTILS
def is_request_form(url_name: str, next: bool = False):
if not request.form:
flash("Missing form data.", "error")
return redirect(url_for(url_name))
def is_request_params(params: list, url_name: str, next: bool = False):
for param in params:
if param not in request.form:
flash(f"Missing {param} parameter.", "error")
if next:
return redirect(url_for("loading", next=url_for(url_name)))
return redirect(url_for(url_name))
def redirect_flash_error(message: str, url_name: str, next: bool = False, log: Union[bool, str] = False):
flash(message, "error")
if log == "error":
app.logger.error(message)
if log == "exception":
app.logger.exception(message)
if next:
return redirect(url_for("loading", next=url_for(url_name)))
return redirect(url_for(url_name))
@app.before_request
def generate_nonce():
app.config["SCRIPT_NONCE"] = sha256(urandom(32)).hexdigest()
@ -403,53 +440,37 @@ def setup():
return redirect(url_for("login"), 301)
if request.method == "POST":
if not request.form:
flash("Missing form data.", "error")
return redirect(url_for("setup"))
is_request_form("setup")
if not any(key in request.form for key in ("admin_username", "admin_password", "admin_password_check", "server_name", "ui_host", "ui_url")):
flash("Missing either admin_username, admin_password, admin_password_check, server_name, ui_host, ui_url or auto_lets_encrypt parameter.", "error")
return redirect(url_for("setup"))
error = False
return redirect_flash_error("Missing either admin_username, admin_password, admin_password_check, server_name, ui_host, ui_url or auto_lets_encrypt parameter.", "setup")
if len(request.form["admin_username"]) > 256:
flash("The admin username is too long. It must be less than 256 characters.", "error")
error = True
return redirect_flash_error("The admin username is too long. It must be less than 256 characters.", "setup")
if request.form["admin_password"] != request.form["admin_password_check"]:
flash("The passwords do not match.", "error")
error = True
return redirect_flash_error("The passwords do not match.", "setup")
if not USER_PASSWORD_RX.match(request.form["admin_password"]):
flash("The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).", "error")
error = True
return redirect_flash_error("The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).", "setup")
server_names = db_config["SERVER_NAME"].split(" ")
if request.form["server_name"] in server_names:
flash(f"The hostname {request.form['server_name']} is already in use.", "error")
error = True
return redirect_flash_error(f"The hostname {request.form['server_name']} is already in use.", "setup")
else:
for server_name in server_names:
if request.form["server_name"] in db_config.get(f"{server_name}_SERVER_NAME", "").split(" "):
flash(f"The hostname {request.form['server_name']} is already in use.", "error")
error = True
break
return redirect_flash_error(f"The hostname {request.form['server_name']} is already in use.", "setup")
if not REVERSE_PROXY_PATH.match(request.form["ui_host"]):
flash("The hostname is not valid.", "error")
error = True
if error:
return redirect(url_for("setup"))
return redirect_flash_error("The hostname is not valid.", "setup")
app.config["USER"] = User(request.form["admin_username"], request.form["admin_password"], method="ui")
ret = db.create_ui_user(request.form["admin_username"], app.config["USER"].password_hash, method="ui")
if ret:
app.logger.error(f"Couldn't create the admin user in the database: {ret}")
flash(f"Couldn't create the admin user in the database: {ret}", "error")
return redirect(url_for("setup"))
return redirect_flash_error(f"Couldn't create the admin user in the database: {ret}", "setup", False, "error")
flash("The admin user was created successfully", "success")
@ -490,17 +511,13 @@ def setup():
@login_required
def totp():
if request.method == "POST":
if not request.form:
flash("Missing form data.", "error")
return redirect(url_for("totp"))
if "totp_token" not in request.form:
flash("Missing token parameter.", "error")
return redirect(url_for("totp"))
is_request_form("totp")
is_request_params(["totp_token"], "totp")
if not current_user.check_otp(request.form["totp_token"]):
flash("The token is invalid.", "error")
return redirect(url_for("totp"))
return redirect_flash_error("The token is invalid.", "totp")
session["totp_validated"] = True
redirect(url_for("loading", next=request.form.get("next") or url_for("home")))
@ -575,16 +592,12 @@ def home():
def account():
if request.method == "POST":
# Check form data validity
if not request.form:
flash("Missing form data.", "error")
return redirect(url_for("account"))
elif "operation" not in request.form:
flash("Missing operation parameter.", "error")
return redirect(url_for("account"))
is_request_form("account")
if "curr_password" not in request.form or not current_user.check_password(request.form["curr_password"]):
flash(f"The current password is incorrect. ({request.form['operation']})", "error")
return redirect(url_for("account"))
is_request_params(["operation", "curr_password"], "account")
if not current_user.check_password(request.form["curr_password"]):
return redirect_flash_error(f"The current password is incorrect. ({request.form['operation']})", "account")
username = current_user.get_id()
password = request.form["curr_password"]
@ -592,61 +605,49 @@ def account():
secret_token = current_user.secret_token
if request.form["operation"] == "username":
if "admin_username" not in request.form:
flash("Missing admin_username parameter. (username)", "error")
return redirect(url_for("account"))
elif len(request.form["admin_username"]) > 256:
flash("The admin username is too long. It must be less than 256 characters. (username)", "error")
return redirect(url_for("account"))
is_request_params(["admin_username"], "account")
if len(request.form["admin_username"]) > 256:
return redirect_flash_error("The admin username is too long. It must be less than 256 characters. (username)", "account")
username = request.form["admin_username"]
session.clear()
logout_user()
elif request.form["operation"] == "password":
if "admin_password" not in request.form:
flash("Missing admin_password parameter. (password)", "error")
return redirect(url_for("account"))
elif request.form.get("admin_password"):
if not request.form.get("admin_password_check"):
flash("Missing admin_password_check parameter. (password)", "error")
return redirect(url_for("account"))
elif request.form["admin_password"] != request.form["admin_password_check"]:
flash("The passwords does not match. (password)", "error")
return redirect(url_for("account"))
elif not USER_PASSWORD_RX.match(request.form["admin_password"]):
flash("The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)", "error")
return redirect(url_for("account"))
elif request.form.get("admin_password_check"):
flash("Missing admin_password parameter. (password)", "error")
return redirect(url_for("account"))
is_request_params(["admin_password", "admin_password_check"], "account")
if request.form["admin_password"] != request.form["admin_password_check"]:
return redirect_flash_error("The passwords do not match. (password)", "account")
if not USER_PASSWORD_RX.match(request.form["admin_password"]):
return redirect_flash_error(
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)", "account"
)
password = request.form["admin_password"]
session.clear()
logout_user()
elif request.form["operation"] == "totp":
if "totp_token" not in request.form:
flash("Missing totp_token parameter. (totp)", "error")
return redirect(url_for("account"))
elif not current_user.check_otp(request.form["totp_token"], secret=app.config["CURRENT_TOTP_TOKEN"]):
flash("The totp token is invalid. (totp)", "error")
return redirect(url_for("account"))
is_request_params(["totp_token"], "account")
if not current_user.check_otp(request.form["totp_token"], secret=app.config["CURRENT_TOTP_TOKEN"]):
return redirect_flash_error("The totp token is invalid. (totp)", "account")
session["totp_validated"] = not current_user.is_two_factor_enabled
is_two_factor_enabled = session["totp_validated"]
secret_token = None if current_user.is_two_factor_enabled else app.config["CURRENT_TOTP_TOKEN"]
app.config["CURRENT_TOTP_TOKEN"] = None
else:
flash("Invalid operation parameter.", "error")
return redirect(url_for("account"))
return redirect_flash_error("Invalid operation parameter.", "account")
user = User(username, password, is_two_factor_enabled=is_two_factor_enabled, secret_token=secret_token, method=current_user.method)
ret = db.update_ui_user(username, user.password_hash, is_two_factor_enabled, secret_token, current_user.method if request.form["operation"] == "totp" else "ui")
if ret:
app.logger.error(f"Couldn't update the admin user in the database: {ret}")
flash(f"Couldn't update the admin user in the database: {ret}", "error")
return redirect(url_for("account"))
return redirect_flash_error(f"Couldn't update the admin user in the database: {ret}", "account", False, "error")
flash(
f"The {request.form['operation']} has been successfully updated." if request.form["operation"] != "totp" else f"The two-factor authentication was successfully {'disabled' if current_user.is_two_factor_enabled else 'enabled'}.",
@ -677,20 +678,17 @@ def account():
def instances():
# Manage instances
if request.method == "POST":
is_request_params(["operation", "INSTANCE_ID"], "instances", True)
# Check operation
if "operation" not in request.form or request.form["operation"] not in (
if request.form["operation"] not in (
"reload",
"start",
"stop",
"restart",
):
flash("Missing operation parameter on /instances.", "error")
return redirect(url_for("loading", next=url_for("instances")))
# Check that all fields are present
if "INSTANCE_ID" not in request.form:
flash("Missing INSTANCE_ID parameter.", "error")
return redirect(url_for("loading", next=url_for("instances")))
return redirect_flash_error("Missing operation parameter on /instances.", "instances")
app.config["RELOADING"] = True
app.config["LAST_RELOAD"] = time()
@ -718,13 +716,15 @@ def instances():
@login_required
def services():
if request.method == "POST":
is_request_params(["operation", "is_draft"], "services", True)
# Check operation
if "operation" not in request.form or request.form["operation"] not in ("new", "edit", "delete"):
flash("Missing operation parameter on /services.", "error")
return redirect(url_for("loading", next=url_for("services")))
elif "is_draft" not in request.form or request.form["is_draft"] not in ("yes", "no"):
flash("Missing is_draft parameter on /services.", "error")
return redirect(url_for("loading", next=url_for("services")))
if request.form["operation"] not in ("new", "edit", "delete"):
return redirect_flash_error("Missing operation parameter on /services.", "services")
if request.form["is_draft"] not in ("yes", "no"):
return redirect_flash_error("Missing is_draft parameter on /services.", "services")
# Check variables
variables = deepcopy(request.form.to_dict())
@ -732,8 +732,7 @@ def services():
is_draft = variables.pop("is_draft") == "yes"
if "OLD_SERVER_NAME" not in request.form and request.form["operation"] == "edit":
flash("Missing OLD_SERVER_NAME parameter.", "error")
return redirect(url_for("loading", next=url_for("services")))
return redirect_flash_error("Missing OLD_SERVER_NAME parameter.", "services", True)
if "SERVER_NAME" not in variables:
variables["SERVER_NAME"] = variables["OLD_SERVER_NAME"]
@ -761,22 +760,20 @@ def services():
del variables[variable]
if was_draft == is_draft and request.form["operation"] == "edit" and len(variables) == 1 and "SERVER_NAME" in variables and variables["SERVER_NAME"] == request.form.get("OLD_SERVER_NAME", ""):
flash("The service was not edited because no values were changed.", "error")
return redirect(url_for("loading", next=url_for("services")))
return redirect_flash_error("The service was not edited because no values were changed.", "services", True)
elif request.form["operation"] == "new" and not variables:
flash("The service was not created because all values had the default value.", "error")
return redirect(url_for("loading", next=url_for("services")))
return redirect_flash_error("The service was not created because all values had the default value.", "services", True)
error = app.config["CONFIG"].check_variables(variables)
if error:
return redirect(url_for("loading", next=url_for("services")))
return redirect_flash_error("The config variable checks returned error", "services", True)
# Delete
elif request.form["operation"] == "delete":
if "SERVER_NAME" not in request.form:
flash("Missing SERVER_NAME parameter.", "error")
return redirect(url_for("loading", next=url_for("services")))
is_request_params(["SERVER_NAME"], "services", True)
error = app.config["CONFIG"].check_variables({"SERVER_NAME": request.form["SERVER_NAME"]})
@ -860,13 +857,12 @@ def global_config():
del variables[variable]
if not variables:
flash("The global configuration was not edited because no values were changed.")
return redirect(url_for("loading", next=url_for("global_config")))
return redirect_flash_error("The global configuration was not edited because all values had the default value.", "global_config", True)
error = app.config["CONFIG"].check_variables(variables, True)
if error:
return redirect(url_for("loading", next=url_for("global_config")))
return redirect_flash_error("The global configuration variable checks returned error", "global_config", True)
# Reload instances
app.config["RELOADING"] = True
@ -898,14 +894,15 @@ def configs():
if request.method == "POST":
operation = ""
is_request_params(["operation"], "configs", True)
# Check operation
if "operation" not in request.form or request.form["operation"] not in (
if request.form["operation"] not in (
"new",
"edit",
"delete",
):
flash("Missing operation parameter on /configs.", "error")
return redirect(url_for("loading", next=url_for("configs")))
return redirect_flash_error("Missing operation parameter on /configs.", "configs", True)
# Check variables
variables = deepcopy(request.form.to_dict())
@ -914,16 +911,15 @@ def configs():
operation = app.config["CONFIGFILES"].check_path(variables["path"])
if operation:
flash(operation, "error")
return redirect(url_for("loading", next=url_for("configs")))
return redirect_flash_error(operation, "configs", True)
if request.form["operation"] in ("new", "edit"):
if not app.config["CONFIGFILES"].check_name(variables["name"]):
flash(
return redirect_flash_error(
f"Invalid {variables['type']} name. (Can only contain numbers, letters, underscores, dots and hyphens (min 4 characters and max 64))",
"error",
"configs",
True,
)
return redirect(url_for("loading", next=url_for("configs")))
if variables["type"] == "file":
variables["name"] = f"{variables['name']}.conf"
@ -956,8 +952,8 @@ def configs():
)
if error:
flash(operation, "error")
return redirect(url_for("loading", next=url_for("configs")))
return redirect_flash_error(operation, "configs", True)
else:
operation, error = app.config["CONFIGFILES"].delete_path(variables["path"])
@ -968,8 +964,9 @@ def configs():
flash(operation)
error = app.config["CONFIGFILES"].save_configs()
if error:
flash("Couldn't save custom configs to database", "error")
return redirect_flash_error("Couldn't save custom configs to disk", "configs", True)
return redirect(url_for("loading", next=url_for("configs")))
@ -1000,8 +997,7 @@ def plugins():
del variables["csrf_token"]
if variables["type"] in ("core", "pro"):
flash(f"Can't delete {variables['type']} plugin {variables['name']}", "error")
return redirect(url_for("loading", next=url_for("plugins")))
return redirect_flash_error(f"Can't delete {variables['type']} plugin {variables['name']}", "plugins", True)
plugins = app.config["CONFIG"].get_plugins()
for x, plugin in enumerate(deepcopy(plugins)):
@ -1014,8 +1010,7 @@ def plugins():
flash(f"Deleted plugin {variables['name']} successfully")
else:
if not tmp_ui_path.exists() or not listdir(str(tmp_ui_path)):
flash("Please upload new plugins to reload plugins", "error")
return redirect(url_for("loading", next=url_for("plugins")))
return redirect_flash_error("Please upload new plugins to reload plugins", "plugins", True)
errors = 0
files_count = 0
@ -1334,6 +1329,9 @@ def custom_plugin(plugin: str):
is_metrics_on = True
break
# Check if the plugin is used
# Here we have specific cases for some plugins
# {plugin_id: [[setting_name, setting_false], ...]}
specific_cases = {
"limit": [["USE_LIMIT_REQ", "no"], ["USE_LIMIT_CONN", "no"]],
@ -1803,25 +1801,15 @@ def bans():
if request.method == "POST":
# Check variables
if not request.form:
flash("Missing form data.", "error")
return redirect(url_for("bans"))
is_request_form("bans")
if "operation" not in request.form:
flash("Operation unknown", "error")
return redirect(url_for("bans"))
if "data" not in request.form:
flash("No data to proceed", "error")
return redirect(url_for("bans"))
is_request_params(["operation", "data"], "bans")
try:
data = json_loads(request.form["data"])
assert isinstance(data, list)
except BaseException:
app.logger.exception(f"Couldn't load data: {request.form['data']}")
flash("Data must be a list of dict", "error")
return redirect(url_for("bans"))
return redirect_flash_error("Data must be a list of dict", "bans", False, "exception")
if request.form["operation"] == "unban":
for unban in data:
@ -1872,8 +1860,7 @@ def bans():
else:
flash(f"Successfully banned {ban['ip']}")
else:
flash("Operation unknown", "error")
return redirect(url_for("bans"))
return redirect_flash_error("Operation unknown", "bans")
return redirect(url_for("loading", next=url_for("bans"), message="Update bans"))