Refactor how we handle the admin user in the web UI to avoid unnecessary database calls

This commit is contained in:
Théophile Diot 2024-05-27 09:21:56 +01:00
parent 803ad5e6ed
commit 5ba88cce02
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06

View file

@ -129,14 +129,6 @@ elif INTEGRATION == "Kubernetes":
db = Database(app.logger, ui=True, log=False)
USER = "Error"
while USER == "Error":
with suppress(Exception):
USER = db.get_ui_user()
if USER:
USER = User(**USER)
USER_PASSWORD_RX = re_compile(r"^(?=.*?\p{Lowercase_Letter})(?=.*?\p{Uppercase_Letter})(?=.*?\d)(?=.*?[ !\"#$%&'()*+,./:;<=>?@[\\\]^_`{|}~-]).{8,}$")
bw_version = get_version()
@ -150,7 +142,6 @@ try:
CONFIG=Config(db),
CONFIGFILES=ConfigFiles(),
WTF_CSRF_SSL_STRICT=False,
USER=USER,
SEND_FILE_MAX_AGE_DEFAULT=86400,
SCRIPT_NONCE=sha256(urandom(32)).hexdigest(),
DB=db,
@ -413,8 +404,8 @@ def set_csp_header(response):
+ " style-src 'self' 'unsafe-inline';"
+ " img-src 'self' data: https://assets.bunkerity.com;"
+ " font-src 'self' data:;"
+ " connect-src *;"
+ " base-uri 'self';"
+ (" connect-src *;" if request.path.startswith(("/check", "/setup")) else "")
)
if app.config["DB"].readonly:
flash("Database connection is in read-only mode : no modification possible.", "error")
@ -424,10 +415,12 @@ def set_csp_header(response):
@login_manager.user_loader
def load_user(user_id):
if not app.config["USER"]:
admin_user = app.config["DB"].get_ui_user()
if not admin_user:
app.logger.warning("Couldn't get the admin user from the database.")
return None
return app.config["USER"] if user_id == app.config["USER"].get_id() else None
admin_user = User(**admin_user)
return admin_user if user_id == admin_user.get_id() else None
@app.errorhandler(CSRFError)
@ -441,7 +434,7 @@ def handle_csrf_error(_):
session.clear()
logout_user()
flash("Wrong CSRF token !", "error")
if not app.config["USER"]:
if not current_user:
return render_template("setup.html"), 403
return render_template("login.html", is_totp=current_user.is_two_factor_enabled), 403
@ -471,10 +464,6 @@ def before_request():
except BaseException:
app.config["DB"].readonly = True
db_user = app.config["DB"].get_ui_user()
if db_user:
app.config["USER"] = User(**db_user)
if current_user.is_authenticated:
passed = True
@ -495,7 +484,7 @@ def before_request():
@app.route("/", strict_slashes=False)
def index():
if app.config["USER"]:
if app.config["DB"].get_ui_user():
if current_user.is_authenticated: # type: ignore
return redirect(url_for("home"))
return redirect(url_for("login"), 301)
@ -522,6 +511,10 @@ def setup():
if db_config.get(f"{server_name}_USE_UI", "no") == "yes":
return redirect(url_for("login"), 301)
admin_user = app.config["DB"].get_ui_user()
if admin_user:
admin_user = User(**admin_user)
if request.method == "POST":
if app.config["DB"].readonly:
return redirect_flash_error("Database is in read-only mode", "setup")
@ -529,13 +522,13 @@ def setup():
is_request_form("setup")
required_keys = ["server_name", "ui_host", "ui_url"]
if not app.config["USER"]:
if not admin_user:
required_keys.extend(["admin_username", "admin_password", "admin_password_check"])
if not any(key in request.form for key in required_keys):
return redirect_flash_error(f"Missing either one of the following parameters: {', '.join(required_keys)}.", "setup")
if not app.config["USER"]:
if not admin_user:
if len(request.form["admin_username"]) > 256:
return redirect_flash_error("The admin username is too long. It must be less than 256 characters.", "setup")
@ -559,10 +552,10 @@ def setup():
if not REVERSE_PROXY_PATH.match(request.form["ui_host"]):
return redirect_flash_error("The hostname is not valid.", "setup")
if not app.config["USER"]:
app.config["USER"] = User(request.form["admin_username"], request.form["admin_password"], method="ui")
if not admin_user:
admin_user = User(request.form["admin_username"], request.form["admin_password"], method="ui")
ret = app.config["DB"].create_ui_user(request.form["admin_username"], app.config["USER"].password_hash, method="ui")
ret = app.config["DB"].create_ui_user(request.form["admin_username"], admin_user.password_hash, method="ui")
if ret:
return redirect_flash_error(f"Couldn't create the admin user in the database: {ret}", "setup", False, "error")
@ -601,7 +594,7 @@ def setup():
return render_template(
"setup.html",
ui_user=app.config["USER"],
ui_user=admin_user,
username=getenv("ADMIN_USERNAME", ""),
password=getenv("ADMIN_PASSWORD", ""),
ui_host=db_config.get("UI_HOST", getenv("UI_HOST", "")),
@ -2308,7 +2301,8 @@ def jobs_download():
@app.route("/login", methods=["GET", "POST"])
def login():
if not app.config["USER"]:
admin_user = app.config["DB"].get_ui_user()
if not admin_user:
return redirect(url_for("setup"))
elif current_user.is_authenticated: # type: ignore
return redirect(url_for("home"))
@ -2316,16 +2310,15 @@ def login():
fail = False
if request.method == "POST" and "username" in request.form and "password" in request.form:
app.logger.warning(f"Login attempt from {request.remote_addr} with username \"{request.form['username']}\"")
if not app.config["USER"]:
app.logger.error("Couldn't get user from database")
stop(1)
if app.config["USER"].get_id() == request.form["username"] and app.config["USER"].check_password(request.form["password"]):
admin_user = User(**admin_user)
if admin_user.get_id() == request.form["username"] and admin_user.check_password(request.form["password"]):
# log the user in
session["ip"] = request.remote_addr
session["user_agent"] = request.headers.get("User-Agent")
session["totp_validated"] = False
login_user(app.config["USER"], duration=timedelta(hours=1), force=True)
login_user(admin_user, duration=timedelta(hours=8), force=True)
# redirect him to the page he originally wanted or to the home page
return redirect(url_for("loading", next=request.form.get("next") or url_for("home")))