Only use the database for custom configs CRUD in the web UI

This commit is contained in:
Théophile Diot 2024-03-22 12:07:40 +00:00
parent b1707d5efe
commit 2d3243ba53
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06
3 changed files with 114 additions and 253 deletions

View file

@ -962,7 +962,7 @@ class Database:
def save_custom_configs(
self,
custom_configs: List[Dict[str, Tuple[str, List[str]]]],
custom_configs: List[Dict[str, Union[str, bytes, Tuple[str, List[str]]]]],
method: str,
changed: Optional[bool] = True,
) -> str:
@ -975,59 +975,57 @@ class Database:
to_put = []
endl = "\n"
for custom_config in custom_configs:
config = {
"data": custom_config["value"].encode("utf-8") if isinstance(custom_config["value"], str) else custom_config["value"],
"method": method,
if method != "ui":
config = {
"data": custom_config["value"],
"method": method,
}
assert isinstance(custom_config["exploded"], tuple) and len(custom_config["exploded"]) == 3, "Invalid exploded custom config"
if custom_config["exploded"][0]:
if not session.query(Services).with_entities(Services.id).filter_by(id=custom_config["exploded"][0]).first():
message += f"{endl if message else ''}Service {custom_config['exploded'][0]} not found, please check your config"
config.update(
{
"service_id": custom_config["exploded"][0],
"type": custom_config["exploded"][1],
"name": custom_config["exploded"][2],
}
)
else:
config.update(
{
"type": custom_config["exploded"][1],
"name": custom_config["exploded"][2],
}
)
custom_config = config
custom_config["type"] = custom_config["type"].replace("-", "_").lower() # type: ignore
custom_config["data"] = custom_config["data"].encode("utf-8") if isinstance(custom_config["data"], str) else custom_config["data"]
custom_config["checksum"] = sha256(custom_config["data"]).hexdigest() # type: ignore
service_id = custom_config.get("service_id", None) or None
filters = {
"type": custom_config["type"],
"name": custom_config["name"],
}
config["checksum"] = sha256(config["data"]).hexdigest()
if custom_config["exploded"][0]:
if not session.query(Services).with_entities(Services.id).filter_by(id=custom_config["exploded"][0]).first():
message += f"{endl if message else ''}Service {custom_config['exploded'][0]} not found, please check your config"
if service_id:
filters["service_id"] = service_id
config.update(
{
"service_id": custom_config["exploded"][0],
"type": custom_config["exploded"][1].replace("-", "_").lower(),
"name": custom_config["exploded"][2],
}
)
else:
config.update(
{
"type": custom_config["exploded"][1].replace("-", "_").lower(),
"name": custom_config["exploded"][2],
}
)
custom_conf = (
session.query(Custom_configs)
.with_entities(Custom_configs.checksum, Custom_configs.method)
.filter_by(
service_id=config.get("service_id", None),
type=config["type"],
name=config["name"],
)
.first()
)
custom_conf = session.query(Custom_configs).with_entities(Custom_configs.checksum, Custom_configs.method).filter_by(**filters).first()
if not custom_conf:
to_put.append(Custom_configs(**config))
elif config["checksum"] != custom_conf.checksum and method in (
custom_conf.method,
"autoconf",
):
session.query(Custom_configs).filter(
Custom_configs.service_id == config.get("service_id", None),
Custom_configs.type == config["type"],
Custom_configs.name == config["name"],
).update(
{
Custom_configs.data: config["data"],
Custom_configs.checksum: config["checksum"],
}
| ({Custom_configs.method: "autoconf"} if method == "autoconf" else {})
)
to_put.append(Custom_configs(**custom_config))
elif custom_config["checksum"] != custom_conf.checksum and method in (custom_conf.method, "autoconf"):
custom_conf.data = custom_config["data"]
custom_conf.checksum = custom_config["checksum"]
if method == "autoconf":
custom_conf.method = method
if changed:
with suppress(ProgrammingError, OperationalError):
metadata = session.query(Metadata).get(1)

View file

@ -195,7 +195,7 @@ try:
DEBUG=True,
INSTANCES=Instances(docker_client, kubernetes_client, INTEGRATION),
CONFIG=Config(db),
CONFIGFILES=ConfigFiles(app.logger, db),
CONFIGFILES=ConfigFiles(),
WTF_CSRF_SSL_STRICT=False,
USER=USER,
SEND_FILE_MAX_AGE_DEFAULT=86400,
@ -255,14 +255,8 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
app.config["TO_FLASH"].append({"content": operation, "type": "success"})
if (was_draft != is_draft or not is_draft) and (moved or deleted):
changes = ["config", "custom_configs"]
error = app.config["CONFIGFILES"].save_configs(check_changes=False)
if error:
app.config["TO_FLASH"].append({"content": error, "type": "error"})
changes.pop()
# update changes in db
ret = db.checked_changes(changes, value=True)
ret = db.checked_changes(["config", "custom_configs"], value=True)
if ret:
app.logger.error(f"Couldn't set the changes to checked in the database: {ret}")
app.config["TO_FLASH"].append(
@ -1069,6 +1063,8 @@ def global_config():
@app.route("/configs", methods=["GET", "POST"])
@login_required
def configs():
db_configs = db.get_custom_configs()
if request.method == "POST":
operation = ""
@ -1080,74 +1076,92 @@ def configs():
"edit",
"delete",
):
return redirect_flash_error("Missing operation parameter on /configs.", "configs", True)
return redirect_flash_error("Operation parameter is invalid on /configs.", "configs", True)
# Check variables
variables = deepcopy(request.form.to_dict())
del variables["csrf_token"]
if variables["type"] != "file":
return redirect_flash_error("Invalid type parameter on /configs.", "configs", True)
operation = app.config["CONFIGFILES"].check_path(variables["path"])
if operation:
return redirect_flash_error(operation, "configs", True)
old_name = variables.get("old_name", "").replace(".conf", "")
name = variables.get("name", old_name).replace(".conf", "")
path_exploded = variables["path"].split(sep)
service_id = (path_exploded[5] if len(path_exploded) > 6 else None) or None
root_dir = path_exploded[4].replace("-", "_").lower()
if not old_name and not name:
return redirect_flash_error("Missing name parameter on /configs.", "configs", True)
index = -1
for i, db_config in enumerate(db_configs):
if db_config["type"] == root_dir and db_config["name"] == name and db_config["service_id"] == service_id:
if request.form["operation"] == "new":
return redirect_flash_error(f"Config {name} already exists{f' for service {service_id}' if service_id else ''}", "configs", True)
elif db_config["method"] not in ("ui", "manual"):
return redirect_flash_error(
f"Can't edit config {name}{f' for service {service_id}' if service_id else ''} because it was not created by the UI or manually",
"configs",
True,
)
index = i
break
# New or edit a config
if request.form["operation"] in ("new", "edit"):
if not app.config["CONFIGFILES"].check_name(variables["name"]):
if not app.config["CONFIGFILES"].check_name(name):
return redirect_flash_error(
f"Invalid {variables['type']} name. (Can only contain numbers, letters, underscores, dots and hyphens (min 4 characters and max 64))",
"configs",
True,
)
if variables["type"] == "file":
variables["name"] = f"{variables['name']}.conf"
content = BeautifulSoup(variables["content"], "html.parser").get_text()
if "old_name" in variables:
variables["old_name"] = f"{variables['old_name']}.conf"
if request.form["operation"] == "new":
db_configs.append({"type": root_dir, "name": name, "service_id": service_id, "data": content, "method": "ui"})
operation = f"Created config {name}{f' for service {service_id}' if service_id else ''}"
elif request.form["operation"] == "edit":
if index == -1:
return redirect_flash_error(
f"Can't edit config {name}{f' for service {service_id}' if service_id else ''} because it doesn't exist", "configs", True
)
variables["content"] = BeautifulSoup(variables["content"], "html.parser").get_text()
if old_name != name:
db_configs[index]["name"] = name
elif db_configs[index]["data"] == content:
return redirect_flash_error(
f"Config {name} was not edited because no values were changed{f' for service {service_id}' if service_id else ''}",
"configs",
True,
)
error = False
if request.form["operation"] == "new" and variables["type"] == "folder":
operation, error = app.config["CONFIGFILES"].create_folder(variables["path"], variables["name"])
if request.form["operation"] == "new" and variables["type"] == "file":
operation, error = app.config["CONFIGFILES"].create_file(variables["path"], variables["name"], variables["content"])
if request.form["operation"] == "edit" and variables["type"] == "file":
operation, error = app.config["CONFIGFILES"].edit_file(
variables["path"],
variables["name"],
variables.get("old_name", variables["name"]),
variables["content"],
)
if request.form["operation"] == "edit" and variables["type"] == "folder":
operation, error = app.config["CONFIGFILES"].edit_folder(
variables["path"],
variables["name"],
variables.get("old_name", variables["name"]),
)
if error:
return redirect_flash_error(operation, "configs", True)
db_configs[index]["data"] = content
operation = f"Edited config {name}{f' for service {service_id}' if service_id else ''}"
# Delete a config
if request.form["operation"] == "delete":
operation, error = app.config["CONFIGFILES"].delete_path(variables["path"])
elif request.form["operation"] == "delete":
if index == -1:
return redirect_flash_error(
f"Can't delete config {name}{f' for service {service_id}' if service_id else ''} because it doesn't exist", "configs", True
)
if error:
return redirect_flash_error(operation, "configs", True)
del db_configs[index]
operation = f"Deleted config {name}{f' for service {service_id}' if service_id else ''}"
error = db.save_custom_configs([config for config in db_configs if config["method"] == "ui"], "ui")
if error:
app.logger.error(f"Could not save custom configs: {error}")
return redirect_flash_error("Couldn't save custom configs", "configs", True)
flash(operation)
error = app.config["CONFIGFILES"].save_configs()
if error:
return redirect_flash_error("Couldn't save custom configs to disk", "configs", True)
return redirect(url_for("loading", next=url_for("configs")))
return render_template(
@ -1155,7 +1169,7 @@ def configs():
folders=[
path_to_dict(
join(sep, "etc", "bunkerweb", "configs"),
db_data=db.get_custom_configs(),
db_data=db_configs,
services=app.config["CONFIG"].get_config(methods=False).get("SERVER_NAME", "").split(" "),
)
],

View file

@ -1,84 +1,18 @@
#!/usr/bin/env python3
from glob import glob
from os import listdir, replace, sep, walk
from os.path import basename, dirname, join
from os import sep
from os.path import join
from pathlib import Path
from re import compile as re_compile
from shutil import rmtree, move as shutil_move
from typing import Any, Dict, List, Tuple
from utils import path_to_dict
def generate_custom_configs(
custom_configs: List[Dict[str, Any]],
*,
original_path: Path = Path(sep, "etc", "bunkerweb", "configs"),
):
original_path.mkdir(parents=True, exist_ok=True)
for custom_config in custom_configs:
tmp_path = original_path.joinpath(custom_config["type"].replace("_", "-"))
if custom_config["service_id"]:
tmp_path = tmp_path.joinpath(custom_config["service_id"])
tmp_path = tmp_path.joinpath(f"{custom_config['name']}.conf")
tmp_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path.write_bytes(custom_config["data"])
class ConfigFiles:
def __init__(self, logger, db):
def __init__(self):
self.__name_regex = re_compile(r"^[\w.-]{4,64}$")
self.__root_dirs = [child["name"] for child in path_to_dict(join(sep, "etc", "bunkerweb", "configs"))["children"]]
self.__file_creation_blacklist = ["http", "stream"]
self.__logger = logger
self.__db = db
if not Path(sep, "usr", "sbin", "nginx").is_file():
custom_configs = self.__db.get_custom_configs()
if custom_configs:
self.__logger.info("Refreshing custom configs ...")
# Remove old custom configs files
for file in glob(join(sep, "etc", "bunkerweb", "configs", "*", "*")):
file = Path(file)
if file.is_symlink() or file.is_file():
file.unlink()
elif file.is_dir():
rmtree(str(file), ignore_errors=True)
generate_custom_configs(custom_configs)
self.__logger.info("Custom configs refreshed successfully")
def save_configs(self, *, check_changes: bool = True) -> str:
custom_configs = []
configs_path = join(sep, "etc", "bunkerweb", "configs")
root_dirs = listdir(configs_path)
for root, dirs, files in walk(configs_path):
if files or (dirs and basename(root) not in root_dirs):
path_exploded = root.split("/")
for file in files:
# root_dirs is index 4 on path exploded
# in case this is a service config, index 5 is the service id and index 6 is the config name
# else index 5 is the config name
service_id = path_exploded[5] if len(path_exploded) >= 6 else None
root_dir = path_exploded[4]
path_result = (service_id, root_dir, file.replace(".conf", ""))
with open(join(root, file), "r", encoding="utf-8") as f:
custom_configs.append(
{
"value": f.read(),
"exploded": path_result,
}
)
print("custom config", custom_configs, flush=True)
err = self.__db.save_custom_configs(custom_configs, "ui", changed=check_changes)
if err:
self.__logger.error(f"Could not save custom configs: {err}")
return "Couldn't save custom configs to database"
return ""
def check_name(self, name: str) -> bool:
return self.__name_regex.match(name) is not None
@ -104,88 +38,3 @@ class ConfigFiles:
return f"{join(root_path, root_dir, '/'.join(dirs.split('/')[0:-x]))} doesn't exist"
return ""
def delete_path(self, path: str) -> Tuple[str, int]:
try:
path: Path = Path(path)
if path.is_file():
path.unlink()
elif path.is_dir():
rmtree(path, ignore_errors=False)
else:
path = Path(f"{path}.conf")
if path.is_file():
path.unlink()
else:
rmtree(path, ignore_errors=False)
except OSError:
return f"Could not delete {path}", 1
return f"{path} was successfully deleted", 0
def create_folder(self, path: str, name: str) -> Tuple[str, int]:
folder_path = join(path, name) if not path.endswith(name) else path
try:
Path(folder_path).mkdir(parents=True)
except OSError:
return f"Could not create {folder_path}", 1
return f"The folder {folder_path} was successfully created", 0
def create_file(self, path: str, name: str, content: str) -> Tuple[str, int]:
file_path = Path(path, name)
file_path.parent.mkdir(exist_ok=True)
file_path.write_text(content, encoding="utf-8")
return f"The file {file_path} was successfully created", 0
def edit_folder(self, path: str, name: str, old_name: str) -> Tuple[str, int]:
new_folder_path = join(dirname(path), name)
old_folder_path = join(dirname(path), old_name)
if old_folder_path == new_folder_path:
return (
f"{old_folder_path} was not renamed because the name didn't change",
0,
)
try:
shutil_move(old_folder_path, new_folder_path)
except OSError:
return f"Could not move {old_folder_path}", 1
return (
f"The folder {old_folder_path} was successfully renamed to {new_folder_path}",
0,
)
def edit_file(self, path: str, name: str, old_name: str, content: str) -> Tuple[str, int]:
new_path = join(dirname(path), name)
old_path = join(dirname(path), old_name)
try:
file_content = Path(old_path).read_text(encoding="utf-8")
except FileNotFoundError:
return f"Could not find {old_path}", 1
if old_path == new_path and file_content == content:
return (
f"{old_path} was not edited because the content and the name didn't change",
0,
)
elif file_content == content:
try:
replace(path, new_path)
return f"{old_path} was successfully renamed to {new_path}", 0
except OSError:
return f"Could not rename {old_path} into {new_path}", 1
elif old_path == new_path:
new_path = old_path
else:
try:
Path(old_path).unlink()
except OSError:
return f"Could not remove {old_path}", 1
Path(new_path).write_text(content, encoding="utf-8")
return f"The file {old_path} was successfully edited", 0