mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Refactor TOTP logic in web UI + Add the possibility to have a custom salt for passwords
This commit is contained in:
parent
bb3e4bd007
commit
2b27caea2a
8 changed files with 314 additions and 138 deletions
263
src/ui/main.py
263
src/ui/main.py
|
|
@ -5,12 +5,15 @@ from math import floor
|
|||
from multiprocessing import Manager
|
||||
from os import _exit, getenv, listdir, sep, urandom
|
||||
from os.path import basename, dirname, isabs, join
|
||||
from random import randint
|
||||
from secrets import choice, token_urlsafe
|
||||
from string import ascii_letters, digits
|
||||
from sys import path as sys_path, modules as sys_modules
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
from uuid import uuid4
|
||||
|
||||
from bcrypt import gensalt
|
||||
from builder import home_builder, instances_builder, global_config_builder, jobs_builder, services_builder
|
||||
|
||||
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("api",), ("db",))]:
|
||||
|
|
@ -19,6 +22,7 @@ for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in ((
|
|||
|
||||
from bs4 import BeautifulSoup
|
||||
from copy import deepcopy
|
||||
from cryptography.fernet import Fernet
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from dateutil.parser import parse as dateutil_parse
|
||||
from flask import Flask, Response, flash, jsonify, make_response, redirect, render_template, request, send_file, session, url_for
|
||||
|
|
@ -29,8 +33,7 @@ from importlib.machinery import SourceFileLoader
|
|||
from io import BytesIO
|
||||
from json import JSONDecodeError, dumps, loads as json_loads
|
||||
from jinja2 import Environment, FileSystemLoader, select_autoescape
|
||||
from pyotp import random_base32
|
||||
from pyotp.totp import TOTP
|
||||
from passlib import totp
|
||||
from redis import Redis, Sentinel
|
||||
from regex import compile as re_compile, match as regex_match
|
||||
from requests import get
|
||||
|
|
@ -38,7 +41,7 @@ from shutil import move, rmtree
|
|||
from signal import SIGINT, signal, SIGTERM
|
||||
from subprocess import PIPE, Popen, call
|
||||
from tarfile import CompressionError, HeaderError, ReadError, TarError, open as tar_open
|
||||
from threading import Thread, Lock
|
||||
from threading import Thread
|
||||
from tempfile import NamedTemporaryFile
|
||||
from time import sleep, time
|
||||
from werkzeug.utils import secure_filename
|
||||
|
|
@ -49,18 +52,20 @@ from src.ConfigFiles import ConfigFiles
|
|||
from src.Config import Config
|
||||
from src.ReverseProxied import ReverseProxied
|
||||
from src.Templates import get_ui_templates
|
||||
from src.totp import Totp
|
||||
|
||||
from common_utils import get_version # type: ignore
|
||||
from logger import setup_logger # type: ignore
|
||||
|
||||
from models import AnonymousUser
|
||||
from ui_database import UIDatabase
|
||||
from utils import USER_PASSWORD_RX, PLUGIN_KEYS, check_password, check_settings, gen_password_hash, get_b64encoded_qr_image, path_to_dict, get_remain
|
||||
from utils import USER_PASSWORD_RX, PLUGIN_KEYS, PLUGIN_ID_RX, check_password, check_settings, gen_password_hash, path_to_dict, get_remain
|
||||
|
||||
TMP_DIR = Path(sep, "var", "tmp", "bunkerweb")
|
||||
TMP_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
LOCK = Lock()
|
||||
LIB_DIR = Path(sep, "var", "lib", "bunkerweb")
|
||||
LIB_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def stop_gunicorn():
|
||||
|
|
@ -104,12 +109,61 @@ with app.app_context():
|
|||
TMP_DIR.joinpath(".flask_secret").write_text(token_urlsafe(32), encoding="utf-8")
|
||||
FLASK_SECRET = TMP_DIR.joinpath(".flask_secret").read_text(encoding="utf-8").strip()
|
||||
|
||||
PASSWORD_SALT = getenv("PASSWORD_SALT", "")
|
||||
if not PASSWORD_SALT.isdigit():
|
||||
if not LIB_DIR.joinpath(".password_salt").is_file():
|
||||
app.logger.warning(
|
||||
"The PASSWORD_SALT environment variable is missing or invalid (must be an integer) or the .password_salt file is missing, generating a random one ..."
|
||||
)
|
||||
LIB_DIR.joinpath(".password_salt").write_bytes(gensalt(rounds=13))
|
||||
PASSWORD_SALT = LIB_DIR.joinpath(".password_salt").read_text(encoding="utf-8").strip()
|
||||
|
||||
TOTP_SECRETS = getenv("TOTP_SECRETS", "")
|
||||
if TOTP_SECRETS:
|
||||
try:
|
||||
TOTP_SECRETS = json_loads(TOTP_SECRETS)
|
||||
except JSONDecodeError:
|
||||
app.logger.warning(
|
||||
"The TOTP_SECRETS environment variable is invalid, generating a random one ... (check the format via the documentation: https://passlib.readthedocs.io/en/stable/narr/totp-tutorial.html#application-secrets)"
|
||||
)
|
||||
TOTP_SECRETS = None
|
||||
|
||||
if not TOTP_SECRETS:
|
||||
if not LIB_DIR.joinpath(".totp_secrets.json").is_file():
|
||||
if TOTP_SECRETS is not None:
|
||||
app.logger.warning("The TOTP_SECRETS environment variable is missing or the .totp_secrets.json file is missing, generating a random one ...")
|
||||
LIB_DIR.joinpath(".totp_secrets.json").write_text(dumps({k: totp.generate_secret() for k in range(randint(1, 5))}), encoding="utf-8")
|
||||
TOTP_SECRETS = json_loads(LIB_DIR.joinpath(".totp_secrets.json").read_text(encoding="utf-8"))
|
||||
|
||||
MF_RECOVERY_CODES_KEYS = []
|
||||
if getenv("MF_ENCRYPT_RECOVERY_CODES", "yes").lower() != "no":
|
||||
MF_RECOVERY_CODES_KEYS = getenv("MF_RECOVERY_CODES_KEYS", "")
|
||||
if MF_RECOVERY_CODES_KEYS:
|
||||
try:
|
||||
MF_RECOVERY_CODES_KEYS = json_loads(MF_RECOVERY_CODES_KEYS)
|
||||
except JSONDecodeError:
|
||||
app.logger.warning(
|
||||
"The MF_RECOVERY_CODES_KEYS environment variable is invalid, generating a random one ... (check the format via the documentation: https://cryptography.io/en/latest/fernet/#fernet-symmetric-encryption)"
|
||||
)
|
||||
MF_RECOVERY_CODES_KEYS = None
|
||||
|
||||
if not MF_RECOVERY_CODES_KEYS:
|
||||
if MF_RECOVERY_CODES_KEYS is not None and not LIB_DIR.joinpath(".mf_recovery_codes_keys.json").is_file():
|
||||
app.logger.warning("The MF_RECOVERY_CODES_KEYS environment variable is missing, generating a random one ...")
|
||||
LIB_DIR.joinpath(".mf_recovery_codes_keys.json").write_text(
|
||||
dumps([Fernet.generate_key().decode() for _ in range(randint(1, 5))]), encoding="utf-8"
|
||||
)
|
||||
MF_RECOVERY_CODES_KEYS = json_loads(LIB_DIR.joinpath(".mf_recovery_codes_keys.json").read_text(encoding="utf-8"))
|
||||
else:
|
||||
app.logger.warning("MF_ENCRYPT_RECOVERY_CODES is set to 'no', multi-factor recovery codes will not be encrypted")
|
||||
|
||||
app.config["SECRET_KEY"] = FLASK_SECRET
|
||||
|
||||
app.config["SESSION_COOKIE_NAME"] = "__Host-bw_ui_session"
|
||||
app.config["SESSION_COOKIE_PATH"] = "/"
|
||||
app.config["SESSION_COOKIE_SECURE"] = True # Required for __Host- prefix
|
||||
app.config["SESSION_COOKIE_HTTPONLY"] = True # Recommended for security
|
||||
app.config["SESSION_COOKIE_SAMESITE"] = "Lax" # Or 'Strict' for stricter settings
|
||||
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
|
||||
|
||||
login_manager = LoginManager()
|
||||
login_manager.session_protection = "strong"
|
||||
|
|
@ -129,7 +183,9 @@ with app.app_context():
|
|||
continue
|
||||
sleep(5)
|
||||
|
||||
ret, err = DB.init_ui_tables(get_version())
|
||||
BW_VERSION = get_version()
|
||||
|
||||
ret, err = DB.init_ui_tables(BW_VERSION)
|
||||
|
||||
if not ret and err:
|
||||
app.logger.error(f"Exception while checking database tables : {err}")
|
||||
|
|
@ -181,7 +237,7 @@ with app.app_context():
|
|||
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). It will not be updated."
|
||||
)
|
||||
else:
|
||||
ADMIN_USER["password"] = gen_password_hash(env_admin_password)
|
||||
ADMIN_USER["password"] = gen_password_hash(env_admin_password, PASSWORD_SALT)
|
||||
updated = True
|
||||
|
||||
if updated:
|
||||
|
|
@ -207,16 +263,11 @@ with app.app_context():
|
|||
)
|
||||
exit(1)
|
||||
|
||||
ret = DB.create_ui_user(user_name, gen_password_hash(env_admin_password), ["admin"], admin=True)
|
||||
ret = DB.create_ui_user(user_name, gen_password_hash(env_admin_password, PASSWORD_SALT), ["admin"], admin=True)
|
||||
if ret:
|
||||
app.logger.error(f"Couldn't create the admin user in the database: {ret}")
|
||||
exit(1)
|
||||
|
||||
bw_version = get_version()
|
||||
|
||||
if not TMP_DIR.joinpath(".ui.json").is_file():
|
||||
TMP_DIR.joinpath(".ui.json").write_text("{}", encoding="utf-8")
|
||||
|
||||
try:
|
||||
app.config.update(
|
||||
INSTANCES=Instances(DB),
|
||||
|
|
@ -231,8 +282,6 @@ with app.app_context():
|
|||
app.logger.error(repr(e), e.filename)
|
||||
stop(1)
|
||||
|
||||
plugin_id_rx = re_compile(r"^[\w_-]{1,64}$")
|
||||
|
||||
# Declare functions for jinja2
|
||||
app.jinja_env.globals.update(check_settings=check_settings)
|
||||
|
||||
|
|
@ -240,7 +289,8 @@ with app.app_context():
|
|||
csrf = CSRFProtect()
|
||||
csrf.init_app(app)
|
||||
|
||||
ui_data = Manager().dict()
|
||||
app.data = Manager().dict()
|
||||
app.totp = Totp(app, TOTP_SECRETS, [key.encode() for key in MF_RECOVERY_CODES_KEYS])
|
||||
|
||||
LOG_RX = re_compile(r"^(?P<date>\d+/\d+/\d+\s\d+:\d+:\d+)\s\[(?P<level>[a-z]+)\]\s\d+#\d+:\s(?P<message>[^\n]+)$")
|
||||
REVERSE_PROXY_PATH = re_compile(r"^(?P<host>https?://.{1,255}(:((6553[0-5])|(655[0-2]\d)|(65[0-4]\d{2})|(6[0-4]\d{3})|([1-5]\d{4})|([0-5]{0,5})|(\d{1,4})))?)$")
|
||||
|
|
@ -274,8 +324,8 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
|
|||
# Do the operation
|
||||
error = 0
|
||||
|
||||
if "TO_FLASH" not in ui_data:
|
||||
ui_data["TO_FLASH"] = []
|
||||
if "TO_FLASH" not in app.data:
|
||||
app.data["TO_FLASH"] = []
|
||||
|
||||
if method == "services":
|
||||
if operation == "new":
|
||||
|
|
@ -301,22 +351,22 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
|
|||
if operation:
|
||||
if isinstance(operation, list):
|
||||
for op in operation:
|
||||
ui_data["TO_FLASH"].append({"content": f"Reload failed for the instance {op}", "type": "error"})
|
||||
app.data["TO_FLASH"].append({"content": f"Reload failed for the instance {op}", "type": "error"})
|
||||
elif operation.startswith(("Can't", "The database is read-only")):
|
||||
ui_data["TO_FLASH"].append({"content": operation, "type": "error"})
|
||||
app.data["TO_FLASH"].append({"content": operation, "type": "error"})
|
||||
else:
|
||||
ui_data["TO_FLASH"].append({"content": operation, "type": "success"})
|
||||
app.data["TO_FLASH"].append({"content": operation, "type": "success"})
|
||||
|
||||
if not threaded:
|
||||
for f in ui_data.get("TO_FLASH", []):
|
||||
for f in app.data.get("TO_FLASH", []):
|
||||
if f["type"] == "error":
|
||||
flash(f["content"], "error")
|
||||
else:
|
||||
flash(f["content"])
|
||||
|
||||
ui_data["TO_FLASH"] = []
|
||||
app.data["TO_FLASH"] = []
|
||||
|
||||
ui_data["RELOADING"] = False
|
||||
app.data["RELOADING"] = False
|
||||
|
||||
return error
|
||||
|
||||
|
|
@ -455,17 +505,17 @@ def inject_variables():
|
|||
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
|
||||
)
|
||||
|
||||
if not changes_ongoing and ui_data.get("PRO_LOADING"):
|
||||
ui_data["PRO_LOADING"] = False
|
||||
if not changes_ongoing and app.data.get("PRO_LOADING"):
|
||||
app.data["PRO_LOADING"] = False
|
||||
|
||||
if not changes_ongoing and metadata["failover"]:
|
||||
flash(
|
||||
"The last changes could not be applied because it creates a configuration error on NGINX, please check the logs for more information. The configured fell back to the last working one.",
|
||||
"error",
|
||||
)
|
||||
elif not changes_ongoing and not metadata["failover"] and ui_data.get("CONFIG_CHANGED", False):
|
||||
elif not changes_ongoing and not metadata["failover"] and app.data.get("CONFIG_CHANGED", False):
|
||||
flash("The last changes have been applied successfully.", "success")
|
||||
ui_data["CONFIG_CHANGED"] = False
|
||||
app.data["CONFIG_CHANGED"] = False
|
||||
|
||||
# check that is value is in tuple
|
||||
return dict(
|
||||
|
|
@ -477,9 +527,9 @@ def inject_variables():
|
|||
pro_expire=metadata["pro_expire"].strftime("%d-%m-%Y") if metadata["pro_expire"] else "Unknown",
|
||||
pro_overlapped=metadata["pro_overlapped"],
|
||||
plugins=app.config["CONFIG"].get_plugins(),
|
||||
pro_loading=ui_data.get("PRO_LOADING", False),
|
||||
pro_loading=app.data.get("PRO_LOADING", False),
|
||||
bw_version=metadata["version"],
|
||||
is_readonly=ui_data.get("READONLY_MODE", False),
|
||||
is_readonly=app.data.get("READONLY_MODE", False),
|
||||
username=current_user.get_id() if current_user.is_authenticated else "",
|
||||
)
|
||||
|
||||
|
|
@ -517,7 +567,8 @@ def set_security_headers(response):
|
|||
# * Referrer-Policy header to prevent leaking of sensitive data
|
||||
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
|
||||
|
||||
app.logger.debug(f"UI data: {ui_data}")
|
||||
if current_user.totp_refreshed:
|
||||
DB.set_ui_user_recovery_code_refreshed(current_user.get_id(), False)
|
||||
|
||||
return response
|
||||
|
||||
|
|
@ -556,7 +607,7 @@ def handle_csrf_error(_):
|
|||
|
||||
@app.before_request
|
||||
def before_request():
|
||||
if ui_data.get("SERVER_STOPPING", False):
|
||||
if app.data.get("SERVER_STOPPING", False):
|
||||
response = make_response(jsonify({"message": "Server is shutting down, try again later."}), 503)
|
||||
response.headers["Retry-After"] = 30 # Clients should retry after 30 seconds # type: ignore
|
||||
return response
|
||||
|
|
@ -568,14 +619,14 @@ def before_request():
|
|||
DB.database_uri
|
||||
and DB.readonly
|
||||
and (
|
||||
datetime.now(timezone.utc) - datetime.fromisoformat(ui_data.get("LAST_DATABASE_RETRY", "1970-01-01T00:00:00")).replace(tzinfo=timezone.utc)
|
||||
datetime.now(timezone.utc) - datetime.fromisoformat(app.data.get("LAST_DATABASE_RETRY", "1970-01-01T00:00:00")).replace(tzinfo=timezone.utc)
|
||||
> timedelta(minutes=1)
|
||||
)
|
||||
):
|
||||
try:
|
||||
DB.retry_connection(pool_timeout=1)
|
||||
DB.retry_connection(log=False)
|
||||
ui_data["READONLY_MODE"] = False
|
||||
app.data["READONLY_MODE"] = False
|
||||
app.logger.info("The database is no longer read-only, defaulting to read-write mode")
|
||||
except BaseException:
|
||||
try:
|
||||
|
|
@ -586,22 +637,22 @@ def before_request():
|
|||
with suppress(BaseException):
|
||||
DB.retry_connection(fallback=True, pool_timeout=1)
|
||||
DB.retry_connection(fallback=True, log=False)
|
||||
ui_data["READONLY_MODE"] = True
|
||||
ui_data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat() if DB.last_connection_retry else datetime.now().isoformat()
|
||||
elif not ui_data.get("READONLY_MODE", False) and request.method == "POST" and not ("/totp" in request.path or "/login" in request.path):
|
||||
app.data["READONLY_MODE"] = True
|
||||
app.data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat() if DB.last_connection_retry else datetime.now().isoformat()
|
||||
elif not app.data.get("READONLY_MODE", False) and request.method == "POST" and not ("/totp" in request.path or "/login" in request.path):
|
||||
try:
|
||||
DB.test_write()
|
||||
ui_data["READONLY_MODE"] = False
|
||||
app.data["READONLY_MODE"] = False
|
||||
except BaseException:
|
||||
ui_data["READONLY_MODE"] = True
|
||||
ui_data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat() if DB.last_connection_retry else datetime.now().isoformat()
|
||||
app.data["READONLY_MODE"] = True
|
||||
app.data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat() if DB.last_connection_retry else datetime.now().isoformat()
|
||||
else:
|
||||
try:
|
||||
DB.test_read()
|
||||
except BaseException:
|
||||
ui_data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat() if DB.last_connection_retry else datetime.now().isoformat()
|
||||
app.data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat() if DB.last_connection_retry else datetime.now().isoformat()
|
||||
|
||||
DB.readonly = ui_data.get("READONLY_MODE", False)
|
||||
DB.readonly = app.data.get("READONLY_MODE", False)
|
||||
|
||||
if DB.readonly:
|
||||
flash("Database connection is in read-only mode : no modification possible.", "error")
|
||||
|
|
@ -684,7 +735,9 @@ def setup():
|
|||
"setup",
|
||||
)
|
||||
|
||||
ret = DB.create_ui_user(request.form["admin_username"], gen_password_hash(request.form["admin_password"]), ["admin"], method="ui", admin=True)
|
||||
ret = DB.create_ui_user(
|
||||
request.form["admin_username"], gen_password_hash(request.form["admin_password"], PASSWORD_SALT), ["admin"], method="ui", admin=True
|
||||
)
|
||||
if ret:
|
||||
return handle_error(f"Couldn't create the admin user in the database: {ret}", "setup", False, "error")
|
||||
|
||||
|
|
@ -702,8 +755,8 @@ def setup():
|
|||
if not REVERSE_PROXY_PATH.match(request.form["ui_host"]):
|
||||
return handle_error("The hostname is not valid.", "setup")
|
||||
|
||||
ui_data["RELOADING"] = True
|
||||
ui_data["LAST_RELOAD"] = time()
|
||||
app.data["RELOADING"] = True
|
||||
app.data["LAST_RELOAD"] = time()
|
||||
|
||||
config = {
|
||||
"SERVER_NAME": request.form["server_name"],
|
||||
|
|
@ -758,10 +811,11 @@ def totp():
|
|||
if request.method == "POST":
|
||||
verify_data_in_form(data={"totp_token": None}, err_message="No token provided on /totp.", redirect_url="totp")
|
||||
|
||||
if request.form["totp_token"] not in current_user.list_recovery_codes and not current_user.check_otp(request.form["totp_token"]):
|
||||
return handle_error("The token is invalid.", "totp")
|
||||
elif request.form["totp_token"] in current_user.list_recovery_codes:
|
||||
DB.use_ui_user_recovery_code(current_user.get_id(), request.form["totp_token"])
|
||||
if not app.totp.verify_totp(request.form["totp_token"], user=current_user):
|
||||
if not app.totp.verify_recovery_code(request.form["totp_token"], user=current_user):
|
||||
return handle_error("The token is invalid.", "totp")
|
||||
else:
|
||||
DB.use_ui_user_recovery_code(current_user.get_id(), app.totp.encrypt_recovery_code(request.form["totp_token"]))
|
||||
|
||||
session["totp_validated"] = True
|
||||
redirect(url_for("loading", next=request.form.get("next") or url_for("home")))
|
||||
|
|
@ -823,7 +877,7 @@ def home():
|
|||
metadata = DB.get_metadata()
|
||||
|
||||
data = {
|
||||
"check_version": not remote_version or bw_version == remote_version,
|
||||
"check_version": not remote_version or BW_VERSION == remote_version,
|
||||
"remote_version": remote_version,
|
||||
"version": metadata["version"],
|
||||
"instances_number": len(instances),
|
||||
|
|
@ -888,16 +942,16 @@ def account():
|
|||
if not manage_bunkerweb("global_config", variable, threaded=threaded):
|
||||
message = "Checking license key to upgrade."
|
||||
if threaded:
|
||||
ui_data["TO_FLASH"].append({"content": message, "type": "success"})
|
||||
app.data["TO_FLASH"].append({"content": message, "type": "success"})
|
||||
else:
|
||||
flash(message)
|
||||
|
||||
ui_data["PRO_LOADING"] = True
|
||||
ui_data["CONFIG_CHANGED"] = True
|
||||
app.data["PRO_LOADING"] = True
|
||||
app.data["CONFIG_CHANGED"] = True
|
||||
|
||||
if any(curr_changes.values()):
|
||||
ui_data["RELOADING"] = True
|
||||
ui_data["LAST_RELOAD"] = time()
|
||||
app.data["RELOADING"] = True
|
||||
app.data["LAST_RELOAD"] = time()
|
||||
Thread(target=update_global_config, args=(True,)).start()
|
||||
else:
|
||||
update_global_config()
|
||||
|
|
@ -911,7 +965,8 @@ def account():
|
|||
|
||||
username = current_user.get_id()
|
||||
password = request.form["curr_password"]
|
||||
secret_token = current_user.totp_secret
|
||||
totp_secret = current_user.totp_secret
|
||||
totp_recovery_codes = current_user.list_recovery_codes
|
||||
|
||||
if request.form["operation"] == "username":
|
||||
verify_data_in_form(data={"admin_username": None}, err_message="Missing admin username parameter on /account.", redirect_url="account")
|
||||
|
|
@ -946,16 +1001,26 @@ def account():
|
|||
if request.form["operation"] == "totp":
|
||||
verify_data_in_form(data={"totp_token": None}, err_message="Missing totp token parameter on /account.", redirect_url="account")
|
||||
|
||||
if request.form["totp_token"] not in current_user.list_recovery_codes and not current_user.check_otp(
|
||||
request.form["totp_token"], secret=session.get("tmp_totp_secret")
|
||||
):
|
||||
if not app.totp.verify_totp(
|
||||
request.form["totp_token"], totp_secret=session.get("tmp_totp_secret", ""), user=current_user
|
||||
) and not app.totp.verify_recovery_code(request.form["totp_token"], user=current_user):
|
||||
return handle_error("The totp token is invalid. (totp)", "account")
|
||||
|
||||
session["totp_validated"] = not bool(current_user.totp_secret)
|
||||
secret_token = None if bool(current_user.totp_secret) else session.get("tmp_totp_secret")
|
||||
session["tmp_totp_secret"] = None
|
||||
totp_secret = None if bool(current_user.totp_secret) else session.pop("tmp_totp_secret", "")
|
||||
|
||||
ret = DB.update_ui_user(username, gen_password_hash(password), secret_token, current_user.method if request.form["operation"] == "totp" else "ui")
|
||||
if totp_secret and totp_secret != current_user.totp_secret:
|
||||
totp_recovery_codes = app.totp.generate_recovery_codes()
|
||||
|
||||
app.logger.debug(f"totp recovery codes: {totp_recovery_codes}")
|
||||
|
||||
ret = DB.update_ui_user(
|
||||
username,
|
||||
gen_password_hash(password, PASSWORD_SALT),
|
||||
totp_secret,
|
||||
totp_recovery_codes=totp_recovery_codes,
|
||||
method=current_user.method if request.form["operation"] == "totp" else "ui",
|
||||
)
|
||||
if ret:
|
||||
return handle_error(f"Couldn't update the admin user in the database: {ret}", "account", False, "error")
|
||||
|
||||
|
|
@ -970,18 +1035,19 @@ def account():
|
|||
return redirect(url_for("account" if request.form["operation"] == "totp" else "login"))
|
||||
|
||||
totp_qr_image = ""
|
||||
|
||||
if not bool(current_user.totp_secret):
|
||||
session["tmp_totp_secret"] = random_base32()
|
||||
totp = TOTP(session["tmp_totp_secret"])
|
||||
totp_qr_image = get_b64encoded_qr_image(totp.provisioning_uri(name=current_user.get_id(), issuer_name="BunkerWeb UI"))
|
||||
session["tmp_totp_secret"] = app.totp.generate_totp_secret()
|
||||
totp_qr_image = app.totp.generate_qrcode(current_user.get_id(), session["tmp_totp_secret"])
|
||||
|
||||
# TODO: Show user backup codes after TOTP refresh + add refresh feature
|
||||
return render_template(
|
||||
"account.html",
|
||||
username=current_user.get_id(),
|
||||
is_totp=bool(current_user.totp_secret),
|
||||
secret_token=session["tmp_totp_secret"],
|
||||
secret_token=app.totp.get_totp_pretty_key(session.get("tmp_totp_secret", "")),
|
||||
totp_qr_image=totp_qr_image,
|
||||
totp_refrehed=current_user.totp_refreshed,
|
||||
totp_recovery_codes=app.totp.decrypt_recovery_codes(current_user) if current_user.totp_refreshed else [],
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -1006,8 +1072,8 @@ def instances():
|
|||
next=True,
|
||||
)
|
||||
|
||||
ui_data["RELOADING"] = True
|
||||
ui_data["LAST_RELOAD"] = time()
|
||||
app.data["RELOADING"] = True
|
||||
app.data["LAST_RELOAD"] = time()
|
||||
Thread(
|
||||
target=manage_bunkerweb,
|
||||
name="Reloading instances",
|
||||
|
|
@ -1160,13 +1226,13 @@ def services():
|
|||
for k, v in db_metadata.items()
|
||||
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
|
||||
):
|
||||
ui_data["RELOADING"] = True
|
||||
ui_data["LAST_RELOAD"] = time()
|
||||
app.data["RELOADING"] = True
|
||||
app.data["LAST_RELOAD"] = time()
|
||||
Thread(target=update_services, args=(True,)).start()
|
||||
else:
|
||||
update_services()
|
||||
|
||||
ui_data["CONFIG_CHANGED"] = True
|
||||
app.data["CONFIG_CHANGED"] = True
|
||||
|
||||
message = ""
|
||||
|
||||
|
|
@ -1271,13 +1337,13 @@ def services_raw(service_name: str):
|
|||
for k, v in db_metadata.items()
|
||||
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
|
||||
):
|
||||
ui_data["RELOADING"] = True
|
||||
ui_data["LAST_RELOAD"] = time()
|
||||
app.data["RELOADING"] = True
|
||||
app.data["LAST_RELOAD"] = time()
|
||||
Thread(target=update_services, args=(True,)).start()
|
||||
else:
|
||||
update_services()
|
||||
|
||||
ui_data["CONFIG_CHANGED"] = True
|
||||
app.data["CONFIG_CHANGED"] = True
|
||||
|
||||
message = ""
|
||||
|
||||
|
|
@ -1378,20 +1444,20 @@ def global_config():
|
|||
manage_bunkerweb("global_config", variables, threaded=threaded)
|
||||
|
||||
if "PRO_LICENSE_KEY" in variables:
|
||||
ui_data["PRO_LOADING"] = True
|
||||
app.data["PRO_LOADING"] = True
|
||||
|
||||
if any(
|
||||
v
|
||||
for k, v in db_metadata.items()
|
||||
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
|
||||
):
|
||||
ui_data["RELOADING"] = True
|
||||
ui_data["LAST_RELOAD"] = time()
|
||||
app.data["RELOADING"] = True
|
||||
app.data["LAST_RELOAD"] = time()
|
||||
Thread(target=update_global_config, args=(True,)).start()
|
||||
else:
|
||||
update_global_config()
|
||||
|
||||
ui_data["CONFIG_CHANGED"] = True
|
||||
app.data["CONFIG_CHANGED"] = True
|
||||
|
||||
with suppress(BaseException):
|
||||
if config["PRO_LICENSE_KEY"]["value"] != variables["PRO_LICENSE_KEY"]:
|
||||
|
|
@ -1509,7 +1575,7 @@ def configs():
|
|||
app.logger.error(f"Could not save custom configs: {error}")
|
||||
return handle_error("Couldn't save custom configs", "configs", True)
|
||||
|
||||
ui_data["CONFIG_CHANGED"] = True
|
||||
app.data["CONFIG_CHANGED"] = True
|
||||
|
||||
flash(operation)
|
||||
|
||||
|
|
@ -1568,25 +1634,25 @@ def plugins():
|
|||
if err:
|
||||
message = f"Couldn't update external plugins to database: {err}"
|
||||
if threaded:
|
||||
ui_data["TO_FLASH"].append({"content": message, "type": "error"})
|
||||
app.data["TO_FLASH"].append({"content": message, "type": "error"})
|
||||
else:
|
||||
error_message(message)
|
||||
else:
|
||||
message = f"Deleted plugin {variables['name']} successfully"
|
||||
if threaded:
|
||||
ui_data["TO_FLASH"].append({"content": message, "type": "success"})
|
||||
app.data["TO_FLASH"].append({"content": message, "type": "success"})
|
||||
else:
|
||||
flash(message)
|
||||
|
||||
ui_data["RELOADING"] = False
|
||||
app.data["RELOADING"] = False
|
||||
|
||||
if any(
|
||||
v
|
||||
for k, v in db_metadata.items()
|
||||
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
|
||||
):
|
||||
ui_data["RELOADING"] = True
|
||||
ui_data["LAST_RELOAD"] = time()
|
||||
app.data["RELOADING"] = True
|
||||
app.data["LAST_RELOAD"] = time()
|
||||
|
||||
Thread(target=update_plugins, args=(True,)).start()
|
||||
else:
|
||||
|
|
@ -1775,25 +1841,25 @@ def plugins():
|
|||
if err:
|
||||
message = f"Couldn't update external plugins to database: {err}"
|
||||
if threaded:
|
||||
ui_data["TO_FLASH"].append({"content": message, "type": "error"})
|
||||
app.data["TO_FLASH"].append({"content": message, "type": "error"})
|
||||
else:
|
||||
flash(message, "error")
|
||||
else:
|
||||
message = "Plugins uploaded successfully"
|
||||
if threaded:
|
||||
ui_data["TO_FLASH"].append({"content": message, "type": "success"})
|
||||
app.data["TO_FLASH"].append({"content": message, "type": "success"})
|
||||
else:
|
||||
flash("Plugins uploaded successfully")
|
||||
|
||||
ui_data["RELOADING"] = False
|
||||
app.data["RELOADING"] = False
|
||||
|
||||
if any(
|
||||
v
|
||||
for k, v in db_metadata.items()
|
||||
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
|
||||
):
|
||||
ui_data["RELOADING"] = True
|
||||
ui_data["LAST_RELOAD"] = time()
|
||||
app.data["RELOADING"] = True
|
||||
app.data["LAST_RELOAD"] = time()
|
||||
|
||||
Thread(target=update_plugins, args=(True,)).start()
|
||||
else:
|
||||
|
|
@ -1901,7 +1967,7 @@ def upload_plugin():
|
|||
@app.route("/plugins/<plugin>", methods=["GET", "POST"])
|
||||
@login_required
|
||||
def custom_plugin(plugin: str):
|
||||
if not plugin_id_rx.match(plugin):
|
||||
if not PLUGIN_ID_RX.match(plugin):
|
||||
return error_message("Invalid plugin id, (must be between 1 and 64 characters, only letters, numbers, underscores and hyphens)"), 400
|
||||
|
||||
# Case we ware looking for a plugin template
|
||||
|
|
@ -2574,7 +2640,6 @@ def login():
|
|||
# log the user in
|
||||
session["user_agent"] = request.headers.get("User-Agent")
|
||||
session["totp_validated"] = False
|
||||
session["tmp_totp_secret"] = None
|
||||
|
||||
ui_user.last_login_at = datetime.now()
|
||||
ui_user.last_login_ip = request.remote_addr
|
||||
|
|
@ -2607,21 +2672,21 @@ def login():
|
|||
@app.route("/check_reloading")
|
||||
@login_required
|
||||
def check_reloading():
|
||||
if not ui_data.get("RELOADING", False) or ui_data.get("LAST_RELOAD", 0) + 60 < time():
|
||||
if ui_data.get("RELOADING", False):
|
||||
if not app.data.get("RELOADING", False) or app.data.get("LAST_RELOAD", 0) + 60 < time():
|
||||
if app.data.get("RELOADING", False):
|
||||
app.logger.warning("Reloading took too long, forcing the state to be reloaded")
|
||||
flash("Forced the status to be reloaded", "error")
|
||||
ui_data["RELOADING"] = False
|
||||
app.data["RELOADING"] = False
|
||||
|
||||
for f in ui_data.get("TO_FLASH", []):
|
||||
for f in app.data.get("TO_FLASH", []):
|
||||
if f["type"] == "error":
|
||||
flash(f["content"], "error")
|
||||
else:
|
||||
flash(f["content"])
|
||||
|
||||
ui_data["TO_FLASH"] = []
|
||||
app.data["TO_FLASH"] = []
|
||||
|
||||
return jsonify({"reloading": ui_data.get("RELOADING", False)})
|
||||
return jsonify({"reloading": app.data.get("RELOADING", False)})
|
||||
|
||||
|
||||
@app.route("/logout")
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ from datetime import datetime, timezone
|
|||
from functools import partial
|
||||
from os.path import join, sep
|
||||
from sys import path as sys_path
|
||||
from typing import Optional
|
||||
|
||||
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("db",))]:
|
||||
if deps_path not in sys_path:
|
||||
|
|
@ -10,9 +9,8 @@ for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in ((
|
|||
|
||||
from bcrypt import checkpw
|
||||
from flask_login import AnonymousUserMixin, UserMixin
|
||||
from pyotp.totp import TOTP
|
||||
from sqlalchemy.orm import declarative_base, relationship
|
||||
from sqlalchemy import Boolean, DateTime, Column, Integer, String, ForeignKey, func
|
||||
from sqlalchemy import Boolean, DateTime, Column, Integer, String, ForeignKey, UnicodeText, func
|
||||
|
||||
|
||||
from model import METHODS_ENUM # type: ignore
|
||||
|
|
@ -30,17 +28,19 @@ class AnonymousUser(AnonymousUserMixin):
|
|||
last_login_ip = None
|
||||
login_count = 0
|
||||
totp_secret = None
|
||||
totp_refreshed = False
|
||||
creation_date = datetime.now(timezone.utc)
|
||||
update_date = datetime.now(timezone.utc)
|
||||
list_roles = []
|
||||
list_permissions = []
|
||||
list_recovery_codes = []
|
||||
|
||||
def get_id(self):
|
||||
return self.username
|
||||
|
||||
def check_password(self, password: str) -> bool:
|
||||
return False
|
||||
|
||||
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
|
||||
if not secret:
|
||||
raise ValueError("Secret is required for anonymous user")
|
||||
return TOTP(secret).verify(otp, valid_window=3)
|
||||
|
||||
|
||||
class Users(Base, UserMixin):
|
||||
__tablename__ = "bw_ui_users"
|
||||
|
|
@ -57,7 +57,7 @@ class Users(Base, UserMixin):
|
|||
login_count = Column(Integer, default=0, nullable=False)
|
||||
|
||||
# 2FA
|
||||
totp_secret = Column(String(32), nullable=True)
|
||||
totp_secret = Column(String(256), nullable=True)
|
||||
totp_refreshed = Column(Boolean, nullable=False, default=False)
|
||||
|
||||
creation_date = Column(DateTime(), nullable=False, server_default=func.now())
|
||||
|
|
@ -75,11 +75,6 @@ class Users(Base, UserMixin):
|
|||
def check_password(self, password: str) -> bool:
|
||||
return checkpw(password.encode("utf-8"), self.password.encode("utf-8"))
|
||||
|
||||
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
|
||||
if secret:
|
||||
return TOTP(secret).verify(otp, valid_window=3)
|
||||
return TOTP(self.totp_secret).verify(otp, valid_window=3)
|
||||
|
||||
|
||||
class Roles(Base):
|
||||
__tablename__ = "bw_ui_roles"
|
||||
|
|
@ -107,7 +102,7 @@ class UserRecoveryCodes(Base):
|
|||
|
||||
id = Column(Integer, primary_key=True)
|
||||
user_name = Column(String(256), ForeignKey("bw_ui_users.username", onupdate="cascade", ondelete="cascade"), nullable=False)
|
||||
code = Column(String(19), nullable=False)
|
||||
code = Column(UnicodeText, nullable=False)
|
||||
|
||||
user = relationship("Users", back_populates="recovery_codes")
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ Flask-Login==0.6.3
|
|||
Flask_WTF==1.2.1
|
||||
gunicorn[gthread]==22.0.0
|
||||
passlib==1.7.4
|
||||
pyotp==2.9.0
|
||||
python-magic==0.4.27
|
||||
python_dateutil==2.9.0.post0
|
||||
qrcode==7.4.2
|
||||
|
|
|
|||
|
|
@ -151,10 +151,6 @@ passlib==1.7.4 \
|
|||
--hash=sha256:aa6bca462b8d8bda89c70b382f0c298a20b5560af6cbfa2dce410c0a2fb669f1 \
|
||||
--hash=sha256:defd50f72b65c5402ab2c573830a6978e5f202ad0d984793c8dde2c4152ebe04
|
||||
# via -r requirements.in
|
||||
pyotp==2.9.0 \
|
||||
--hash=sha256:346b6642e0dbdde3b4ff5a930b664ca82abfa116356ed48cc42c7d6590d36f63 \
|
||||
--hash=sha256:81c2e5865b8ac55e825b0358e496e1d9387c811e85bb40e71a3b29b288963612
|
||||
# via -r requirements.in
|
||||
pypng==0.20220715.0 \
|
||||
--hash=sha256:4a43e969b8f5aaafb2a415536c1a8ec7e341cd6a3f957fd5b5f32a4cfeed902c \
|
||||
--hash=sha256:739c433ba96f078315de54c0db975aee537cbc3e1d0ae4ed9aab0ca1e427e2c1
|
||||
|
|
|
|||
104
src/ui/src/totp.py
Normal file
104
src/ui/src/totp.py
Normal file
|
|
@ -0,0 +1,104 @@
|
|||
from base64 import b64encode
|
||||
from contextlib import suppress
|
||||
from io import BytesIO
|
||||
from cryptography.fernet import Fernet, InvalidToken, MultiFernet
|
||||
from typing import Dict, List, Optional, Union
|
||||
from flask import Flask
|
||||
from passlib.totp import TOTP, MalformedTokenError, TokenError, TotpMatch
|
||||
from passlib.pwd import genword
|
||||
from qrcode import make
|
||||
from qrcode.image.svg import SvgImage
|
||||
|
||||
from models import Users
|
||||
|
||||
|
||||
class Totp:
|
||||
def __init__(self, app: Flask, secrets: Dict[Union[str, int], str], recovery_codes_keys: List[bytes]):
|
||||
"""Initialize a totp factory.
|
||||
secrets are used to encrypt the per-user totp_secret on disk.
|
||||
recovery_codes_keys are used to encrypt the per-user recovery codes on disk.
|
||||
"""
|
||||
# This should be a dict with at least one entry
|
||||
self.app = app
|
||||
self._totp = TOTP.using(secrets=secrets, issuer="BunkerWeb UI")
|
||||
|
||||
self.cryptor: Optional[MultiFernet] = None
|
||||
if recovery_codes_keys:
|
||||
self.cryptor = MultiFernet([Fernet(key) for key in recovery_codes_keys])
|
||||
|
||||
def generate_totp_secret(self) -> str:
|
||||
"""Create new user-unique totp_secret."""
|
||||
return self._totp.new().to_json(encrypt=True)
|
||||
|
||||
def get_totp_pretty_key(self, totp_secret: str) -> str:
|
||||
"""Generate pretty key for manual input"""
|
||||
if not totp_secret:
|
||||
return ""
|
||||
return self._totp.from_source(totp_secret).pretty_key()
|
||||
|
||||
def generate_recovery_codes(self) -> List[str]:
|
||||
codes = ["-".join([pwd[i : i + 4] for i in range(0, len(pwd), 4)]) for pwd in genword(length=16, charset="hex", returns=5)] # noqa: E203
|
||||
if not self.cryptor:
|
||||
return codes
|
||||
return [self.cryptor.encrypt(code.encode()).decode() for code in codes]
|
||||
|
||||
def decrypt_recovery_code(self, code: str) -> Optional[str]:
|
||||
if not self.cryptor:
|
||||
return code
|
||||
return self.cryptor.decrypt(code.encode()).decode()
|
||||
|
||||
def decrypt_recovery_codes(self, user: Users) -> List[str]:
|
||||
return [self.decrypt_recovery_code(code) for code in user.list_recovery_codes]
|
||||
|
||||
def encrypt_recovery_code(self, code: str) -> Optional[str]:
|
||||
if not self.cryptor:
|
||||
return code
|
||||
return self.cryptor.encrypt(code.encode()).decode()
|
||||
|
||||
def verify_recovery_code(self, code: str, user: Users) -> bool:
|
||||
"""Check if recovery code is valid for user."""
|
||||
if not user.list_recovery_codes:
|
||||
return False
|
||||
|
||||
with suppress(InvalidToken):
|
||||
if code in self.decrypt_recovery_codes(user):
|
||||
return True
|
||||
return False
|
||||
|
||||
def verify_totp(self, token: str, *, totp_secret: Optional[str] = None, user: Optional[Users] = None) -> bool:
|
||||
"""Verifies token for specific user."""
|
||||
if not totp_secret and not user:
|
||||
raise ValueError("Either totp_secret or user must be provided")
|
||||
elif not totp_secret:
|
||||
totp_secret = user.totp_secret
|
||||
|
||||
try:
|
||||
tmatch = self._totp.verify(token, totp_secret, last_counter=self.get_last_counter(user))
|
||||
if user:
|
||||
self.set_last_counter(user, tmatch)
|
||||
return True
|
||||
except (MalformedTokenError, TokenError):
|
||||
return False
|
||||
|
||||
def get_totp_uri(self, username: str, totp_secret: str) -> str:
|
||||
"""Generate provisioning url for use with the qrcode scanner built into the app"""
|
||||
return self._totp.from_source(totp_secret).to_uri(username, "BunkerWeb UI")
|
||||
|
||||
def generate_qrcode(self, username: str, totp: str) -> str:
|
||||
"""Generate QRcode Using username, totp, generate the actual QRcode image."""
|
||||
totp_image = make(self.get_totp_uri(username, totp), image_factory=SvgImage)
|
||||
with BytesIO() as virtual_file:
|
||||
totp_image.save(virtual_file)
|
||||
image_as_str = b64encode(virtual_file.getvalue()).decode("ascii")
|
||||
|
||||
return f"data:image/svg+xml;base64,{image_as_str}"
|
||||
|
||||
def get_last_counter(self, user: Users) -> Optional[int]:
|
||||
"""Fetch stored last_counter from cache."""
|
||||
return self.app.data.get("totp_last_counter", {}).get(user.get_id())
|
||||
|
||||
def set_last_counter(self, user: Users, tmatch: TotpMatch) -> None:
|
||||
"""Cache last_counter."""
|
||||
if "totp_last_counter" not in self.app.data:
|
||||
self.app.data["totp_last_counter"] = {}
|
||||
self.app.data["totp_last_counter"][user.get_id()] = tmatch.counter
|
||||
2
src/ui/templates/account.html
vendored
2
src/ui/templates/account.html
vendored
|
|
@ -436,7 +436,7 @@
|
|||
class="flex flex-col relative col-span-12 px-4 my-2 md:px-6 md:my-3 lg:px-6 lg:my-3 max-w-[400px] w-full">
|
||||
<h5 class="input-title text-center">2FA QR CODE</h5>
|
||||
<div class="flex justify-center mt-2">
|
||||
<img src="data:image/png;base64, {{ totp_qr_image }}"
|
||||
<img src="{{ totp_qr_image }}"
|
||||
alt="Secret Token"
|
||||
style="width: 200px;
|
||||
height: 200px" />
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ from sqlalchemy.exc import IntegrityError
|
|||
|
||||
from Database import Database # type: ignore
|
||||
from models import Base, Users, Roles, RolesUsers, UserRecoveryCodes, RolesPermissions, Permissions
|
||||
from utils import gen_recovery_codes
|
||||
|
||||
|
||||
class UIDatabase(Database):
|
||||
|
|
@ -167,6 +166,7 @@ class UIDatabase(Database):
|
|||
email: Optional[str] = None,
|
||||
*,
|
||||
totp_secret: Optional[str] = None,
|
||||
totp_recovery_codes: Optional[List[str]] = None,
|
||||
method: str = "manual",
|
||||
admin: bool = False,
|
||||
) -> str:
|
||||
|
|
@ -199,9 +199,8 @@ class UIDatabase(Database):
|
|||
)
|
||||
)
|
||||
|
||||
if totp_secret:
|
||||
for code in gen_recovery_codes():
|
||||
session.add(UserRecoveryCodes(user_name=username, code=code))
|
||||
for code in totp_recovery_codes or []:
|
||||
session.add(UserRecoveryCodes(user_name=username, code=code))
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
|
|
@ -210,7 +209,9 @@ class UIDatabase(Database):
|
|||
|
||||
return ""
|
||||
|
||||
def update_ui_user(self, username: str, password: bytes, totp_secret: Optional[str], method: str = "manual") -> str:
|
||||
def update_ui_user(
|
||||
self, username: str, password: bytes, totp_secret: Optional[str], *, totp_recovery_codes: Optional[List[str]] = None, method: str = "manual"
|
||||
) -> str:
|
||||
"""Update ui user."""
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
|
|
@ -233,8 +234,8 @@ class UIDatabase(Database):
|
|||
return str(e)
|
||||
|
||||
if user.totp_refreshed:
|
||||
if totp_secret:
|
||||
self.refresh_ui_user_recovery_codes(username)
|
||||
if totp_recovery_codes:
|
||||
self.refresh_ui_user_recovery_codes(username, totp_recovery_codes or [])
|
||||
else:
|
||||
self.delete_ui_user_recovery_codes(username)
|
||||
|
||||
|
|
@ -325,7 +326,7 @@ class UIDatabase(Database):
|
|||
|
||||
return roles_data
|
||||
|
||||
def refresh_ui_user_recovery_codes(self, username: str) -> str:
|
||||
def refresh_ui_user_recovery_codes(self, username: str, codes: List[str]) -> str:
|
||||
"""Refresh ui user recovery codes."""
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
|
|
@ -335,11 +336,16 @@ class UIDatabase(Database):
|
|||
if not user:
|
||||
return f"User {username} doesn't exist"
|
||||
|
||||
if not codes:
|
||||
return "No recovery codes provided"
|
||||
|
||||
session.query(UserRecoveryCodes).filter_by(user_name=username).delete()
|
||||
|
||||
for code in gen_recovery_codes():
|
||||
for code in codes:
|
||||
session.add(UserRecoveryCodes(user_name=username, code=code))
|
||||
|
||||
user.totp_refreshed = True
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
|
|
@ -404,3 +410,19 @@ class UIDatabase(Database):
|
|||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
def set_ui_user_recovery_code_refreshed(self, username: str, value: bool) -> str:
|
||||
"""Set ui user recovery code refreshed."""
|
||||
with self._db_session() as session:
|
||||
user = session.query(Users).filter_by(username=username).first()
|
||||
if not user:
|
||||
return f"User {username} doesn't exist"
|
||||
|
||||
user.totp_refreshed = value
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
|
|
|||
|
|
@ -6,14 +6,14 @@ from os.path import join
|
|||
from threading import Lock
|
||||
from typing import List, Optional
|
||||
|
||||
from bcrypt import checkpw, gensalt, hashpw
|
||||
from bcrypt import checkpw, hashpw
|
||||
from magic import Magic
|
||||
from passlib.pwd import genword
|
||||
from qrcode.main import QRCode
|
||||
from regex import compile as re_compile
|
||||
|
||||
USER_PASSWORD_RX = re_compile(r"^(?=.*?\p{Lowercase_Letter})(?=.*?\p{Uppercase_Letter})(?=.*?\d)(?=.*?[ !\"#$%&'()*+,./:;<=>?@[\\\]^_`{|}~-]).{8,}$")
|
||||
PLUGIN_KEYS = ["id", "name", "description", "version", "stream", "settings"]
|
||||
PLUGIN_ID_RX = re_compile(r"^[\w_-]{1,64}$")
|
||||
LOCK = Lock()
|
||||
|
||||
|
||||
|
|
@ -215,19 +215,14 @@ def check_settings(settings: dict, check: str) -> bool:
|
|||
return any(setting["context"] == check for setting in settings.values())
|
||||
|
||||
|
||||
def gen_password_hash(password: str) -> bytes:
|
||||
return hashpw(password.encode("utf-8"), gensalt(rounds=13))
|
||||
def gen_password_hash(password: str, salt: str) -> bytes:
|
||||
return hashpw(password.encode("utf-8"), salt.encode("utf-8"))
|
||||
|
||||
|
||||
def check_password(password: str, hashed: bytes) -> bool:
|
||||
return checkpw(password.encode("utf-8"), hashed)
|
||||
|
||||
|
||||
def gen_recovery_codes() -> List[str]:
|
||||
pwds = genword(length=16, charset="hex", returns=5)
|
||||
return ["-".join([pwd[i : i + 4] for i in range(0, len(pwd), 4)]) for pwd in pwds] # noqa: E203
|
||||
|
||||
|
||||
def get_b64encoded_qr_image(data: str):
|
||||
qr = QRCode(version=1, box_size=10, border=5)
|
||||
qr.add_data(data)
|
||||
|
|
|
|||
Loading…
Reference in a new issue