mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Only use the database for custom configs CRUD in the web UI
This commit is contained in:
parent
b1707d5efe
commit
2d3243ba53
3 changed files with 114 additions and 253 deletions
|
|
@ -962,7 +962,7 @@ class Database:
|
|||
|
||||
def save_custom_configs(
|
||||
self,
|
||||
custom_configs: List[Dict[str, Tuple[str, List[str]]]],
|
||||
custom_configs: List[Dict[str, Union[str, bytes, Tuple[str, List[str]]]]],
|
||||
method: str,
|
||||
changed: Optional[bool] = True,
|
||||
) -> str:
|
||||
|
|
@ -975,59 +975,57 @@ class Database:
|
|||
to_put = []
|
||||
endl = "\n"
|
||||
for custom_config in custom_configs:
|
||||
config = {
|
||||
"data": custom_config["value"].encode("utf-8") if isinstance(custom_config["value"], str) else custom_config["value"],
|
||||
"method": method,
|
||||
if method != "ui":
|
||||
config = {
|
||||
"data": custom_config["value"],
|
||||
"method": method,
|
||||
}
|
||||
assert isinstance(custom_config["exploded"], tuple) and len(custom_config["exploded"]) == 3, "Invalid exploded custom config"
|
||||
|
||||
if custom_config["exploded"][0]:
|
||||
if not session.query(Services).with_entities(Services.id).filter_by(id=custom_config["exploded"][0]).first():
|
||||
message += f"{endl if message else ''}Service {custom_config['exploded'][0]} not found, please check your config"
|
||||
|
||||
config.update(
|
||||
{
|
||||
"service_id": custom_config["exploded"][0],
|
||||
"type": custom_config["exploded"][1],
|
||||
"name": custom_config["exploded"][2],
|
||||
}
|
||||
)
|
||||
else:
|
||||
config.update(
|
||||
{
|
||||
"type": custom_config["exploded"][1],
|
||||
"name": custom_config["exploded"][2],
|
||||
}
|
||||
)
|
||||
|
||||
custom_config = config
|
||||
|
||||
custom_config["type"] = custom_config["type"].replace("-", "_").lower() # type: ignore
|
||||
custom_config["data"] = custom_config["data"].encode("utf-8") if isinstance(custom_config["data"], str) else custom_config["data"]
|
||||
custom_config["checksum"] = sha256(custom_config["data"]).hexdigest() # type: ignore
|
||||
|
||||
service_id = custom_config.get("service_id", None) or None
|
||||
filters = {
|
||||
"type": custom_config["type"],
|
||||
"name": custom_config["name"],
|
||||
}
|
||||
config["checksum"] = sha256(config["data"]).hexdigest()
|
||||
|
||||
if custom_config["exploded"][0]:
|
||||
if not session.query(Services).with_entities(Services.id).filter_by(id=custom_config["exploded"][0]).first():
|
||||
message += f"{endl if message else ''}Service {custom_config['exploded'][0]} not found, please check your config"
|
||||
if service_id:
|
||||
filters["service_id"] = service_id
|
||||
|
||||
config.update(
|
||||
{
|
||||
"service_id": custom_config["exploded"][0],
|
||||
"type": custom_config["exploded"][1].replace("-", "_").lower(),
|
||||
"name": custom_config["exploded"][2],
|
||||
}
|
||||
)
|
||||
else:
|
||||
config.update(
|
||||
{
|
||||
"type": custom_config["exploded"][1].replace("-", "_").lower(),
|
||||
"name": custom_config["exploded"][2],
|
||||
}
|
||||
)
|
||||
|
||||
custom_conf = (
|
||||
session.query(Custom_configs)
|
||||
.with_entities(Custom_configs.checksum, Custom_configs.method)
|
||||
.filter_by(
|
||||
service_id=config.get("service_id", None),
|
||||
type=config["type"],
|
||||
name=config["name"],
|
||||
)
|
||||
.first()
|
||||
)
|
||||
custom_conf = session.query(Custom_configs).with_entities(Custom_configs.checksum, Custom_configs.method).filter_by(**filters).first()
|
||||
|
||||
if not custom_conf:
|
||||
to_put.append(Custom_configs(**config))
|
||||
elif config["checksum"] != custom_conf.checksum and method in (
|
||||
custom_conf.method,
|
||||
"autoconf",
|
||||
):
|
||||
session.query(Custom_configs).filter(
|
||||
Custom_configs.service_id == config.get("service_id", None),
|
||||
Custom_configs.type == config["type"],
|
||||
Custom_configs.name == config["name"],
|
||||
).update(
|
||||
{
|
||||
Custom_configs.data: config["data"],
|
||||
Custom_configs.checksum: config["checksum"],
|
||||
}
|
||||
| ({Custom_configs.method: "autoconf"} if method == "autoconf" else {})
|
||||
)
|
||||
to_put.append(Custom_configs(**custom_config))
|
||||
elif custom_config["checksum"] != custom_conf.checksum and method in (custom_conf.method, "autoconf"):
|
||||
custom_conf.data = custom_config["data"]
|
||||
custom_conf.checksum = custom_config["checksum"]
|
||||
|
||||
if method == "autoconf":
|
||||
custom_conf.method = method
|
||||
if changed:
|
||||
with suppress(ProgrammingError, OperationalError):
|
||||
metadata = session.query(Metadata).get(1)
|
||||
|
|
|
|||
114
src/ui/main.py
114
src/ui/main.py
|
|
@ -195,7 +195,7 @@ try:
|
|||
DEBUG=True,
|
||||
INSTANCES=Instances(docker_client, kubernetes_client, INTEGRATION),
|
||||
CONFIG=Config(db),
|
||||
CONFIGFILES=ConfigFiles(app.logger, db),
|
||||
CONFIGFILES=ConfigFiles(),
|
||||
WTF_CSRF_SSL_STRICT=False,
|
||||
USER=USER,
|
||||
SEND_FILE_MAX_AGE_DEFAULT=86400,
|
||||
|
|
@ -255,14 +255,8 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
|
|||
app.config["TO_FLASH"].append({"content": operation, "type": "success"})
|
||||
|
||||
if (was_draft != is_draft or not is_draft) and (moved or deleted):
|
||||
changes = ["config", "custom_configs"]
|
||||
error = app.config["CONFIGFILES"].save_configs(check_changes=False)
|
||||
if error:
|
||||
app.config["TO_FLASH"].append({"content": error, "type": "error"})
|
||||
changes.pop()
|
||||
|
||||
# update changes in db
|
||||
ret = db.checked_changes(changes, value=True)
|
||||
ret = db.checked_changes(["config", "custom_configs"], value=True)
|
||||
if ret:
|
||||
app.logger.error(f"Couldn't set the changes to checked in the database: {ret}")
|
||||
app.config["TO_FLASH"].append(
|
||||
|
|
@ -1069,6 +1063,8 @@ def global_config():
|
|||
@app.route("/configs", methods=["GET", "POST"])
|
||||
@login_required
|
||||
def configs():
|
||||
db_configs = db.get_custom_configs()
|
||||
|
||||
if request.method == "POST":
|
||||
operation = ""
|
||||
|
||||
|
|
@ -1080,74 +1076,92 @@ def configs():
|
|||
"edit",
|
||||
"delete",
|
||||
):
|
||||
return redirect_flash_error("Missing operation parameter on /configs.", "configs", True)
|
||||
return redirect_flash_error("Operation parameter is invalid on /configs.", "configs", True)
|
||||
|
||||
# Check variables
|
||||
variables = deepcopy(request.form.to_dict())
|
||||
del variables["csrf_token"]
|
||||
|
||||
if variables["type"] != "file":
|
||||
return redirect_flash_error("Invalid type parameter on /configs.", "configs", True)
|
||||
|
||||
operation = app.config["CONFIGFILES"].check_path(variables["path"])
|
||||
|
||||
if operation:
|
||||
return redirect_flash_error(operation, "configs", True)
|
||||
|
||||
old_name = variables.get("old_name", "").replace(".conf", "")
|
||||
name = variables.get("name", old_name).replace(".conf", "")
|
||||
path_exploded = variables["path"].split(sep)
|
||||
service_id = (path_exploded[5] if len(path_exploded) > 6 else None) or None
|
||||
root_dir = path_exploded[4].replace("-", "_").lower()
|
||||
|
||||
if not old_name and not name:
|
||||
return redirect_flash_error("Missing name parameter on /configs.", "configs", True)
|
||||
|
||||
index = -1
|
||||
for i, db_config in enumerate(db_configs):
|
||||
if db_config["type"] == root_dir and db_config["name"] == name and db_config["service_id"] == service_id:
|
||||
if request.form["operation"] == "new":
|
||||
return redirect_flash_error(f"Config {name} already exists{f' for service {service_id}' if service_id else ''}", "configs", True)
|
||||
elif db_config["method"] not in ("ui", "manual"):
|
||||
return redirect_flash_error(
|
||||
f"Can't edit config {name}{f' for service {service_id}' if service_id else ''} because it was not created by the UI or manually",
|
||||
"configs",
|
||||
True,
|
||||
)
|
||||
index = i
|
||||
break
|
||||
|
||||
# New or edit a config
|
||||
if request.form["operation"] in ("new", "edit"):
|
||||
if not app.config["CONFIGFILES"].check_name(variables["name"]):
|
||||
if not app.config["CONFIGFILES"].check_name(name):
|
||||
return redirect_flash_error(
|
||||
f"Invalid {variables['type']} name. (Can only contain numbers, letters, underscores, dots and hyphens (min 4 characters and max 64))",
|
||||
"configs",
|
||||
True,
|
||||
)
|
||||
|
||||
if variables["type"] == "file":
|
||||
variables["name"] = f"{variables['name']}.conf"
|
||||
content = BeautifulSoup(variables["content"], "html.parser").get_text()
|
||||
|
||||
if "old_name" in variables:
|
||||
variables["old_name"] = f"{variables['old_name']}.conf"
|
||||
if request.form["operation"] == "new":
|
||||
db_configs.append({"type": root_dir, "name": name, "service_id": service_id, "data": content, "method": "ui"})
|
||||
operation = f"Created config {name}{f' for service {service_id}' if service_id else ''}"
|
||||
elif request.form["operation"] == "edit":
|
||||
if index == -1:
|
||||
return redirect_flash_error(
|
||||
f"Can't edit config {name}{f' for service {service_id}' if service_id else ''} because it doesn't exist", "configs", True
|
||||
)
|
||||
|
||||
variables["content"] = BeautifulSoup(variables["content"], "html.parser").get_text()
|
||||
if old_name != name:
|
||||
db_configs[index]["name"] = name
|
||||
elif db_configs[index]["data"] == content:
|
||||
return redirect_flash_error(
|
||||
f"Config {name} was not edited because no values were changed{f' for service {service_id}' if service_id else ''}",
|
||||
"configs",
|
||||
True,
|
||||
)
|
||||
|
||||
error = False
|
||||
|
||||
if request.form["operation"] == "new" and variables["type"] == "folder":
|
||||
operation, error = app.config["CONFIGFILES"].create_folder(variables["path"], variables["name"])
|
||||
|
||||
if request.form["operation"] == "new" and variables["type"] == "file":
|
||||
operation, error = app.config["CONFIGFILES"].create_file(variables["path"], variables["name"], variables["content"])
|
||||
|
||||
if request.form["operation"] == "edit" and variables["type"] == "file":
|
||||
operation, error = app.config["CONFIGFILES"].edit_file(
|
||||
variables["path"],
|
||||
variables["name"],
|
||||
variables.get("old_name", variables["name"]),
|
||||
variables["content"],
|
||||
)
|
||||
|
||||
if request.form["operation"] == "edit" and variables["type"] == "folder":
|
||||
operation, error = app.config["CONFIGFILES"].edit_folder(
|
||||
variables["path"],
|
||||
variables["name"],
|
||||
variables.get("old_name", variables["name"]),
|
||||
)
|
||||
|
||||
if error:
|
||||
return redirect_flash_error(operation, "configs", True)
|
||||
db_configs[index]["data"] = content
|
||||
operation = f"Edited config {name}{f' for service {service_id}' if service_id else ''}"
|
||||
|
||||
# Delete a config
|
||||
if request.form["operation"] == "delete":
|
||||
operation, error = app.config["CONFIGFILES"].delete_path(variables["path"])
|
||||
elif request.form["operation"] == "delete":
|
||||
if index == -1:
|
||||
return redirect_flash_error(
|
||||
f"Can't delete config {name}{f' for service {service_id}' if service_id else ''} because it doesn't exist", "configs", True
|
||||
)
|
||||
|
||||
if error:
|
||||
return redirect_flash_error(operation, "configs", True)
|
||||
del db_configs[index]
|
||||
operation = f"Deleted config {name}{f' for service {service_id}' if service_id else ''}"
|
||||
|
||||
error = db.save_custom_configs([config for config in db_configs if config["method"] == "ui"], "ui")
|
||||
if error:
|
||||
app.logger.error(f"Could not save custom configs: {error}")
|
||||
return redirect_flash_error("Couldn't save custom configs", "configs", True)
|
||||
|
||||
flash(operation)
|
||||
|
||||
error = app.config["CONFIGFILES"].save_configs()
|
||||
|
||||
if error:
|
||||
return redirect_flash_error("Couldn't save custom configs to disk", "configs", True)
|
||||
|
||||
return redirect(url_for("loading", next=url_for("configs")))
|
||||
|
||||
return render_template(
|
||||
|
|
@ -1155,7 +1169,7 @@ def configs():
|
|||
folders=[
|
||||
path_to_dict(
|
||||
join(sep, "etc", "bunkerweb", "configs"),
|
||||
db_data=db.get_custom_configs(),
|
||||
db_data=db_configs,
|
||||
services=app.config["CONFIG"].get_config(methods=False).get("SERVER_NAME", "").split(" "),
|
||||
)
|
||||
],
|
||||
|
|
|
|||
|
|
@ -1,84 +1,18 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from glob import glob
|
||||
from os import listdir, replace, sep, walk
|
||||
from os.path import basename, dirname, join
|
||||
from os import sep
|
||||
from os.path import join
|
||||
from pathlib import Path
|
||||
from re import compile as re_compile
|
||||
from shutil import rmtree, move as shutil_move
|
||||
from typing import Any, Dict, List, Tuple
|
||||
|
||||
from utils import path_to_dict
|
||||
|
||||
|
||||
def generate_custom_configs(
|
||||
custom_configs: List[Dict[str, Any]],
|
||||
*,
|
||||
original_path: Path = Path(sep, "etc", "bunkerweb", "configs"),
|
||||
):
|
||||
original_path.mkdir(parents=True, exist_ok=True)
|
||||
for custom_config in custom_configs:
|
||||
tmp_path = original_path.joinpath(custom_config["type"].replace("_", "-"))
|
||||
if custom_config["service_id"]:
|
||||
tmp_path = tmp_path.joinpath(custom_config["service_id"])
|
||||
tmp_path = tmp_path.joinpath(f"{custom_config['name']}.conf")
|
||||
tmp_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp_path.write_bytes(custom_config["data"])
|
||||
|
||||
|
||||
class ConfigFiles:
|
||||
def __init__(self, logger, db):
|
||||
def __init__(self):
|
||||
self.__name_regex = re_compile(r"^[\w.-]{4,64}$")
|
||||
self.__root_dirs = [child["name"] for child in path_to_dict(join(sep, "etc", "bunkerweb", "configs"))["children"]]
|
||||
self.__file_creation_blacklist = ["http", "stream"]
|
||||
self.__logger = logger
|
||||
self.__db = db
|
||||
|
||||
if not Path(sep, "usr", "sbin", "nginx").is_file():
|
||||
custom_configs = self.__db.get_custom_configs()
|
||||
|
||||
if custom_configs:
|
||||
self.__logger.info("Refreshing custom configs ...")
|
||||
# Remove old custom configs files
|
||||
for file in glob(join(sep, "etc", "bunkerweb", "configs", "*", "*")):
|
||||
file = Path(file)
|
||||
if file.is_symlink() or file.is_file():
|
||||
file.unlink()
|
||||
elif file.is_dir():
|
||||
rmtree(str(file), ignore_errors=True)
|
||||
|
||||
generate_custom_configs(custom_configs)
|
||||
self.__logger.info("Custom configs refreshed successfully")
|
||||
|
||||
def save_configs(self, *, check_changes: bool = True) -> str:
|
||||
custom_configs = []
|
||||
configs_path = join(sep, "etc", "bunkerweb", "configs")
|
||||
root_dirs = listdir(configs_path)
|
||||
for root, dirs, files in walk(configs_path):
|
||||
if files or (dirs and basename(root) not in root_dirs):
|
||||
path_exploded = root.split("/")
|
||||
for file in files:
|
||||
# root_dirs is index 4 on path exploded
|
||||
# in case this is a service config, index 5 is the service id and index 6 is the config name
|
||||
# else index 5 is the config name
|
||||
service_id = path_exploded[5] if len(path_exploded) >= 6 else None
|
||||
root_dir = path_exploded[4]
|
||||
path_result = (service_id, root_dir, file.replace(".conf", ""))
|
||||
with open(join(root, file), "r", encoding="utf-8") as f:
|
||||
custom_configs.append(
|
||||
{
|
||||
"value": f.read(),
|
||||
"exploded": path_result,
|
||||
}
|
||||
)
|
||||
|
||||
print("custom config", custom_configs, flush=True)
|
||||
err = self.__db.save_custom_configs(custom_configs, "ui", changed=check_changes)
|
||||
if err:
|
||||
self.__logger.error(f"Could not save custom configs: {err}")
|
||||
return "Couldn't save custom configs to database"
|
||||
|
||||
return ""
|
||||
|
||||
def check_name(self, name: str) -> bool:
|
||||
return self.__name_regex.match(name) is not None
|
||||
|
|
@ -104,88 +38,3 @@ class ConfigFiles:
|
|||
return f"{join(root_path, root_dir, '/'.join(dirs.split('/')[0:-x]))} doesn't exist"
|
||||
|
||||
return ""
|
||||
|
||||
def delete_path(self, path: str) -> Tuple[str, int]:
|
||||
try:
|
||||
path: Path = Path(path)
|
||||
if path.is_file():
|
||||
path.unlink()
|
||||
elif path.is_dir():
|
||||
rmtree(path, ignore_errors=False)
|
||||
else:
|
||||
path = Path(f"{path}.conf")
|
||||
if path.is_file():
|
||||
path.unlink()
|
||||
else:
|
||||
rmtree(path, ignore_errors=False)
|
||||
except OSError:
|
||||
return f"Could not delete {path}", 1
|
||||
|
||||
return f"{path} was successfully deleted", 0
|
||||
|
||||
def create_folder(self, path: str, name: str) -> Tuple[str, int]:
|
||||
folder_path = join(path, name) if not path.endswith(name) else path
|
||||
try:
|
||||
Path(folder_path).mkdir(parents=True)
|
||||
except OSError:
|
||||
return f"Could not create {folder_path}", 1
|
||||
|
||||
return f"The folder {folder_path} was successfully created", 0
|
||||
|
||||
def create_file(self, path: str, name: str, content: str) -> Tuple[str, int]:
|
||||
file_path = Path(path, name)
|
||||
file_path.parent.mkdir(exist_ok=True)
|
||||
file_path.write_text(content, encoding="utf-8")
|
||||
return f"The file {file_path} was successfully created", 0
|
||||
|
||||
def edit_folder(self, path: str, name: str, old_name: str) -> Tuple[str, int]:
|
||||
new_folder_path = join(dirname(path), name)
|
||||
old_folder_path = join(dirname(path), old_name)
|
||||
|
||||
if old_folder_path == new_folder_path:
|
||||
return (
|
||||
f"{old_folder_path} was not renamed because the name didn't change",
|
||||
0,
|
||||
)
|
||||
|
||||
try:
|
||||
shutil_move(old_folder_path, new_folder_path)
|
||||
except OSError:
|
||||
return f"Could not move {old_folder_path}", 1
|
||||
|
||||
return (
|
||||
f"The folder {old_folder_path} was successfully renamed to {new_folder_path}",
|
||||
0,
|
||||
)
|
||||
|
||||
def edit_file(self, path: str, name: str, old_name: str, content: str) -> Tuple[str, int]:
|
||||
new_path = join(dirname(path), name)
|
||||
old_path = join(dirname(path), old_name)
|
||||
|
||||
try:
|
||||
file_content = Path(old_path).read_text(encoding="utf-8")
|
||||
except FileNotFoundError:
|
||||
return f"Could not find {old_path}", 1
|
||||
|
||||
if old_path == new_path and file_content == content:
|
||||
return (
|
||||
f"{old_path} was not edited because the content and the name didn't change",
|
||||
0,
|
||||
)
|
||||
elif file_content == content:
|
||||
try:
|
||||
replace(path, new_path)
|
||||
return f"{old_path} was successfully renamed to {new_path}", 0
|
||||
except OSError:
|
||||
return f"Could not rename {old_path} into {new_path}", 1
|
||||
elif old_path == new_path:
|
||||
new_path = old_path
|
||||
else:
|
||||
try:
|
||||
Path(old_path).unlink()
|
||||
except OSError:
|
||||
return f"Could not remove {old_path}", 1
|
||||
|
||||
Path(new_path).write_text(content, encoding="utf-8")
|
||||
|
||||
return f"The file {old_path} was successfully edited", 0
|
||||
|
|
|
|||
Loading…
Reference in a new issue