refactor main.py

* better form params handling
* refactor bans endpoint
* refactor services endpoint
This commit is contained in:
Jordan Blasenhauer 2024-07-29 11:40:21 +02:00
parent 59ae53a2a9
commit 5adecce14b
2 changed files with 260 additions and 250 deletions

View file

@ -34,7 +34,7 @@ export default defineConfig({
},
},
esbuild: {
drop: ["console.log"],
drop: ["console.log", "console.info", "console.warn"],
},
build: {
minify: "esbuild",

View file

@ -319,35 +319,43 @@ def run_action(plugin: str, function_name: str = ""):
return {"status": "ok", "code": 200, "data": res}
def is_request_form(url_name: str, next: bool = False):
if not request.form:
flash("Missing form data.", "error")
return redirect(url_for(url_name))
def get_user_info():
return current_user.get_id(), current_user.password_hash, current_user.is_two_factor_enabled, current_user.secret_token
def is_request_params(params: list, url_name: str, next: bool = False):
for param in params:
if param not in request.form:
flash(f"Missing {param} parameter.", "error")
if next:
return redirect(url_for("loading", next=url_for(url_name)))
def verify_data_in_form(data: dict[str, Union[tuple, any]] = {}, err_message: str = "", redirect_url: str = "", next: bool = False) -> Union[bool, Response]:
# Loop on each key in data
for key, values in data.items():
if key not in request.form:
return handle_error(f"Missing {key} in form", redirect_url, next, "error")
return redirect(url_for(url_name))
# Case we want to only check if key is in form, we can skip the values check by setting values to falsy value
if not values:
continue
if request.form[key] not in values:
return handle_error(err_message, redirect_url, next, "error")
return True
def redirect_flash_error(message: str, url_name: str, next: bool = False, log: Union[bool, str] = False):
flash(message, "error")
def handle_error(err_message: str = "", redirect_url: str = "", next: bool = False, log: Union[bool, str] = False) -> Union[bool, Response]:
"""Handle error message, flash it, log it if needed and redirect to redirect_url if provided or return False."""
flash(err_message, "error")
if log == "error":
app.logger.error(message)
app.logger.error(err_message)
if log == "exception":
app.logger.exception(message)
app.logger.exception(err_message)
if not redirect_url:
return False
if next:
return redirect(url_for("loading", next=url_for(url_name)))
return redirect(url_for("loading", next=url_for(redirect_url)))
return redirect(url_for(url_name))
return redirect(url_for(redirect_url))
def error_message(msg: str):
@ -575,9 +583,7 @@ def setup():
if request.method == "POST":
if app.config["DB"].readonly:
return redirect_flash_error("Database is in read-only mode", "setup")
is_request_form("setup")
return handle_error("Database is in read-only mode", "setup")
required_keys = []
if not ui_reverse_proxy:
@ -586,17 +592,17 @@ def setup():
required_keys.extend(["admin_username", "admin_password", "admin_password_check"])
if not any(key in request.form for key in required_keys):
return redirect_flash_error(f"Missing either one of the following parameters: {', '.join(required_keys)}.", "setup")
return handle_error(f"Missing either one of the following parameters: {', '.join(required_keys)}.", "setup")
if not admin_user:
if len(request.form["admin_username"]) > 256:
return redirect_flash_error("The admin username is too long. It must be less than 256 characters.", "setup")
return handle_error("The admin username is too long. It must be less than 256 characters.", "setup")
if request.form["admin_password"] != request.form["admin_password_check"]:
return redirect_flash_error("The passwords do not match.", "setup")
return handle_error("The passwords do not match.", "setup")
if not USER_PASSWORD_RX.match(request.form["admin_password"]):
return redirect_flash_error(
return handle_error(
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).",
"setup",
)
@ -605,21 +611,21 @@ def setup():
ret = app.config["DB"].create_ui_user(request.form["admin_username"], admin_user.password_hash, method="ui")
if ret:
return redirect_flash_error(f"Couldn't create the admin user in the database: {ret}", "setup", False, "error")
return handle_error(f"Couldn't create the admin user in the database: {ret}", "setup", False, "error")
flash("The admin user was created successfully", "success")
if not ui_reverse_proxy:
server_names = db_config["SERVER_NAME"].split(" ")
if request.form["server_name"] in server_names:
return redirect_flash_error(f"The hostname {request.form['server_name']} is already in use.", "setup")
return handle_error(f"The hostname {request.form['server_name']} is already in use.", "setup")
else:
for server_name in server_names:
if request.form["server_name"] in db_config.get(f"{server_name}_SERVER_NAME", "").split(" "):
return redirect_flash_error(f"The hostname {request.form['server_name']} is already in use.", "setup")
return handle_error(f"The hostname {request.form['server_name']} is already in use.", "setup")
if not REVERSE_PROXY_PATH.match(request.form["ui_host"]):
return redirect_flash_error("The hostname is not valid.", "setup")
return handle_error("The hostname is not valid.", "setup")
ui_data = get_ui_data()
ui_data["RELOADING"] = True
@ -679,12 +685,11 @@ def setup_loading():
@login_required
def totp():
if request.method == "POST":
is_request_form("totp")
is_request_params(["totp_token"], "totp")
verify_data_in_form(data={"totp_token": None}, err_message="No token provided on /totp.", redirect_url="totp")
if not current_user.check_otp(request.form["totp_token"]):
return redirect_flash_error("The token is invalid.", "totp")
return handle_error("The token is invalid.", "totp")
session["totp_validated"] = True
redirect(url_for("loading", next=request.form.get("next") or url_for("home")))
@ -773,19 +778,20 @@ def home():
def account():
if request.method == "POST":
if app.config["DB"].readonly:
return redirect_flash_error("Database is in read-only mode", "account")
return handle_error("Database is in read-only mode", "account")
# Check form data validity
is_request_form("account")
verify_data_in_form(
data={"operation": ("username", "password", "totp", "activate-key")}, err_message="Invalid operation parameter.", redirect_url="account"
)
if request.form["operation"] not in ("username", "password", "totp", "activate-key"):
return redirect_flash_error("Invalid operation parameter.", "account")
return handle_error("Invalid operation parameter.", "account")
if request.form["operation"] == "activate-key":
is_request_params(["license"], "account")
verify_data_in_form(data={"license": None}, err_message="Missing license for operation activate key on /account.", redirect_url="account")
if len(request.form["license"]) == 0:
return redirect_flash_error("The license key is empty", "account")
return handle_error("The license key is empty", "account")
variable = {}
variable["PRO_LICENSE_KEY"] = request.form["license"]
@ -793,15 +799,15 @@ def account():
variable = app.config["CONFIG"].check_variables(variable, {"PRO_LICENSE_KEY": request.form["license"]})
if not variable:
return redirect_flash_error("The license key variable checks returned error", "account", True)
return handle_error("The license key variable checks returned error", "account", True)
# Force job to contact PRO API
# by setting the last check to None
metadata = app.config["DB"].get_metadata()
metadata["last_pro_check"] = None
app.config["DB"].set_metadata(metadata)
app.config["DB"].set_pro_metadata(metadata)
db_metadata = app.config["DB"].get_metadata()
curr_changes = app.config["DB"].check_changes()
# Reload instances
def update_global_config(threaded: bool = False):
@ -818,11 +824,7 @@ def account():
ui_data["PRO_LOADING"] = True
ui_data["CONFIG_CHANGED"] = True
if any(
v
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed")
):
if any(curr_changes.values()):
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
Thread(target=update_global_config, args=(True,)).start()
@ -834,10 +836,10 @@ def account():
return redirect(url_for("account"))
is_request_params(["operation", "curr_password"], "account")
verify_data_in_form(data={"curr_password": None}, err_message="Missing current password parameter on /account.", redirect_url="account")
if not current_user.check_password(request.form["curr_password"]):
return redirect_flash_error(f"The current password is incorrect. ({request.form['operation']})", "account")
return handle_error(f"The current password is incorrect. ({request.form['operation']})", "account")
username = current_user.get_id()
password = request.form["curr_password"]
@ -845,24 +847,27 @@ def account():
secret_token = current_user.secret_token
if request.form["operation"] == "username":
is_request_params(["admin_username"], "account")
verify_data_in_form(data={"admin_username": None}, err_message="Missing admin username parameter on /account.", redirect_url="account")
if len(request.form["admin_username"]) > 256:
return redirect_flash_error("The admin username is too long. It must be less than 256 characters. (username)", "account")
return handle_error("The admin username is too long. It must be less than 256 characters. (username)", "account")
username = request.form["admin_username"]
logout()
if request.form["operation"] == "password":
is_request_params(["admin_password", "admin_password_check"], "account")
verify_data_in_form(
data={"admin_password": None, "admin_password_check": None},
err_message="Missing admin password or confirm password parameter on /account.",
redirect_url="account",
)
if request.form["admin_password"] != request.form["admin_password_check"]:
return redirect_flash_error("The passwords do not match. (password)", "account")
return handle_error("The passwords do not match. (password)", "account")
if not USER_PASSWORD_RX.match(request.form["admin_password"]):
return redirect_flash_error(
return handle_error(
"The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)",
"account",
)
@ -872,13 +877,12 @@ def account():
logout()
if request.form["operation"] == "totp":
is_request_params(["totp_token"], "account")
verify_data_in_form(data={"totp_token": None}, err_message="Missing totp token parameter on /account.", redirect_url="account")
ui_data = get_ui_data()
if not current_user.check_otp(request.form["totp_token"], secret=ui_data.get("CURRENT_TOTP_TOKEN", None)):
return redirect_flash_error("The totp token is invalid. (totp)", "account")
return handle_error("The totp token is invalid. (totp)", "account")
session["totp_validated"] = not current_user.is_two_factor_enabled
is_two_factor_enabled = session["totp_validated"]
@ -893,7 +897,7 @@ def account():
username, user.password_hash, is_two_factor_enabled, secret_token, current_user.method if request.form["operation"] == "totp" else "ui"
)
if ret:
return redirect_flash_error(f"Couldn't update the admin user in the database: {ret}", "account", False, "error")
return handle_error(f"Couldn't update the admin user in the database: {ret}", "account", False, "error")
flash(
(
@ -920,6 +924,7 @@ def account():
return render_template(
"account.html",
username=current_user.get_id(),
is_totp=current_user.is_two_factor_enabled,
secret_token=secret_token,
totp_qr_image=totp_qr_image,
@ -932,16 +937,20 @@ def instances():
# Manage instances
if request.method == "POST":
is_request_params(["operation", "INSTANCE_ID"], "instances", True)
# Check operation
if request.form["operation"] not in (
"reload",
"start",
"stop",
"restart",
):
return redirect_flash_error("Missing operation parameter on /instances.", "instances")
verify_data_in_form(data={"INSTANCE_ID": None}, err_message="Missing instance id parameter on /instances.", redirect_url="instances", next=True)
verify_data_in_form(
data={
"operation": (
"reload",
"start",
"stop",
"restart",
)
},
err_message="Missing operation parameter on /instances.",
redirect_url="instances",
next=True,
)
ui_data = get_ui_data()
ui_data["RELOADING"] = True
@ -971,123 +980,117 @@ def instances():
return render_template("instances.html", title="Instances", data_server_builder=json.dumps(data_server_builder))
def get_service_data():
config = app.config["DB"].get_config(methods=True, with_drafts=True)
# Check variables
variables = deepcopy(request.form.to_dict())
del variables["csrf_token"]
operation = variables.pop("operation")
# Delete custom client variables
variables.pop("SECURITY_LEVEL", None)
variables.pop("mode", None)
# Get server name and old one
old_server_name = ""
if variables.get("OLD_SERVER_NAME"):
old_server_name = variables.get("OLD_SERVER_NAME", "")
del variables["OLD_SERVER_NAME"]
server_name = variables["SERVER_NAME"].split(" ")[0] if "SERVER_NAME" in variables else old_server_name
# Get draft if exists
was_draft = config.get(f"{server_name}_IS_DRAFT", {"value": "no"})["value"] == "yes"
is_draft = was_draft if not variables.get("is_draft") else variables.get("is_draft") == "yes"
if variables.get("is_draft"):
del variables["is_draft"]
is_draft_unchanged = is_draft == was_draft
# Get all variables starting with custom_config and delete them from variables
custom_configs = []
config_types = (
"http",
"stream",
"server-http",
"server-stream",
"default-server-http",
"default-server-stream",
"modsec",
"modsec-crs",
"crs-plugins-before",
"crs-plugins-after",
)
for variable in variables:
if variable.startswith("custom_config_"):
custom_configs.append(variable)
del variables[variable]
# custom_config variable format is custom_config_<type>_<filename>
# we want a list of dict with each dict containing type, filename, action and server name
# after getting all configs, we want to save them after the end of current service action
# to avoid create config for none existing service or in case editing server name
format_configs = []
for custom_config in custom_configs:
# first remove custom_config_ prefix
custom_config = custom_config.split("custom_config_")[1]
# then split the config into type, filename, action
custom_config = custom_config.split("_")
# check if the config is valid
if len(custom_config) == 2 and custom_config[0] in config_types:
format_configs.append({"type": custom_config[0], "filename": custom_config[1], "action": operation, "server_name": server_name})
else:
return handle_error(err_message=f"Invalid custom config {custom_config}", redirect_url="services", next=True)
# Edit check fields and remove already existing ones
for variable, value in variables.copy().items():
if (
variable in variables
and variable != "SERVER_NAME"
and value == config.get(f"{server_name}_{variable}" if request.form["operation"] == "edit" else variable, {"value": None})["value"]
):
del variables[variable]
variables = app.config["CONFIG"].check_variables(variables, config)
return config, variables, format_configs, server_name, old_server_name, operation, is_draft, was_draft, is_draft_unchanged
@app.route("/services", methods=["GET", "POST"])
@login_required
def services():
if request.method == "POST":
if app.config["DB"].readonly:
return redirect_flash_error("Database is in read-only mode", "services")
return handle_error("Database is in read-only mode", "services")
is_request_params(["operation", "is_draft"], "services", True)
# Check operation
if request.form["operation"] not in ("new", "edit", "delete"):
return redirect_flash_error("Missing operation parameter on /services.", "services")
if request.form["is_draft"] not in ("yes", "no"):
return redirect_flash_error("Missing is_draft parameter on /services.", "services")
# Check variables
variables = deepcopy(request.form.to_dict())
del variables["csrf_token"]
# Delete custom client variables
variables.pop("SECURITY_LEVEL", None)
variables.pop("mode", None)
is_draft = variables.pop("is_draft", "no") == "yes"
if "OLD_SERVER_NAME" not in request.form and request.form["operation"] == "edit":
return redirect_flash_error("Missing OLD_SERVER_NAME parameter.", "services", True)
if "SERVER_NAME" not in variables:
variables["SERVER_NAME"] = variables["OLD_SERVER_NAME"]
config = app.config["DB"].get_config(methods=True, with_drafts=True)
server_name = variables["SERVER_NAME"].split(" ")[0]
was_draft = config.get(f"{server_name}_IS_DRAFT", {"value": "no"})["value"] == "yes"
operation = request.form["operation"]
# Get all variables starting with custom_config and delete them from variables
custom_configs = []
config_types = (
"http",
"stream",
"server-http",
"server-stream",
"default-server-http",
"default-server-stream",
"modsec",
"modsec-crs",
"crs-plugins-before",
"crs-plugins-after",
verify_data_in_form(
data={"operation": ("edit", "new", "delete")},
err_message="Invalid operation parameter on /services.",
redirect_url="services",
)
for variable in variables:
if variable.startswith("custom_config_"):
custom_configs.append(variable)
del variables[variable]
config, variables, format_configs, server_name, old_server_name, operation, is_draft, was_draft, is_draft_unchanged = get_service_data()
# custom_config variable format is custom_config_<type>_<filename>
# we want a list of dict with each dict containing type, filename, action and server name
# after getting all configs, we want to save them after the end of current service action
# to avoid create config for none existing service or in case editing server name
format_configs = []
for custom_config in custom_configs:
# first remove custom_config_ prefix
custom_config = custom_config.split("custom_config_")[1]
# then split the config into type, filename, action
custom_config = custom_config.split("_")
# check if the config is valid
if len(custom_config) == 2 and custom_config[0] in config_types:
format_configs.append({"type": custom_config[0], "filename": custom_config[1], "action": operation, "server_name": server_name})
else:
return redirect_flash_error(f"Invalid custom config {custom_config}", "services", True)
if request.form["operation"] == "edit":
if is_draft_unchanged and len(variables) == 1 and "SERVER_NAME" in variables and server_name == old_server_name:
return handle_error("The service was not edited because no values were changed.", "services", True)
if request.form["operation"] in ("new", "edit"):
del variables["operation"]
del variables["OLD_SERVER_NAME"]
# Edit check fields and remove already existing ones
for variable, value in variables.copy().items():
if (
variable in variables
and variable != "SERVER_NAME"
and value == config.get(f"{server_name}_{variable}" if request.form["operation"] == "edit" else variable, {"value": None})["value"]
):
del variables[variable]
variables = app.config["CONFIG"].check_variables(variables, config)
if (
was_draft == is_draft
and request.form["operation"] == "edit"
and len(variables) == 1
and "SERVER_NAME" in variables
and variables["SERVER_NAME"] == request.form.get("OLD_SERVER_NAME", "")
):
return redirect_flash_error("The service was not edited because no values were changed.", "services", True)
elif request.form["operation"] == "new" and not variables:
return redirect_flash_error("The service was not created because all values had the default value.", "services", True)
if request.form["operation"] == "new" and not variables:
return handle_error("The service was not created because all values had the default value.", "services", True)
# Delete
if request.form["operation"] == "delete":
is_request_params(["SERVER_NAME"], "services", True)
is_service = app.config["CONFIG"].check_variables({"SERVER_NAME": request.form["SERVER_NAME"]}, config)
variables = app.config["CONFIG"].check_variables({"SERVER_NAME": request.form["SERVER_NAME"]}, config)
if not variables:
if not is_service:
error_message(f"Error while deleting the service {request.form['SERVER_NAME']}")
if config.get(f"{request.form['SERVER_NAME'].split(' ')[0]}_SERVER_NAME", {"method": "scheduler"})["method"] != "ui":
return redirect_flash_error("The service cannot be deleted because it has not been created with the UI.", "services", True)
return handle_error("The service cannot be deleted because it has not been created with the UI.", "services", True)
db_metadata = app.config["DB"].get_metadata()
old_server_name = request.form.get("OLD_SERVER_NAME", "")
operation = request.form["operation"]
def update_services(threaded: bool = False):
wait_applying()
@ -1185,7 +1188,7 @@ def services():
def global_config():
if request.method == "POST":
if app.config["DB"].readonly:
return redirect_flash_error("Database is in read-only mode", "global_config")
return handle_error("Database is in read-only mode", "global_config")
# Check variables
variables = request.form.to_dict().copy()
@ -1203,7 +1206,7 @@ def global_config():
variables = app.config["CONFIG"].check_variables(variables, config)
if not variables:
return redirect_flash_error("The global configuration was not edited because no values were changed.", "global_config", True)
return handle_error("The global configuration was not edited because no values were changed.", "global_config", True)
for variable, value in variables.copy().items():
for service in services:
@ -1264,31 +1267,28 @@ def configs():
if request.method == "POST":
if app.config["DB"].readonly:
return redirect_flash_error("Database is in read-only mode", "configs")
return handle_error("Database is in read-only mode", "configs")
operation = ""
is_request_params(["operation"], "configs", True)
# Check operation
if request.form["operation"] not in (
"new",
"edit",
"delete",
):
return redirect_flash_error("Operation parameter is invalid on /configs.", "configs", True)
verify_data_in_form(
data={"operation": ("new", "edit", "delete"), "type": "file", "path": None},
err_message="Invalid operation parameter on /configs.",
redirect_url="configs",
next=True,
)
# Check variables
variables = deepcopy(request.form.to_dict())
del variables["csrf_token"]
if variables["type"] != "file":
return redirect_flash_error("Invalid type parameter on /configs.", "configs", True)
return handle_error("Invalid type parameter on /configs.", "configs", True)
operation = app.config["CONFIGFILES"].check_path(variables["path"])
if operation:
return redirect_flash_error(operation, "configs", True)
return handle_error(operation, "configs", True)
old_name = variables.get("old_name", "").replace(".conf", "")
name = variables.get("name", old_name).replace(".conf", "")
@ -1297,15 +1297,15 @@ def configs():
root_dir = path_exploded[4].replace("-", "_").lower()
if not old_name and not name:
return redirect_flash_error("Missing name parameter on /configs.", "configs", True)
return handle_error("Missing name parameter on /configs.", "configs", True)
index = -1
for i, db_config in enumerate(db_configs):
if db_config["type"] == root_dir and db_config["name"] == name and db_config["service_id"] == service_id:
if request.form["operation"] == "new":
return redirect_flash_error(f"Config {name} already exists{f' for service {service_id}' if service_id else ''}", "configs", True)
return handle_error(f"Config {name} already exists{f' for service {service_id}' if service_id else ''}", "configs", True)
elif db_config["method"] not in ("ui", "manual"):
return redirect_flash_error(
return handle_error(
f"Can't edit config {name}{f' for service {service_id}' if service_id else ''} because it was not created by the UI or manually",
"configs",
True,
@ -1316,7 +1316,7 @@ def configs():
# New or edit a config
if request.form["operation"] in ("new", "edit"):
if not app.config["CONFIGFILES"].check_name(name):
return redirect_flash_error(
return handle_error(
f"Invalid {variables['type']} name. (Can only contain numbers, letters, underscores, dots and hyphens (min 4 characters and max 64))",
"configs",
True,
@ -1329,14 +1329,14 @@ def configs():
operation = f"Created config {name}{f' for service {service_id}' if service_id else ''}"
elif request.form["operation"] == "edit":
if index == -1:
return redirect_flash_error(
return handle_error(
f"Can't edit config {name}{f' for service {service_id}' if service_id else ''} because it doesn't exist", "configs", True
)
if old_name != name:
db_configs[index]["name"] = name
elif db_configs[index]["data"] == content:
return redirect_flash_error(
return handle_error(
f"Config {name} was not edited because no values were changed{f' for service {service_id}' if service_id else ''}",
"configs",
True,
@ -1348,9 +1348,7 @@ def configs():
# Delete a config
elif request.form["operation"] == "delete":
if index == -1:
return redirect_flash_error(
f"Can't delete config {name}{f' for service {service_id}' if service_id else ''} because it doesn't exist", "configs", True
)
return handle_error(f"Can't delete config {name}{f' for service {service_id}' if service_id else ''} because it doesn't exist", "configs", True)
del db_configs[index]
operation = f"Deleted config {name}{f' for service {service_id}' if service_id else ''}"
@ -1358,7 +1356,7 @@ def configs():
error = app.config["DB"].save_custom_configs([config for config in db_configs if config["method"] == "ui"], "ui")
if error:
app.logger.error(f"Could not save custom configs: {error}")
return redirect_flash_error("Couldn't save custom configs", "configs", True)
return handle_error("Couldn't save custom configs", "configs", True)
ui_data = get_ui_data()
ui_data["CONFIG_CHANGED"] = True
@ -1388,20 +1386,25 @@ def plugins():
if request.method == "POST":
if app.config["DB"].readonly:
return redirect_flash_error("Database is in read-only mode", "plugins")
return handle_error("Database is in read-only mode", "plugins")
verify_data_in_form(
data={"operation": ("delete"), "type": None},
err_message="Missing type parameter for operation delete on /plugins.",
redirect_url="plugins",
next=True,
)
error = 0
# Delete plugin
if "operation" in request.form and request.form["operation"] == "delete":
is_request_params(["type"], "plugins", True)
if request.form["operation"] == "delete":
# Check variables
variables = deepcopy(request.form.to_dict())
del variables["csrf_token"]
if variables["type"] in ("core", "pro"):
return redirect_flash_error(f"Can't delete {variables['type']} plugin {variables['name']}", "plugins", True)
return handle_error(f"Can't delete {variables['type']} plugin {variables['name']}", "plugins", True)
db_metadata = app.config["DB"].get_metadata()
@ -1450,7 +1453,7 @@ def plugins():
else:
# Upload plugins
if not tmp_ui_path.exists() or not listdir(str(tmp_ui_path)):
return redirect_flash_error("Please upload new plugins to reload plugins", "plugins", True)
return handle_error("Please upload new plugins to reload plugins", "plugins", True)
errors = 0
files_count = 0
@ -2189,8 +2192,14 @@ def reports():
@app.route("/bans", methods=["GET", "POST"])
@login_required
def bans():
if request.method == "POST" and app.config["DB"].readonly:
return redirect_flash_error("Database is in read-only mode", "bans")
if request.method == "POST":
if app.config["DB"].readonly:
return handle_error("Database is in read-only mode", "bans")
# Check variables
verify_data_in_form(data={"operation": ("ban", "unban")}, err_message="Invalid operation parameter on /bans.", redirect_url="bans")
verify_data_in_form(data={"data": None}, err_message="Missing data parameter on /bans.", redirect_url="bans")
redis_client = None
db_config = app.config["CONFIG"].get_config(
@ -2280,70 +2289,71 @@ def bans():
redis_client = None
flash("Couldn't connect to redis, ban list might be incomplete", "error")
if request.method == "POST":
# Check variables
is_request_form("bans")
is_request_params(["operation", "data"], "bans")
def get_load_data():
try:
data = json_loads(request.form["data"])
assert isinstance(data, list)
return data
except BaseException:
return redirect_flash_error("Data must be a list of dict", "bans", False, "exception")
return handle_error("Data must be a list of dict", "bans", False, "exception")
if request.form["operation"] not in ("ban", "unban"):
return redirect_flash_error("Operation unknown", "bans")
if request.method == "POST" and request.form["operation"] == "unban":
if request.form["operation"] == "unban":
for unban in data:
data = get_load_data()
for unban in data:
try:
unban = json_loads(unban.replace('"', '"').replace("'", '"'))
except BaseException:
flash(f"Invalid unban: {unban}, skipping it ...", "error")
app.logger.exception(f"Couldn't unban {unban['ip']}")
continue
if "ip" not in unban:
flash(f"Invalid unban: {unban}, skipping it ...", "error")
continue
if redis_client:
if not redis_client.delete(f"bans_ip_{unban['ip']}"):
flash(f"Couldn't unban {unban['ip']} on redis", "error")
resp = app.config["INSTANCES"].unban(unban["ip"])
if resp:
flash(f"Couldn't unban {unban['ip']} on the following instances: {', '.join(resp)}", "error")
else:
flash(f"Successfully unbanned {unban['ip']}")
return redirect(url_for("loading", next=url_for("bans"), message="Update bans"))
if request.method == "POST" and request.form["operation"] == "ban":
data = get_load_data()
for ban in data:
if not isinstance(ban, dict) or "ip" not in ban:
flash(f"Invalid ban: {ban}, skipping it ...", "error")
continue
reason = ban.get("reason", "ui")
ban_end = 86400.0
if "ban_end" in ban:
try:
unban = json_loads(unban.replace('"', '"').replace("'", '"'))
except BaseException:
flash(f"Invalid unban: {unban}, skipping it ...", "error")
app.logger.exception(f"Couldn't unban {unban['ip']}")
ban_end = float(ban["ban_end"])
except ValueError:
continue
ban_end = (datetime.fromtimestamp(ban_end) - datetime.now()).total_seconds()
if "ip" not in unban:
flash(f"Invalid unban: {unban}, skipping it ...", "error")
continue
if redis_client:
ok = redis_client.set(f"bans_ip_{ban['ip']}", dumps({"reason": reason, "date": time()}))
if not ok:
flash(f"Couldn't ban {ban['ip']} on redis", "error")
redis_client.expire(f"bans_ip_{ban['ip']}", int(ban_end))
if redis_client:
if not redis_client.delete(f"bans_ip_{unban['ip']}"):
flash(f"Couldn't unban {unban['ip']} on redis", "error")
resp = app.config["INSTANCES"].unban(unban["ip"])
if resp:
flash(f"Couldn't unban {unban['ip']} on the following instances: {', '.join(resp)}", "error")
else:
flash(f"Successfully unbanned {unban['ip']}")
if request.form["operation"] == "ban":
for ban in data:
if not isinstance(ban, dict) or "ip" not in ban:
flash(f"Invalid ban: {ban}, skipping it ...", "error")
continue
reason = ban.get("reason", "ui")
ban_end = 86400.0
if "ban_end" in ban:
try:
ban_end = float(ban["ban_end"])
except ValueError:
continue
ban_end = (datetime.fromtimestamp(ban_end) - datetime.now()).total_seconds()
if redis_client:
ok = redis_client.set(f"bans_ip_{ban['ip']}", dumps({"reason": reason, "date": time()}))
if not ok:
flash(f"Couldn't ban {ban['ip']} on redis", "error")
redis_client.expire(f"bans_ip_{ban['ip']}", int(ban_end))
resp = app.config["INSTANCES"].ban(ban["ip"], ban_end, reason)
if resp:
flash(f"Couldn't ban {ban['ip']} on the following instances: {', '.join(resp)}", "error")
else:
flash(f"Successfully banned {ban['ip']}")
resp = app.config["INSTANCES"].ban(ban["ip"], ban_end, reason)
if resp:
flash(f"Couldn't ban {ban['ip']} on the following instances: {', '.join(resp)}", "error")
else:
flash(f"Successfully banned {ban['ip']}")
return redirect(url_for("loading", next=url_for("bans"), message="Update bans"))