mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Refactor JobScheduler to improve environment handling and plugin execution
This commit is contained in:
parent
d46605bf18
commit
586beda91c
1 changed files with 25 additions and 12 deletions
|
|
@ -12,6 +12,7 @@ from os import cpu_count, environ, getenv, sep
|
|||
from os.path import basename, dirname, join
|
||||
from pathlib import Path
|
||||
import re
|
||||
from types import ModuleType
|
||||
from typing import Any, Dict, List, Optional
|
||||
import schedule
|
||||
from schedule import Job
|
||||
|
|
@ -41,7 +42,7 @@ class JobScheduler(ApiCaller):
|
|||
super().__init__(apis or [])
|
||||
self.__logger = logger or setup_logger("Scheduler", getenv("CUSTOM_LOG_LEVEL", getenv("LOG_LEVEL", "INFO")))
|
||||
self.db = db or Database(self.__logger)
|
||||
self.__env = environ | env
|
||||
self.__env = environ | (env or {})
|
||||
self.__lock = lock
|
||||
self.__thread_lock = Lock()
|
||||
self.__job_success = True
|
||||
|
|
@ -63,7 +64,7 @@ class JobScheduler(ApiCaller):
|
|||
|
||||
@env.setter
|
||||
def env(self, env: Dict[str, Any]):
|
||||
self.__env = environ | env
|
||||
self.__env = env
|
||||
|
||||
def update_jobs(self):
|
||||
self.__jobs = self.__get_jobs()
|
||||
|
|
@ -152,12 +153,11 @@ class JobScheduler(ApiCaller):
|
|||
self.__logger.error("Error while reloading nginx")
|
||||
return False
|
||||
|
||||
def __load_plugin_module(self, path: str, name: str) -> Any:
|
||||
def __exec_plugin_module(self, path: str, name: str) -> ModuleType:
|
||||
"""Dynamically import plugin module."""
|
||||
spec = spec_from_file_location(name, path)
|
||||
module = module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
def __job_wrapper(self, path: str, plugin: str, name: str, file: str) -> int:
|
||||
self.__logger.info(f"Executing job '{name}' from plugin '{plugin}'...")
|
||||
|
|
@ -165,11 +165,9 @@ class JobScheduler(ApiCaller):
|
|||
ret = -1
|
||||
start_date = datetime.now().astimezone()
|
||||
try:
|
||||
module = self.__load_plugin_module(join(path, "jobs", file), name)
|
||||
job_env = self.__env.copy()
|
||||
ret = module.run(job_env)
|
||||
self.__exec_plugin_module(join(path, "jobs", file), name)
|
||||
except SystemExit as e:
|
||||
ret = e.code
|
||||
ret = e.code if isinstance(e.code, int) else 1
|
||||
except Exception as e:
|
||||
success = False
|
||||
self.__logger.error(f"Exception while executing job '{name}' from plugin '{plugin}': {e}")
|
||||
|
|
@ -246,7 +244,9 @@ class JobScheduler(ApiCaller):
|
|||
self.__job_success = True
|
||||
self.__job_reload = False
|
||||
|
||||
environ.update(environ | self.__env)
|
||||
old_env = environ.copy()
|
||||
environ.clear()
|
||||
environ.update(old_env | self.__env)
|
||||
|
||||
# Use ThreadPoolExecutor to run jobs
|
||||
futures = [self.__executor.submit(job.run) for job in pending_jobs]
|
||||
|
|
@ -279,6 +279,9 @@ class JobScheduler(ApiCaller):
|
|||
if pending_jobs:
|
||||
self.__logger.info("All scheduled jobs have been executed")
|
||||
|
||||
environ.clear()
|
||||
environ.update(old_env)
|
||||
|
||||
self.__update_cache_permissions()
|
||||
|
||||
return success
|
||||
|
|
@ -293,7 +296,9 @@ class JobScheduler(ApiCaller):
|
|||
|
||||
plugins = plugins or []
|
||||
|
||||
environ.update(environ | self.__env)
|
||||
old_env = environ.copy()
|
||||
environ.clear()
|
||||
environ.update(old_env | self.__env)
|
||||
|
||||
futures = []
|
||||
for plugin, jobs in self.__jobs.items():
|
||||
|
|
@ -322,6 +327,9 @@ class JobScheduler(ApiCaller):
|
|||
for future in futures:
|
||||
future.result()
|
||||
|
||||
environ.clear()
|
||||
environ.update(old_env)
|
||||
|
||||
self.__update_cache_permissions()
|
||||
|
||||
return self.__job_success
|
||||
|
|
@ -349,7 +357,9 @@ class JobScheduler(ApiCaller):
|
|||
self.__lock.release()
|
||||
return False
|
||||
|
||||
environ.update(environ | self.__env)
|
||||
old_env = environ.copy()
|
||||
environ.clear()
|
||||
environ.update(old_env | self.__env)
|
||||
|
||||
self.__job_wrapper(
|
||||
job_to_run["path"],
|
||||
|
|
@ -358,6 +368,9 @@ class JobScheduler(ApiCaller):
|
|||
job_to_run["file"],
|
||||
)
|
||||
|
||||
environ.clear()
|
||||
environ.update(old_env)
|
||||
|
||||
self.__update_cache_permissions()
|
||||
|
||||
if self.__lock:
|
||||
|
|
@ -373,7 +386,7 @@ class JobScheduler(ApiCaller):
|
|||
|
||||
def reload(self, env: Dict[str, Any], apis: Optional[list] = None, *, changed_plugins: Optional[List[str]] = None) -> bool:
|
||||
try:
|
||||
self.__env = environ | env
|
||||
self.__env = env
|
||||
super().__init__(apis or self.apis)
|
||||
self.clear()
|
||||
self.update_jobs()
|
||||
|
|
|
|||
Loading…
Reference in a new issue