mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Refactor save_config method to greatly accelerate config saves
This commit is contained in:
parent
4d0a53ec90
commit
a272448242
1 changed files with 194 additions and 102 deletions
|
|
@ -1,5 +1,7 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from collections import defaultdict
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from contextlib import contextmanager, suppress
|
||||
from copy import deepcopy
|
||||
from datetime import datetime
|
||||
|
|
@ -1266,6 +1268,10 @@ class Database:
|
|||
def save_config(self, config: Dict[str, Any], method: str, changed: Optional[bool] = True) -> Union[str, Set[str]]:
|
||||
"""Save the config in the database"""
|
||||
to_put = []
|
||||
to_update = []
|
||||
to_delete = []
|
||||
changed_plugins = set()
|
||||
changed_services = False
|
||||
|
||||
db_config = {}
|
||||
if method == "autoconf":
|
||||
|
|
@ -1276,31 +1282,34 @@ class Database:
|
|||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
self.logger.debug(f"Saving config for method {method}")
|
||||
changed_plugins = set()
|
||||
changed_services = False
|
||||
|
||||
self.logger.debug(f"Cleaning up {method} old global settings")
|
||||
# Collect global settings to delete
|
||||
global_settings_to_delete = []
|
||||
for db_global_config in session.query(Global_values).filter_by(method=method).all():
|
||||
key = db_global_config.setting_id
|
||||
if db_global_config.suffix:
|
||||
key = f"{key}_{db_global_config.suffix}"
|
||||
|
||||
if key not in config and (db_global_config.suffix or f"{key}_0" not in config):
|
||||
session.delete(db_global_config)
|
||||
changed_plugins.add(session.query(Settings).with_entities(Settings.plugin_id).filter_by(id=db_global_config.setting_id).first().plugin_id)
|
||||
|
||||
global_settings_to_delete.append(db_global_config)
|
||||
plugin_id = session.query(Settings).with_entities(Settings.plugin_id).filter_by(id=db_global_config.setting_id).first().plugin_id
|
||||
changed_plugins.add(plugin_id)
|
||||
if key == "SERVER_NAME":
|
||||
changed_services = True
|
||||
|
||||
self.logger.debug(f"Cleaning up {method} old services settings")
|
||||
# Collect service settings to delete
|
||||
service_settings_to_delete = []
|
||||
for db_service_config in session.query(Services_settings).filter_by(method=method).all():
|
||||
key = f"{db_service_config.service_id}_{db_service_config.setting_id}"
|
||||
if db_service_config.suffix:
|
||||
key = f"{key}_{db_service_config.suffix}"
|
||||
|
||||
if key not in config and (db_service_config.suffix or f"{key}_0" not in config):
|
||||
session.delete(db_service_config)
|
||||
changed_plugins.add(session.query(Settings).with_entities(Settings.plugin_id).filter_by(id=db_service_config.setting_id).first().plugin_id)
|
||||
service_settings_to_delete.append(db_service_config)
|
||||
plugin_id = session.query(Settings).with_entities(Settings.plugin_id).filter_by(id=db_service_config.setting_id).first().plugin_id
|
||||
changed_plugins.add(plugin_id)
|
||||
|
||||
if config:
|
||||
config.pop("DATABASE_URI", None)
|
||||
|
|
@ -1314,9 +1323,7 @@ class Database:
|
|||
if isinstance(services, str):
|
||||
services = services.strip().split(" ")
|
||||
|
||||
for i, service in enumerate(services):
|
||||
if not service:
|
||||
services.pop(i)
|
||||
services = [service for service in services if service] # Clean up empty strings
|
||||
|
||||
if db_services:
|
||||
missing_ids = [service.id for service in db_services if service.method == method and service.id not in services]
|
||||
|
|
@ -1324,10 +1331,10 @@ class Database:
|
|||
if missing_ids:
|
||||
self.logger.debug(f"Removing {len(missing_ids)} services that are no longer in the list")
|
||||
# Remove services that are no longer in the list
|
||||
session.query(Services).filter(Services.id.in_(missing_ids)).delete()
|
||||
session.query(Services_settings).filter(Services_settings.service_id.in_(missing_ids)).delete()
|
||||
session.query(Custom_configs).filter(Custom_configs.service_id.in_(missing_ids)).delete()
|
||||
session.query(Jobs_cache).filter(Jobs_cache.service_id.in_(missing_ids)).delete()
|
||||
session.query(Services).filter(Services.id.in_(missing_ids)).delete(synchronize_session=False)
|
||||
session.query(Services_settings).filter(Services_settings.service_id.in_(missing_ids)).delete(synchronize_session=False)
|
||||
session.query(Custom_configs).filter(Custom_configs.service_id.in_(missing_ids)).delete(synchronize_session=False)
|
||||
session.query(Jobs_cache).filter(Jobs_cache.service_id.in_(missing_ids)).delete(synchronize_session=False)
|
||||
session.query(Metadata).filter_by(id=1).update(
|
||||
{Metadata.custom_configs_changed: True, Metadata.last_custom_configs_change: datetime.now().astimezone()}
|
||||
)
|
||||
|
|
@ -1344,8 +1351,8 @@ class Database:
|
|||
|
||||
if missing_drafts:
|
||||
self.logger.debug(f"Removing {len(missing_drafts)} drafts that are no longer in the list")
|
||||
# Remove drafts that are no longer in the list
|
||||
session.query(Services).filter(Services.id.in_(missing_drafts)).update({Services.is_draft: False})
|
||||
# Update services to remove draft status
|
||||
session.query(Services).filter(Services.id.in_(missing_drafts)).update({Services.is_draft: False}, synchronize_session=False)
|
||||
changed_services = True
|
||||
|
||||
for draft in drafts:
|
||||
|
|
@ -1357,7 +1364,7 @@ class Database:
|
|||
db_ids[draft] = {"method": method, "is_draft": True}
|
||||
elif method == db_ids[draft]["method"]:
|
||||
self.logger.debug(f"Updating draft {draft}")
|
||||
session.query(Services).filter(Services.id == draft).update({Services.is_draft: True, Services.last_update: current_time})
|
||||
to_update.append({"model": Services, "filter": {"id": draft}, "values": {"is_draft": True, "last_update": current_time}})
|
||||
changed_services = True
|
||||
|
||||
template = config.get("USE_TEMPLATE", "")
|
||||
|
|
@ -1365,136 +1372,201 @@ class Database:
|
|||
if config.get("MULTISITE", "no") == "yes":
|
||||
self.logger.debug("Checking if the multisite settings have changed")
|
||||
|
||||
service_templates = {}
|
||||
for service in services:
|
||||
service_templates[service] = config.get(f"{service}_USE_TEMPLATE", template)
|
||||
service_configs = defaultdict(dict)
|
||||
global_config = {}
|
||||
|
||||
global_values = []
|
||||
for key, value in config.copy().items():
|
||||
suffix = 0
|
||||
original_key = deepcopy(key)
|
||||
if self.suffix_rx.search(key):
|
||||
suffix = int(key.split("_")[-1])
|
||||
key = key[: -len(str(suffix)) - 1]
|
||||
for key, value in config.items():
|
||||
matched = False
|
||||
for service in services:
|
||||
prefix = f"{service}_"
|
||||
if key.startswith(prefix):
|
||||
stripped_key = key[len(prefix) :] # noqa: E203
|
||||
service_configs[service][stripped_key] = value
|
||||
matched = True
|
||||
break
|
||||
if not matched:
|
||||
global_config[key] = value
|
||||
|
||||
setting = session.query(Settings).with_entities(Settings.default, Settings.plugin_id).filter_by(id=key).first()
|
||||
# Collect necessary data before threading
|
||||
settings_data = session.query(Settings.id, Settings.default, Settings.plugin_id).all()
|
||||
settings_dict = {s.id: {"default": s.default, "plugin_id": s.plugin_id} for s in settings_data}
|
||||
|
||||
if not setting and services:
|
||||
try:
|
||||
server_name = next(service for service in services if key.startswith(f"{service}_"))
|
||||
except StopIteration:
|
||||
# Collect existing service settings
|
||||
existing_service_settings = session.query(
|
||||
Services_settings.service_id, Services_settings.setting_id, Services_settings.suffix, Services_settings.value, Services_settings.method
|
||||
).all()
|
||||
existing_service_settings_dict = {
|
||||
(s.service_id, s.setting_id, s.suffix): {"value": s.value, "method": s.method} for s in existing_service_settings
|
||||
}
|
||||
|
||||
# Collect template settings
|
||||
template_settings = {}
|
||||
if template:
|
||||
template_settings_data = (
|
||||
session.query(Template_settings.setting_id, Template_settings.suffix, Template_settings.default)
|
||||
.filter_by(template_id=template)
|
||||
.all()
|
||||
)
|
||||
template_settings = {(t.setting_id, t.suffix): t.default for t in template_settings_data}
|
||||
|
||||
def process_service(server_name: str, service_config: Dict[str, str], db_ids: Dict[str, dict]):
|
||||
local_to_put = []
|
||||
local_to_update = []
|
||||
local_to_delete = []
|
||||
local_changed_plugins = set()
|
||||
local_changed_services = False
|
||||
|
||||
service_template = service_config.get("USE_TEMPLATE", template)
|
||||
|
||||
for original_key, value in service_config.items():
|
||||
suffix = 0
|
||||
key = deepcopy(original_key)
|
||||
if self.suffix_rx.search(key):
|
||||
suffix = int(key.split("_")[-1])
|
||||
key = key[: -len(str(suffix)) - 1]
|
||||
|
||||
setting = settings_dict.get(key)
|
||||
if not setting:
|
||||
self.logger.debug(f"Setting {key} does not exist")
|
||||
continue
|
||||
|
||||
if server_name not in db_ids:
|
||||
self.logger.debug(f"Adding service {server_name}")
|
||||
current_time = datetime.now().astimezone()
|
||||
to_put.append(
|
||||
local_to_put.append(
|
||||
Services(
|
||||
id=server_name, method=method, is_draft=server_name in drafts, creation_date=current_time, last_update=current_time
|
||||
)
|
||||
)
|
||||
db_ids[server_name] = {"method": method, "is_draft": server_name in drafts}
|
||||
if server_name not in drafts:
|
||||
changed_services = True
|
||||
local_changed_services = True
|
||||
|
||||
key = key.replace(f"{server_name}_", "")
|
||||
original_key = original_key.replace(f"{server_name}_", "")
|
||||
setting = session.query(Settings).with_entities(Settings.default, Settings.plugin_id).filter_by(id=key).first()
|
||||
service_setting = existing_service_settings_dict.get((server_name, key, suffix))
|
||||
|
||||
if not setting:
|
||||
self.logger.debug(f"Setting {key} does not exist")
|
||||
continue
|
||||
|
||||
service_setting = (
|
||||
session.query(Services_settings)
|
||||
.with_entities(Services_settings.value, Services_settings.method)
|
||||
.filter_by(service_id=server_name, setting_id=key, suffix=suffix)
|
||||
.first()
|
||||
)
|
||||
|
||||
template_setting = None
|
||||
if service_templates[server_name]:
|
||||
template_setting = (
|
||||
session.query(Template_settings)
|
||||
.with_entities(Template_settings.default)
|
||||
.filter_by(template_id=service_templates[server_name], setting_id=key, suffix=suffix)
|
||||
.first()
|
||||
)
|
||||
template_setting_default = None
|
||||
if service_template:
|
||||
template_setting_default = template_settings.get((key, suffix))
|
||||
|
||||
# Determine if we need to add, update, or delete
|
||||
if not service_setting:
|
||||
if key != "SERVER_NAME" and (
|
||||
(original_key in config and value == config[original_key])
|
||||
or (original_key in db_config and value == db_config[original_key])
|
||||
or value == (template_setting.default if template_setting else setting.default)
|
||||
or value == (template_setting_default if template_setting_default is not None else setting["default"])
|
||||
):
|
||||
continue
|
||||
|
||||
self.logger.debug(f"Adding setting {key} for service {server_name}")
|
||||
changed_plugins.add(setting.plugin_id)
|
||||
to_put.append(Services_settings(service_id=server_name, setting_id=key, value=value, suffix=suffix, method=method))
|
||||
session.query(Services).filter(Services.id == server_name).update({Services.last_update: datetime.now().astimezone()})
|
||||
elif (
|
||||
method == service_setting.method or (service_setting.method not in ("scheduler", "autoconf") and method == "autoconf")
|
||||
) and service_setting.value != value:
|
||||
changed_plugins.add(setting.plugin_id)
|
||||
query = session.query(Services_settings).filter(
|
||||
Services_settings.service_id == server_name,
|
||||
Services_settings.setting_id == key,
|
||||
Services_settings.suffix == suffix,
|
||||
local_changed_plugins.add(setting["plugin_id"])
|
||||
local_to_put.append(Services_settings(service_id=server_name, setting_id=key, value=value, suffix=suffix, method=method))
|
||||
# Update Services.last_update
|
||||
local_to_update.append(
|
||||
{"model": Services, "filter": {"id": server_name}, "values": {"last_update": datetime.now().astimezone()}}
|
||||
)
|
||||
elif (
|
||||
method == service_setting["method"] or (service_setting["method"] not in ("scheduler", "autoconf") and method == "autoconf")
|
||||
) and service_setting["value"] != value:
|
||||
local_changed_plugins.add(setting["plugin_id"])
|
||||
|
||||
if key != "SERVER_NAME" and (
|
||||
(original_key in config and value == config[original_key])
|
||||
or (original_key in db_config and value == db_config[original_key])
|
||||
or value == (template_setting.default if template_setting else setting.default)
|
||||
or value == (template_setting_default if template_setting_default is not None else setting["default"])
|
||||
):
|
||||
self.logger.debug(f"Removing setting {key} for service {server_name}")
|
||||
query.delete()
|
||||
local_to_delete.append(
|
||||
{"model": Services_settings, "filter": {"service_id": server_name, "setting_id": key, "suffix": suffix}}
|
||||
)
|
||||
continue
|
||||
|
||||
self.logger.debug(f"Updating setting {key} for service {server_name}")
|
||||
query.update({Services_settings.value: value, Services_settings.method: method})
|
||||
session.query(Services).filter(Services.id == server_name).update({Services.last_update: datetime.now().astimezone()})
|
||||
elif setting and original_key not in global_values:
|
||||
global_values.append(original_key)
|
||||
global_value = (
|
||||
session.query(Global_values)
|
||||
.with_entities(Global_values.value, Global_values.method)
|
||||
.filter_by(setting_id=key, suffix=suffix)
|
||||
.first()
|
||||
)
|
||||
|
||||
template_setting = None
|
||||
if template:
|
||||
template_setting = (
|
||||
session.query(Template_settings)
|
||||
.with_entities(Template_settings.default)
|
||||
.filter_by(template_id=template, setting_id=key, suffix=suffix)
|
||||
.first()
|
||||
local_to_update.extend(
|
||||
[
|
||||
{
|
||||
"model": Services_settings,
|
||||
"filter": {"service_id": server_name, "setting_id": key, "suffix": suffix},
|
||||
"values": {"value": value, "method": method},
|
||||
},
|
||||
{"model": Services, "filter": {"id": server_name}, "values": {"last_update": datetime.now().astimezone()}},
|
||||
]
|
||||
)
|
||||
|
||||
return local_to_put, local_to_update, local_to_delete, local_changed_plugins, local_changed_services
|
||||
|
||||
def process_global_settings(global_config: Dict[str, str]):
|
||||
local_to_put = []
|
||||
local_to_update = []
|
||||
local_to_delete = []
|
||||
local_changed_plugins = set()
|
||||
|
||||
for original_key, value in global_config.items():
|
||||
suffix = 0
|
||||
key = deepcopy(original_key)
|
||||
if self.suffix_rx.search(key):
|
||||
suffix = int(key.split("_")[-1])
|
||||
key = key[: -len(str(suffix)) - 1]
|
||||
|
||||
setting = settings_dict.get(key)
|
||||
if not setting:
|
||||
self.logger.debug(f"Setting {key} does not exist")
|
||||
continue
|
||||
|
||||
global_value = session.query(Global_values.value, Global_values.method).filter_by(setting_id=key, suffix=suffix).first()
|
||||
|
||||
template_setting_default = None
|
||||
if template:
|
||||
template_setting_default = template_settings.get((key, suffix))
|
||||
|
||||
if not global_value:
|
||||
if value == (template_setting.default if template_setting is not None else setting.default):
|
||||
if value == (template_setting_default if template_setting_default is not None else setting["default"]):
|
||||
continue
|
||||
|
||||
self.logger.debug(f"Adding global setting {key}")
|
||||
changed_plugins.add(setting.plugin_id)
|
||||
to_put.append(Global_values(setting_id=key, value=value, suffix=suffix, method=method))
|
||||
local_changed_plugins.add(setting["plugin_id"])
|
||||
local_to_put.append(Global_values(setting_id=key, value=value, suffix=suffix, method=method))
|
||||
elif (
|
||||
method == global_value.method or (global_value.method not in ("scheduler", "autoconf") and method == "autoconf")
|
||||
) and global_value.value != value:
|
||||
changed_plugins.add(setting.plugin_id)
|
||||
query = session.query(Global_values).filter(Global_values.setting_id == key, Global_values.suffix == suffix)
|
||||
local_changed_plugins.add(setting["plugin_id"])
|
||||
|
||||
if value == (template_setting.default if template_setting is not None else setting.default):
|
||||
if value == (template_setting_default if template_setting_default is not None else setting["default"]):
|
||||
self.logger.debug(f"Removing global setting {key}")
|
||||
query.delete()
|
||||
local_to_delete.append({"model": Global_values, "filter": {"setting_id": key, "suffix": suffix}})
|
||||
continue
|
||||
|
||||
self.logger.debug(f"Updating global setting {key}")
|
||||
query.update({Global_values.value: value, Global_values.method: method})
|
||||
elif method != "autoconf":
|
||||
self.logger.debug("Checking if non multisite settings have changed")
|
||||
local_to_update.append(
|
||||
{"model": Global_values, "filter": {"setting_id": key, "suffix": suffix}, "values": {"value": value, "method": method}}
|
||||
)
|
||||
|
||||
return local_to_put, local_to_update, local_to_delete, local_changed_plugins, False
|
||||
|
||||
# Use ThreadPoolExecutor to process services in parallel
|
||||
with ThreadPoolExecutor() as executor:
|
||||
futures = [
|
||||
executor.submit(process_service, service_name, service_config, db_ids) for service_name, service_config in service_configs.items()
|
||||
]
|
||||
|
||||
# Process global settings in another thread or the main thread
|
||||
futures.append(executor.submit(process_global_settings, global_config))
|
||||
|
||||
# Collect results from threads
|
||||
for future in as_completed(futures):
|
||||
try:
|
||||
ret_to_put, ret_to_update, ret_to_delete, ret_changed_plugins, ret_changed_services = future.result()
|
||||
to_put.extend(ret_to_put)
|
||||
to_update.extend(ret_to_update)
|
||||
to_delete.extend(ret_to_delete)
|
||||
changed_plugins.update(ret_changed_plugins)
|
||||
if not changed_services:
|
||||
changed_services = ret_changed_services
|
||||
except Exception as e:
|
||||
self.logger.error(f"Thread raised an exception: {e}")
|
||||
|
||||
else:
|
||||
# Non-multisite configuration
|
||||
self.logger.debug("Checking if non-multisite settings have changed")
|
||||
|
||||
server_name = config.get("SERVER_NAME", None)
|
||||
if template and server_name is None:
|
||||
|
|
@ -1555,15 +1627,16 @@ class Database:
|
|||
method == global_value.method or (global_value.method not in ("scheduler", "autoconf") and method == "autoconf")
|
||||
) and global_value.value != value:
|
||||
changed_plugins.add(setting.plugin_id)
|
||||
query = session.query(Global_values).filter(Global_values.setting_id == key, Global_values.suffix == suffix)
|
||||
|
||||
if value == (template_setting.default if template_setting is not None else setting.default):
|
||||
self.logger.debug(f"Removing global setting {key}")
|
||||
query.delete()
|
||||
to_delete.append({"model": Global_values, "filter": {"setting_id": key, "suffix": suffix}})
|
||||
continue
|
||||
|
||||
self.logger.debug(f"Updating global setting {key}")
|
||||
query.update({Global_values.value: value, Global_values.method: method})
|
||||
to_update.append(
|
||||
{"model": Global_values, "filter": {"setting_id": key, "suffix": suffix}, "values": {"value": value, "method": method}}
|
||||
)
|
||||
|
||||
if changed_services:
|
||||
changed_plugins = set(plugin.id for plugin in session.query(Plugins).with_entities(Plugins.id).all())
|
||||
|
|
@ -1576,12 +1649,31 @@ class Database:
|
|||
metadata.first_config_saved = True
|
||||
|
||||
if changed_plugins:
|
||||
session.query(Plugins).filter(Plugins.id.in_(changed_plugins)).update({Plugins.config_changed: True})
|
||||
session.query(Plugins).filter(Plugins.id.in_(changed_plugins)).update({Plugins.config_changed: True}, synchronize_session=False)
|
||||
|
||||
try:
|
||||
# Apply collected deletions
|
||||
for delete_item in to_delete:
|
||||
session.query(delete_item["model"]).filter_by(**delete_item["filter"]).delete(synchronize_session=False)
|
||||
|
||||
# Apply collected updates
|
||||
for update_item in to_update:
|
||||
session.query(update_item["model"]).filter_by(**update_item["filter"]).update(update_item["values"], synchronize_session=False)
|
||||
|
||||
# Add new objects
|
||||
session.add_all(to_put)
|
||||
|
||||
# Delete old global settings
|
||||
for global_setting in global_settings_to_delete:
|
||||
session.delete(global_setting)
|
||||
|
||||
# Delete old service settings
|
||||
for service_setting in service_settings_to_delete:
|
||||
session.delete(service_setting)
|
||||
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
session.rollback()
|
||||
return str(e)
|
||||
|
||||
return changed_plugins
|
||||
|
|
|
|||
Loading…
Reference in a new issue