diff --git a/src/common/core/bunkernet/jobs/bunkernet-data.py b/src/common/core/bunkernet/jobs/bunkernet-data.py index eb39bb98a..13a12dc2c 100644 --- a/src/common/core/bunkernet/jobs/bunkernet-data.py +++ b/src/common/core/bunkernet/jobs/bunkernet-data.py @@ -5,7 +5,7 @@ from os.path import join from pathlib import Path from sys import exit as sys_exit, path as sys_path -for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",))]: +for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",), ("core", "bunkernet", "jobs"))]: if deps_path not in sys_path: sys_path.append(deps_path) diff --git a/src/common/core/bunkernet/jobs/bunkernet-register.py b/src/common/core/bunkernet/jobs/bunkernet-register.py index 2702e88b3..b6879da11 100644 --- a/src/common/core/bunkernet/jobs/bunkernet-register.py +++ b/src/common/core/bunkernet/jobs/bunkernet-register.py @@ -4,7 +4,7 @@ from os import getenv, sep from os.path import join from sys import exit as sys_exit, path as sys_path -for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",))]: +for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",), ("core", "bunkernet", "jobs"))]: if deps_path not in sys_path: sys_path.append(deps_path) diff --git a/src/common/utils/jobs.py b/src/common/utils/jobs.py index 7a894fd8a..b7bbe19bb 100644 --- a/src/common/utils/jobs.py +++ b/src/common/utils/jobs.py @@ -2,13 +2,13 @@ # -*- coding: utf-8 -*- from datetime import datetime, timedelta +from inspect import currentframe, getframeinfo from io import BytesIO from logging import Logger from os import getenv from os.path import sep from pathlib import Path from shutil import rmtree -from sys import argv from tarfile import TarFile, open as tar_open from threading import Lock from traceback import format_exc @@ -27,17 +27,17 @@ EXPIRE_TIME = { class Job: def __init__(self, logger: Optional[Logger] = None, db=None, *, job_name: str = "", deprecated: bool = False): - if not argv: - raise ValueError("argv could not be determined.") + frame = currentframe() + if not frame: + raise ValueError("frame could not be determined.") - source_file = argv[0] + source_path = Path(getframeinfo(frame.f_back).filename) - if source_file is None: + if not source_path.exists(): raise ValueError("source_file could not be determined.") elif not logger and not db: raise ValueError("Either logger or db must be provided.") - source_path = Path(source_file) self.job_path = Path(sep, "var", "cache", "bunkerweb", source_path.parent.parent.name) self.job_name = job_name or source_path.name.replace(".py", "") diff --git a/src/scheduler/JobScheduler.py b/src/scheduler/JobScheduler.py index bea7295b1..c5eae025a 100644 --- a/src/scheduler/JobScheduler.py +++ b/src/scheduler/JobScheduler.py @@ -5,6 +5,7 @@ from contextlib import suppress from datetime import datetime from functools import partial from glob import glob +from importlib.util import module_from_spec, spec_from_file_location from json import loads from logging import Logger from os import cpu_count, environ, getenv, sep @@ -14,7 +15,6 @@ import re from typing import Any, Dict, List, Optional import schedule from schedule import Job -from subprocess import DEVNULL, STDOUT, run from sys import path as sys_path from threading import Lock @@ -46,7 +46,7 @@ class JobScheduler(ApiCaller): self.__thread_lock = Lock() self.__job_success = True self.__job_reload = False - self.__executor = ThreadPoolExecutor(max_workers=cpu_count() or 1) + self.__executor = ThreadPoolExecutor(max_workers=min(32, (cpu_count() or 1) * 4)) self.__compiled_regexes = self.__compile_regexes() self.update_jobs() @@ -76,21 +76,28 @@ class JobScheduler(ApiCaller): join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json"), join(sep, "etc", "bunkerweb", "pro", "plugins", "*", "plugin.json"), ] + for pattern in plugin_dirs: plugin_files.extend(glob(pattern)) - for plugin_file in plugin_files: + def load_plugin(plugin_file): plugin_name = basename(dirname(plugin_file)) - jobs[plugin_name] = [] try: plugin_data = loads(Path(plugin_file).read_text(encoding="utf-8")) plugin_jobs = plugin_data.get("jobs", []) - valid_jobs = self.__validate_jobs(plugin_jobs, plugin_name, plugin_file) - jobs[plugin_name] = valid_jobs + return plugin_name, self.__validate_jobs(plugin_jobs, plugin_name, plugin_file) except FileNotFoundError: self.__logger.warning(f"Plugin file not found: {plugin_file}") except Exception as e: self.__logger.warning(f"Exception while getting jobs for plugin {plugin_name}: {e}") + return plugin_name, [] + + # Load/validate plugins in parallel: + with ThreadPoolExecutor() as executor: + results = list(executor.map(load_plugin, plugin_files)) + + for plugin_name, valid_jobs in results: + jobs[plugin_name] = valid_jobs return jobs def __validate_jobs(self, plugin_jobs, plugin_name, plugin_file): @@ -145,20 +152,23 @@ class JobScheduler(ApiCaller): self.__logger.error("Error while reloading nginx") return False + def __load_plugin_module(self, path: str, name: str) -> Any: + """Dynamically import plugin module.""" + spec = spec_from_file_location(name, path) + module = module_from_spec(spec) + spec.loader.exec_module(module) + return module + def __job_wrapper(self, path: str, plugin: str, name: str, file: str) -> int: self.__logger.info(f"Executing job '{name}' from plugin '{plugin}'...") success = True ret = -1 start_date = datetime.now().astimezone() try: - proc = run( - join(path, "jobs", file), - stdin=DEVNULL, - stderr=STDOUT, - env=self.__env, - check=False, - ) - ret = proc.returncode + module = self.__load_plugin_module(join(path, "jobs", file), name) + ret = module.run(self.__env) + except SystemExit as e: + ret = e.code except Exception as e: success = False self.__logger.error(f"Exception while executing job '{name}' from plugin '{plugin}': {e}") @@ -190,6 +200,25 @@ class JobScheduler(ApiCaller): else: self.__logger.warning(f"Failed to add job run for the job '{name}': {err}") + def __update_cache_permissions(self): + """Update permissions for cache files and directories.""" + self.__logger.info("Updating /var/cache/bunkerweb permissions...") + cache_path = Path(sep, "var", "cache", "bunkerweb") + + DIR_MODE = 0o740 + FILE_MODE = 0o640 + + try: + # Process directories and files in a single pass + for item in cache_path.rglob("*"): + current_mode = item.stat().st_mode & 0o777 + target_mode = DIR_MODE if item.is_dir() else FILE_MODE + + if current_mode != target_mode: + item.chmod(target_mode) + except Exception as e: + self.__logger.error(f"Error while updating cache permissions: {e}") + def setup(self): for plugin, jobs in self.__jobs.items(): for job in jobs: @@ -247,15 +276,7 @@ class JobScheduler(ApiCaller): if pending_jobs: self.__logger.info("All scheduled jobs have been executed") - # Update cache files permissions to be 660 - for item in Path(sep, "var", "cache", "bunkerweb").rglob("*"): - try: - if item.is_dir(): - item.chmod(0o740) - continue - item.chmod(0o640) - except Exception as e: - self.__logger.error(f"Error while changing permissions for '{item}': {e}") + self.__update_cache_permissions() return success @@ -296,15 +317,7 @@ class JobScheduler(ApiCaller): for future in futures: future.result() - # Update cache files permissions to be 660 - for item in Path(sep, "var", "cache", "bunkerweb").rglob("*"): - try: - if item.is_dir(): - item.chmod(0o740) - continue - item.chmod(0o640) - except Exception as e: - self.__logger.error(f"Error while changing permissions for '{item}': {e}") + self.__update_cache_permissions() return self.__job_success @@ -338,15 +351,7 @@ class JobScheduler(ApiCaller): job_to_run["file"], ) - # Update cache files permissions to be 660 - for item in Path(sep, "var", "cache", "bunkerweb").rglob("*"): - try: - if item.is_dir(): - item.chmod(0o740) - continue - item.chmod(0o640) - except Exception as e: - self.__logger.error(f"Error while changing permissions for '{item}': {e}") + self.__update_cache_permissions() if self.__lock: self.__lock.release() @@ -402,7 +407,3 @@ class JobScheduler(ApiCaller): self.db.readonly = True return self.db.readonly - - def shutdown(self): - """Shut down the ThreadPoolExecutor.""" - self.__executor.shutdown(wait=True)