mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
refactor main.py
This commit is contained in:
parent
3b2a42f657
commit
4e2c0a1908
1 changed files with 121 additions and 134 deletions
255
src/ui/main.py
255
src/ui/main.py
|
|
@ -8,6 +8,7 @@ from secrets import choice
|
|||
from string import ascii_letters, digits
|
||||
from sys import path as sys_path, modules as sys_modules
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
|
||||
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("api",), ("db",))]:
|
||||
if deps_path not in sys_path:
|
||||
|
|
@ -53,7 +54,8 @@ from utils import check_settings, get_b64encoded_qr_image, path_to_dict, get_rem
|
|||
from Database import Database # type: ignore
|
||||
from logging import getLogger
|
||||
|
||||
# REPLACE BY REAL VALUES
|
||||
|
||||
# REPLACE BY REAL VALUES AFTER
|
||||
PRO_VERSION = False
|
||||
PRO_PLUGINS_LIST = [{"name": "metrics pro", "id": "metricspro", "type": "pro"}, {"name": "prometheus", "id": "prometheus", "type": "pro"}, {"name": "emergency", "id": "emergency", "type": "pro"}]
|
||||
|
||||
|
|
@ -289,6 +291,41 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
|
|||
app.config["RELOADING"] = False
|
||||
|
||||
|
||||
# UTILS
|
||||
|
||||
|
||||
def is_request_form(url_name: str, next: bool = False):
|
||||
if not request.form:
|
||||
flash("Missing form data.", "error")
|
||||
return redirect(url_for(url_name))
|
||||
|
||||
|
||||
def is_request_params(params: list, url_name: str, next: bool = False):
|
||||
for param in params:
|
||||
if param not in request.form:
|
||||
flash(f"Missing {param} parameter.", "error")
|
||||
if next:
|
||||
return redirect(url_for("loading", next=url_for(url_name)))
|
||||
|
||||
return redirect(url_for(url_name))
|
||||
|
||||
|
||||
def redirect_flash_error(message: str, url_name: str, next: bool = False, log: Union[bool, str] = False):
|
||||
|
||||
flash(message, "error")
|
||||
|
||||
if log == "error":
|
||||
app.logger.error(message)
|
||||
|
||||
if log == "exception":
|
||||
app.logger.exception(message)
|
||||
|
||||
if next:
|
||||
return redirect(url_for("loading", next=url_for(url_name)))
|
||||
|
||||
return redirect(url_for(url_name))
|
||||
|
||||
|
||||
@app.before_request
|
||||
def generate_nonce():
|
||||
app.config["SCRIPT_NONCE"] = sha256(urandom(32)).hexdigest()
|
||||
|
|
@ -403,53 +440,37 @@ def setup():
|
|||
return redirect(url_for("login"), 301)
|
||||
|
||||
if request.method == "POST":
|
||||
if not request.form:
|
||||
flash("Missing form data.", "error")
|
||||
return redirect(url_for("setup"))
|
||||
|
||||
is_request_form("setup")
|
||||
|
||||
if not any(key in request.form for key in ("admin_username", "admin_password", "admin_password_check", "server_name", "ui_host", "ui_url")):
|
||||
flash("Missing either admin_username, admin_password, admin_password_check, server_name, ui_host, ui_url or auto_lets_encrypt parameter.", "error")
|
||||
return redirect(url_for("setup"))
|
||||
|
||||
error = False
|
||||
return redirect_flash_error("Missing either admin_username, admin_password, admin_password_check, server_name, ui_host, ui_url or auto_lets_encrypt parameter.", "setup")
|
||||
|
||||
if len(request.form["admin_username"]) > 256:
|
||||
flash("The admin username is too long. It must be less than 256 characters.", "error")
|
||||
error = True
|
||||
return redirect_flash_error("The admin username is too long. It must be less than 256 characters.", "setup")
|
||||
|
||||
if request.form["admin_password"] != request.form["admin_password_check"]:
|
||||
flash("The passwords do not match.", "error")
|
||||
error = True
|
||||
return redirect_flash_error("The passwords do not match.", "setup")
|
||||
|
||||
if not USER_PASSWORD_RX.match(request.form["admin_password"]):
|
||||
flash("The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).", "error")
|
||||
error = True
|
||||
return redirect_flash_error("The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).", "setup")
|
||||
|
||||
server_names = db_config["SERVER_NAME"].split(" ")
|
||||
if request.form["server_name"] in server_names:
|
||||
flash(f"The hostname {request.form['server_name']} is already in use.", "error")
|
||||
error = True
|
||||
return redirect_flash_error(f"The hostname {request.form['server_name']} is already in use.", "setup")
|
||||
else:
|
||||
for server_name in server_names:
|
||||
if request.form["server_name"] in db_config.get(f"{server_name}_SERVER_NAME", "").split(" "):
|
||||
flash(f"The hostname {request.form['server_name']} is already in use.", "error")
|
||||
error = True
|
||||
break
|
||||
return redirect_flash_error(f"The hostname {request.form['server_name']} is already in use.", "setup")
|
||||
|
||||
if not REVERSE_PROXY_PATH.match(request.form["ui_host"]):
|
||||
flash("The hostname is not valid.", "error")
|
||||
error = True
|
||||
|
||||
if error:
|
||||
return redirect(url_for("setup"))
|
||||
return redirect_flash_error("The hostname is not valid.", "setup")
|
||||
|
||||
app.config["USER"] = User(request.form["admin_username"], request.form["admin_password"], method="ui")
|
||||
|
||||
ret = db.create_ui_user(request.form["admin_username"], app.config["USER"].password_hash, method="ui")
|
||||
if ret:
|
||||
app.logger.error(f"Couldn't create the admin user in the database: {ret}")
|
||||
flash(f"Couldn't create the admin user in the database: {ret}", "error")
|
||||
return redirect(url_for("setup"))
|
||||
return redirect_flash_error(f"Couldn't create the admin user in the database: {ret}", "setup", False, "error")
|
||||
|
||||
flash("The admin user was created successfully", "success")
|
||||
|
||||
|
|
@ -490,17 +511,13 @@ def setup():
|
|||
@login_required
|
||||
def totp():
|
||||
if request.method == "POST":
|
||||
if not request.form:
|
||||
flash("Missing form data.", "error")
|
||||
return redirect(url_for("totp"))
|
||||
|
||||
if "totp_token" not in request.form:
|
||||
flash("Missing token parameter.", "error")
|
||||
return redirect(url_for("totp"))
|
||||
is_request_form("totp")
|
||||
|
||||
is_request_params(["totp_token"], "totp")
|
||||
|
||||
if not current_user.check_otp(request.form["totp_token"]):
|
||||
flash("The token is invalid.", "error")
|
||||
return redirect(url_for("totp"))
|
||||
return redirect_flash_error("The token is invalid.", "totp")
|
||||
|
||||
session["totp_validated"] = True
|
||||
redirect(url_for("loading", next=request.form.get("next") or url_for("home")))
|
||||
|
|
@ -575,16 +592,12 @@ def home():
|
|||
def account():
|
||||
if request.method == "POST":
|
||||
# Check form data validity
|
||||
if not request.form:
|
||||
flash("Missing form data.", "error")
|
||||
return redirect(url_for("account"))
|
||||
elif "operation" not in request.form:
|
||||
flash("Missing operation parameter.", "error")
|
||||
return redirect(url_for("account"))
|
||||
is_request_form("account")
|
||||
|
||||
if "curr_password" not in request.form or not current_user.check_password(request.form["curr_password"]):
|
||||
flash(f"The current password is incorrect. ({request.form['operation']})", "error")
|
||||
return redirect(url_for("account"))
|
||||
is_request_params(["operation", "curr_password"], "account")
|
||||
|
||||
if not current_user.check_password(request.form["curr_password"]):
|
||||
return redirect_flash_error(f"The current password is incorrect. ({request.form['operation']})", "account")
|
||||
|
||||
username = current_user.get_id()
|
||||
password = request.form["curr_password"]
|
||||
|
|
@ -592,61 +605,49 @@ def account():
|
|||
secret_token = current_user.secret_token
|
||||
|
||||
if request.form["operation"] == "username":
|
||||
if "admin_username" not in request.form:
|
||||
flash("Missing admin_username parameter. (username)", "error")
|
||||
return redirect(url_for("account"))
|
||||
elif len(request.form["admin_username"]) > 256:
|
||||
flash("The admin username is too long. It must be less than 256 characters. (username)", "error")
|
||||
return redirect(url_for("account"))
|
||||
is_request_params(["admin_username"], "account")
|
||||
|
||||
if len(request.form["admin_username"]) > 256:
|
||||
return redirect_flash_error("The admin username is too long. It must be less than 256 characters. (username)", "account")
|
||||
|
||||
username = request.form["admin_username"]
|
||||
|
||||
session.clear()
|
||||
logout_user()
|
||||
elif request.form["operation"] == "password":
|
||||
if "admin_password" not in request.form:
|
||||
flash("Missing admin_password parameter. (password)", "error")
|
||||
return redirect(url_for("account"))
|
||||
elif request.form.get("admin_password"):
|
||||
if not request.form.get("admin_password_check"):
|
||||
flash("Missing admin_password_check parameter. (password)", "error")
|
||||
return redirect(url_for("account"))
|
||||
elif request.form["admin_password"] != request.form["admin_password_check"]:
|
||||
flash("The passwords does not match. (password)", "error")
|
||||
return redirect(url_for("account"))
|
||||
elif not USER_PASSWORD_RX.match(request.form["admin_password"]):
|
||||
flash("The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)", "error")
|
||||
return redirect(url_for("account"))
|
||||
elif request.form.get("admin_password_check"):
|
||||
flash("Missing admin_password parameter. (password)", "error")
|
||||
return redirect(url_for("account"))
|
||||
|
||||
is_request_params(["admin_password", "admin_password_check"], "account")
|
||||
|
||||
if request.form["admin_password"] != request.form["admin_password_check"]:
|
||||
return redirect_flash_error("The passwords do not match. (password)", "account")
|
||||
|
||||
if not USER_PASSWORD_RX.match(request.form["admin_password"]):
|
||||
return redirect_flash_error(
|
||||
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)", "account"
|
||||
)
|
||||
|
||||
password = request.form["admin_password"]
|
||||
|
||||
session.clear()
|
||||
logout_user()
|
||||
elif request.form["operation"] == "totp":
|
||||
if "totp_token" not in request.form:
|
||||
flash("Missing totp_token parameter. (totp)", "error")
|
||||
return redirect(url_for("account"))
|
||||
elif not current_user.check_otp(request.form["totp_token"], secret=app.config["CURRENT_TOTP_TOKEN"]):
|
||||
flash("The totp token is invalid. (totp)", "error")
|
||||
return redirect(url_for("account"))
|
||||
|
||||
is_request_params(["totp_token"], "account")
|
||||
|
||||
if not current_user.check_otp(request.form["totp_token"], secret=app.config["CURRENT_TOTP_TOKEN"]):
|
||||
return redirect_flash_error("The totp token is invalid. (totp)", "account")
|
||||
|
||||
session["totp_validated"] = not current_user.is_two_factor_enabled
|
||||
is_two_factor_enabled = session["totp_validated"]
|
||||
secret_token = None if current_user.is_two_factor_enabled else app.config["CURRENT_TOTP_TOKEN"]
|
||||
app.config["CURRENT_TOTP_TOKEN"] = None
|
||||
else:
|
||||
flash("Invalid operation parameter.", "error")
|
||||
return redirect(url_for("account"))
|
||||
return redirect_flash_error("Invalid operation parameter.", "account")
|
||||
|
||||
user = User(username, password, is_two_factor_enabled=is_two_factor_enabled, secret_token=secret_token, method=current_user.method)
|
||||
ret = db.update_ui_user(username, user.password_hash, is_two_factor_enabled, secret_token, current_user.method if request.form["operation"] == "totp" else "ui")
|
||||
if ret:
|
||||
app.logger.error(f"Couldn't update the admin user in the database: {ret}")
|
||||
flash(f"Couldn't update the admin user in the database: {ret}", "error")
|
||||
return redirect(url_for("account"))
|
||||
return redirect_flash_error(f"Couldn't update the admin user in the database: {ret}", "account", False, "error")
|
||||
|
||||
flash(
|
||||
f"The {request.form['operation']} has been successfully updated." if request.form["operation"] != "totp" else f"The two-factor authentication was successfully {'disabled' if current_user.is_two_factor_enabled else 'enabled'}.",
|
||||
|
|
@ -677,20 +678,17 @@ def account():
|
|||
def instances():
|
||||
# Manage instances
|
||||
if request.method == "POST":
|
||||
|
||||
is_request_params(["operation", "INSTANCE_ID"], "instances", True)
|
||||
|
||||
# Check operation
|
||||
if "operation" not in request.form or request.form["operation"] not in (
|
||||
if request.form["operation"] not in (
|
||||
"reload",
|
||||
"start",
|
||||
"stop",
|
||||
"restart",
|
||||
):
|
||||
flash("Missing operation parameter on /instances.", "error")
|
||||
return redirect(url_for("loading", next=url_for("instances")))
|
||||
|
||||
# Check that all fields are present
|
||||
if "INSTANCE_ID" not in request.form:
|
||||
flash("Missing INSTANCE_ID parameter.", "error")
|
||||
return redirect(url_for("loading", next=url_for("instances")))
|
||||
return redirect_flash_error("Missing operation parameter on /instances.", "instances")
|
||||
|
||||
app.config["RELOADING"] = True
|
||||
app.config["LAST_RELOAD"] = time()
|
||||
|
|
@ -718,13 +716,15 @@ def instances():
|
|||
@login_required
|
||||
def services():
|
||||
if request.method == "POST":
|
||||
|
||||
is_request_params(["operation", "is_draft"], "services", True)
|
||||
|
||||
# Check operation
|
||||
if "operation" not in request.form or request.form["operation"] not in ("new", "edit", "delete"):
|
||||
flash("Missing operation parameter on /services.", "error")
|
||||
return redirect(url_for("loading", next=url_for("services")))
|
||||
elif "is_draft" not in request.form or request.form["is_draft"] not in ("yes", "no"):
|
||||
flash("Missing is_draft parameter on /services.", "error")
|
||||
return redirect(url_for("loading", next=url_for("services")))
|
||||
if request.form["operation"] not in ("new", "edit", "delete"):
|
||||
return redirect_flash_error("Missing operation parameter on /services.", "services")
|
||||
|
||||
if request.form["is_draft"] not in ("yes", "no"):
|
||||
return redirect_flash_error("Missing is_draft parameter on /services.", "services")
|
||||
|
||||
# Check variables
|
||||
variables = deepcopy(request.form.to_dict())
|
||||
|
|
@ -732,8 +732,7 @@ def services():
|
|||
is_draft = variables.pop("is_draft") == "yes"
|
||||
|
||||
if "OLD_SERVER_NAME" not in request.form and request.form["operation"] == "edit":
|
||||
flash("Missing OLD_SERVER_NAME parameter.", "error")
|
||||
return redirect(url_for("loading", next=url_for("services")))
|
||||
return redirect_flash_error("Missing OLD_SERVER_NAME parameter.", "services", True)
|
||||
|
||||
if "SERVER_NAME" not in variables:
|
||||
variables["SERVER_NAME"] = variables["OLD_SERVER_NAME"]
|
||||
|
|
@ -761,22 +760,20 @@ def services():
|
|||
del variables[variable]
|
||||
|
||||
if was_draft == is_draft and request.form["operation"] == "edit" and len(variables) == 1 and "SERVER_NAME" in variables and variables["SERVER_NAME"] == request.form.get("OLD_SERVER_NAME", ""):
|
||||
flash("The service was not edited because no values were changed.", "error")
|
||||
return redirect(url_for("loading", next=url_for("services")))
|
||||
return redirect_flash_error("The service was not edited because no values were changed.", "services", True)
|
||||
|
||||
elif request.form["operation"] == "new" and not variables:
|
||||
flash("The service was not created because all values had the default value.", "error")
|
||||
return redirect(url_for("loading", next=url_for("services")))
|
||||
return redirect_flash_error("The service was not created because all values had the default value.", "services", True)
|
||||
|
||||
error = app.config["CONFIG"].check_variables(variables)
|
||||
|
||||
if error:
|
||||
return redirect(url_for("loading", next=url_for("services")))
|
||||
return redirect_flash_error("The config variable checks returned error", "services", True)
|
||||
|
||||
# Delete
|
||||
elif request.form["operation"] == "delete":
|
||||
if "SERVER_NAME" not in request.form:
|
||||
flash("Missing SERVER_NAME parameter.", "error")
|
||||
return redirect(url_for("loading", next=url_for("services")))
|
||||
|
||||
is_request_params(["SERVER_NAME"], "services", True)
|
||||
|
||||
error = app.config["CONFIG"].check_variables({"SERVER_NAME": request.form["SERVER_NAME"]})
|
||||
|
||||
|
|
@ -860,13 +857,12 @@ def global_config():
|
|||
del variables[variable]
|
||||
|
||||
if not variables:
|
||||
flash("The global configuration was not edited because no values were changed.")
|
||||
return redirect(url_for("loading", next=url_for("global_config")))
|
||||
return redirect_flash_error("The global configuration was not edited because all values had the default value.", "global_config", True)
|
||||
|
||||
error = app.config["CONFIG"].check_variables(variables, True)
|
||||
|
||||
if error:
|
||||
return redirect(url_for("loading", next=url_for("global_config")))
|
||||
return redirect_flash_error("The global configuration variable checks returned error", "global_config", True)
|
||||
|
||||
# Reload instances
|
||||
app.config["RELOADING"] = True
|
||||
|
|
@ -898,14 +894,15 @@ def configs():
|
|||
if request.method == "POST":
|
||||
operation = ""
|
||||
|
||||
is_request_params(["operation"], "configs", True)
|
||||
|
||||
# Check operation
|
||||
if "operation" not in request.form or request.form["operation"] not in (
|
||||
if request.form["operation"] not in (
|
||||
"new",
|
||||
"edit",
|
||||
"delete",
|
||||
):
|
||||
flash("Missing operation parameter on /configs.", "error")
|
||||
return redirect(url_for("loading", next=url_for("configs")))
|
||||
return redirect_flash_error("Missing operation parameter on /configs.", "configs", True)
|
||||
|
||||
# Check variables
|
||||
variables = deepcopy(request.form.to_dict())
|
||||
|
|
@ -914,16 +911,15 @@ def configs():
|
|||
operation = app.config["CONFIGFILES"].check_path(variables["path"])
|
||||
|
||||
if operation:
|
||||
flash(operation, "error")
|
||||
return redirect(url_for("loading", next=url_for("configs")))
|
||||
return redirect_flash_error(operation, "configs", True)
|
||||
|
||||
if request.form["operation"] in ("new", "edit"):
|
||||
if not app.config["CONFIGFILES"].check_name(variables["name"]):
|
||||
flash(
|
||||
return redirect_flash_error(
|
||||
f"Invalid {variables['type']} name. (Can only contain numbers, letters, underscores, dots and hyphens (min 4 characters and max 64))",
|
||||
"error",
|
||||
"configs",
|
||||
True,
|
||||
)
|
||||
return redirect(url_for("loading", next=url_for("configs")))
|
||||
|
||||
if variables["type"] == "file":
|
||||
variables["name"] = f"{variables['name']}.conf"
|
||||
|
|
@ -956,8 +952,8 @@ def configs():
|
|||
)
|
||||
|
||||
if error:
|
||||
flash(operation, "error")
|
||||
return redirect(url_for("loading", next=url_for("configs")))
|
||||
return redirect_flash_error(operation, "configs", True)
|
||||
|
||||
else:
|
||||
operation, error = app.config["CONFIGFILES"].delete_path(variables["path"])
|
||||
|
||||
|
|
@ -968,8 +964,9 @@ def configs():
|
|||
flash(operation)
|
||||
|
||||
error = app.config["CONFIGFILES"].save_configs()
|
||||
|
||||
if error:
|
||||
flash("Couldn't save custom configs to database", "error")
|
||||
return redirect_flash_error("Couldn't save custom configs to disk", "configs", True)
|
||||
|
||||
return redirect(url_for("loading", next=url_for("configs")))
|
||||
|
||||
|
|
@ -1000,8 +997,7 @@ def plugins():
|
|||
del variables["csrf_token"]
|
||||
|
||||
if variables["type"] in ("core", "pro"):
|
||||
flash(f"Can't delete {variables['type']} plugin {variables['name']}", "error")
|
||||
return redirect(url_for("loading", next=url_for("plugins")))
|
||||
return redirect_flash_error(f"Can't delete {variables['type']} plugin {variables['name']}", "plugins", True)
|
||||
|
||||
plugins = app.config["CONFIG"].get_plugins()
|
||||
for x, plugin in enumerate(deepcopy(plugins)):
|
||||
|
|
@ -1014,8 +1010,7 @@ def plugins():
|
|||
flash(f"Deleted plugin {variables['name']} successfully")
|
||||
else:
|
||||
if not tmp_ui_path.exists() or not listdir(str(tmp_ui_path)):
|
||||
flash("Please upload new plugins to reload plugins", "error")
|
||||
return redirect(url_for("loading", next=url_for("plugins")))
|
||||
return redirect_flash_error("Please upload new plugins to reload plugins", "plugins", True)
|
||||
|
||||
errors = 0
|
||||
files_count = 0
|
||||
|
|
@ -1334,6 +1329,9 @@ def custom_plugin(plugin: str):
|
|||
is_metrics_on = True
|
||||
break
|
||||
|
||||
# Check if the plugin is used
|
||||
|
||||
# Here we have specific cases for some plugins
|
||||
# {plugin_id: [[setting_name, setting_false], ...]}
|
||||
specific_cases = {
|
||||
"limit": [["USE_LIMIT_REQ", "no"], ["USE_LIMIT_CONN", "no"]],
|
||||
|
|
@ -1803,25 +1801,15 @@ def bans():
|
|||
|
||||
if request.method == "POST":
|
||||
# Check variables
|
||||
if not request.form:
|
||||
flash("Missing form data.", "error")
|
||||
return redirect(url_for("bans"))
|
||||
is_request_form("bans")
|
||||
|
||||
if "operation" not in request.form:
|
||||
flash("Operation unknown", "error")
|
||||
return redirect(url_for("bans"))
|
||||
|
||||
if "data" not in request.form:
|
||||
flash("No data to proceed", "error")
|
||||
return redirect(url_for("bans"))
|
||||
is_request_params(["operation", "data"], "bans")
|
||||
|
||||
try:
|
||||
data = json_loads(request.form["data"])
|
||||
assert isinstance(data, list)
|
||||
except BaseException:
|
||||
app.logger.exception(f"Couldn't load data: {request.form['data']}")
|
||||
flash("Data must be a list of dict", "error")
|
||||
return redirect(url_for("bans"))
|
||||
return redirect_flash_error("Data must be a list of dict", "bans", False, "exception")
|
||||
|
||||
if request.form["operation"] == "unban":
|
||||
for unban in data:
|
||||
|
|
@ -1872,8 +1860,7 @@ def bans():
|
|||
else:
|
||||
flash(f"Successfully banned {ban['ip']}")
|
||||
else:
|
||||
flash("Operation unknown", "error")
|
||||
return redirect(url_for("bans"))
|
||||
return redirect_flash_error("Operation unknown", "bans")
|
||||
|
||||
return redirect(url_for("loading", next=url_for("bans"), message="Update bans"))
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue