From a8bfd03368213194fd61d36db61aeba7a5c12403 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophile=20Diot?= Date: Tue, 2 Jan 2024 16:20:18 +0000 Subject: [PATCH] Update and fix the whole user management of the web UI --- src/ui/main.py | 121 ++++++++++++++++++++++----------------------- src/ui/src/User.py | 61 +++++++++++++++++++++-- 2 files changed, 116 insertions(+), 66 deletions(-) diff --git a/src/ui/main.py b/src/ui/main.py index cdce3d0d5..500874099 100755 --- a/src/ui/main.py +++ b/src/ui/main.py @@ -27,23 +27,8 @@ from copy import deepcopy from datetime import datetime, timedelta, timezone from dateutil.parser import parse as dateutil_parse from docker import DockerClient -from docker.errors import ( - NotFound as docker_NotFound, - APIError as docker_APIError, - DockerException, -) -from flask import ( - Flask, - Response, - flash, - jsonify, - redirect, - render_template, - request, - send_file, - session, - url_for, -) +from docker.errors import NotFound as docker_NotFound, APIError as docker_APIError, DockerException +from flask import Flask, Response, flash, jsonify, redirect, render_template, request, send_file, session, url_for from flask_login import current_user, LoginManager, login_required, login_user, logout_user from flask_wtf.csrf import CSRFProtect, CSRFError, generate_csrf from glob import glob @@ -71,7 +56,7 @@ from src.Instances import Instances from src.ConfigFiles import ConfigFiles from src.Config import Config from src.ReverseProxied import ReverseProxied -from src.User import User +from src.User import AnonymousUser, User from utils import check_settings, get_b64encoded_qr_image, path_to_dict from Database import Database # type: ignore @@ -123,6 +108,7 @@ app.logger.setLevel(gunicorn_logger.level) login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = "login" +login_manager.anonymous_user = AnonymousUser PLUGIN_KEYS = ["id", "name", "description", "version", "stream", "settings"] INTEGRATION = "Linux" @@ -181,7 +167,7 @@ if USER: updated = True if updated: - ret = db.update_ui_user(USER.get_id(), USER.password_hash, USER.is_two_factor_enabled, app.config["USER"].secret_token) + ret = db.update_ui_user(USER.get_id(), USER.password_hash, USER.is_two_factor_enabled, USER.secret_token) if ret: app.logger.error(f"Couldn't update the admin user in the database: {ret}") stop(1) @@ -224,6 +210,7 @@ try: LAST_RELOAD=0, TO_FLASH=[], DARK_MODE=False, + CURRENT_TOTP_TOKEN=None, ) except FileNotFoundError as e: app.logger.error(repr(e), e.filename) @@ -322,7 +309,12 @@ def set_csp_header(response): @login_manager.user_loader def load_user(user_id): - return app.config["USER"] if app.config["USER"] and user_id == app.config["USER"].get_id() else None + db_user = db.get_ui_user() + if not db_user: + app.logger.warning("Couldn't get the admin user from the database.") + return None + user = User(**db_user) + return user if user_id == user.get_id() else None @app.errorhandler(CSRFError) @@ -338,14 +330,14 @@ def handle_csrf_error(_): flash("Wrong CSRF token !", "error") if not app.config["USER"]: return render_template("setup.html"), 403 - return render_template("login.html", is_totp=app.config["USER"].is_two_factor_enabled), 403 + return render_template("login.html", is_totp=current_user.is_two_factor_enabled), 403 @app.before_request def before_request(): - if app.config["USER"] and current_user.is_authenticated: + if current_user.is_authenticated: passed = True - if not session.get("totp_validated", False) and app.config["USER"].is_two_factor_enabled and "/totp" not in request.path and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts")): + if not session.get("totp_validated", False) and current_user.is_two_factor_enabled and "/totp" not in request.path and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts")): return redirect(url_for("totp", next=request.form.get("next"))) elif session.get("ip") != request.remote_addr: passed = False @@ -485,14 +477,14 @@ def totp(): flash("Missing token parameter.", "error") return redirect(url_for("totp")) - if not app.config["USER"].check_otp(request.form["totp_token"]): + if not current_user.check_otp(request.form["totp_token"]): flash("The token is invalid.", "error") return redirect(url_for("totp")) session["totp_validated"] = True redirect(url_for("loading", next=request.form.get("next") or url_for("home"))) - if app.config["USER"] and (not app.config["USER"].is_two_factor_enabled or session.get("totp_validated", False)): + if not current_user.is_two_factor_enabled or session.get("totp_validated", False): return redirect(url_for("home")) return render_template("totp.html", dark_mode=app.config["DARK_MODE"]) @@ -569,7 +561,7 @@ def profile(): error = False if "curr_password" in request.form: - if not app.config["USER"].check_password(request.form["curr_password"]): + if not current_user.check_password(request.form["curr_password"]): flash("The current password is incorrect.", "error") error = True @@ -598,12 +590,12 @@ def profile(): if error: return redirect(url_for("profile")) - app.config["USER"] = User( - request.form.get("admin_username") or app.config["USER"].get_id(), + user = User( + request.form.get("admin_username") or current_user.get_id(), request.form.get("admin_password") or request.form["curr_password"], - is_two_factor_enabled=app.config["USER"].is_two_factor_enabled, - secret_token=app.config["USER"].secret_token, - method=app.config["USER"].method, + is_two_factor_enabled=current_user.is_two_factor_enabled, + secret_token=current_user.secret_token, + method=current_user.method, ) session.clear() @@ -613,57 +605,54 @@ def profile(): flash("Missing totp_token parameter.", "error") return redirect(url_for("profile")) - if not app.config["USER"].check_password(request.form.get("totp_password", "")): + if not current_user.check_password(request.form.get("totp_password", "")): flash("The current password is incorrect.", "error") error = True - if not app.config["USER"].check_otp(request.form["totp_token"]): + if not current_user.check_otp(request.form["totp_token"], secret=app.config["CURRENT_TOTP_TOKEN"]): flash("The token is invalid.", "error") error = True - app.logger.warning(request.form["totp_password"]) - if error: return redirect(url_for("profile")) - app.logger.warning("TOTP validated") + session["totp_validated"] = not current_user.is_two_factor_enabled - session["totp_validated"] = not app.config["USER"].is_two_factor_enabled - - if app.config["USER"].is_two_factor_enabled: - app.config["USER"].secret_token = None - - app.config["USER"].is_two_factor_enabled = session["totp_validated"] - - app.logger.warning(app.config["USER"]) + user = User( + current_user.get_id(), + request.form["totp_password"], + is_two_factor_enabled=session["totp_validated"], + secret_token=None if current_user.is_two_factor_enabled else app.config["CURRENT_TOTP_TOKEN"], + method=current_user.method, + ) + app.config["CURRENT_TOTP_TOKEN"] = None else: flash("Missing form data.", "error") return redirect(url_for("profile")) ret = db.update_ui_user( - app.config["USER"].get_id(), - app.config["USER"].password_hash, - app.config["USER"].is_two_factor_enabled, - app.config["USER"].secret_token if app.config["USER"].is_two_factor_enabled else None, + user.get_id(), + user.password_hash, + user.is_two_factor_enabled, + user.secret_token if user.is_two_factor_enabled else None, ) if ret: app.logger.error(f"Couldn't update the admin user in the database: {ret}") flash(f"Couldn't update the admin user in the database: {ret}", "error") return redirect(url_for("profile")) - app.logger.warning("User updated") - return redirect(url_for("profile")) secret_token = "" totp_qr_image = "" - if not app.config["USER"].is_two_factor_enabled: - app.config["USER"].refresh_totp() - secret_token = app.config["USER"].secret_token - totp_qr_image = get_b64encoded_qr_image(app.config["USER"].get_authentication_setup_uri()) + if not current_user.is_two_factor_enabled: + current_user.refresh_totp() + secret_token = current_user.secret_token + totp_qr_image = get_b64encoded_qr_image(current_user.get_authentication_setup_uri()) + app.config["CURRENT_TOTP_TOKEN"] = secret_token - return render_template("profile.html", username=app.config["USER"].get_id(), is_totp=app.config["USER"].is_two_factor_enabled, secret_token=secret_token, totp_qr_image=totp_qr_image, dark_mode=app.config["DARK_MODE"]) + return render_template("profile.html", username=current_user.get_id(), is_totp=current_user.is_two_factor_enabled, secret_token=secret_token, totp_qr_image=totp_qr_image, dark_mode=app.config["DARK_MODE"]) @app.route("/instances", methods=["GET", "POST"]) @@ -1662,15 +1651,26 @@ def jobs_download(): @app.route("/login", methods=["GET", "POST"]) def login(): + if not app.config["USER"]: + return redirect(url_for("setup")) + elif current_user.is_authenticated: # type: ignore + return redirect(url_for("home")) + fail = False if request.method == "POST" and "username" in request.form and "password" in request.form: app.logger.warning(f"Login attempt from {request.remote_addr} with username \"{request.form['username']}\"") - if app.config["USER"].get_id() == request.form["username"] and app.config["USER"].check_password(request.form["password"]): + db_user = db.get_ui_user() + if not db_user: + app.logger.error("Couldn't get user from database") + stop(1) + user = User(**db_user) + + if user.get_id() == request.form["username"] and user.check_password(request.form["password"]): # log the user in session["ip"] = request.remote_addr session["user_agent"] = request.headers.get("User-Agent") session["totp_validated"] = False - login_user(app.config["USER"], duration=timedelta(hours=1)) + login_user(user, duration=timedelta(hours=1)) # redirect him to the page he originally wanted or to the home page return redirect(url_for("loading", next=request.form.get("next") or url_for("home"))) @@ -1678,13 +1678,8 @@ def login(): flash("Invalid username or password", "error") fail = True - if not app.config["USER"]: - return redirect(url_for("setup")) - elif current_user.is_authenticated: # type: ignore - return redirect(url_for("home")) - kwargs = { - "is_totp": app.config["USER"].is_two_factor_enabled, + "is_totp": current_user.is_two_factor_enabled, } | ({"error": "Invalid username or password"} if fail else {}) return render_template("login.html", **kwargs), 401 if fail else 200 diff --git a/src/ui/src/User.py b/src/ui/src/User.py index 6b94673b1..237c60c1f 100644 --- a/src/ui/src/User.py +++ b/src/ui/src/User.py @@ -3,7 +3,7 @@ from typing import Optional from bcrypt import checkpw, hashpw, gensalt -from flask_login import UserMixin +from flask_login import AnonymousUserMixin, UserMixin from pyotp import random_base32 from pyotp.totp import TOTP @@ -28,7 +28,7 @@ class User(UserMixin): self.is_two_factor_enabled = is_two_factor_enabled self.secret_token = secret_token self.method = method - self.__totp = None + self.__totp = TOTP(secret_token) if secret_token else None @property def password_hash(self) -> bytes: @@ -66,7 +66,7 @@ class User(UserMixin): self.secret_token = random_base32() self.__totp = TOTP(self.secret_token) - def check_otp(self, otp: str) -> bool: + def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool: """ Check if the otp is correct by comparing it to the stored secret token @@ -74,9 +74,64 @@ class User(UserMixin): :return: The otp is being checked against the secret token. If the otp is correct, the user is returned. """ + if secret: + return TOTP(secret).verify(otp, valid_window=3) if not self.__totp: return False return self.__totp.verify(otp, valid_window=3) def __repr__(self): return f"User({self.id!r}, {self.__password!r}, {self.is_two_factor_enabled!r}, {self.secret_token!r}, {self.method!r})" + + +class AnonymousUser(AnonymousUserMixin): + def __init__(self): + self.id = None + self.is_two_factor_enabled = False + self.secret_token = None + self.method = "manual" + + @property + def password_hash(self) -> None: + """ + Get the password hash + + :return: The password hash + """ + return None + + def update_password(self, password: str): + """ + Set the password by hashing it + + :param password: The password to be hashed + """ + self.__password = hashpw(password.encode("utf-8"), gensalt(rounds=13)) + + def check_password(self, password: str): + """ + Check if the password is correct by hashing it and comparing it to the stored hash + + :param password: The password to be checked + :return: The password is being checked against the password hash. If the password is correct, + the user is returned. + """ + return False + + def get_authentication_setup_uri(self) -> str: + return "" + + def refresh_totp(self): + return + + def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool: + """ + Check if the otp is correct by comparing it to the stored secret token + + :param otp: The otp to be checked + :return: The otp is being checked against the secret token. If the otp is correct, + the user is returned. + """ + if secret: + return TOTP(secret).verify(otp, valid_window=3) + return False