Add new endpoints for profile page in web UI

This commit is contained in:
Théophile Diot 2024-08-19 17:26:57 +01:00
parent 8ce44bc0fc
commit 569abbd37c
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
4 changed files with 193 additions and 155 deletions

View file

@ -355,12 +355,6 @@ def delete_plugin():
if DB.readonly:
return {"status": "ko", "message": "Database is in read-only mode"}, 403
verify_data_in_form(
data={"csrf_token": None},
err_message="Missing csrf_token parameter on /plugins/delete.",
redirect_url="plugins",
next=True,
)
verify_data_in_form(
data={"plugin_name": None},
err_message="Missing plugin name parameter on /plugins/delete.",

View file

@ -1,164 +1,23 @@
from base64 import b64encode
from json import dumps
from threading import Thread
from time import time
from flask import Blueprint, flash, redirect, render_template, request, url_for, session
from flask import Blueprint, Response, flash, redirect, render_template, request, url_for, session
from flask_login import current_user, login_required, logout_user
from builder.profile import profile_builder # type: ignore
from src.totp import totp as TOTP
from dependencies import BW_CONFIG, DATA, DB
from utils import LOGGER, USER_PASSWORD_RX, gen_password_hash
from dependencies import DB
from utils import USER_PASSWORD_RX, gen_password_hash
from pages.utils import handle_error, manage_bunkerweb, verify_data_in_form, wait_applying
from pages.utils import handle_error, verify_data_in_form
profile = Blueprint("profile", __name__)
@profile.route("/profile", methods=["GET", "POST"])
@profile.route("/profile", methods=["GET"])
@login_required
def profile_page():
totp_recovery_codes = None
if request.method == "POST":
if DB.readonly:
return handle_error("Database is in read-only mode", "profile")
verify_data_in_form(
data={"operation": ("username", "password", "totp", "activate-key")}, err_message="Invalid operation parameter.", redirect_url="profile"
)
if request.form["operation"] not in ("username", "password", "totp", "activate-key"):
return handle_error("Invalid operation parameter.", "profile")
if request.form["operation"] == "activate-key":
verify_data_in_form(data={"license": None}, err_message="Missing license for operation activate key on /account.", redirect_url="profile")
if len(request.form["license"]) == 0:
return handle_error("The license key is empty", "profile")
variables = {"PRO_LICENSE_KEY": request.form["license"]}
variables = BW_CONFIG.check_variables(variables, {"PRO_LICENSE_KEY": request.form["license"]})
if not variables:
return handle_error("The license key variable checks returned error", "profile", True)
# Force job to contact PRO API
# by setting the last check to None
metadata = DB.get_metadata()
metadata["last_pro_check"] = None
DB.set_pro_metadata(metadata)
curr_changes = DB.check_changes()
# Reload instances
def update_global_config(threaded: bool = False):
wait_applying()
if not manage_bunkerweb("global_config", variables, threaded=threaded):
message = "Checking license key to upgrade."
if threaded:
DATA["TO_FLASH"].append({"content": message, "type": "success"})
else:
flash(message)
DATA["PRO_LOADING"] = True
DATA["CONFIG_CHANGED"] = True
if any(curr_changes.values()):
DATA["RELOADING"] = True
DATA["LAST_RELOAD"] = time()
Thread(target=update_global_config, args=(True,)).start()
else:
update_global_config()
return redirect(url_for("profile.profile_page"))
verify_data_in_form(data={"curr_password": None}, err_message="Missing current password parameter on /account.", redirect_url="profile")
if not current_user.check_password(request.form["curr_password"]):
return handle_error(f"The current password is incorrect. ({request.form['operation']})", "profile")
username = current_user.get_id()
password = request.form["curr_password"]
totp_secret = current_user.totp_secret
totp_recovery_codes = current_user.list_recovery_codes
if request.form["operation"] == "username":
verify_data_in_form(data={"admin_username": None}, err_message="Missing admin username parameter on /account.", redirect_url="profile")
if len(request.form["admin_username"]) > 256:
return handle_error("The admin username is too long. It must be less than 256 characters. (username)", "profile")
username = request.form["admin_username"]
session.clear()
logout_user()
if request.form["operation"] == "password":
verify_data_in_form(
data={"admin_password": None, "admin_password_check": None},
err_message="Missing admin password or confirm password parameter on /account.",
redirect_url="profile",
)
if request.form["admin_password"] != request.form["admin_password_check"]:
return handle_error("The passwords do not match. (password)", "profile")
if not USER_PASSWORD_RX.match(request.form["admin_password"]):
return handle_error(
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)",
"profile",
)
password = request.form["admin_password"]
session.clear()
logout_user()
if request.form["operation"] == "totp":
verify_data_in_form(data={"totp_token": None}, err_message="Missing totp token parameter on /account.", redirect_url="profile")
if not TOTP.verify_totp(
request.form["totp_token"], totp_secret=session.get("tmp_totp_secret", ""), user=current_user
) and not TOTP.verify_recovery_code(request.form["totp_token"], user=current_user):
return handle_error("The totp token is invalid. (totp)", "profile")
session["totp_validated"] = not bool(current_user.totp_secret)
totp_secret = None if bool(current_user.totp_secret) else session.pop("tmp_totp_secret", "")
if totp_secret and totp_secret != current_user.totp_secret:
totp_recovery_codes = TOTP.generate_recovery_codes()
flash(
"The recovery codes have been refreshed.\nPlease save them in a safe place. They will not be displayed again."
+ "\n".join(totp_recovery_codes),
"info",
) # TODO: Remove this when we have a way to display the recovery codes
LOGGER.debug(f"totp recovery codes: {totp_recovery_codes or current_user.list_recovery_codes}")
ret = DB.update_ui_user(
username,
gen_password_hash(password),
totp_secret,
totp_recovery_codes=totp_recovery_codes or current_user.list_recovery_codes,
method=current_user.method if request.form["operation"] == "totp" else "ui",
)
if ret:
return handle_error(f"Couldn't update the admin user in the database: {ret}", "profile", False, "error")
flash(
(
f"The {request.form['operation']} has been successfully updated."
if request.form["operation"] != "totp"
else f"The two-factor authentication was successfully {'disabled' if bool(current_user.totp_secret) else 'enabled'}."
),
)
return redirect(url_for("profile.profile_page" if request.form["operation"] == "totp" else "login"))
totp_qr_image = ""
if not bool(current_user.totp_secret):
session["tmp_totp_secret"] = TOTP.generate_totp_secret()
@ -169,11 +28,188 @@ def profile_page():
{
"is_totp": bool(current_user.totp_secret),
"totp_image": totp_qr_image,
"totp_recovery_codes": totp_recovery_codes or current_user.list_recovery_codes,
"is_recovery_refreshed": bool(totp_recovery_codes),
"totp_recovery_codes": current_user.list_recovery_codes,
"is_recovery_refreshed": False,
"totp_secret": TOTP.get_totp_pretty_key(session.get("tmp_totp_secret", "")),
},
)
# TODO: Show user backup codes after TOTP refresh + add refresh feature
return render_template("profile.html", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
@profile.route("/profile/totp-refresh", methods=["POST"])
@login_required
def totp_refresh():
if DB.readonly:
return handle_error("Database is in read-only mode", "profile")
if not bool(current_user.totp_secret):
return handle_error("Two-factor authentication is not enabled.", "profile")
verify_data_in_form(data={"curr_password": None}, err_message="Missing current password parameter on /profile/totp-refresh.", redirect_url="profile")
if not current_user.check_password(request.form["curr_password"]):
return handle_error("The current password is incorrect.", "profile")
totp_recovery_codes = TOTP.generate_recovery_codes()
ret = DB.refresh_ui_user_recovery_codes(current_user.get_id(), totp_recovery_codes)
if ret:
return handle_error(f"Couldn't refresh the recovery codes in the database: {ret}", "profile")
flash("The recovery codes have been successfully refreshed. The old ones are no longer valid.")
builder = profile_builder(
current_user if current_user.is_authenticated else None,
{
"is_totp": True,
"totp_image": "",
"totp_recovery_codes": totp_recovery_codes,
"is_recovery_refreshed": True,
"totp_secret": TOTP.get_totp_pretty_key(current_user.totp_secret),
},
)
return render_template("profile.html", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
@profile.route("/profile/totp-disable", methods=["POST"])
@login_required
def totp_disable():
if DB.readonly:
return handle_error("Database is in read-only mode", "profile")
if not bool(current_user.totp_secret):
return handle_error("Two-factor authentication is not enabled.", "profile")
verify_data_in_form(data={"curr_password": None}, err_message="Missing current password parameter on /profile/totp-disable.", redirect_url="profile")
if not current_user.check_password(request.form["curr_password"]):
return handle_error("The current password is incorrect.", "profile")
verify_data_in_form(data={"totp_token": None}, err_message="Missing totp token parameter on /profile/totp-enable.", redirect_url="profile")
if not TOTP.verify_totp(request.form["totp_token"], totp_secret=session.get("tmp_totp_secret", ""), user=current_user) and not TOTP.verify_recovery_code(
request.form["totp_token"], user=current_user
):
return handle_error("The totp token is invalid.", "profile")
ret = DB.update_ui_user(current_user.get_id(), current_user.password, None, method=current_user.method)
if ret:
return handle_error(f"Couldn't disable the two-factor authentication in the database: {ret}", "profile")
session["totp_validated"] = False
flash("The two-factor authentication has been successfully disabled.")
return redirect(url_for("profile.profile_page", message="Disabling two-factor authentication."))
@profile.route("/profile/totp-enable", methods=["POST"])
@login_required
def totp_enable():
if DB.readonly:
return handle_error("Database is in read-only mode", "profile")
if bool(current_user.totp_secret):
return handle_error("Two-factor authentication is already enabled.", "profile")
verify_data_in_form(data={"curr_password": None}, err_message="Missing current password parameter on /profile/totp-enable.", redirect_url="profile")
verify_data_in_form(data={"totp_token": None}, err_message="Missing totp token parameter on /profile/totp-enable.", redirect_url="profile")
if not current_user.check_password(request.form["curr_password"]):
return handle_error("The current password is incorrect.", "profile")
if not TOTP.verify_totp(request.form["totp_token"], totp_secret=session.get("tmp_totp_secret", ""), user=current_user) and not TOTP.verify_recovery_code(
request.form["totp_token"], user=current_user
):
return handle_error("The totp token is invalid.", "profile")
totp_recovery_codes = TOTP.generate_recovery_codes()
totp_secret = session.pop("tmp_totp_secret", "")
ret = DB.update_ui_user(
current_user.get_id(),
current_user.password,
totp_secret,
totp_recovery_codes=totp_recovery_codes,
method=current_user.method,
)
if ret:
return handle_error(f"Couldn't enable the two-factor authentication in the database: {ret}", "profile")
session["totp_validated"] = True
flash("The two-factor authentication has been successfully enabled.")
builder = profile_builder(
current_user if current_user.is_authenticated else None,
{
"is_totp": True,
"totp_image": "",
"totp_recovery_codes": totp_recovery_codes,
"is_recovery_refreshed": True,
"totp_secret": TOTP.get_totp_pretty_key(totp_secret),
},
)
return render_template("profile.html", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
@profile.route("/profile/edit/<string:field>", methods=["POST"])
@login_required
def edit_profile(field: str):
if field not in ("email", "password"):
return Response(status=404)
if DB.readonly:
return handle_error("Database is in read-only mode", "profile")
verify_data_in_form(data={"curr_password": None}, err_message=f"Missing current password parameter on /profile/edit/{field}.", redirect_url="profile")
if not current_user.check_password(request.form["curr_password"]):
return handle_error("The current password is incorrect.", "profile")
user_data = {
"username": current_user.get_id(),
"password": current_user.password,
"email": current_user.email,
"totp_secret": current_user.totp_secret,
"method": current_user.method,
}
if field == "email":
verify_data_in_form(data={"email": None}, err_message="Missing email parameter on /profile/edit/email.", redirect_url="profile")
if len(request.form["email"]) > 256:
return handle_error("The email is too long. It must be less than 256 characters.", "profile")
user_data["email"] = request.form["email"]
elif field == "password":
verify_data_in_form(
data={"new_password": None, "new_password_check": None},
err_message="Missing new password or confirm password parameter on /profile/edit/password.",
redirect_url="profile",
)
verify_data_in_form(
data={"new_password_confirm": None},
err_message="Missing new password confirm parameter on /profile/edit/password.",
redirect_url="profile",
)
if request.form["new_password"] != request.form["new_password_confirm"]:
return handle_error("The passwords do not match the confirm password.", "profile")
elif not USER_PASSWORD_RX.match(request.form["new_password"]):
return handle_error(
"The new password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).",
"profile",
)
user_data["password"] = gen_password_hash(request.form["new_password"])
ret = DB.update_ui_user(**user_data)
if ret:
return handle_error(f"Couldn't update the admin user in the database: {ret}", "profile")
if field == "password":
session.clear()
logout_user()
flash(f"The {field} has been successfully updated.")
return redirect(url_for("profile.profile_page" if field == "email" else "login.login_page"))

View file

@ -226,7 +226,14 @@ class UIDatabase(Database):
return ""
def update_ui_user(
self, username: str, password: bytes, totp_secret: Optional[str], *, totp_recovery_codes: Optional[List[str]] = None, method: str = "manual"
self,
username: str,
password: bytes,
totp_secret: Optional[str],
*,
email: Optional[str] = None,
totp_recovery_codes: Optional[List[str]] = None,
method: str = "manual",
) -> str:
"""Update ui user."""
totp_changed = False
@ -240,6 +247,7 @@ class UIDatabase(Database):
totp_changed = user.totp_secret != totp_secret
user.email = email
user.password = password.decode("utf-8")
user.totp_secret = totp_secret
user.method = method