mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Execute pre-commit-config and apply it
This commit is contained in:
parent
84d8c41faf
commit
2a0f9567de
29 changed files with 256 additions and 88 deletions
|
|
@ -47,7 +47,7 @@ repos:
|
|||
hooks:
|
||||
- id: flake8
|
||||
name: Flake8 Python Linter
|
||||
args: ["--max-line-length=160", "--ignore=E266,E402,E722,W503"]
|
||||
args: ["--max-line-length=160", "--ignore=E266,E402,E501,E722,W503"]
|
||||
|
||||
- repo: https://github.com/dosisod/refurb
|
||||
rev: a9a4edd45687e664dee0905ba1c848bda227d1d6 # frozen: v1.28.0
|
||||
|
|
|
|||
|
|
@ -101,7 +101,11 @@ class DockerController(Controller):
|
|||
)
|
||||
|
||||
def __process_event(self, event):
|
||||
return "Actor" in event and "Attributes" in event["Actor"] and ("bunkerweb.INSTANCE" in event["Actor"]["Attributes"] or "bunkerweb.SERVER_NAME" in event["Actor"]["Attributes"])
|
||||
return (
|
||||
"Actor" in event
|
||||
and "Attributes" in event["Actor"]
|
||||
and ("bunkerweb.INSTANCE" in event["Actor"]["Attributes"] or "bunkerweb.SERVER_NAME" in event["Actor"]["Attributes"])
|
||||
)
|
||||
|
||||
def process_events(self):
|
||||
self._set_autoconf_load_db()
|
||||
|
|
|
|||
|
|
@ -19,7 +19,11 @@ class IngressController(Controller):
|
|||
self.__networkingv1 = client.NetworkingV1Api()
|
||||
|
||||
def _get_controller_instances(self) -> list:
|
||||
return [pod for pod in self.__corev1.list_pod_for_all_namespaces(watch=False).items if (pod.metadata.annotations and "bunkerweb.io/INSTANCE" in pod.metadata.annotations)]
|
||||
return [
|
||||
pod
|
||||
for pod in self.__corev1.list_pod_for_all_namespaces(watch=False).items
|
||||
if (pod.metadata.annotations and "bunkerweb.io/INSTANCE" in pod.metadata.annotations)
|
||||
]
|
||||
|
||||
def _to_instances(self, controller_instance) -> List[dict]:
|
||||
instance = {}
|
||||
|
|
|
|||
|
|
@ -787,7 +787,7 @@ utils.get_phases = function()
|
|||
"log_stream",
|
||||
"log_default",
|
||||
"timer",
|
||||
"init_workers"
|
||||
"init_workers",
|
||||
}
|
||||
end
|
||||
|
||||
|
|
|
|||
|
|
@ -140,7 +140,9 @@ class CLI(ApiCaller):
|
|||
self.__use_redis = False
|
||||
|
||||
if self.__use_redis:
|
||||
self.__logger.info(f"Connected to redis sentinel cluster, getting master with the following parameters:\n{sentinel_master=}\n{redis_db=}\n{username=}\n{password=}")
|
||||
self.__logger.info(
|
||||
f"Connected to redis sentinel cluster, getting master with the following parameters:\n{sentinel_master=}\n{redis_db=}\n{username=}\n{password=}"
|
||||
)
|
||||
self.__redis = sentinel.master_for(
|
||||
sentinel_master,
|
||||
db=redis_db,
|
||||
|
|
@ -148,7 +150,9 @@ class CLI(ApiCaller):
|
|||
password=password,
|
||||
)
|
||||
else:
|
||||
self.__logger.info(f"Connecting to redis with the following parameters:\n{redis_host=}\n{redis_port=}\n{redis_db=}\n{username=}\n{password=}\n{redis_timeout=}\nmax_connections={redis_keepalive_pool}\n{redis_ssl=}")
|
||||
self.__logger.info(
|
||||
f"Connecting to redis with the following parameters:\n{redis_host=}\n{redis_port=}\n{redis_db=}\n{username=}\n{password=}\n{redis_timeout=}\nmax_connections={redis_keepalive_pool}\n{redis_ssl=}"
|
||||
)
|
||||
self.__redis = StrictRedis(
|
||||
host=redis_host,
|
||||
port=redis_port,
|
||||
|
|
|
|||
|
|
@ -124,7 +124,9 @@ try:
|
|||
else:
|
||||
logger.info(f"No change for certificate {cert_path}")
|
||||
elif not cert_path or not key_path:
|
||||
logger.warning("Both variables CUSTOM_SSL_CERT and CUSTOM_SSL_KEY (or CUSTOM_SSL_CERT_DATA and CUSTOM_SSL_KEY_DATA) have to be set to use custom certificates, clearing cache ...")
|
||||
logger.warning(
|
||||
"Both variables CUSTOM_SSL_CERT and CUSTOM_SSL_KEY (or CUSTOM_SSL_CERT_DATA and CUSTOM_SSL_KEY_DATA) have to be set to use custom certificates, clearing cache ..."
|
||||
)
|
||||
cert_cache_path = Path(
|
||||
sep,
|
||||
"var",
|
||||
|
|
@ -188,7 +190,9 @@ try:
|
|||
f"No change for certificate {cert_path}",
|
||||
)
|
||||
elif not cert_path or not key_path:
|
||||
logger.warning("Both variables CUSTOM_SSL_CERT and CUSTOM_SSL_KEY (or CUSTOM_SSL_CERT_DATA and CUSTOM_SSL_KEY_DATA) have to be set to use custom certificates, clearing cache ...")
|
||||
logger.warning(
|
||||
"Both variables CUSTOM_SSL_CERT and CUSTOM_SSL_KEY (or CUSTOM_SSL_CERT_DATA and CUSTOM_SSL_KEY_DATA) have to be set to use custom certificates, clearing cache ..."
|
||||
)
|
||||
cert_cache_path = Path(
|
||||
sep,
|
||||
"var",
|
||||
|
|
|
|||
|
|
@ -127,7 +127,16 @@ try:
|
|||
domains_sever_names = {server_names[0]: all_domains}
|
||||
|
||||
proc = run(
|
||||
[CERTBOT_BIN, "certificates", "--config-dir", LETS_ENCRYPT_PATH.joinpath("etc").as_posix(), "--work-dir", LETS_ENCRYPT_WORK_DIR, "--logs-dir", LETS_ENCRYPT_LOGS_DIR],
|
||||
[
|
||||
CERTBOT_BIN,
|
||||
"certificates",
|
||||
"--config-dir",
|
||||
LETS_ENCRYPT_PATH.joinpath("etc").as_posix(),
|
||||
"--work-dir",
|
||||
LETS_ENCRYPT_WORK_DIR,
|
||||
"--logs-dir",
|
||||
LETS_ENCRYPT_LOGS_DIR,
|
||||
],
|
||||
stdin=DEVNULL,
|
||||
stdout=PIPE,
|
||||
stderr=STDOUT,
|
||||
|
|
|
|||
|
|
@ -81,10 +81,14 @@ def install_plugin(plugin_dir: str, db, preview: bool = True) -> bool:
|
|||
break
|
||||
|
||||
if old_version == metadata["version"]:
|
||||
logger.warning(f"Skipping installation of {'preview version of ' if preview else ''}Pro plugin {metadata['id']} (version {metadata['version']} already installed)")
|
||||
logger.warning(
|
||||
f"Skipping installation of {'preview version of ' if preview else ''}Pro plugin {metadata['id']} (version {metadata['version']} already installed)"
|
||||
)
|
||||
return False
|
||||
|
||||
logger.warning(f"{'Preview version of ' if preview else ''}Pro plugin {metadata['id']} is already installed but version {metadata['version']} is different from database ({old_version}), updating it...")
|
||||
logger.warning(
|
||||
f"{'Preview version of ' if preview else ''}Pro plugin {metadata['id']} is already installed but version {metadata['version']} is different from database ({old_version}), updating it..."
|
||||
)
|
||||
rmtree(PRO_PLUGINS_DIR.joinpath(metadata["id"]), ignore_errors=True)
|
||||
|
||||
# Copy the plugin
|
||||
|
|
@ -180,9 +184,13 @@ try:
|
|||
|
||||
if not metadata["is_pro"]:
|
||||
if metadata["pro_overlapped"]:
|
||||
message = f"You have exceeded the number of services allowed by your BunkerWeb Pro license: {metadata['pro_services']} (current: {data['service_number']}"
|
||||
message = (
|
||||
f"You have exceeded the number of services allowed by your BunkerWeb Pro license: {metadata['pro_services']} (current: {data['service_number']}"
|
||||
)
|
||||
elif pro_license_key:
|
||||
message = "Your BunkerWeb Pro license " + (STATUS_MESSAGES.get(metadata["pro_status"], "is not valid or has expired") if not error else "is not valid or has expired")
|
||||
message = "Your BunkerWeb Pro license " + (
|
||||
STATUS_MESSAGES.get(metadata["pro_status"], "is not valid or has expired") if not error else "is not valid or has expired"
|
||||
)
|
||||
else:
|
||||
logger.info("If you wish to purchase a BunkerWeb Pro license, please visit https://panel.bunkerweb.io/")
|
||||
message = "No BunkerWeb Pro license key provided"
|
||||
|
|
|
|||
|
|
@ -62,7 +62,9 @@ def generate_cert(first_server: str, days: str, subj: str, self_signed_path: Pat
|
|||
if sorted(attribute.rfc4514_string() for attribute in certificate.subject) != sorted(v for v in subj.split("/") if v):
|
||||
logger.warning(f"Subject of self-signed certificate for {first_server} is different from the one in the configuration, regenerating ...")
|
||||
elif certificate.not_valid_after - certificate.not_valid_before != timedelta(days=int(days)):
|
||||
logger.warning(f"Expiration date of self-signed certificate for {first_server} is different from the one in the configuration, regenerating ...")
|
||||
logger.warning(
|
||||
f"Expiration date of self-signed certificate for {first_server} is different from the one in the configuration, regenerating ..."
|
||||
)
|
||||
else:
|
||||
return True, 0
|
||||
|
||||
|
|
|
|||
|
|
@ -281,7 +281,10 @@ class Configurator:
|
|||
return (False, f"missing keys for setting {setting} in plugin {plugin['id']}, must have context, default, help, id, label, regex and type")
|
||||
|
||||
if not self.__setting_id_rx.match(setting):
|
||||
return (False, f"Invalid setting name for setting {setting} in plugin {plugin['id']} (Can only contain capital letters and underscores (min 1 characters and max 256))")
|
||||
return (
|
||||
False,
|
||||
f"Invalid setting name for setting {setting} in plugin {plugin['id']} (Can only contain capital letters and underscores (min 1 characters and max 256))",
|
||||
)
|
||||
elif data["context"] not in ("global", "multisite"):
|
||||
return (False, f"Invalid context for setting {setting} in plugin {plugin['id']} (Must be global or multisite)")
|
||||
elif len(data["default"]) > 4096:
|
||||
|
|
@ -297,7 +300,10 @@ class Configurator:
|
|||
|
||||
if "multiple" in data:
|
||||
if not self.__name_rx.match(data["multiple"]):
|
||||
return (False, f"Invalid multiple for setting {setting} in plugin {plugin['id']} (Can only contain numbers, letters, underscores and hyphens (min 1 characters and max 128))")
|
||||
return (
|
||||
False,
|
||||
f"Invalid multiple for setting {setting} in plugin {plugin['id']} (Can only contain numbers, letters, underscores and hyphens (min 1 characters and max 128))",
|
||||
)
|
||||
|
||||
for select in data.get("select", []):
|
||||
if len(select) > 256:
|
||||
|
|
@ -310,7 +316,10 @@ class Configurator:
|
|||
if not self.__name_rx.match(job["name"]):
|
||||
return (False, f"Invalid name for job {job['name']} in plugin {plugin['id']}")
|
||||
elif not self.__job_file_rx.match(job["file"]):
|
||||
return (False, f"Invalid file for job {job['name']} in plugin {plugin['id']} (Can only contain numbers, letters, underscores, hyphens and slashes (min 1 characters and max 256))")
|
||||
return (
|
||||
False,
|
||||
f"Invalid file for job {job['name']} in plugin {plugin['id']} (Can only contain numbers, letters, underscores, hyphens and slashes (min 1 characters and max 256))",
|
||||
)
|
||||
elif job["every"] not in ("once", "minute", "hour", "day", "week"):
|
||||
return (False, f"Invalid every for job {job['name']} in plugin {plugin['id']} (Must be once, minute, hour, day or week)")
|
||||
elif job["reload"] is not True and job["reload"] is not False:
|
||||
|
|
|
|||
|
|
@ -29,7 +29,9 @@ if __name__ == "__main__":
|
|||
# Parse arguments
|
||||
parser = ArgumentParser(description="BunkerWeb config generator")
|
||||
parser.add_argument("--settings", default=join(sep, "usr", "share", "bunkerweb", "settings.json"), type=str, help="file containing the main settings")
|
||||
parser.add_argument("--templates", default=join(sep, "usr", "share", "bunkerweb", "confs"), type=str, help="directory containing the main template files")
|
||||
parser.add_argument(
|
||||
"--templates", default=join(sep, "usr", "share", "bunkerweb", "confs"), type=str, help="directory containing the main template files"
|
||||
)
|
||||
parser.add_argument("--core", default=join(sep, "usr", "share", "bunkerweb", "core"), type=str, help="directory containing the core plugins")
|
||||
parser.add_argument("--plugins", default=join(sep, "etc", "bunkerweb", "plugins"), type=str, help="directory containing the external plugins")
|
||||
parser.add_argument("--pro-plugins", default=join(sep, "etc", "bunkerweb", "pro", "plugins"), type=str, help="directory containing the pro plugins")
|
||||
|
|
@ -106,7 +108,9 @@ if __name__ == "__main__":
|
|||
|
||||
# Compute the config
|
||||
logger.info("Computing config ...")
|
||||
config: Dict[str, Any] = Configurator(str(settings_path), str(core_path), str(plugins_path), str(pro_plugins_path), str(variables_path), logger).get_config()
|
||||
config: Dict[str, Any] = Configurator(
|
||||
str(settings_path), str(core_path), str(plugins_path), str(pro_plugins_path), str(variables_path), logger
|
||||
).get_config()
|
||||
else:
|
||||
if join(sep, "usr", "share", "bunkerweb", "db") not in sys_path:
|
||||
sys_path.append(join(sep, "usr", "share", "bunkerweb", "db"))
|
||||
|
|
|
|||
|
|
@ -45,7 +45,9 @@ def get_instance_configs_and_apis(instance: Any, db, _type="Docker"):
|
|||
),
|
||||
}
|
||||
)
|
||||
logger.info(f"Found custom conf env var {'for service ' + custom_conf[0] if custom_conf[0] else 'without service'} with type {custom_conf[1]} and name {custom_conf[2]}")
|
||||
logger.info(
|
||||
f"Found custom conf env var {'for service ' + custom_conf[0] if custom_conf[0] else 'without service'} with type {custom_conf[1]} and name {custom_conf[2]}"
|
||||
)
|
||||
else:
|
||||
tmp_config[split[0]] = split[1]
|
||||
|
||||
|
|
@ -175,7 +177,9 @@ if __name__ == "__main__":
|
|||
),
|
||||
}
|
||||
)
|
||||
logger.info(f"Found custom conf env var {'for service ' + custom_conf[0] if custom_conf[0] else 'without service'} with type {custom_conf[1]} and name {custom_conf[2]}")
|
||||
logger.info(
|
||||
f"Found custom conf env var {'for service ' + custom_conf[0] if custom_conf[0] else 'without service'} with type {custom_conf[1]} and name {custom_conf[2]}"
|
||||
)
|
||||
|
||||
db = Database(logger, config_files.get("DATABASE_URI", None), pool=False)
|
||||
else:
|
||||
|
|
@ -206,7 +210,9 @@ if __name__ == "__main__":
|
|||
),
|
||||
}
|
||||
)
|
||||
logger.info(f"Found custom conf env var {'for service ' + custom_conf[0] if custom_conf[0] else 'without service'} with type {custom_conf[1]} and name {custom_conf[2]}")
|
||||
logger.info(
|
||||
f"Found custom conf env var {'for service ' + custom_conf[0] if custom_conf[0] else 'without service'} with type {custom_conf[1]} and name {custom_conf[2]}"
|
||||
)
|
||||
else:
|
||||
tmp_config[split[0]] = split[1]
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,9 @@ class ConfigCaller:
|
|||
def __init__(self):
|
||||
self.__logger = setup_logger("Config", "INFO")
|
||||
self._settings = loads(Path(sep, "usr", "share", "bunkerweb", "settings.json").read_text(encoding="utf-8"))
|
||||
for plugin in glob(join(sep, "usr", "share", "bunkerweb", "core", "*", "plugin.json")) + glob(join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json")):
|
||||
for plugin in glob(join(sep, "usr", "share", "bunkerweb", "core", "*", "plugin.json")) + glob(
|
||||
join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json")
|
||||
):
|
||||
try:
|
||||
self._settings.update(loads(Path(plugin).read_text(encoding="utf-8"))["settings"])
|
||||
except KeyError:
|
||||
|
|
@ -36,7 +38,9 @@ class ConfigCaller:
|
|||
return self._settings[setting]["context"] == context
|
||||
elif match(r"^.+_\d+$", setting):
|
||||
multiple_setting = "_".join(setting.split("_")[:-1])
|
||||
return self._is_setting(multiple_setting) and self._settings[multiple_setting]["context"] == context and "multiple" in self._settings[multiple_setting]
|
||||
return (
|
||||
self._is_setting(multiple_setting) and self._settings[multiple_setting]["context"] == context and "multiple" in self._settings[multiple_setting]
|
||||
)
|
||||
return False
|
||||
|
||||
def _full_env(self, env_instances: Dict[str, Any], env_services: Dict[str, Any]) -> Dict[str, Any]:
|
||||
|
|
|
|||
|
|
@ -89,7 +89,9 @@ class JobScheduler(ApiCaller):
|
|||
def __get_jobs(self):
|
||||
jobs = {}
|
||||
for plugin_file in (
|
||||
glob(join(sep, "usr", "share", "bunkerweb", "core", "*", "plugin.json")) + glob(join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json")) + glob(join(sep, "etc", "bunkerweb", "pro", "plugins", "*", "plugin.json"))
|
||||
glob(join(sep, "usr", "share", "bunkerweb", "core", "*", "plugin.json"))
|
||||
+ glob(join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json"))
|
||||
+ glob(join(sep, "etc", "bunkerweb", "pro", "plugins", "*", "plugin.json"))
|
||||
): # core plugins # external plugins # pro plugins
|
||||
plugin_name = basename(dirname(plugin_file))
|
||||
jobs[plugin_name] = []
|
||||
|
|
@ -102,20 +104,28 @@ class JobScheduler(ApiCaller):
|
|||
|
||||
for x, job in enumerate(deepcopy(plugin_jobs)):
|
||||
if not all(key in job.keys() for key in ("name", "file", "every", "reload")):
|
||||
self.__logger.warning(f"missing keys for job {job['name']} in plugin {plugin_name}, must have name, file, every and reload, ignoring job")
|
||||
self.__logger.warning(
|
||||
f"missing keys for job {job['name']} in plugin {plugin_name}, must have name, file, every and reload, ignoring job"
|
||||
)
|
||||
plugin_jobs.pop(x)
|
||||
continue
|
||||
|
||||
if not match(r"^[\w.-]{1,128}$", job["name"]):
|
||||
self.__logger.warning(f"Invalid name for job {job['name']} in plugin {plugin_name} (Can only contain numbers, letters, underscores and hyphens (min 1 characters and max 128)), ignoring job")
|
||||
self.__logger.warning(
|
||||
f"Invalid name for job {job['name']} in plugin {plugin_name} (Can only contain numbers, letters, underscores and hyphens (min 1 characters and max 128)), ignoring job"
|
||||
)
|
||||
plugin_jobs.pop(x)
|
||||
continue
|
||||
elif not match(r"^[\w./-]{1,256}$", job["file"]):
|
||||
self.__logger.warning(f"Invalid file for job {job['name']} in plugin {plugin_name} (Can only contain numbers, letters, underscores, hyphens and slashes (min 1 characters and max 256)), ignoring job")
|
||||
self.__logger.warning(
|
||||
f"Invalid file for job {job['name']} in plugin {plugin_name} (Can only contain numbers, letters, underscores, hyphens and slashes (min 1 characters and max 256)), ignoring job"
|
||||
)
|
||||
plugin_jobs.pop(x)
|
||||
continue
|
||||
elif job["every"] not in ("once", "minute", "hour", "day", "week"):
|
||||
self.__logger.warning(f"Invalid every for job {job['name']} in plugin {plugin_name} (Must be once, minute, hour, day or week), ignoring job")
|
||||
self.__logger.warning(
|
||||
f"Invalid every for job {job['name']} in plugin {plugin_name} (Must be once, minute, hour, day or week), ignoring job"
|
||||
)
|
||||
plugin_jobs.pop(x)
|
||||
continue
|
||||
elif job["reload"] is not True and job["reload"] is not False:
|
||||
|
|
@ -152,7 +162,9 @@ class JobScheduler(ApiCaller):
|
|||
if reload:
|
||||
self.__logger.info("Successfully reloaded nginx")
|
||||
else:
|
||||
self.__logger.error(f"Error while reloading nginx - returncode: {proc.returncode} - error: {proc.stderr.decode() if proc.stderr else 'Missing stderr'}")
|
||||
self.__logger.error(
|
||||
f"Error while reloading nginx - returncode: {proc.returncode} - error: {proc.stderr.decode() if proc.stderr else 'Missing stderr'}"
|
||||
)
|
||||
else:
|
||||
self.__logger.info("Reloading nginx ...")
|
||||
reload = self.send_to_apis("POST", "/reload")
|
||||
|
|
|
|||
|
|
@ -217,7 +217,13 @@ if __name__ == "__main__":
|
|||
sleep(5)
|
||||
|
||||
env = db.get_config()
|
||||
elif not tmp_variables_path.exists() or not nginx_variables_path.exists() or (tmp_variables_path.read_text(encoding="utf-8") != nginx_variables_path.read_text(encoding="utf-8")) or db.is_initialized() and db.get_config() != dotenv_env:
|
||||
elif (
|
||||
not tmp_variables_path.exists()
|
||||
or not nginx_variables_path.exists()
|
||||
or (tmp_variables_path.read_text(encoding="utf-8") != nginx_variables_path.read_text(encoding="utf-8"))
|
||||
or db.is_initialized()
|
||||
and db.get_config() != dotenv_env
|
||||
):
|
||||
# run the config saver
|
||||
proc = subprocess_run(
|
||||
[
|
||||
|
|
@ -494,7 +500,9 @@ if __name__ == "__main__":
|
|||
if proc.returncode == 0:
|
||||
logger.info("Successfully sent reload signal to nginx")
|
||||
else:
|
||||
logger.error(f"Error while reloading nginx - returncode: {proc.returncode} - error: {proc.stdout.decode('utf-8') if proc.stdout else 'no output'}")
|
||||
logger.error(
|
||||
f"Error while reloading nginx - returncode: {proc.returncode} - error: {proc.stdout.decode('utf-8') if proc.stdout else 'no output'}"
|
||||
)
|
||||
else:
|
||||
logger.warning("No BunkerWeb instance found, skipping nginx reload ...")
|
||||
except:
|
||||
|
|
|
|||
|
|
@ -167,7 +167,9 @@ elif getenv("ADMIN_USERNAME") and getenv("ADMIN_PASSWORD"):
|
|||
app.logger.error("The admin username is too long. It must be less than 256 characters.")
|
||||
stop(1)
|
||||
elif not USER_PASSWORD_RX.match(getenv("ADMIN_PASSWORD", "changeme")):
|
||||
app.logger.error("The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).")
|
||||
app.logger.error(
|
||||
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-)."
|
||||
)
|
||||
stop(1)
|
||||
|
||||
user_name = getenv("ADMIN_USERNAME", "admin")
|
||||
|
|
@ -231,7 +233,9 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
|
|||
if listdir(service_custom_conf):
|
||||
move(service_custom_conf, service_custom_conf.replace(f"{sep}{args[1].split(' ')[0]}", f"{sep}{args[2].split(' ')[0]}"))
|
||||
moved = True
|
||||
operation, error = app.config["CONFIG"].edit_service(args[1], args[0], check_changes=(was_draft != is_draft or not is_draft) and not moved, is_draft=is_draft)
|
||||
operation, error = app.config["CONFIG"].edit_service(
|
||||
args[1], args[0], check_changes=(was_draft != is_draft or not is_draft) and not moved, is_draft=is_draft
|
||||
)
|
||||
elif operation == "delete":
|
||||
for service_custom_conf in glob(join(sep, "etc", "bunkerweb", "configs", "*", args[2].split(" ")[0])):
|
||||
if listdir(service_custom_conf):
|
||||
|
|
@ -255,7 +259,9 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
|
|||
ret = db.checked_changes(changes, value=True)
|
||||
if ret:
|
||||
app.logger.error(f"Couldn't set the changes to checked in the database: {ret}")
|
||||
app.config["TO_FLASH"].append({"content": f"An error occurred when setting the changes to checked in the database : {ret}", "type": "error"})
|
||||
app.config["TO_FLASH"].append(
|
||||
{"content": f"An error occurred when setting the changes to checked in the database : {ret}", "type": "error"}
|
||||
)
|
||||
if method == "global_config":
|
||||
operation = app.config["CONFIG"].edit_global_conf(args[0])
|
||||
elif method == "plugins":
|
||||
|
|
@ -393,11 +399,22 @@ def before_request():
|
|||
passed = True
|
||||
|
||||
# Go back from totp to login
|
||||
if not session.get("totp_validated", False) and current_user.is_two_factor_enabled and "/totp" not in request.path and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts")) and request.path.endswith("/login"):
|
||||
if (
|
||||
not session.get("totp_validated", False)
|
||||
and current_user.is_two_factor_enabled
|
||||
and "/totp" not in request.path
|
||||
and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts"))
|
||||
and request.path.endswith("/login")
|
||||
):
|
||||
return redirect(url_for("login", next=request.path))
|
||||
|
||||
# Case not login page, keep on 2FA before any other access
|
||||
if not session.get("totp_validated", False) and current_user.is_two_factor_enabled and "/totp" not in request.path and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts")):
|
||||
if (
|
||||
not session.get("totp_validated", False)
|
||||
and current_user.is_two_factor_enabled
|
||||
and "/totp" not in request.path
|
||||
and not request.path.startswith(("/css", "/images", "/js", "/json", "/webfonts"))
|
||||
):
|
||||
return redirect(url_for("totp", next=request.form.get("next")))
|
||||
elif session.get("ip") != request.remote_addr:
|
||||
passed = False
|
||||
|
|
@ -448,7 +465,9 @@ def setup():
|
|||
is_request_form("setup")
|
||||
|
||||
if not any(key in request.form for key in ("admin_username", "admin_password", "admin_password_check", "server_name", "ui_host", "ui_url")):
|
||||
return redirect_flash_error("Missing either admin_username, admin_password, admin_password_check, server_name, ui_host, ui_url or auto_lets_encrypt parameter.", "setup")
|
||||
return redirect_flash_error(
|
||||
"Missing either admin_username, admin_password, admin_password_check, server_name, ui_host, ui_url or auto_lets_encrypt parameter.", "setup"
|
||||
)
|
||||
|
||||
if len(request.form["admin_username"]) > 256:
|
||||
return redirect_flash_error("The admin username is too long. It must be less than 256 characters.", "setup")
|
||||
|
|
@ -457,7 +476,10 @@ def setup():
|
|||
return redirect_flash_error("The passwords do not match.", "setup")
|
||||
|
||||
if not USER_PASSWORD_RX.match(request.form["admin_password"]):
|
||||
return redirect_flash_error("The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).", "setup")
|
||||
return redirect_flash_error(
|
||||
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).",
|
||||
"setup",
|
||||
)
|
||||
|
||||
server_names = db_config["SERVER_NAME"].split(" ")
|
||||
if request.form["server_name"] in server_names:
|
||||
|
|
@ -631,7 +653,8 @@ def account():
|
|||
|
||||
if not USER_PASSWORD_RX.match(request.form["admin_password"]):
|
||||
return redirect_flash_error(
|
||||
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)", "account"
|
||||
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)",
|
||||
"account",
|
||||
)
|
||||
|
||||
password = request.form["admin_password"]
|
||||
|
|
@ -652,12 +675,18 @@ def account():
|
|||
app.config["CURRENT_TOTP_TOKEN"] = None
|
||||
|
||||
user = User(username, password, is_two_factor_enabled=is_two_factor_enabled, secret_token=secret_token, method=current_user.method)
|
||||
ret = db.update_ui_user(username, user.password_hash, is_two_factor_enabled, secret_token, current_user.method if request.form["operation"] == "totp" else "ui")
|
||||
ret = db.update_ui_user(
|
||||
username, user.password_hash, is_two_factor_enabled, secret_token, current_user.method if request.form["operation"] == "totp" else "ui"
|
||||
)
|
||||
if ret:
|
||||
return redirect_flash_error(f"Couldn't update the admin user in the database: {ret}", "account", False, "error")
|
||||
|
||||
flash(
|
||||
f"The {request.form['operation']} has been successfully updated." if request.form["operation"] != "totp" else f"The two-factor authentication was successfully {'disabled' if current_user.is_two_factor_enabled else 'enabled'}.",
|
||||
(
|
||||
f"The {request.form['operation']} has been successfully updated."
|
||||
if request.form["operation"] != "totp"
|
||||
else f"The two-factor authentication was successfully {'disabled' if current_user.is_two_factor_enabled else 'enabled'}."
|
||||
),
|
||||
)
|
||||
|
||||
return redirect(url_for("account" if request.form["operation"] == "totp" else "login"))
|
||||
|
|
@ -763,10 +792,20 @@ def services():
|
|||
elif value == "off":
|
||||
value = "no"
|
||||
|
||||
if variable in variables and variable != "SERVER_NAME" and value == config.get(f"{server_name}_{variable}" if request.form["operation"] == "edit" else variable, None):
|
||||
if (
|
||||
variable in variables
|
||||
and variable != "SERVER_NAME"
|
||||
and value == config.get(f"{server_name}_{variable}" if request.form["operation"] == "edit" else variable, None)
|
||||
):
|
||||
del variables[variable]
|
||||
|
||||
if was_draft == is_draft and request.form["operation"] == "edit" and len(variables) == 1 and "SERVER_NAME" in variables and variables["SERVER_NAME"] == request.form.get("OLD_SERVER_NAME", ""):
|
||||
if (
|
||||
was_draft == is_draft
|
||||
and request.form["operation"] == "edit"
|
||||
and len(variables) == 1
|
||||
and "SERVER_NAME" in variables
|
||||
and variables["SERVER_NAME"] == request.form.get("OLD_SERVER_NAME", "")
|
||||
):
|
||||
return redirect_flash_error("The service was not edited because no values were changed.", "services", True)
|
||||
|
||||
elif request.form["operation"] == "new" and not variables:
|
||||
|
|
@ -1298,7 +1337,7 @@ def upload_plugin():
|
|||
def custom_plugin(plugin: str):
|
||||
message = ""
|
||||
if not plugin_id_rx.match(plugin):
|
||||
return error_message(f"Invalid plugin id, (must be between 1 and 64 characters, only letters, numbers, underscores and hyphens)"), 400
|
||||
return error_message("Invalid plugin id, (must be between 1 and 64 characters, only letters, numbers, underscores and hyphens)"), 400
|
||||
|
||||
# Case we ware looking for a plugin template
|
||||
# We need to check if a page exists, and if it does, we need to check if the plugin is activated and metrics are on
|
||||
|
|
@ -1308,7 +1347,7 @@ def custom_plugin(plugin: str):
|
|||
page = db.get_plugin_template(plugin)
|
||||
|
||||
if not page:
|
||||
return error_message(f"The plugin does not have a template"), 404
|
||||
return error_message("The plugin does not have a template"), 404
|
||||
|
||||
# Case template, prepare data
|
||||
plugins = app.config["CONFIG"].get_plugins()
|
||||
|
|
@ -1327,7 +1366,7 @@ def custom_plugin(plugin: str):
|
|||
|
||||
# Case no plugin found
|
||||
if plugin_id is None:
|
||||
return error_message(f"Plugin not found"), 404
|
||||
return error_message("Plugin not found"), 404
|
||||
|
||||
config = app.config["CONFIG"].get_config(methods=False)
|
||||
|
||||
|
|
@ -1409,7 +1448,7 @@ def custom_plugin(plugin: str):
|
|||
module = db.get_plugin_actions(plugin)
|
||||
|
||||
if module is None:
|
||||
return error_message(f"The actions.py file for the plugin does not exist"), 404
|
||||
return error_message("The actions.py file for the plugin does not exist"), 404
|
||||
|
||||
try:
|
||||
# Try to import the custom plugin
|
||||
|
|
@ -1420,7 +1459,7 @@ def custom_plugin(plugin: str):
|
|||
loader = SourceFileLoader("actions", temp.name)
|
||||
actions = loader.load_module()
|
||||
except:
|
||||
return error_message(f"An error occurred while importing the plugin, see logs for more details"), 500
|
||||
return error_message("An error occurred while importing the plugin, see logs for more details"), 500
|
||||
|
||||
res = None
|
||||
|
||||
|
|
@ -1435,9 +1474,9 @@ def custom_plugin(plugin: str):
|
|||
|
||||
res = method(app=app, args=queries, data=data)
|
||||
except AttributeError:
|
||||
message = f"The plugin does not have a method, see logs for more details"
|
||||
message = "The plugin does not have a method, see logs for more details"
|
||||
except:
|
||||
message = f"An error occurred while executing the plugin, see logs for more details"
|
||||
message = "An error occurred while executing the plugin, see logs for more details"
|
||||
finally:
|
||||
if sbin_nginx_path.is_file():
|
||||
# Remove the custom plugin from the shared library
|
||||
|
|
@ -1533,7 +1572,9 @@ def logs_linux():
|
|||
if nginx_access_file.is_file():
|
||||
with open(nginx_access_file, encoding="utf-8") as f:
|
||||
for line in f.readlines()[int(last_update.split(".")[1]) if last_update else 0 :]: # noqa: E203
|
||||
logs_access.append(f"{datetime.strptime(line[line.find('[') + 1: line.find(']')], '%d/%b/%Y:%H:%M:%S %z').replace(tzinfo=timezone.utc).timestamp()} {line}")
|
||||
logs_access.append(
|
||||
f"{datetime.strptime(line[line.find('[') + 1: line.find(']')], '%d/%b/%Y:%H:%M:%S %z').replace(tzinfo=timezone.utc).timestamp()} {line}"
|
||||
)
|
||||
|
||||
raw_logs = logs_error + logs_access
|
||||
|
||||
|
|
@ -1584,7 +1625,11 @@ def logs_linux():
|
|||
return jsonify(
|
||||
{
|
||||
"logs": logs,
|
||||
"last_update": f"{count_error_logs + int(last_update.split('.')[0])}.{len(logs_access) + int(last_update.split('.')[1])}" if last_update else f"{count_error_logs}.{len(logs_access)}",
|
||||
"last_update": (
|
||||
f"{count_error_logs + int(last_update.split('.')[0])}.{len(logs_access) + int(last_update.split('.')[1])}"
|
||||
if last_update
|
||||
else f"{count_error_logs}.{len(logs_access)}"
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -34,7 +34,9 @@ class Instance:
|
|||
self.name = name
|
||||
self.hostname = hostname
|
||||
self._type = _type
|
||||
self.health = status == "up" and ((data.attrs["State"]["Health"]["Status"] == "healthy" if "Health" in data.attrs["State"] else False) if _type == "container" and data else True)
|
||||
self.health = status == "up" and (
|
||||
(data.attrs["State"]["Health"]["Status"] == "healthy" if "Health" in data.attrs["State"] else False) if _type == "container" and data else True
|
||||
)
|
||||
self.env = data
|
||||
self.apiCaller = apiCaller or ApiCaller()
|
||||
|
||||
|
|
@ -396,7 +398,10 @@ class Instances:
|
|||
if not resp:
|
||||
continue
|
||||
|
||||
if not isinstance(instance_metrics.get(instance_name, {"msg": None}).get("msg"), dict) or instance_metrics[instance_name].get("status", "error") != "success":
|
||||
if (
|
||||
not isinstance(instance_metrics.get(instance_name, {"msg": None}).get("msg"), dict)
|
||||
or instance_metrics[instance_name].get("status", "error") != "success"
|
||||
):
|
||||
continue
|
||||
|
||||
metric_data = instance_metrics[instance_name]["msg"]
|
||||
|
|
|
|||
|
|
@ -117,7 +117,9 @@ def path_to_dict(
|
|||
}
|
||||
|
||||
if conf["service_id"]:
|
||||
d["children"][config_types.index(type_lower)]["children"][[x["name"] for x in d["children"][config_types.index(type_lower)]["children"]].index(conf["service_id"])]["children"].append(file_info)
|
||||
d["children"][config_types.index(type_lower)]["children"][
|
||||
[x["name"] for x in d["children"][config_types.index(type_lower)]["children"]].index(conf["service_id"])
|
||||
]["children"].append(file_info)
|
||||
else:
|
||||
d["children"][config_types.index(type_lower)]["children"].append(file_info)
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -324,7 +324,12 @@ if distro == "ubuntu":
|
|||
print("❌ /usr/bin/bwcli found.")
|
||||
# Checking Removing test
|
||||
try:
|
||||
if pathlib.Path("/usr/share/bunkerweb").is_dir() or pathlib.Path("/var/tmp/bunkerweb").is_dir() or pathlib.Path("/var/cache/bunkerweb").is_dir() or pathlib.Path("/usr/bin/bwcli").is_file():
|
||||
if (
|
||||
pathlib.Path("/usr/share/bunkerweb").is_dir()
|
||||
or pathlib.Path("/var/tmp/bunkerweb").is_dir()
|
||||
or pathlib.Path("/var/cache/bunkerweb").is_dir()
|
||||
or pathlib.Path("/usr/bin/bwcli").is_file()
|
||||
):
|
||||
test_results["Removing test"] = "KO"
|
||||
else:
|
||||
test_results["Removing test"] = "OK"
|
||||
|
|
@ -839,7 +844,12 @@ elif distro == "debian":
|
|||
print("❌ /usr/bin/bwcli found.")
|
||||
# Checking Removing test
|
||||
try:
|
||||
if pathlib.Path("/usr/share/bunkerweb").is_dir() or pathlib.Path("/var/tmp/bunkerweb").is_dir() or pathlib.Path("/var/cache/bunkerweb").is_dir() or pathlib.Path("/usr/bin/bwcli").is_file():
|
||||
if (
|
||||
pathlib.Path("/usr/share/bunkerweb").is_dir()
|
||||
or pathlib.Path("/var/tmp/bunkerweb").is_dir()
|
||||
or pathlib.Path("/var/cache/bunkerweb").is_dir()
|
||||
or pathlib.Path("/usr/bin/bwcli").is_file()
|
||||
):
|
||||
test_results["Removing test"] = "KO"
|
||||
else:
|
||||
test_results["Removing test"] = "OK"
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@
|
|||
- name: Add IP address of all hosts to all hosts
|
||||
lineinfile:
|
||||
dest: /etc/hosts
|
||||
regexp: '.*{{ item }}$'
|
||||
regexp: ".*{{ item }}$"
|
||||
line: "{{ hostvars[item].local_ip }} {{item}}"
|
||||
state: present
|
||||
when: hostvars[item].local_ip is defined
|
||||
|
|
|
|||
|
|
@ -39,7 +39,8 @@ try:
|
|||
|
||||
status_code = get(
|
||||
"http://www.example.com",
|
||||
headers={"Host": "www.example.com"} | ({"X-Forwarded-For": "2.0.0.3" if country == "GB" else "8.0.0.3"} if getenv("TEST_TYPE", "docker") == "linux" else {}),
|
||||
headers={"Host": "www.example.com"}
|
||||
| ({"X-Forwarded-For": "2.0.0.3" if country == "GB" else "8.0.0.3"} if getenv("TEST_TYPE", "docker") == "linux" else {}),
|
||||
).status_code
|
||||
|
||||
if status_code == 403:
|
||||
|
|
|
|||
|
|
@ -337,7 +337,12 @@ try:
|
|||
)
|
||||
exit(1)
|
||||
|
||||
if plugin.name != current_plugin[plugin.id]["name"] or plugin.description != current_plugin[plugin.id]["description"] or plugin.version != current_plugin[plugin.id]["version"] or plugin.stream != current_plugin[plugin.id]["stream"]:
|
||||
if (
|
||||
plugin.name != current_plugin[plugin.id]["name"]
|
||||
or plugin.description != current_plugin[plugin.id]["description"]
|
||||
or plugin.version != current_plugin[plugin.id]["version"]
|
||||
or plugin.stream != current_plugin[plugin.id]["stream"]
|
||||
):
|
||||
print(
|
||||
f"❌ The {'external' if plugin.type == 'external' else 'core'} plugin {plugin.name} (id: {plugin.id}) is in the database but is not correct, exiting ...\n"
|
||||
+ f"{dumps({'name': plugin.name, 'description': plugin.description, 'version': plugin.version, 'stream': plugin.stream})}"
|
||||
|
|
@ -472,7 +477,11 @@ try:
|
|||
)
|
||||
exit(1)
|
||||
|
||||
path_ui = Path(join("bunkerweb", "core", plugin_page.plugin_id, "ui")) if Path(join("bunkerweb", "core", plugin_page.plugin_id, "ui")).exists() else Path(join("external", plugin_page.plugin_id, "ui"))
|
||||
path_ui = (
|
||||
Path(join("bunkerweb", "core", plugin_page.plugin_id, "ui"))
|
||||
if Path(join("bunkerweb", "core", plugin_page.plugin_id, "ui")).exists()
|
||||
else Path(join("external", plugin_page.plugin_id, "ui"))
|
||||
)
|
||||
|
||||
if not path_ui.exists():
|
||||
print(
|
||||
|
|
@ -571,7 +580,10 @@ try:
|
|||
flush=True,
|
||||
)
|
||||
exit(1)
|
||||
elif custom_config.data.replace(b"# CREATED BY ENV\n", b"") != current_custom_configs[custom_config.name]["value"] and custom_config.data.replace(b"# CREATED BY ENV\n", b"") != current_custom_configs[custom_config.name]["value"] + b"\n":
|
||||
elif (
|
||||
custom_config.data.replace(b"# CREATED BY ENV\n", b"") != current_custom_configs[custom_config.name]["value"]
|
||||
and custom_config.data.replace(b"# CREATED BY ENV\n", b"") != current_custom_configs[custom_config.name]["value"] + b"\n"
|
||||
):
|
||||
print(
|
||||
f"❌ The custom config {custom_config.name} is in the database but the value differ, exiting ...\n{custom_config.data} (database) != {current_custom_configs[custom_config.name]['value']} (env)",
|
||||
flush=True,
|
||||
|
|
|
|||
|
|
@ -41,7 +41,9 @@ try:
|
|||
retries = 0
|
||||
|
||||
while not passed and retries < 10:
|
||||
status_code = get("http://www.example.com", headers={"Host": "www.example.com"} | ({"X-Forwarded-For": getenv("IP_ADDRESS", "")} if TEST_TYPE == "linux" else {})).status_code
|
||||
status_code = get(
|
||||
"http://www.example.com", headers={"Host": "www.example.com"} | ({"X-Forwarded-For": getenv("IP_ADDRESS", "")} if TEST_TYPE == "linux" else {})
|
||||
).status_code
|
||||
|
||||
if status_code == 403:
|
||||
if not use_dnsbl:
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ try:
|
|||
|
||||
log_info("username 'admin' is correctly set by default, trying username update ...")
|
||||
|
||||
DRIVER.execute_script(f"return arguments[0].value = 'admin2'", username_input)
|
||||
DRIVER.execute_script("return arguments[0].value = 'admin2'", username_input)
|
||||
|
||||
password_input = safe_get_element(DRIVER, By.ID, "curr_password")
|
||||
assert isinstance(password_input, WebElement), "The password input is not an instance of WebElement"
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
from logging import info as log_info, exception as log_exception, error as log_error, warning as log_warning
|
||||
from time import sleep
|
||||
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.remote.webelement import WebElement
|
||||
|
|
@ -25,7 +24,7 @@ try:
|
|||
is_no_match_hidden = DRIVER.execute_script('return document.querySelector("[data-global-config-nomatch]").classList.contains("hidden")')
|
||||
|
||||
if is_no_match_hidden:
|
||||
log_error(f"Filter keyword shouldn't match something.")
|
||||
log_error("Filter keyword shouldn't match something.")
|
||||
exit(1)
|
||||
|
||||
# Reset
|
||||
|
|
@ -37,16 +36,16 @@ try:
|
|||
input_keyword.send_keys("http port")
|
||||
|
||||
# Check that the matching element is shown and other card hide
|
||||
is_http_port_hidden = DRIVER.execute_script(f"""return document.querySelector('#form-edit-global-config-http-port').classList.contains('hidden')""")
|
||||
is_http_port_hidden = DRIVER.execute_script("return document.querySelector('#form-edit-global-config-http-port').classList.contains('hidden')")
|
||||
|
||||
if is_http_port_hidden:
|
||||
log_error(f"hidden http port should be match.")
|
||||
log_error("hidden http port should be match.")
|
||||
exit(1)
|
||||
|
||||
is_https_port_hidden = DRIVER.execute_script(f"""return document.querySelector('#form-edit-global-config-https-port').classList.contains('hidden')""")
|
||||
is_https_port_hidden = DRIVER.execute_script("return document.querySelector('#form-edit-global-config-https-port').classList.contains('hidden')")
|
||||
|
||||
if not is_https_port_hidden:
|
||||
log_error(f"Setting https port should not be match.")
|
||||
log_error("Setting https port should not be match.")
|
||||
exit(1)
|
||||
|
||||
# Reset
|
||||
|
|
|
|||
|
|
@ -59,7 +59,9 @@ try:
|
|||
]
|
||||
|
||||
for item in select_filters:
|
||||
DRIVER.execute_script(f"""return document.querySelector('[data-plugins-setting-select-dropdown-btn="{item["id"]}"][value="{item["value"]}"]').click()""")
|
||||
DRIVER.execute_script(
|
||||
f"""return document.querySelector('[data-plugins-setting-select-dropdown-btn="{item["id"]}"][value="{item["value"]}"]').click()"""
|
||||
)
|
||||
|
||||
log_info("The filter is working, trying to add a bad plugin ...")
|
||||
|
||||
|
|
|
|||
|
|
@ -48,7 +48,9 @@ try:
|
|||
]
|
||||
|
||||
for item in select_filters:
|
||||
DRIVER.execute_script(f"""return document.querySelector('[data-reports-setting-select-dropdown-btn="{item["id"]}"][value="{item["value"]}"]').click()""")
|
||||
DRIVER.execute_script(
|
||||
f"""return document.querySelector('[data-reports-setting-select-dropdown-btn="{item["id"]}"][value="{item["value"]}"]').click()"""
|
||||
)
|
||||
|
||||
filter_input = safe_get_element(DRIVER, By.ID, "keyword")
|
||||
assert isinstance(filter_input, WebElement), "Keyword filter input is not a WebElement"
|
||||
|
|
|
|||
|
|
@ -64,16 +64,16 @@ try:
|
|||
|
||||
log_info("Check only one plugin is visible ...")
|
||||
|
||||
is_general_plugin_hidden = DRIVER.execute_script(f"""return document.querySelector('[data-plugin-item="general"]').classList.contains('hidden')""")
|
||||
is_general_plugin_hidden = DRIVER.execute_script("""return document.querySelector('[data-plugin-item="general"]').classList.contains('hidden')""")
|
||||
|
||||
if is_general_plugin_hidden:
|
||||
log_error(f"Plugin general should be visible.")
|
||||
log_error("Plugin general should be visible.")
|
||||
exit(1)
|
||||
|
||||
is_antibot_plugin_hidden = DRIVER.execute_script(f"""return document.querySelector('[data-plugin-item="antibot"]').classList.contains('hidden')""")
|
||||
is_antibot_plugin_hidden = DRIVER.execute_script("""return document.querySelector('[data-plugin-item="antibot"]').classList.contains('hidden')""")
|
||||
|
||||
if not is_antibot_plugin_hidden:
|
||||
log_error(f"Plugin antibot should not be visible.")
|
||||
log_error("Plugin antibot should not be visible.")
|
||||
exit(1)
|
||||
|
||||
log_info("Only one plugin visible checked, trying keyword no match ...")
|
||||
|
|
@ -85,7 +85,7 @@ try:
|
|||
# Check that the no matching element is shown and other card hide
|
||||
is_no_match = DRIVER.execute_script('return document.querySelector("[data-services-nomatch]").classList.contains("hidden")')
|
||||
if is_no_match:
|
||||
log_error(f"Filter keyword shouldn't match something.")
|
||||
log_error("Filter keyword shouldn't match something.")
|
||||
exit(1)
|
||||
|
||||
# Reset
|
||||
|
|
@ -97,16 +97,16 @@ try:
|
|||
input_keyword.send_keys("server type")
|
||||
|
||||
# Check that the matching element is shown and other card hide
|
||||
is_server_type_hidden = DRIVER.execute_script(f"""return document.querySelector('#form-edit-services-server-type').classList.contains('hidden')""")
|
||||
is_server_type_hidden = DRIVER.execute_script("return document.querySelector('#form-edit-services-server-type').classList.contains('hidden')")
|
||||
|
||||
if is_server_type_hidden:
|
||||
log_error(f"Setting server type should be match.")
|
||||
log_error("Setting server type should be match.")
|
||||
exit(1)
|
||||
|
||||
is_server_name_hidden = DRIVER.execute_script(f"""return document.querySelector('#form-edit-services-server-name').classList.contains('hidden')""")
|
||||
is_server_name_hidden = DRIVER.execute_script("return document.querySelector('#form-edit-services-server-name').classList.contains('hidden')")
|
||||
|
||||
if not is_server_name_hidden:
|
||||
log_error(f"Setting server name should not be match.")
|
||||
log_error("Setting server name should not be match.")
|
||||
exit(1)
|
||||
|
||||
# Reset
|
||||
|
|
@ -405,7 +405,7 @@ try:
|
|||
# Check that the no matching element is shown and other card hide
|
||||
is_no_match = DRIVER.execute_script('return document.querySelector("[data-services-nomatch-card]").classList.contains("hidden")')
|
||||
if is_no_match:
|
||||
log_error(f"Filter keyword shouldn't match something.")
|
||||
log_error("Filter keyword shouldn't match something.")
|
||||
exit(1)
|
||||
|
||||
# Reset
|
||||
|
|
@ -421,12 +421,16 @@ try:
|
|||
]
|
||||
|
||||
for item in select_filters:
|
||||
DRIVER.execute_script(f"""return document.querySelector('[data-services-setting-select-dropdown-btn="{item["id"]}"][value="{item["value"]}"]').click()""")
|
||||
DRIVER.execute_script(
|
||||
f"""return document.querySelector('[data-services-setting-select-dropdown-btn="{item["id"]}"][value="{item["value"]}"]').click()"""
|
||||
)
|
||||
|
||||
log_info("Filters working as expected, trying to delete app3.example.com ...")
|
||||
|
||||
try:
|
||||
delete_card_button = safe_get_element(DRIVER, By.XPATH, "//button[@data-services-action='delete' and @data-services-name='app3.example.com']", error=True)
|
||||
delete_card_button = safe_get_element(
|
||||
DRIVER, By.XPATH, "//button[@data-services-action='delete' and @data-services-name='app3.example.com']", error=True
|
||||
)
|
||||
assert isinstance(delete_card_button, WebElement), "Delete button is not a WebElement"
|
||||
assert_button_click(DRIVER, delete_card_button)
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,9 @@ from selenium.webdriver.support import expected_conditions as EC
|
|||
from selenium.common.exceptions import ElementClickInterceptedException, TimeoutException, WebDriverException
|
||||
|
||||
|
||||
def safe_get_element(driver, by: str, selector: str, *, driver_wait: Optional[WebDriverWait] = None, multiple: bool = False, error: bool = False) -> Union[WebElement, List[WebElement]]:
|
||||
def safe_get_element(
|
||||
driver, by: str, selector: str, *, driver_wait: Optional[WebDriverWait] = None, multiple: bool = False, error: bool = False
|
||||
) -> Union[WebElement, List[WebElement]]:
|
||||
try:
|
||||
# Retrieve by js script
|
||||
if by == "js":
|
||||
|
|
@ -35,7 +37,9 @@ def safe_get_element(driver, by: str, selector: str, *, driver_wait: Optional[We
|
|||
return el
|
||||
|
||||
# Retrieve with XPATH
|
||||
return (driver_wait or WebDriverWait(driver, 4)).until(EC.presence_of_element_located((by, selector)) if not multiple else EC.presence_of_all_elements_located((by, selector)))
|
||||
return (driver_wait or WebDriverWait(driver, 4)).until(
|
||||
EC.presence_of_element_located((by, selector)) if not multiple else EC.presence_of_all_elements_located((by, selector))
|
||||
)
|
||||
except TimeoutException as e:
|
||||
|
||||
if error:
|
||||
|
|
@ -115,7 +119,9 @@ def access_page(driver, button: Union[bool, str, WebElement], name: str, message
|
|||
if not isinstance(button, bool) and not clicked:
|
||||
clicked = assert_button_click(driver, button)
|
||||
|
||||
title: Union[WebElement, List[WebElement]] = safe_get_element(driver, By.XPATH, "/html/body/div[3]/header/div/nav/h6", driver_wait=WebDriverWait(driver, 45))
|
||||
title: Union[WebElement, List[WebElement]] = safe_get_element(
|
||||
driver, By.XPATH, "/html/body/div[3]/header/div/nav/h6", driver_wait=WebDriverWait(driver, 45)
|
||||
)
|
||||
assert isinstance(title, WebElement), "Title is not a WebElement"
|
||||
|
||||
if title.text != name.title():
|
||||
|
|
|
|||
Loading…
Reference in a new issue