feat: Add config_changed column to Plugins instead od Metadata models to avoid scheduler re running all jobs when we only need to run specific plugins ones

This commit is contained in:
Théophile Diot 2024-05-27 15:00:15 +01:00
parent 32d9be885f
commit 64eedf9cc2
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06
5 changed files with 131 additions and 122 deletions

View file

@ -120,9 +120,7 @@ class Config(ConfigCaller):
)
while not self._db.is_initialized():
self.__logger.warning(
"Database is not initialized, retrying in 5 seconds ...",
)
self.__logger.warning("Database is not initialized, retrying in 5 seconds ...")
sleep(5)
# wait until changes are applied
@ -133,9 +131,7 @@ class Config(ConfigCaller):
elif not any(curr_changes.values()):
break
else:
self.__logger.warning(
"Scheduler is already applying a configuration, retrying in 5 seconds ...",
)
self.__logger.warning("Scheduler is already applying a configuration, retrying in 5 seconds ...")
sleep(5)
# update instances in database
@ -144,27 +140,23 @@ class Config(ConfigCaller):
if err:
self.__logger.error(f"Failed to update instances: {err}")
# save config to database
if "config" in changes:
err = self._db.save_config(self.__config, "autoconf", changed=False)
if err:
success = False
self.__logger.error(
f"Can't save config in database: {err}, config may not work as expected",
)
# save custom configs to database
if "custom_configs" in changes:
err = self._db.save_custom_configs(custom_configs, "autoconf", changed=False)
if err:
success = False
self.__logger.error(
f"Can't save autoconf custom configs in database: {err}, custom configs may not work as expected",
)
self.__logger.error(f"Can't save autoconf custom configs in database: {err}, custom configs may not work as expected")
# update changes in db
ret = self._db.checked_changes(changes, value=True)
if ret:
self.__logger.error(f"An error occurred when setting the changes to checked in the database : {ret}")
# save config to database
if "config" in changes:
err = self._db.save_config(self.__config, "autoconf")
if err:
success = False
self.__logger.error(f"Can't save config in database: {err}, config may not work as expected")
else:
# update changes in db
ret = self._db.checked_changes(changes, value=True)
if ret:
self.__logger.error(f"An error occurred when setting the changes to checked in the database : {ret}")
return success

View file

@ -487,7 +487,6 @@ class Database:
Metadata.custom_configs_changed,
Metadata.external_plugins_changed,
Metadata.pro_plugins_changed,
Metadata.config_changed,
Metadata.instances_changed,
)
.filter_by(id=1)
@ -498,21 +497,24 @@ class Database:
custom_configs_changed=metadata is not None and metadata.custom_configs_changed,
external_plugins_changed=metadata is not None and metadata.external_plugins_changed,
pro_plugins_changed=metadata is not None and metadata.pro_plugins_changed,
config_changed=metadata is not None and metadata.config_changed,
instances_changed=metadata is not None and metadata.instances_changed,
plugins_config_changed=self.check_plugin_changes(),
)
except BaseException as e:
return str(e)
def check_plugin_changes(self) -> Union[List[str], str]:
"""Check if the plugins have changed inside the database"""
with self.__db_session() as session:
try:
plugins = session.query(Plugins).with_entities(Plugins.id).filter_by(config_changed=True).all()
return [plugin.id for plugin in plugins]
except BaseException as e:
return str(e)
def checked_changes(self, changes: Optional[List[str]] = None, value: Optional[bool] = False) -> str:
"""Set changed bit for config, custom configs, instances and plugins"""
changes = changes or [
"config",
"custom_configs",
"external_plugins",
"pro_plugins",
"instances",
]
changes = changes or ["config", "custom_configs", "external_plugins", "pro_plugins", "instances"]
with self.__db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
@ -526,7 +528,6 @@ class Database:
if "config" in changes:
if not metadata.first_config_saved:
metadata.first_config_saved = True
metadata.config_changed = value
if "custom_configs" in changes:
metadata.custom_configs_changed = value
if "external_plugins" in changes:
@ -541,6 +542,25 @@ class Database:
return ""
def checked_plugins_changes(self, plugins: Optional[List[str]] = None, value: Optional[bool] = False) -> str:
"""Set changed bit for plugins"""
with self.__db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
plugins = plugins or []
try:
query = session.query(Plugins)
if plugins:
query = query.filter(Plugins.id.in_(plugins))
query.update({Plugins.config_changed: value})
session.commit()
except BaseException as e:
return str(e)
return ""
def init_tables(self, default_plugins: List[dict], bunkerweb_version: str) -> Tuple[bool, str]:
"""Initialize the database tables and return the result"""
@ -1117,9 +1137,29 @@ class Database:
if self.readonly:
return "The database is read-only, the changes will not be saved"
# Delete all the old config
session.query(Global_values).filter(Global_values.method == method).delete()
session.query(Services_settings).filter(Services_settings.method == method).delete()
changed_plugins = set()
changed_services = False
for db_global_config in session.query(Global_values).filter_by(method=method).all():
key = db_global_config.setting_id
if db_global_config.suffix:
key = f"{key}_{db_global_config.suffix}"
if key not in config and (db_global_config.suffix or f"{key}_0" not in config):
session.delete(db_global_config)
changed_plugins.add(session.query(Settings).with_entities(Settings.plugin_id).filter_by(id=db_global_config.setting_id).first().plugin_id)
if key == "SERVER_NAME":
changed_services = True
for db_service_config in session.query(Services_settings).filter_by(method=method).all():
key = f"{db_service_config.service_id}_{db_service_config.setting_id}"
if db_service_config.suffix:
key = f"{key}_{db_service_config.suffix}"
if key not in config and (db_service_config.suffix or f"{key}_0" not in config):
session.delete(db_service_config)
changed_plugins.add(session.query(Settings).with_entities(Settings.plugin_id).filter_by(id=db_service_config.setting_id).first().plugin_id)
if config:
config.pop("DATABASE_URI", None)
@ -1140,6 +1180,7 @@ class Database:
session.query(Services_settings).filter(Services_settings.service_id.in_(missing_ids)).delete()
session.query(Custom_configs).filter(Custom_configs.service_id.in_(missing_ids)).delete()
session.query(Jobs_cache).filter(Jobs_cache.service_id.in_(missing_ids)).delete()
changed_services = True
drafts = {service for service in services if config.pop(f"{service}_IS_DRAFT", "no") == "yes"}
db_drafts = {service.id for service in db_services if service.is_draft}
@ -1152,6 +1193,7 @@ class Database:
if missing_drafts:
# Remove drafts that are no longer in the list
session.query(Services).filter(Services.id.in_(missing_drafts)).update({Services.is_draft: False})
changed_services = True
for draft in drafts:
if draft not in db_drafts:
@ -1160,6 +1202,7 @@ class Database:
db_ids[draft] = {"method": method, "is_draft": True}
elif method == db_ids[draft]["method"]:
session.query(Services).filter(Services.id == draft).update({Services.is_draft: True})
changed_services = True
if config.get("MULTISITE", "no") == "yes":
global_values = []
@ -1170,7 +1213,7 @@ class Database:
suffix = int(key.split("_")[-1])
key = key[: -len(str(suffix)) - 1]
setting = session.query(Settings).with_entities(Settings.default).filter_by(id=key).first()
setting = session.query(Settings).with_entities(Settings.default, Settings.plugin_id).filter_by(id=key).first()
if not setting and services:
try:
@ -1181,10 +1224,12 @@ class Database:
if server_name not in db_ids:
to_put.append(Services(id=server_name, method=method, is_draft=server_name in drafts))
db_ids[server_name] = {"method": method, "is_draft": server_name in drafts}
if server_name not in drafts:
changed_services = True
key = key.replace(f"{server_name}_", "")
original_key = original_key.replace(f"{server_name}_", "")
setting = session.query(Settings).with_entities(Settings.default).filter_by(id=key).first()
setting = session.query(Settings).with_entities(Settings.default, Settings.plugin_id).filter_by(id=key).first()
if not setting:
continue
@ -1192,11 +1237,7 @@ class Database:
service_setting = (
session.query(Services_settings)
.with_entities(Services_settings.value, Services_settings.method)
.filter_by(
service_id=server_name,
setting_id=key,
suffix=suffix,
)
.filter_by(service_id=server_name, setting_id=key, suffix=suffix)
.first()
)
@ -1206,45 +1247,29 @@ class Database:
):
continue
to_put.append(
Services_settings(
service_id=server_name,
setting_id=key,
value=value,
suffix=suffix,
method=method,
)
)
changed_plugins.add(setting.plugin_id)
to_put.append(Services_settings(service_id=server_name, setting_id=key, value=value, suffix=suffix, method=method))
elif method in (service_setting.method, "autoconf") and service_setting.value != value:
if key != "SERVER_NAME" and (
(original_key not in config and value == setting.default) or (original_key in config and value == config[original_key])
):
session.query(Services_settings).filter(
Services_settings.service_id == server_name,
Services_settings.setting_id == key,
Services_settings.suffix == suffix,
).delete()
continue
session.query(Services_settings).filter(
changed_plugins.add(setting.plugin_id)
query = session.query(Services_settings).filter(
Services_settings.service_id == server_name,
Services_settings.setting_id == key,
Services_settings.suffix == suffix,
).update(
{
Services_settings.value: value,
Services_settings.method: method,
}
)
if key != "SERVER_NAME" and (
(original_key not in config and value == setting.default) or (original_key in config and value == config[original_key])
):
query.delete()
continue
query.update({Services_settings.value: value, Services_settings.method: method})
elif setting and original_key not in global_values:
global_values.append(original_key)
global_value = (
session.query(Global_values)
.with_entities(Global_values.value, Global_values.method)
.filter_by(
setting_id=key,
suffix=suffix,
)
.filter_by(setting_id=key, suffix=suffix)
.first()
)
@ -1252,31 +1277,16 @@ class Database:
if value == setting.default:
continue
to_put.append(
Global_values(
setting_id=key,
value=value,
suffix=suffix,
method=method,
)
)
changed_plugins.add(setting.plugin_id)
to_put.append(Global_values(setting_id=key, value=value, suffix=suffix, method=method))
elif method in (global_value.method, "autoconf") and global_value.value != value:
if value == setting.default:
session.query(Global_values).filter(
Global_values.setting_id == key,
Global_values.suffix == suffix,
).delete()
continue
changed_plugins.add(setting.plugin_id)
query = session.query(Global_values).filter(Global_values.setting_id == key, Global_values.suffix == suffix)
session.query(Global_values).filter(
Global_values.setting_id == key,
Global_values.suffix == suffix,
).update(
{
Global_values.value: value,
Global_values.method: method,
}
)
if value == setting.default:
query.delete()
continue
query.update({Global_values.value: value, Global_values.method: method})
else:
if (
config.get("SERVER_NAME", "www.example.com")
@ -1286,6 +1296,7 @@ class Database:
.first()
):
to_put.append(Services(id=config.get("SERVER_NAME", "www.example.com").split(" ")[0], method=method))
changed_services = True
for key, value in config.items():
suffix = 0
@ -1293,7 +1304,7 @@ class Database:
suffix = int(key.split("_")[-1])
key = key[: -len(str(suffix)) - 1]
setting = session.query(Settings).with_entities(Settings.default).filter_by(id=key).first()
setting = session.query(Settings).with_entities(Settings.default, Settings.plugin_id).filter_by(id=key).first()
if not setting:
continue
@ -1309,26 +1320,16 @@ class Database:
if value == setting.default:
continue
to_put.append(
Global_values(
setting_id=key,
value=value,
suffix=suffix,
method=method,
)
)
changed_plugins.add(setting.plugin_id)
to_put.append(Global_values(setting_id=key, value=value, suffix=suffix, method=method))
elif global_value.method == method and value != global_value.value:
if value == setting.default:
session.query(Global_values).filter(
Global_values.setting_id == key,
Global_values.suffix == suffix,
).delete()
continue
changed_plugins.add(setting.plugin_id)
query = session.query(Global_values).filter(Global_values.setting_id == key, Global_values.suffix == suffix)
session.query(Global_values).filter(
Global_values.setting_id == key,
Global_values.suffix == suffix,
).update({Global_values.value: value})
if value == setting.default:
query.delete()
continue
query.update({Global_values.value: value})
if changed:
with suppress(ProgrammingError, OperationalError):
@ -1336,7 +1337,11 @@ class Database:
if metadata is not None:
if not metadata.first_config_saved:
metadata.first_config_saved = True
metadata.config_changed = True
if changed_services:
session.query(Plugins).update({Plugins.config_changed: True})
elif changed_plugins:
session.query(Plugins).filter(Plugins.id.in_(changed_plugins)).update({Plugins.config_changed: True})
try:
session.add_all(to_put)

View file

@ -58,6 +58,7 @@ class Plugins(Base):
method = Column(METHODS_ENUM, default="manual", nullable=False)
data = Column(LargeBinary(length=(2**32) - 1), nullable=True)
checksum = Column(String(128), nullable=True)
config_changed = Column(Boolean, default=False, nullable=True)
settings = relationship("Settings", back_populates="plugin", cascade="all, delete-orphan")
jobs = relationship("Jobs", back_populates="plugin", cascade="all, delete-orphan")
@ -243,7 +244,6 @@ class Metadata(Base):
custom_configs_changed = Column(Boolean, default=False, nullable=True)
external_plugins_changed = Column(Boolean, default=False, nullable=True)
pro_plugins_changed = Column(Boolean, default=False, nullable=True)
config_changed = Column(Boolean, default=False, nullable=True)
instances_changed = Column(Boolean, default=False, nullable=True)
integration = Column(INTEGRATIONS_ENUM, default="Unknown", nullable=False)
version = Column(String(32), default="1.5.8", nullable=False)

View file

@ -10,7 +10,7 @@ from os import cpu_count, environ, getenv, sep
from os.path import basename, dirname, join
from pathlib import Path
from re import match
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from schedule import (
Job,
clear as schedule_clear,
@ -274,7 +274,7 @@ class JobScheduler(ApiCaller):
return success
def run_once(self) -> bool:
def run_once(self, plugins: Optional[List[str]] = None) -> bool:
err = self.try_database_readonly()
if err:
return True
@ -283,7 +283,12 @@ class JobScheduler(ApiCaller):
self.__job_success = True
self.__job_reload = False
plugins = plugins or []
for plugin, jobs in self.__jobs.items():
if plugins and plugin not in plugins:
continue
# Add job to the list of jobs to run in the order they are defined
jobs_jobs = [partial(self.__job_wrapper, job["path"], plugin, job["name"], job["file"]) for job in jobs]
@ -337,14 +342,14 @@ class JobScheduler(ApiCaller):
def clear(self):
schedule_clear()
def reload(self, env: Dict[str, Any], apis: Optional[list] = None) -> bool:
def reload(self, env: Dict[str, Any], apis: Optional[list] = None, *, changed_plugins: Optional[List[str]] = None) -> bool:
ret = True
try:
self.__env = env
super().__init__(apis or self.apis)
self.clear()
self.__jobs = self.__get_jobs()
ret = self.run_once()
ret = self.run_once(changed_plugins)
self.setup()
except:
self.__logger.error(f"Exception while reloading scheduler {format_exc()}")

View file

@ -658,12 +658,14 @@ if __name__ == "__main__":
if INTEGRATION == "Docker" and not override_instances:
Thread(target=listen_for_instances_reload, name="listen_for_instances_reload").start()
changed_plugins = []
while True:
threads.clear()
if RUN_JOBS_ONCE:
# Only run jobs once
if not SCHEDULER.reload(env | environ):
if not SCHEDULER.reload(env | environ, changed_plugins=changed_plugins):
logger.error("At least one job in run_once() failed")
else:
logger.info("All jobs in run_once() were successful")
@ -748,6 +750,9 @@ if __name__ == "__main__":
ret = SCHEDULER.db.checked_changes(CHANGES)
if ret:
logger.error(f"An error occurred when setting the changes to checked in the database : {ret}")
ret = SCHEDULER.db.checked_plugins_changes(changed_plugins)
if ret:
logger.error(f"An error occurred when setting the plugins changes to checked in the database : {ret}")
except BaseException as e:
logger.error(f"Error while setting changes to checked in the database: {e}")
@ -758,6 +763,7 @@ if __name__ == "__main__":
PLUGINS_NEED_GENERATION = False
PRO_PLUGINS_NEED_GENERATION = False
INSTANCES_NEED_GENERATION = False
changed_plugins.clear()
if scheduler_first_start:
try:
@ -818,11 +824,12 @@ if __name__ == "__main__":
NEED_RELOAD = True
# check if the config have changed since last time
if changes["config_changed"]:
logger.info("Config changed, generating ...")
if changes["plugins_config_changed"]:
logger.info("Plugins config changed, generating ...")
CONFIG_NEED_GENERATION = True
RUN_JOBS_ONCE = True
NEED_RELOAD = True
changed_plugins = changes["plugins_config_changed"]
# check if the instances have changed since last time
if changes["instances_changed"]: