mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Add new endpoints for profile page in web UI
This commit is contained in:
parent
8ce44bc0fc
commit
569abbd37c
4 changed files with 193 additions and 155 deletions
|
|
@ -355,12 +355,6 @@ def delete_plugin():
|
|||
if DB.readonly:
|
||||
return {"status": "ko", "message": "Database is in read-only mode"}, 403
|
||||
|
||||
verify_data_in_form(
|
||||
data={"csrf_token": None},
|
||||
err_message="Missing csrf_token parameter on /plugins/delete.",
|
||||
redirect_url="plugins",
|
||||
next=True,
|
||||
)
|
||||
verify_data_in_form(
|
||||
data={"plugin_name": None},
|
||||
err_message="Missing plugin name parameter on /plugins/delete.",
|
||||
|
|
|
|||
|
|
@ -1,164 +1,23 @@
|
|||
from base64 import b64encode
|
||||
from json import dumps
|
||||
from threading import Thread
|
||||
from time import time
|
||||
from flask import Blueprint, flash, redirect, render_template, request, url_for, session
|
||||
from flask import Blueprint, Response, flash, redirect, render_template, request, url_for, session
|
||||
from flask_login import current_user, login_required, logout_user
|
||||
|
||||
from builder.profile import profile_builder # type: ignore
|
||||
|
||||
from src.totp import totp as TOTP
|
||||
|
||||
from dependencies import BW_CONFIG, DATA, DB
|
||||
from utils import LOGGER, USER_PASSWORD_RX, gen_password_hash
|
||||
from dependencies import DB
|
||||
from utils import USER_PASSWORD_RX, gen_password_hash
|
||||
|
||||
from pages.utils import handle_error, manage_bunkerweb, verify_data_in_form, wait_applying
|
||||
from pages.utils import handle_error, verify_data_in_form
|
||||
|
||||
profile = Blueprint("profile", __name__)
|
||||
|
||||
|
||||
@profile.route("/profile", methods=["GET", "POST"])
|
||||
@profile.route("/profile", methods=["GET"])
|
||||
@login_required
|
||||
def profile_page():
|
||||
totp_recovery_codes = None
|
||||
if request.method == "POST":
|
||||
if DB.readonly:
|
||||
return handle_error("Database is in read-only mode", "profile")
|
||||
|
||||
verify_data_in_form(
|
||||
data={"operation": ("username", "password", "totp", "activate-key")}, err_message="Invalid operation parameter.", redirect_url="profile"
|
||||
)
|
||||
|
||||
if request.form["operation"] not in ("username", "password", "totp", "activate-key"):
|
||||
return handle_error("Invalid operation parameter.", "profile")
|
||||
|
||||
if request.form["operation"] == "activate-key":
|
||||
verify_data_in_form(data={"license": None}, err_message="Missing license for operation activate key on /account.", redirect_url="profile")
|
||||
|
||||
if len(request.form["license"]) == 0:
|
||||
return handle_error("The license key is empty", "profile")
|
||||
|
||||
variables = {"PRO_LICENSE_KEY": request.form["license"]}
|
||||
|
||||
variables = BW_CONFIG.check_variables(variables, {"PRO_LICENSE_KEY": request.form["license"]})
|
||||
|
||||
if not variables:
|
||||
return handle_error("The license key variable checks returned error", "profile", True)
|
||||
|
||||
# Force job to contact PRO API
|
||||
# by setting the last check to None
|
||||
metadata = DB.get_metadata()
|
||||
metadata["last_pro_check"] = None
|
||||
DB.set_pro_metadata(metadata)
|
||||
|
||||
curr_changes = DB.check_changes()
|
||||
|
||||
# Reload instances
|
||||
def update_global_config(threaded: bool = False):
|
||||
wait_applying()
|
||||
|
||||
if not manage_bunkerweb("global_config", variables, threaded=threaded):
|
||||
message = "Checking license key to upgrade."
|
||||
if threaded:
|
||||
DATA["TO_FLASH"].append({"content": message, "type": "success"})
|
||||
else:
|
||||
flash(message)
|
||||
|
||||
DATA["PRO_LOADING"] = True
|
||||
DATA["CONFIG_CHANGED"] = True
|
||||
|
||||
if any(curr_changes.values()):
|
||||
DATA["RELOADING"] = True
|
||||
DATA["LAST_RELOAD"] = time()
|
||||
Thread(target=update_global_config, args=(True,)).start()
|
||||
else:
|
||||
update_global_config()
|
||||
|
||||
return redirect(url_for("profile.profile_page"))
|
||||
|
||||
verify_data_in_form(data={"curr_password": None}, err_message="Missing current password parameter on /account.", redirect_url="profile")
|
||||
|
||||
if not current_user.check_password(request.form["curr_password"]):
|
||||
return handle_error(f"The current password is incorrect. ({request.form['operation']})", "profile")
|
||||
|
||||
username = current_user.get_id()
|
||||
password = request.form["curr_password"]
|
||||
totp_secret = current_user.totp_secret
|
||||
totp_recovery_codes = current_user.list_recovery_codes
|
||||
|
||||
if request.form["operation"] == "username":
|
||||
verify_data_in_form(data={"admin_username": None}, err_message="Missing admin username parameter on /account.", redirect_url="profile")
|
||||
|
||||
if len(request.form["admin_username"]) > 256:
|
||||
return handle_error("The admin username is too long. It must be less than 256 characters. (username)", "profile")
|
||||
|
||||
username = request.form["admin_username"]
|
||||
|
||||
session.clear()
|
||||
logout_user()
|
||||
|
||||
if request.form["operation"] == "password":
|
||||
verify_data_in_form(
|
||||
data={"admin_password": None, "admin_password_check": None},
|
||||
err_message="Missing admin password or confirm password parameter on /account.",
|
||||
redirect_url="profile",
|
||||
)
|
||||
|
||||
if request.form["admin_password"] != request.form["admin_password_check"]:
|
||||
return handle_error("The passwords do not match. (password)", "profile")
|
||||
|
||||
if not USER_PASSWORD_RX.match(request.form["admin_password"]):
|
||||
return handle_error(
|
||||
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)",
|
||||
"profile",
|
||||
)
|
||||
|
||||
password = request.form["admin_password"]
|
||||
|
||||
session.clear()
|
||||
logout_user()
|
||||
|
||||
if request.form["operation"] == "totp":
|
||||
verify_data_in_form(data={"totp_token": None}, err_message="Missing totp token parameter on /account.", redirect_url="profile")
|
||||
|
||||
if not TOTP.verify_totp(
|
||||
request.form["totp_token"], totp_secret=session.get("tmp_totp_secret", ""), user=current_user
|
||||
) and not TOTP.verify_recovery_code(request.form["totp_token"], user=current_user):
|
||||
return handle_error("The totp token is invalid. (totp)", "profile")
|
||||
|
||||
session["totp_validated"] = not bool(current_user.totp_secret)
|
||||
totp_secret = None if bool(current_user.totp_secret) else session.pop("tmp_totp_secret", "")
|
||||
|
||||
if totp_secret and totp_secret != current_user.totp_secret:
|
||||
totp_recovery_codes = TOTP.generate_recovery_codes()
|
||||
flash(
|
||||
"The recovery codes have been refreshed.\nPlease save them in a safe place. They will not be displayed again."
|
||||
+ "\n".join(totp_recovery_codes),
|
||||
"info",
|
||||
) # TODO: Remove this when we have a way to display the recovery codes
|
||||
|
||||
LOGGER.debug(f"totp recovery codes: {totp_recovery_codes or current_user.list_recovery_codes}")
|
||||
|
||||
ret = DB.update_ui_user(
|
||||
username,
|
||||
gen_password_hash(password),
|
||||
totp_secret,
|
||||
totp_recovery_codes=totp_recovery_codes or current_user.list_recovery_codes,
|
||||
method=current_user.method if request.form["operation"] == "totp" else "ui",
|
||||
)
|
||||
if ret:
|
||||
return handle_error(f"Couldn't update the admin user in the database: {ret}", "profile", False, "error")
|
||||
|
||||
flash(
|
||||
(
|
||||
f"The {request.form['operation']} has been successfully updated."
|
||||
if request.form["operation"] != "totp"
|
||||
else f"The two-factor authentication was successfully {'disabled' if bool(current_user.totp_secret) else 'enabled'}."
|
||||
),
|
||||
)
|
||||
|
||||
return redirect(url_for("profile.profile_page" if request.form["operation"] == "totp" else "login"))
|
||||
|
||||
totp_qr_image = ""
|
||||
if not bool(current_user.totp_secret):
|
||||
session["tmp_totp_secret"] = TOTP.generate_totp_secret()
|
||||
|
|
@ -169,11 +28,188 @@ def profile_page():
|
|||
{
|
||||
"is_totp": bool(current_user.totp_secret),
|
||||
"totp_image": totp_qr_image,
|
||||
"totp_recovery_codes": totp_recovery_codes or current_user.list_recovery_codes,
|
||||
"is_recovery_refreshed": bool(totp_recovery_codes),
|
||||
"totp_recovery_codes": current_user.list_recovery_codes,
|
||||
"is_recovery_refreshed": False,
|
||||
"totp_secret": TOTP.get_totp_pretty_key(session.get("tmp_totp_secret", "")),
|
||||
},
|
||||
)
|
||||
|
||||
# TODO: Show user backup codes after TOTP refresh + add refresh feature
|
||||
return render_template("profile.html", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
|
||||
|
||||
|
||||
@profile.route("/profile/totp-refresh", methods=["POST"])
|
||||
@login_required
|
||||
def totp_refresh():
|
||||
if DB.readonly:
|
||||
return handle_error("Database is in read-only mode", "profile")
|
||||
|
||||
if not bool(current_user.totp_secret):
|
||||
return handle_error("Two-factor authentication is not enabled.", "profile")
|
||||
|
||||
verify_data_in_form(data={"curr_password": None}, err_message="Missing current password parameter on /profile/totp-refresh.", redirect_url="profile")
|
||||
|
||||
if not current_user.check_password(request.form["curr_password"]):
|
||||
return handle_error("The current password is incorrect.", "profile")
|
||||
|
||||
totp_recovery_codes = TOTP.generate_recovery_codes()
|
||||
|
||||
ret = DB.refresh_ui_user_recovery_codes(current_user.get_id(), totp_recovery_codes)
|
||||
if ret:
|
||||
return handle_error(f"Couldn't refresh the recovery codes in the database: {ret}", "profile")
|
||||
|
||||
flash("The recovery codes have been successfully refreshed. The old ones are no longer valid.")
|
||||
builder = profile_builder(
|
||||
current_user if current_user.is_authenticated else None,
|
||||
{
|
||||
"is_totp": True,
|
||||
"totp_image": "",
|
||||
"totp_recovery_codes": totp_recovery_codes,
|
||||
"is_recovery_refreshed": True,
|
||||
"totp_secret": TOTP.get_totp_pretty_key(current_user.totp_secret),
|
||||
},
|
||||
)
|
||||
return render_template("profile.html", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
|
||||
|
||||
|
||||
@profile.route("/profile/totp-disable", methods=["POST"])
|
||||
@login_required
|
||||
def totp_disable():
|
||||
if DB.readonly:
|
||||
return handle_error("Database is in read-only mode", "profile")
|
||||
|
||||
if not bool(current_user.totp_secret):
|
||||
return handle_error("Two-factor authentication is not enabled.", "profile")
|
||||
|
||||
verify_data_in_form(data={"curr_password": None}, err_message="Missing current password parameter on /profile/totp-disable.", redirect_url="profile")
|
||||
|
||||
if not current_user.check_password(request.form["curr_password"]):
|
||||
return handle_error("The current password is incorrect.", "profile")
|
||||
|
||||
verify_data_in_form(data={"totp_token": None}, err_message="Missing totp token parameter on /profile/totp-enable.", redirect_url="profile")
|
||||
|
||||
if not TOTP.verify_totp(request.form["totp_token"], totp_secret=session.get("tmp_totp_secret", ""), user=current_user) and not TOTP.verify_recovery_code(
|
||||
request.form["totp_token"], user=current_user
|
||||
):
|
||||
return handle_error("The totp token is invalid.", "profile")
|
||||
|
||||
ret = DB.update_ui_user(current_user.get_id(), current_user.password, None, method=current_user.method)
|
||||
if ret:
|
||||
return handle_error(f"Couldn't disable the two-factor authentication in the database: {ret}", "profile")
|
||||
|
||||
session["totp_validated"] = False
|
||||
|
||||
flash("The two-factor authentication has been successfully disabled.")
|
||||
return redirect(url_for("profile.profile_page", message="Disabling two-factor authentication."))
|
||||
|
||||
|
||||
@profile.route("/profile/totp-enable", methods=["POST"])
|
||||
@login_required
|
||||
def totp_enable():
|
||||
if DB.readonly:
|
||||
return handle_error("Database is in read-only mode", "profile")
|
||||
|
||||
if bool(current_user.totp_secret):
|
||||
return handle_error("Two-factor authentication is already enabled.", "profile")
|
||||
|
||||
verify_data_in_form(data={"curr_password": None}, err_message="Missing current password parameter on /profile/totp-enable.", redirect_url="profile")
|
||||
verify_data_in_form(data={"totp_token": None}, err_message="Missing totp token parameter on /profile/totp-enable.", redirect_url="profile")
|
||||
|
||||
if not current_user.check_password(request.form["curr_password"]):
|
||||
return handle_error("The current password is incorrect.", "profile")
|
||||
|
||||
if not TOTP.verify_totp(request.form["totp_token"], totp_secret=session.get("tmp_totp_secret", ""), user=current_user) and not TOTP.verify_recovery_code(
|
||||
request.form["totp_token"], user=current_user
|
||||
):
|
||||
return handle_error("The totp token is invalid.", "profile")
|
||||
|
||||
totp_recovery_codes = TOTP.generate_recovery_codes()
|
||||
totp_secret = session.pop("tmp_totp_secret", "")
|
||||
|
||||
ret = DB.update_ui_user(
|
||||
current_user.get_id(),
|
||||
current_user.password,
|
||||
totp_secret,
|
||||
totp_recovery_codes=totp_recovery_codes,
|
||||
method=current_user.method,
|
||||
)
|
||||
if ret:
|
||||
return handle_error(f"Couldn't enable the two-factor authentication in the database: {ret}", "profile")
|
||||
|
||||
session["totp_validated"] = True
|
||||
|
||||
flash("The two-factor authentication has been successfully enabled.")
|
||||
builder = profile_builder(
|
||||
current_user if current_user.is_authenticated else None,
|
||||
{
|
||||
"is_totp": True,
|
||||
"totp_image": "",
|
||||
"totp_recovery_codes": totp_recovery_codes,
|
||||
"is_recovery_refreshed": True,
|
||||
"totp_secret": TOTP.get_totp_pretty_key(totp_secret),
|
||||
},
|
||||
)
|
||||
return render_template("profile.html", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
|
||||
|
||||
|
||||
@profile.route("/profile/edit/<string:field>", methods=["POST"])
|
||||
@login_required
|
||||
def edit_profile(field: str):
|
||||
if field not in ("email", "password"):
|
||||
return Response(status=404)
|
||||
|
||||
if DB.readonly:
|
||||
return handle_error("Database is in read-only mode", "profile")
|
||||
|
||||
verify_data_in_form(data={"curr_password": None}, err_message=f"Missing current password parameter on /profile/edit/{field}.", redirect_url="profile")
|
||||
|
||||
if not current_user.check_password(request.form["curr_password"]):
|
||||
return handle_error("The current password is incorrect.", "profile")
|
||||
|
||||
user_data = {
|
||||
"username": current_user.get_id(),
|
||||
"password": current_user.password,
|
||||
"email": current_user.email,
|
||||
"totp_secret": current_user.totp_secret,
|
||||
"method": current_user.method,
|
||||
}
|
||||
|
||||
if field == "email":
|
||||
verify_data_in_form(data={"email": None}, err_message="Missing email parameter on /profile/edit/email.", redirect_url="profile")
|
||||
|
||||
if len(request.form["email"]) > 256:
|
||||
return handle_error("The email is too long. It must be less than 256 characters.", "profile")
|
||||
|
||||
user_data["email"] = request.form["email"]
|
||||
elif field == "password":
|
||||
verify_data_in_form(
|
||||
data={"new_password": None, "new_password_check": None},
|
||||
err_message="Missing new password or confirm password parameter on /profile/edit/password.",
|
||||
redirect_url="profile",
|
||||
)
|
||||
verify_data_in_form(
|
||||
data={"new_password_confirm": None},
|
||||
err_message="Missing new password confirm parameter on /profile/edit/password.",
|
||||
redirect_url="profile",
|
||||
)
|
||||
|
||||
if request.form["new_password"] != request.form["new_password_confirm"]:
|
||||
return handle_error("The passwords do not match the confirm password.", "profile")
|
||||
elif not USER_PASSWORD_RX.match(request.form["new_password"]):
|
||||
return handle_error(
|
||||
"The new password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).",
|
||||
"profile",
|
||||
)
|
||||
|
||||
user_data["password"] = gen_password_hash(request.form["new_password"])
|
||||
|
||||
ret = DB.update_ui_user(**user_data)
|
||||
if ret:
|
||||
return handle_error(f"Couldn't update the admin user in the database: {ret}", "profile")
|
||||
|
||||
if field == "password":
|
||||
session.clear()
|
||||
logout_user()
|
||||
|
||||
flash(f"The {field} has been successfully updated.")
|
||||
return redirect(url_for("profile.profile_page" if field == "email" else "login.login_page"))
|
||||
|
|
|
|||
|
|
@ -226,7 +226,14 @@ class UIDatabase(Database):
|
|||
return ""
|
||||
|
||||
def update_ui_user(
|
||||
self, username: str, password: bytes, totp_secret: Optional[str], *, totp_recovery_codes: Optional[List[str]] = None, method: str = "manual"
|
||||
self,
|
||||
username: str,
|
||||
password: bytes,
|
||||
totp_secret: Optional[str],
|
||||
*,
|
||||
email: Optional[str] = None,
|
||||
totp_recovery_codes: Optional[List[str]] = None,
|
||||
method: str = "manual",
|
||||
) -> str:
|
||||
"""Update ui user."""
|
||||
totp_changed = False
|
||||
|
|
@ -240,6 +247,7 @@ class UIDatabase(Database):
|
|||
|
||||
totp_changed = user.totp_secret != totp_secret
|
||||
|
||||
user.email = email
|
||||
user.password = password.decode("utf-8")
|
||||
user.totp_secret = totp_secret
|
||||
user.method = method
|
||||
|
|
|
|||
Loading…
Reference in a new issue