mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Update and fix the whole user management of the web UI
This commit is contained in:
parent
ce8022a436
commit
a8bfd03368
2 changed files with 116 additions and 66 deletions
121
src/ui/main.py
121
src/ui/main.py
|
|
@ -27,23 +27,8 @@ from copy import deepcopy
|
|||
from datetime import datetime, timedelta, timezone
|
||||
from dateutil.parser import parse as dateutil_parse
|
||||
from docker import DockerClient
|
||||
from docker.errors import (
|
||||
NotFound as docker_NotFound,
|
||||
APIError as docker_APIError,
|
||||
DockerException,
|
||||
)
|
||||
from flask import (
|
||||
Flask,
|
||||
Response,
|
||||
flash,
|
||||
jsonify,
|
||||
redirect,
|
||||
render_template,
|
||||
request,
|
||||
send_file,
|
||||
session,
|
||||
url_for,
|
||||
)
|
||||
from docker.errors import NotFound as docker_NotFound, APIError as docker_APIError, DockerException
|
||||
from flask import Flask, Response, flash, jsonify, redirect, render_template, request, send_file, session, url_for
|
||||
from flask_login import current_user, LoginManager, login_required, login_user, logout_user
|
||||
from flask_wtf.csrf import CSRFProtect, CSRFError, generate_csrf
|
||||
from glob import glob
|
||||
|
|
@ -71,7 +56,7 @@ from src.Instances import Instances
|
|||
from src.ConfigFiles import ConfigFiles
|
||||
from src.Config import Config
|
||||
from src.ReverseProxied import ReverseProxied
|
||||
from src.User import User
|
||||
from src.User import AnonymousUser, User
|
||||
|
||||
from utils import check_settings, get_b64encoded_qr_image, path_to_dict
|
||||
from Database import Database # type: ignore
|
||||
|
|
@ -123,6 +108,7 @@ app.logger.setLevel(gunicorn_logger.level)
|
|||
login_manager = LoginManager()
|
||||
login_manager.init_app(app)
|
||||
login_manager.login_view = "login"
|
||||
login_manager.anonymous_user = AnonymousUser
|
||||
PLUGIN_KEYS = ["id", "name", "description", "version", "stream", "settings"]
|
||||
|
||||
INTEGRATION = "Linux"
|
||||
|
|
@ -181,7 +167,7 @@ if USER:
|
|||
updated = True
|
||||
|
||||
if updated:
|
||||
ret = db.update_ui_user(USER.get_id(), USER.password_hash, USER.is_two_factor_enabled, app.config["USER"].secret_token)
|
||||
ret = db.update_ui_user(USER.get_id(), USER.password_hash, USER.is_two_factor_enabled, USER.secret_token)
|
||||
if ret:
|
||||
app.logger.error(f"Couldn't update the admin user in the database: {ret}")
|
||||
stop(1)
|
||||
|
|
@ -224,6 +210,7 @@ try:
|
|||
LAST_RELOAD=0,
|
||||
TO_FLASH=[],
|
||||
DARK_MODE=False,
|
||||
CURRENT_TOTP_TOKEN=None,
|
||||
)
|
||||
except FileNotFoundError as e:
|
||||
app.logger.error(repr(e), e.filename)
|
||||
|
|
@ -322,7 +309,12 @@ def set_csp_header(response):
|
|||
|
||||
@login_manager.user_loader
|
||||
def load_user(user_id):
|
||||
return app.config["USER"] if app.config["USER"] and user_id == app.config["USER"].get_id() else None
|
||||
db_user = db.get_ui_user()
|
||||
if not db_user:
|
||||
app.logger.warning("Couldn't get the admin user from the database.")
|
||||
return None
|
||||
user = User(**db_user)
|
||||
return user if user_id == user.get_id() else None
|
||||
|
||||
|
||||
@app.errorhandler(CSRFError)
|
||||
|
|
@ -338,14 +330,14 @@ def handle_csrf_error(_):
|
|||
flash("Wrong CSRF token !", "error")
|
||||
if not app.config["USER"]:
|
||||
return render_template("setup.html"), 403
|
||||
return render_template("login.html", is_totp=app.config["USER"].is_two_factor_enabled), 403
|
||||
return render_template("login.html", is_totp=current_user.is_two_factor_enabled), 403
|
||||
|
||||
|
||||
@app.before_request
|
||||
def before_request():
|
||||
if app.config["USER"] and current_user.is_authenticated:
|
||||
if current_user.is_authenticated:
|
||||
passed = True
|
||||
if not session.get("totp_validated", False) and app.config["USER"].is_two_factor_enabled and "/totp" not in request.path and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts")):
|
||||
if not session.get("totp_validated", False) and current_user.is_two_factor_enabled and "/totp" not in request.path and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts")):
|
||||
return redirect(url_for("totp", next=request.form.get("next")))
|
||||
elif session.get("ip") != request.remote_addr:
|
||||
passed = False
|
||||
|
|
@ -485,14 +477,14 @@ def totp():
|
|||
flash("Missing token parameter.", "error")
|
||||
return redirect(url_for("totp"))
|
||||
|
||||
if not app.config["USER"].check_otp(request.form["totp_token"]):
|
||||
if not current_user.check_otp(request.form["totp_token"]):
|
||||
flash("The token is invalid.", "error")
|
||||
return redirect(url_for("totp"))
|
||||
|
||||
session["totp_validated"] = True
|
||||
redirect(url_for("loading", next=request.form.get("next") or url_for("home")))
|
||||
|
||||
if app.config["USER"] and (not app.config["USER"].is_two_factor_enabled or session.get("totp_validated", False)):
|
||||
if not current_user.is_two_factor_enabled or session.get("totp_validated", False):
|
||||
return redirect(url_for("home"))
|
||||
|
||||
return render_template("totp.html", dark_mode=app.config["DARK_MODE"])
|
||||
|
|
@ -569,7 +561,7 @@ def profile():
|
|||
error = False
|
||||
|
||||
if "curr_password" in request.form:
|
||||
if not app.config["USER"].check_password(request.form["curr_password"]):
|
||||
if not current_user.check_password(request.form["curr_password"]):
|
||||
flash("The current password is incorrect.", "error")
|
||||
error = True
|
||||
|
||||
|
|
@ -598,12 +590,12 @@ def profile():
|
|||
if error:
|
||||
return redirect(url_for("profile"))
|
||||
|
||||
app.config["USER"] = User(
|
||||
request.form.get("admin_username") or app.config["USER"].get_id(),
|
||||
user = User(
|
||||
request.form.get("admin_username") or current_user.get_id(),
|
||||
request.form.get("admin_password") or request.form["curr_password"],
|
||||
is_two_factor_enabled=app.config["USER"].is_two_factor_enabled,
|
||||
secret_token=app.config["USER"].secret_token,
|
||||
method=app.config["USER"].method,
|
||||
is_two_factor_enabled=current_user.is_two_factor_enabled,
|
||||
secret_token=current_user.secret_token,
|
||||
method=current_user.method,
|
||||
)
|
||||
|
||||
session.clear()
|
||||
|
|
@ -613,57 +605,54 @@ def profile():
|
|||
flash("Missing totp_token parameter.", "error")
|
||||
return redirect(url_for("profile"))
|
||||
|
||||
if not app.config["USER"].check_password(request.form.get("totp_password", "")):
|
||||
if not current_user.check_password(request.form.get("totp_password", "")):
|
||||
flash("The current password is incorrect.", "error")
|
||||
error = True
|
||||
|
||||
if not app.config["USER"].check_otp(request.form["totp_token"]):
|
||||
if not current_user.check_otp(request.form["totp_token"], secret=app.config["CURRENT_TOTP_TOKEN"]):
|
||||
flash("The token is invalid.", "error")
|
||||
error = True
|
||||
|
||||
app.logger.warning(request.form["totp_password"])
|
||||
|
||||
if error:
|
||||
return redirect(url_for("profile"))
|
||||
|
||||
app.logger.warning("TOTP validated")
|
||||
session["totp_validated"] = not current_user.is_two_factor_enabled
|
||||
|
||||
session["totp_validated"] = not app.config["USER"].is_two_factor_enabled
|
||||
|
||||
if app.config["USER"].is_two_factor_enabled:
|
||||
app.config["USER"].secret_token = None
|
||||
|
||||
app.config["USER"].is_two_factor_enabled = session["totp_validated"]
|
||||
|
||||
app.logger.warning(app.config["USER"])
|
||||
user = User(
|
||||
current_user.get_id(),
|
||||
request.form["totp_password"],
|
||||
is_two_factor_enabled=session["totp_validated"],
|
||||
secret_token=None if current_user.is_two_factor_enabled else app.config["CURRENT_TOTP_TOKEN"],
|
||||
method=current_user.method,
|
||||
)
|
||||
app.config["CURRENT_TOTP_TOKEN"] = None
|
||||
else:
|
||||
flash("Missing form data.", "error")
|
||||
return redirect(url_for("profile"))
|
||||
|
||||
ret = db.update_ui_user(
|
||||
app.config["USER"].get_id(),
|
||||
app.config["USER"].password_hash,
|
||||
app.config["USER"].is_two_factor_enabled,
|
||||
app.config["USER"].secret_token if app.config["USER"].is_two_factor_enabled else None,
|
||||
user.get_id(),
|
||||
user.password_hash,
|
||||
user.is_two_factor_enabled,
|
||||
user.secret_token if user.is_two_factor_enabled else None,
|
||||
)
|
||||
if ret:
|
||||
app.logger.error(f"Couldn't update the admin user in the database: {ret}")
|
||||
flash(f"Couldn't update the admin user in the database: {ret}", "error")
|
||||
return redirect(url_for("profile"))
|
||||
|
||||
app.logger.warning("User updated")
|
||||
|
||||
return redirect(url_for("profile"))
|
||||
|
||||
secret_token = ""
|
||||
totp_qr_image = ""
|
||||
|
||||
if not app.config["USER"].is_two_factor_enabled:
|
||||
app.config["USER"].refresh_totp()
|
||||
secret_token = app.config["USER"].secret_token
|
||||
totp_qr_image = get_b64encoded_qr_image(app.config["USER"].get_authentication_setup_uri())
|
||||
if not current_user.is_two_factor_enabled:
|
||||
current_user.refresh_totp()
|
||||
secret_token = current_user.secret_token
|
||||
totp_qr_image = get_b64encoded_qr_image(current_user.get_authentication_setup_uri())
|
||||
app.config["CURRENT_TOTP_TOKEN"] = secret_token
|
||||
|
||||
return render_template("profile.html", username=app.config["USER"].get_id(), is_totp=app.config["USER"].is_two_factor_enabled, secret_token=secret_token, totp_qr_image=totp_qr_image, dark_mode=app.config["DARK_MODE"])
|
||||
return render_template("profile.html", username=current_user.get_id(), is_totp=current_user.is_two_factor_enabled, secret_token=secret_token, totp_qr_image=totp_qr_image, dark_mode=app.config["DARK_MODE"])
|
||||
|
||||
|
||||
@app.route("/instances", methods=["GET", "POST"])
|
||||
|
|
@ -1662,15 +1651,26 @@ def jobs_download():
|
|||
|
||||
@app.route("/login", methods=["GET", "POST"])
|
||||
def login():
|
||||
if not app.config["USER"]:
|
||||
return redirect(url_for("setup"))
|
||||
elif current_user.is_authenticated: # type: ignore
|
||||
return redirect(url_for("home"))
|
||||
|
||||
fail = False
|
||||
if request.method == "POST" and "username" in request.form and "password" in request.form:
|
||||
app.logger.warning(f"Login attempt from {request.remote_addr} with username \"{request.form['username']}\"")
|
||||
if app.config["USER"].get_id() == request.form["username"] and app.config["USER"].check_password(request.form["password"]):
|
||||
db_user = db.get_ui_user()
|
||||
if not db_user:
|
||||
app.logger.error("Couldn't get user from database")
|
||||
stop(1)
|
||||
user = User(**db_user)
|
||||
|
||||
if user.get_id() == request.form["username"] and user.check_password(request.form["password"]):
|
||||
# log the user in
|
||||
session["ip"] = request.remote_addr
|
||||
session["user_agent"] = request.headers.get("User-Agent")
|
||||
session["totp_validated"] = False
|
||||
login_user(app.config["USER"], duration=timedelta(hours=1))
|
||||
login_user(user, duration=timedelta(hours=1))
|
||||
|
||||
# redirect him to the page he originally wanted or to the home page
|
||||
return redirect(url_for("loading", next=request.form.get("next") or url_for("home")))
|
||||
|
|
@ -1678,13 +1678,8 @@ def login():
|
|||
flash("Invalid username or password", "error")
|
||||
fail = True
|
||||
|
||||
if not app.config["USER"]:
|
||||
return redirect(url_for("setup"))
|
||||
elif current_user.is_authenticated: # type: ignore
|
||||
return redirect(url_for("home"))
|
||||
|
||||
kwargs = {
|
||||
"is_totp": app.config["USER"].is_two_factor_enabled,
|
||||
"is_totp": current_user.is_two_factor_enabled,
|
||||
} | ({"error": "Invalid username or password"} if fail else {})
|
||||
|
||||
return render_template("login.html", **kwargs), 401 if fail else 200
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
from typing import Optional
|
||||
|
||||
from bcrypt import checkpw, hashpw, gensalt
|
||||
from flask_login import UserMixin
|
||||
from flask_login import AnonymousUserMixin, UserMixin
|
||||
from pyotp import random_base32
|
||||
from pyotp.totp import TOTP
|
||||
|
||||
|
|
@ -28,7 +28,7 @@ class User(UserMixin):
|
|||
self.is_two_factor_enabled = is_two_factor_enabled
|
||||
self.secret_token = secret_token
|
||||
self.method = method
|
||||
self.__totp = None
|
||||
self.__totp = TOTP(secret_token) if secret_token else None
|
||||
|
||||
@property
|
||||
def password_hash(self) -> bytes:
|
||||
|
|
@ -66,7 +66,7 @@ class User(UserMixin):
|
|||
self.secret_token = random_base32()
|
||||
self.__totp = TOTP(self.secret_token)
|
||||
|
||||
def check_otp(self, otp: str) -> bool:
|
||||
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Check if the otp is correct by comparing it to the stored secret token
|
||||
|
||||
|
|
@ -74,9 +74,64 @@ class User(UserMixin):
|
|||
:return: The otp is being checked against the secret token. If the otp is correct,
|
||||
the user is returned.
|
||||
"""
|
||||
if secret:
|
||||
return TOTP(secret).verify(otp, valid_window=3)
|
||||
if not self.__totp:
|
||||
return False
|
||||
return self.__totp.verify(otp, valid_window=3)
|
||||
|
||||
def __repr__(self):
|
||||
return f"User({self.id!r}, {self.__password!r}, {self.is_two_factor_enabled!r}, {self.secret_token!r}, {self.method!r})"
|
||||
|
||||
|
||||
class AnonymousUser(AnonymousUserMixin):
|
||||
def __init__(self):
|
||||
self.id = None
|
||||
self.is_two_factor_enabled = False
|
||||
self.secret_token = None
|
||||
self.method = "manual"
|
||||
|
||||
@property
|
||||
def password_hash(self) -> None:
|
||||
"""
|
||||
Get the password hash
|
||||
|
||||
:return: The password hash
|
||||
"""
|
||||
return None
|
||||
|
||||
def update_password(self, password: str):
|
||||
"""
|
||||
Set the password by hashing it
|
||||
|
||||
:param password: The password to be hashed
|
||||
"""
|
||||
self.__password = hashpw(password.encode("utf-8"), gensalt(rounds=13))
|
||||
|
||||
def check_password(self, password: str):
|
||||
"""
|
||||
Check if the password is correct by hashing it and comparing it to the stored hash
|
||||
|
||||
:param password: The password to be checked
|
||||
:return: The password is being checked against the password hash. If the password is correct,
|
||||
the user is returned.
|
||||
"""
|
||||
return False
|
||||
|
||||
def get_authentication_setup_uri(self) -> str:
|
||||
return ""
|
||||
|
||||
def refresh_totp(self):
|
||||
return
|
||||
|
||||
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Check if the otp is correct by comparing it to the stored secret token
|
||||
|
||||
:param otp: The otp to be checked
|
||||
:return: The otp is being checked against the secret token. If the otp is correct,
|
||||
the user is returned.
|
||||
"""
|
||||
if secret:
|
||||
return TOTP(secret).verify(otp, valid_window=3)
|
||||
return False
|
||||
|
|
|
|||
Loading…
Reference in a new issue