Refactor user management in web UI + Start adding roles logic + Enhanced 2FA feature with recovery codes

This commit is contained in:
Théophile Diot 2024-07-30 18:21:17 +01:00
parent e8b3dbc348
commit 26c3da657f
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
12 changed files with 820 additions and 423 deletions

View file

@ -29,7 +29,6 @@ from model import (
Jobs_cache,
Custom_configs,
Selects,
Users,
BwcliCommands,
Metadata,
)
@ -84,7 +83,7 @@ class Database:
if pool:
self.logger.warning("The pool parameter is deprecated, it will be removed in the next version")
self.__session_factory = None
self._session_factory = None
self.sql_engine = None
if not sqlalchemy_string:
@ -212,8 +211,8 @@ class Database:
def __del__(self) -> None:
"""Close the database"""
if self.__session_factory:
self.__session_factory.close_all()
if self._session_factory:
self._session_factory.close_all()
if self.sql_engine:
self.sql_engine.dispose()
@ -221,13 +220,13 @@ class Database:
def test_read(self):
"""Test the read access to the database"""
self.logger.debug("Testing read access to the database ...")
with self.__db_session() as session:
with self._db_session() as session:
session.execute(text("SELECT 1"))
def test_write(self):
"""Test the write access to the database"""
self.logger.debug("Testing write access to the database ...")
with self.__db_session() as session:
with self._db_session() as session:
table_name = uuid4().hex
session.execute(text(f"CREATE TABLE IF NOT EXISTS test_{table_name} (id INT)"))
session.execute(text(f"DROP TABLE IF EXISTS test_{table_name}"))
@ -259,7 +258,7 @@ class Database:
conn.execute(text(f"DROP TABLE IF EXISTS test_{table_name}"))
@contextmanager
def __db_session(self) -> Any:
def _db_session(self) -> Any:
try:
assert self.sql_engine is not None
except AssertionError:
@ -302,7 +301,7 @@ class Database:
def is_setting(self, setting: str, *, multisite: bool = False) -> bool:
"""Check if the setting exists in the database and optionally if it's multisite"""
with self.__db_session() as session:
with self._db_session() as session:
try:
multiple = False
if self.suffix_rx.search(setting):
@ -323,7 +322,7 @@ class Database:
def set_failover(self, value: bool = True) -> str:
"""Set the failover value"""
with self.__db_session() as session:
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
@ -342,7 +341,7 @@ class Database:
def initialize_db(self, version: str, integration: str = "Unknown") -> str:
"""Initialize the database"""
with self.__db_session() as session:
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
@ -397,7 +396,7 @@ class Database:
"database_version": "Unknown", # ? Extracted from the database
"default": True, # ? Extra field to know if the returned data is the default one
}
with self.__db_session() as session:
with self._db_session() as session:
try:
database = self.database_uri.split(":")[0].split("+")[0]
data["database_version"] = (
@ -422,7 +421,7 @@ class Database:
def set_metadata(self, data: Dict[str, Any]) -> str:
"""Set the metadata values"""
with self.__db_session() as session:
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
@ -453,7 +452,7 @@ class Database:
"""Set changed bit for config, custom configs, instances and plugins"""
changes = changes or ["config", "custom_configs", "external_plugins", "pro_plugins", "instances"]
plugins_changes = plugins_changes or set()
with self.__db_session() as session:
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
@ -536,14 +535,14 @@ class Database:
has_all_tables = False
continue
with self.__db_session() as session:
with self._db_session() as session:
old_data[table_name] = session.query(metadata.tables[table_name]).all()
# Rename the old tables
db_version_id = db_version.replace(".", "_")
for table_name in metadata.tables.keys():
if table_name in Base.metadata.tables:
with self.__db_session() as session:
with self._db_session() as session:
if inspector.has_table(f"{table_name}_{db_version_id}"):
self.logger.warning(f'Table "{table_name}" already exists, dropping it to make room for the new one')
session.execute(text(f"DROP TABLE {table_name}_{db_version_id}"))
@ -561,7 +560,7 @@ class Database:
return False, str(e)
to_put = []
with self.__db_session() as session:
with self._db_session() as session:
db_plugins = session.query(Plugins).with_entities(Plugins.id).all()
db_ids = []
@ -1048,7 +1047,7 @@ class Database:
if table_name == "bw_plugins" and "external" in row:
row["type"] = "external" if row.pop("external") else "core"
with self.__db_session() as session:
with self._db_session() as session:
try:
if table_name == "bw_metadata":
existing_row = session.query(Metadata).filter_by(id=1).first()
@ -1081,7 +1080,7 @@ class Database:
if method == "autoconf":
db_config = self.get_non_default_settings(with_drafts=True)
with self.__db_session() as session:
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
@ -1365,7 +1364,7 @@ class Database:
) -> str:
"""Save the custom configs in the database"""
message = ""
with self.__db_session() as session:
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
@ -1453,7 +1452,7 @@ class Database:
if filtered_settings and not global_only:
filtered_settings.update(("SERVER_NAME", "MULTISITE"))
with self.__db_session() as session:
with self._db_session() as session:
config = original_config or {}
multisite = original_multisite or set()
@ -1550,7 +1549,7 @@ class Database:
filtered_settings: Optional[Union[List[str], Set[str], Tuple[str]]] = None,
) -> Dict[str, Any]:
"""Get the config from the database"""
with self.__db_session() as session:
with self._db_session() as session:
config = {}
multisite = set()
@ -1585,7 +1584,7 @@ class Database:
def get_custom_configs(self) -> List[Dict[str, Any]]:
"""Get the custom configs from the database"""
with self.__db_session() as session:
with self._db_session() as session:
return [
{
"service_id": custom_config.service_id,
@ -1630,7 +1629,7 @@ class Database:
def update_job(self, plugin_id: str, job_name: str, success: bool) -> str:
"""Update the job last_run in the database"""
with self.__db_session() as session:
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
@ -1657,7 +1656,7 @@ class Database:
if service_id:
filters["service_id"] = service_id
with self.__db_session() as session:
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
@ -1680,7 +1679,7 @@ class Database:
"""Update the plugin cache in the database"""
job_name = job_name or argv[0].replace(".py", "")
service_id = service_id or None
with self.__db_session() as session:
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
@ -1713,7 +1712,7 @@ class Database:
"""Update external plugins from the database"""
to_put = []
changes = False
with self.__db_session() as session:
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
@ -2226,7 +2225,7 @@ class Database:
def get_plugins(self, *, _type: Literal["all", "external", "pro"] = "all", with_data: bool = False) -> List[Dict[str, Any]]:
"""Get all plugins from the database."""
plugins = []
with self.__db_session() as session:
with self._db_session() as session:
entities = [Plugins.id, Plugins.stream, Plugins.name, Plugins.description, Plugins.version, Plugins.type, Plugins.method, Plugins.checksum]
if with_data:
entities.append(Plugins.data) # type: ignore
@ -2292,12 +2291,12 @@ class Database:
def get_plugins_errors(self) -> int:
"""Get plugins errors."""
with self.__db_session() as session:
with self._db_session() as session:
return session.query(Jobs).filter(Jobs.success == False).count() # noqa: E712
def get_jobs(self) -> Dict[str, Dict[str, Any]]:
"""Get jobs."""
with self.__db_session() as session:
with self._db_session() as session:
return {
job.name: {
"plugin_id": job.plugin_id,
@ -2346,7 +2345,7 @@ class Database:
if service_id:
filters["service_id"] = service_id
with self.__db_session() as session:
with self._db_session() as session:
if plugin_id:
job = session.query(Jobs).filter_by(name=job_name, plugin_id=plugin_id).first()
if not job:
@ -2368,7 +2367,7 @@ class Database:
def get_jobs_cache_files(self, *, job_name: str = "", plugin_id: str = "") -> List[Dict[str, Any]]:
"""Get jobs cache files."""
with self.__db_session() as session:
with self._db_session() as session:
filters = {}
query = session.query(Jobs_cache).with_entities(Jobs_cache.job_name, Jobs_cache.service_id, Jobs_cache.file_name, Jobs_cache.data)
@ -2413,7 +2412,7 @@ class Database:
def add_instance(self, hostname: str, port: int, server_name: str, method: str, changed: Optional[bool] = True) -> str:
"""Add instance."""
with self.__db_session() as session:
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
@ -2441,7 +2440,7 @@ class Database:
def update_instances(self, instances: List[Dict[str, Any]], method: str, changed: Optional[bool] = True) -> str:
"""Update instances."""
to_put = []
with self.__db_session() as session:
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
@ -2474,7 +2473,7 @@ class Database:
def get_instances(self, *, method: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get instances."""
with self.__db_session() as session:
with self._db_session() as session:
query = session.query(Instances)
if method:
query = query.filter_by(method=method)
@ -2491,7 +2490,7 @@ class Database:
def get_plugin_actions(self, plugin: str) -> Optional[Any]:
"""get actions file for the plugin"""
with self.__db_session() as session:
with self._db_session() as session:
page = session.query(Plugin_pages).with_entities(Plugin_pages.actions_file).filter_by(plugin_id=plugin).first()
if not page:
@ -2501,7 +2500,7 @@ class Database:
def get_plugin_template(self, plugin: str) -> Optional[Any]:
"""get template file for the plugin"""
with self.__db_session() as session:
with self._db_session() as session:
page = session.query(Plugin_pages).with_entities(Plugin_pages.template_file).filter_by(plugin_id=plugin).first()
if not page:
@ -2511,73 +2510,10 @@ class Database:
def get_plugin_obfuscation(self, plugin: str) -> Optional[Any]:
"""get obfuscation file for the plugin"""
with self.__db_session() as session:
with self._db_session() as session:
page = session.query(Plugin_pages).with_entities(Plugin_pages.obfuscation_file).filter_by(plugin_id=plugin).first()
if not page:
return None
return page.obfuscation_file
def get_ui_user(self) -> Optional[dict]:
"""Get ui user."""
with self.__db_session() as session:
user = (
session.query(Users)
.with_entities(Users.username, Users.password, Users.is_two_factor_enabled, Users.secret_token, Users.method)
.filter_by(id=1)
.first()
)
if not user:
return None
return {
"username": user.username,
"password_hash": user.password.encode("utf-8"),
"is_two_factor_enabled": user.is_two_factor_enabled,
"secret_token": user.secret_token,
"method": user.method,
}
def create_ui_user(self, username: str, password: bytes, *, secret_token: Optional[str] = None, method: str = "manual") -> str:
"""Create ui user."""
with self.__db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
user = session.query(Users).filter_by(id=1).first()
if user:
return "User already exists"
session.add(Users(id=1, username=username, password=password.decode("utf-8"), secret_token=secret_token, method=method))
try:
session.commit()
except BaseException as e:
return str(e)
return ""
def update_ui_user(
self, username: str, password: bytes, is_two_factor_enabled: bool = False, secret_token: Optional[str] = None, method: str = "ui"
) -> str:
"""Update ui user."""
with self.__db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
user = session.query(Users).filter_by(id=1).first()
if not user:
return "User not found"
user.username = username
user.password = password.decode("utf-8")
user.is_two_factor_enabled = is_two_factor_enabled
user.secret_token = secret_token
user.method = method
try:
session.commit()
except BaseException as e:
return str(e)
return ""

View file

@ -209,17 +209,6 @@ class Instances(Base):
method = Column(METHODS_ENUM, nullable=False, default="manual")
class Users(Base):
__tablename__ = "bw_ui_users"
id = Column(Integer, primary_key=True, default=1)
username = Column(String(256), nullable=False, unique=True)
password = Column(String(60), nullable=False)
is_two_factor_enabled = Column(Boolean, nullable=False, default=False)
secret_token = Column(String(32), nullable=True, unique=True, default=None)
method = Column(METHODS_ENUM, nullable=False, default="manual")
class BwcliCommands(Base):
__tablename__ = "bw_cli_commands"
__table_args__ = (UniqueConstraint("plugin_id", "name"),)

View file

@ -6,7 +6,6 @@ from os.path import join
from pathlib import Path
from signal import SIGINT, SIGTERM, signal
from threading import Lock
from regex import compile as re_compile
from sys import path as sys_path
from time import sleep
@ -14,10 +13,12 @@ for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in ((
if deps_path not in sys_path:
sys_path.append(deps_path)
from Database import Database # type: ignore
from common_utils import get_version # type: ignore
from logger import setup_logger # type: ignore
from src.User import User
from ui_database import UIDatabase # type: ignore
from utils import USER_PASSWORD_RX, check_password, gen_password_hash
TMP_DIR = Path(sep, "var", "tmp", "bunkerweb")
RUN_DIR = Path(sep, "var", "run", "bunkerweb")
@ -58,7 +59,7 @@ def on_starting(server):
LOGGER = setup_logger("UI")
db = Database(LOGGER, ui=True)
db = UIDatabase(LOGGER)
ready = False
while not ready:
@ -70,57 +71,85 @@ def on_starting(server):
continue
sleep(5)
USER_PASSWORD_RX = re_compile(r"^(?=.*?\p{Lowercase_Letter})(?=.*?\p{Uppercase_Letter})(?=.*?\d)(?=.*?[ !\"#$%&'()*+,./:;<=>?@[\\\]^_`{|}~-]).{8,}$")
ret, err = db.init_ui_tables(get_version())
USER = "Error"
while USER == "Error":
with suppress(Exception):
USER = db.get_ui_user()
if not ret and err:
LOGGER.error(f"Exception while checking database tables : {err}")
exit(1)
elif not ret:
LOGGER.info("Database ui tables didn't change, skipping update ...")
else:
LOGGER.info("Database ui tables successfully updated")
if USER:
USER = User(**USER)
if not db.get_ui_roles(as_dict=True):
ret = db.create_ui_role("admin", "Admin can create account, manager software and read data.", ["manage", "write", "read"])
if ret:
LOGGER.error(f"Couldn't create the admin role in the database: {ret}")
exit(1)
if getenv("ADMIN_USERNAME") or getenv("ADMIN_PASSWORD"):
ret = db.create_ui_role("writer", "Write can manage software and read data but can't create account.", ["write", "read"])
if ret:
LOGGER.error(f"Couldn't create the admin role in the database: {ret}")
exit(1)
ret = db.create_ui_role("reader", "Reader can read data but can't proceed to any actions.", ["read"])
if ret:
LOGGER.error(f"Couldn't create the admin role in the database: {ret}")
exit(1)
ADMIN_USER = "Error"
while ADMIN_USER == "Error":
try:
ADMIN_USER = db.get_ui_user(as_dict=True)
except BaseException as e:
LOGGER.debug(f"Couldn't get the admin user: {e}")
sleep(1)
env_admin_username = getenv("ADMIN_USERNAME", "")
env_admin_password = getenv("ADMIN_PASSWORD", "")
if ADMIN_USER:
if env_admin_username or env_admin_password:
override_admin_creds = getenv("OVERRIDE_ADMIN_CREDS", "no").lower() == "yes"
if USER.method == "manual" or override_admin_creds:
if ADMIN_USER["method"] == "manual" or override_admin_creds:
updated = False
if getenv("ADMIN_USERNAME", "") and USER.get_id() != getenv("ADMIN_USERNAME", ""):
USER.id = getenv("ADMIN_USERNAME", "")
if env_admin_username and ADMIN_USER["username"] != env_admin_username:
ADMIN_USER["username"] = env_admin_username
updated = True
if getenv("ADMIN_PASSWORD", "") and not USER.check_password(getenv("ADMIN_PASSWORD", "")):
if not USER_PASSWORD_RX.match(getenv("ADMIN_PASSWORD", "")):
if env_admin_password and not check_password(env_admin_password, ADMIN_USER["password"]):
if not USER_PASSWORD_RX.match(env_admin_password):
LOGGER.warning(
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). It will not be updated."
)
else:
USER.update_password(getenv("ADMIN_PASSWORD", ""))
ADMIN_USER["password"] = gen_password_hash(env_admin_password)
updated = True
if updated:
if override_admin_creds:
LOGGER.warning("Overriding the admin user credentials, as the OVERRIDE_ADMIN_CREDS environment variable is set to 'yes'.")
err = db.update_ui_user(USER.get_id(), USER.password_hash, USER.is_two_factor_enabled, USER.secret_token, method="manual")
err = db.update_ui_user(ADMIN_USER["username"], ADMIN_USER["password"], ADMIN_USER["totp_secret"], method="manual")
if err:
LOGGER.error(f"Couldn't update the admin user in the database: {err}")
else:
LOGGER.info("The admin user was updated successfully")
else:
LOGGER.warning("The admin user wasn't created manually. You can't change it from the environment variables.")
elif getenv("ADMIN_USERNAME") and getenv("ADMIN_PASSWORD"):
elif env_admin_username and env_admin_password:
user_name = env_admin_username or "admin"
if not getenv("FLASK_DEBUG", False):
if len(getenv("ADMIN_USERNAME", "admin")) > 256:
if len(user_name) > 256:
LOGGER.error("The admin username is too long. It must be less than 256 characters.")
exit(1)
elif not USER_PASSWORD_RX.match(getenv("ADMIN_PASSWORD", "changeme")):
elif not USER_PASSWORD_RX.match(env_admin_password):
LOGGER.error(
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-)."
)
exit(1)
user_name = getenv("ADMIN_USERNAME", "admin")
USER = User(user_name, getenv("ADMIN_PASSWORD", "changeme"))
ret = db.create_ui_user(user_name, USER.password_hash)
ret = db.create_ui_user(user_name, gen_password_hash(env_admin_password), ["admin"], admin=True)
if ret:
LOGGER.error(f"Couldn't create the admin user in the database: {ret}")
exit(1)

View file

@ -28,6 +28,8 @@ from importlib.machinery import SourceFileLoader
from io import BytesIO
from json import JSONDecodeError, dumps, loads as json_loads
from jinja2 import Environment, FileSystemLoader, select_autoescape
from pyotp import random_base32
from pyotp.totp import TOTP
from redis import Redis, Sentinel
from regex import compile as re_compile, match as regex_match
from requests import get
@ -45,14 +47,15 @@ from src.Instances import Instances
from src.ConfigFiles import ConfigFiles
from src.Config import Config
from src.ReverseProxied import ReverseProxied
from src.User import AnonymousUser, User
from src.Templates import get_ui_templates
from utils import check_settings, get_b64encoded_qr_image, path_to_dict, get_remain
from utils import check_settings, gen_password_hash, get_b64encoded_qr_image, path_to_dict, get_remain
from common_utils import get_version # type: ignore
from Database import Database # type: ignore
from logger import setup_logger # type: ignore
from models import AnonymousUser
from ui_database import UIDatabase
TMP_DIR = Path(sep, "var", "tmp", "bunkerweb")
TMP_DIR.mkdir(parents=True, exist_ok=True)
@ -109,7 +112,6 @@ with app.app_context():
app.config["SESSION_COOKIE_SECURE"] = True # Required for __Host- prefix
app.config["SESSION_COOKIE_HTTPONLY"] = True # Recommended for security
app.config["SESSION_COOKIE_SAMESITE"] = "Lax" # Or 'Strict' for stricter settings
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=30)
login_manager = LoginManager()
login_manager.session_protection = "strong"
@ -118,7 +120,7 @@ with app.app_context():
login_manager.anonymous_user = AnonymousUser
PLUGIN_KEYS = ["id", "name", "description", "version", "stream", "settings"]
db = Database(app.logger, ui=True, log=False)
DB = UIDatabase(app.logger, log=False)
USER_PASSWORD_RX = re_compile(r"^(?=.*?\p{Lowercase_Letter})(?=.*?\p{Uppercase_Letter})(?=.*?\d)(?=.*?[ !\"#$%&'()*+,./:;<=>?@[\\\]^_`{|}~-]).{8,}$")
@ -129,13 +131,12 @@ with app.app_context():
try:
app.config.update(
INSTANCES=Instances(db),
CONFIG=Config(db),
INSTANCES=Instances(DB),
CONFIG=Config(DB),
CONFIGFILES=ConfigFiles(),
WTF_CSRF_SSL_STRICT=False,
SEND_FILE_MAX_AGE_DEFAULT=86400,
SCRIPT_NONCE=sha256(urandom(32)).hexdigest(),
DB=db,
UI_TEMPLATES=get_ui_templates(),
)
except FileNotFoundError as e:
@ -159,7 +160,7 @@ def wait_applying():
current_time = datetime.now()
ready = False
while not ready and (datetime.now() - current_time).seconds < 120:
db_metadata = app.config["DB"].get_metadata()
db_metadata = DB.get_metadata()
if isinstance(db_metadata, str):
app.logger.error(f"An error occurred when checking for changes in the database : {db_metadata}")
elif not any(
@ -242,12 +243,12 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
# UTILS
def run_action(plugin: str, function_name: str = ""):
message = ""
module = app.config["DB"].get_plugin_actions(plugin)
module = DB.get_plugin_actions(plugin)
if module is None:
return {"status": "ko", "code": 404, "message": "The actions.py file for the plugin does not exist"}
obfuscation = app.config["DB"].get_plugin_obfuscation(plugin)
obfuscation = DB.get_plugin_obfuscation(plugin)
tmp_dir = None
try:
@ -320,7 +321,7 @@ def run_action(plugin: str, function_name: str = ""):
def get_user_info():
return current_user.get_id(), current_user.password_hash, current_user.is_two_factor_enabled, current_user.secret_token
return current_user.get_id(), current_user.password.encode("utf-8"), bool(current_user.totp_secret), current_user.totp_secret
def verify_data_in_form(data: dict[str, Union[tuple, any]] = {}, err_message: str = "", redirect_url: str = "", next: bool = False) -> Union[bool, Response]:
@ -366,11 +367,11 @@ def error_message(msg: str):
@app.context_processor
def inject_variables():
ui_data = get_ui_data()
metadata = app.config["DB"].get_metadata()
metadata = DB.get_metadata()
changes_ongoing = any(
v
for k, v in app.config["DB"].get_metadata().items()
for k, v in DB.get_metadata().items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
)
changes = False
@ -447,13 +448,20 @@ def set_security_headers(response):
@login_manager.user_loader
def load_user(user_id):
admin_user = app.config["DB"].get_ui_user()
if not admin_user:
app.logger.warning("Couldn't get the admin user from the database.")
def load_user(username):
ui_user = DB.get_ui_user(username=username)
if not ui_user:
app.logger.warning(f"Couldn't get the user {username} from the database.")
return None
admin_user = User(**admin_user)
return admin_user if user_id == admin_user.get_id() else None
ui_user.list_roles = DB.get_ui_user_roles(username)
for role in ui_user.list_roles:
ui_user.list_permissions.extend(DB.get_ui_role_permissions(role))
if ui_user.totp_secret:
ui_user.list_recovery_codes = DB.get_ui_user_recovery_codes(username)
return ui_user
@app.errorhandler(CSRFError)
@ -468,7 +476,7 @@ def handle_csrf_error(_):
flash("Wrong CSRF token !", "error")
if not current_user:
return render_template("setup.html"), 403
return render_template("login.html", is_totp=current_user.is_two_factor_enabled), 403
return render_template("login.html", is_totp=bool(current_user.totp_secret)), 403
@app.before_request
@ -484,58 +492,58 @@ def before_request():
if not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts")):
if (
app.config["DB"].database_uri
and app.config["DB"].readonly
DB.database_uri
and DB.readonly
and (
datetime.now(timezone.utc) - datetime.fromisoformat(ui_data.get("LAST_DATABASE_RETRY", "1970-01-01T00:00:00")).replace(tzinfo=timezone.utc)
> timedelta(minutes=1)
)
):
try:
app.config["DB"].retry_connection(pool_timeout=1)
app.config["DB"].retry_connection(log=False)
DB.retry_connection(pool_timeout=1)
DB.retry_connection(log=False)
ui_data["READONLY_MODE"] = False
app.logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
app.config["DB"].retry_connection(readonly=True, pool_timeout=1)
app.config["DB"].retry_connection(readonly=True, log=False)
DB.retry_connection(readonly=True, pool_timeout=1)
DB.retry_connection(readonly=True, log=False)
except BaseException:
if app.config["DB"].database_uri_readonly:
if DB.database_uri_readonly:
with suppress(BaseException):
app.config["DB"].retry_connection(fallback=True, pool_timeout=1)
app.config["DB"].retry_connection(fallback=True, log=False)
DB.retry_connection(fallback=True, pool_timeout=1)
DB.retry_connection(fallback=True, log=False)
ui_data["READONLY_MODE"] = True
ui_data["LAST_DATABASE_RETRY"] = app.config["DB"].last_connection_retry.isoformat()
ui_data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat()
elif not ui_data.get("READONLY_MODE", False) and request.method == "POST" and not ("/totp" in request.path or "/login" in request.path):
try:
app.config["DB"].test_write()
DB.test_write()
ui_data["READONLY_MODE"] = False
except BaseException:
ui_data["READONLY_MODE"] = True
ui_data["LAST_DATABASE_RETRY"] = app.config["DB"].last_connection_retry.isoformat()
ui_data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat()
else:
try:
app.config["DB"].test_read()
DB.test_read()
except BaseException:
ui_data["LAST_DATABASE_RETRY"] = app.config["DB"].last_connection_retry.isoformat()
ui_data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat()
app.config["DB"].readonly = ui_data.get("READONLY_MODE", False)
DB.readonly = ui_data.get("READONLY_MODE", False)
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
if app.config["DB"].readonly:
if DB.readonly:
flash("Database connection is in read-only mode : no modification possible.", "error")
if current_user.is_authenticated:
passed = True
# Case not login page, keep on 2FA before any other access
if not session.get("totp_validated", False) and current_user.is_two_factor_enabled and "/totp" not in request.path:
if not session.get("totp_validated", False) and bool(current_user.totp_secret) and "/totp" not in request.path:
if not request.path.endswith("/login"):
return redirect(url_for("totp", next=request.form.get("next")))
passed = False
elif session.get("ip") != request.remote_addr:
elif current_user.last_login_ip != request.remote_addr:
passed = False
elif session.get("user_agent") != request.headers.get("User-Agent"):
passed = False
@ -546,7 +554,7 @@ def before_request():
@app.route("/", strict_slashes=False)
def index():
if app.config["DB"].get_ui_user():
if DB.get_ui_user():
if current_user.is_authenticated: # type: ignore
return redirect(url_for("home"))
return redirect(url_for("login"), 301)
@ -569,9 +577,7 @@ def check():
def setup():
db_config = app.config["CONFIG"].get_config(methods=False, filtered_settings=("SERVER_NAME", "MULTISITE", "USE_UI", "UI_HOST", "AUTO_LETS_ENCRYPT"))
admin_user = app.config["DB"].get_ui_user()
if admin_user:
admin_user = User(**admin_user)
admin_user = DB.get_ui_user()
ui_reverse_proxy = False
for server_name in db_config["SERVER_NAME"].split(" "):
@ -582,7 +588,7 @@ def setup():
break
if request.method == "POST":
if app.config["DB"].readonly:
if DB.readonly:
return handle_error("Database is in read-only mode", "setup")
required_keys = []
@ -607,9 +613,7 @@ def setup():
"setup",
)
admin_user = User(request.form["admin_username"], request.form["admin_password"], method="ui")
ret = app.config["DB"].create_ui_user(request.form["admin_username"], admin_user.password_hash, method="ui")
ret = DB.create_ui_user(request.form["admin_username"], gen_password_hash(request.form["admin_password"]), ["admin"], method="ui", admin=True)
if ret:
return handle_error(f"Couldn't create the admin user in the database: {ret}", "setup", False, "error")
@ -685,16 +689,17 @@ def setup_loading():
@login_required
def totp():
if request.method == "POST":
verify_data_in_form(data={"totp_token": None}, err_message="No token provided on /totp.", redirect_url="totp")
if not current_user.check_otp(request.form["totp_token"]):
if request.form["totp_token"] not in current_user.list_recovery_codes and not current_user.check_otp(request.form["totp_token"]):
return handle_error("The token is invalid.", "totp")
elif request.form["totp_token"] in current_user.list_recovery_codes:
DB.use_ui_user_recovery_code(current_user.get_id(), request.form["totp_token"])
session["totp_validated"] = True
redirect(url_for("loading", next=request.form.get("next") or url_for("home")))
if not current_user.is_two_factor_enabled or session.get("totp_validated", False):
if not bool(current_user.totp_secret) or session.get("totp_validated", False):
return redirect(url_for("home"))
return render_template("totp.html")
@ -748,7 +753,7 @@ def home():
services_autoconf_count += 1
services += 1
metadata = app.config["DB"].get_metadata()
metadata = DB.get_metadata()
data = {
"check_version": not remote_version or bw_version == remote_version,
@ -765,7 +770,7 @@ def home():
"pro_services": metadata["pro_services"],
"pro_overlapped": metadata["pro_overlapped"],
"plugins_number": len(app.config["CONFIG"].get_plugins()),
"plugins_errors": app.config["DB"].get_plugins_errors(),
"plugins_errors": DB.get_plugins_errors(),
}
data_server_builder = home_builder(data)
@ -777,7 +782,7 @@ def home():
@login_required
def account():
if request.method == "POST":
if app.config["DB"].readonly:
if DB.readonly:
return handle_error("Database is in read-only mode", "account")
verify_data_in_form(
@ -803,11 +808,11 @@ def account():
# Force job to contact PRO API
# by setting the last check to None
metadata = app.config["DB"].get_metadata()
metadata = DB.get_metadata()
metadata["last_pro_check"] = None
app.config["DB"].set_pro_metadata(metadata)
DB.set_pro_metadata(metadata)
curr_changes = app.config["DB"].check_changes()
curr_changes = DB.check_changes()
# Reload instances
def update_global_config(threaded: bool = False):
@ -843,8 +848,7 @@ def account():
username = current_user.get_id()
password = request.form["curr_password"]
is_two_factor_enabled = current_user.is_two_factor_enabled
secret_token = current_user.secret_token
secret_token = current_user.totp_secret
if request.form["operation"] == "username":
verify_data_in_form(data={"admin_username": None}, err_message="Missing admin username parameter on /account.", redirect_url="account")
@ -881,21 +885,19 @@ def account():
ui_data = get_ui_data()
if not current_user.check_otp(request.form["totp_token"], secret=ui_data.get("CURRENT_TOTP_TOKEN", None)):
if request.form["totp_token"] not in current_user.list_recovery_codes and not current_user.check_otp(
request.form["totp_token"], secret=session.get("tmp_totp_secret")
):
return handle_error("The totp token is invalid. (totp)", "account")
session["totp_validated"] = not current_user.is_two_factor_enabled
is_two_factor_enabled = session["totp_validated"]
secret_token = None if current_user.is_two_factor_enabled else ui_data.get("CURRENT_TOTP_TOKEN", None)
ui_data["CURRENT_TOTP_TOKEN"] = None
session["totp_validated"] = not bool(current_user.totp_secret)
secret_token = None if bool(current_user.totp_secret) else session.get("tmp_totp_secret")
session["tmp_totp_secret"] = None
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
user = User(username, password, is_two_factor_enabled=is_two_factor_enabled, secret_token=secret_token, method=current_user.method)
ret = app.config["DB"].update_ui_user(
username, user.password_hash, is_two_factor_enabled, secret_token, current_user.method if request.form["operation"] == "totp" else "ui"
)
ret = DB.update_ui_user(username, gen_password_hash(password), secret_token, current_user.method if request.form["operation"] == "totp" else "ui")
if ret:
return handle_error(f"Couldn't update the admin user in the database: {ret}", "account", False, "error")
@ -903,30 +905,24 @@ def account():
(
f"The {request.form['operation']} has been successfully updated."
if request.form["operation"] != "totp"
else f"The two-factor authentication was successfully {'disabled' if current_user.is_two_factor_enabled else 'enabled'}."
else f"The two-factor authentication was successfully {'disabled' if bool(current_user.totp_secret) else 'enabled'}."
),
)
return redirect(url_for("account" if request.form["operation"] == "totp" else "login"))
secret_token = ""
totp_qr_image = ""
if not current_user.is_two_factor_enabled:
current_user.refresh_totp()
secret_token = current_user.secret_token
totp_qr_image = get_b64encoded_qr_image(current_user.get_authentication_setup_uri())
ui_data = get_ui_data()
ui_data["CURRENT_TOTP_TOKEN"] = secret_token
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
if not bool(current_user.totp_secret):
session["tmp_totp_secret"] = random_base32()
totp = TOTP(session["tmp_totp_secret"])
totp_qr_image = get_b64encoded_qr_image(totp.provisioning_uri(name=current_user.get_id(), issuer_name="BunkerWeb UI"))
return render_template(
"account.html",
username=current_user.get_id(),
is_totp=current_user.is_two_factor_enabled,
secret_token=secret_token,
is_totp=bool(current_user.totp_secret),
secret_token=session["tmp_totp_secret"],
totp_qr_image=totp_qr_image,
)
@ -981,7 +977,7 @@ def instances():
def get_service_data():
config = app.config["DB"].get_config(methods=True, with_drafts=True)
config = DB.get_config(methods=True, with_drafts=True)
# Check variables
variables = deepcopy(request.form.to_dict())
del variables["csrf_token"]
@ -1060,7 +1056,7 @@ def get_service_data():
# @login_required
# def services():
# if request.method == "POST":
# if app.config["DB"].readonly:
# if DB.readonly:
# return handle_error("Database is in read-only mode", "services")
# verify_data_in_form(
@ -1089,7 +1085,7 @@ def get_service_data():
# if config.get(f"{request.form['SERVER_NAME'].split(' ')[0]}_SERVER_NAME", {"method": "scheduler"})["method"] != "ui":
# return handle_error("The service cannot be deleted because it has not been created with the UI.", "services", True)
# db_metadata = app.config["DB"].get_metadata()
# db_metadata = DB.get_metadata()
# def update_services(threaded: bool = False):
# wait_applying()
@ -1136,7 +1132,7 @@ def get_service_data():
# # Display services
# services = []
# tmp_config = app.config["DB"].get_config(methods=True, with_drafts=True).copy()
# tmp_config = DB.get_config(methods=True, with_drafts=True).copy()
# service_names = tmp_config["SERVER_NAME"]["value"].split(" ")
# table_settings = (
@ -1156,7 +1152,7 @@ def get_service_data():
# for service in service_names:
# service_settings = {}
# # For each needed setting, get the service value if one, else the global (value), else defautl value
# # For each needed setting, get the service value if one, else the global (value), else default value
# for setting in table_settings:
# value = tmp_config.get(f"{service}_{setting}", tmp_config.get(setting, {"value": None}))["value"]
# method = tmp_config.get(f"{service}_{setting}", tmp_config.get(setting, {"method": None}))["method"]
@ -1178,7 +1174,7 @@ def get_service_data():
@login_required
def services():
if request.method == "POST":
if app.config["DB"].readonly:
if DB.readonly:
return handle_error("Database is in read-only mode", "services")
verify_data_in_form(
@ -1207,7 +1203,7 @@ def services():
if config.get(f"{request.form['SERVER_NAME'].split(' ')[0]}_SERVER_NAME", {"method": "scheduler"})["method"] != "ui":
return handle_error("The service cannot be deleted because it has not been created with the UI.", "services", True)
db_metadata = app.config["DB"].get_metadata()
db_metadata = DB.get_metadata()
def update_services(threaded: bool = False):
wait_applying()
@ -1254,7 +1250,7 @@ def services():
# Display services
services = []
global_config = app.config["DB"].get_config(methods=True, with_drafts=True)
global_config = DB.get_config(methods=True, with_drafts=True)
service_names = global_config["SERVER_NAME"]["value"].split(" ")
for service in service_names:
service_settings = []
@ -1305,7 +1301,7 @@ def services():
@login_required
def global_config():
if request.method == "POST":
if app.config["DB"].readonly:
if DB.readonly:
return handle_error("Database is in read-only mode", "global_config")
# Check variables
@ -1313,7 +1309,7 @@ def global_config():
del variables["csrf_token"]
# Edit check fields and remove already existing ones
config = app.config["DB"].get_config(methods=True, with_drafts=True)
config = DB.get_config(methods=True, with_drafts=True)
services = config["SERVER_NAME"]["value"].split(" ")
for variable, value in variables.copy().items():
setting = config.get(variable, {"value": None, "global": True})
@ -1332,7 +1328,7 @@ def global_config():
if setting and setting["global"] and (setting["value"] != value or setting["value"] == config.get(variable, {"value": None})["value"]):
variables[f"{service}_{variable}"] = value
db_metadata = app.config["DB"].get_metadata()
db_metadata = DB.get_metadata()
def update_global_config(threaded: bool = False):
wait_applying()
@ -1372,7 +1368,7 @@ def global_config():
)
)
global_config = app.config["DB"].get_config(global_only=True, methods=True)
global_config = DB.get_config(global_only=True, methods=True)
plugins = app.config["CONFIG"].get_plugins()
data_server_builder = global_config_builder(plugins, global_config)
return render_template("global-config.html", data_server_builder=data_server_builder)
@ -1381,10 +1377,10 @@ def global_config():
@app.route("/configs", methods=["GET", "POST"])
@login_required
def configs():
db_configs = app.config["DB"].get_custom_configs()
db_configs = DB.get_custom_configs()
if request.method == "POST":
if app.config["DB"].readonly:
if DB.readonly:
return handle_error("Database is in read-only mode", "configs")
operation = ""
@ -1471,7 +1467,7 @@ def configs():
del db_configs[index]
operation = f"Deleted config {name}{f' for service {service_id}' if service_id else ''}"
error = app.config["DB"].save_custom_configs([config for config in db_configs if config["method"] == "ui"], "ui")
error = DB.save_custom_configs([config for config in db_configs if config["method"] == "ui"], "ui")
if error:
app.logger.error(f"Could not save custom configs: {error}")
return handle_error("Couldn't save custom configs", "configs", True)
@ -1503,7 +1499,7 @@ def plugins():
tmp_ui_path = TMP_DIR.joinpath("ui")
if request.method == "POST":
if app.config["DB"].readonly:
if DB.readonly:
return handle_error("Database is in read-only mode", "plugins")
verify_data_in_form(
@ -1524,7 +1520,7 @@ def plugins():
if variables["type"] in ("core", "pro"):
return handle_error(f"Can't delete {variables['type']} plugin {variables['name']}", "plugins", True)
db_metadata = app.config["DB"].get_metadata()
db_metadata = DB.get_metadata()
def update_plugins(threaded: bool = False): # type: ignore
wait_applying()
@ -1536,7 +1532,7 @@ def plugins():
ui_data = get_ui_data()
err = app.config["DB"].update_external_plugins(plugins)
err = DB.update_external_plugins(plugins)
if err:
message = f"Couldn't update external plugins to database: {err}"
if threaded:
@ -1737,7 +1733,7 @@ def plugins():
if errors >= files_count:
return redirect(url_for("loading", next=url_for("plugins")))
db_metadata = app.config["DB"].get_metadata()
db_metadata = DB.get_metadata()
def update_plugins(threaded: bool = False):
wait_applying()
@ -1750,7 +1746,7 @@ def plugins():
ui_data = get_ui_data()
err = app.config["DB"].update_external_plugins(new_plugins, delete_missing=False)
err = DB.update_external_plugins(new_plugins, delete_missing=False)
if err:
message = f"Couldn't update external plugins to database: {err}"
if threaded:
@ -1813,7 +1809,7 @@ def plugins():
@app.route("/plugins/upload", methods=["POST"])
@login_required
def upload_plugin():
if app.config["DB"].readonly:
if DB.readonly:
return {"status": "ko", "message": "Database is in read-only mode"}, 403
if not request.files:
@ -1893,7 +1889,7 @@ def custom_plugin(plugin: str):
if request.method == "GET":
# Check template
page = app.config["DB"].get_plugin_template(plugin)
page = DB.get_plugin_template(plugin)
if not page:
return error_message("The plugin does not have a template"), 404
@ -1917,7 +1913,7 @@ def custom_plugin(plugin: str):
if plugin_id is None:
return error_message("Plugin not found"), 404
config = app.config["DB"].get_config()
config = DB.get_config()
# Check if we are using metrics
for service in config.get("SERVER_NAME", "").split(" "):
@ -2025,7 +2021,7 @@ def cache():
path_to_dict(
join(sep, "var", "cache", "bunkerweb"),
is_cache=True,
db_data=app.config["DB"].get_jobs_cache_files(),
db_data=DB.get_jobs_cache_files(),
services=app.config["CONFIG"].get_config(global_only=True, methods=False, filtered_settings=("SERVER_NAME",)).get("SERVER_NAME", "").split(" "),
)
],
@ -2312,7 +2308,7 @@ def reports():
def bans():
if request.method == "POST":
if app.config["DB"].readonly:
if DB.readonly:
return handle_error("Database is in read-only mode", "bans")
# Check variables
@ -2516,7 +2512,7 @@ def bans():
@app.route("/jobs", methods=["GET"])
@login_required
def jobs():
data_server_builder = jobs_builder(app.config["DB"].get_jobs())
data_server_builder = jobs_builder(DB.get_jobs())
return render_template("jobs.html", data_server_builder=data_server_builder)
@ -2531,7 +2527,7 @@ def jobs_download():
if not plugin_id or not job_name or not file_name:
return jsonify({"status": "ko", "message": "plugin_id, job_name and file_name are required"}), 422
cache_file = app.config["DB"].get_job_cache_file(job_name, file_name, service_id=service_id, plugin_id=plugin_id)
cache_file = DB.get_job_cache_file(job_name, file_name, service_id=service_id, plugin_id=plugin_id)
if not cache_file:
return jsonify({"status": "ko", "message": "file not found"}), 404
@ -2543,7 +2539,7 @@ def jobs_download():
@app.route("/login", methods=["GET", "POST"])
def login():
admin_user = app.config["DB"].get_ui_user()
admin_user = DB.get_ui_user()
if not admin_user:
return redirect(url_for("setup"))
elif current_user.is_authenticated: # type: ignore
@ -2553,15 +2549,27 @@ def login():
if request.method == "POST" and "username" in request.form and "password" in request.form:
app.logger.warning(f"Login attempt from {request.remote_addr} with username \"{request.form['username']}\"")
admin_user = User(**admin_user)
if admin_user.get_id() == request.form["username"] and admin_user.check_password(request.form["password"]):
ui_user = DB.get_ui_user(username=request.form["username"])
if ui_user and ui_user.username == request.form["username"] and ui_user.check_password(request.form["password"]):
# log the user in
session.permanent = True
session["ip"] = request.remote_addr
session["user_agent"] = request.headers.get("User-Agent")
session["totp_validated"] = False
login_user(admin_user, duration=timedelta(hours=8), force=True)
session["tmp_totp_secret"] = None
ui_user.last_login_at = datetime.now()
ui_user.last_login_ip = request.remote_addr
ui_user.login_count += 1
DB.mark_ui_user_login(ui_user.username, ui_user.last_login_at, ui_user.last_login_ip)
if not login_user(ui_user, remember=request.form.get("remember") == "on"):
flash("Couldn't log you in, please try again", "error")
return (render_template("login.html", error="Couldn't log you in, please try again"),)
app.logger.info(
f"User {ui_user.username} logged in successfully for the {str(ui_user.login_count) + ('th' if 10 <= ui_user.login_count % 100 <= 20 else {1: 'st', 2: 'nd', 3: 'rd'}.get(ui_user.login_count % 10, 'th'))} time"
+ (" with remember me" if request.form.get("remember") == "on" else "")
)
# redirect him to the page he originally wanted or to the home page
return redirect(url_for("loading", next=request.form.get("next") or url_for("home")))
@ -2570,7 +2578,7 @@ def login():
fail = True
kwargs = {
"is_totp": current_user.is_two_factor_enabled,
"is_totp": bool(current_user.totp_secret),
} | ({"error": "Invalid username or password"} if fail else {})
return render_template("login.html", **kwargs), 401 if fail else 200
@ -2606,4 +2614,6 @@ def check_reloading():
def logout():
session.clear()
logout_user()
return redirect(url_for("login"))
response = redirect(url_for("login"))
response.headers["Clear-Site-Data"] = '"cache", "cookies", "storage", "executionContexts"'
return response

130
src/ui/models.py Normal file
View file

@ -0,0 +1,130 @@
from datetime import datetime, timezone
from functools import partial
from os.path import join, sep
from sys import path as sys_path
from typing import Optional
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("db",))]:
if deps_path not in sys_path:
sys_path.append(deps_path)
from bcrypt import checkpw
from flask_login import AnonymousUserMixin, UserMixin
from pyotp.totp import TOTP
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy import Boolean, DateTime, Column, Integer, String, ForeignKey, func
from model import METHODS_ENUM # type: ignore
Base = declarative_base()
class AnonymousUser(AnonymousUserMixin):
username = "Anonymous"
email = None
password = ""
method = "manual"
admin = False
last_login_at = None
last_login_ip = None
login_count = 0
totp_secret = None
creation_date = datetime.now(timezone.utc)
update_date = datetime.now(timezone.utc)
def check_password(self, password: str) -> bool:
return False
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
if not secret:
raise ValueError("Secret is required for anonymous user")
return TOTP(secret).verify(otp, valid_window=3)
class Users(Base, UserMixin):
__tablename__ = "bw_ui_users"
username = Column(String(256), primary_key=True)
email = Column(String(256), unique=True, nullable=True)
password = Column(String(60), nullable=False)
method = Column(METHODS_ENUM, nullable=False, default="manual")
admin = Column(Boolean, nullable=False, default=False)
# Trackable
last_login_at = Column(DateTime(), nullable=True)
last_login_ip = Column(String(39), nullable=True)
login_count = Column(Integer, default=0, nullable=False)
# 2FA
totp_secret = Column(String(32), nullable=True)
totp_refreshed = Column(Boolean, nullable=False, default=False)
creation_date = Column(DateTime(), nullable=False, server_default=func.now())
update_date = Column(DateTime(), nullable=False, server_default=func.now(), onupdate=partial(datetime.now, timezone.utc))
roles = relationship("RolesUsers", back_populates="user", cascade="all")
recovery_codes = relationship("UserRecoveryCodes", back_populates="user", cascade="all")
list_roles: list[str] = []
list_permissions: list[str] = []
list_recovery_codes: list[str] = []
def get_id(self):
return self.username
def check_password(self, password: str) -> bool:
return checkpw(password.encode("utf-8"), self.password.encode("utf-8"))
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
if secret:
return TOTP(secret).verify(otp, valid_window=3)
return TOTP(self.totp_secret).verify(otp, valid_window=3)
class Roles(Base):
__tablename__ = "bw_ui_roles"
name = Column(String(64), primary_key=True)
description = Column(String(256), nullable=False)
update_datetime = Column(DateTime(), nullable=False, server_default=func.now(), onupdate=partial(datetime.now, timezone.utc))
users = relationship("RolesUsers", back_populates="role", cascade="all")
permissions = relationship("RolesPermissions", back_populates="role", cascade="all")
class RolesUsers(Base):
__tablename__ = "bw_ui_roles_users"
user_name = Column(String(256), ForeignKey("bw_ui_users.username", onupdate="cascade", ondelete="cascade"), primary_key=True)
role_name = Column(String(64), ForeignKey("bw_ui_roles.name", onupdate="cascade", ondelete="cascade"), primary_key=True)
user = relationship("Users", back_populates="roles")
role = relationship("Roles", back_populates="users")
class UserRecoveryCodes(Base):
__tablename__ = "bw_ui_user_recovery_codes"
id = Column(Integer, primary_key=True)
user_name = Column(String(256), ForeignKey("bw_ui_users.username", onupdate="cascade", ondelete="cascade"), nullable=False)
code = Column(String(19), nullable=False)
user = relationship("Users", back_populates="recovery_codes")
class RolesPermissions(Base):
__tablename__ = "bw_ui_roles_permissions"
role_name = Column(String(64), ForeignKey("bw_ui_roles.name", onupdate="cascade", ondelete="cascade"), primary_key=True)
permission_name = Column(String(64), ForeignKey("bw_ui_permissions.name", onupdate="cascade", ondelete="cascade"), primary_key=True)
role = relationship("Roles", back_populates="permissions")
permission = relationship("Permissions", back_populates="roles")
class Permissions(Base):
__tablename__ = "bw_ui_permissions"
name = Column(String(64), primary_key=True)
roles = relationship("RolesPermissions", back_populates="permission", cascade="all")

View file

@ -1,9 +1,10 @@
bcrypt==4.1.3
bcrypt==4.2.0
beautifulsoup4==4.12.3
Flask==3.0.3
Flask-Login==0.6.3
Flask_WTF==1.2.1
gunicorn[gthread]==22.0.0
passlib==1.7.4
pyotp==2.9.0
python-magic==0.4.27
python_dateutil==2.9.0.post0

View file

@ -4,34 +4,34 @@
#
# pip-compile --allow-unsafe --generate-hashes --strip-extras requirements.in
#
bcrypt==4.1.3 \
--hash=sha256:01746eb2c4299dd0ae1670234bf77704f581dd72cc180f444bfe74eb80495b64 \
--hash=sha256:037c5bf7c196a63dcce75545c8874610c600809d5d82c305dd327cd4969995bf \
--hash=sha256:094fd31e08c2b102a14880ee5b3d09913ecf334cd604af27e1013c76831f7b05 \
--hash=sha256:0d4cf6ef1525f79255ef048b3489602868c47aea61f375377f0d00514fe4a78c \
--hash=sha256:193bb49eeeb9c1e2db9ba65d09dc6384edd5608d9d672b4125e9320af9153a15 \
--hash=sha256:2505b54afb074627111b5a8dc9b6ae69d0f01fea65c2fcaea403448c503d3991 \
--hash=sha256:2ee15dd749f5952fe3f0430d0ff6b74082e159c50332a1413d51b5689cf06623 \
--hash=sha256:31adb9cbb8737a581a843e13df22ffb7c84638342de3708a98d5c986770f2834 \
--hash=sha256:3a5be252fef513363fe281bafc596c31b552cf81d04c5085bc5dac29670faa08 \
--hash=sha256:3d3b317050a9a711a5c7214bf04e28333cf528e0ed0ec9a4e55ba628d0f07c1a \
--hash=sha256:48429c83292b57bf4af6ab75809f8f4daf52aa5d480632e53707805cc1ce9b74 \
--hash=sha256:4a8bea4c152b91fd8319fef4c6a790da5c07840421c2b785084989bf8bbb7455 \
--hash=sha256:4fb253d65da30d9269e0a6f4b0de32bd657a0208a6f4e43d3e645774fb5457f3 \
--hash=sha256:551b320396e1d05e49cc18dd77d970accd52b322441628aca04801bbd1d52a73 \
--hash=sha256:5f7cd3399fbc4ec290378b541b0cf3d4398e4737a65d0f938c7c0f9d5e686611 \
--hash=sha256:6004f5229b50f8493c49232b8e75726b568535fd300e5039e255d919fc3a07f2 \
--hash=sha256:6717543d2c110a155e6821ce5670c1f512f602eabb77dba95717ca76af79867d \
--hash=sha256:6cac78a8d42f9d120b3987f82252bdbeb7e6e900a5e1ba37f6be6fe4e3848286 \
--hash=sha256:8a893d192dfb7c8e883c4576813bf18bb9d59e2cfd88b68b725990f033f1b978 \
--hash=sha256:8cbb119267068c2581ae38790e0d1fbae65d0725247a930fc9900c285d95725d \
--hash=sha256:9f8ea645eb94fb6e7bea0cf4ba121c07a3a182ac52876493870033141aa687bc \
--hash=sha256:c4c8d9b3e97209dd7111bf726e79f638ad9224b4691d1c7cfefa571a09b1b2d6 \
--hash=sha256:cb9c707c10bddaf9e5ba7cdb769f3e889e60b7d4fea22834b261f51ca2b89fed \
--hash=sha256:d84702adb8f2798d813b17d8187d27076cca3cd52fe3686bb07a9083930ce650 \
--hash=sha256:ec3c2e1ca3e5c4b9edb94290b356d082b721f3f50758bce7cce11d8a7c89ce84 \
--hash=sha256:f44a97780677e7ac0ca393bd7982b19dbbd8d7228c1afe10b128fd9550eef5f1 \
--hash=sha256:f5698ce5292a4e4b9e5861f7e53b1d89242ad39d54c3da451a93cac17b61921a
bcrypt==4.2.0 \
--hash=sha256:096a15d26ed6ce37a14c1ac1e48119660f21b24cba457f160a4b830f3fe6b5cb \
--hash=sha256:0da52759f7f30e83f1e30a888d9163a81353ef224d82dc58eb5bb52efcabc399 \
--hash=sha256:1bb429fedbe0249465cdd85a58e8376f31bb315e484f16e68ca4c786dcc04291 \
--hash=sha256:1d84cf6d877918620b687b8fd1bf7781d11e8a0998f576c7aa939776b512b98d \
--hash=sha256:1ee38e858bf5d0287c39b7a1fc59eec64bbf880c7d504d3a06a96c16e14058e7 \
--hash=sha256:1ff39b78a52cf03fdf902635e4c81e544714861ba3f0efc56558979dd4f09170 \
--hash=sha256:27fe0f57bb5573104b5a6de5e4153c60814c711b29364c10a75a54bb6d7ff48d \
--hash=sha256:3413bd60460f76097ee2e0a493ccebe4a7601918219c02f503984f0a7ee0aebe \
--hash=sha256:3698393a1b1f1fd5714524193849d0c6d524d33523acca37cd28f02899285060 \
--hash=sha256:373db9abe198e8e2c70d12b479464e0d5092cc122b20ec504097b5f2297ed184 \
--hash=sha256:39e1d30c7233cfc54f5c3f2c825156fe044efdd3e0b9d309512cc514a263ec2a \
--hash=sha256:3bbbfb2734f0e4f37c5136130405332640a1e46e6b23e000eeff2ba8d005da68 \
--hash=sha256:3d3a6d28cb2305b43feac298774b997e372e56c7c7afd90a12b3dc49b189151c \
--hash=sha256:5a1e8aa9b28ae28020a3ac4b053117fb51c57a010b9f969603ed885f23841458 \
--hash=sha256:61ed14326ee023917ecd093ee6ef422a72f3aec6f07e21ea5f10622b735538a9 \
--hash=sha256:655ea221910bcac76ea08aaa76df427ef8625f92e55a8ee44fbf7753dbabb328 \
--hash=sha256:762a2c5fb35f89606a9fde5e51392dad0cd1ab7ae64149a8b935fe8d79dd5ed7 \
--hash=sha256:77800b7147c9dc905db1cba26abe31e504d8247ac73580b4aa179f98e6608f34 \
--hash=sha256:8ac68872c82f1add6a20bd489870c71b00ebacd2e9134a8aa3f98a0052ab4b0e \
--hash=sha256:8d7bb9c42801035e61c109c345a28ed7e84426ae4865511eb82e913df18f58c2 \
--hash=sha256:8f6ede91359e5df88d1f5c1ef47428a4420136f3ce97763e31b86dd8280fbdf5 \
--hash=sha256:9c1c4ad86351339c5f320ca372dfba6cb6beb25e8efc659bedd918d921956bae \
--hash=sha256:c02d944ca89d9b1922ceb8a46460dd17df1ba37ab66feac4870f6862a1533c00 \
--hash=sha256:c52aac18ea1f4a4f65963ea4f9530c306b56ccd0c6f8c8da0c06976e34a6e841 \
--hash=sha256:cb2a8ec2bc07d3553ccebf0746bbf3d19426d1c6d1adbd4fa48925f66af7b9e8 \
--hash=sha256:cf69eaf5185fd58f268f805b505ce31f9b9fc2d64b376642164e9244540c1221 \
--hash=sha256:f4f4acf526fcd1c34e7ce851147deedd4e26e6402369304220250598b26448db
# via -r requirements.in
beautifulsoup4==4.12.3 \
--hash=sha256:74e3d1928edc070d21748185c46e3fb33490f22f52a3addee9aee0f4f7781051 \
@ -64,9 +64,9 @@ gunicorn==22.0.0 \
--hash=sha256:350679f91b24062c86e386e198a15438d53a7a8207235a78ba1b53df4c4378d9 \
--hash=sha256:4a0b436239ff76fb33f11c07a16482c521a7e09c1ce3cc293c2330afe01bec63
# via -r requirements.in
importlib-metadata==8.0.0 \
--hash=sha256:15584cf2b1bf449d98ff8a6ff1abef57bf20f3ac6454f431736cd3e660921b2f \
--hash=sha256:188bd24e4c346d3f0a933f275c2fec67050326a856b9a359881d7c2a697e8812
importlib-metadata==8.2.0 \
--hash=sha256:11901fa0c2f97919b288679932bb64febaeacf289d18ac84dd68cb2e74213369 \
--hash=sha256:72e8d4399996132204f9a16dcc751af254a48f8d1b20b9ff0f98d4a8f901e73d
# via flask
itsdangerous==2.2.0 \
--hash=sha256:c6242fc49e35958c8b15141343aa660db5fc54d4f13a1db01a3f5891b98700ef \
@ -147,6 +147,10 @@ packaging==24.1 \
--hash=sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002 \
--hash=sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124
# via gunicorn
passlib==1.7.4 \
--hash=sha256:aa6bca462b8d8bda89c70b382f0c298a20b5560af6cbfa2dce410c0a2fb669f1 \
--hash=sha256:defd50f72b65c5402ab2c573830a6978e5f202ad0d984793c8dde2c4152ebe04
# via -r requirements.in
pyotp==2.9.0 \
--hash=sha256:346b6642e0dbdde3b4ff5a930b664ca82abfa116356ed48cc42c7d6590d36f63 \
--hash=sha256:81c2e5865b8ac55e825b0358e496e1d9387c811e85bb40e71a3b29b288963612

View file

@ -1,137 +0,0 @@
#!/usr/bin/env python3
from typing import Optional
from bcrypt import checkpw, hashpw, gensalt
from flask_login import AnonymousUserMixin, UserMixin
from pyotp import random_base32
from pyotp.totp import TOTP
class User(UserMixin):
def __init__(
self,
username: str,
password: Optional[str] = None,
*,
is_two_factor_enabled: bool = False,
password_hash: Optional[bytes] = None,
secret_token: Optional[str] = None,
method: str = "manual",
):
self.id = username
if not password:
assert password_hash, "Either password or password_hash must be provided"
self.__password = password_hash or hashpw(password.encode("utf-8"), gensalt(rounds=13)) # type: ignore
self.is_two_factor_enabled = is_two_factor_enabled
self.secret_token = secret_token
self.method = method
self.__totp = TOTP(secret_token) if secret_token else None
@property
def password_hash(self) -> bytes:
"""
Get the password hash
:return: The password hash
"""
return self.__password
def update_password(self, password: str):
"""
Set the password by hashing it
:param password: The password to be hashed
"""
self.__password = hashpw(password.encode("utf-8"), gensalt(rounds=13))
def check_password(self, password: str):
"""
Check if the password is correct by hashing it and comparing it to the stored hash
:param password: The password to be checked
:return: The password is being checked against the password hash. If the password is correct,
the user is returned.
"""
return checkpw(password.encode("utf-8"), self.__password)
def get_authentication_setup_uri(self) -> str:
if not self.__totp:
return ""
return self.__totp.provisioning_uri(name=self.id, issuer_name="BunkerWeb UI")
def refresh_totp(self):
self.secret_token = random_base32()
self.__totp = TOTP(self.secret_token)
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
"""
Check if the otp is correct by comparing it to the stored secret token
:param otp: The otp to be checked
:return: The otp is being checked against the secret token. If the otp is correct,
the user is returned.
"""
if secret:
return TOTP(secret).verify(otp, valid_window=3)
if not self.__totp:
return False
return self.__totp.verify(otp, valid_window=3)
def __repr__(self):
return f"User({self.id!r}, {self.__password!r}, {self.is_two_factor_enabled!r}, {self.secret_token!r}, {self.method!r})"
class AnonymousUser(AnonymousUserMixin):
def __init__(self):
self.id = None
self.is_two_factor_enabled = False
self.secret_token = None
self.method = "manual"
@property
def password_hash(self) -> None:
"""
Get the password hash
:return: The password hash
"""
return None
def update_password(self, password: str):
"""
Set the password by hashing it
:param password: The password to be hashed
"""
self.__password = hashpw(password.encode("utf-8"), gensalt(rounds=13))
def check_password(self, password: str):
"""
Check if the password is correct by hashing it and comparing it to the stored hash
:param password: The password to be checked
:return: The password is being checked against the password hash. If the password is correct,
the user is returned.
"""
return False
def get_authentication_setup_uri(self) -> str:
return ""
def refresh_totp(self):
return
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
"""
Check if the otp is correct by comparing it to the stored secret token
:param otp: The otp to be checked
:return: The otp is being checked against the secret token. If the otp is correct,
the user is returned.
"""
if secret:
return TOTP(secret).verify(otp, valid_window=3)
return False

View file

@ -498,7 +498,7 @@
{% if is_totp or not is_totp %}
<!-- username inpt-->
<div class="flex flex-col relative col-span-12 px-4 my-2 md:px-6 md:my-3 lg:px-6 lg:my-3 max-w-[400px] w-full">
<h5 class="input-title">2FA code</h5>
<h5 class="input-title">2FA code or Backup code</h5>
<label class="sr-only" for="totp_token">totp code</label>
<input {% if is_readonly %}disabled{% endif %}
type="text"

View file

@ -96,6 +96,15 @@
required />
</div>
<!-- end password inpt-->
<!-- remember me inpt-->
<div class="flex items-center my-3">
<label class="sr-only" for="remember">Remember me</label>
<input type="checkbox" id="remember" name="remember" class="mr-2" value="on" />
<h5 class="text-sm font-bold dark:text-gray-300 dark:opacity-90 transition duration-300 ease-in-out m-0">
Remember me
</h5>
</div>
<!-- end remember me inpt-->
<!-- totp -->
<div class="flex justify-center">
<button type="submit"

406
src/ui/ui_database.py Normal file
View file

@ -0,0 +1,406 @@
from datetime import datetime
from logging import Logger
from os import sep
from os.path import join
from sys import path as sys_path
from time import sleep
from typing import List, Optional, Tuple, Union
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("api",), ("db",))]:
if deps_path not in sys_path:
sys_path.append(deps_path)
from sqlalchemy import MetaData, inspect, text
from sqlalchemy.exc import IntegrityError
from Database import Database # type: ignore
from models import Base, Users, Roles, RolesUsers, UserRecoveryCodes, RolesPermissions, Permissions
from utils import gen_recovery_codes
class UIDatabase(Database):
def __init__(self, logger: Logger, sqlalchemy_string: Optional[str] = None, *, pool: Optional[bool] = None, log: bool = True, **kwargs) -> None:
super().__init__(logger, sqlalchemy_string, ui=True, pool=pool, log=log, **kwargs)
def init_ui_tables(self, bunkerweb_version: str) -> Tuple[bool, str]:
"""Initialize the database ui tables and return the result"""
if self.readonly:
return False, "The database is read-only, the changes will not be saved"
assert self.sql_engine is not None, "The database engine is not initialized"
inspector = inspect(self.sql_engine)
db_version = None
has_all_tables = True
old_data = {}
if inspector and len(inspector.get_table_names()):
metadata = self.get_metadata()
db_version = metadata["version"]
if metadata["default"]:
db_version = "error"
if db_version != bunkerweb_version:
self.logger.warning(f"Database version ({db_version}) is different from BunkerWeb version ({bunkerweb_version}), migrating ui tables ...")
current_time = datetime.now()
error = True
while error:
try:
metadata = MetaData()
metadata.reflect(self.sql_engine)
error = False
except BaseException as e:
if (datetime.now() - current_time).total_seconds() > 10:
raise e
sleep(1)
assert isinstance(metadata, MetaData)
for table_name in Base.metadata.tables.keys():
if not inspector.has_table(table_name):
self.logger.warning(f'UI table "{table_name}" is missing, creating it')
has_all_tables = False
continue
with self._db_session() as session:
old_data[table_name] = session.query(metadata.tables[table_name]).all()
# Rename the old tables
db_version_id = db_version.replace(".", "_")
for table_name in metadata.tables.keys():
if table_name in Base.metadata.tables:
with self._db_session() as session:
if inspector.has_table(f"{table_name}_{db_version_id}"):
self.logger.warning(f'UI table "{table_name}" already exists, dropping it to make room for the new one')
session.execute(text(f"DROP TABLE {table_name}_{db_version_id}"))
session.execute(text(f"ALTER TABLE {table_name} RENAME TO {table_name}_{db_version_id}"))
session.commit()
Base.metadata.drop_all(self.sql_engine)
else:
for table_name in Base.metadata.tables.keys():
if not inspector.has_table(table_name):
self.logger.warning(f'UI table "{table_name}" is missing, creating it')
has_all_tables = False
continue
if has_all_tables and db_version and db_version == bunkerweb_version:
return False, ""
self.logger.info("Creating UI tables ...")
try:
Base.metadata.create_all(self.sql_engine, checkfirst=True)
except BaseException as e:
return False, str(e)
if db_version and db_version != bunkerweb_version:
for table_name, data in old_data.items():
if not data:
continue
self.logger.warning(f'Restoring data for ui table "{table_name}"')
self.logger.debug(f"Data: {data}")
for row in data:
row = {column: getattr(row, column) for column in Base.metadata.tables[table_name].columns.keys() if hasattr(row, column)}
# TODO: Add special handling for newer UI version
with self._db_session() as session:
try:
# Check if the row already exists in the table
existing_row = session.query(Base.metadata.tables[table_name]).filter_by(**row).first()
if not existing_row:
session.execute(Base.metadata.tables[table_name].insert().values(row))
session.commit()
except IntegrityError as e:
session.rollback()
if "Duplicate entry" not in str(e):
self.logger.error(f"Error when trying to restore data for table {table_name}: {e}")
continue
self.logger.debug(e)
return True, ""
def get_ui_user(self, *, username: Optional[str] = None, as_dict: bool = False) -> Optional[Union[Users, dict]]:
"""Get ui user. If username is None, return the first admin user."""
with self._db_session() as session:
if username:
ui_user = session.query(Users).filter_by(username=username).first()
else:
ui_user = session.query(Users).filter_by(admin=True).first()
if not ui_user:
return None
elif not as_dict:
return ui_user
ui_user_data = {
"username": ui_user.username,
"email": ui_user.email,
"password": ui_user.password.encode("utf-8"),
"method": ui_user.method,
"last_login_at": ui_user.last_login_at,
"last_login_ip": ui_user.last_login_ip,
"login_count": ui_user.login_count,
"totp_secret": ui_user.totp_secret,
"creation_date": ui_user.creation_date,
"update_date": ui_user.update_date,
"roles": [],
"recovery_codes": [],
}
for role in session.query(RolesUsers).filter_by(user_name=ui_user.username).all():
ui_user_data["roles"].append(role.role_name)
for recovery_code in session.query(UserRecoveryCodes).filter_by(user_name=ui_user.username).all():
ui_user_data["recovery_codes"].append(recovery_code.code)
return ui_user_data
def create_ui_user(
self,
username: str,
password: bytes,
roles: List[str],
email: Optional[str] = None,
*,
totp_secret: Optional[str] = None,
method: str = "manual",
admin: bool = False,
) -> str:
"""Create ui user."""
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
if admin and session.query(Users).filter_by(admin=True).first():
return "An admin user already exists"
user = session.query(Users).filter_by(username=username).first()
if user:
return f"User {username} already exists"
for role in roles:
if not session.query(Roles).filter_by(name=role).first():
return f"Role {role} doesn't exist"
session.add(RolesUsers(user_name=username, role_name=role))
session.add(
Users(
username=username,
email=email,
password=password.decode("utf-8"),
method=method,
admin=admin,
totp_secret=totp_secret,
totp_refreshed=bool(totp_secret),
)
)
if totp_secret:
for code in gen_recovery_codes():
session.add(UserRecoveryCodes(user_name=username, code=code))
try:
session.commit()
except BaseException as e:
return str(e)
return ""
def update_ui_user(self, username: str, password: bytes, totp_secret: Optional[str], method: str = "manual") -> str:
"""Update ui user."""
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
user = session.query(Users).filter_by(username=username).first()
if not user:
return f"User {username} doesn't exist"
if user.totp_secret != totp_secret:
user.totp_refreshed = True
user.password = password.decode("utf-8")
user.totp_secret = totp_secret
user.method = method
try:
session.commit()
except BaseException as e:
return str(e)
if user.totp_refreshed:
if totp_secret:
self.refresh_ui_user_recovery_codes(username)
else:
self.delete_ui_user_recovery_codes(username)
return ""
def delete_ui_user(self, username: str) -> str:
"""Delete ui user."""
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
user = session.query(Users).filter_by(username=username).first()
if not user:
return f"User {username} doesn't exist"
session.query(RolesUsers).filter_by(user_name=username).delete()
session.query(UserRecoveryCodes).filter_by(user_name=username).delete()
session.delete(user)
try:
session.commit()
except BaseException as e:
return str(e)
return ""
def mark_ui_user_login(self, username: str, date: datetime, ip: str) -> str:
"""Mark ui user login."""
with self._db_session() as session:
user = session.query(Users).filter_by(username=username).first()
if not user:
return f"User {username} doesn't exist"
user.last_login_at = date
user.last_login_ip = ip
user.login_count += 1
try:
session.commit()
except BaseException as e:
return str(e)
return ""
def create_ui_role(self, name: str, description: str, permissions: List[str]) -> str:
"""Create ui role."""
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
if session.query(Roles).filter_by(name=name).first():
return f"Role {name} already exists"
session.add(Roles(name=name, description=description))
for permission in permissions:
if not session.query(Permissions).filter_by(name=permission).first():
session.add(Permissions(name=permission))
session.add(RolesPermissions(role_name=name, permission_name=permission))
try:
session.commit()
except BaseException as e:
return str(e)
return ""
def get_ui_roles(self, *, as_dict: bool = False) -> List[Union[Roles, dict]]:
"""Get ui roles."""
with self._db_session() as session:
roles = session.query(Roles).all()
if not as_dict:
return roles
roles_data = []
for role in roles:
role_data = {
"name": role.name,
"description": role.description,
"update_datetime": role.update_datetime,
"permissions": [],
}
for permission in session.query(RolesPermissions).filter_by(role_name=role.name).all():
role_data["permissions"].append(permission.permission_name)
roles_data.append(role_data)
return roles_data
def refresh_ui_user_recovery_codes(self, username: str) -> str:
"""Refresh ui user recovery codes."""
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
user = session.query(Users).filter_by(username=username).first()
if not user:
return f"User {username} doesn't exist"
session.query(UserRecoveryCodes).filter_by(user_name=username).delete()
for code in gen_recovery_codes():
session.add(UserRecoveryCodes(user_name=username, code=code))
try:
session.commit()
except BaseException as e:
return str(e)
return ""
def delete_ui_user_recovery_codes(self, username: str) -> str:
"""Delete ui user recovery codes."""
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
session.query(UserRecoveryCodes).filter_by(user_name=username).delete()
try:
session.commit()
except BaseException as e:
return str(e)
return ""
def get_ui_user_roles(self, username: str) -> List[str]:
"""Get ui user roles."""
with self._db_session() as session:
return [role.role_name for role in session.query(RolesUsers).filter_by(user_name=username).all()]
def get_ui_role_permissions(self, role_name: str) -> List[str]:
"""Get ui role permissions."""
with self._db_session() as session:
return [permission.permission_name for permission in session.query(RolesPermissions).filter_by(role_name=role_name).all()]
def get_ui_user_recovery_codes(self, username: str) -> List[str]:
"""Get ui user recovery codes."""
with self._db_session() as session:
return [code.code for code in session.query(UserRecoveryCodes).filter_by(user_name=username).all()]
def get_ui_user_permissions(self, username: str) -> List[str]:
"""Get ui user permissions."""
with self._db_session() as session:
roles = session.query(RolesUsers).filter_by(user_name=username).all()
permissions = []
for role in roles:
permissions.extend(self.get_ui_role_permissions(role.role_name))
return permissions
def use_ui_user_recovery_code(self, username: str, code: str) -> str:
"""Use ui user recovery code."""
with self._db_session() as session:
user = session.query(Users).filter_by(username=username).first()
if not user:
return f"User {username} doesn't exist"
recovery_code = session.query(UserRecoveryCodes).filter_by(user_name=username, code=code).first()
if not recovery_code:
return "Invalid recovery code"
session.delete(recovery_code)
try:
session.commit()
except BaseException as e:
return str(e)

View file

@ -3,10 +3,17 @@
from base64 import b64encode
from io import BytesIO
from os.path import join
from threading import Lock
from typing import List, Optional
from bcrypt import checkpw, gensalt, hashpw
from magic import Magic
from passlib.pwd import genword
from qrcode.main import QRCode
from regex import compile as re_compile
USER_PASSWORD_RX = re_compile(r"^(?=.*?\p{Lowercase_Letter})(?=.*?\p{Uppercase_Letter})(?=.*?\d)(?=.*?[ !\"#$%&'()*+,./:;<=>?@[\\\]^_`{|}~-]).{8,}$")
LOCK = Lock()
def get_remain(seconds):
@ -207,6 +214,19 @@ def check_settings(settings: dict, check: str) -> bool:
return any(setting["context"] == check for setting in settings.values())
def gen_password_hash(password: str) -> bytes:
return hashpw(password.encode("utf-8"), gensalt(rounds=13))
def check_password(password: str, hashed: bytes) -> bool:
return checkpw(password.encode("utf-8"), hashed)
def gen_recovery_codes() -> List[str]:
pwds = genword(length=16, charset="hex", returns=5)
return ["-".join([pwd[i : i + 4] for i in range(0, len(pwd), 4)]) for pwd in pwds] # noqa: E203
def get_b64encoded_qr_image(data: str):
qr = QRCode(version=1, box_size=10, border=5)
qr.add_data(data)