Refactor Job class initialization to include plugin_id and improve job path validation

This commit is contained in:
Théophile Diot 2024-12-27 10:00:00 +01:00
parent c2c52f5f71
commit e36755841d
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
2 changed files with 28 additions and 13 deletions

View file

@ -26,20 +26,32 @@ EXPIRE_TIME = {
class Job:
def __init__(self, logger: Optional[Logger] = None, db=None, *, job_name: str = "", deprecated: bool = False):
frame = currentframe()
if not frame:
raise ValueError("frame could not be determined.")
def __init__(self, logger: Optional[Logger] = None, db=None, *, plugin_id: str = "", job_name: str = "", deprecated: bool = False):
if not plugin_id:
frame = currentframe()
if not frame:
raise ValueError("frame could not be determined.")
source_path = Path(getframeinfo(frame.f_back).filename)
source_path = Path(getframeinfo(frame.f_back).filename)
if not source_path.exists():
raise ValueError("source_file could not be determined.")
elif not logger and not db:
raise ValueError("Either logger or db must be provided.")
if not source_path.exists():
raise ValueError("source_file could not be determined.")
elif not logger and not db:
raise ValueError("Either logger or db must be provided.")
self.job_path = Path(sep, "var", "cache", "bunkerweb", source_path.parent.parent.name)
self.job_name = job_name or source_path.name.replace(".py", "")
plugin_id = source_path.parent.parent.name
job_name = job_name or source_path.name.replace(".py", "")
if not job_name:
raise ValueError("Could not determine job name.")
# Set job_path and job_name
self.job_path = Path(sep, "var", "cache", "bunkerweb", plugin_id)
self.job_name = job_name
# Additional validation for job_path
if self.job_path == Path(sep, "var", "cache", "bunkerweb"):
raise ValueError("Could not determine job path. Ensure passed_plugin_id is valid.")
self.db = db
if not self.db:

View file

@ -153,13 +153,16 @@ class JobScheduler(ApiCaller):
self.__logger.error("Error while reloading nginx")
return False
def __exec_plugin_module(self, path: str, name: str) -> ModuleType:
def __exec_plugin_module(self, path: str, name: str, **kwargs) -> ModuleType:
"""Dynamically import plugin module."""
module_dir = dirname(path)
sys_path.insert(0, module_dir)
try:
spec = spec_from_file_location(name, path)
module = module_from_spec(spec)
# Set any arguments as module attributes before execution
for key, value in kwargs.items():
setattr(module, key, value)
spec.loader.exec_module(module)
finally:
sys_path.remove(module_dir)
@ -170,7 +173,7 @@ class JobScheduler(ApiCaller):
ret = -1
start_date = datetime.now().astimezone()
try:
self.__exec_plugin_module(join(path, "jobs", file), name)
self.__exec_plugin_module(join(path, "jobs", file), name, passed_plugin_id=plugin, passed_job_name=name)
except SystemExit as e:
ret = e.code if isinstance(e.code, int) else 1
except Exception as e: