Merge branch 'dev' of github.com:bunkerity/bunkerweb into dev

This commit is contained in:
fl0ppy-d1sk 2024-03-07 18:37:31 +01:00
commit 866f4569e4
No known key found for this signature in database
GPG key ID: 93EE47CC3D061500
29 changed files with 256 additions and 88 deletions

View file

@ -47,7 +47,7 @@ repos:
hooks:
- id: flake8
name: Flake8 Python Linter
args: ["--max-line-length=160", "--ignore=E266,E402,E722,W503"]
args: ["--max-line-length=160", "--ignore=E266,E402,E501,E722,W503"]
- repo: https://github.com/dosisod/refurb
rev: a9a4edd45687e664dee0905ba1c848bda227d1d6 # frozen: v1.28.0

View file

@ -101,7 +101,11 @@ class DockerController(Controller):
)
def __process_event(self, event):
return "Actor" in event and "Attributes" in event["Actor"] and ("bunkerweb.INSTANCE" in event["Actor"]["Attributes"] or "bunkerweb.SERVER_NAME" in event["Actor"]["Attributes"])
return (
"Actor" in event
and "Attributes" in event["Actor"]
and ("bunkerweb.INSTANCE" in event["Actor"]["Attributes"] or "bunkerweb.SERVER_NAME" in event["Actor"]["Attributes"])
)
def process_events(self):
self._set_autoconf_load_db()

View file

@ -19,7 +19,11 @@ class IngressController(Controller):
self.__networkingv1 = client.NetworkingV1Api()
def _get_controller_instances(self) -> list:
return [pod for pod in self.__corev1.list_pod_for_all_namespaces(watch=False).items if (pod.metadata.annotations and "bunkerweb.io/INSTANCE" in pod.metadata.annotations)]
return [
pod
for pod in self.__corev1.list_pod_for_all_namespaces(watch=False).items
if (pod.metadata.annotations and "bunkerweb.io/INSTANCE" in pod.metadata.annotations)
]
def _to_instances(self, controller_instance) -> List[dict]:
instance = {}

View file

@ -787,7 +787,7 @@ utils.get_phases = function()
"log_stream",
"log_default",
"timer",
"init_workers"
"init_workers",
}
end

View file

@ -140,7 +140,9 @@ class CLI(ApiCaller):
self.__use_redis = False
if self.__use_redis:
self.__logger.info(f"Connected to redis sentinel cluster, getting master with the following parameters:\n{sentinel_master=}\n{redis_db=}\n{username=}\n{password=}")
self.__logger.info(
f"Connected to redis sentinel cluster, getting master with the following parameters:\n{sentinel_master=}\n{redis_db=}\n{username=}\n{password=}"
)
self.__redis = sentinel.master_for(
sentinel_master,
db=redis_db,
@ -148,7 +150,9 @@ class CLI(ApiCaller):
password=password,
)
else:
self.__logger.info(f"Connecting to redis with the following parameters:\n{redis_host=}\n{redis_port=}\n{redis_db=}\n{username=}\n{password=}\n{redis_timeout=}\nmax_connections={redis_keepalive_pool}\n{redis_ssl=}")
self.__logger.info(
f"Connecting to redis with the following parameters:\n{redis_host=}\n{redis_port=}\n{redis_db=}\n{username=}\n{password=}\n{redis_timeout=}\nmax_connections={redis_keepalive_pool}\n{redis_ssl=}"
)
self.__redis = StrictRedis(
host=redis_host,
port=redis_port,

View file

@ -124,7 +124,9 @@ try:
else:
logger.info(f"No change for certificate {cert_path}")
elif not cert_path or not key_path:
logger.warning("Both variables CUSTOM_SSL_CERT and CUSTOM_SSL_KEY (or CUSTOM_SSL_CERT_DATA and CUSTOM_SSL_KEY_DATA) have to be set to use custom certificates, clearing cache ...")
logger.warning(
"Both variables CUSTOM_SSL_CERT and CUSTOM_SSL_KEY (or CUSTOM_SSL_CERT_DATA and CUSTOM_SSL_KEY_DATA) have to be set to use custom certificates, clearing cache ..."
)
cert_cache_path = Path(
sep,
"var",
@ -188,7 +190,9 @@ try:
f"No change for certificate {cert_path}",
)
elif not cert_path or not key_path:
logger.warning("Both variables CUSTOM_SSL_CERT and CUSTOM_SSL_KEY (or CUSTOM_SSL_CERT_DATA and CUSTOM_SSL_KEY_DATA) have to be set to use custom certificates, clearing cache ...")
logger.warning(
"Both variables CUSTOM_SSL_CERT and CUSTOM_SSL_KEY (or CUSTOM_SSL_CERT_DATA and CUSTOM_SSL_KEY_DATA) have to be set to use custom certificates, clearing cache ..."
)
cert_cache_path = Path(
sep,
"var",

View file

@ -127,7 +127,16 @@ try:
domains_sever_names = {server_names[0]: all_domains}
proc = run(
[CERTBOT_BIN, "certificates", "--config-dir", LETS_ENCRYPT_PATH.joinpath("etc").as_posix(), "--work-dir", LETS_ENCRYPT_WORK_DIR, "--logs-dir", LETS_ENCRYPT_LOGS_DIR],
[
CERTBOT_BIN,
"certificates",
"--config-dir",
LETS_ENCRYPT_PATH.joinpath("etc").as_posix(),
"--work-dir",
LETS_ENCRYPT_WORK_DIR,
"--logs-dir",
LETS_ENCRYPT_LOGS_DIR,
],
stdin=DEVNULL,
stdout=PIPE,
stderr=STDOUT,

View file

@ -81,10 +81,14 @@ def install_plugin(plugin_dir: str, db, preview: bool = True) -> bool:
break
if old_version == metadata["version"]:
logger.warning(f"Skipping installation of {'preview version of ' if preview else ''}Pro plugin {metadata['id']} (version {metadata['version']} already installed)")
logger.warning(
f"Skipping installation of {'preview version of ' if preview else ''}Pro plugin {metadata['id']} (version {metadata['version']} already installed)"
)
return False
logger.warning(f"{'Preview version of ' if preview else ''}Pro plugin {metadata['id']} is already installed but version {metadata['version']} is different from database ({old_version}), updating it...")
logger.warning(
f"{'Preview version of ' if preview else ''}Pro plugin {metadata['id']} is already installed but version {metadata['version']} is different from database ({old_version}), updating it..."
)
rmtree(PRO_PLUGINS_DIR.joinpath(metadata["id"]), ignore_errors=True)
# Copy the plugin
@ -180,9 +184,13 @@ try:
if not metadata["is_pro"]:
if metadata["pro_overlapped"]:
message = f"You have exceeded the number of services allowed by your BunkerWeb Pro license: {metadata['pro_services']} (current: {data['service_number']}"
message = (
f"You have exceeded the number of services allowed by your BunkerWeb Pro license: {metadata['pro_services']} (current: {data['service_number']}"
)
elif pro_license_key:
message = "Your BunkerWeb Pro license " + (STATUS_MESSAGES.get(metadata["pro_status"], "is not valid or has expired") if not error else "is not valid or has expired")
message = "Your BunkerWeb Pro license " + (
STATUS_MESSAGES.get(metadata["pro_status"], "is not valid or has expired") if not error else "is not valid or has expired"
)
else:
logger.info("If you wish to purchase a BunkerWeb Pro license, please visit https://panel.bunkerweb.io/")
message = "No BunkerWeb Pro license key provided"

View file

@ -62,7 +62,9 @@ def generate_cert(first_server: str, days: str, subj: str, self_signed_path: Pat
if sorted(attribute.rfc4514_string() for attribute in certificate.subject) != sorted(v for v in subj.split("/") if v):
logger.warning(f"Subject of self-signed certificate for {first_server} is different from the one in the configuration, regenerating ...")
elif certificate.not_valid_after - certificate.not_valid_before != timedelta(days=int(days)):
logger.warning(f"Expiration date of self-signed certificate for {first_server} is different from the one in the configuration, regenerating ...")
logger.warning(
f"Expiration date of self-signed certificate for {first_server} is different from the one in the configuration, regenerating ..."
)
else:
return True, 0

View file

@ -281,7 +281,10 @@ class Configurator:
return (False, f"missing keys for setting {setting} in plugin {plugin['id']}, must have context, default, help, id, label, regex and type")
if not self.__setting_id_rx.match(setting):
return (False, f"Invalid setting name for setting {setting} in plugin {plugin['id']} (Can only contain capital letters and underscores (min 1 characters and max 256))")
return (
False,
f"Invalid setting name for setting {setting} in plugin {plugin['id']} (Can only contain capital letters and underscores (min 1 characters and max 256))",
)
elif data["context"] not in ("global", "multisite"):
return (False, f"Invalid context for setting {setting} in plugin {plugin['id']} (Must be global or multisite)")
elif len(data["default"]) > 4096:
@ -297,7 +300,10 @@ class Configurator:
if "multiple" in data:
if not self.__name_rx.match(data["multiple"]):
return (False, f"Invalid multiple for setting {setting} in plugin {plugin['id']} (Can only contain numbers, letters, underscores and hyphens (min 1 characters and max 128))")
return (
False,
f"Invalid multiple for setting {setting} in plugin {plugin['id']} (Can only contain numbers, letters, underscores and hyphens (min 1 characters and max 128))",
)
for select in data.get("select", []):
if len(select) > 256:
@ -310,7 +316,10 @@ class Configurator:
if not self.__name_rx.match(job["name"]):
return (False, f"Invalid name for job {job['name']} in plugin {plugin['id']}")
elif not self.__job_file_rx.match(job["file"]):
return (False, f"Invalid file for job {job['name']} in plugin {plugin['id']} (Can only contain numbers, letters, underscores, hyphens and slashes (min 1 characters and max 256))")
return (
False,
f"Invalid file for job {job['name']} in plugin {plugin['id']} (Can only contain numbers, letters, underscores, hyphens and slashes (min 1 characters and max 256))",
)
elif job["every"] not in ("once", "minute", "hour", "day", "week"):
return (False, f"Invalid every for job {job['name']} in plugin {plugin['id']} (Must be once, minute, hour, day or week)")
elif job["reload"] is not True and job["reload"] is not False:

View file

@ -29,7 +29,9 @@ if __name__ == "__main__":
# Parse arguments
parser = ArgumentParser(description="BunkerWeb config generator")
parser.add_argument("--settings", default=join(sep, "usr", "share", "bunkerweb", "settings.json"), type=str, help="file containing the main settings")
parser.add_argument("--templates", default=join(sep, "usr", "share", "bunkerweb", "confs"), type=str, help="directory containing the main template files")
parser.add_argument(
"--templates", default=join(sep, "usr", "share", "bunkerweb", "confs"), type=str, help="directory containing the main template files"
)
parser.add_argument("--core", default=join(sep, "usr", "share", "bunkerweb", "core"), type=str, help="directory containing the core plugins")
parser.add_argument("--plugins", default=join(sep, "etc", "bunkerweb", "plugins"), type=str, help="directory containing the external plugins")
parser.add_argument("--pro-plugins", default=join(sep, "etc", "bunkerweb", "pro", "plugins"), type=str, help="directory containing the pro plugins")
@ -106,7 +108,9 @@ if __name__ == "__main__":
# Compute the config
logger.info("Computing config ...")
config: Dict[str, Any] = Configurator(str(settings_path), str(core_path), str(plugins_path), str(pro_plugins_path), str(variables_path), logger).get_config()
config: Dict[str, Any] = Configurator(
str(settings_path), str(core_path), str(plugins_path), str(pro_plugins_path), str(variables_path), logger
).get_config()
else:
if join(sep, "usr", "share", "bunkerweb", "db") not in sys_path:
sys_path.append(join(sep, "usr", "share", "bunkerweb", "db"))

View file

@ -45,7 +45,9 @@ def get_instance_configs_and_apis(instance: Any, db, _type="Docker"):
),
}
)
logger.info(f"Found custom conf env var {'for service ' + custom_conf[0] if custom_conf[0] else 'without service'} with type {custom_conf[1]} and name {custom_conf[2]}")
logger.info(
f"Found custom conf env var {'for service ' + custom_conf[0] if custom_conf[0] else 'without service'} with type {custom_conf[1]} and name {custom_conf[2]}"
)
else:
tmp_config[split[0]] = split[1]
@ -175,7 +177,9 @@ if __name__ == "__main__":
),
}
)
logger.info(f"Found custom conf env var {'for service ' + custom_conf[0] if custom_conf[0] else 'without service'} with type {custom_conf[1]} and name {custom_conf[2]}")
logger.info(
f"Found custom conf env var {'for service ' + custom_conf[0] if custom_conf[0] else 'without service'} with type {custom_conf[1]} and name {custom_conf[2]}"
)
db = Database(logger, config_files.get("DATABASE_URI", None), pool=False)
else:
@ -206,7 +210,9 @@ if __name__ == "__main__":
),
}
)
logger.info(f"Found custom conf env var {'for service ' + custom_conf[0] if custom_conf[0] else 'without service'} with type {custom_conf[1]} and name {custom_conf[2]}")
logger.info(
f"Found custom conf env var {'for service ' + custom_conf[0] if custom_conf[0] else 'without service'} with type {custom_conf[1]} and name {custom_conf[2]}"
)
else:
tmp_config[split[0]] = split[1]

View file

@ -16,7 +16,9 @@ class ConfigCaller:
def __init__(self):
self.__logger = setup_logger("Config", "INFO")
self._settings = loads(Path(sep, "usr", "share", "bunkerweb", "settings.json").read_text(encoding="utf-8"))
for plugin in glob(join(sep, "usr", "share", "bunkerweb", "core", "*", "plugin.json")) + glob(join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json")):
for plugin in glob(join(sep, "usr", "share", "bunkerweb", "core", "*", "plugin.json")) + glob(
join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json")
):
try:
self._settings.update(loads(Path(plugin).read_text(encoding="utf-8"))["settings"])
except KeyError:
@ -36,7 +38,9 @@ class ConfigCaller:
return self._settings[setting]["context"] == context
elif match(r"^.+_\d+$", setting):
multiple_setting = "_".join(setting.split("_")[:-1])
return self._is_setting(multiple_setting) and self._settings[multiple_setting]["context"] == context and "multiple" in self._settings[multiple_setting]
return (
self._is_setting(multiple_setting) and self._settings[multiple_setting]["context"] == context and "multiple" in self._settings[multiple_setting]
)
return False
def _full_env(self, env_instances: Dict[str, Any], env_services: Dict[str, Any]) -> Dict[str, Any]:

View file

@ -89,7 +89,9 @@ class JobScheduler(ApiCaller):
def __get_jobs(self):
jobs = {}
for plugin_file in (
glob(join(sep, "usr", "share", "bunkerweb", "core", "*", "plugin.json")) + glob(join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json")) + glob(join(sep, "etc", "bunkerweb", "pro", "plugins", "*", "plugin.json"))
glob(join(sep, "usr", "share", "bunkerweb", "core", "*", "plugin.json"))
+ glob(join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json"))
+ glob(join(sep, "etc", "bunkerweb", "pro", "plugins", "*", "plugin.json"))
): # core plugins # external plugins # pro plugins
plugin_name = basename(dirname(plugin_file))
jobs[plugin_name] = []
@ -102,20 +104,28 @@ class JobScheduler(ApiCaller):
for x, job in enumerate(deepcopy(plugin_jobs)):
if not all(key in job.keys() for key in ("name", "file", "every", "reload")):
self.__logger.warning(f"missing keys for job {job['name']} in plugin {plugin_name}, must have name, file, every and reload, ignoring job")
self.__logger.warning(
f"missing keys for job {job['name']} in plugin {plugin_name}, must have name, file, every and reload, ignoring job"
)
plugin_jobs.pop(x)
continue
if not match(r"^[\w.-]{1,128}$", job["name"]):
self.__logger.warning(f"Invalid name for job {job['name']} in plugin {plugin_name} (Can only contain numbers, letters, underscores and hyphens (min 1 characters and max 128)), ignoring job")
self.__logger.warning(
f"Invalid name for job {job['name']} in plugin {plugin_name} (Can only contain numbers, letters, underscores and hyphens (min 1 characters and max 128)), ignoring job"
)
plugin_jobs.pop(x)
continue
elif not match(r"^[\w./-]{1,256}$", job["file"]):
self.__logger.warning(f"Invalid file for job {job['name']} in plugin {plugin_name} (Can only contain numbers, letters, underscores, hyphens and slashes (min 1 characters and max 256)), ignoring job")
self.__logger.warning(
f"Invalid file for job {job['name']} in plugin {plugin_name} (Can only contain numbers, letters, underscores, hyphens and slashes (min 1 characters and max 256)), ignoring job"
)
plugin_jobs.pop(x)
continue
elif job["every"] not in ("once", "minute", "hour", "day", "week"):
self.__logger.warning(f"Invalid every for job {job['name']} in plugin {plugin_name} (Must be once, minute, hour, day or week), ignoring job")
self.__logger.warning(
f"Invalid every for job {job['name']} in plugin {plugin_name} (Must be once, minute, hour, day or week), ignoring job"
)
plugin_jobs.pop(x)
continue
elif job["reload"] is not True and job["reload"] is not False:
@ -152,7 +162,9 @@ class JobScheduler(ApiCaller):
if reload:
self.__logger.info("Successfully reloaded nginx")
else:
self.__logger.error(f"Error while reloading nginx - returncode: {proc.returncode} - error: {proc.stderr.decode() if proc.stderr else 'Missing stderr'}")
self.__logger.error(
f"Error while reloading nginx - returncode: {proc.returncode} - error: {proc.stderr.decode() if proc.stderr else 'Missing stderr'}"
)
else:
self.__logger.info("Reloading nginx ...")
reload = self.send_to_apis("POST", "/reload")

View file

@ -217,7 +217,13 @@ if __name__ == "__main__":
sleep(5)
env = db.get_config()
elif not tmp_variables_path.exists() or not nginx_variables_path.exists() or (tmp_variables_path.read_text(encoding="utf-8") != nginx_variables_path.read_text(encoding="utf-8")) or db.is_initialized() and db.get_config() != dotenv_env:
elif (
not tmp_variables_path.exists()
or not nginx_variables_path.exists()
or (tmp_variables_path.read_text(encoding="utf-8") != nginx_variables_path.read_text(encoding="utf-8"))
or db.is_initialized()
and db.get_config() != dotenv_env
):
# run the config saver
proc = subprocess_run(
[
@ -494,7 +500,9 @@ if __name__ == "__main__":
if proc.returncode == 0:
logger.info("Successfully sent reload signal to nginx")
else:
logger.error(f"Error while reloading nginx - returncode: {proc.returncode} - error: {proc.stdout.decode('utf-8') if proc.stdout else 'no output'}")
logger.error(
f"Error while reloading nginx - returncode: {proc.returncode} - error: {proc.stdout.decode('utf-8') if proc.stdout else 'no output'}"
)
else:
logger.warning("No BunkerWeb instance found, skipping nginx reload ...")
except:

View file

@ -167,7 +167,9 @@ elif getenv("ADMIN_USERNAME") and getenv("ADMIN_PASSWORD"):
app.logger.error("The admin username is too long. It must be less than 256 characters.")
stop(1)
elif not USER_PASSWORD_RX.match(getenv("ADMIN_PASSWORD", "changeme")):
app.logger.error("The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).")
app.logger.error(
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-)."
)
stop(1)
user_name = getenv("ADMIN_USERNAME", "admin")
@ -231,7 +233,9 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
if listdir(service_custom_conf):
move(service_custom_conf, service_custom_conf.replace(f"{sep}{args[1].split(' ')[0]}", f"{sep}{args[2].split(' ')[0]}"))
moved = True
operation, error = app.config["CONFIG"].edit_service(args[1], args[0], check_changes=(was_draft != is_draft or not is_draft) and not moved, is_draft=is_draft)
operation, error = app.config["CONFIG"].edit_service(
args[1], args[0], check_changes=(was_draft != is_draft or not is_draft) and not moved, is_draft=is_draft
)
elif operation == "delete":
for service_custom_conf in glob(join(sep, "etc", "bunkerweb", "configs", "*", args[2].split(" ")[0])):
if listdir(service_custom_conf):
@ -255,7 +259,9 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
ret = db.checked_changes(changes, value=True)
if ret:
app.logger.error(f"Couldn't set the changes to checked in the database: {ret}")
app.config["TO_FLASH"].append({"content": f"An error occurred when setting the changes to checked in the database : {ret}", "type": "error"})
app.config["TO_FLASH"].append(
{"content": f"An error occurred when setting the changes to checked in the database : {ret}", "type": "error"}
)
if method == "global_config":
operation = app.config["CONFIG"].edit_global_conf(args[0])
elif method == "plugins":
@ -393,11 +399,22 @@ def before_request():
passed = True
# Go back from totp to login
if not session.get("totp_validated", False) and current_user.is_two_factor_enabled and "/totp" not in request.path and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts")) and request.path.endswith("/login"):
if (
not session.get("totp_validated", False)
and current_user.is_two_factor_enabled
and "/totp" not in request.path
and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts"))
and request.path.endswith("/login")
):
return redirect(url_for("login", next=request.path))
# Case not login page, keep on 2FA before any other access
if not session.get("totp_validated", False) and current_user.is_two_factor_enabled and "/totp" not in request.path and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts")):
if (
not session.get("totp_validated", False)
and current_user.is_two_factor_enabled
and "/totp" not in request.path
and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts"))
):
return redirect(url_for("totp", next=request.form.get("next")))
elif session.get("ip") != request.remote_addr:
passed = False
@ -448,7 +465,9 @@ def setup():
is_request_form("setup")
if not any(key in request.form for key in ("admin_username", "admin_password", "admin_password_check", "server_name", "ui_host", "ui_url")):
return redirect_flash_error("Missing either admin_username, admin_password, admin_password_check, server_name, ui_host, ui_url or auto_lets_encrypt parameter.", "setup")
return redirect_flash_error(
"Missing either admin_username, admin_password, admin_password_check, server_name, ui_host, ui_url or auto_lets_encrypt parameter.", "setup"
)
if len(request.form["admin_username"]) > 256:
return redirect_flash_error("The admin username is too long. It must be less than 256 characters.", "setup")
@ -457,7 +476,10 @@ def setup():
return redirect_flash_error("The passwords do not match.", "setup")
if not USER_PASSWORD_RX.match(request.form["admin_password"]):
return redirect_flash_error("The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).", "setup")
return redirect_flash_error(
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).",
"setup",
)
server_names = db_config["SERVER_NAME"].split(" ")
if request.form["server_name"] in server_names:
@ -631,7 +653,8 @@ def account():
if not USER_PASSWORD_RX.match(request.form["admin_password"]):
return redirect_flash_error(
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)", "account"
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)",
"account",
)
password = request.form["admin_password"]
@ -652,12 +675,18 @@ def account():
app.config["CURRENT_TOTP_TOKEN"] = None
user = User(username, password, is_two_factor_enabled=is_two_factor_enabled, secret_token=secret_token, method=current_user.method)
ret = db.update_ui_user(username, user.password_hash, is_two_factor_enabled, secret_token, current_user.method if request.form["operation"] == "totp" else "ui")
ret = db.update_ui_user(
username, user.password_hash, is_two_factor_enabled, secret_token, current_user.method if request.form["operation"] == "totp" else "ui"
)
if ret:
return redirect_flash_error(f"Couldn't update the admin user in the database: {ret}", "account", False, "error")
flash(
f"The {request.form['operation']} has been successfully updated." if request.form["operation"] != "totp" else f"The two-factor authentication was successfully {'disabled' if current_user.is_two_factor_enabled else 'enabled'}.",
(
f"The {request.form['operation']} has been successfully updated."
if request.form["operation"] != "totp"
else f"The two-factor authentication was successfully {'disabled' if current_user.is_two_factor_enabled else 'enabled'}."
),
)
return redirect(url_for("account" if request.form["operation"] == "totp" else "login"))
@ -763,10 +792,20 @@ def services():
elif value == "off":
value = "no"
if variable in variables and variable != "SERVER_NAME" and value == config.get(f"{server_name}_{variable}" if request.form["operation"] == "edit" else variable, None):
if (
variable in variables
and variable != "SERVER_NAME"
and value == config.get(f"{server_name}_{variable}" if request.form["operation"] == "edit" else variable, None)
):
del variables[variable]
if was_draft == is_draft and request.form["operation"] == "edit" and len(variables) == 1 and "SERVER_NAME" in variables and variables["SERVER_NAME"] == request.form.get("OLD_SERVER_NAME", ""):
if (
was_draft == is_draft
and request.form["operation"] == "edit"
and len(variables) == 1
and "SERVER_NAME" in variables
and variables["SERVER_NAME"] == request.form.get("OLD_SERVER_NAME", "")
):
return redirect_flash_error("The service was not edited because no values were changed.", "services", True)
elif request.form["operation"] == "new" and not variables:
@ -1298,7 +1337,7 @@ def upload_plugin():
def custom_plugin(plugin: str):
message = ""
if not plugin_id_rx.match(plugin):
return error_message(f"Invalid plugin id, (must be between 1 and 64 characters, only letters, numbers, underscores and hyphens)"), 400
return error_message("Invalid plugin id, (must be between 1 and 64 characters, only letters, numbers, underscores and hyphens)"), 400
# Case we ware looking for a plugin template
# We need to check if a page exists, and if it does, we need to check if the plugin is activated and metrics are on
@ -1308,7 +1347,7 @@ def custom_plugin(plugin: str):
page = db.get_plugin_template(plugin)
if not page:
return error_message(f"The plugin does not have a template"), 404
return error_message("The plugin does not have a template"), 404
# Case template, prepare data
plugins = app.config["CONFIG"].get_plugins()
@ -1327,7 +1366,7 @@ def custom_plugin(plugin: str):
# Case no plugin found
if plugin_id is None:
return error_message(f"Plugin not found"), 404
return error_message("Plugin not found"), 404
config = app.config["CONFIG"].get_config(methods=False)
@ -1409,7 +1448,7 @@ def custom_plugin(plugin: str):
module = db.get_plugin_actions(plugin)
if module is None:
return error_message(f"The actions.py file for the plugin does not exist"), 404
return error_message("The actions.py file for the plugin does not exist"), 404
try:
# Try to import the custom plugin
@ -1420,7 +1459,7 @@ def custom_plugin(plugin: str):
loader = SourceFileLoader("actions", temp.name)
actions = loader.load_module()
except:
return error_message(f"An error occurred while importing the plugin, see logs for more details"), 500
return error_message("An error occurred while importing the plugin, see logs for more details"), 500
res = None
@ -1435,9 +1474,9 @@ def custom_plugin(plugin: str):
res = method(app=app, args=queries, data=data)
except AttributeError:
message = f"The plugin does not have a method, see logs for more details"
message = "The plugin does not have a method, see logs for more details"
except:
message = f"An error occurred while executing the plugin, see logs for more details"
message = "An error occurred while executing the plugin, see logs for more details"
finally:
if sbin_nginx_path.is_file():
# Remove the custom plugin from the shared library
@ -1533,7 +1572,9 @@ def logs_linux():
if nginx_access_file.is_file():
with open(nginx_access_file, encoding="utf-8") as f:
for line in f.readlines()[int(last_update.split(".")[1]) if last_update else 0 :]: # noqa: E203
logs_access.append(f"{datetime.strptime(line[line.find('[') + 1: line.find(']')], '%d/%b/%Y:%H:%M:%S %z').replace(tzinfo=timezone.utc).timestamp()} {line}")
logs_access.append(
f"{datetime.strptime(line[line.find('[') + 1: line.find(']')], '%d/%b/%Y:%H:%M:%S %z').replace(tzinfo=timezone.utc).timestamp()} {line}"
)
raw_logs = logs_error + logs_access
@ -1584,7 +1625,11 @@ def logs_linux():
return jsonify(
{
"logs": logs,
"last_update": f"{count_error_logs + int(last_update.split('.')[0])}.{len(logs_access) + int(last_update.split('.')[1])}" if last_update else f"{count_error_logs}.{len(logs_access)}",
"last_update": (
f"{count_error_logs + int(last_update.split('.')[0])}.{len(logs_access) + int(last_update.split('.')[1])}"
if last_update
else f"{count_error_logs}.{len(logs_access)}"
),
}
)

View file

@ -34,7 +34,9 @@ class Instance:
self.name = name
self.hostname = hostname
self._type = _type
self.health = status == "up" and ((data.attrs["State"]["Health"]["Status"] == "healthy" if "Health" in data.attrs["State"] else False) if _type == "container" and data else True)
self.health = status == "up" and (
(data.attrs["State"]["Health"]["Status"] == "healthy" if "Health" in data.attrs["State"] else False) if _type == "container" and data else True
)
self.env = data
self.apiCaller = apiCaller or ApiCaller()
@ -396,7 +398,10 @@ class Instances:
if not resp:
continue
if not isinstance(instance_metrics.get(instance_name, {"msg": None}).get("msg"), dict) or instance_metrics[instance_name].get("status", "error") != "success":
if (
not isinstance(instance_metrics.get(instance_name, {"msg": None}).get("msg"), dict)
or instance_metrics[instance_name].get("status", "error") != "success"
):
continue
metric_data = instance_metrics[instance_name]["msg"]

View file

@ -117,7 +117,9 @@ def path_to_dict(
}
if conf["service_id"]:
d["children"][config_types.index(type_lower)]["children"][[x["name"] for x in d["children"][config_types.index(type_lower)]["children"]].index(conf["service_id"])]["children"].append(file_info)
d["children"][config_types.index(type_lower)]["children"][
[x["name"] for x in d["children"][config_types.index(type_lower)]["children"]].index(conf["service_id"])
]["children"].append(file_info)
else:
d["children"][config_types.index(type_lower)]["children"].append(file_info)
else:

View file

@ -324,7 +324,12 @@ if distro == "ubuntu":
print("❌ /usr/bin/bwcli found.")
# Checking Removing test
try:
if pathlib.Path("/usr/share/bunkerweb").is_dir() or pathlib.Path("/var/tmp/bunkerweb").is_dir() or pathlib.Path("/var/cache/bunkerweb").is_dir() or pathlib.Path("/usr/bin/bwcli").is_file():
if (
pathlib.Path("/usr/share/bunkerweb").is_dir()
or pathlib.Path("/var/tmp/bunkerweb").is_dir()
or pathlib.Path("/var/cache/bunkerweb").is_dir()
or pathlib.Path("/usr/bin/bwcli").is_file()
):
test_results["Removing test"] = "KO"
else:
test_results["Removing test"] = "OK"
@ -839,7 +844,12 @@ elif distro == "debian":
print("❌ /usr/bin/bwcli found.")
# Checking Removing test
try:
if pathlib.Path("/usr/share/bunkerweb").is_dir() or pathlib.Path("/var/tmp/bunkerweb").is_dir() or pathlib.Path("/var/cache/bunkerweb").is_dir() or pathlib.Path("/usr/bin/bwcli").is_file():
if (
pathlib.Path("/usr/share/bunkerweb").is_dir()
or pathlib.Path("/var/tmp/bunkerweb").is_dir()
or pathlib.Path("/var/cache/bunkerweb").is_dir()
or pathlib.Path("/usr/bin/bwcli").is_file()
):
test_results["Removing test"] = "KO"
else:
test_results["Removing test"] = "OK"

View file

@ -23,7 +23,7 @@
- name: Add IP address of all hosts to all hosts
lineinfile:
dest: /etc/hosts
regexp: '.*{{ item }}$'
regexp: ".*{{ item }}$"
line: "{{ hostvars[item].local_ip }} {{item}}"
state: present
when: hostvars[item].local_ip is defined

View file

@ -39,7 +39,8 @@ try:
status_code = get(
"http://www.example.com",
headers={"Host": "www.example.com"} | ({"X-Forwarded-For": "2.0.0.3" if country == "GB" else "8.0.0.3"} if getenv("TEST_TYPE", "docker") == "linux" else {}),
headers={"Host": "www.example.com"}
| ({"X-Forwarded-For": "2.0.0.3" if country == "GB" else "8.0.0.3"} if getenv("TEST_TYPE", "docker") == "linux" else {}),
).status_code
if status_code == 403:

View file

@ -337,7 +337,12 @@ try:
)
exit(1)
if plugin.name != current_plugin[plugin.id]["name"] or plugin.description != current_plugin[plugin.id]["description"] or plugin.version != current_plugin[plugin.id]["version"] or plugin.stream != current_plugin[plugin.id]["stream"]:
if (
plugin.name != current_plugin[plugin.id]["name"]
or plugin.description != current_plugin[plugin.id]["description"]
or plugin.version != current_plugin[plugin.id]["version"]
or plugin.stream != current_plugin[plugin.id]["stream"]
):
print(
f"❌ The {'external' if plugin.type == 'external' else 'core'} plugin {plugin.name} (id: {plugin.id}) is in the database but is not correct, exiting ...\n"
+ f"{dumps({'name': plugin.name, 'description': plugin.description, 'version': plugin.version, 'stream': plugin.stream})}"
@ -472,7 +477,11 @@ try:
)
exit(1)
path_ui = Path(join("bunkerweb", "core", plugin_page.plugin_id, "ui")) if Path(join("bunkerweb", "core", plugin_page.plugin_id, "ui")).exists() else Path(join("external", plugin_page.plugin_id, "ui"))
path_ui = (
Path(join("bunkerweb", "core", plugin_page.plugin_id, "ui"))
if Path(join("bunkerweb", "core", plugin_page.plugin_id, "ui")).exists()
else Path(join("external", plugin_page.plugin_id, "ui"))
)
if not path_ui.exists():
print(
@ -571,7 +580,10 @@ try:
flush=True,
)
exit(1)
elif custom_config.data.replace(b"# CREATED BY ENV\n", b"") != current_custom_configs[custom_config.name]["value"] and custom_config.data.replace(b"# CREATED BY ENV\n", b"") != current_custom_configs[custom_config.name]["value"] + b"\n":
elif (
custom_config.data.replace(b"# CREATED BY ENV\n", b"") != current_custom_configs[custom_config.name]["value"]
and custom_config.data.replace(b"# CREATED BY ENV\n", b"") != current_custom_configs[custom_config.name]["value"] + b"\n"
):
print(
f"❌ The custom config {custom_config.name} is in the database but the value differ, exiting ...\n{custom_config.data} (database) != {current_custom_configs[custom_config.name]['value']} (env)",
flush=True,

View file

@ -41,7 +41,9 @@ try:
retries = 0
while not passed and retries < 10:
status_code = get("http://www.example.com", headers={"Host": "www.example.com"} | ({"X-Forwarded-For": getenv("IP_ADDRESS", "")} if TEST_TYPE == "linux" else {})).status_code
status_code = get(
"http://www.example.com", headers={"Host": "www.example.com"} | ({"X-Forwarded-For": getenv("IP_ADDRESS", "")} if TEST_TYPE == "linux" else {})
).status_code
if status_code == 403:
if not use_dnsbl:

View file

@ -37,7 +37,7 @@ try:
log_info("username 'admin' is correctly set by default, trying username update ...")
DRIVER.execute_script(f"return arguments[0].value = 'admin2'", username_input)
DRIVER.execute_script("return arguments[0].value = 'admin2'", username_input)
password_input = safe_get_element(DRIVER, By.ID, "curr_password")
assert isinstance(password_input, WebElement), "The password input is not an instance of WebElement"

View file

@ -1,5 +1,4 @@
from logging import info as log_info, exception as log_exception, error as log_error, warning as log_warning
from time import sleep
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
@ -25,7 +24,7 @@ try:
is_no_match_hidden = DRIVER.execute_script('return document.querySelector("[data-global-config-nomatch]").classList.contains("hidden")')
if is_no_match_hidden:
log_error(f"Filter keyword shouldn't match something.")
log_error("Filter keyword shouldn't match something.")
exit(1)
# Reset
@ -37,16 +36,16 @@ try:
input_keyword.send_keys("http port")
# Check that the matching element is shown and other card hide
is_http_port_hidden = DRIVER.execute_script(f"""return document.querySelector('#form-edit-global-config-http-port').classList.contains('hidden')""")
is_http_port_hidden = DRIVER.execute_script("return document.querySelector('#form-edit-global-config-http-port').classList.contains('hidden')")
if is_http_port_hidden:
log_error(f"hidden http port should be match.")
log_error("hidden http port should be match.")
exit(1)
is_https_port_hidden = DRIVER.execute_script(f"""return document.querySelector('#form-edit-global-config-https-port').classList.contains('hidden')""")
is_https_port_hidden = DRIVER.execute_script("return document.querySelector('#form-edit-global-config-https-port').classList.contains('hidden')")
if not is_https_port_hidden:
log_error(f"Setting https port should not be match.")
log_error("Setting https port should not be match.")
exit(1)
# Reset

View file

@ -59,7 +59,9 @@ try:
]
for item in select_filters:
DRIVER.execute_script(f"""return document.querySelector('[data-plugins-setting-select-dropdown-btn="{item["id"]}"][value="{item["value"]}"]').click()""")
DRIVER.execute_script(
f"""return document.querySelector('[data-plugins-setting-select-dropdown-btn="{item["id"]}"][value="{item["value"]}"]').click()"""
)
log_info("The filter is working, trying to add a bad plugin ...")

View file

@ -48,7 +48,9 @@ try:
]
for item in select_filters:
DRIVER.execute_script(f"""return document.querySelector('[data-reports-setting-select-dropdown-btn="{item["id"]}"][value="{item["value"]}"]').click()""")
DRIVER.execute_script(
f"""return document.querySelector('[data-reports-setting-select-dropdown-btn="{item["id"]}"][value="{item["value"]}"]').click()"""
)
filter_input = safe_get_element(DRIVER, By.ID, "keyword")
assert isinstance(filter_input, WebElement), "Keyword filter input is not a WebElement"

View file

@ -64,16 +64,16 @@ try:
log_info("Check only one plugin is visible ...")
is_general_plugin_hidden = DRIVER.execute_script(f"""return document.querySelector('[data-plugin-item="general"]').classList.contains('hidden')""")
is_general_plugin_hidden = DRIVER.execute_script("""return document.querySelector('[data-plugin-item="general"]').classList.contains('hidden')""")
if is_general_plugin_hidden:
log_error(f"Plugin general should be visible.")
log_error("Plugin general should be visible.")
exit(1)
is_antibot_plugin_hidden = DRIVER.execute_script(f"""return document.querySelector('[data-plugin-item="antibot"]').classList.contains('hidden')""")
is_antibot_plugin_hidden = DRIVER.execute_script("""return document.querySelector('[data-plugin-item="antibot"]').classList.contains('hidden')""")
if not is_antibot_plugin_hidden:
log_error(f"Plugin antibot should not be visible.")
log_error("Plugin antibot should not be visible.")
exit(1)
log_info("Only one plugin visible checked, trying keyword no match ...")
@ -85,7 +85,7 @@ try:
# Check that the no matching element is shown and other card hide
is_no_match = DRIVER.execute_script('return document.querySelector("[data-services-nomatch]").classList.contains("hidden")')
if is_no_match:
log_error(f"Filter keyword shouldn't match something.")
log_error("Filter keyword shouldn't match something.")
exit(1)
# Reset
@ -97,16 +97,16 @@ try:
input_keyword.send_keys("server type")
# Check that the matching element is shown and other card hide
is_server_type_hidden = DRIVER.execute_script(f"""return document.querySelector('#form-edit-services-server-type').classList.contains('hidden')""")
is_server_type_hidden = DRIVER.execute_script("return document.querySelector('#form-edit-services-server-type').classList.contains('hidden')")
if is_server_type_hidden:
log_error(f"Setting server type should be match.")
log_error("Setting server type should be match.")
exit(1)
is_server_name_hidden = DRIVER.execute_script(f"""return document.querySelector('#form-edit-services-server-name').classList.contains('hidden')""")
is_server_name_hidden = DRIVER.execute_script("return document.querySelector('#form-edit-services-server-name').classList.contains('hidden')")
if not is_server_name_hidden:
log_error(f"Setting server name should not be match.")
log_error("Setting server name should not be match.")
exit(1)
# Reset
@ -405,7 +405,7 @@ try:
# Check that the no matching element is shown and other card hide
is_no_match = DRIVER.execute_script('return document.querySelector("[data-services-nomatch-card]").classList.contains("hidden")')
if is_no_match:
log_error(f"Filter keyword shouldn't match something.")
log_error("Filter keyword shouldn't match something.")
exit(1)
# Reset
@ -421,12 +421,16 @@ try:
]
for item in select_filters:
DRIVER.execute_script(f"""return document.querySelector('[data-services-setting-select-dropdown-btn="{item["id"]}"][value="{item["value"]}"]').click()""")
DRIVER.execute_script(
f"""return document.querySelector('[data-services-setting-select-dropdown-btn="{item["id"]}"][value="{item["value"]}"]').click()"""
)
log_info("Filters working as expected, trying to delete app3.example.com ...")
try:
delete_card_button = safe_get_element(DRIVER, By.XPATH, "//button[@data-services-action='delete' and @data-services-name='app3.example.com']", error=True)
delete_card_button = safe_get_element(
DRIVER, By.XPATH, "//button[@data-services-action='delete' and @data-services-name='app3.example.com']", error=True
)
assert isinstance(delete_card_button, WebElement), "Delete button is not a WebElement"
assert_button_click(DRIVER, delete_card_button)

View file

@ -10,7 +10,9 @@ from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import ElementClickInterceptedException, TimeoutException, WebDriverException
def safe_get_element(driver, by: str, selector: str, *, driver_wait: Optional[WebDriverWait] = None, multiple: bool = False, error: bool = False) -> Union[WebElement, List[WebElement]]:
def safe_get_element(
driver, by: str, selector: str, *, driver_wait: Optional[WebDriverWait] = None, multiple: bool = False, error: bool = False
) -> Union[WebElement, List[WebElement]]:
try:
# Retrieve by js script
if by == "js":
@ -35,7 +37,9 @@ def safe_get_element(driver, by: str, selector: str, *, driver_wait: Optional[We
return el
# Retrieve with XPATH
return (driver_wait or WebDriverWait(driver, 4)).until(EC.presence_of_element_located((by, selector)) if not multiple else EC.presence_of_all_elements_located((by, selector)))
return (driver_wait or WebDriverWait(driver, 4)).until(
EC.presence_of_element_located((by, selector)) if not multiple else EC.presence_of_all_elements_located((by, selector))
)
except TimeoutException as e:
if error:
@ -115,7 +119,9 @@ def access_page(driver, button: Union[bool, str, WebElement], name: str, message
if not isinstance(button, bool) and not clicked:
clicked = assert_button_click(driver, button)
title: Union[WebElement, List[WebElement]] = safe_get_element(driver, By.XPATH, "/html/body/div[3]/header/div/nav/h6", driver_wait=WebDriverWait(driver, 45))
title: Union[WebElement, List[WebElement]] = safe_get_element(
driver, By.XPATH, "/html/body/div[3]/header/div/nav/h6", driver_wait=WebDriverWait(driver, 45)
)
assert isinstance(title, WebElement), "Title is not a WebElement"
if title.text != name.title():