Refactor save_config method to greatly accelerate config saves

This commit is contained in:
Théophile Diot 2024-09-30 15:57:32 +02:00
parent 4d0a53ec90
commit a272448242
No known key found for this signature in database
GPG key ID: FA995104A0BA376A

View file

@ -1,5 +1,7 @@
#!/usr/bin/env python3
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager, suppress
from copy import deepcopy
from datetime import datetime
@ -1266,6 +1268,10 @@ class Database:
def save_config(self, config: Dict[str, Any], method: str, changed: Optional[bool] = True) -> Union[str, Set[str]]:
"""Save the config in the database"""
to_put = []
to_update = []
to_delete = []
changed_plugins = set()
changed_services = False
db_config = {}
if method == "autoconf":
@ -1276,31 +1282,34 @@ class Database:
return "The database is read-only, the changes will not be saved"
self.logger.debug(f"Saving config for method {method}")
changed_plugins = set()
changed_services = False
self.logger.debug(f"Cleaning up {method} old global settings")
# Collect global settings to delete
global_settings_to_delete = []
for db_global_config in session.query(Global_values).filter_by(method=method).all():
key = db_global_config.setting_id
if db_global_config.suffix:
key = f"{key}_{db_global_config.suffix}"
if key not in config and (db_global_config.suffix or f"{key}_0" not in config):
session.delete(db_global_config)
changed_plugins.add(session.query(Settings).with_entities(Settings.plugin_id).filter_by(id=db_global_config.setting_id).first().plugin_id)
global_settings_to_delete.append(db_global_config)
plugin_id = session.query(Settings).with_entities(Settings.plugin_id).filter_by(id=db_global_config.setting_id).first().plugin_id
changed_plugins.add(plugin_id)
if key == "SERVER_NAME":
changed_services = True
self.logger.debug(f"Cleaning up {method} old services settings")
# Collect service settings to delete
service_settings_to_delete = []
for db_service_config in session.query(Services_settings).filter_by(method=method).all():
key = f"{db_service_config.service_id}_{db_service_config.setting_id}"
if db_service_config.suffix:
key = f"{key}_{db_service_config.suffix}"
if key not in config and (db_service_config.suffix or f"{key}_0" not in config):
session.delete(db_service_config)
changed_plugins.add(session.query(Settings).with_entities(Settings.plugin_id).filter_by(id=db_service_config.setting_id).first().plugin_id)
service_settings_to_delete.append(db_service_config)
plugin_id = session.query(Settings).with_entities(Settings.plugin_id).filter_by(id=db_service_config.setting_id).first().plugin_id
changed_plugins.add(plugin_id)
if config:
config.pop("DATABASE_URI", None)
@ -1314,9 +1323,7 @@ class Database:
if isinstance(services, str):
services = services.strip().split(" ")
for i, service in enumerate(services):
if not service:
services.pop(i)
services = [service for service in services if service] # Clean up empty strings
if db_services:
missing_ids = [service.id for service in db_services if service.method == method and service.id not in services]
@ -1324,10 +1331,10 @@ class Database:
if missing_ids:
self.logger.debug(f"Removing {len(missing_ids)} services that are no longer in the list")
# Remove services that are no longer in the list
session.query(Services).filter(Services.id.in_(missing_ids)).delete()
session.query(Services_settings).filter(Services_settings.service_id.in_(missing_ids)).delete()
session.query(Custom_configs).filter(Custom_configs.service_id.in_(missing_ids)).delete()
session.query(Jobs_cache).filter(Jobs_cache.service_id.in_(missing_ids)).delete()
session.query(Services).filter(Services.id.in_(missing_ids)).delete(synchronize_session=False)
session.query(Services_settings).filter(Services_settings.service_id.in_(missing_ids)).delete(synchronize_session=False)
session.query(Custom_configs).filter(Custom_configs.service_id.in_(missing_ids)).delete(synchronize_session=False)
session.query(Jobs_cache).filter(Jobs_cache.service_id.in_(missing_ids)).delete(synchronize_session=False)
session.query(Metadata).filter_by(id=1).update(
{Metadata.custom_configs_changed: True, Metadata.last_custom_configs_change: datetime.now().astimezone()}
)
@ -1344,8 +1351,8 @@ class Database:
if missing_drafts:
self.logger.debug(f"Removing {len(missing_drafts)} drafts that are no longer in the list")
# Remove drafts that are no longer in the list
session.query(Services).filter(Services.id.in_(missing_drafts)).update({Services.is_draft: False})
# Update services to remove draft status
session.query(Services).filter(Services.id.in_(missing_drafts)).update({Services.is_draft: False}, synchronize_session=False)
changed_services = True
for draft in drafts:
@ -1357,7 +1364,7 @@ class Database:
db_ids[draft] = {"method": method, "is_draft": True}
elif method == db_ids[draft]["method"]:
self.logger.debug(f"Updating draft {draft}")
session.query(Services).filter(Services.id == draft).update({Services.is_draft: True, Services.last_update: current_time})
to_update.append({"model": Services, "filter": {"id": draft}, "values": {"is_draft": True, "last_update": current_time}})
changed_services = True
template = config.get("USE_TEMPLATE", "")
@ -1365,136 +1372,201 @@ class Database:
if config.get("MULTISITE", "no") == "yes":
self.logger.debug("Checking if the multisite settings have changed")
service_templates = {}
for service in services:
service_templates[service] = config.get(f"{service}_USE_TEMPLATE", template)
service_configs = defaultdict(dict)
global_config = {}
global_values = []
for key, value in config.copy().items():
suffix = 0
original_key = deepcopy(key)
if self.suffix_rx.search(key):
suffix = int(key.split("_")[-1])
key = key[: -len(str(suffix)) - 1]
for key, value in config.items():
matched = False
for service in services:
prefix = f"{service}_"
if key.startswith(prefix):
stripped_key = key[len(prefix) :] # noqa: E203
service_configs[service][stripped_key] = value
matched = True
break
if not matched:
global_config[key] = value
setting = session.query(Settings).with_entities(Settings.default, Settings.plugin_id).filter_by(id=key).first()
# Collect necessary data before threading
settings_data = session.query(Settings.id, Settings.default, Settings.plugin_id).all()
settings_dict = {s.id: {"default": s.default, "plugin_id": s.plugin_id} for s in settings_data}
if not setting and services:
try:
server_name = next(service for service in services if key.startswith(f"{service}_"))
except StopIteration:
# Collect existing service settings
existing_service_settings = session.query(
Services_settings.service_id, Services_settings.setting_id, Services_settings.suffix, Services_settings.value, Services_settings.method
).all()
existing_service_settings_dict = {
(s.service_id, s.setting_id, s.suffix): {"value": s.value, "method": s.method} for s in existing_service_settings
}
# Collect template settings
template_settings = {}
if template:
template_settings_data = (
session.query(Template_settings.setting_id, Template_settings.suffix, Template_settings.default)
.filter_by(template_id=template)
.all()
)
template_settings = {(t.setting_id, t.suffix): t.default for t in template_settings_data}
def process_service(server_name: str, service_config: Dict[str, str], db_ids: Dict[str, dict]):
local_to_put = []
local_to_update = []
local_to_delete = []
local_changed_plugins = set()
local_changed_services = False
service_template = service_config.get("USE_TEMPLATE", template)
for original_key, value in service_config.items():
suffix = 0
key = deepcopy(original_key)
if self.suffix_rx.search(key):
suffix = int(key.split("_")[-1])
key = key[: -len(str(suffix)) - 1]
setting = settings_dict.get(key)
if not setting:
self.logger.debug(f"Setting {key} does not exist")
continue
if server_name not in db_ids:
self.logger.debug(f"Adding service {server_name}")
current_time = datetime.now().astimezone()
to_put.append(
local_to_put.append(
Services(
id=server_name, method=method, is_draft=server_name in drafts, creation_date=current_time, last_update=current_time
)
)
db_ids[server_name] = {"method": method, "is_draft": server_name in drafts}
if server_name not in drafts:
changed_services = True
local_changed_services = True
key = key.replace(f"{server_name}_", "")
original_key = original_key.replace(f"{server_name}_", "")
setting = session.query(Settings).with_entities(Settings.default, Settings.plugin_id).filter_by(id=key).first()
service_setting = existing_service_settings_dict.get((server_name, key, suffix))
if not setting:
self.logger.debug(f"Setting {key} does not exist")
continue
service_setting = (
session.query(Services_settings)
.with_entities(Services_settings.value, Services_settings.method)
.filter_by(service_id=server_name, setting_id=key, suffix=suffix)
.first()
)
template_setting = None
if service_templates[server_name]:
template_setting = (
session.query(Template_settings)
.with_entities(Template_settings.default)
.filter_by(template_id=service_templates[server_name], setting_id=key, suffix=suffix)
.first()
)
template_setting_default = None
if service_template:
template_setting_default = template_settings.get((key, suffix))
# Determine if we need to add, update, or delete
if not service_setting:
if key != "SERVER_NAME" and (
(original_key in config and value == config[original_key])
or (original_key in db_config and value == db_config[original_key])
or value == (template_setting.default if template_setting else setting.default)
or value == (template_setting_default if template_setting_default is not None else setting["default"])
):
continue
self.logger.debug(f"Adding setting {key} for service {server_name}")
changed_plugins.add(setting.plugin_id)
to_put.append(Services_settings(service_id=server_name, setting_id=key, value=value, suffix=suffix, method=method))
session.query(Services).filter(Services.id == server_name).update({Services.last_update: datetime.now().astimezone()})
elif (
method == service_setting.method or (service_setting.method not in ("scheduler", "autoconf") and method == "autoconf")
) and service_setting.value != value:
changed_plugins.add(setting.plugin_id)
query = session.query(Services_settings).filter(
Services_settings.service_id == server_name,
Services_settings.setting_id == key,
Services_settings.suffix == suffix,
local_changed_plugins.add(setting["plugin_id"])
local_to_put.append(Services_settings(service_id=server_name, setting_id=key, value=value, suffix=suffix, method=method))
# Update Services.last_update
local_to_update.append(
{"model": Services, "filter": {"id": server_name}, "values": {"last_update": datetime.now().astimezone()}}
)
elif (
method == service_setting["method"] or (service_setting["method"] not in ("scheduler", "autoconf") and method == "autoconf")
) and service_setting["value"] != value:
local_changed_plugins.add(setting["plugin_id"])
if key != "SERVER_NAME" and (
(original_key in config and value == config[original_key])
or (original_key in db_config and value == db_config[original_key])
or value == (template_setting.default if template_setting else setting.default)
or value == (template_setting_default if template_setting_default is not None else setting["default"])
):
self.logger.debug(f"Removing setting {key} for service {server_name}")
query.delete()
local_to_delete.append(
{"model": Services_settings, "filter": {"service_id": server_name, "setting_id": key, "suffix": suffix}}
)
continue
self.logger.debug(f"Updating setting {key} for service {server_name}")
query.update({Services_settings.value: value, Services_settings.method: method})
session.query(Services).filter(Services.id == server_name).update({Services.last_update: datetime.now().astimezone()})
elif setting and original_key not in global_values:
global_values.append(original_key)
global_value = (
session.query(Global_values)
.with_entities(Global_values.value, Global_values.method)
.filter_by(setting_id=key, suffix=suffix)
.first()
)
template_setting = None
if template:
template_setting = (
session.query(Template_settings)
.with_entities(Template_settings.default)
.filter_by(template_id=template, setting_id=key, suffix=suffix)
.first()
local_to_update.extend(
[
{
"model": Services_settings,
"filter": {"service_id": server_name, "setting_id": key, "suffix": suffix},
"values": {"value": value, "method": method},
},
{"model": Services, "filter": {"id": server_name}, "values": {"last_update": datetime.now().astimezone()}},
]
)
return local_to_put, local_to_update, local_to_delete, local_changed_plugins, local_changed_services
def process_global_settings(global_config: Dict[str, str]):
local_to_put = []
local_to_update = []
local_to_delete = []
local_changed_plugins = set()
for original_key, value in global_config.items():
suffix = 0
key = deepcopy(original_key)
if self.suffix_rx.search(key):
suffix = int(key.split("_")[-1])
key = key[: -len(str(suffix)) - 1]
setting = settings_dict.get(key)
if not setting:
self.logger.debug(f"Setting {key} does not exist")
continue
global_value = session.query(Global_values.value, Global_values.method).filter_by(setting_id=key, suffix=suffix).first()
template_setting_default = None
if template:
template_setting_default = template_settings.get((key, suffix))
if not global_value:
if value == (template_setting.default if template_setting is not None else setting.default):
if value == (template_setting_default if template_setting_default is not None else setting["default"]):
continue
self.logger.debug(f"Adding global setting {key}")
changed_plugins.add(setting.plugin_id)
to_put.append(Global_values(setting_id=key, value=value, suffix=suffix, method=method))
local_changed_plugins.add(setting["plugin_id"])
local_to_put.append(Global_values(setting_id=key, value=value, suffix=suffix, method=method))
elif (
method == global_value.method or (global_value.method not in ("scheduler", "autoconf") and method == "autoconf")
) and global_value.value != value:
changed_plugins.add(setting.plugin_id)
query = session.query(Global_values).filter(Global_values.setting_id == key, Global_values.suffix == suffix)
local_changed_plugins.add(setting["plugin_id"])
if value == (template_setting.default if template_setting is not None else setting.default):
if value == (template_setting_default if template_setting_default is not None else setting["default"]):
self.logger.debug(f"Removing global setting {key}")
query.delete()
local_to_delete.append({"model": Global_values, "filter": {"setting_id": key, "suffix": suffix}})
continue
self.logger.debug(f"Updating global setting {key}")
query.update({Global_values.value: value, Global_values.method: method})
elif method != "autoconf":
self.logger.debug("Checking if non multisite settings have changed")
local_to_update.append(
{"model": Global_values, "filter": {"setting_id": key, "suffix": suffix}, "values": {"value": value, "method": method}}
)
return local_to_put, local_to_update, local_to_delete, local_changed_plugins, False
# Use ThreadPoolExecutor to process services in parallel
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(process_service, service_name, service_config, db_ids) for service_name, service_config in service_configs.items()
]
# Process global settings in another thread or the main thread
futures.append(executor.submit(process_global_settings, global_config))
# Collect results from threads
for future in as_completed(futures):
try:
ret_to_put, ret_to_update, ret_to_delete, ret_changed_plugins, ret_changed_services = future.result()
to_put.extend(ret_to_put)
to_update.extend(ret_to_update)
to_delete.extend(ret_to_delete)
changed_plugins.update(ret_changed_plugins)
if not changed_services:
changed_services = ret_changed_services
except Exception as e:
self.logger.error(f"Thread raised an exception: {e}")
else:
# Non-multisite configuration
self.logger.debug("Checking if non-multisite settings have changed")
server_name = config.get("SERVER_NAME", None)
if template and server_name is None:
@ -1555,15 +1627,16 @@ class Database:
method == global_value.method or (global_value.method not in ("scheduler", "autoconf") and method == "autoconf")
) and global_value.value != value:
changed_plugins.add(setting.plugin_id)
query = session.query(Global_values).filter(Global_values.setting_id == key, Global_values.suffix == suffix)
if value == (template_setting.default if template_setting is not None else setting.default):
self.logger.debug(f"Removing global setting {key}")
query.delete()
to_delete.append({"model": Global_values, "filter": {"setting_id": key, "suffix": suffix}})
continue
self.logger.debug(f"Updating global setting {key}")
query.update({Global_values.value: value, Global_values.method: method})
to_update.append(
{"model": Global_values, "filter": {"setting_id": key, "suffix": suffix}, "values": {"value": value, "method": method}}
)
if changed_services:
changed_plugins = set(plugin.id for plugin in session.query(Plugins).with_entities(Plugins.id).all())
@ -1576,12 +1649,31 @@ class Database:
metadata.first_config_saved = True
if changed_plugins:
session.query(Plugins).filter(Plugins.id.in_(changed_plugins)).update({Plugins.config_changed: True})
session.query(Plugins).filter(Plugins.id.in_(changed_plugins)).update({Plugins.config_changed: True}, synchronize_session=False)
try:
# Apply collected deletions
for delete_item in to_delete:
session.query(delete_item["model"]).filter_by(**delete_item["filter"]).delete(synchronize_session=False)
# Apply collected updates
for update_item in to_update:
session.query(update_item["model"]).filter_by(**update_item["filter"]).update(update_item["values"], synchronize_session=False)
# Add new objects
session.add_all(to_put)
# Delete old global settings
for global_setting in global_settings_to_delete:
session.delete(global_setting)
# Delete old service settings
for service_setting in service_settings_to_delete:
session.delete(service_setting)
session.commit()
except BaseException as e:
session.rollback()
return str(e)
return changed_plugins