refactor main.py

This commit is contained in:
Jordan Blasenhauer 2024-02-22 21:20:25 +01:00
parent fc15facb5e
commit b2773489e8
2 changed files with 63 additions and 66 deletions

View file

@ -292,8 +292,6 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
# UTILS
def is_request_form(url_name: str, next: bool = False):
if not request.form:
flash("Missing form data.", "error")
@ -311,7 +309,6 @@ def is_request_params(params: list, url_name: str, next: bool = False):
def redirect_flash_error(message: str, url_name: str, next: bool = False, log: Union[bool, str] = False):
flash(message, "error")
if log == "error":
@ -326,6 +323,11 @@ def redirect_flash_error(message: str, url_name: str, next: bool = False, log: U
return redirect(url_for(url_name))
def error_message(msg: str):
app.logger.error(msg)
return {"status": "ko", "message": msg}
@app.before_request
def generate_nonce():
app.config["SCRIPT_NONCE"] = sha256(urandom(32)).hexdigest()
@ -604,6 +606,9 @@ def account():
is_two_factor_enabled = current_user.is_two_factor_enabled
secret_token = current_user.secret_token
if request.form["operation"] not in ("username", "password", "totp"):
return redirect_flash_error("Invalid operation parameter.", "account")
if request.form["operation"] == "username":
is_request_params(["admin_username"], "account")
@ -614,7 +619,8 @@ def account():
session.clear()
logout_user()
elif request.form["operation"] == "password":
if request.form["operation"] == "password":
is_request_params(["admin_password", "admin_password_check"], "account")
@ -630,7 +636,8 @@ def account():
session.clear()
logout_user()
elif request.form["operation"] == "totp":
if request.form["operation"] == "totp":
is_request_params(["totp_token"], "account")
@ -641,8 +648,6 @@ def account():
is_two_factor_enabled = session["totp_validated"]
secret_token = None if current_user.is_two_factor_enabled else app.config["CURRENT_TOTP_TOKEN"]
app.config["CURRENT_TOTP_TOKEN"] = None
else:
return redirect_flash_error("Invalid operation parameter.", "account")
user = User(username, password, is_two_factor_enabled=is_two_factor_enabled, secret_token=secret_token, method=current_user.method)
ret = db.update_ui_user(username, user.password_hash, is_two_factor_enabled, secret_token, current_user.method if request.form["operation"] == "totp" else "ui")
@ -768,17 +773,17 @@ def services():
error = app.config["CONFIG"].check_variables(variables)
if error:
return redirect_flash_error("The config variable checks returned error", "services", True)
error_message("The config variable checks returned error")
# Delete
elif request.form["operation"] == "delete":
if request.form["operation"] == "delete":
is_request_params(["SERVER_NAME"], "services", True)
error = app.config["CONFIG"].check_variables({"SERVER_NAME": request.form["SERVER_NAME"]})
if error:
return redirect(url_for("loading", next=url_for("services")))
error_message(f"Error while deleting the service {request.form['SERVER_NAME']}")
error = 0
@ -913,6 +918,7 @@ def configs():
if operation:
return redirect_flash_error(operation, "configs", True)
# New or edit a config
if request.form["operation"] in ("new", "edit"):
if not app.config["CONFIGFILES"].check_name(variables["name"]):
return redirect_flash_error(
@ -931,35 +937,36 @@ def configs():
error = False
if request.form["operation"] == "new":
if variables["type"] == "folder":
operation, error = app.config["CONFIGFILES"].create_folder(variables["path"], variables["name"])
elif variables["type"] == "file":
operation, error = app.config["CONFIGFILES"].create_file(variables["path"], variables["name"], variables["content"])
elif request.form["operation"] == "edit":
if variables["type"] == "folder":
operation, error = app.config["CONFIGFILES"].edit_folder(
variables["path"],
variables["name"],
variables.get("old_name", variables["name"]),
)
elif variables["type"] == "file":
operation, error = app.config["CONFIGFILES"].edit_file(
variables["path"],
variables["name"],
variables.get("old_name", variables["name"]),
variables["content"],
)
if request.form["operation"] == "new" and variables["type"] == "folder":
operation, error = app.config["CONFIGFILES"].create_folder(variables["path"], variables["name"])
if request.form["operation"] == "new" and variables["type"] == "file":
operation, error = app.config["CONFIGFILES"].create_file(variables["path"], variables["name"], variables["content"])
if request.form["operation"] == "edit" and variables["type"] == "file":
operation, error = app.config["CONFIGFILES"].edit_file(
variables["path"],
variables["name"],
variables.get("old_name", variables["name"]),
variables["content"],
)
if request.form["operation"] == "edit" and variables["type"] == "folder":
operation, error = app.config["CONFIGFILES"].edit_folder(
variables["path"],
variables["name"],
variables.get("old_name", variables["name"]),
)
if error:
return redirect_flash_error(operation, "configs", True)
else:
# Delete a config
if request.form["operation"] == "delete":
operation, error = app.config["CONFIGFILES"].delete_path(variables["path"])
if error:
flash(operation, "error")
return redirect(url_for("loading", next=url_for("configs")))
return redirect_flash_error(operation, "configs", True)
flash(operation)
@ -988,10 +995,14 @@ def configs():
def plugins():
tmp_ui_path = Path(sep, "var", "tmp", "bunkerweb", "ui")
if request.method == "POST":
operation = ""
error = 0
if "operation" in request.form and request.form["operation"] == "delete":
is_request_params(["operation"], "plugins", True)
# Delete plugin
if request.form["operation"] == "delete":
is_request_params(["type"], "plugins", True)
# Check variables
variables = deepcopy(request.form.to_dict())
del variables["csrf_token"]
@ -1006,9 +1017,11 @@ def plugins():
err = db.update_external_plugins(plugins)
if err:
flash(f"Couldn't update external plugins to database: {err}", "error")
flash(f"Deleted plugin {variables['name']} successfully")
error_message(f"Couldn't update external plugins to database: {err}")
else:
flash(f"Deleted plugin {variables['name']} successfully")
else:
# Upload plugins
if not tmp_ui_path.exists() or not listdir(str(tmp_ui_path)):
return redirect_flash_error("Please upload new plugins to reload plugins", "plugins", True)
@ -1184,9 +1197,8 @@ def plugins():
err = db.update_external_plugins(new_plugins, delete_missing=False)
if err:
flash(f"Couldn't update external plugins to database: {err}", "error")
if operation:
flash(operation)
else:
flash("Plugins uploaded successfully")
# Reload instances
app.config["RELOADING"] = True
@ -1281,11 +1293,7 @@ def upload_plugin():
def custom_plugin(plugin: str):
message = ""
if not plugin_id_rx.match(plugin):
message = f'Invalid plugin id, "{plugin}" (must be between 1 and 64 characters, only letters, numbers, underscores and hyphens)'
app.logger.error(message)
if request.method == "GET":
return message, 400
return {"message": f'Invalid plugin id, "{plugin}" (must be between 1 and 64 characters, only letters, numbers, underscores and hyphens)'}, 400
return error_message(f'Invalid plugin id, "{plugin}" (must be between 1 and 64 characters, only letters, numbers, underscores and hyphens)'), 400
# Case we ware looking for a plugin template
# We need to check if a page exists, and if it does, we need to check if the plugin is activated and metrics are on
@ -1295,9 +1303,7 @@ def custom_plugin(plugin: str):
page = db.get_plugin_template(plugin)
if not page:
message = f'The plugin "{plugin}" does not have a template'
app.logger.error(message)
return message, 404
return error_message(f'The plugin "{plugin}" does not have a template'), 404
# Case template, prepare data
plugins = app.config["CONFIG"].get_plugins()
@ -1316,9 +1322,7 @@ def custom_plugin(plugin: str):
# Case no plugin found
if plugin_id is None:
message = f'Plugin "{plugin}" not found'
app.logger.error(message)
return message, 404
return error_message(f'Plugin "{plugin}" not found'), 404
config = app.config["CONFIG"].get_config(methods=False)
@ -1400,7 +1404,7 @@ def custom_plugin(plugin: str):
module = db.get_plugin_actions(plugin)
if module is None:
return {"message": f'The actions.py file for the plugin "{plugin}" does not exist'}, 404
return error_message(f'The actions.py file for the plugin "{plugin}" does not exist'), 404
try:
# Try to import the custom plugin
@ -1411,11 +1415,8 @@ def custom_plugin(plugin: str):
loader = SourceFileLoader("actions", temp.name)
actions = loader.load_module()
except:
message = f'An error occurred while importing the plugin "{plugin}", see logs for more details'
app.logger.exception(message)
return {"message": message}, 500
return error_message(f'An error occurred while importing the plugin "{plugin}", see logs for more details'), 500
error = None
res = None
try:
@ -1429,10 +1430,8 @@ def custom_plugin(plugin: str):
res = method(app=app)
except AttributeError:
message = f'The plugin "{plugin}" does not have a "{plugin}" method, see logs for more details'
error = 404
except:
message = f'An error occurred while executing the plugin "{plugin}", see logs for more details'
error = 500
finally:
if sbin_nginx_path.is_file():
# Remove the custom plugin from the shared library
@ -1441,11 +1440,7 @@ def custom_plugin(plugin: str):
del actions
if message or not isinstance(res, dict) and not res:
message = message or f'The plugin "{plugin}" did not return a valid response'
if error:
app.logger.exception(message)
else:
app.logger.error(message)
return error_message(message or f'The plugin "{plugin}" did not return a valid response'), 500
app.logger.info(f"Plugin {plugin} action executed successfully")
return jsonify({"message": "ok", "data": res}), 200
@ -1811,6 +1806,9 @@ def bans():
except BaseException:
return redirect_flash_error("Data must be a list of dict", "bans", False, "exception")
if request.form["operation"] not in ("ban", "unban"):
return redirect_flash_error("Operation unknown", "bans")
if request.form["operation"] == "unban":
for unban in data:
try:
@ -1833,7 +1831,8 @@ def bans():
flash(f"Couldn't unban {unban['ip']} on the following instances: {', '.join(resp)}", "error")
else:
flash(f"Successfully unbanned {unban['ip']}")
elif request.form["operation"] == "ban":
if request.form["operation"] == "ban":
for ban in data:
if not isinstance(ban, dict) or "ip" not in ban:
flash(f"Invalid ban: {ban}, skipping it ...", "error")
@ -1859,8 +1858,6 @@ def bans():
flash(f"Couldn't ban {ban['ip']} on the following instances: {', '.join(resp)}", "error")
else:
flash(f"Successfully banned {ban['ip']}")
else:
return redirect_flash_error("Operation unknown", "bans")
return redirect(url_for("loading", next=url_for("bans"), message="Update bans"))

View file

@ -23,7 +23,7 @@
class="w-full h-full flex flex-col justify-between"
id="form-delete-plugin"
method="POST">
<input type="hidden" name="external" id="external" />
<input type="hidden" name="type" id="type" />
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />
<input type="hidden" value="name" name="name" id="name" />
<input type="hidden" value="external" name="type" id="type" />