Implement threading startup tasks in scheduler to speed up the starting process

This commit is contained in:
Théophile Diot 2024-05-24 11:12:24 +01:00
parent 1e54e6adcd
commit 6179d6f5ff
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06

View file

@ -167,11 +167,16 @@ def generate_custom_configs(configs: List[Dict[str, Any]], *, original_path: Uni
logger.error("Sending custom configs failed, configuration will not work as expected...")
def generate_external_plugins(plugins: List[Dict[str, Any]], *, original_path: Union[Path, str] = EXTERNAL_PLUGINS_PATH):
def generate_external_plugins(plugins: Optional[List[Dict[str, Any]]], *, original_path: Union[Path, str] = EXTERNAL_PLUGINS_PATH):
if not isinstance(original_path, Path):
original_path = Path(original_path)
pro = "pro" in original_path.parts
if not plugins:
assert SCHEDULER is not None
plugins = SCHEDULER.db.get_plugins(_type="pro" if pro else "external", with_data=True)
assert plugins is not None, "Couldn't get plugins from database"
# Remove old external/pro plugins files
logger.info(f"Removing old/changed {'pro ' if pro else ''}external plugins files ...")
ignored_plugins = set()
@ -423,6 +428,8 @@ if __name__ == "__main__":
# Instantiate scheduler
SCHEDULER.env = env | environ
threads = []
if INTEGRATION in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
# Automatically setup the scheduler apis
while not SCHEDULER.apis:
@ -431,49 +438,49 @@ if __name__ == "__main__":
if not SCHEDULER.apis:
logger.warning("No BunkerWeb API found, retrying in 5s ...")
sleep(5)
SCHEDULER.db.update_instances([api_to_instance(api) for api in SCHEDULER.apis])
threads.append(Thread(target=SCHEDULER.db.update_instances, args=([api_to_instance(api) for api in SCHEDULER.apis],)))
scheduler_first_start = SCHEDULER.db.is_scheduler_first_start()
logger.info("Scheduler started ...")
# Checking if any custom config has been created by the user
logger.info("Checking if there are any changes in custom configs ...")
custom_configs = []
db_configs = SCHEDULER.db.get_custom_configs()
changes = False
for file in CUSTOM_CONFIGS_PATH.rglob("*.conf"):
if len(file.parts) > len(CUSTOM_CONFIGS_PATH.parts) + 3:
logger.warning(f"Custom config file {file} is not in the correct path, skipping ...")
def check_configs_changes():
# Checking if any custom config has been created by the user
logger.info("Checking if there are any changes in custom configs ...")
custom_configs = []
db_configs = SCHEDULER.db.get_custom_configs()
changes = False
for file in CUSTOM_CONFIGS_PATH.rglob("*.conf"):
if len(file.parts) > len(CUSTOM_CONFIGS_PATH.parts) + 3:
logger.warning(f"Custom config file {file} is not in the correct path, skipping ...")
content = file.read_text(encoding="utf-8")
service_id = file.parent.name if file.parent.name not in CUSTOM_CONFIGS_DIRS else None
config_type = file.parent.parent.name if service_id else file.parent.name
content = file.read_text(encoding="utf-8")
service_id = file.parent.name if file.parent.name not in CUSTOM_CONFIGS_DIRS else None
config_type = file.parent.parent.name if service_id else file.parent.name
saving = True
in_db = False
for db_conf in db_configs:
if db_conf["service_id"] == service_id and db_conf["name"] == file.stem:
in_db = True
saving = True
in_db = False
for db_conf in db_configs:
if db_conf["service_id"] == service_id and db_conf["name"] == file.stem:
in_db = True
if not in_db and content.startswith("# CREATED BY ENV"):
saving = False
changes = True
if not in_db and content.startswith("# CREATED BY ENV"):
saving = False
changes = True
if saving:
custom_configs.append({"value": content, "exploded": (service_id, config_type, file.stem)})
if saving:
custom_configs.append({"value": content, "exploded": (service_id, config_type, file.stem)})
changes = changes or {hash(dict_to_frozenset(d)) for d in custom_configs} != {hash(dict_to_frozenset(d)) for d in db_configs}
changes = changes or {hash(dict_to_frozenset(d)) for d in custom_configs} != {hash(dict_to_frozenset(d)) for d in db_configs}
if changes:
err = SCHEDULER.db.save_custom_configs(custom_configs, "manual")
if err:
logger.error(f"Couldn't save some manually created custom configs to database: {err}")
if changes:
err = SCHEDULER.db.save_custom_configs(custom_configs, "manual")
if err:
logger.error(f"Couldn't save some manually created custom configs to database: {err}")
if (scheduler_first_start and db_configs) or changes:
generate_custom_configs(SCHEDULER.db.get_custom_configs())
del custom_configs, db_configs
threads.append(Thread(target=check_configs_changes))
def check_plugin_changes(_type: Literal["external", "pro"] = "external"):
# Check if any external or pro plugin has been added by the user
@ -524,11 +531,15 @@ if __name__ == "__main__":
if err:
logger.error(f"Couldn't save some manually added {_type} plugins to database: {err}")
if (scheduler_first_start and db_plugins) or changes:
generate_external_plugins(SCHEDULER.db.get_plugins(_type=_type, with_data=True), original_path=plugin_path)
generate_external_plugins(SCHEDULER.db.get_plugins(_type=_type, with_data=True), original_path=plugin_path)
check_plugin_changes("external")
check_plugin_changes("pro")
threads.extend([Thread(target=check_plugin_changes, args=("external",)), Thread(target=check_plugin_changes, args=("pro",))])
for thread in threads:
thread.start()
for thread in threads:
thread.join()
logger.info("Running plugins download jobs ...")
@ -541,10 +552,18 @@ if __name__ == "__main__":
changes = SCHEDULER.db.check_changes()
if INTEGRATION not in ("Swarm", "Kubernetes", "Autoconf") and (changes["pro_plugins_changed"] or changes["external_plugins_changed"]):
threads.clear()
if changes["pro_plugins_changed"]:
generate_external_plugins(SCHEDULER.db.get_plugins(_type="pro", with_data=True), original_path=PRO_PLUGINS_PATH)
threads.append(Thread(target=generate_external_plugins, args=(None,), kwargs={"original_path": PRO_PLUGINS_PATH}))
if changes["external_plugins_changed"]:
generate_external_plugins(SCHEDULER.db.get_plugins(_type="external", with_data=True))
threads.append(Thread(target=generate_external_plugins, args=(None,)))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
# run the config saver to save potential ignored external plugins settings
logger.info("Running config saver to save potential ignored external plugins settings ...")
@ -575,7 +594,6 @@ if __name__ == "__main__":
CONFIG_NEED_GENERATION = True
RUN_JOBS_ONCE = True
CHANGES = []
threads = []
def send_nginx_configs():
logger.info(f"Sending {join(sep, 'etc', 'nginx')} folder ...")