Update and fix the whole user management of the web UI

This commit is contained in:
Théophile Diot 2024-01-02 16:20:18 +00:00
parent ce8022a436
commit a8bfd03368
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06
2 changed files with 116 additions and 66 deletions

View file

@ -27,23 +27,8 @@ from copy import deepcopy
from datetime import datetime, timedelta, timezone
from dateutil.parser import parse as dateutil_parse
from docker import DockerClient
from docker.errors import (
NotFound as docker_NotFound,
APIError as docker_APIError,
DockerException,
)
from flask import (
Flask,
Response,
flash,
jsonify,
redirect,
render_template,
request,
send_file,
session,
url_for,
)
from docker.errors import NotFound as docker_NotFound, APIError as docker_APIError, DockerException
from flask import Flask, Response, flash, jsonify, redirect, render_template, request, send_file, session, url_for
from flask_login import current_user, LoginManager, login_required, login_user, logout_user
from flask_wtf.csrf import CSRFProtect, CSRFError, generate_csrf
from glob import glob
@ -71,7 +56,7 @@ from src.Instances import Instances
from src.ConfigFiles import ConfigFiles
from src.Config import Config
from src.ReverseProxied import ReverseProxied
from src.User import User
from src.User import AnonymousUser, User
from utils import check_settings, get_b64encoded_qr_image, path_to_dict
from Database import Database # type: ignore
@ -123,6 +108,7 @@ app.logger.setLevel(gunicorn_logger.level)
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "login"
login_manager.anonymous_user = AnonymousUser
PLUGIN_KEYS = ["id", "name", "description", "version", "stream", "settings"]
INTEGRATION = "Linux"
@ -181,7 +167,7 @@ if USER:
updated = True
if updated:
ret = db.update_ui_user(USER.get_id(), USER.password_hash, USER.is_two_factor_enabled, app.config["USER"].secret_token)
ret = db.update_ui_user(USER.get_id(), USER.password_hash, USER.is_two_factor_enabled, USER.secret_token)
if ret:
app.logger.error(f"Couldn't update the admin user in the database: {ret}")
stop(1)
@ -224,6 +210,7 @@ try:
LAST_RELOAD=0,
TO_FLASH=[],
DARK_MODE=False,
CURRENT_TOTP_TOKEN=None,
)
except FileNotFoundError as e:
app.logger.error(repr(e), e.filename)
@ -322,7 +309,12 @@ def set_csp_header(response):
@login_manager.user_loader
def load_user(user_id):
return app.config["USER"] if app.config["USER"] and user_id == app.config["USER"].get_id() else None
db_user = db.get_ui_user()
if not db_user:
app.logger.warning("Couldn't get the admin user from the database.")
return None
user = User(**db_user)
return user if user_id == user.get_id() else None
@app.errorhandler(CSRFError)
@ -338,14 +330,14 @@ def handle_csrf_error(_):
flash("Wrong CSRF token !", "error")
if not app.config["USER"]:
return render_template("setup.html"), 403
return render_template("login.html", is_totp=app.config["USER"].is_two_factor_enabled), 403
return render_template("login.html", is_totp=current_user.is_two_factor_enabled), 403
@app.before_request
def before_request():
if app.config["USER"] and current_user.is_authenticated:
if current_user.is_authenticated:
passed = True
if not session.get("totp_validated", False) and app.config["USER"].is_two_factor_enabled and "/totp" not in request.path and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts")):
if not session.get("totp_validated", False) and current_user.is_two_factor_enabled and "/totp" not in request.path and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts")):
return redirect(url_for("totp", next=request.form.get("next")))
elif session.get("ip") != request.remote_addr:
passed = False
@ -485,14 +477,14 @@ def totp():
flash("Missing token parameter.", "error")
return redirect(url_for("totp"))
if not app.config["USER"].check_otp(request.form["totp_token"]):
if not current_user.check_otp(request.form["totp_token"]):
flash("The token is invalid.", "error")
return redirect(url_for("totp"))
session["totp_validated"] = True
redirect(url_for("loading", next=request.form.get("next") or url_for("home")))
if app.config["USER"] and (not app.config["USER"].is_two_factor_enabled or session.get("totp_validated", False)):
if not current_user.is_two_factor_enabled or session.get("totp_validated", False):
return redirect(url_for("home"))
return render_template("totp.html", dark_mode=app.config["DARK_MODE"])
@ -569,7 +561,7 @@ def profile():
error = False
if "curr_password" in request.form:
if not app.config["USER"].check_password(request.form["curr_password"]):
if not current_user.check_password(request.form["curr_password"]):
flash("The current password is incorrect.", "error")
error = True
@ -598,12 +590,12 @@ def profile():
if error:
return redirect(url_for("profile"))
app.config["USER"] = User(
request.form.get("admin_username") or app.config["USER"].get_id(),
user = User(
request.form.get("admin_username") or current_user.get_id(),
request.form.get("admin_password") or request.form["curr_password"],
is_two_factor_enabled=app.config["USER"].is_two_factor_enabled,
secret_token=app.config["USER"].secret_token,
method=app.config["USER"].method,
is_two_factor_enabled=current_user.is_two_factor_enabled,
secret_token=current_user.secret_token,
method=current_user.method,
)
session.clear()
@ -613,57 +605,54 @@ def profile():
flash("Missing totp_token parameter.", "error")
return redirect(url_for("profile"))
if not app.config["USER"].check_password(request.form.get("totp_password", "")):
if not current_user.check_password(request.form.get("totp_password", "")):
flash("The current password is incorrect.", "error")
error = True
if not app.config["USER"].check_otp(request.form["totp_token"]):
if not current_user.check_otp(request.form["totp_token"], secret=app.config["CURRENT_TOTP_TOKEN"]):
flash("The token is invalid.", "error")
error = True
app.logger.warning(request.form["totp_password"])
if error:
return redirect(url_for("profile"))
app.logger.warning("TOTP validated")
session["totp_validated"] = not current_user.is_two_factor_enabled
session["totp_validated"] = not app.config["USER"].is_two_factor_enabled
if app.config["USER"].is_two_factor_enabled:
app.config["USER"].secret_token = None
app.config["USER"].is_two_factor_enabled = session["totp_validated"]
app.logger.warning(app.config["USER"])
user = User(
current_user.get_id(),
request.form["totp_password"],
is_two_factor_enabled=session["totp_validated"],
secret_token=None if current_user.is_two_factor_enabled else app.config["CURRENT_TOTP_TOKEN"],
method=current_user.method,
)
app.config["CURRENT_TOTP_TOKEN"] = None
else:
flash("Missing form data.", "error")
return redirect(url_for("profile"))
ret = db.update_ui_user(
app.config["USER"].get_id(),
app.config["USER"].password_hash,
app.config["USER"].is_two_factor_enabled,
app.config["USER"].secret_token if app.config["USER"].is_two_factor_enabled else None,
user.get_id(),
user.password_hash,
user.is_two_factor_enabled,
user.secret_token if user.is_two_factor_enabled else None,
)
if ret:
app.logger.error(f"Couldn't update the admin user in the database: {ret}")
flash(f"Couldn't update the admin user in the database: {ret}", "error")
return redirect(url_for("profile"))
app.logger.warning("User updated")
return redirect(url_for("profile"))
secret_token = ""
totp_qr_image = ""
if not app.config["USER"].is_two_factor_enabled:
app.config["USER"].refresh_totp()
secret_token = app.config["USER"].secret_token
totp_qr_image = get_b64encoded_qr_image(app.config["USER"].get_authentication_setup_uri())
if not current_user.is_two_factor_enabled:
current_user.refresh_totp()
secret_token = current_user.secret_token
totp_qr_image = get_b64encoded_qr_image(current_user.get_authentication_setup_uri())
app.config["CURRENT_TOTP_TOKEN"] = secret_token
return render_template("profile.html", username=app.config["USER"].get_id(), is_totp=app.config["USER"].is_two_factor_enabled, secret_token=secret_token, totp_qr_image=totp_qr_image, dark_mode=app.config["DARK_MODE"])
return render_template("profile.html", username=current_user.get_id(), is_totp=current_user.is_two_factor_enabled, secret_token=secret_token, totp_qr_image=totp_qr_image, dark_mode=app.config["DARK_MODE"])
@app.route("/instances", methods=["GET", "POST"])
@ -1662,15 +1651,26 @@ def jobs_download():
@app.route("/login", methods=["GET", "POST"])
def login():
if not app.config["USER"]:
return redirect(url_for("setup"))
elif current_user.is_authenticated: # type: ignore
return redirect(url_for("home"))
fail = False
if request.method == "POST" and "username" in request.form and "password" in request.form:
app.logger.warning(f"Login attempt from {request.remote_addr} with username \"{request.form['username']}\"")
if app.config["USER"].get_id() == request.form["username"] and app.config["USER"].check_password(request.form["password"]):
db_user = db.get_ui_user()
if not db_user:
app.logger.error("Couldn't get user from database")
stop(1)
user = User(**db_user)
if user.get_id() == request.form["username"] and user.check_password(request.form["password"]):
# log the user in
session["ip"] = request.remote_addr
session["user_agent"] = request.headers.get("User-Agent")
session["totp_validated"] = False
login_user(app.config["USER"], duration=timedelta(hours=1))
login_user(user, duration=timedelta(hours=1))
# redirect him to the page he originally wanted or to the home page
return redirect(url_for("loading", next=request.form.get("next") or url_for("home")))
@ -1678,13 +1678,8 @@ def login():
flash("Invalid username or password", "error")
fail = True
if not app.config["USER"]:
return redirect(url_for("setup"))
elif current_user.is_authenticated: # type: ignore
return redirect(url_for("home"))
kwargs = {
"is_totp": app.config["USER"].is_two_factor_enabled,
"is_totp": current_user.is_two_factor_enabled,
} | ({"error": "Invalid username or password"} if fail else {})
return render_template("login.html", **kwargs), 401 if fail else 200

View file

@ -3,7 +3,7 @@
from typing import Optional
from bcrypt import checkpw, hashpw, gensalt
from flask_login import UserMixin
from flask_login import AnonymousUserMixin, UserMixin
from pyotp import random_base32
from pyotp.totp import TOTP
@ -28,7 +28,7 @@ class User(UserMixin):
self.is_two_factor_enabled = is_two_factor_enabled
self.secret_token = secret_token
self.method = method
self.__totp = None
self.__totp = TOTP(secret_token) if secret_token else None
@property
def password_hash(self) -> bytes:
@ -66,7 +66,7 @@ class User(UserMixin):
self.secret_token = random_base32()
self.__totp = TOTP(self.secret_token)
def check_otp(self, otp: str) -> bool:
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
"""
Check if the otp is correct by comparing it to the stored secret token
@ -74,9 +74,64 @@ class User(UserMixin):
:return: The otp is being checked against the secret token. If the otp is correct,
the user is returned.
"""
if secret:
return TOTP(secret).verify(otp, valid_window=3)
if not self.__totp:
return False
return self.__totp.verify(otp, valid_window=3)
def __repr__(self):
return f"User({self.id!r}, {self.__password!r}, {self.is_two_factor_enabled!r}, {self.secret_token!r}, {self.method!r})"
class AnonymousUser(AnonymousUserMixin):
def __init__(self):
self.id = None
self.is_two_factor_enabled = False
self.secret_token = None
self.method = "manual"
@property
def password_hash(self) -> None:
"""
Get the password hash
:return: The password hash
"""
return None
def update_password(self, password: str):
"""
Set the password by hashing it
:param password: The password to be hashed
"""
self.__password = hashpw(password.encode("utf-8"), gensalt(rounds=13))
def check_password(self, password: str):
"""
Check if the password is correct by hashing it and comparing it to the stored hash
:param password: The password to be checked
:return: The password is being checked against the password hash. If the password is correct,
the user is returned.
"""
return False
def get_authentication_setup_uri(self) -> str:
return ""
def refresh_totp(self):
return
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
"""
Check if the otp is correct by comparing it to the stored secret token
:param otp: The otp to be checked
:return: The otp is being checked against the secret token. If the otp is correct,
the user is returned.
"""
if secret:
return TOTP(secret).verify(otp, valid_window=3)
return False