Enhance Job and JobScheduler classes with unique environment handling and improved initialization

This commit is contained in:
Théophile Diot 2024-12-27 11:43:31 +01:00
parent c4cce2a1ac
commit 1d022fbe45
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
2 changed files with 31 additions and 11 deletions

View file

@ -26,8 +26,13 @@ EXPIRE_TIME = {
class Job:
def __init__(self, logger: Optional[Logger] = None, db=None, *, plugin_id: str = "", job_name: str = "", deprecated: bool = False):
if not plugin_id:
def __init__(self, logger: Optional[Logger] = None, db=None, *, job_name: str = "", deprecated: bool = False):
"""Initialize Job class."""
unique_id = getattr(__name__, "unique_env_id", None)
if unique_id:
plugin_id = getenv(f"{unique_id}_PLUGIN_ID", "")
job_name = job_name or getenv(f"{unique_id}_JOB_NAME", "")
else:
frame = currentframe()
if not frame:
raise ValueError("frame could not be determined.")

View file

@ -1,7 +1,7 @@
#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor
from contextlib import suppress
from contextlib import contextmanager, suppress
from datetime import datetime
from functools import partial
from glob import glob
@ -14,6 +14,7 @@ from pathlib import Path
import re
from types import ModuleType
from typing import Any, Dict, List, Optional
from uuid import uuid4
import schedule
from schedule import Job
from sys import path as sys_path
@ -29,6 +30,21 @@ from logger import setup_logger # type: ignore
from ApiCaller import ApiCaller # type: ignore
@contextmanager
def unique_env_context(**kwargs):
"""Context manager for setting and cleaning up unique-prefixed environment variables."""
unique_id = uuid4().hex # Generate a unique identifier
prefixed_keys = {f"{unique_id}_{key}": str(value) for key, value in kwargs.items()}
try:
# Set environment variables with unique prefixes
environ.update(prefixed_keys)
yield unique_id # Provide the unique ID for retrieval
finally:
# Clean up environment variables
for key in prefixed_keys:
environ.pop(key, None)
class JobScheduler(ApiCaller):
def __init__(
self,
@ -154,16 +170,15 @@ class JobScheduler(ApiCaller):
return False
def __exec_plugin_module(self, path: str, name: str, **kwargs) -> ModuleType:
"""Dynamically import plugin module."""
"""Dynamically import a plugin module with thread-local environment."""
module_dir = dirname(path)
sys_path.insert(0, module_dir)
try:
spec = spec_from_file_location(name, path)
module = module_from_spec(spec)
# Set any arguments as module attributes before execution
for key, value in kwargs.items():
setattr(module, key, value)
spec.loader.exec_module(module)
with unique_env_context(**kwargs) as unique_id:
spec = spec_from_file_location(name, path)
module = module_from_spec(spec)
setattr(module, "unique_env_id", unique_id)
spec.loader.exec_module(module)
finally:
sys_path.remove(module_dir)
@ -173,7 +188,7 @@ class JobScheduler(ApiCaller):
ret = -1
start_date = datetime.now().astimezone()
try:
self.__exec_plugin_module(join(path, "jobs", file), name, passed_plugin_id=plugin, passed_job_name=name)
self.__exec_plugin_module(join(path, "jobs", file), name, PLUGIN_ID=plugin, JOB_NAME=name)
except SystemExit as e:
ret = e.code if isinstance(e.code, int) else 1
except Exception as e: