Refactor TOTP logic in web UI + Add the possibility to have a custom salt for passwords

This commit is contained in:
Théophile Diot 2024-07-31 14:59:39 +01:00
parent bb3e4bd007
commit 2b27caea2a
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
8 changed files with 314 additions and 138 deletions

View file

@ -5,12 +5,15 @@ from math import floor
from multiprocessing import Manager
from os import _exit, getenv, listdir, sep, urandom
from os.path import basename, dirname, isabs, join
from random import randint
from secrets import choice, token_urlsafe
from string import ascii_letters, digits
from sys import path as sys_path, modules as sys_modules
from pathlib import Path
from typing import Union
from uuid import uuid4
from bcrypt import gensalt
from builder import home_builder, instances_builder, global_config_builder, jobs_builder, services_builder
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("api",), ("db",))]:
@ -19,6 +22,7 @@ for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in ((
from bs4 import BeautifulSoup
from copy import deepcopy
from cryptography.fernet import Fernet
from datetime import datetime, timedelta, timezone
from dateutil.parser import parse as dateutil_parse
from flask import Flask, Response, flash, jsonify, make_response, redirect, render_template, request, send_file, session, url_for
@ -29,8 +33,7 @@ from importlib.machinery import SourceFileLoader
from io import BytesIO
from json import JSONDecodeError, dumps, loads as json_loads
from jinja2 import Environment, FileSystemLoader, select_autoescape
from pyotp import random_base32
from pyotp.totp import TOTP
from passlib import totp
from redis import Redis, Sentinel
from regex import compile as re_compile, match as regex_match
from requests import get
@ -38,7 +41,7 @@ from shutil import move, rmtree
from signal import SIGINT, signal, SIGTERM
from subprocess import PIPE, Popen, call
from tarfile import CompressionError, HeaderError, ReadError, TarError, open as tar_open
from threading import Thread, Lock
from threading import Thread
from tempfile import NamedTemporaryFile
from time import sleep, time
from werkzeug.utils import secure_filename
@ -49,18 +52,20 @@ from src.ConfigFiles import ConfigFiles
from src.Config import Config
from src.ReverseProxied import ReverseProxied
from src.Templates import get_ui_templates
from src.totp import Totp
from common_utils import get_version # type: ignore
from logger import setup_logger # type: ignore
from models import AnonymousUser
from ui_database import UIDatabase
from utils import USER_PASSWORD_RX, PLUGIN_KEYS, check_password, check_settings, gen_password_hash, get_b64encoded_qr_image, path_to_dict, get_remain
from utils import USER_PASSWORD_RX, PLUGIN_KEYS, PLUGIN_ID_RX, check_password, check_settings, gen_password_hash, path_to_dict, get_remain
TMP_DIR = Path(sep, "var", "tmp", "bunkerweb")
TMP_DIR.mkdir(parents=True, exist_ok=True)
LOCK = Lock()
LIB_DIR = Path(sep, "var", "lib", "bunkerweb")
LIB_DIR.mkdir(parents=True, exist_ok=True)
def stop_gunicorn():
@ -104,12 +109,61 @@ with app.app_context():
TMP_DIR.joinpath(".flask_secret").write_text(token_urlsafe(32), encoding="utf-8")
FLASK_SECRET = TMP_DIR.joinpath(".flask_secret").read_text(encoding="utf-8").strip()
PASSWORD_SALT = getenv("PASSWORD_SALT", "")
if not PASSWORD_SALT.isdigit():
if not LIB_DIR.joinpath(".password_salt").is_file():
app.logger.warning(
"The PASSWORD_SALT environment variable is missing or invalid (must be an integer) or the .password_salt file is missing, generating a random one ..."
)
LIB_DIR.joinpath(".password_salt").write_bytes(gensalt(rounds=13))
PASSWORD_SALT = LIB_DIR.joinpath(".password_salt").read_text(encoding="utf-8").strip()
TOTP_SECRETS = getenv("TOTP_SECRETS", "")
if TOTP_SECRETS:
try:
TOTP_SECRETS = json_loads(TOTP_SECRETS)
except JSONDecodeError:
app.logger.warning(
"The TOTP_SECRETS environment variable is invalid, generating a random one ... (check the format via the documentation: https://passlib.readthedocs.io/en/stable/narr/totp-tutorial.html#application-secrets)"
)
TOTP_SECRETS = None
if not TOTP_SECRETS:
if not LIB_DIR.joinpath(".totp_secrets.json").is_file():
if TOTP_SECRETS is not None:
app.logger.warning("The TOTP_SECRETS environment variable is missing or the .totp_secrets.json file is missing, generating a random one ...")
LIB_DIR.joinpath(".totp_secrets.json").write_text(dumps({k: totp.generate_secret() for k in range(randint(1, 5))}), encoding="utf-8")
TOTP_SECRETS = json_loads(LIB_DIR.joinpath(".totp_secrets.json").read_text(encoding="utf-8"))
MF_RECOVERY_CODES_KEYS = []
if getenv("MF_ENCRYPT_RECOVERY_CODES", "yes").lower() != "no":
MF_RECOVERY_CODES_KEYS = getenv("MF_RECOVERY_CODES_KEYS", "")
if MF_RECOVERY_CODES_KEYS:
try:
MF_RECOVERY_CODES_KEYS = json_loads(MF_RECOVERY_CODES_KEYS)
except JSONDecodeError:
app.logger.warning(
"The MF_RECOVERY_CODES_KEYS environment variable is invalid, generating a random one ... (check the format via the documentation: https://cryptography.io/en/latest/fernet/#fernet-symmetric-encryption)"
)
MF_RECOVERY_CODES_KEYS = None
if not MF_RECOVERY_CODES_KEYS:
if MF_RECOVERY_CODES_KEYS is not None and not LIB_DIR.joinpath(".mf_recovery_codes_keys.json").is_file():
app.logger.warning("The MF_RECOVERY_CODES_KEYS environment variable is missing, generating a random one ...")
LIB_DIR.joinpath(".mf_recovery_codes_keys.json").write_text(
dumps([Fernet.generate_key().decode() for _ in range(randint(1, 5))]), encoding="utf-8"
)
MF_RECOVERY_CODES_KEYS = json_loads(LIB_DIR.joinpath(".mf_recovery_codes_keys.json").read_text(encoding="utf-8"))
else:
app.logger.warning("MF_ENCRYPT_RECOVERY_CODES is set to 'no', multi-factor recovery codes will not be encrypted")
app.config["SECRET_KEY"] = FLASK_SECRET
app.config["SESSION_COOKIE_NAME"] = "__Host-bw_ui_session"
app.config["SESSION_COOKIE_PATH"] = "/"
app.config["SESSION_COOKIE_SECURE"] = True # Required for __Host- prefix
app.config["SESSION_COOKIE_HTTPONLY"] = True # Recommended for security
app.config["SESSION_COOKIE_SAMESITE"] = "Lax" # Or 'Strict' for stricter settings
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
login_manager = LoginManager()
login_manager.session_protection = "strong"
@ -129,7 +183,9 @@ with app.app_context():
continue
sleep(5)
ret, err = DB.init_ui_tables(get_version())
BW_VERSION = get_version()
ret, err = DB.init_ui_tables(BW_VERSION)
if not ret and err:
app.logger.error(f"Exception while checking database tables : {err}")
@ -181,7 +237,7 @@ with app.app_context():
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). It will not be updated."
)
else:
ADMIN_USER["password"] = gen_password_hash(env_admin_password)
ADMIN_USER["password"] = gen_password_hash(env_admin_password, PASSWORD_SALT)
updated = True
if updated:
@ -207,16 +263,11 @@ with app.app_context():
)
exit(1)
ret = DB.create_ui_user(user_name, gen_password_hash(env_admin_password), ["admin"], admin=True)
ret = DB.create_ui_user(user_name, gen_password_hash(env_admin_password, PASSWORD_SALT), ["admin"], admin=True)
if ret:
app.logger.error(f"Couldn't create the admin user in the database: {ret}")
exit(1)
bw_version = get_version()
if not TMP_DIR.joinpath(".ui.json").is_file():
TMP_DIR.joinpath(".ui.json").write_text("{}", encoding="utf-8")
try:
app.config.update(
INSTANCES=Instances(DB),
@ -231,8 +282,6 @@ with app.app_context():
app.logger.error(repr(e), e.filename)
stop(1)
plugin_id_rx = re_compile(r"^[\w_-]{1,64}$")
# Declare functions for jinja2
app.jinja_env.globals.update(check_settings=check_settings)
@ -240,7 +289,8 @@ with app.app_context():
csrf = CSRFProtect()
csrf.init_app(app)
ui_data = Manager().dict()
app.data = Manager().dict()
app.totp = Totp(app, TOTP_SECRETS, [key.encode() for key in MF_RECOVERY_CODES_KEYS])
LOG_RX = re_compile(r"^(?P<date>\d+/\d+/\d+\s\d+:\d+:\d+)\s\[(?P<level>[a-z]+)\]\s\d+#\d+:\s(?P<message>[^\n]+)$")
REVERSE_PROXY_PATH = re_compile(r"^(?P<host>https?://.{1,255}(:((6553[0-5])|(655[0-2]\d)|(65[0-4]\d{2})|(6[0-4]\d{3})|([1-5]\d{4})|([0-5]{0,5})|(\d{1,4})))?)$")
@ -274,8 +324,8 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
# Do the operation
error = 0
if "TO_FLASH" not in ui_data:
ui_data["TO_FLASH"] = []
if "TO_FLASH" not in app.data:
app.data["TO_FLASH"] = []
if method == "services":
if operation == "new":
@ -301,22 +351,22 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
if operation:
if isinstance(operation, list):
for op in operation:
ui_data["TO_FLASH"].append({"content": f"Reload failed for the instance {op}", "type": "error"})
app.data["TO_FLASH"].append({"content": f"Reload failed for the instance {op}", "type": "error"})
elif operation.startswith(("Can't", "The database is read-only")):
ui_data["TO_FLASH"].append({"content": operation, "type": "error"})
app.data["TO_FLASH"].append({"content": operation, "type": "error"})
else:
ui_data["TO_FLASH"].append({"content": operation, "type": "success"})
app.data["TO_FLASH"].append({"content": operation, "type": "success"})
if not threaded:
for f in ui_data.get("TO_FLASH", []):
for f in app.data.get("TO_FLASH", []):
if f["type"] == "error":
flash(f["content"], "error")
else:
flash(f["content"])
ui_data["TO_FLASH"] = []
app.data["TO_FLASH"] = []
ui_data["RELOADING"] = False
app.data["RELOADING"] = False
return error
@ -455,17 +505,17 @@ def inject_variables():
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
)
if not changes_ongoing and ui_data.get("PRO_LOADING"):
ui_data["PRO_LOADING"] = False
if not changes_ongoing and app.data.get("PRO_LOADING"):
app.data["PRO_LOADING"] = False
if not changes_ongoing and metadata["failover"]:
flash(
"The last changes could not be applied because it creates a configuration error on NGINX, please check the logs for more information. The configured fell back to the last working one.",
"error",
)
elif not changes_ongoing and not metadata["failover"] and ui_data.get("CONFIG_CHANGED", False):
elif not changes_ongoing and not metadata["failover"] and app.data.get("CONFIG_CHANGED", False):
flash("The last changes have been applied successfully.", "success")
ui_data["CONFIG_CHANGED"] = False
app.data["CONFIG_CHANGED"] = False
# check that is value is in tuple
return dict(
@ -477,9 +527,9 @@ def inject_variables():
pro_expire=metadata["pro_expire"].strftime("%d-%m-%Y") if metadata["pro_expire"] else "Unknown",
pro_overlapped=metadata["pro_overlapped"],
plugins=app.config["CONFIG"].get_plugins(),
pro_loading=ui_data.get("PRO_LOADING", False),
pro_loading=app.data.get("PRO_LOADING", False),
bw_version=metadata["version"],
is_readonly=ui_data.get("READONLY_MODE", False),
is_readonly=app.data.get("READONLY_MODE", False),
username=current_user.get_id() if current_user.is_authenticated else "",
)
@ -517,7 +567,8 @@ def set_security_headers(response):
# * Referrer-Policy header to prevent leaking of sensitive data
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
app.logger.debug(f"UI data: {ui_data}")
if current_user.totp_refreshed:
DB.set_ui_user_recovery_code_refreshed(current_user.get_id(), False)
return response
@ -556,7 +607,7 @@ def handle_csrf_error(_):
@app.before_request
def before_request():
if ui_data.get("SERVER_STOPPING", False):
if app.data.get("SERVER_STOPPING", False):
response = make_response(jsonify({"message": "Server is shutting down, try again later."}), 503)
response.headers["Retry-After"] = 30 # Clients should retry after 30 seconds # type: ignore
return response
@ -568,14 +619,14 @@ def before_request():
DB.database_uri
and DB.readonly
and (
datetime.now(timezone.utc) - datetime.fromisoformat(ui_data.get("LAST_DATABASE_RETRY", "1970-01-01T00:00:00")).replace(tzinfo=timezone.utc)
datetime.now(timezone.utc) - datetime.fromisoformat(app.data.get("LAST_DATABASE_RETRY", "1970-01-01T00:00:00")).replace(tzinfo=timezone.utc)
> timedelta(minutes=1)
)
):
try:
DB.retry_connection(pool_timeout=1)
DB.retry_connection(log=False)
ui_data["READONLY_MODE"] = False
app.data["READONLY_MODE"] = False
app.logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
@ -586,22 +637,22 @@ def before_request():
with suppress(BaseException):
DB.retry_connection(fallback=True, pool_timeout=1)
DB.retry_connection(fallback=True, log=False)
ui_data["READONLY_MODE"] = True
ui_data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat() if DB.last_connection_retry else datetime.now().isoformat()
elif not ui_data.get("READONLY_MODE", False) and request.method == "POST" and not ("/totp" in request.path or "/login" in request.path):
app.data["READONLY_MODE"] = True
app.data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat() if DB.last_connection_retry else datetime.now().isoformat()
elif not app.data.get("READONLY_MODE", False) and request.method == "POST" and not ("/totp" in request.path or "/login" in request.path):
try:
DB.test_write()
ui_data["READONLY_MODE"] = False
app.data["READONLY_MODE"] = False
except BaseException:
ui_data["READONLY_MODE"] = True
ui_data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat() if DB.last_connection_retry else datetime.now().isoformat()
app.data["READONLY_MODE"] = True
app.data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat() if DB.last_connection_retry else datetime.now().isoformat()
else:
try:
DB.test_read()
except BaseException:
ui_data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat() if DB.last_connection_retry else datetime.now().isoformat()
app.data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat() if DB.last_connection_retry else datetime.now().isoformat()
DB.readonly = ui_data.get("READONLY_MODE", False)
DB.readonly = app.data.get("READONLY_MODE", False)
if DB.readonly:
flash("Database connection is in read-only mode : no modification possible.", "error")
@ -684,7 +735,9 @@ def setup():
"setup",
)
ret = DB.create_ui_user(request.form["admin_username"], gen_password_hash(request.form["admin_password"]), ["admin"], method="ui", admin=True)
ret = DB.create_ui_user(
request.form["admin_username"], gen_password_hash(request.form["admin_password"], PASSWORD_SALT), ["admin"], method="ui", admin=True
)
if ret:
return handle_error(f"Couldn't create the admin user in the database: {ret}", "setup", False, "error")
@ -702,8 +755,8 @@ def setup():
if not REVERSE_PROXY_PATH.match(request.form["ui_host"]):
return handle_error("The hostname is not valid.", "setup")
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
app.data["RELOADING"] = True
app.data["LAST_RELOAD"] = time()
config = {
"SERVER_NAME": request.form["server_name"],
@ -758,10 +811,11 @@ def totp():
if request.method == "POST":
verify_data_in_form(data={"totp_token": None}, err_message="No token provided on /totp.", redirect_url="totp")
if request.form["totp_token"] not in current_user.list_recovery_codes and not current_user.check_otp(request.form["totp_token"]):
return handle_error("The token is invalid.", "totp")
elif request.form["totp_token"] in current_user.list_recovery_codes:
DB.use_ui_user_recovery_code(current_user.get_id(), request.form["totp_token"])
if not app.totp.verify_totp(request.form["totp_token"], user=current_user):
if not app.totp.verify_recovery_code(request.form["totp_token"], user=current_user):
return handle_error("The token is invalid.", "totp")
else:
DB.use_ui_user_recovery_code(current_user.get_id(), app.totp.encrypt_recovery_code(request.form["totp_token"]))
session["totp_validated"] = True
redirect(url_for("loading", next=request.form.get("next") or url_for("home")))
@ -823,7 +877,7 @@ def home():
metadata = DB.get_metadata()
data = {
"check_version": not remote_version or bw_version == remote_version,
"check_version": not remote_version or BW_VERSION == remote_version,
"remote_version": remote_version,
"version": metadata["version"],
"instances_number": len(instances),
@ -888,16 +942,16 @@ def account():
if not manage_bunkerweb("global_config", variable, threaded=threaded):
message = "Checking license key to upgrade."
if threaded:
ui_data["TO_FLASH"].append({"content": message, "type": "success"})
app.data["TO_FLASH"].append({"content": message, "type": "success"})
else:
flash(message)
ui_data["PRO_LOADING"] = True
ui_data["CONFIG_CHANGED"] = True
app.data["PRO_LOADING"] = True
app.data["CONFIG_CHANGED"] = True
if any(curr_changes.values()):
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
app.data["RELOADING"] = True
app.data["LAST_RELOAD"] = time()
Thread(target=update_global_config, args=(True,)).start()
else:
update_global_config()
@ -911,7 +965,8 @@ def account():
username = current_user.get_id()
password = request.form["curr_password"]
secret_token = current_user.totp_secret
totp_secret = current_user.totp_secret
totp_recovery_codes = current_user.list_recovery_codes
if request.form["operation"] == "username":
verify_data_in_form(data={"admin_username": None}, err_message="Missing admin username parameter on /account.", redirect_url="account")
@ -946,16 +1001,26 @@ def account():
if request.form["operation"] == "totp":
verify_data_in_form(data={"totp_token": None}, err_message="Missing totp token parameter on /account.", redirect_url="account")
if request.form["totp_token"] not in current_user.list_recovery_codes and not current_user.check_otp(
request.form["totp_token"], secret=session.get("tmp_totp_secret")
):
if not app.totp.verify_totp(
request.form["totp_token"], totp_secret=session.get("tmp_totp_secret", ""), user=current_user
) and not app.totp.verify_recovery_code(request.form["totp_token"], user=current_user):
return handle_error("The totp token is invalid. (totp)", "account")
session["totp_validated"] = not bool(current_user.totp_secret)
secret_token = None if bool(current_user.totp_secret) else session.get("tmp_totp_secret")
session["tmp_totp_secret"] = None
totp_secret = None if bool(current_user.totp_secret) else session.pop("tmp_totp_secret", "")
ret = DB.update_ui_user(username, gen_password_hash(password), secret_token, current_user.method if request.form["operation"] == "totp" else "ui")
if totp_secret and totp_secret != current_user.totp_secret:
totp_recovery_codes = app.totp.generate_recovery_codes()
app.logger.debug(f"totp recovery codes: {totp_recovery_codes}")
ret = DB.update_ui_user(
username,
gen_password_hash(password, PASSWORD_SALT),
totp_secret,
totp_recovery_codes=totp_recovery_codes,
method=current_user.method if request.form["operation"] == "totp" else "ui",
)
if ret:
return handle_error(f"Couldn't update the admin user in the database: {ret}", "account", False, "error")
@ -970,18 +1035,19 @@ def account():
return redirect(url_for("account" if request.form["operation"] == "totp" else "login"))
totp_qr_image = ""
if not bool(current_user.totp_secret):
session["tmp_totp_secret"] = random_base32()
totp = TOTP(session["tmp_totp_secret"])
totp_qr_image = get_b64encoded_qr_image(totp.provisioning_uri(name=current_user.get_id(), issuer_name="BunkerWeb UI"))
session["tmp_totp_secret"] = app.totp.generate_totp_secret()
totp_qr_image = app.totp.generate_qrcode(current_user.get_id(), session["tmp_totp_secret"])
# TODO: Show user backup codes after TOTP refresh + add refresh feature
return render_template(
"account.html",
username=current_user.get_id(),
is_totp=bool(current_user.totp_secret),
secret_token=session["tmp_totp_secret"],
secret_token=app.totp.get_totp_pretty_key(session.get("tmp_totp_secret", "")),
totp_qr_image=totp_qr_image,
totp_refrehed=current_user.totp_refreshed,
totp_recovery_codes=app.totp.decrypt_recovery_codes(current_user) if current_user.totp_refreshed else [],
)
@ -1006,8 +1072,8 @@ def instances():
next=True,
)
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
app.data["RELOADING"] = True
app.data["LAST_RELOAD"] = time()
Thread(
target=manage_bunkerweb,
name="Reloading instances",
@ -1160,13 +1226,13 @@ def services():
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
):
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
app.data["RELOADING"] = True
app.data["LAST_RELOAD"] = time()
Thread(target=update_services, args=(True,)).start()
else:
update_services()
ui_data["CONFIG_CHANGED"] = True
app.data["CONFIG_CHANGED"] = True
message = ""
@ -1271,13 +1337,13 @@ def services_raw(service_name: str):
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
):
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
app.data["RELOADING"] = True
app.data["LAST_RELOAD"] = time()
Thread(target=update_services, args=(True,)).start()
else:
update_services()
ui_data["CONFIG_CHANGED"] = True
app.data["CONFIG_CHANGED"] = True
message = ""
@ -1378,20 +1444,20 @@ def global_config():
manage_bunkerweb("global_config", variables, threaded=threaded)
if "PRO_LICENSE_KEY" in variables:
ui_data["PRO_LOADING"] = True
app.data["PRO_LOADING"] = True
if any(
v
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
):
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
app.data["RELOADING"] = True
app.data["LAST_RELOAD"] = time()
Thread(target=update_global_config, args=(True,)).start()
else:
update_global_config()
ui_data["CONFIG_CHANGED"] = True
app.data["CONFIG_CHANGED"] = True
with suppress(BaseException):
if config["PRO_LICENSE_KEY"]["value"] != variables["PRO_LICENSE_KEY"]:
@ -1509,7 +1575,7 @@ def configs():
app.logger.error(f"Could not save custom configs: {error}")
return handle_error("Couldn't save custom configs", "configs", True)
ui_data["CONFIG_CHANGED"] = True
app.data["CONFIG_CHANGED"] = True
flash(operation)
@ -1568,25 +1634,25 @@ def plugins():
if err:
message = f"Couldn't update external plugins to database: {err}"
if threaded:
ui_data["TO_FLASH"].append({"content": message, "type": "error"})
app.data["TO_FLASH"].append({"content": message, "type": "error"})
else:
error_message(message)
else:
message = f"Deleted plugin {variables['name']} successfully"
if threaded:
ui_data["TO_FLASH"].append({"content": message, "type": "success"})
app.data["TO_FLASH"].append({"content": message, "type": "success"})
else:
flash(message)
ui_data["RELOADING"] = False
app.data["RELOADING"] = False
if any(
v
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
):
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
app.data["RELOADING"] = True
app.data["LAST_RELOAD"] = time()
Thread(target=update_plugins, args=(True,)).start()
else:
@ -1775,25 +1841,25 @@ def plugins():
if err:
message = f"Couldn't update external plugins to database: {err}"
if threaded:
ui_data["TO_FLASH"].append({"content": message, "type": "error"})
app.data["TO_FLASH"].append({"content": message, "type": "error"})
else:
flash(message, "error")
else:
message = "Plugins uploaded successfully"
if threaded:
ui_data["TO_FLASH"].append({"content": message, "type": "success"})
app.data["TO_FLASH"].append({"content": message, "type": "success"})
else:
flash("Plugins uploaded successfully")
ui_data["RELOADING"] = False
app.data["RELOADING"] = False
if any(
v
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
):
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
app.data["RELOADING"] = True
app.data["LAST_RELOAD"] = time()
Thread(target=update_plugins, args=(True,)).start()
else:
@ -1901,7 +1967,7 @@ def upload_plugin():
@app.route("/plugins/<plugin>", methods=["GET", "POST"])
@login_required
def custom_plugin(plugin: str):
if not plugin_id_rx.match(plugin):
if not PLUGIN_ID_RX.match(plugin):
return error_message("Invalid plugin id, (must be between 1 and 64 characters, only letters, numbers, underscores and hyphens)"), 400
# Case we ware looking for a plugin template
@ -2574,7 +2640,6 @@ def login():
# log the user in
session["user_agent"] = request.headers.get("User-Agent")
session["totp_validated"] = False
session["tmp_totp_secret"] = None
ui_user.last_login_at = datetime.now()
ui_user.last_login_ip = request.remote_addr
@ -2607,21 +2672,21 @@ def login():
@app.route("/check_reloading")
@login_required
def check_reloading():
if not ui_data.get("RELOADING", False) or ui_data.get("LAST_RELOAD", 0) + 60 < time():
if ui_data.get("RELOADING", False):
if not app.data.get("RELOADING", False) or app.data.get("LAST_RELOAD", 0) + 60 < time():
if app.data.get("RELOADING", False):
app.logger.warning("Reloading took too long, forcing the state to be reloaded")
flash("Forced the status to be reloaded", "error")
ui_data["RELOADING"] = False
app.data["RELOADING"] = False
for f in ui_data.get("TO_FLASH", []):
for f in app.data.get("TO_FLASH", []):
if f["type"] == "error":
flash(f["content"], "error")
else:
flash(f["content"])
ui_data["TO_FLASH"] = []
app.data["TO_FLASH"] = []
return jsonify({"reloading": ui_data.get("RELOADING", False)})
return jsonify({"reloading": app.data.get("RELOADING", False)})
@app.route("/logout")

View file

@ -2,7 +2,6 @@ from datetime import datetime, timezone
from functools import partial
from os.path import join, sep
from sys import path as sys_path
from typing import Optional
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("db",))]:
if deps_path not in sys_path:
@ -10,9 +9,8 @@ for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in ((
from bcrypt import checkpw
from flask_login import AnonymousUserMixin, UserMixin
from pyotp.totp import TOTP
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy import Boolean, DateTime, Column, Integer, String, ForeignKey, func
from sqlalchemy import Boolean, DateTime, Column, Integer, String, ForeignKey, UnicodeText, func
from model import METHODS_ENUM # type: ignore
@ -30,17 +28,19 @@ class AnonymousUser(AnonymousUserMixin):
last_login_ip = None
login_count = 0
totp_secret = None
totp_refreshed = False
creation_date = datetime.now(timezone.utc)
update_date = datetime.now(timezone.utc)
list_roles = []
list_permissions = []
list_recovery_codes = []
def get_id(self):
return self.username
def check_password(self, password: str) -> bool:
return False
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
if not secret:
raise ValueError("Secret is required for anonymous user")
return TOTP(secret).verify(otp, valid_window=3)
class Users(Base, UserMixin):
__tablename__ = "bw_ui_users"
@ -57,7 +57,7 @@ class Users(Base, UserMixin):
login_count = Column(Integer, default=0, nullable=False)
# 2FA
totp_secret = Column(String(32), nullable=True)
totp_secret = Column(String(256), nullable=True)
totp_refreshed = Column(Boolean, nullable=False, default=False)
creation_date = Column(DateTime(), nullable=False, server_default=func.now())
@ -75,11 +75,6 @@ class Users(Base, UserMixin):
def check_password(self, password: str) -> bool:
return checkpw(password.encode("utf-8"), self.password.encode("utf-8"))
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
if secret:
return TOTP(secret).verify(otp, valid_window=3)
return TOTP(self.totp_secret).verify(otp, valid_window=3)
class Roles(Base):
__tablename__ = "bw_ui_roles"
@ -107,7 +102,7 @@ class UserRecoveryCodes(Base):
id = Column(Integer, primary_key=True)
user_name = Column(String(256), ForeignKey("bw_ui_users.username", onupdate="cascade", ondelete="cascade"), nullable=False)
code = Column(String(19), nullable=False)
code = Column(UnicodeText, nullable=False)
user = relationship("Users", back_populates="recovery_codes")

View file

@ -5,7 +5,6 @@ Flask-Login==0.6.3
Flask_WTF==1.2.1
gunicorn[gthread]==22.0.0
passlib==1.7.4
pyotp==2.9.0
python-magic==0.4.27
python_dateutil==2.9.0.post0
qrcode==7.4.2

View file

@ -151,10 +151,6 @@ passlib==1.7.4 \
--hash=sha256:aa6bca462b8d8bda89c70b382f0c298a20b5560af6cbfa2dce410c0a2fb669f1 \
--hash=sha256:defd50f72b65c5402ab2c573830a6978e5f202ad0d984793c8dde2c4152ebe04
# via -r requirements.in
pyotp==2.9.0 \
--hash=sha256:346b6642e0dbdde3b4ff5a930b664ca82abfa116356ed48cc42c7d6590d36f63 \
--hash=sha256:81c2e5865b8ac55e825b0358e496e1d9387c811e85bb40e71a3b29b288963612
# via -r requirements.in
pypng==0.20220715.0 \
--hash=sha256:4a43e969b8f5aaafb2a415536c1a8ec7e341cd6a3f957fd5b5f32a4cfeed902c \
--hash=sha256:739c433ba96f078315de54c0db975aee537cbc3e1d0ae4ed9aab0ca1e427e2c1

104
src/ui/src/totp.py Normal file
View file

@ -0,0 +1,104 @@
from base64 import b64encode
from contextlib import suppress
from io import BytesIO
from cryptography.fernet import Fernet, InvalidToken, MultiFernet
from typing import Dict, List, Optional, Union
from flask import Flask
from passlib.totp import TOTP, MalformedTokenError, TokenError, TotpMatch
from passlib.pwd import genword
from qrcode import make
from qrcode.image.svg import SvgImage
from models import Users
class Totp:
def __init__(self, app: Flask, secrets: Dict[Union[str, int], str], recovery_codes_keys: List[bytes]):
"""Initialize a totp factory.
secrets are used to encrypt the per-user totp_secret on disk.
recovery_codes_keys are used to encrypt the per-user recovery codes on disk.
"""
# This should be a dict with at least one entry
self.app = app
self._totp = TOTP.using(secrets=secrets, issuer="BunkerWeb UI")
self.cryptor: Optional[MultiFernet] = None
if recovery_codes_keys:
self.cryptor = MultiFernet([Fernet(key) for key in recovery_codes_keys])
def generate_totp_secret(self) -> str:
"""Create new user-unique totp_secret."""
return self._totp.new().to_json(encrypt=True)
def get_totp_pretty_key(self, totp_secret: str) -> str:
"""Generate pretty key for manual input"""
if not totp_secret:
return ""
return self._totp.from_source(totp_secret).pretty_key()
def generate_recovery_codes(self) -> List[str]:
codes = ["-".join([pwd[i : i + 4] for i in range(0, len(pwd), 4)]) for pwd in genword(length=16, charset="hex", returns=5)] # noqa: E203
if not self.cryptor:
return codes
return [self.cryptor.encrypt(code.encode()).decode() for code in codes]
def decrypt_recovery_code(self, code: str) -> Optional[str]:
if not self.cryptor:
return code
return self.cryptor.decrypt(code.encode()).decode()
def decrypt_recovery_codes(self, user: Users) -> List[str]:
return [self.decrypt_recovery_code(code) for code in user.list_recovery_codes]
def encrypt_recovery_code(self, code: str) -> Optional[str]:
if not self.cryptor:
return code
return self.cryptor.encrypt(code.encode()).decode()
def verify_recovery_code(self, code: str, user: Users) -> bool:
"""Check if recovery code is valid for user."""
if not user.list_recovery_codes:
return False
with suppress(InvalidToken):
if code in self.decrypt_recovery_codes(user):
return True
return False
def verify_totp(self, token: str, *, totp_secret: Optional[str] = None, user: Optional[Users] = None) -> bool:
"""Verifies token for specific user."""
if not totp_secret and not user:
raise ValueError("Either totp_secret or user must be provided")
elif not totp_secret:
totp_secret = user.totp_secret
try:
tmatch = self._totp.verify(token, totp_secret, last_counter=self.get_last_counter(user))
if user:
self.set_last_counter(user, tmatch)
return True
except (MalformedTokenError, TokenError):
return False
def get_totp_uri(self, username: str, totp_secret: str) -> str:
"""Generate provisioning url for use with the qrcode scanner built into the app"""
return self._totp.from_source(totp_secret).to_uri(username, "BunkerWeb UI")
def generate_qrcode(self, username: str, totp: str) -> str:
"""Generate QRcode Using username, totp, generate the actual QRcode image."""
totp_image = make(self.get_totp_uri(username, totp), image_factory=SvgImage)
with BytesIO() as virtual_file:
totp_image.save(virtual_file)
image_as_str = b64encode(virtual_file.getvalue()).decode("ascii")
return f"data:image/svg+xml;base64,{image_as_str}"
def get_last_counter(self, user: Users) -> Optional[int]:
"""Fetch stored last_counter from cache."""
return self.app.data.get("totp_last_counter", {}).get(user.get_id())
def set_last_counter(self, user: Users, tmatch: TotpMatch) -> None:
"""Cache last_counter."""
if "totp_last_counter" not in self.app.data:
self.app.data["totp_last_counter"] = {}
self.app.data["totp_last_counter"][user.get_id()] = tmatch.counter

View file

@ -436,7 +436,7 @@
class="flex flex-col relative col-span-12 px-4 my-2 md:px-6 md:my-3 lg:px-6 lg:my-3 max-w-[400px] w-full">
<h5 class="input-title text-center">2FA QR CODE</h5>
<div class="flex justify-center mt-2">
<img src="data:image/png;base64, {{ totp_qr_image }}"
<img src="{{ totp_qr_image }}"
alt="Secret Token"
style="width: 200px;
height: 200px" />

View file

@ -16,7 +16,6 @@ from sqlalchemy.exc import IntegrityError
from Database import Database # type: ignore
from models import Base, Users, Roles, RolesUsers, UserRecoveryCodes, RolesPermissions, Permissions
from utils import gen_recovery_codes
class UIDatabase(Database):
@ -167,6 +166,7 @@ class UIDatabase(Database):
email: Optional[str] = None,
*,
totp_secret: Optional[str] = None,
totp_recovery_codes: Optional[List[str]] = None,
method: str = "manual",
admin: bool = False,
) -> str:
@ -199,9 +199,8 @@ class UIDatabase(Database):
)
)
if totp_secret:
for code in gen_recovery_codes():
session.add(UserRecoveryCodes(user_name=username, code=code))
for code in totp_recovery_codes or []:
session.add(UserRecoveryCodes(user_name=username, code=code))
try:
session.commit()
@ -210,7 +209,9 @@ class UIDatabase(Database):
return ""
def update_ui_user(self, username: str, password: bytes, totp_secret: Optional[str], method: str = "manual") -> str:
def update_ui_user(
self, username: str, password: bytes, totp_secret: Optional[str], *, totp_recovery_codes: Optional[List[str]] = None, method: str = "manual"
) -> str:
"""Update ui user."""
with self._db_session() as session:
if self.readonly:
@ -233,8 +234,8 @@ class UIDatabase(Database):
return str(e)
if user.totp_refreshed:
if totp_secret:
self.refresh_ui_user_recovery_codes(username)
if totp_recovery_codes:
self.refresh_ui_user_recovery_codes(username, totp_recovery_codes or [])
else:
self.delete_ui_user_recovery_codes(username)
@ -325,7 +326,7 @@ class UIDatabase(Database):
return roles_data
def refresh_ui_user_recovery_codes(self, username: str) -> str:
def refresh_ui_user_recovery_codes(self, username: str, codes: List[str]) -> str:
"""Refresh ui user recovery codes."""
with self._db_session() as session:
if self.readonly:
@ -335,11 +336,16 @@ class UIDatabase(Database):
if not user:
return f"User {username} doesn't exist"
if not codes:
return "No recovery codes provided"
session.query(UserRecoveryCodes).filter_by(user_name=username).delete()
for code in gen_recovery_codes():
for code in codes:
session.add(UserRecoveryCodes(user_name=username, code=code))
user.totp_refreshed = True
try:
session.commit()
except BaseException as e:
@ -404,3 +410,19 @@ class UIDatabase(Database):
session.commit()
except BaseException as e:
return str(e)
def set_ui_user_recovery_code_refreshed(self, username: str, value: bool) -> str:
"""Set ui user recovery code refreshed."""
with self._db_session() as session:
user = session.query(Users).filter_by(username=username).first()
if not user:
return f"User {username} doesn't exist"
user.totp_refreshed = value
try:
session.commit()
except BaseException as e:
return str(e)
return ""

View file

@ -6,14 +6,14 @@ from os.path import join
from threading import Lock
from typing import List, Optional
from bcrypt import checkpw, gensalt, hashpw
from bcrypt import checkpw, hashpw
from magic import Magic
from passlib.pwd import genword
from qrcode.main import QRCode
from regex import compile as re_compile
USER_PASSWORD_RX = re_compile(r"^(?=.*?\p{Lowercase_Letter})(?=.*?\p{Uppercase_Letter})(?=.*?\d)(?=.*?[ !\"#$%&'()*+,./:;<=>?@[\\\]^_`{|}~-]).{8,}$")
PLUGIN_KEYS = ["id", "name", "description", "version", "stream", "settings"]
PLUGIN_ID_RX = re_compile(r"^[\w_-]{1,64}$")
LOCK = Lock()
@ -215,19 +215,14 @@ def check_settings(settings: dict, check: str) -> bool:
return any(setting["context"] == check for setting in settings.values())
def gen_password_hash(password: str) -> bytes:
return hashpw(password.encode("utf-8"), gensalt(rounds=13))
def gen_password_hash(password: str, salt: str) -> bytes:
return hashpw(password.encode("utf-8"), salt.encode("utf-8"))
def check_password(password: str, hashed: bytes) -> bool:
return checkpw(password.encode("utf-8"), hashed)
def gen_recovery_codes() -> List[str]:
pwds = genword(length=16, charset="hex", returns=5)
return ["-".join([pwd[i : i + 4] for i in range(0, len(pwd), 4)]) for pwd in pwds] # noqa: E203
def get_b64encoded_qr_image(data: str):
qr = QRCode(version=1, box_size=10, border=5)
qr.add_data(data)