fix: streamline secret loading by checking for files in LIB_DIR and removing redundant environment variable handling

This commit is contained in:
Théophile Diot 2024-11-26 10:46:20 +01:00
parent 69b59ef68e
commit afa22f627c
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
3 changed files with 152 additions and 62 deletions

View file

@ -1,7 +1,6 @@
from base64 import b64encode
from io import BytesIO
from json import JSONDecodeError, loads as json_loads
from os import getenv
from json import loads as json_loads
from bcrypt import checkpw
from typing import List, Optional
from passlib.totp import TOTP, MalformedTokenError, TokenError, TotpMatch
@ -14,25 +13,10 @@ from app.dependencies import DATA
from app.utils import LIB_DIR, LOGGER, stop
TOTP_SECRETS = getenv("TOTP_SECRETS", "")
if TOTP_SECRETS:
try:
TOTP_SECRETS = json_loads(TOTP_SECRETS)
except JSONDecodeError:
x = 1
tmp_secrets = {}
for secret in TOTP_SECRETS.strip().split(" "):
if secret:
tmp_secrets[x] = secret
x += 1
TOTP_SECRETS = tmp_secrets.copy()
del tmp_secrets
if not TOTP_SECRETS:
if not LIB_DIR.joinpath(".totp_secrets.json").is_file():
LOGGER.error("The TOTP_SECRETS environment variable is missing and the .totp_secrets.json file is missing, exiting ...")
stop(1)
TOTP_SECRETS = json_loads(LIB_DIR.joinpath(".totp_secrets.json").read_text(encoding="utf-8"))
if not LIB_DIR.joinpath(".totp_secrets.json").is_file():
LOGGER.error("The .totp_secrets.json file is missing, exiting ...")
stop(1)
TOTP_SECRETS = json_loads(LIB_DIR.joinpath(".totp_secrets.json").read_text(encoding="utf-8"))
class Totp:

View file

@ -1,10 +1,11 @@
from datetime import datetime
from json import JSONDecodeError, dumps, loads
from os import cpu_count, getenv, getpid, sep
from hashlib import sha256
from json import JSONDecodeError, dump, dumps, loads
from os import cpu_count, getenv, sep
from os.path import join
from pathlib import Path
from random import randint
from secrets import token_urlsafe
from secrets import token_hex
from stat import S_IRUSR, S_IWUSR
from sys import exit, path as sys_path
from time import sleep
@ -16,7 +17,7 @@ from gevent.monkey import patch_all
patch_all()
from passlib import totp
from passlib.totp import generate_secret
from logger import setup_logger # type: ignore
@ -27,6 +28,16 @@ TMP_DIR = Path(sep, "var", "tmp", "bunkerweb")
RUN_DIR = Path(sep, "var", "run", "bunkerweb")
LIB_DIR = Path(sep, "var", "lib", "bunkerweb")
UI_DATA_FILE = TMP_DIR.joinpath("ui_data.json")
HEALTH_FILE = TMP_DIR.joinpath("ui.healthy")
PID_FILE = RUN_DIR.joinpath("ui.pid")
FLASK_SECRET_FILE = LIB_DIR.joinpath(".flask_secret")
FLASK_SECRET_HASH_FILE = FLASK_SECRET_FILE.with_suffix(".hash") # File to store hash of Flask secret
TOTP_SECRETS_FILE = LIB_DIR.joinpath(".totp_secrets.json")
TOTP_HASH_FILE = TOTP_SECRETS_FILE.with_suffix(".hash") # File to store hash of TOTP secrets
MAX_WORKERS = int(getenv("MAX_WORKERS", max((cpu_count() or 1) - 1, 1)))
LOG_LEVEL = getenv("CUSTOM_LOG_LEVEL", getenv("LOG_LEVEL", "info"))
@ -38,6 +49,7 @@ errorlog = join(sep, "var", "log", "bunkerweb", "ui.log")
reuse_port = True
chdir = join(sep, "usr", "share", "bunkerweb", "ui")
umask = 0x027
pidfile = PID_FILE.as_posix()
worker_tmp_dir = join(sep, "dev", "shm")
tmp_upload_dir = join(sep, "var", "tmp", "bunkerweb", "ui")
secure_scheme_headers = {}
@ -71,30 +83,128 @@ def on_starting(server):
LOGGER = setup_logger("UI", getenv("CUSTOM_LOG_LEVEL", getenv("LOG_LEVEL", "INFO")))
FLASK_SECRET = getenv("FLASK_SECRET")
if not FLASK_SECRET and not TMP_DIR.joinpath(".flask_secret").is_file():
LOGGER.warning("The FLASK_SECRET environment variable is missing, generating a random one ...")
TMP_DIR.joinpath(".flask_secret").write_text(token_urlsafe(32), encoding="utf-8")
def set_secure_permissions(file_path: Path):
"""Set file permissions to 600 (owner read/write only)."""
file_path.chmod(S_IRUSR | S_IWUSR)
LOGGER.info(f"Permissions set to 600 for {file_path}")
TOTP_SECRETS = getenv("TOTP_SECRETS", "")
# * Handle Flask secret
VALID_FLASK_SECRET_LENGTH = 64
try:
flask_secret = None
# * Step 1: Load Flask secret from file
if FLASK_SECRET_FILE.is_file():
try:
flask_secret = FLASK_SECRET_FILE.read_text(encoding="utf-8").strip()
if not flask_secret:
raise ValueError("Secret file is empty.")
LOGGER.info("Flask secret successfully loaded from the file.")
except (ValueError, Exception) as e:
LOGGER.error(f"Failed to load Flask secret from file: {e}. Falling back to environment variable or generating a new secret.")
# * Step 2: Check environment variable if no valid file
if not flask_secret:
flask_secret_env = getenv("FLASK_SECRET", "").strip()
if flask_secret_env:
if len(flask_secret_env) != VALID_FLASK_SECRET_LENGTH:
LOGGER.warning("Invalid Flask secret length. Ignoring environment variable.")
else:
flask_secret = flask_secret_env
LOGGER.info("Flask secret successfully loaded from the environment variable.")
# * Step 3: Generate new secret if none found
if not flask_secret:
LOGGER.warning("No valid Flask secret found. Generating a new random secret...")
flask_secret = token_hex(VALID_FLASK_SECRET_LENGTH)
LOGGER.info("Generated a new Flask secret.")
# * Step 4: Hash for change detection
current_env_hash = sha256(flask_secret.encode("utf-8")).hexdigest()
previous_env_hash = FLASK_SECRET_HASH_FILE.read_text(encoding="utf-8").strip() if FLASK_SECRET_HASH_FILE.is_file() else None
# * Step 5: Compare hashes and update if necessary
if previous_env_hash and current_env_hash == previous_env_hash:
LOGGER.info("The FLASK_SECRET environment variable has not changed since the last restart.")
else:
LOGGER.warning("The FLASK_SECRET environment variable has changed or is being set for the first time.")
with FLASK_SECRET_FILE.open("w", encoding="utf-8") as file:
file.write(flask_secret)
set_secure_permissions(FLASK_SECRET_FILE)
with FLASK_SECRET_HASH_FILE.open("w", encoding="utf-8") as file:
file.write(current_env_hash)
set_secure_permissions(FLASK_SECRET_HASH_FILE)
LOGGER.info(f"Flask secret securely stored in {FLASK_SECRET_FILE}.")
LOGGER.info(f"Flask secret hash stored in {FLASK_SECRET_HASH_FILE} for change detection.")
except Exception as e:
LOGGER.critical(f"An error occurred while handling the Flask secret: {e}")
exit(1)
# * Handle TOTP secrets
VALID_TOTP_SECRET_LENGTH = 43 # (generated by generate_secret())
invalid_totp_secrets = False
if TOTP_SECRETS:
try:
TOTP_SECRETS = loads(TOTP_SECRETS)
except JSONDecodeError:
x = 1
tmp_secrets = {}
for secret in TOTP_SECRETS.strip().split(" "):
if secret:
tmp_secrets[x] = secret
x += 1
TOTP_SECRETS = tmp_secrets.copy()
del tmp_secrets
invalid_totp_secrets = x == 1
try:
totp_secrets = {}
if not TOTP_SECRETS:
LOGGER.warning("The TOTP_SECRETS environment variable is missing, generating a random one ...")
LIB_DIR.joinpath(".totp_secrets.json").write_text(dumps({k: totp.generate_secret() for k in range(randint(1, 5))}), encoding="utf-8")
# * Step 1: Load TOTP secrets from file
if TOTP_SECRETS_FILE.is_file():
try:
totp_secrets = loads(TOTP_SECRETS_FILE.read_text(encoding="utf-8"))
LOGGER.info("TOTP secrets successfully loaded from the file.")
except JSONDecodeError:
LOGGER.error("Failed to load TOTP secrets from file. Falling back to environment or generating new secrets.")
# * Step 2: Check environment variable if no valid file
if not totp_secrets:
totp_secrets_env = getenv("TOTP_SECRETS", "").strip()
if totp_secrets_env:
try:
parsed_secrets = loads(totp_secrets_env)
if isinstance(parsed_secrets, dict):
totp_secrets = parsed_secrets
elif isinstance(parsed_secrets, list):
totp_secrets = {f"key-{i+1}": secret for i, secret in enumerate(parsed_secrets)}
except JSONDecodeError:
LOGGER.info("TOTP_SECRETS is not valid JSON. Treating as space-separated secrets.")
totp_secrets = {f"key-{i+1}": secret for i, secret in enumerate(totp_secrets_env.split())}
# * Step 3: Validate and clean secrets
for key, secret in list(totp_secrets.items()):
if not isinstance(secret, str) or len(secret) != VALID_TOTP_SECRET_LENGTH:
LOGGER.warning(f"Invalid TOTP secret for key: {key}. Secret will be excluded.")
totp_secrets.pop(key)
# * Step 4: Generate new secrets if none are valid
if not totp_secrets:
LOGGER.warning("No valid TOTP secrets found. Generating new secure secrets...")
totp_secrets = {f"key-{i}": generate_secret() for i in range(1, 6)}
LOGGER.info(f"Generated {len(totp_secrets)} secure TOTP secrets.")
# * Step 5: Hash for change detection
current_env_hash = sha256(dumps(totp_secrets, sort_keys=True).encode("utf-8")).hexdigest()
previous_env_hash = TOTP_HASH_FILE.read_text(encoding="utf-8").strip() if TOTP_HASH_FILE.is_file() else None
# * Step 6: Compare hashes and update if necessary
if previous_env_hash and current_env_hash == previous_env_hash:
LOGGER.info("The TOTP_SECRETS environment variable has not changed since the last restart.")
else:
LOGGER.warning("The TOTP_SECRETS environment variable has changed or is being set for the first time.")
invalid_totp_secrets = True
with TOTP_SECRETS_FILE.open("w", encoding="utf-8") as file:
dump(totp_secrets, file, indent=2)
set_secure_permissions(TOTP_SECRETS_FILE)
with TOTP_HASH_FILE.open("w", encoding="utf-8") as file:
file.write(current_env_hash)
set_secure_permissions(TOTP_HASH_FILE)
LOGGER.info(f"TOTP secrets securely stored in {TOTP_SECRETS_FILE}.")
LOGGER.info(f"TOTP environment hash stored in {TOTP_HASH_FILE} for change detection.")
except Exception as e:
LOGGER.critical(f"An error occurred while handling TOTP secrets: {e}")
exit(1)
DB = UIDatabase(LOGGER)
@ -137,7 +247,7 @@ def on_starting(server):
env_admin_password = getenv("ADMIN_PASSWORD", "")
if ADMIN_USER:
if not getenv("TOTP_SECRETS") or invalid_totp_secrets:
if invalid_totp_secrets:
LOGGER.warning("The TOTP secrets have changed, removing admin TOTP secrets ...")
err = DB.update_ui_user(ADMIN_USER["username"], ADMIN_USER["password"], None, method=ADMIN_USER["method"])
if err:
@ -201,7 +311,7 @@ def on_starting(server):
else:
latest_version = latest_release["tag_name"].removeprefix("v")
TMP_DIR.joinpath("ui_data.json").write_text(
UI_DATA_FILE.write_text(
dumps(
{
"LATEST_VERSION": latest_version,
@ -212,17 +322,15 @@ def on_starting(server):
),
encoding="utf-8",
)
set_secure_permissions(UI_DATA_FILE)
LOGGER.info("UI is ready")
def when_ready(server):
RUN_DIR.joinpath("ui.pid").write_text(str(getpid()), encoding="utf-8")
TMP_DIR.joinpath("ui.healthy").write_text("ok", encoding="utf-8")
HEALTH_FILE.write_text("ok", encoding="utf-8")
def on_exit(server):
RUN_DIR.joinpath("ui.pid").unlink(missing_ok=True)
TMP_DIR.joinpath("ui.healthy").unlink(missing_ok=True)
TMP_DIR.joinpath(".flask_secret").unlink(missing_ok=True)
TMP_DIR.joinpath("ui_data.json").unlink(missing_ok=True)
HEALTH_FILE.unlink(missing_ok=True)
UI_DATA_FILE.unlink(missing_ok=True)

View file

@ -46,7 +46,7 @@ from app.dependencies import BW_CONFIG, DATA, DB
from app.models.models import AnonymousUser
from app.utils import (
COLUMNS_PREFERENCES_DEFAULTS,
TMP_DIR,
LIB_DIR,
LOGGER,
flash,
get_blacklisted_settings,
@ -68,12 +68,10 @@ with app.app_context():
PROXY_NUMBERS = int(getenv("PROXY_NUMBERS", "1"))
app.wsgi_app = ReverseProxied(app.wsgi_app, x_for=PROXY_NUMBERS, x_proto=PROXY_NUMBERS, x_host=PROXY_NUMBERS, x_prefix=PROXY_NUMBERS)
FLASK_SECRET = getenv("FLASK_SECRET")
if not FLASK_SECRET:
if not TMP_DIR.joinpath(".flask_secret").is_file():
LOGGER.error("The FLASK_SECRET environment variable is missing and the .flask_secret file is missing, exiting ...")
stop(1)
FLASK_SECRET = TMP_DIR.joinpath(".flask_secret").read_text(encoding="utf-8").strip()
if not LIB_DIR.joinpath(".flask_secret").is_file():
LOGGER.error("The .flask_secret file is missing, exiting ...")
stop(1)
FLASK_SECRET = LIB_DIR.joinpath(".flask_secret").read_text(encoding="utf-8").strip()
app.config["SECRET_KEY"] = FLASK_SECRET