Refactor JobScheduler to improve environment handling and plugin execution

This commit is contained in:
Théophile Diot 2024-12-23 17:18:27 +01:00
parent d46605bf18
commit 586beda91c
No known key found for this signature in database
GPG key ID: FA995104A0BA376A

View file

@ -12,6 +12,7 @@ from os import cpu_count, environ, getenv, sep
from os.path import basename, dirname, join
from pathlib import Path
import re
from types import ModuleType
from typing import Any, Dict, List, Optional
import schedule
from schedule import Job
@ -41,7 +42,7 @@ class JobScheduler(ApiCaller):
super().__init__(apis or [])
self.__logger = logger or setup_logger("Scheduler", getenv("CUSTOM_LOG_LEVEL", getenv("LOG_LEVEL", "INFO")))
self.db = db or Database(self.__logger)
self.__env = environ | env
self.__env = environ | (env or {})
self.__lock = lock
self.__thread_lock = Lock()
self.__job_success = True
@ -63,7 +64,7 @@ class JobScheduler(ApiCaller):
@env.setter
def env(self, env: Dict[str, Any]):
self.__env = environ | env
self.__env = env
def update_jobs(self):
self.__jobs = self.__get_jobs()
@ -152,12 +153,11 @@ class JobScheduler(ApiCaller):
self.__logger.error("Error while reloading nginx")
return False
def __load_plugin_module(self, path: str, name: str) -> Any:
def __exec_plugin_module(self, path: str, name: str) -> ModuleType:
"""Dynamically import plugin module."""
spec = spec_from_file_location(name, path)
module = module_from_spec(spec)
spec.loader.exec_module(module)
return module
def __job_wrapper(self, path: str, plugin: str, name: str, file: str) -> int:
self.__logger.info(f"Executing job '{name}' from plugin '{plugin}'...")
@ -165,11 +165,9 @@ class JobScheduler(ApiCaller):
ret = -1
start_date = datetime.now().astimezone()
try:
module = self.__load_plugin_module(join(path, "jobs", file), name)
job_env = self.__env.copy()
ret = module.run(job_env)
self.__exec_plugin_module(join(path, "jobs", file), name)
except SystemExit as e:
ret = e.code
ret = e.code if isinstance(e.code, int) else 1
except Exception as e:
success = False
self.__logger.error(f"Exception while executing job '{name}' from plugin '{plugin}': {e}")
@ -246,7 +244,9 @@ class JobScheduler(ApiCaller):
self.__job_success = True
self.__job_reload = False
environ.update(environ | self.__env)
old_env = environ.copy()
environ.clear()
environ.update(old_env | self.__env)
# Use ThreadPoolExecutor to run jobs
futures = [self.__executor.submit(job.run) for job in pending_jobs]
@ -279,6 +279,9 @@ class JobScheduler(ApiCaller):
if pending_jobs:
self.__logger.info("All scheduled jobs have been executed")
environ.clear()
environ.update(old_env)
self.__update_cache_permissions()
return success
@ -293,7 +296,9 @@ class JobScheduler(ApiCaller):
plugins = plugins or []
environ.update(environ | self.__env)
old_env = environ.copy()
environ.clear()
environ.update(old_env | self.__env)
futures = []
for plugin, jobs in self.__jobs.items():
@ -322,6 +327,9 @@ class JobScheduler(ApiCaller):
for future in futures:
future.result()
environ.clear()
environ.update(old_env)
self.__update_cache_permissions()
return self.__job_success
@ -349,7 +357,9 @@ class JobScheduler(ApiCaller):
self.__lock.release()
return False
environ.update(environ | self.__env)
old_env = environ.copy()
environ.clear()
environ.update(old_env | self.__env)
self.__job_wrapper(
job_to_run["path"],
@ -358,6 +368,9 @@ class JobScheduler(ApiCaller):
job_to_run["file"],
)
environ.clear()
environ.update(old_env)
self.__update_cache_permissions()
if self.__lock:
@ -373,7 +386,7 @@ class JobScheduler(ApiCaller):
def reload(self, env: Dict[str, Any], apis: Optional[list] = None, *, changed_plugins: Optional[List[str]] = None) -> bool:
try:
self.__env = environ | env
self.__env = env
super().__init__(apis or self.apis)
self.clear()
self.update_jobs()