mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
refactor main.py
This commit is contained in:
parent
fc15facb5e
commit
b2773489e8
2 changed files with 63 additions and 66 deletions
127
src/ui/main.py
127
src/ui/main.py
|
|
@ -292,8 +292,6 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
|
|||
|
||||
|
||||
# UTILS
|
||||
|
||||
|
||||
def is_request_form(url_name: str, next: bool = False):
|
||||
if not request.form:
|
||||
flash("Missing form data.", "error")
|
||||
|
|
@ -311,7 +309,6 @@ def is_request_params(params: list, url_name: str, next: bool = False):
|
|||
|
||||
|
||||
def redirect_flash_error(message: str, url_name: str, next: bool = False, log: Union[bool, str] = False):
|
||||
|
||||
flash(message, "error")
|
||||
|
||||
if log == "error":
|
||||
|
|
@ -326,6 +323,11 @@ def redirect_flash_error(message: str, url_name: str, next: bool = False, log: U
|
|||
return redirect(url_for(url_name))
|
||||
|
||||
|
||||
def error_message(msg: str):
|
||||
app.logger.error(msg)
|
||||
return {"status": "ko", "message": msg}
|
||||
|
||||
|
||||
@app.before_request
|
||||
def generate_nonce():
|
||||
app.config["SCRIPT_NONCE"] = sha256(urandom(32)).hexdigest()
|
||||
|
|
@ -604,6 +606,9 @@ def account():
|
|||
is_two_factor_enabled = current_user.is_two_factor_enabled
|
||||
secret_token = current_user.secret_token
|
||||
|
||||
if request.form["operation"] not in ("username", "password", "totp"):
|
||||
return redirect_flash_error("Invalid operation parameter.", "account")
|
||||
|
||||
if request.form["operation"] == "username":
|
||||
is_request_params(["admin_username"], "account")
|
||||
|
||||
|
|
@ -614,7 +619,8 @@ def account():
|
|||
|
||||
session.clear()
|
||||
logout_user()
|
||||
elif request.form["operation"] == "password":
|
||||
|
||||
if request.form["operation"] == "password":
|
||||
|
||||
is_request_params(["admin_password", "admin_password_check"], "account")
|
||||
|
||||
|
|
@ -630,7 +636,8 @@ def account():
|
|||
|
||||
session.clear()
|
||||
logout_user()
|
||||
elif request.form["operation"] == "totp":
|
||||
|
||||
if request.form["operation"] == "totp":
|
||||
|
||||
is_request_params(["totp_token"], "account")
|
||||
|
||||
|
|
@ -641,8 +648,6 @@ def account():
|
|||
is_two_factor_enabled = session["totp_validated"]
|
||||
secret_token = None if current_user.is_two_factor_enabled else app.config["CURRENT_TOTP_TOKEN"]
|
||||
app.config["CURRENT_TOTP_TOKEN"] = None
|
||||
else:
|
||||
return redirect_flash_error("Invalid operation parameter.", "account")
|
||||
|
||||
user = User(username, password, is_two_factor_enabled=is_two_factor_enabled, secret_token=secret_token, method=current_user.method)
|
||||
ret = db.update_ui_user(username, user.password_hash, is_two_factor_enabled, secret_token, current_user.method if request.form["operation"] == "totp" else "ui")
|
||||
|
|
@ -768,17 +773,17 @@ def services():
|
|||
error = app.config["CONFIG"].check_variables(variables)
|
||||
|
||||
if error:
|
||||
return redirect_flash_error("The config variable checks returned error", "services", True)
|
||||
error_message("The config variable checks returned error")
|
||||
|
||||
# Delete
|
||||
elif request.form["operation"] == "delete":
|
||||
if request.form["operation"] == "delete":
|
||||
|
||||
is_request_params(["SERVER_NAME"], "services", True)
|
||||
|
||||
error = app.config["CONFIG"].check_variables({"SERVER_NAME": request.form["SERVER_NAME"]})
|
||||
|
||||
if error:
|
||||
return redirect(url_for("loading", next=url_for("services")))
|
||||
error_message(f"Error while deleting the service {request.form['SERVER_NAME']}")
|
||||
|
||||
error = 0
|
||||
|
||||
|
|
@ -913,6 +918,7 @@ def configs():
|
|||
if operation:
|
||||
return redirect_flash_error(operation, "configs", True)
|
||||
|
||||
# New or edit a config
|
||||
if request.form["operation"] in ("new", "edit"):
|
||||
if not app.config["CONFIGFILES"].check_name(variables["name"]):
|
||||
return redirect_flash_error(
|
||||
|
|
@ -931,35 +937,36 @@ def configs():
|
|||
|
||||
error = False
|
||||
|
||||
if request.form["operation"] == "new":
|
||||
if variables["type"] == "folder":
|
||||
operation, error = app.config["CONFIGFILES"].create_folder(variables["path"], variables["name"])
|
||||
elif variables["type"] == "file":
|
||||
operation, error = app.config["CONFIGFILES"].create_file(variables["path"], variables["name"], variables["content"])
|
||||
elif request.form["operation"] == "edit":
|
||||
if variables["type"] == "folder":
|
||||
operation, error = app.config["CONFIGFILES"].edit_folder(
|
||||
variables["path"],
|
||||
variables["name"],
|
||||
variables.get("old_name", variables["name"]),
|
||||
)
|
||||
elif variables["type"] == "file":
|
||||
operation, error = app.config["CONFIGFILES"].edit_file(
|
||||
variables["path"],
|
||||
variables["name"],
|
||||
variables.get("old_name", variables["name"]),
|
||||
variables["content"],
|
||||
)
|
||||
if request.form["operation"] == "new" and variables["type"] == "folder":
|
||||
operation, error = app.config["CONFIGFILES"].create_folder(variables["path"], variables["name"])
|
||||
|
||||
if request.form["operation"] == "new" and variables["type"] == "file":
|
||||
operation, error = app.config["CONFIGFILES"].create_file(variables["path"], variables["name"], variables["content"])
|
||||
|
||||
if request.form["operation"] == "edit" and variables["type"] == "file":
|
||||
operation, error = app.config["CONFIGFILES"].edit_file(
|
||||
variables["path"],
|
||||
variables["name"],
|
||||
variables.get("old_name", variables["name"]),
|
||||
variables["content"],
|
||||
)
|
||||
|
||||
if request.form["operation"] == "edit" and variables["type"] == "folder":
|
||||
operation, error = app.config["CONFIGFILES"].edit_folder(
|
||||
variables["path"],
|
||||
variables["name"],
|
||||
variables.get("old_name", variables["name"]),
|
||||
)
|
||||
|
||||
if error:
|
||||
return redirect_flash_error(operation, "configs", True)
|
||||
|
||||
else:
|
||||
# Delete a config
|
||||
if request.form["operation"] == "delete":
|
||||
operation, error = app.config["CONFIGFILES"].delete_path(variables["path"])
|
||||
|
||||
if error:
|
||||
flash(operation, "error")
|
||||
return redirect(url_for("loading", next=url_for("configs")))
|
||||
return redirect_flash_error(operation, "configs", True)
|
||||
|
||||
flash(operation)
|
||||
|
||||
|
|
@ -988,10 +995,14 @@ def configs():
|
|||
def plugins():
|
||||
tmp_ui_path = Path(sep, "var", "tmp", "bunkerweb", "ui")
|
||||
if request.method == "POST":
|
||||
operation = ""
|
||||
error = 0
|
||||
|
||||
if "operation" in request.form and request.form["operation"] == "delete":
|
||||
is_request_params(["operation"], "plugins", True)
|
||||
# Delete plugin
|
||||
if request.form["operation"] == "delete":
|
||||
|
||||
is_request_params(["type"], "plugins", True)
|
||||
|
||||
# Check variables
|
||||
variables = deepcopy(request.form.to_dict())
|
||||
del variables["csrf_token"]
|
||||
|
|
@ -1006,9 +1017,11 @@ def plugins():
|
|||
|
||||
err = db.update_external_plugins(plugins)
|
||||
if err:
|
||||
flash(f"Couldn't update external plugins to database: {err}", "error")
|
||||
flash(f"Deleted plugin {variables['name']} successfully")
|
||||
error_message(f"Couldn't update external plugins to database: {err}")
|
||||
else:
|
||||
flash(f"Deleted plugin {variables['name']} successfully")
|
||||
else:
|
||||
# Upload plugins
|
||||
if not tmp_ui_path.exists() or not listdir(str(tmp_ui_path)):
|
||||
return redirect_flash_error("Please upload new plugins to reload plugins", "plugins", True)
|
||||
|
||||
|
|
@ -1184,9 +1197,8 @@ def plugins():
|
|||
err = db.update_external_plugins(new_plugins, delete_missing=False)
|
||||
if err:
|
||||
flash(f"Couldn't update external plugins to database: {err}", "error")
|
||||
|
||||
if operation:
|
||||
flash(operation)
|
||||
else:
|
||||
flash("Plugins uploaded successfully")
|
||||
|
||||
# Reload instances
|
||||
app.config["RELOADING"] = True
|
||||
|
|
@ -1281,11 +1293,7 @@ def upload_plugin():
|
|||
def custom_plugin(plugin: str):
|
||||
message = ""
|
||||
if not plugin_id_rx.match(plugin):
|
||||
message = f'Invalid plugin id, "{plugin}" (must be between 1 and 64 characters, only letters, numbers, underscores and hyphens)'
|
||||
app.logger.error(message)
|
||||
if request.method == "GET":
|
||||
return message, 400
|
||||
return {"message": f'Invalid plugin id, "{plugin}" (must be between 1 and 64 characters, only letters, numbers, underscores and hyphens)'}, 400
|
||||
return error_message(f'Invalid plugin id, "{plugin}" (must be between 1 and 64 characters, only letters, numbers, underscores and hyphens)'), 400
|
||||
|
||||
# Case we ware looking for a plugin template
|
||||
# We need to check if a page exists, and if it does, we need to check if the plugin is activated and metrics are on
|
||||
|
|
@ -1295,9 +1303,7 @@ def custom_plugin(plugin: str):
|
|||
page = db.get_plugin_template(plugin)
|
||||
|
||||
if not page:
|
||||
message = f'The plugin "{plugin}" does not have a template'
|
||||
app.logger.error(message)
|
||||
return message, 404
|
||||
return error_message(f'The plugin "{plugin}" does not have a template'), 404
|
||||
|
||||
# Case template, prepare data
|
||||
plugins = app.config["CONFIG"].get_plugins()
|
||||
|
|
@ -1316,9 +1322,7 @@ def custom_plugin(plugin: str):
|
|||
|
||||
# Case no plugin found
|
||||
if plugin_id is None:
|
||||
message = f'Plugin "{plugin}" not found'
|
||||
app.logger.error(message)
|
||||
return message, 404
|
||||
return error_message(f'Plugin "{plugin}" not found'), 404
|
||||
|
||||
config = app.config["CONFIG"].get_config(methods=False)
|
||||
|
||||
|
|
@ -1400,7 +1404,7 @@ def custom_plugin(plugin: str):
|
|||
module = db.get_plugin_actions(plugin)
|
||||
|
||||
if module is None:
|
||||
return {"message": f'The actions.py file for the plugin "{plugin}" does not exist'}, 404
|
||||
return error_message(f'The actions.py file for the plugin "{plugin}" does not exist'), 404
|
||||
|
||||
try:
|
||||
# Try to import the custom plugin
|
||||
|
|
@ -1411,11 +1415,8 @@ def custom_plugin(plugin: str):
|
|||
loader = SourceFileLoader("actions", temp.name)
|
||||
actions = loader.load_module()
|
||||
except:
|
||||
message = f'An error occurred while importing the plugin "{plugin}", see logs for more details'
|
||||
app.logger.exception(message)
|
||||
return {"message": message}, 500
|
||||
return error_message(f'An error occurred while importing the plugin "{plugin}", see logs for more details'), 500
|
||||
|
||||
error = None
|
||||
res = None
|
||||
|
||||
try:
|
||||
|
|
@ -1429,10 +1430,8 @@ def custom_plugin(plugin: str):
|
|||
res = method(app=app)
|
||||
except AttributeError:
|
||||
message = f'The plugin "{plugin}" does not have a "{plugin}" method, see logs for more details'
|
||||
error = 404
|
||||
except:
|
||||
message = f'An error occurred while executing the plugin "{plugin}", see logs for more details'
|
||||
error = 500
|
||||
finally:
|
||||
if sbin_nginx_path.is_file():
|
||||
# Remove the custom plugin from the shared library
|
||||
|
|
@ -1441,11 +1440,7 @@ def custom_plugin(plugin: str):
|
|||
del actions
|
||||
|
||||
if message or not isinstance(res, dict) and not res:
|
||||
message = message or f'The plugin "{plugin}" did not return a valid response'
|
||||
if error:
|
||||
app.logger.exception(message)
|
||||
else:
|
||||
app.logger.error(message)
|
||||
return error_message(message or f'The plugin "{plugin}" did not return a valid response'), 500
|
||||
|
||||
app.logger.info(f"Plugin {plugin} action executed successfully")
|
||||
return jsonify({"message": "ok", "data": res}), 200
|
||||
|
|
@ -1811,6 +1806,9 @@ def bans():
|
|||
except BaseException:
|
||||
return redirect_flash_error("Data must be a list of dict", "bans", False, "exception")
|
||||
|
||||
if request.form["operation"] not in ("ban", "unban"):
|
||||
return redirect_flash_error("Operation unknown", "bans")
|
||||
|
||||
if request.form["operation"] == "unban":
|
||||
for unban in data:
|
||||
try:
|
||||
|
|
@ -1833,7 +1831,8 @@ def bans():
|
|||
flash(f"Couldn't unban {unban['ip']} on the following instances: {', '.join(resp)}", "error")
|
||||
else:
|
||||
flash(f"Successfully unbanned {unban['ip']}")
|
||||
elif request.form["operation"] == "ban":
|
||||
|
||||
if request.form["operation"] == "ban":
|
||||
for ban in data:
|
||||
if not isinstance(ban, dict) or "ip" not in ban:
|
||||
flash(f"Invalid ban: {ban}, skipping it ...", "error")
|
||||
|
|
@ -1859,8 +1858,6 @@ def bans():
|
|||
flash(f"Couldn't ban {ban['ip']} on the following instances: {', '.join(resp)}", "error")
|
||||
else:
|
||||
flash(f"Successfully banned {ban['ip']}")
|
||||
else:
|
||||
return redirect_flash_error("Operation unknown", "bans")
|
||||
|
||||
return redirect(url_for("loading", next=url_for("bans"), message="Update bans"))
|
||||
|
||||
|
|
|
|||
2
src/ui/templates/plugins_modal.html
vendored
2
src/ui/templates/plugins_modal.html
vendored
|
|
@ -23,7 +23,7 @@
|
|||
class="w-full h-full flex flex-col justify-between"
|
||||
id="form-delete-plugin"
|
||||
method="POST">
|
||||
<input type="hidden" name="external" id="external" />
|
||||
<input type="hidden" name="type" id="type" />
|
||||
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />
|
||||
<input type="hidden" value="name" name="name" id="name" />
|
||||
<input type="hidden" value="external" name="type" id="type" />
|
||||
|
|
|
|||
Loading…
Reference in a new issue