From 586beda91c1baeff6df50e1e914a29eba19213f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophile=20Diot?= Date: Mon, 23 Dec 2024 17:18:27 +0100 Subject: [PATCH] Refactor JobScheduler to improve environment handling and plugin execution --- src/scheduler/JobScheduler.py | 37 +++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/src/scheduler/JobScheduler.py b/src/scheduler/JobScheduler.py index 8f56ccc08..727a3d430 100644 --- a/src/scheduler/JobScheduler.py +++ b/src/scheduler/JobScheduler.py @@ -12,6 +12,7 @@ from os import cpu_count, environ, getenv, sep from os.path import basename, dirname, join from pathlib import Path import re +from types import ModuleType from typing import Any, Dict, List, Optional import schedule from schedule import Job @@ -41,7 +42,7 @@ class JobScheduler(ApiCaller): super().__init__(apis or []) self.__logger = logger or setup_logger("Scheduler", getenv("CUSTOM_LOG_LEVEL", getenv("LOG_LEVEL", "INFO"))) self.db = db or Database(self.__logger) - self.__env = environ | env + self.__env = environ | (env or {}) self.__lock = lock self.__thread_lock = Lock() self.__job_success = True @@ -63,7 +64,7 @@ class JobScheduler(ApiCaller): @env.setter def env(self, env: Dict[str, Any]): - self.__env = environ | env + self.__env = env def update_jobs(self): self.__jobs = self.__get_jobs() @@ -152,12 +153,11 @@ class JobScheduler(ApiCaller): self.__logger.error("Error while reloading nginx") return False - def __load_plugin_module(self, path: str, name: str) -> Any: + def __exec_plugin_module(self, path: str, name: str) -> ModuleType: """Dynamically import plugin module.""" spec = spec_from_file_location(name, path) module = module_from_spec(spec) spec.loader.exec_module(module) - return module def __job_wrapper(self, path: str, plugin: str, name: str, file: str) -> int: self.__logger.info(f"Executing job '{name}' from plugin '{plugin}'...") @@ -165,11 +165,9 @@ class JobScheduler(ApiCaller): ret = -1 start_date = datetime.now().astimezone() try: - module = self.__load_plugin_module(join(path, "jobs", file), name) - job_env = self.__env.copy() - ret = module.run(job_env) + self.__exec_plugin_module(join(path, "jobs", file), name) except SystemExit as e: - ret = e.code + ret = e.code if isinstance(e.code, int) else 1 except Exception as e: success = False self.__logger.error(f"Exception while executing job '{name}' from plugin '{plugin}': {e}") @@ -246,7 +244,9 @@ class JobScheduler(ApiCaller): self.__job_success = True self.__job_reload = False - environ.update(environ | self.__env) + old_env = environ.copy() + environ.clear() + environ.update(old_env | self.__env) # Use ThreadPoolExecutor to run jobs futures = [self.__executor.submit(job.run) for job in pending_jobs] @@ -279,6 +279,9 @@ class JobScheduler(ApiCaller): if pending_jobs: self.__logger.info("All scheduled jobs have been executed") + environ.clear() + environ.update(old_env) + self.__update_cache_permissions() return success @@ -293,7 +296,9 @@ class JobScheduler(ApiCaller): plugins = plugins or [] - environ.update(environ | self.__env) + old_env = environ.copy() + environ.clear() + environ.update(old_env | self.__env) futures = [] for plugin, jobs in self.__jobs.items(): @@ -322,6 +327,9 @@ class JobScheduler(ApiCaller): for future in futures: future.result() + environ.clear() + environ.update(old_env) + self.__update_cache_permissions() return self.__job_success @@ -349,7 +357,9 @@ class JobScheduler(ApiCaller): self.__lock.release() return False - environ.update(environ | self.__env) + old_env = environ.copy() + environ.clear() + environ.update(old_env | self.__env) self.__job_wrapper( job_to_run["path"], @@ -358,6 +368,9 @@ class JobScheduler(ApiCaller): job_to_run["file"], ) + environ.clear() + environ.update(old_env) + self.__update_cache_permissions() if self.__lock: @@ -373,7 +386,7 @@ class JobScheduler(ApiCaller): def reload(self, env: Dict[str, Any], apis: Optional[list] = None, *, changed_plugins: Optional[List[str]] = None) -> bool: try: - self.__env = environ | env + self.__env = env super().__init__(apis or self.apis) self.clear() self.update_jobs()