mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Refactor user management in web UI + Start adding roles logic + Enhanced 2FA feature with recovery codes
This commit is contained in:
parent
e8b3dbc348
commit
26c3da657f
12 changed files with 820 additions and 423 deletions
|
|
@ -29,7 +29,6 @@ from model import (
|
|||
Jobs_cache,
|
||||
Custom_configs,
|
||||
Selects,
|
||||
Users,
|
||||
BwcliCommands,
|
||||
Metadata,
|
||||
)
|
||||
|
|
@ -84,7 +83,7 @@ class Database:
|
|||
if pool:
|
||||
self.logger.warning("The pool parameter is deprecated, it will be removed in the next version")
|
||||
|
||||
self.__session_factory = None
|
||||
self._session_factory = None
|
||||
self.sql_engine = None
|
||||
|
||||
if not sqlalchemy_string:
|
||||
|
|
@ -212,8 +211,8 @@ class Database:
|
|||
|
||||
def __del__(self) -> None:
|
||||
"""Close the database"""
|
||||
if self.__session_factory:
|
||||
self.__session_factory.close_all()
|
||||
if self._session_factory:
|
||||
self._session_factory.close_all()
|
||||
|
||||
if self.sql_engine:
|
||||
self.sql_engine.dispose()
|
||||
|
|
@ -221,13 +220,13 @@ class Database:
|
|||
def test_read(self):
|
||||
"""Test the read access to the database"""
|
||||
self.logger.debug("Testing read access to the database ...")
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
session.execute(text("SELECT 1"))
|
||||
|
||||
def test_write(self):
|
||||
"""Test the write access to the database"""
|
||||
self.logger.debug("Testing write access to the database ...")
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
table_name = uuid4().hex
|
||||
session.execute(text(f"CREATE TABLE IF NOT EXISTS test_{table_name} (id INT)"))
|
||||
session.execute(text(f"DROP TABLE IF EXISTS test_{table_name}"))
|
||||
|
|
@ -259,7 +258,7 @@ class Database:
|
|||
conn.execute(text(f"DROP TABLE IF EXISTS test_{table_name}"))
|
||||
|
||||
@contextmanager
|
||||
def __db_session(self) -> Any:
|
||||
def _db_session(self) -> Any:
|
||||
try:
|
||||
assert self.sql_engine is not None
|
||||
except AssertionError:
|
||||
|
|
@ -302,7 +301,7 @@ class Database:
|
|||
|
||||
def is_setting(self, setting: str, *, multisite: bool = False) -> bool:
|
||||
"""Check if the setting exists in the database and optionally if it's multisite"""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
try:
|
||||
multiple = False
|
||||
if self.suffix_rx.search(setting):
|
||||
|
|
@ -323,7 +322,7 @@ class Database:
|
|||
|
||||
def set_failover(self, value: bool = True) -> str:
|
||||
"""Set the failover value"""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
|
|
@ -342,7 +341,7 @@ class Database:
|
|||
|
||||
def initialize_db(self, version: str, integration: str = "Unknown") -> str:
|
||||
"""Initialize the database"""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
|
|
@ -397,7 +396,7 @@ class Database:
|
|||
"database_version": "Unknown", # ? Extracted from the database
|
||||
"default": True, # ? Extra field to know if the returned data is the default one
|
||||
}
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
try:
|
||||
database = self.database_uri.split(":")[0].split("+")[0]
|
||||
data["database_version"] = (
|
||||
|
|
@ -422,7 +421,7 @@ class Database:
|
|||
|
||||
def set_metadata(self, data: Dict[str, Any]) -> str:
|
||||
"""Set the metadata values"""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
|
|
@ -453,7 +452,7 @@ class Database:
|
|||
"""Set changed bit for config, custom configs, instances and plugins"""
|
||||
changes = changes or ["config", "custom_configs", "external_plugins", "pro_plugins", "instances"]
|
||||
plugins_changes = plugins_changes or set()
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
|
|
@ -536,14 +535,14 @@ class Database:
|
|||
has_all_tables = False
|
||||
continue
|
||||
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
old_data[table_name] = session.query(metadata.tables[table_name]).all()
|
||||
|
||||
# Rename the old tables
|
||||
db_version_id = db_version.replace(".", "_")
|
||||
for table_name in metadata.tables.keys():
|
||||
if table_name in Base.metadata.tables:
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if inspector.has_table(f"{table_name}_{db_version_id}"):
|
||||
self.logger.warning(f'Table "{table_name}" already exists, dropping it to make room for the new one')
|
||||
session.execute(text(f"DROP TABLE {table_name}_{db_version_id}"))
|
||||
|
|
@ -561,7 +560,7 @@ class Database:
|
|||
return False, str(e)
|
||||
|
||||
to_put = []
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
db_plugins = session.query(Plugins).with_entities(Plugins.id).all()
|
||||
|
||||
db_ids = []
|
||||
|
|
@ -1048,7 +1047,7 @@ class Database:
|
|||
if table_name == "bw_plugins" and "external" in row:
|
||||
row["type"] = "external" if row.pop("external") else "core"
|
||||
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
try:
|
||||
if table_name == "bw_metadata":
|
||||
existing_row = session.query(Metadata).filter_by(id=1).first()
|
||||
|
|
@ -1081,7 +1080,7 @@ class Database:
|
|||
if method == "autoconf":
|
||||
db_config = self.get_non_default_settings(with_drafts=True)
|
||||
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
|
|
@ -1365,7 +1364,7 @@ class Database:
|
|||
) -> str:
|
||||
"""Save the custom configs in the database"""
|
||||
message = ""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
|
|
@ -1453,7 +1452,7 @@ class Database:
|
|||
if filtered_settings and not global_only:
|
||||
filtered_settings.update(("SERVER_NAME", "MULTISITE"))
|
||||
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
config = original_config or {}
|
||||
multisite = original_multisite or set()
|
||||
|
||||
|
|
@ -1550,7 +1549,7 @@ class Database:
|
|||
filtered_settings: Optional[Union[List[str], Set[str], Tuple[str]]] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Get the config from the database"""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
config = {}
|
||||
multisite = set()
|
||||
|
||||
|
|
@ -1585,7 +1584,7 @@ class Database:
|
|||
|
||||
def get_custom_configs(self) -> List[Dict[str, Any]]:
|
||||
"""Get the custom configs from the database"""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
return [
|
||||
{
|
||||
"service_id": custom_config.service_id,
|
||||
|
|
@ -1630,7 +1629,7 @@ class Database:
|
|||
|
||||
def update_job(self, plugin_id: str, job_name: str, success: bool) -> str:
|
||||
"""Update the job last_run in the database"""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
|
|
@ -1657,7 +1656,7 @@ class Database:
|
|||
if service_id:
|
||||
filters["service_id"] = service_id
|
||||
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
|
|
@ -1680,7 +1679,7 @@ class Database:
|
|||
"""Update the plugin cache in the database"""
|
||||
job_name = job_name or argv[0].replace(".py", "")
|
||||
service_id = service_id or None
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
|
|
@ -1713,7 +1712,7 @@ class Database:
|
|||
"""Update external plugins from the database"""
|
||||
to_put = []
|
||||
changes = False
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
|
|
@ -2226,7 +2225,7 @@ class Database:
|
|||
def get_plugins(self, *, _type: Literal["all", "external", "pro"] = "all", with_data: bool = False) -> List[Dict[str, Any]]:
|
||||
"""Get all plugins from the database."""
|
||||
plugins = []
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
entities = [Plugins.id, Plugins.stream, Plugins.name, Plugins.description, Plugins.version, Plugins.type, Plugins.method, Plugins.checksum]
|
||||
if with_data:
|
||||
entities.append(Plugins.data) # type: ignore
|
||||
|
|
@ -2292,12 +2291,12 @@ class Database:
|
|||
|
||||
def get_plugins_errors(self) -> int:
|
||||
"""Get plugins errors."""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
return session.query(Jobs).filter(Jobs.success == False).count() # noqa: E712
|
||||
|
||||
def get_jobs(self) -> Dict[str, Dict[str, Any]]:
|
||||
"""Get jobs."""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
return {
|
||||
job.name: {
|
||||
"plugin_id": job.plugin_id,
|
||||
|
|
@ -2346,7 +2345,7 @@ class Database:
|
|||
if service_id:
|
||||
filters["service_id"] = service_id
|
||||
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if plugin_id:
|
||||
job = session.query(Jobs).filter_by(name=job_name, plugin_id=plugin_id).first()
|
||||
if not job:
|
||||
|
|
@ -2368,7 +2367,7 @@ class Database:
|
|||
|
||||
def get_jobs_cache_files(self, *, job_name: str = "", plugin_id: str = "") -> List[Dict[str, Any]]:
|
||||
"""Get jobs cache files."""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
filters = {}
|
||||
query = session.query(Jobs_cache).with_entities(Jobs_cache.job_name, Jobs_cache.service_id, Jobs_cache.file_name, Jobs_cache.data)
|
||||
|
||||
|
|
@ -2413,7 +2412,7 @@ class Database:
|
|||
|
||||
def add_instance(self, hostname: str, port: int, server_name: str, method: str, changed: Optional[bool] = True) -> str:
|
||||
"""Add instance."""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
|
|
@ -2441,7 +2440,7 @@ class Database:
|
|||
def update_instances(self, instances: List[Dict[str, Any]], method: str, changed: Optional[bool] = True) -> str:
|
||||
"""Update instances."""
|
||||
to_put = []
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
|
|
@ -2474,7 +2473,7 @@ class Database:
|
|||
|
||||
def get_instances(self, *, method: Optional[str] = None) -> List[Dict[str, Any]]:
|
||||
"""Get instances."""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
query = session.query(Instances)
|
||||
if method:
|
||||
query = query.filter_by(method=method)
|
||||
|
|
@ -2491,7 +2490,7 @@ class Database:
|
|||
|
||||
def get_plugin_actions(self, plugin: str) -> Optional[Any]:
|
||||
"""get actions file for the plugin"""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
page = session.query(Plugin_pages).with_entities(Plugin_pages.actions_file).filter_by(plugin_id=plugin).first()
|
||||
|
||||
if not page:
|
||||
|
|
@ -2501,7 +2500,7 @@ class Database:
|
|||
|
||||
def get_plugin_template(self, plugin: str) -> Optional[Any]:
|
||||
"""get template file for the plugin"""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
page = session.query(Plugin_pages).with_entities(Plugin_pages.template_file).filter_by(plugin_id=plugin).first()
|
||||
|
||||
if not page:
|
||||
|
|
@ -2511,73 +2510,10 @@ class Database:
|
|||
|
||||
def get_plugin_obfuscation(self, plugin: str) -> Optional[Any]:
|
||||
"""get obfuscation file for the plugin"""
|
||||
with self.__db_session() as session:
|
||||
with self._db_session() as session:
|
||||
page = session.query(Plugin_pages).with_entities(Plugin_pages.obfuscation_file).filter_by(plugin_id=plugin).first()
|
||||
|
||||
if not page:
|
||||
return None
|
||||
|
||||
return page.obfuscation_file
|
||||
|
||||
def get_ui_user(self) -> Optional[dict]:
|
||||
"""Get ui user."""
|
||||
with self.__db_session() as session:
|
||||
user = (
|
||||
session.query(Users)
|
||||
.with_entities(Users.username, Users.password, Users.is_two_factor_enabled, Users.secret_token, Users.method)
|
||||
.filter_by(id=1)
|
||||
.first()
|
||||
)
|
||||
if not user:
|
||||
return None
|
||||
return {
|
||||
"username": user.username,
|
||||
"password_hash": user.password.encode("utf-8"),
|
||||
"is_two_factor_enabled": user.is_two_factor_enabled,
|
||||
"secret_token": user.secret_token,
|
||||
"method": user.method,
|
||||
}
|
||||
|
||||
def create_ui_user(self, username: str, password: bytes, *, secret_token: Optional[str] = None, method: str = "manual") -> str:
|
||||
"""Create ui user."""
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
user = session.query(Users).filter_by(id=1).first()
|
||||
if user:
|
||||
return "User already exists"
|
||||
|
||||
session.add(Users(id=1, username=username, password=password.decode("utf-8"), secret_token=secret_token, method=method))
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
def update_ui_user(
|
||||
self, username: str, password: bytes, is_two_factor_enabled: bool = False, secret_token: Optional[str] = None, method: str = "ui"
|
||||
) -> str:
|
||||
"""Update ui user."""
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
user = session.query(Users).filter_by(id=1).first()
|
||||
if not user:
|
||||
return "User not found"
|
||||
|
||||
user.username = username
|
||||
user.password = password.decode("utf-8")
|
||||
user.is_two_factor_enabled = is_two_factor_enabled
|
||||
user.secret_token = secret_token
|
||||
user.method = method
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
|
|
|||
|
|
@ -209,17 +209,6 @@ class Instances(Base):
|
|||
method = Column(METHODS_ENUM, nullable=False, default="manual")
|
||||
|
||||
|
||||
class Users(Base):
|
||||
__tablename__ = "bw_ui_users"
|
||||
|
||||
id = Column(Integer, primary_key=True, default=1)
|
||||
username = Column(String(256), nullable=False, unique=True)
|
||||
password = Column(String(60), nullable=False)
|
||||
is_two_factor_enabled = Column(Boolean, nullable=False, default=False)
|
||||
secret_token = Column(String(32), nullable=True, unique=True, default=None)
|
||||
method = Column(METHODS_ENUM, nullable=False, default="manual")
|
||||
|
||||
|
||||
class BwcliCommands(Base):
|
||||
__tablename__ = "bw_cli_commands"
|
||||
__table_args__ = (UniqueConstraint("plugin_id", "name"),)
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ from os.path import join
|
|||
from pathlib import Path
|
||||
from signal import SIGINT, SIGTERM, signal
|
||||
from threading import Lock
|
||||
from regex import compile as re_compile
|
||||
from sys import path as sys_path
|
||||
from time import sleep
|
||||
|
||||
|
|
@ -14,10 +13,12 @@ for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in ((
|
|||
if deps_path not in sys_path:
|
||||
sys_path.append(deps_path)
|
||||
|
||||
from Database import Database # type: ignore
|
||||
from common_utils import get_version # type: ignore
|
||||
from logger import setup_logger # type: ignore
|
||||
|
||||
from src.User import User
|
||||
from ui_database import UIDatabase # type: ignore
|
||||
from utils import USER_PASSWORD_RX, check_password, gen_password_hash
|
||||
|
||||
|
||||
TMP_DIR = Path(sep, "var", "tmp", "bunkerweb")
|
||||
RUN_DIR = Path(sep, "var", "run", "bunkerweb")
|
||||
|
|
@ -58,7 +59,7 @@ def on_starting(server):
|
|||
|
||||
LOGGER = setup_logger("UI")
|
||||
|
||||
db = Database(LOGGER, ui=True)
|
||||
db = UIDatabase(LOGGER)
|
||||
|
||||
ready = False
|
||||
while not ready:
|
||||
|
|
@ -70,57 +71,85 @@ def on_starting(server):
|
|||
continue
|
||||
sleep(5)
|
||||
|
||||
USER_PASSWORD_RX = re_compile(r"^(?=.*?\p{Lowercase_Letter})(?=.*?\p{Uppercase_Letter})(?=.*?\d)(?=.*?[ !\"#$%&'()*+,./:;<=>?@[\\\]^_`{|}~-]).{8,}$")
|
||||
ret, err = db.init_ui_tables(get_version())
|
||||
|
||||
USER = "Error"
|
||||
while USER == "Error":
|
||||
with suppress(Exception):
|
||||
USER = db.get_ui_user()
|
||||
if not ret and err:
|
||||
LOGGER.error(f"Exception while checking database tables : {err}")
|
||||
exit(1)
|
||||
elif not ret:
|
||||
LOGGER.info("Database ui tables didn't change, skipping update ...")
|
||||
else:
|
||||
LOGGER.info("Database ui tables successfully updated")
|
||||
|
||||
if USER:
|
||||
USER = User(**USER)
|
||||
if not db.get_ui_roles(as_dict=True):
|
||||
ret = db.create_ui_role("admin", "Admin can create account, manager software and read data.", ["manage", "write", "read"])
|
||||
if ret:
|
||||
LOGGER.error(f"Couldn't create the admin role in the database: {ret}")
|
||||
exit(1)
|
||||
|
||||
if getenv("ADMIN_USERNAME") or getenv("ADMIN_PASSWORD"):
|
||||
ret = db.create_ui_role("writer", "Write can manage software and read data but can't create account.", ["write", "read"])
|
||||
if ret:
|
||||
LOGGER.error(f"Couldn't create the admin role in the database: {ret}")
|
||||
exit(1)
|
||||
|
||||
ret = db.create_ui_role("reader", "Reader can read data but can't proceed to any actions.", ["read"])
|
||||
if ret:
|
||||
LOGGER.error(f"Couldn't create the admin role in the database: {ret}")
|
||||
exit(1)
|
||||
|
||||
ADMIN_USER = "Error"
|
||||
while ADMIN_USER == "Error":
|
||||
try:
|
||||
ADMIN_USER = db.get_ui_user(as_dict=True)
|
||||
except BaseException as e:
|
||||
LOGGER.debug(f"Couldn't get the admin user: {e}")
|
||||
sleep(1)
|
||||
|
||||
env_admin_username = getenv("ADMIN_USERNAME", "")
|
||||
env_admin_password = getenv("ADMIN_PASSWORD", "")
|
||||
|
||||
if ADMIN_USER:
|
||||
if env_admin_username or env_admin_password:
|
||||
override_admin_creds = getenv("OVERRIDE_ADMIN_CREDS", "no").lower() == "yes"
|
||||
if USER.method == "manual" or override_admin_creds:
|
||||
if ADMIN_USER["method"] == "manual" or override_admin_creds:
|
||||
updated = False
|
||||
if getenv("ADMIN_USERNAME", "") and USER.get_id() != getenv("ADMIN_USERNAME", ""):
|
||||
USER.id = getenv("ADMIN_USERNAME", "")
|
||||
if env_admin_username and ADMIN_USER["username"] != env_admin_username:
|
||||
ADMIN_USER["username"] = env_admin_username
|
||||
updated = True
|
||||
if getenv("ADMIN_PASSWORD", "") and not USER.check_password(getenv("ADMIN_PASSWORD", "")):
|
||||
if not USER_PASSWORD_RX.match(getenv("ADMIN_PASSWORD", "")):
|
||||
|
||||
if env_admin_password and not check_password(env_admin_password, ADMIN_USER["password"]):
|
||||
if not USER_PASSWORD_RX.match(env_admin_password):
|
||||
LOGGER.warning(
|
||||
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). It will not be updated."
|
||||
)
|
||||
else:
|
||||
USER.update_password(getenv("ADMIN_PASSWORD", ""))
|
||||
ADMIN_USER["password"] = gen_password_hash(env_admin_password)
|
||||
updated = True
|
||||
|
||||
if updated:
|
||||
if override_admin_creds:
|
||||
LOGGER.warning("Overriding the admin user credentials, as the OVERRIDE_ADMIN_CREDS environment variable is set to 'yes'.")
|
||||
err = db.update_ui_user(USER.get_id(), USER.password_hash, USER.is_two_factor_enabled, USER.secret_token, method="manual")
|
||||
err = db.update_ui_user(ADMIN_USER["username"], ADMIN_USER["password"], ADMIN_USER["totp_secret"], method="manual")
|
||||
if err:
|
||||
LOGGER.error(f"Couldn't update the admin user in the database: {err}")
|
||||
else:
|
||||
LOGGER.info("The admin user was updated successfully")
|
||||
else:
|
||||
LOGGER.warning("The admin user wasn't created manually. You can't change it from the environment variables.")
|
||||
elif getenv("ADMIN_USERNAME") and getenv("ADMIN_PASSWORD"):
|
||||
elif env_admin_username and env_admin_password:
|
||||
user_name = env_admin_username or "admin"
|
||||
|
||||
if not getenv("FLASK_DEBUG", False):
|
||||
if len(getenv("ADMIN_USERNAME", "admin")) > 256:
|
||||
if len(user_name) > 256:
|
||||
LOGGER.error("The admin username is too long. It must be less than 256 characters.")
|
||||
exit(1)
|
||||
elif not USER_PASSWORD_RX.match(getenv("ADMIN_PASSWORD", "changeme")):
|
||||
elif not USER_PASSWORD_RX.match(env_admin_password):
|
||||
LOGGER.error(
|
||||
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-)."
|
||||
)
|
||||
exit(1)
|
||||
|
||||
user_name = getenv("ADMIN_USERNAME", "admin")
|
||||
USER = User(user_name, getenv("ADMIN_PASSWORD", "changeme"))
|
||||
ret = db.create_ui_user(user_name, USER.password_hash)
|
||||
|
||||
ret = db.create_ui_user(user_name, gen_password_hash(env_admin_password), ["admin"], admin=True)
|
||||
if ret:
|
||||
LOGGER.error(f"Couldn't create the admin user in the database: {ret}")
|
||||
exit(1)
|
||||
|
|
|
|||
242
src/ui/main.py
242
src/ui/main.py
|
|
@ -28,6 +28,8 @@ from importlib.machinery import SourceFileLoader
|
|||
from io import BytesIO
|
||||
from json import JSONDecodeError, dumps, loads as json_loads
|
||||
from jinja2 import Environment, FileSystemLoader, select_autoescape
|
||||
from pyotp import random_base32
|
||||
from pyotp.totp import TOTP
|
||||
from redis import Redis, Sentinel
|
||||
from regex import compile as re_compile, match as regex_match
|
||||
from requests import get
|
||||
|
|
@ -45,14 +47,15 @@ from src.Instances import Instances
|
|||
from src.ConfigFiles import ConfigFiles
|
||||
from src.Config import Config
|
||||
from src.ReverseProxied import ReverseProxied
|
||||
from src.User import AnonymousUser, User
|
||||
from src.Templates import get_ui_templates
|
||||
|
||||
from utils import check_settings, get_b64encoded_qr_image, path_to_dict, get_remain
|
||||
from utils import check_settings, gen_password_hash, get_b64encoded_qr_image, path_to_dict, get_remain
|
||||
from common_utils import get_version # type: ignore
|
||||
from Database import Database # type: ignore
|
||||
from logger import setup_logger # type: ignore
|
||||
|
||||
from models import AnonymousUser
|
||||
from ui_database import UIDatabase
|
||||
|
||||
TMP_DIR = Path(sep, "var", "tmp", "bunkerweb")
|
||||
TMP_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
|
@ -109,7 +112,6 @@ with app.app_context():
|
|||
app.config["SESSION_COOKIE_SECURE"] = True # Required for __Host- prefix
|
||||
app.config["SESSION_COOKIE_HTTPONLY"] = True # Recommended for security
|
||||
app.config["SESSION_COOKIE_SAMESITE"] = "Lax" # Or 'Strict' for stricter settings
|
||||
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=30)
|
||||
|
||||
login_manager = LoginManager()
|
||||
login_manager.session_protection = "strong"
|
||||
|
|
@ -118,7 +120,7 @@ with app.app_context():
|
|||
login_manager.anonymous_user = AnonymousUser
|
||||
PLUGIN_KEYS = ["id", "name", "description", "version", "stream", "settings"]
|
||||
|
||||
db = Database(app.logger, ui=True, log=False)
|
||||
DB = UIDatabase(app.logger, log=False)
|
||||
|
||||
USER_PASSWORD_RX = re_compile(r"^(?=.*?\p{Lowercase_Letter})(?=.*?\p{Uppercase_Letter})(?=.*?\d)(?=.*?[ !\"#$%&'()*+,./:;<=>?@[\\\]^_`{|}~-]).{8,}$")
|
||||
|
||||
|
|
@ -129,13 +131,12 @@ with app.app_context():
|
|||
|
||||
try:
|
||||
app.config.update(
|
||||
INSTANCES=Instances(db),
|
||||
CONFIG=Config(db),
|
||||
INSTANCES=Instances(DB),
|
||||
CONFIG=Config(DB),
|
||||
CONFIGFILES=ConfigFiles(),
|
||||
WTF_CSRF_SSL_STRICT=False,
|
||||
SEND_FILE_MAX_AGE_DEFAULT=86400,
|
||||
SCRIPT_NONCE=sha256(urandom(32)).hexdigest(),
|
||||
DB=db,
|
||||
UI_TEMPLATES=get_ui_templates(),
|
||||
)
|
||||
except FileNotFoundError as e:
|
||||
|
|
@ -159,7 +160,7 @@ def wait_applying():
|
|||
current_time = datetime.now()
|
||||
ready = False
|
||||
while not ready and (datetime.now() - current_time).seconds < 120:
|
||||
db_metadata = app.config["DB"].get_metadata()
|
||||
db_metadata = DB.get_metadata()
|
||||
if isinstance(db_metadata, str):
|
||||
app.logger.error(f"An error occurred when checking for changes in the database : {db_metadata}")
|
||||
elif not any(
|
||||
|
|
@ -242,12 +243,12 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
|
|||
# UTILS
|
||||
def run_action(plugin: str, function_name: str = ""):
|
||||
message = ""
|
||||
module = app.config["DB"].get_plugin_actions(plugin)
|
||||
module = DB.get_plugin_actions(plugin)
|
||||
|
||||
if module is None:
|
||||
return {"status": "ko", "code": 404, "message": "The actions.py file for the plugin does not exist"}
|
||||
|
||||
obfuscation = app.config["DB"].get_plugin_obfuscation(plugin)
|
||||
obfuscation = DB.get_plugin_obfuscation(plugin)
|
||||
tmp_dir = None
|
||||
|
||||
try:
|
||||
|
|
@ -320,7 +321,7 @@ def run_action(plugin: str, function_name: str = ""):
|
|||
|
||||
|
||||
def get_user_info():
|
||||
return current_user.get_id(), current_user.password_hash, current_user.is_two_factor_enabled, current_user.secret_token
|
||||
return current_user.get_id(), current_user.password.encode("utf-8"), bool(current_user.totp_secret), current_user.totp_secret
|
||||
|
||||
|
||||
def verify_data_in_form(data: dict[str, Union[tuple, any]] = {}, err_message: str = "", redirect_url: str = "", next: bool = False) -> Union[bool, Response]:
|
||||
|
|
@ -366,11 +367,11 @@ def error_message(msg: str):
|
|||
@app.context_processor
|
||||
def inject_variables():
|
||||
ui_data = get_ui_data()
|
||||
metadata = app.config["DB"].get_metadata()
|
||||
metadata = DB.get_metadata()
|
||||
|
||||
changes_ongoing = any(
|
||||
v
|
||||
for k, v in app.config["DB"].get_metadata().items()
|
||||
for k, v in DB.get_metadata().items()
|
||||
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
|
||||
)
|
||||
changes = False
|
||||
|
|
@ -447,13 +448,20 @@ def set_security_headers(response):
|
|||
|
||||
|
||||
@login_manager.user_loader
|
||||
def load_user(user_id):
|
||||
admin_user = app.config["DB"].get_ui_user()
|
||||
if not admin_user:
|
||||
app.logger.warning("Couldn't get the admin user from the database.")
|
||||
def load_user(username):
|
||||
ui_user = DB.get_ui_user(username=username)
|
||||
if not ui_user:
|
||||
app.logger.warning(f"Couldn't get the user {username} from the database.")
|
||||
return None
|
||||
admin_user = User(**admin_user)
|
||||
return admin_user if user_id == admin_user.get_id() else None
|
||||
|
||||
ui_user.list_roles = DB.get_ui_user_roles(username)
|
||||
for role in ui_user.list_roles:
|
||||
ui_user.list_permissions.extend(DB.get_ui_role_permissions(role))
|
||||
|
||||
if ui_user.totp_secret:
|
||||
ui_user.list_recovery_codes = DB.get_ui_user_recovery_codes(username)
|
||||
|
||||
return ui_user
|
||||
|
||||
|
||||
@app.errorhandler(CSRFError)
|
||||
|
|
@ -468,7 +476,7 @@ def handle_csrf_error(_):
|
|||
flash("Wrong CSRF token !", "error")
|
||||
if not current_user:
|
||||
return render_template("setup.html"), 403
|
||||
return render_template("login.html", is_totp=current_user.is_two_factor_enabled), 403
|
||||
return render_template("login.html", is_totp=bool(current_user.totp_secret)), 403
|
||||
|
||||
|
||||
@app.before_request
|
||||
|
|
@ -484,58 +492,58 @@ def before_request():
|
|||
|
||||
if not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts")):
|
||||
if (
|
||||
app.config["DB"].database_uri
|
||||
and app.config["DB"].readonly
|
||||
DB.database_uri
|
||||
and DB.readonly
|
||||
and (
|
||||
datetime.now(timezone.utc) - datetime.fromisoformat(ui_data.get("LAST_DATABASE_RETRY", "1970-01-01T00:00:00")).replace(tzinfo=timezone.utc)
|
||||
> timedelta(minutes=1)
|
||||
)
|
||||
):
|
||||
try:
|
||||
app.config["DB"].retry_connection(pool_timeout=1)
|
||||
app.config["DB"].retry_connection(log=False)
|
||||
DB.retry_connection(pool_timeout=1)
|
||||
DB.retry_connection(log=False)
|
||||
ui_data["READONLY_MODE"] = False
|
||||
app.logger.info("The database is no longer read-only, defaulting to read-write mode")
|
||||
except BaseException:
|
||||
try:
|
||||
app.config["DB"].retry_connection(readonly=True, pool_timeout=1)
|
||||
app.config["DB"].retry_connection(readonly=True, log=False)
|
||||
DB.retry_connection(readonly=True, pool_timeout=1)
|
||||
DB.retry_connection(readonly=True, log=False)
|
||||
except BaseException:
|
||||
if app.config["DB"].database_uri_readonly:
|
||||
if DB.database_uri_readonly:
|
||||
with suppress(BaseException):
|
||||
app.config["DB"].retry_connection(fallback=True, pool_timeout=1)
|
||||
app.config["DB"].retry_connection(fallback=True, log=False)
|
||||
DB.retry_connection(fallback=True, pool_timeout=1)
|
||||
DB.retry_connection(fallback=True, log=False)
|
||||
ui_data["READONLY_MODE"] = True
|
||||
ui_data["LAST_DATABASE_RETRY"] = app.config["DB"].last_connection_retry.isoformat()
|
||||
ui_data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat()
|
||||
elif not ui_data.get("READONLY_MODE", False) and request.method == "POST" and not ("/totp" in request.path or "/login" in request.path):
|
||||
try:
|
||||
app.config["DB"].test_write()
|
||||
DB.test_write()
|
||||
ui_data["READONLY_MODE"] = False
|
||||
except BaseException:
|
||||
ui_data["READONLY_MODE"] = True
|
||||
ui_data["LAST_DATABASE_RETRY"] = app.config["DB"].last_connection_retry.isoformat()
|
||||
ui_data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat()
|
||||
else:
|
||||
try:
|
||||
app.config["DB"].test_read()
|
||||
DB.test_read()
|
||||
except BaseException:
|
||||
ui_data["LAST_DATABASE_RETRY"] = app.config["DB"].last_connection_retry.isoformat()
|
||||
ui_data["LAST_DATABASE_RETRY"] = DB.last_connection_retry.isoformat()
|
||||
|
||||
app.config["DB"].readonly = ui_data.get("READONLY_MODE", False)
|
||||
DB.readonly = ui_data.get("READONLY_MODE", False)
|
||||
with LOCK:
|
||||
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
|
||||
|
||||
if app.config["DB"].readonly:
|
||||
if DB.readonly:
|
||||
flash("Database connection is in read-only mode : no modification possible.", "error")
|
||||
|
||||
if current_user.is_authenticated:
|
||||
passed = True
|
||||
|
||||
# Case not login page, keep on 2FA before any other access
|
||||
if not session.get("totp_validated", False) and current_user.is_two_factor_enabled and "/totp" not in request.path:
|
||||
if not session.get("totp_validated", False) and bool(current_user.totp_secret) and "/totp" not in request.path:
|
||||
if not request.path.endswith("/login"):
|
||||
return redirect(url_for("totp", next=request.form.get("next")))
|
||||
passed = False
|
||||
elif session.get("ip") != request.remote_addr:
|
||||
elif current_user.last_login_ip != request.remote_addr:
|
||||
passed = False
|
||||
elif session.get("user_agent") != request.headers.get("User-Agent"):
|
||||
passed = False
|
||||
|
|
@ -546,7 +554,7 @@ def before_request():
|
|||
|
||||
@app.route("/", strict_slashes=False)
|
||||
def index():
|
||||
if app.config["DB"].get_ui_user():
|
||||
if DB.get_ui_user():
|
||||
if current_user.is_authenticated: # type: ignore
|
||||
return redirect(url_for("home"))
|
||||
return redirect(url_for("login"), 301)
|
||||
|
|
@ -569,9 +577,7 @@ def check():
|
|||
def setup():
|
||||
db_config = app.config["CONFIG"].get_config(methods=False, filtered_settings=("SERVER_NAME", "MULTISITE", "USE_UI", "UI_HOST", "AUTO_LETS_ENCRYPT"))
|
||||
|
||||
admin_user = app.config["DB"].get_ui_user()
|
||||
if admin_user:
|
||||
admin_user = User(**admin_user)
|
||||
admin_user = DB.get_ui_user()
|
||||
|
||||
ui_reverse_proxy = False
|
||||
for server_name in db_config["SERVER_NAME"].split(" "):
|
||||
|
|
@ -582,7 +588,7 @@ def setup():
|
|||
break
|
||||
|
||||
if request.method == "POST":
|
||||
if app.config["DB"].readonly:
|
||||
if DB.readonly:
|
||||
return handle_error("Database is in read-only mode", "setup")
|
||||
|
||||
required_keys = []
|
||||
|
|
@ -607,9 +613,7 @@ def setup():
|
|||
"setup",
|
||||
)
|
||||
|
||||
admin_user = User(request.form["admin_username"], request.form["admin_password"], method="ui")
|
||||
|
||||
ret = app.config["DB"].create_ui_user(request.form["admin_username"], admin_user.password_hash, method="ui")
|
||||
ret = DB.create_ui_user(request.form["admin_username"], gen_password_hash(request.form["admin_password"]), ["admin"], method="ui", admin=True)
|
||||
if ret:
|
||||
return handle_error(f"Couldn't create the admin user in the database: {ret}", "setup", False, "error")
|
||||
|
||||
|
|
@ -685,16 +689,17 @@ def setup_loading():
|
|||
@login_required
|
||||
def totp():
|
||||
if request.method == "POST":
|
||||
|
||||
verify_data_in_form(data={"totp_token": None}, err_message="No token provided on /totp.", redirect_url="totp")
|
||||
|
||||
if not current_user.check_otp(request.form["totp_token"]):
|
||||
if request.form["totp_token"] not in current_user.list_recovery_codes and not current_user.check_otp(request.form["totp_token"]):
|
||||
return handle_error("The token is invalid.", "totp")
|
||||
elif request.form["totp_token"] in current_user.list_recovery_codes:
|
||||
DB.use_ui_user_recovery_code(current_user.get_id(), request.form["totp_token"])
|
||||
|
||||
session["totp_validated"] = True
|
||||
redirect(url_for("loading", next=request.form.get("next") or url_for("home")))
|
||||
|
||||
if not current_user.is_two_factor_enabled or session.get("totp_validated", False):
|
||||
if not bool(current_user.totp_secret) or session.get("totp_validated", False):
|
||||
return redirect(url_for("home"))
|
||||
|
||||
return render_template("totp.html")
|
||||
|
|
@ -748,7 +753,7 @@ def home():
|
|||
services_autoconf_count += 1
|
||||
services += 1
|
||||
|
||||
metadata = app.config["DB"].get_metadata()
|
||||
metadata = DB.get_metadata()
|
||||
|
||||
data = {
|
||||
"check_version": not remote_version or bw_version == remote_version,
|
||||
|
|
@ -765,7 +770,7 @@ def home():
|
|||
"pro_services": metadata["pro_services"],
|
||||
"pro_overlapped": metadata["pro_overlapped"],
|
||||
"plugins_number": len(app.config["CONFIG"].get_plugins()),
|
||||
"plugins_errors": app.config["DB"].get_plugins_errors(),
|
||||
"plugins_errors": DB.get_plugins_errors(),
|
||||
}
|
||||
|
||||
data_server_builder = home_builder(data)
|
||||
|
|
@ -777,7 +782,7 @@ def home():
|
|||
@login_required
|
||||
def account():
|
||||
if request.method == "POST":
|
||||
if app.config["DB"].readonly:
|
||||
if DB.readonly:
|
||||
return handle_error("Database is in read-only mode", "account")
|
||||
|
||||
verify_data_in_form(
|
||||
|
|
@ -803,11 +808,11 @@ def account():
|
|||
|
||||
# Force job to contact PRO API
|
||||
# by setting the last check to None
|
||||
metadata = app.config["DB"].get_metadata()
|
||||
metadata = DB.get_metadata()
|
||||
metadata["last_pro_check"] = None
|
||||
app.config["DB"].set_pro_metadata(metadata)
|
||||
DB.set_pro_metadata(metadata)
|
||||
|
||||
curr_changes = app.config["DB"].check_changes()
|
||||
curr_changes = DB.check_changes()
|
||||
|
||||
# Reload instances
|
||||
def update_global_config(threaded: bool = False):
|
||||
|
|
@ -843,8 +848,7 @@ def account():
|
|||
|
||||
username = current_user.get_id()
|
||||
password = request.form["curr_password"]
|
||||
is_two_factor_enabled = current_user.is_two_factor_enabled
|
||||
secret_token = current_user.secret_token
|
||||
secret_token = current_user.totp_secret
|
||||
|
||||
if request.form["operation"] == "username":
|
||||
verify_data_in_form(data={"admin_username": None}, err_message="Missing admin username parameter on /account.", redirect_url="account")
|
||||
|
|
@ -881,21 +885,19 @@ def account():
|
|||
|
||||
ui_data = get_ui_data()
|
||||
|
||||
if not current_user.check_otp(request.form["totp_token"], secret=ui_data.get("CURRENT_TOTP_TOKEN", None)):
|
||||
if request.form["totp_token"] not in current_user.list_recovery_codes and not current_user.check_otp(
|
||||
request.form["totp_token"], secret=session.get("tmp_totp_secret")
|
||||
):
|
||||
return handle_error("The totp token is invalid. (totp)", "account")
|
||||
|
||||
session["totp_validated"] = not current_user.is_two_factor_enabled
|
||||
is_two_factor_enabled = session["totp_validated"]
|
||||
secret_token = None if current_user.is_two_factor_enabled else ui_data.get("CURRENT_TOTP_TOKEN", None)
|
||||
ui_data["CURRENT_TOTP_TOKEN"] = None
|
||||
session["totp_validated"] = not bool(current_user.totp_secret)
|
||||
secret_token = None if bool(current_user.totp_secret) else session.get("tmp_totp_secret")
|
||||
session["tmp_totp_secret"] = None
|
||||
|
||||
with LOCK:
|
||||
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
|
||||
|
||||
user = User(username, password, is_two_factor_enabled=is_two_factor_enabled, secret_token=secret_token, method=current_user.method)
|
||||
ret = app.config["DB"].update_ui_user(
|
||||
username, user.password_hash, is_two_factor_enabled, secret_token, current_user.method if request.form["operation"] == "totp" else "ui"
|
||||
)
|
||||
ret = DB.update_ui_user(username, gen_password_hash(password), secret_token, current_user.method if request.form["operation"] == "totp" else "ui")
|
||||
if ret:
|
||||
return handle_error(f"Couldn't update the admin user in the database: {ret}", "account", False, "error")
|
||||
|
||||
|
|
@ -903,30 +905,24 @@ def account():
|
|||
(
|
||||
f"The {request.form['operation']} has been successfully updated."
|
||||
if request.form["operation"] != "totp"
|
||||
else f"The two-factor authentication was successfully {'disabled' if current_user.is_two_factor_enabled else 'enabled'}."
|
||||
else f"The two-factor authentication was successfully {'disabled' if bool(current_user.totp_secret) else 'enabled'}."
|
||||
),
|
||||
)
|
||||
|
||||
return redirect(url_for("account" if request.form["operation"] == "totp" else "login"))
|
||||
|
||||
secret_token = ""
|
||||
totp_qr_image = ""
|
||||
|
||||
if not current_user.is_two_factor_enabled:
|
||||
current_user.refresh_totp()
|
||||
secret_token = current_user.secret_token
|
||||
totp_qr_image = get_b64encoded_qr_image(current_user.get_authentication_setup_uri())
|
||||
|
||||
ui_data = get_ui_data()
|
||||
ui_data["CURRENT_TOTP_TOKEN"] = secret_token
|
||||
with LOCK:
|
||||
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
|
||||
if not bool(current_user.totp_secret):
|
||||
session["tmp_totp_secret"] = random_base32()
|
||||
totp = TOTP(session["tmp_totp_secret"])
|
||||
totp_qr_image = get_b64encoded_qr_image(totp.provisioning_uri(name=current_user.get_id(), issuer_name="BunkerWeb UI"))
|
||||
|
||||
return render_template(
|
||||
"account.html",
|
||||
username=current_user.get_id(),
|
||||
is_totp=current_user.is_two_factor_enabled,
|
||||
secret_token=secret_token,
|
||||
is_totp=bool(current_user.totp_secret),
|
||||
secret_token=session["tmp_totp_secret"],
|
||||
totp_qr_image=totp_qr_image,
|
||||
)
|
||||
|
||||
|
|
@ -981,7 +977,7 @@ def instances():
|
|||
|
||||
|
||||
def get_service_data():
|
||||
config = app.config["DB"].get_config(methods=True, with_drafts=True)
|
||||
config = DB.get_config(methods=True, with_drafts=True)
|
||||
# Check variables
|
||||
variables = deepcopy(request.form.to_dict())
|
||||
del variables["csrf_token"]
|
||||
|
|
@ -1060,7 +1056,7 @@ def get_service_data():
|
|||
# @login_required
|
||||
# def services():
|
||||
# if request.method == "POST":
|
||||
# if app.config["DB"].readonly:
|
||||
# if DB.readonly:
|
||||
# return handle_error("Database is in read-only mode", "services")
|
||||
|
||||
# verify_data_in_form(
|
||||
|
|
@ -1089,7 +1085,7 @@ def get_service_data():
|
|||
# if config.get(f"{request.form['SERVER_NAME'].split(' ')[0]}_SERVER_NAME", {"method": "scheduler"})["method"] != "ui":
|
||||
# return handle_error("The service cannot be deleted because it has not been created with the UI.", "services", True)
|
||||
|
||||
# db_metadata = app.config["DB"].get_metadata()
|
||||
# db_metadata = DB.get_metadata()
|
||||
|
||||
# def update_services(threaded: bool = False):
|
||||
# wait_applying()
|
||||
|
|
@ -1136,7 +1132,7 @@ def get_service_data():
|
|||
|
||||
# # Display services
|
||||
# services = []
|
||||
# tmp_config = app.config["DB"].get_config(methods=True, with_drafts=True).copy()
|
||||
# tmp_config = DB.get_config(methods=True, with_drafts=True).copy()
|
||||
# service_names = tmp_config["SERVER_NAME"]["value"].split(" ")
|
||||
|
||||
# table_settings = (
|
||||
|
|
@ -1156,7 +1152,7 @@ def get_service_data():
|
|||
# for service in service_names:
|
||||
# service_settings = {}
|
||||
|
||||
# # For each needed setting, get the service value if one, else the global (value), else defautl value
|
||||
# # For each needed setting, get the service value if one, else the global (value), else default value
|
||||
# for setting in table_settings:
|
||||
# value = tmp_config.get(f"{service}_{setting}", tmp_config.get(setting, {"value": None}))["value"]
|
||||
# method = tmp_config.get(f"{service}_{setting}", tmp_config.get(setting, {"method": None}))["method"]
|
||||
|
|
@ -1178,7 +1174,7 @@ def get_service_data():
|
|||
@login_required
|
||||
def services():
|
||||
if request.method == "POST":
|
||||
if app.config["DB"].readonly:
|
||||
if DB.readonly:
|
||||
return handle_error("Database is in read-only mode", "services")
|
||||
|
||||
verify_data_in_form(
|
||||
|
|
@ -1207,7 +1203,7 @@ def services():
|
|||
if config.get(f"{request.form['SERVER_NAME'].split(' ')[0]}_SERVER_NAME", {"method": "scheduler"})["method"] != "ui":
|
||||
return handle_error("The service cannot be deleted because it has not been created with the UI.", "services", True)
|
||||
|
||||
db_metadata = app.config["DB"].get_metadata()
|
||||
db_metadata = DB.get_metadata()
|
||||
|
||||
def update_services(threaded: bool = False):
|
||||
wait_applying()
|
||||
|
|
@ -1254,7 +1250,7 @@ def services():
|
|||
|
||||
# Display services
|
||||
services = []
|
||||
global_config = app.config["DB"].get_config(methods=True, with_drafts=True)
|
||||
global_config = DB.get_config(methods=True, with_drafts=True)
|
||||
service_names = global_config["SERVER_NAME"]["value"].split(" ")
|
||||
for service in service_names:
|
||||
service_settings = []
|
||||
|
|
@ -1305,7 +1301,7 @@ def services():
|
|||
@login_required
|
||||
def global_config():
|
||||
if request.method == "POST":
|
||||
if app.config["DB"].readonly:
|
||||
if DB.readonly:
|
||||
return handle_error("Database is in read-only mode", "global_config")
|
||||
|
||||
# Check variables
|
||||
|
|
@ -1313,7 +1309,7 @@ def global_config():
|
|||
del variables["csrf_token"]
|
||||
|
||||
# Edit check fields and remove already existing ones
|
||||
config = app.config["DB"].get_config(methods=True, with_drafts=True)
|
||||
config = DB.get_config(methods=True, with_drafts=True)
|
||||
services = config["SERVER_NAME"]["value"].split(" ")
|
||||
for variable, value in variables.copy().items():
|
||||
setting = config.get(variable, {"value": None, "global": True})
|
||||
|
|
@ -1332,7 +1328,7 @@ def global_config():
|
|||
if setting and setting["global"] and (setting["value"] != value or setting["value"] == config.get(variable, {"value": None})["value"]):
|
||||
variables[f"{service}_{variable}"] = value
|
||||
|
||||
db_metadata = app.config["DB"].get_metadata()
|
||||
db_metadata = DB.get_metadata()
|
||||
|
||||
def update_global_config(threaded: bool = False):
|
||||
wait_applying()
|
||||
|
|
@ -1372,7 +1368,7 @@ def global_config():
|
|||
)
|
||||
)
|
||||
|
||||
global_config = app.config["DB"].get_config(global_only=True, methods=True)
|
||||
global_config = DB.get_config(global_only=True, methods=True)
|
||||
plugins = app.config["CONFIG"].get_plugins()
|
||||
data_server_builder = global_config_builder(plugins, global_config)
|
||||
return render_template("global-config.html", data_server_builder=data_server_builder)
|
||||
|
|
@ -1381,10 +1377,10 @@ def global_config():
|
|||
@app.route("/configs", methods=["GET", "POST"])
|
||||
@login_required
|
||||
def configs():
|
||||
db_configs = app.config["DB"].get_custom_configs()
|
||||
db_configs = DB.get_custom_configs()
|
||||
|
||||
if request.method == "POST":
|
||||
if app.config["DB"].readonly:
|
||||
if DB.readonly:
|
||||
return handle_error("Database is in read-only mode", "configs")
|
||||
|
||||
operation = ""
|
||||
|
|
@ -1471,7 +1467,7 @@ def configs():
|
|||
del db_configs[index]
|
||||
operation = f"Deleted config {name}{f' for service {service_id}' if service_id else ''}"
|
||||
|
||||
error = app.config["DB"].save_custom_configs([config for config in db_configs if config["method"] == "ui"], "ui")
|
||||
error = DB.save_custom_configs([config for config in db_configs if config["method"] == "ui"], "ui")
|
||||
if error:
|
||||
app.logger.error(f"Could not save custom configs: {error}")
|
||||
return handle_error("Couldn't save custom configs", "configs", True)
|
||||
|
|
@ -1503,7 +1499,7 @@ def plugins():
|
|||
tmp_ui_path = TMP_DIR.joinpath("ui")
|
||||
|
||||
if request.method == "POST":
|
||||
if app.config["DB"].readonly:
|
||||
if DB.readonly:
|
||||
return handle_error("Database is in read-only mode", "plugins")
|
||||
|
||||
verify_data_in_form(
|
||||
|
|
@ -1524,7 +1520,7 @@ def plugins():
|
|||
if variables["type"] in ("core", "pro"):
|
||||
return handle_error(f"Can't delete {variables['type']} plugin {variables['name']}", "plugins", True)
|
||||
|
||||
db_metadata = app.config["DB"].get_metadata()
|
||||
db_metadata = DB.get_metadata()
|
||||
|
||||
def update_plugins(threaded: bool = False): # type: ignore
|
||||
wait_applying()
|
||||
|
|
@ -1536,7 +1532,7 @@ def plugins():
|
|||
|
||||
ui_data = get_ui_data()
|
||||
|
||||
err = app.config["DB"].update_external_plugins(plugins)
|
||||
err = DB.update_external_plugins(plugins)
|
||||
if err:
|
||||
message = f"Couldn't update external plugins to database: {err}"
|
||||
if threaded:
|
||||
|
|
@ -1737,7 +1733,7 @@ def plugins():
|
|||
if errors >= files_count:
|
||||
return redirect(url_for("loading", next=url_for("plugins")))
|
||||
|
||||
db_metadata = app.config["DB"].get_metadata()
|
||||
db_metadata = DB.get_metadata()
|
||||
|
||||
def update_plugins(threaded: bool = False):
|
||||
wait_applying()
|
||||
|
|
@ -1750,7 +1746,7 @@ def plugins():
|
|||
|
||||
ui_data = get_ui_data()
|
||||
|
||||
err = app.config["DB"].update_external_plugins(new_plugins, delete_missing=False)
|
||||
err = DB.update_external_plugins(new_plugins, delete_missing=False)
|
||||
if err:
|
||||
message = f"Couldn't update external plugins to database: {err}"
|
||||
if threaded:
|
||||
|
|
@ -1813,7 +1809,7 @@ def plugins():
|
|||
@app.route("/plugins/upload", methods=["POST"])
|
||||
@login_required
|
||||
def upload_plugin():
|
||||
if app.config["DB"].readonly:
|
||||
if DB.readonly:
|
||||
return {"status": "ko", "message": "Database is in read-only mode"}, 403
|
||||
|
||||
if not request.files:
|
||||
|
|
@ -1893,7 +1889,7 @@ def custom_plugin(plugin: str):
|
|||
if request.method == "GET":
|
||||
|
||||
# Check template
|
||||
page = app.config["DB"].get_plugin_template(plugin)
|
||||
page = DB.get_plugin_template(plugin)
|
||||
|
||||
if not page:
|
||||
return error_message("The plugin does not have a template"), 404
|
||||
|
|
@ -1917,7 +1913,7 @@ def custom_plugin(plugin: str):
|
|||
if plugin_id is None:
|
||||
return error_message("Plugin not found"), 404
|
||||
|
||||
config = app.config["DB"].get_config()
|
||||
config = DB.get_config()
|
||||
|
||||
# Check if we are using metrics
|
||||
for service in config.get("SERVER_NAME", "").split(" "):
|
||||
|
|
@ -2025,7 +2021,7 @@ def cache():
|
|||
path_to_dict(
|
||||
join(sep, "var", "cache", "bunkerweb"),
|
||||
is_cache=True,
|
||||
db_data=app.config["DB"].get_jobs_cache_files(),
|
||||
db_data=DB.get_jobs_cache_files(),
|
||||
services=app.config["CONFIG"].get_config(global_only=True, methods=False, filtered_settings=("SERVER_NAME",)).get("SERVER_NAME", "").split(" "),
|
||||
)
|
||||
],
|
||||
|
|
@ -2312,7 +2308,7 @@ def reports():
|
|||
def bans():
|
||||
if request.method == "POST":
|
||||
|
||||
if app.config["DB"].readonly:
|
||||
if DB.readonly:
|
||||
return handle_error("Database is in read-only mode", "bans")
|
||||
|
||||
# Check variables
|
||||
|
|
@ -2516,7 +2512,7 @@ def bans():
|
|||
@app.route("/jobs", methods=["GET"])
|
||||
@login_required
|
||||
def jobs():
|
||||
data_server_builder = jobs_builder(app.config["DB"].get_jobs())
|
||||
data_server_builder = jobs_builder(DB.get_jobs())
|
||||
return render_template("jobs.html", data_server_builder=data_server_builder)
|
||||
|
||||
|
||||
|
|
@ -2531,7 +2527,7 @@ def jobs_download():
|
|||
if not plugin_id or not job_name or not file_name:
|
||||
return jsonify({"status": "ko", "message": "plugin_id, job_name and file_name are required"}), 422
|
||||
|
||||
cache_file = app.config["DB"].get_job_cache_file(job_name, file_name, service_id=service_id, plugin_id=plugin_id)
|
||||
cache_file = DB.get_job_cache_file(job_name, file_name, service_id=service_id, plugin_id=plugin_id)
|
||||
|
||||
if not cache_file:
|
||||
return jsonify({"status": "ko", "message": "file not found"}), 404
|
||||
|
|
@ -2543,7 +2539,7 @@ def jobs_download():
|
|||
|
||||
@app.route("/login", methods=["GET", "POST"])
|
||||
def login():
|
||||
admin_user = app.config["DB"].get_ui_user()
|
||||
admin_user = DB.get_ui_user()
|
||||
if not admin_user:
|
||||
return redirect(url_for("setup"))
|
||||
elif current_user.is_authenticated: # type: ignore
|
||||
|
|
@ -2553,15 +2549,27 @@ def login():
|
|||
if request.method == "POST" and "username" in request.form and "password" in request.form:
|
||||
app.logger.warning(f"Login attempt from {request.remote_addr} with username \"{request.form['username']}\"")
|
||||
|
||||
admin_user = User(**admin_user)
|
||||
|
||||
if admin_user.get_id() == request.form["username"] and admin_user.check_password(request.form["password"]):
|
||||
ui_user = DB.get_ui_user(username=request.form["username"])
|
||||
if ui_user and ui_user.username == request.form["username"] and ui_user.check_password(request.form["password"]):
|
||||
# log the user in
|
||||
session.permanent = True
|
||||
session["ip"] = request.remote_addr
|
||||
session["user_agent"] = request.headers.get("User-Agent")
|
||||
session["totp_validated"] = False
|
||||
login_user(admin_user, duration=timedelta(hours=8), force=True)
|
||||
session["tmp_totp_secret"] = None
|
||||
|
||||
ui_user.last_login_at = datetime.now()
|
||||
ui_user.last_login_ip = request.remote_addr
|
||||
ui_user.login_count += 1
|
||||
|
||||
DB.mark_ui_user_login(ui_user.username, ui_user.last_login_at, ui_user.last_login_ip)
|
||||
|
||||
if not login_user(ui_user, remember=request.form.get("remember") == "on"):
|
||||
flash("Couldn't log you in, please try again", "error")
|
||||
return (render_template("login.html", error="Couldn't log you in, please try again"),)
|
||||
|
||||
app.logger.info(
|
||||
f"User {ui_user.username} logged in successfully for the {str(ui_user.login_count) + ('th' if 10 <= ui_user.login_count % 100 <= 20 else {1: 'st', 2: 'nd', 3: 'rd'}.get(ui_user.login_count % 10, 'th'))} time"
|
||||
+ (" with remember me" if request.form.get("remember") == "on" else "")
|
||||
)
|
||||
|
||||
# redirect him to the page he originally wanted or to the home page
|
||||
return redirect(url_for("loading", next=request.form.get("next") or url_for("home")))
|
||||
|
|
@ -2570,7 +2578,7 @@ def login():
|
|||
fail = True
|
||||
|
||||
kwargs = {
|
||||
"is_totp": current_user.is_two_factor_enabled,
|
||||
"is_totp": bool(current_user.totp_secret),
|
||||
} | ({"error": "Invalid username or password"} if fail else {})
|
||||
|
||||
return render_template("login.html", **kwargs), 401 if fail else 200
|
||||
|
|
@ -2606,4 +2614,6 @@ def check_reloading():
|
|||
def logout():
|
||||
session.clear()
|
||||
logout_user()
|
||||
return redirect(url_for("login"))
|
||||
response = redirect(url_for("login"))
|
||||
response.headers["Clear-Site-Data"] = '"cache", "cookies", "storage", "executionContexts"'
|
||||
return response
|
||||
|
|
|
|||
130
src/ui/models.py
Normal file
130
src/ui/models.py
Normal file
|
|
@ -0,0 +1,130 @@
|
|||
from datetime import datetime, timezone
|
||||
from functools import partial
|
||||
from os.path import join, sep
|
||||
from sys import path as sys_path
|
||||
from typing import Optional
|
||||
|
||||
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("db",))]:
|
||||
if deps_path not in sys_path:
|
||||
sys_path.append(deps_path)
|
||||
|
||||
from bcrypt import checkpw
|
||||
from flask_login import AnonymousUserMixin, UserMixin
|
||||
from pyotp.totp import TOTP
|
||||
from sqlalchemy.orm import declarative_base, relationship
|
||||
from sqlalchemy import Boolean, DateTime, Column, Integer, String, ForeignKey, func
|
||||
|
||||
|
||||
from model import METHODS_ENUM # type: ignore
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
class AnonymousUser(AnonymousUserMixin):
|
||||
username = "Anonymous"
|
||||
email = None
|
||||
password = ""
|
||||
method = "manual"
|
||||
admin = False
|
||||
last_login_at = None
|
||||
last_login_ip = None
|
||||
login_count = 0
|
||||
totp_secret = None
|
||||
creation_date = datetime.now(timezone.utc)
|
||||
update_date = datetime.now(timezone.utc)
|
||||
|
||||
def check_password(self, password: str) -> bool:
|
||||
return False
|
||||
|
||||
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
|
||||
if not secret:
|
||||
raise ValueError("Secret is required for anonymous user")
|
||||
return TOTP(secret).verify(otp, valid_window=3)
|
||||
|
||||
|
||||
class Users(Base, UserMixin):
|
||||
__tablename__ = "bw_ui_users"
|
||||
|
||||
username = Column(String(256), primary_key=True)
|
||||
email = Column(String(256), unique=True, nullable=True)
|
||||
password = Column(String(60), nullable=False)
|
||||
method = Column(METHODS_ENUM, nullable=False, default="manual")
|
||||
admin = Column(Boolean, nullable=False, default=False)
|
||||
|
||||
# Trackable
|
||||
last_login_at = Column(DateTime(), nullable=True)
|
||||
last_login_ip = Column(String(39), nullable=True)
|
||||
login_count = Column(Integer, default=0, nullable=False)
|
||||
|
||||
# 2FA
|
||||
totp_secret = Column(String(32), nullable=True)
|
||||
totp_refreshed = Column(Boolean, nullable=False, default=False)
|
||||
|
||||
creation_date = Column(DateTime(), nullable=False, server_default=func.now())
|
||||
update_date = Column(DateTime(), nullable=False, server_default=func.now(), onupdate=partial(datetime.now, timezone.utc))
|
||||
|
||||
roles = relationship("RolesUsers", back_populates="user", cascade="all")
|
||||
recovery_codes = relationship("UserRecoveryCodes", back_populates="user", cascade="all")
|
||||
list_roles: list[str] = []
|
||||
list_permissions: list[str] = []
|
||||
list_recovery_codes: list[str] = []
|
||||
|
||||
def get_id(self):
|
||||
return self.username
|
||||
|
||||
def check_password(self, password: str) -> bool:
|
||||
return checkpw(password.encode("utf-8"), self.password.encode("utf-8"))
|
||||
|
||||
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
|
||||
if secret:
|
||||
return TOTP(secret).verify(otp, valid_window=3)
|
||||
return TOTP(self.totp_secret).verify(otp, valid_window=3)
|
||||
|
||||
|
||||
class Roles(Base):
|
||||
__tablename__ = "bw_ui_roles"
|
||||
|
||||
name = Column(String(64), primary_key=True)
|
||||
description = Column(String(256), nullable=False)
|
||||
update_datetime = Column(DateTime(), nullable=False, server_default=func.now(), onupdate=partial(datetime.now, timezone.utc))
|
||||
|
||||
users = relationship("RolesUsers", back_populates="role", cascade="all")
|
||||
permissions = relationship("RolesPermissions", back_populates="role", cascade="all")
|
||||
|
||||
|
||||
class RolesUsers(Base):
|
||||
__tablename__ = "bw_ui_roles_users"
|
||||
|
||||
user_name = Column(String(256), ForeignKey("bw_ui_users.username", onupdate="cascade", ondelete="cascade"), primary_key=True)
|
||||
role_name = Column(String(64), ForeignKey("bw_ui_roles.name", onupdate="cascade", ondelete="cascade"), primary_key=True)
|
||||
|
||||
user = relationship("Users", back_populates="roles")
|
||||
role = relationship("Roles", back_populates="users")
|
||||
|
||||
|
||||
class UserRecoveryCodes(Base):
|
||||
__tablename__ = "bw_ui_user_recovery_codes"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
user_name = Column(String(256), ForeignKey("bw_ui_users.username", onupdate="cascade", ondelete="cascade"), nullable=False)
|
||||
code = Column(String(19), nullable=False)
|
||||
|
||||
user = relationship("Users", back_populates="recovery_codes")
|
||||
|
||||
|
||||
class RolesPermissions(Base):
|
||||
__tablename__ = "bw_ui_roles_permissions"
|
||||
|
||||
role_name = Column(String(64), ForeignKey("bw_ui_roles.name", onupdate="cascade", ondelete="cascade"), primary_key=True)
|
||||
permission_name = Column(String(64), ForeignKey("bw_ui_permissions.name", onupdate="cascade", ondelete="cascade"), primary_key=True)
|
||||
|
||||
role = relationship("Roles", back_populates="permissions")
|
||||
permission = relationship("Permissions", back_populates="roles")
|
||||
|
||||
|
||||
class Permissions(Base):
|
||||
__tablename__ = "bw_ui_permissions"
|
||||
|
||||
name = Column(String(64), primary_key=True)
|
||||
|
||||
roles = relationship("RolesPermissions", back_populates="permission", cascade="all")
|
||||
|
|
@ -1,9 +1,10 @@
|
|||
bcrypt==4.1.3
|
||||
bcrypt==4.2.0
|
||||
beautifulsoup4==4.12.3
|
||||
Flask==3.0.3
|
||||
Flask-Login==0.6.3
|
||||
Flask_WTF==1.2.1
|
||||
gunicorn[gthread]==22.0.0
|
||||
passlib==1.7.4
|
||||
pyotp==2.9.0
|
||||
python-magic==0.4.27
|
||||
python_dateutil==2.9.0.post0
|
||||
|
|
|
|||
|
|
@ -4,34 +4,34 @@
|
|||
#
|
||||
# pip-compile --allow-unsafe --generate-hashes --strip-extras requirements.in
|
||||
#
|
||||
bcrypt==4.1.3 \
|
||||
--hash=sha256:01746eb2c4299dd0ae1670234bf77704f581dd72cc180f444bfe74eb80495b64 \
|
||||
--hash=sha256:037c5bf7c196a63dcce75545c8874610c600809d5d82c305dd327cd4969995bf \
|
||||
--hash=sha256:094fd31e08c2b102a14880ee5b3d09913ecf334cd604af27e1013c76831f7b05 \
|
||||
--hash=sha256:0d4cf6ef1525f79255ef048b3489602868c47aea61f375377f0d00514fe4a78c \
|
||||
--hash=sha256:193bb49eeeb9c1e2db9ba65d09dc6384edd5608d9d672b4125e9320af9153a15 \
|
||||
--hash=sha256:2505b54afb074627111b5a8dc9b6ae69d0f01fea65c2fcaea403448c503d3991 \
|
||||
--hash=sha256:2ee15dd749f5952fe3f0430d0ff6b74082e159c50332a1413d51b5689cf06623 \
|
||||
--hash=sha256:31adb9cbb8737a581a843e13df22ffb7c84638342de3708a98d5c986770f2834 \
|
||||
--hash=sha256:3a5be252fef513363fe281bafc596c31b552cf81d04c5085bc5dac29670faa08 \
|
||||
--hash=sha256:3d3b317050a9a711a5c7214bf04e28333cf528e0ed0ec9a4e55ba628d0f07c1a \
|
||||
--hash=sha256:48429c83292b57bf4af6ab75809f8f4daf52aa5d480632e53707805cc1ce9b74 \
|
||||
--hash=sha256:4a8bea4c152b91fd8319fef4c6a790da5c07840421c2b785084989bf8bbb7455 \
|
||||
--hash=sha256:4fb253d65da30d9269e0a6f4b0de32bd657a0208a6f4e43d3e645774fb5457f3 \
|
||||
--hash=sha256:551b320396e1d05e49cc18dd77d970accd52b322441628aca04801bbd1d52a73 \
|
||||
--hash=sha256:5f7cd3399fbc4ec290378b541b0cf3d4398e4737a65d0f938c7c0f9d5e686611 \
|
||||
--hash=sha256:6004f5229b50f8493c49232b8e75726b568535fd300e5039e255d919fc3a07f2 \
|
||||
--hash=sha256:6717543d2c110a155e6821ce5670c1f512f602eabb77dba95717ca76af79867d \
|
||||
--hash=sha256:6cac78a8d42f9d120b3987f82252bdbeb7e6e900a5e1ba37f6be6fe4e3848286 \
|
||||
--hash=sha256:8a893d192dfb7c8e883c4576813bf18bb9d59e2cfd88b68b725990f033f1b978 \
|
||||
--hash=sha256:8cbb119267068c2581ae38790e0d1fbae65d0725247a930fc9900c285d95725d \
|
||||
--hash=sha256:9f8ea645eb94fb6e7bea0cf4ba121c07a3a182ac52876493870033141aa687bc \
|
||||
--hash=sha256:c4c8d9b3e97209dd7111bf726e79f638ad9224b4691d1c7cfefa571a09b1b2d6 \
|
||||
--hash=sha256:cb9c707c10bddaf9e5ba7cdb769f3e889e60b7d4fea22834b261f51ca2b89fed \
|
||||
--hash=sha256:d84702adb8f2798d813b17d8187d27076cca3cd52fe3686bb07a9083930ce650 \
|
||||
--hash=sha256:ec3c2e1ca3e5c4b9edb94290b356d082b721f3f50758bce7cce11d8a7c89ce84 \
|
||||
--hash=sha256:f44a97780677e7ac0ca393bd7982b19dbbd8d7228c1afe10b128fd9550eef5f1 \
|
||||
--hash=sha256:f5698ce5292a4e4b9e5861f7e53b1d89242ad39d54c3da451a93cac17b61921a
|
||||
bcrypt==4.2.0 \
|
||||
--hash=sha256:096a15d26ed6ce37a14c1ac1e48119660f21b24cba457f160a4b830f3fe6b5cb \
|
||||
--hash=sha256:0da52759f7f30e83f1e30a888d9163a81353ef224d82dc58eb5bb52efcabc399 \
|
||||
--hash=sha256:1bb429fedbe0249465cdd85a58e8376f31bb315e484f16e68ca4c786dcc04291 \
|
||||
--hash=sha256:1d84cf6d877918620b687b8fd1bf7781d11e8a0998f576c7aa939776b512b98d \
|
||||
--hash=sha256:1ee38e858bf5d0287c39b7a1fc59eec64bbf880c7d504d3a06a96c16e14058e7 \
|
||||
--hash=sha256:1ff39b78a52cf03fdf902635e4c81e544714861ba3f0efc56558979dd4f09170 \
|
||||
--hash=sha256:27fe0f57bb5573104b5a6de5e4153c60814c711b29364c10a75a54bb6d7ff48d \
|
||||
--hash=sha256:3413bd60460f76097ee2e0a493ccebe4a7601918219c02f503984f0a7ee0aebe \
|
||||
--hash=sha256:3698393a1b1f1fd5714524193849d0c6d524d33523acca37cd28f02899285060 \
|
||||
--hash=sha256:373db9abe198e8e2c70d12b479464e0d5092cc122b20ec504097b5f2297ed184 \
|
||||
--hash=sha256:39e1d30c7233cfc54f5c3f2c825156fe044efdd3e0b9d309512cc514a263ec2a \
|
||||
--hash=sha256:3bbbfb2734f0e4f37c5136130405332640a1e46e6b23e000eeff2ba8d005da68 \
|
||||
--hash=sha256:3d3a6d28cb2305b43feac298774b997e372e56c7c7afd90a12b3dc49b189151c \
|
||||
--hash=sha256:5a1e8aa9b28ae28020a3ac4b053117fb51c57a010b9f969603ed885f23841458 \
|
||||
--hash=sha256:61ed14326ee023917ecd093ee6ef422a72f3aec6f07e21ea5f10622b735538a9 \
|
||||
--hash=sha256:655ea221910bcac76ea08aaa76df427ef8625f92e55a8ee44fbf7753dbabb328 \
|
||||
--hash=sha256:762a2c5fb35f89606a9fde5e51392dad0cd1ab7ae64149a8b935fe8d79dd5ed7 \
|
||||
--hash=sha256:77800b7147c9dc905db1cba26abe31e504d8247ac73580b4aa179f98e6608f34 \
|
||||
--hash=sha256:8ac68872c82f1add6a20bd489870c71b00ebacd2e9134a8aa3f98a0052ab4b0e \
|
||||
--hash=sha256:8d7bb9c42801035e61c109c345a28ed7e84426ae4865511eb82e913df18f58c2 \
|
||||
--hash=sha256:8f6ede91359e5df88d1f5c1ef47428a4420136f3ce97763e31b86dd8280fbdf5 \
|
||||
--hash=sha256:9c1c4ad86351339c5f320ca372dfba6cb6beb25e8efc659bedd918d921956bae \
|
||||
--hash=sha256:c02d944ca89d9b1922ceb8a46460dd17df1ba37ab66feac4870f6862a1533c00 \
|
||||
--hash=sha256:c52aac18ea1f4a4f65963ea4f9530c306b56ccd0c6f8c8da0c06976e34a6e841 \
|
||||
--hash=sha256:cb2a8ec2bc07d3553ccebf0746bbf3d19426d1c6d1adbd4fa48925f66af7b9e8 \
|
||||
--hash=sha256:cf69eaf5185fd58f268f805b505ce31f9b9fc2d64b376642164e9244540c1221 \
|
||||
--hash=sha256:f4f4acf526fcd1c34e7ce851147deedd4e26e6402369304220250598b26448db
|
||||
# via -r requirements.in
|
||||
beautifulsoup4==4.12.3 \
|
||||
--hash=sha256:74e3d1928edc070d21748185c46e3fb33490f22f52a3addee9aee0f4f7781051 \
|
||||
|
|
@ -64,9 +64,9 @@ gunicorn==22.0.0 \
|
|||
--hash=sha256:350679f91b24062c86e386e198a15438d53a7a8207235a78ba1b53df4c4378d9 \
|
||||
--hash=sha256:4a0b436239ff76fb33f11c07a16482c521a7e09c1ce3cc293c2330afe01bec63
|
||||
# via -r requirements.in
|
||||
importlib-metadata==8.0.0 \
|
||||
--hash=sha256:15584cf2b1bf449d98ff8a6ff1abef57bf20f3ac6454f431736cd3e660921b2f \
|
||||
--hash=sha256:188bd24e4c346d3f0a933f275c2fec67050326a856b9a359881d7c2a697e8812
|
||||
importlib-metadata==8.2.0 \
|
||||
--hash=sha256:11901fa0c2f97919b288679932bb64febaeacf289d18ac84dd68cb2e74213369 \
|
||||
--hash=sha256:72e8d4399996132204f9a16dcc751af254a48f8d1b20b9ff0f98d4a8f901e73d
|
||||
# via flask
|
||||
itsdangerous==2.2.0 \
|
||||
--hash=sha256:c6242fc49e35958c8b15141343aa660db5fc54d4f13a1db01a3f5891b98700ef \
|
||||
|
|
@ -147,6 +147,10 @@ packaging==24.1 \
|
|||
--hash=sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002 \
|
||||
--hash=sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124
|
||||
# via gunicorn
|
||||
passlib==1.7.4 \
|
||||
--hash=sha256:aa6bca462b8d8bda89c70b382f0c298a20b5560af6cbfa2dce410c0a2fb669f1 \
|
||||
--hash=sha256:defd50f72b65c5402ab2c573830a6978e5f202ad0d984793c8dde2c4152ebe04
|
||||
# via -r requirements.in
|
||||
pyotp==2.9.0 \
|
||||
--hash=sha256:346b6642e0dbdde3b4ff5a930b664ca82abfa116356ed48cc42c7d6590d36f63 \
|
||||
--hash=sha256:81c2e5865b8ac55e825b0358e496e1d9387c811e85bb40e71a3b29b288963612
|
||||
|
|
|
|||
|
|
@ -1,137 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from bcrypt import checkpw, hashpw, gensalt
|
||||
from flask_login import AnonymousUserMixin, UserMixin
|
||||
from pyotp import random_base32
|
||||
from pyotp.totp import TOTP
|
||||
|
||||
|
||||
class User(UserMixin):
|
||||
def __init__(
|
||||
self,
|
||||
username: str,
|
||||
password: Optional[str] = None,
|
||||
*,
|
||||
is_two_factor_enabled: bool = False,
|
||||
password_hash: Optional[bytes] = None,
|
||||
secret_token: Optional[str] = None,
|
||||
method: str = "manual",
|
||||
):
|
||||
self.id = username
|
||||
|
||||
if not password:
|
||||
assert password_hash, "Either password or password_hash must be provided"
|
||||
|
||||
self.__password = password_hash or hashpw(password.encode("utf-8"), gensalt(rounds=13)) # type: ignore
|
||||
self.is_two_factor_enabled = is_two_factor_enabled
|
||||
self.secret_token = secret_token
|
||||
self.method = method
|
||||
self.__totp = TOTP(secret_token) if secret_token else None
|
||||
|
||||
@property
|
||||
def password_hash(self) -> bytes:
|
||||
"""
|
||||
Get the password hash
|
||||
|
||||
:return: The password hash
|
||||
"""
|
||||
return self.__password
|
||||
|
||||
def update_password(self, password: str):
|
||||
"""
|
||||
Set the password by hashing it
|
||||
|
||||
:param password: The password to be hashed
|
||||
"""
|
||||
self.__password = hashpw(password.encode("utf-8"), gensalt(rounds=13))
|
||||
|
||||
def check_password(self, password: str):
|
||||
"""
|
||||
Check if the password is correct by hashing it and comparing it to the stored hash
|
||||
|
||||
:param password: The password to be checked
|
||||
:return: The password is being checked against the password hash. If the password is correct,
|
||||
the user is returned.
|
||||
"""
|
||||
return checkpw(password.encode("utf-8"), self.__password)
|
||||
|
||||
def get_authentication_setup_uri(self) -> str:
|
||||
if not self.__totp:
|
||||
return ""
|
||||
return self.__totp.provisioning_uri(name=self.id, issuer_name="BunkerWeb UI")
|
||||
|
||||
def refresh_totp(self):
|
||||
self.secret_token = random_base32()
|
||||
self.__totp = TOTP(self.secret_token)
|
||||
|
||||
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Check if the otp is correct by comparing it to the stored secret token
|
||||
|
||||
:param otp: The otp to be checked
|
||||
:return: The otp is being checked against the secret token. If the otp is correct,
|
||||
the user is returned.
|
||||
"""
|
||||
if secret:
|
||||
return TOTP(secret).verify(otp, valid_window=3)
|
||||
if not self.__totp:
|
||||
return False
|
||||
return self.__totp.verify(otp, valid_window=3)
|
||||
|
||||
def __repr__(self):
|
||||
return f"User({self.id!r}, {self.__password!r}, {self.is_two_factor_enabled!r}, {self.secret_token!r}, {self.method!r})"
|
||||
|
||||
|
||||
class AnonymousUser(AnonymousUserMixin):
|
||||
def __init__(self):
|
||||
self.id = None
|
||||
self.is_two_factor_enabled = False
|
||||
self.secret_token = None
|
||||
self.method = "manual"
|
||||
|
||||
@property
|
||||
def password_hash(self) -> None:
|
||||
"""
|
||||
Get the password hash
|
||||
|
||||
:return: The password hash
|
||||
"""
|
||||
return None
|
||||
|
||||
def update_password(self, password: str):
|
||||
"""
|
||||
Set the password by hashing it
|
||||
|
||||
:param password: The password to be hashed
|
||||
"""
|
||||
self.__password = hashpw(password.encode("utf-8"), gensalt(rounds=13))
|
||||
|
||||
def check_password(self, password: str):
|
||||
"""
|
||||
Check if the password is correct by hashing it and comparing it to the stored hash
|
||||
|
||||
:param password: The password to be checked
|
||||
:return: The password is being checked against the password hash. If the password is correct,
|
||||
the user is returned.
|
||||
"""
|
||||
return False
|
||||
|
||||
def get_authentication_setup_uri(self) -> str:
|
||||
return ""
|
||||
|
||||
def refresh_totp(self):
|
||||
return
|
||||
|
||||
def check_otp(self, otp: str, *, secret: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Check if the otp is correct by comparing it to the stored secret token
|
||||
|
||||
:param otp: The otp to be checked
|
||||
:return: The otp is being checked against the secret token. If the otp is correct,
|
||||
the user is returned.
|
||||
"""
|
||||
if secret:
|
||||
return TOTP(secret).verify(otp, valid_window=3)
|
||||
return False
|
||||
2
src/ui/templates/account.html
vendored
2
src/ui/templates/account.html
vendored
|
|
@ -498,7 +498,7 @@
|
|||
{% if is_totp or not is_totp %}
|
||||
<!-- username inpt-->
|
||||
<div class="flex flex-col relative col-span-12 px-4 my-2 md:px-6 md:my-3 lg:px-6 lg:my-3 max-w-[400px] w-full">
|
||||
<h5 class="input-title">2FA code</h5>
|
||||
<h5 class="input-title">2FA code or Backup code</h5>
|
||||
<label class="sr-only" for="totp_token">totp code</label>
|
||||
<input {% if is_readonly %}disabled{% endif %}
|
||||
type="text"
|
||||
|
|
|
|||
9
src/ui/templates/login.html
vendored
9
src/ui/templates/login.html
vendored
|
|
@ -96,6 +96,15 @@
|
|||
required />
|
||||
</div>
|
||||
<!-- end password inpt-->
|
||||
<!-- remember me inpt-->
|
||||
<div class="flex items-center my-3">
|
||||
<label class="sr-only" for="remember">Remember me</label>
|
||||
<input type="checkbox" id="remember" name="remember" class="mr-2" value="on" />
|
||||
<h5 class="text-sm font-bold dark:text-gray-300 dark:opacity-90 transition duration-300 ease-in-out m-0">
|
||||
Remember me
|
||||
</h5>
|
||||
</div>
|
||||
<!-- end remember me inpt-->
|
||||
<!-- totp -->
|
||||
<div class="flex justify-center">
|
||||
<button type="submit"
|
||||
|
|
|
|||
406
src/ui/ui_database.py
Normal file
406
src/ui/ui_database.py
Normal file
|
|
@ -0,0 +1,406 @@
|
|||
from datetime import datetime
|
||||
from logging import Logger
|
||||
from os import sep
|
||||
from os.path import join
|
||||
from sys import path as sys_path
|
||||
from time import sleep
|
||||
from typing import List, Optional, Tuple, Union
|
||||
|
||||
|
||||
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("api",), ("db",))]:
|
||||
if deps_path not in sys_path:
|
||||
sys_path.append(deps_path)
|
||||
|
||||
from sqlalchemy import MetaData, inspect, text
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from Database import Database # type: ignore
|
||||
from models import Base, Users, Roles, RolesUsers, UserRecoveryCodes, RolesPermissions, Permissions
|
||||
from utils import gen_recovery_codes
|
||||
|
||||
|
||||
class UIDatabase(Database):
|
||||
def __init__(self, logger: Logger, sqlalchemy_string: Optional[str] = None, *, pool: Optional[bool] = None, log: bool = True, **kwargs) -> None:
|
||||
super().__init__(logger, sqlalchemy_string, ui=True, pool=pool, log=log, **kwargs)
|
||||
|
||||
def init_ui_tables(self, bunkerweb_version: str) -> Tuple[bool, str]:
|
||||
"""Initialize the database ui tables and return the result"""
|
||||
|
||||
if self.readonly:
|
||||
return False, "The database is read-only, the changes will not be saved"
|
||||
|
||||
assert self.sql_engine is not None, "The database engine is not initialized"
|
||||
|
||||
inspector = inspect(self.sql_engine)
|
||||
db_version = None
|
||||
has_all_tables = True
|
||||
old_data = {}
|
||||
|
||||
if inspector and len(inspector.get_table_names()):
|
||||
metadata = self.get_metadata()
|
||||
db_version = metadata["version"]
|
||||
if metadata["default"]:
|
||||
db_version = "error"
|
||||
|
||||
if db_version != bunkerweb_version:
|
||||
self.logger.warning(f"Database version ({db_version}) is different from BunkerWeb version ({bunkerweb_version}), migrating ui tables ...")
|
||||
current_time = datetime.now()
|
||||
error = True
|
||||
while error:
|
||||
try:
|
||||
metadata = MetaData()
|
||||
metadata.reflect(self.sql_engine)
|
||||
error = False
|
||||
except BaseException as e:
|
||||
if (datetime.now() - current_time).total_seconds() > 10:
|
||||
raise e
|
||||
sleep(1)
|
||||
|
||||
assert isinstance(metadata, MetaData)
|
||||
|
||||
for table_name in Base.metadata.tables.keys():
|
||||
if not inspector.has_table(table_name):
|
||||
self.logger.warning(f'UI table "{table_name}" is missing, creating it')
|
||||
has_all_tables = False
|
||||
continue
|
||||
|
||||
with self._db_session() as session:
|
||||
old_data[table_name] = session.query(metadata.tables[table_name]).all()
|
||||
|
||||
# Rename the old tables
|
||||
db_version_id = db_version.replace(".", "_")
|
||||
for table_name in metadata.tables.keys():
|
||||
if table_name in Base.metadata.tables:
|
||||
with self._db_session() as session:
|
||||
if inspector.has_table(f"{table_name}_{db_version_id}"):
|
||||
self.logger.warning(f'UI table "{table_name}" already exists, dropping it to make room for the new one')
|
||||
session.execute(text(f"DROP TABLE {table_name}_{db_version_id}"))
|
||||
session.execute(text(f"ALTER TABLE {table_name} RENAME TO {table_name}_{db_version_id}"))
|
||||
session.commit()
|
||||
|
||||
Base.metadata.drop_all(self.sql_engine)
|
||||
else:
|
||||
for table_name in Base.metadata.tables.keys():
|
||||
if not inspector.has_table(table_name):
|
||||
self.logger.warning(f'UI table "{table_name}" is missing, creating it')
|
||||
has_all_tables = False
|
||||
continue
|
||||
|
||||
if has_all_tables and db_version and db_version == bunkerweb_version:
|
||||
return False, ""
|
||||
|
||||
self.logger.info("Creating UI tables ...")
|
||||
|
||||
try:
|
||||
Base.metadata.create_all(self.sql_engine, checkfirst=True)
|
||||
except BaseException as e:
|
||||
return False, str(e)
|
||||
|
||||
if db_version and db_version != bunkerweb_version:
|
||||
for table_name, data in old_data.items():
|
||||
if not data:
|
||||
continue
|
||||
|
||||
self.logger.warning(f'Restoring data for ui table "{table_name}"')
|
||||
self.logger.debug(f"Data: {data}")
|
||||
for row in data:
|
||||
row = {column: getattr(row, column) for column in Base.metadata.tables[table_name].columns.keys() if hasattr(row, column)}
|
||||
|
||||
# TODO: Add special handling for newer UI version
|
||||
|
||||
with self._db_session() as session:
|
||||
try:
|
||||
# Check if the row already exists in the table
|
||||
existing_row = session.query(Base.metadata.tables[table_name]).filter_by(**row).first()
|
||||
if not existing_row:
|
||||
session.execute(Base.metadata.tables[table_name].insert().values(row))
|
||||
session.commit()
|
||||
except IntegrityError as e:
|
||||
session.rollback()
|
||||
if "Duplicate entry" not in str(e):
|
||||
self.logger.error(f"Error when trying to restore data for table {table_name}: {e}")
|
||||
continue
|
||||
self.logger.debug(e)
|
||||
|
||||
return True, ""
|
||||
|
||||
def get_ui_user(self, *, username: Optional[str] = None, as_dict: bool = False) -> Optional[Union[Users, dict]]:
|
||||
"""Get ui user. If username is None, return the first admin user."""
|
||||
with self._db_session() as session:
|
||||
if username:
|
||||
ui_user = session.query(Users).filter_by(username=username).first()
|
||||
else:
|
||||
ui_user = session.query(Users).filter_by(admin=True).first()
|
||||
|
||||
if not ui_user:
|
||||
return None
|
||||
elif not as_dict:
|
||||
return ui_user
|
||||
|
||||
ui_user_data = {
|
||||
"username": ui_user.username,
|
||||
"email": ui_user.email,
|
||||
"password": ui_user.password.encode("utf-8"),
|
||||
"method": ui_user.method,
|
||||
"last_login_at": ui_user.last_login_at,
|
||||
"last_login_ip": ui_user.last_login_ip,
|
||||
"login_count": ui_user.login_count,
|
||||
"totp_secret": ui_user.totp_secret,
|
||||
"creation_date": ui_user.creation_date,
|
||||
"update_date": ui_user.update_date,
|
||||
"roles": [],
|
||||
"recovery_codes": [],
|
||||
}
|
||||
|
||||
for role in session.query(RolesUsers).filter_by(user_name=ui_user.username).all():
|
||||
ui_user_data["roles"].append(role.role_name)
|
||||
for recovery_code in session.query(UserRecoveryCodes).filter_by(user_name=ui_user.username).all():
|
||||
ui_user_data["recovery_codes"].append(recovery_code.code)
|
||||
|
||||
return ui_user_data
|
||||
|
||||
def create_ui_user(
|
||||
self,
|
||||
username: str,
|
||||
password: bytes,
|
||||
roles: List[str],
|
||||
email: Optional[str] = None,
|
||||
*,
|
||||
totp_secret: Optional[str] = None,
|
||||
method: str = "manual",
|
||||
admin: bool = False,
|
||||
) -> str:
|
||||
"""Create ui user."""
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
if admin and session.query(Users).filter_by(admin=True).first():
|
||||
return "An admin user already exists"
|
||||
|
||||
user = session.query(Users).filter_by(username=username).first()
|
||||
if user:
|
||||
return f"User {username} already exists"
|
||||
|
||||
for role in roles:
|
||||
if not session.query(Roles).filter_by(name=role).first():
|
||||
return f"Role {role} doesn't exist"
|
||||
session.add(RolesUsers(user_name=username, role_name=role))
|
||||
|
||||
session.add(
|
||||
Users(
|
||||
username=username,
|
||||
email=email,
|
||||
password=password.decode("utf-8"),
|
||||
method=method,
|
||||
admin=admin,
|
||||
totp_secret=totp_secret,
|
||||
totp_refreshed=bool(totp_secret),
|
||||
)
|
||||
)
|
||||
|
||||
if totp_secret:
|
||||
for code in gen_recovery_codes():
|
||||
session.add(UserRecoveryCodes(user_name=username, code=code))
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
def update_ui_user(self, username: str, password: bytes, totp_secret: Optional[str], method: str = "manual") -> str:
|
||||
"""Update ui user."""
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
user = session.query(Users).filter_by(username=username).first()
|
||||
if not user:
|
||||
return f"User {username} doesn't exist"
|
||||
|
||||
if user.totp_secret != totp_secret:
|
||||
user.totp_refreshed = True
|
||||
|
||||
user.password = password.decode("utf-8")
|
||||
user.totp_secret = totp_secret
|
||||
user.method = method
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
if user.totp_refreshed:
|
||||
if totp_secret:
|
||||
self.refresh_ui_user_recovery_codes(username)
|
||||
else:
|
||||
self.delete_ui_user_recovery_codes(username)
|
||||
|
||||
return ""
|
||||
|
||||
def delete_ui_user(self, username: str) -> str:
|
||||
"""Delete ui user."""
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
user = session.query(Users).filter_by(username=username).first()
|
||||
if not user:
|
||||
return f"User {username} doesn't exist"
|
||||
|
||||
session.query(RolesUsers).filter_by(user_name=username).delete()
|
||||
session.query(UserRecoveryCodes).filter_by(user_name=username).delete()
|
||||
session.delete(user)
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
def mark_ui_user_login(self, username: str, date: datetime, ip: str) -> str:
|
||||
"""Mark ui user login."""
|
||||
with self._db_session() as session:
|
||||
user = session.query(Users).filter_by(username=username).first()
|
||||
if not user:
|
||||
return f"User {username} doesn't exist"
|
||||
|
||||
user.last_login_at = date
|
||||
user.last_login_ip = ip
|
||||
user.login_count += 1
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
def create_ui_role(self, name: str, description: str, permissions: List[str]) -> str:
|
||||
"""Create ui role."""
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
if session.query(Roles).filter_by(name=name).first():
|
||||
return f"Role {name} already exists"
|
||||
|
||||
session.add(Roles(name=name, description=description))
|
||||
|
||||
for permission in permissions:
|
||||
if not session.query(Permissions).filter_by(name=permission).first():
|
||||
session.add(Permissions(name=permission))
|
||||
session.add(RolesPermissions(role_name=name, permission_name=permission))
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
def get_ui_roles(self, *, as_dict: bool = False) -> List[Union[Roles, dict]]:
|
||||
"""Get ui roles."""
|
||||
with self._db_session() as session:
|
||||
roles = session.query(Roles).all()
|
||||
if not as_dict:
|
||||
return roles
|
||||
|
||||
roles_data = []
|
||||
for role in roles:
|
||||
role_data = {
|
||||
"name": role.name,
|
||||
"description": role.description,
|
||||
"update_datetime": role.update_datetime,
|
||||
"permissions": [],
|
||||
}
|
||||
|
||||
for permission in session.query(RolesPermissions).filter_by(role_name=role.name).all():
|
||||
role_data["permissions"].append(permission.permission_name)
|
||||
|
||||
roles_data.append(role_data)
|
||||
|
||||
return roles_data
|
||||
|
||||
def refresh_ui_user_recovery_codes(self, username: str) -> str:
|
||||
"""Refresh ui user recovery codes."""
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
user = session.query(Users).filter_by(username=username).first()
|
||||
if not user:
|
||||
return f"User {username} doesn't exist"
|
||||
|
||||
session.query(UserRecoveryCodes).filter_by(user_name=username).delete()
|
||||
|
||||
for code in gen_recovery_codes():
|
||||
session.add(UserRecoveryCodes(user_name=username, code=code))
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
def delete_ui_user_recovery_codes(self, username: str) -> str:
|
||||
"""Delete ui user recovery codes."""
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
session.query(UserRecoveryCodes).filter_by(user_name=username).delete()
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
def get_ui_user_roles(self, username: str) -> List[str]:
|
||||
"""Get ui user roles."""
|
||||
with self._db_session() as session:
|
||||
return [role.role_name for role in session.query(RolesUsers).filter_by(user_name=username).all()]
|
||||
|
||||
def get_ui_role_permissions(self, role_name: str) -> List[str]:
|
||||
"""Get ui role permissions."""
|
||||
with self._db_session() as session:
|
||||
return [permission.permission_name for permission in session.query(RolesPermissions).filter_by(role_name=role_name).all()]
|
||||
|
||||
def get_ui_user_recovery_codes(self, username: str) -> List[str]:
|
||||
"""Get ui user recovery codes."""
|
||||
with self._db_session() as session:
|
||||
return [code.code for code in session.query(UserRecoveryCodes).filter_by(user_name=username).all()]
|
||||
|
||||
def get_ui_user_permissions(self, username: str) -> List[str]:
|
||||
"""Get ui user permissions."""
|
||||
with self._db_session() as session:
|
||||
roles = session.query(RolesUsers).filter_by(user_name=username).all()
|
||||
|
||||
permissions = []
|
||||
for role in roles:
|
||||
permissions.extend(self.get_ui_role_permissions(role.role_name))
|
||||
return permissions
|
||||
|
||||
def use_ui_user_recovery_code(self, username: str, code: str) -> str:
|
||||
"""Use ui user recovery code."""
|
||||
with self._db_session() as session:
|
||||
user = session.query(Users).filter_by(username=username).first()
|
||||
if not user:
|
||||
return f"User {username} doesn't exist"
|
||||
|
||||
recovery_code = session.query(UserRecoveryCodes).filter_by(user_name=username, code=code).first()
|
||||
if not recovery_code:
|
||||
return "Invalid recovery code"
|
||||
|
||||
session.delete(recovery_code)
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
|
@ -3,10 +3,17 @@
|
|||
from base64 import b64encode
|
||||
from io import BytesIO
|
||||
from os.path import join
|
||||
from threading import Lock
|
||||
from typing import List, Optional
|
||||
|
||||
from bcrypt import checkpw, gensalt, hashpw
|
||||
from magic import Magic
|
||||
from passlib.pwd import genword
|
||||
from qrcode.main import QRCode
|
||||
from regex import compile as re_compile
|
||||
|
||||
USER_PASSWORD_RX = re_compile(r"^(?=.*?\p{Lowercase_Letter})(?=.*?\p{Uppercase_Letter})(?=.*?\d)(?=.*?[ !\"#$%&'()*+,./:;<=>?@[\\\]^_`{|}~-]).{8,}$")
|
||||
LOCK = Lock()
|
||||
|
||||
|
||||
def get_remain(seconds):
|
||||
|
|
@ -207,6 +214,19 @@ def check_settings(settings: dict, check: str) -> bool:
|
|||
return any(setting["context"] == check for setting in settings.values())
|
||||
|
||||
|
||||
def gen_password_hash(password: str) -> bytes:
|
||||
return hashpw(password.encode("utf-8"), gensalt(rounds=13))
|
||||
|
||||
|
||||
def check_password(password: str, hashed: bytes) -> bool:
|
||||
return checkpw(password.encode("utf-8"), hashed)
|
||||
|
||||
|
||||
def gen_recovery_codes() -> List[str]:
|
||||
pwds = genword(length=16, charset="hex", returns=5)
|
||||
return ["-".join([pwd[i : i + 4] for i in range(0, len(pwd), 4)]) for pwd in pwds] # noqa: E203
|
||||
|
||||
|
||||
def get_b64encoded_qr_image(data: str):
|
||||
qr = QRCode(version=1, box_size=10, border=5)
|
||||
qr.add_data(data)
|
||||
|
|
|
|||
Loading…
Reference in a new issue