Enhance job handling and cache permission management in JobScheduler

- Added core directory to dependency paths in bunkernet-data and bunkernet-register.
- Improved error handling in Job class by using current frame for source path determination.
- Refactored job execution to dynamically import plugin modules.
- Introduced a method to update cache file permissions consistently.
This commit is contained in:
Théophile Diot 2024-12-23 11:18:12 +01:00
parent 671719f97e
commit c68cd5ef23
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
4 changed files with 54 additions and 53 deletions

View file

@ -5,7 +5,7 @@ from os.path import join
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",))]:
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",), ("core", "bunkernet", "jobs"))]:
if deps_path not in sys_path:
sys_path.append(deps_path)

View file

@ -4,7 +4,7 @@ from os import getenv, sep
from os.path import join
from sys import exit as sys_exit, path as sys_path
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",))]:
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",), ("core", "bunkernet", "jobs"))]:
if deps_path not in sys_path:
sys_path.append(deps_path)

View file

@ -2,13 +2,13 @@
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
from inspect import currentframe, getframeinfo
from io import BytesIO
from logging import Logger
from os import getenv
from os.path import sep
from pathlib import Path
from shutil import rmtree
from sys import argv
from tarfile import TarFile, open as tar_open
from threading import Lock
from traceback import format_exc
@ -27,17 +27,17 @@ EXPIRE_TIME = {
class Job:
def __init__(self, logger: Optional[Logger] = None, db=None, *, job_name: str = "", deprecated: bool = False):
if not argv:
raise ValueError("argv could not be determined.")
frame = currentframe()
if not frame:
raise ValueError("frame could not be determined.")
source_file = argv[0]
source_path = Path(getframeinfo(frame.f_back).filename)
if source_file is None:
if not source_path.exists():
raise ValueError("source_file could not be determined.")
elif not logger and not db:
raise ValueError("Either logger or db must be provided.")
source_path = Path(source_file)
self.job_path = Path(sep, "var", "cache", "bunkerweb", source_path.parent.parent.name)
self.job_name = job_name or source_path.name.replace(".py", "")

View file

@ -5,6 +5,7 @@ from contextlib import suppress
from datetime import datetime
from functools import partial
from glob import glob
from importlib.util import module_from_spec, spec_from_file_location
from json import loads
from logging import Logger
from os import cpu_count, environ, getenv, sep
@ -14,7 +15,6 @@ import re
from typing import Any, Dict, List, Optional
import schedule
from schedule import Job
from subprocess import DEVNULL, STDOUT, run
from sys import path as sys_path
from threading import Lock
@ -46,7 +46,7 @@ class JobScheduler(ApiCaller):
self.__thread_lock = Lock()
self.__job_success = True
self.__job_reload = False
self.__executor = ThreadPoolExecutor(max_workers=cpu_count() or 1)
self.__executor = ThreadPoolExecutor(max_workers=min(32, (cpu_count() or 1) * 4))
self.__compiled_regexes = self.__compile_regexes()
self.update_jobs()
@ -76,21 +76,28 @@ class JobScheduler(ApiCaller):
join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json"),
join(sep, "etc", "bunkerweb", "pro", "plugins", "*", "plugin.json"),
]
for pattern in plugin_dirs:
plugin_files.extend(glob(pattern))
for plugin_file in plugin_files:
def load_plugin(plugin_file):
plugin_name = basename(dirname(plugin_file))
jobs[plugin_name] = []
try:
plugin_data = loads(Path(plugin_file).read_text(encoding="utf-8"))
plugin_jobs = plugin_data.get("jobs", [])
valid_jobs = self.__validate_jobs(plugin_jobs, plugin_name, plugin_file)
jobs[plugin_name] = valid_jobs
return plugin_name, self.__validate_jobs(plugin_jobs, plugin_name, plugin_file)
except FileNotFoundError:
self.__logger.warning(f"Plugin file not found: {plugin_file}")
except Exception as e:
self.__logger.warning(f"Exception while getting jobs for plugin {plugin_name}: {e}")
return plugin_name, []
# Load/validate plugins in parallel:
with ThreadPoolExecutor() as executor:
results = list(executor.map(load_plugin, plugin_files))
for plugin_name, valid_jobs in results:
jobs[plugin_name] = valid_jobs
return jobs
def __validate_jobs(self, plugin_jobs, plugin_name, plugin_file):
@ -145,20 +152,23 @@ class JobScheduler(ApiCaller):
self.__logger.error("Error while reloading nginx")
return False
def __load_plugin_module(self, path: str, name: str) -> Any:
"""Dynamically import plugin module."""
spec = spec_from_file_location(name, path)
module = module_from_spec(spec)
spec.loader.exec_module(module)
return module
def __job_wrapper(self, path: str, plugin: str, name: str, file: str) -> int:
self.__logger.info(f"Executing job '{name}' from plugin '{plugin}'...")
success = True
ret = -1
start_date = datetime.now().astimezone()
try:
proc = run(
join(path, "jobs", file),
stdin=DEVNULL,
stderr=STDOUT,
env=self.__env,
check=False,
)
ret = proc.returncode
module = self.__load_plugin_module(join(path, "jobs", file), name)
ret = module.run(self.__env)
except SystemExit as e:
ret = e.code
except Exception as e:
success = False
self.__logger.error(f"Exception while executing job '{name}' from plugin '{plugin}': {e}")
@ -190,6 +200,25 @@ class JobScheduler(ApiCaller):
else:
self.__logger.warning(f"Failed to add job run for the job '{name}': {err}")
def __update_cache_permissions(self):
"""Update permissions for cache files and directories."""
self.__logger.info("Updating /var/cache/bunkerweb permissions...")
cache_path = Path(sep, "var", "cache", "bunkerweb")
DIR_MODE = 0o740
FILE_MODE = 0o640
try:
# Process directories and files in a single pass
for item in cache_path.rglob("*"):
current_mode = item.stat().st_mode & 0o777
target_mode = DIR_MODE if item.is_dir() else FILE_MODE
if current_mode != target_mode:
item.chmod(target_mode)
except Exception as e:
self.__logger.error(f"Error while updating cache permissions: {e}")
def setup(self):
for plugin, jobs in self.__jobs.items():
for job in jobs:
@ -247,15 +276,7 @@ class JobScheduler(ApiCaller):
if pending_jobs:
self.__logger.info("All scheduled jobs have been executed")
# Update cache files permissions to be 660
for item in Path(sep, "var", "cache", "bunkerweb").rglob("*"):
try:
if item.is_dir():
item.chmod(0o740)
continue
item.chmod(0o640)
except Exception as e:
self.__logger.error(f"Error while changing permissions for '{item}': {e}")
self.__update_cache_permissions()
return success
@ -296,15 +317,7 @@ class JobScheduler(ApiCaller):
for future in futures:
future.result()
# Update cache files permissions to be 660
for item in Path(sep, "var", "cache", "bunkerweb").rglob("*"):
try:
if item.is_dir():
item.chmod(0o740)
continue
item.chmod(0o640)
except Exception as e:
self.__logger.error(f"Error while changing permissions for '{item}': {e}")
self.__update_cache_permissions()
return self.__job_success
@ -338,15 +351,7 @@ class JobScheduler(ApiCaller):
job_to_run["file"],
)
# Update cache files permissions to be 660
for item in Path(sep, "var", "cache", "bunkerweb").rglob("*"):
try:
if item.is_dir():
item.chmod(0o740)
continue
item.chmod(0o640)
except Exception as e:
self.__logger.error(f"Error while changing permissions for '{item}': {e}")
self.__update_cache_permissions()
if self.__lock:
self.__lock.release()
@ -402,7 +407,3 @@ class JobScheduler(ApiCaller):
self.db.readonly = True
return self.db.readonly
def shutdown(self):
"""Shut down the ThreadPoolExecutor."""
self.__executor.shutdown(wait=True)