mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Enhance job handling and cache permission management in JobScheduler
- Added core directory to dependency paths in bunkernet-data and bunkernet-register. - Improved error handling in Job class by using current frame for source path determination. - Refactored job execution to dynamically import plugin modules. - Introduced a method to update cache file permissions consistently.
This commit is contained in:
parent
671719f97e
commit
c68cd5ef23
4 changed files with 54 additions and 53 deletions
|
|
@ -5,7 +5,7 @@ from os.path import join
|
|||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
|
||||
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",))]:
|
||||
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",), ("core", "bunkernet", "jobs"))]:
|
||||
if deps_path not in sys_path:
|
||||
sys_path.append(deps_path)
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ from os import getenv, sep
|
|||
from os.path import join
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
|
||||
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",))]:
|
||||
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",), ("core", "bunkernet", "jobs"))]:
|
||||
if deps_path not in sys_path:
|
||||
sys_path.append(deps_path)
|
||||
|
||||
|
|
|
|||
|
|
@ -2,13 +2,13 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from inspect import currentframe, getframeinfo
|
||||
from io import BytesIO
|
||||
from logging import Logger
|
||||
from os import getenv
|
||||
from os.path import sep
|
||||
from pathlib import Path
|
||||
from shutil import rmtree
|
||||
from sys import argv
|
||||
from tarfile import TarFile, open as tar_open
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
|
|
@ -27,17 +27,17 @@ EXPIRE_TIME = {
|
|||
|
||||
class Job:
|
||||
def __init__(self, logger: Optional[Logger] = None, db=None, *, job_name: str = "", deprecated: bool = False):
|
||||
if not argv:
|
||||
raise ValueError("argv could not be determined.")
|
||||
frame = currentframe()
|
||||
if not frame:
|
||||
raise ValueError("frame could not be determined.")
|
||||
|
||||
source_file = argv[0]
|
||||
source_path = Path(getframeinfo(frame.f_back).filename)
|
||||
|
||||
if source_file is None:
|
||||
if not source_path.exists():
|
||||
raise ValueError("source_file could not be determined.")
|
||||
elif not logger and not db:
|
||||
raise ValueError("Either logger or db must be provided.")
|
||||
|
||||
source_path = Path(source_file)
|
||||
self.job_path = Path(sep, "var", "cache", "bunkerweb", source_path.parent.parent.name)
|
||||
self.job_name = job_name or source_path.name.replace(".py", "")
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ from contextlib import suppress
|
|||
from datetime import datetime
|
||||
from functools import partial
|
||||
from glob import glob
|
||||
from importlib.util import module_from_spec, spec_from_file_location
|
||||
from json import loads
|
||||
from logging import Logger
|
||||
from os import cpu_count, environ, getenv, sep
|
||||
|
|
@ -14,7 +15,6 @@ import re
|
|||
from typing import Any, Dict, List, Optional
|
||||
import schedule
|
||||
from schedule import Job
|
||||
from subprocess import DEVNULL, STDOUT, run
|
||||
from sys import path as sys_path
|
||||
from threading import Lock
|
||||
|
||||
|
|
@ -46,7 +46,7 @@ class JobScheduler(ApiCaller):
|
|||
self.__thread_lock = Lock()
|
||||
self.__job_success = True
|
||||
self.__job_reload = False
|
||||
self.__executor = ThreadPoolExecutor(max_workers=cpu_count() or 1)
|
||||
self.__executor = ThreadPoolExecutor(max_workers=min(32, (cpu_count() or 1) * 4))
|
||||
self.__compiled_regexes = self.__compile_regexes()
|
||||
self.update_jobs()
|
||||
|
||||
|
|
@ -76,21 +76,28 @@ class JobScheduler(ApiCaller):
|
|||
join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json"),
|
||||
join(sep, "etc", "bunkerweb", "pro", "plugins", "*", "plugin.json"),
|
||||
]
|
||||
|
||||
for pattern in plugin_dirs:
|
||||
plugin_files.extend(glob(pattern))
|
||||
|
||||
for plugin_file in plugin_files:
|
||||
def load_plugin(plugin_file):
|
||||
plugin_name = basename(dirname(plugin_file))
|
||||
jobs[plugin_name] = []
|
||||
try:
|
||||
plugin_data = loads(Path(plugin_file).read_text(encoding="utf-8"))
|
||||
plugin_jobs = plugin_data.get("jobs", [])
|
||||
valid_jobs = self.__validate_jobs(plugin_jobs, plugin_name, plugin_file)
|
||||
jobs[plugin_name] = valid_jobs
|
||||
return plugin_name, self.__validate_jobs(plugin_jobs, plugin_name, plugin_file)
|
||||
except FileNotFoundError:
|
||||
self.__logger.warning(f"Plugin file not found: {plugin_file}")
|
||||
except Exception as e:
|
||||
self.__logger.warning(f"Exception while getting jobs for plugin {plugin_name}: {e}")
|
||||
return plugin_name, []
|
||||
|
||||
# Load/validate plugins in parallel:
|
||||
with ThreadPoolExecutor() as executor:
|
||||
results = list(executor.map(load_plugin, plugin_files))
|
||||
|
||||
for plugin_name, valid_jobs in results:
|
||||
jobs[plugin_name] = valid_jobs
|
||||
return jobs
|
||||
|
||||
def __validate_jobs(self, plugin_jobs, plugin_name, plugin_file):
|
||||
|
|
@ -145,20 +152,23 @@ class JobScheduler(ApiCaller):
|
|||
self.__logger.error("Error while reloading nginx")
|
||||
return False
|
||||
|
||||
def __load_plugin_module(self, path: str, name: str) -> Any:
|
||||
"""Dynamically import plugin module."""
|
||||
spec = spec_from_file_location(name, path)
|
||||
module = module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
def __job_wrapper(self, path: str, plugin: str, name: str, file: str) -> int:
|
||||
self.__logger.info(f"Executing job '{name}' from plugin '{plugin}'...")
|
||||
success = True
|
||||
ret = -1
|
||||
start_date = datetime.now().astimezone()
|
||||
try:
|
||||
proc = run(
|
||||
join(path, "jobs", file),
|
||||
stdin=DEVNULL,
|
||||
stderr=STDOUT,
|
||||
env=self.__env,
|
||||
check=False,
|
||||
)
|
||||
ret = proc.returncode
|
||||
module = self.__load_plugin_module(join(path, "jobs", file), name)
|
||||
ret = module.run(self.__env)
|
||||
except SystemExit as e:
|
||||
ret = e.code
|
||||
except Exception as e:
|
||||
success = False
|
||||
self.__logger.error(f"Exception while executing job '{name}' from plugin '{plugin}': {e}")
|
||||
|
|
@ -190,6 +200,25 @@ class JobScheduler(ApiCaller):
|
|||
else:
|
||||
self.__logger.warning(f"Failed to add job run for the job '{name}': {err}")
|
||||
|
||||
def __update_cache_permissions(self):
|
||||
"""Update permissions for cache files and directories."""
|
||||
self.__logger.info("Updating /var/cache/bunkerweb permissions...")
|
||||
cache_path = Path(sep, "var", "cache", "bunkerweb")
|
||||
|
||||
DIR_MODE = 0o740
|
||||
FILE_MODE = 0o640
|
||||
|
||||
try:
|
||||
# Process directories and files in a single pass
|
||||
for item in cache_path.rglob("*"):
|
||||
current_mode = item.stat().st_mode & 0o777
|
||||
target_mode = DIR_MODE if item.is_dir() else FILE_MODE
|
||||
|
||||
if current_mode != target_mode:
|
||||
item.chmod(target_mode)
|
||||
except Exception as e:
|
||||
self.__logger.error(f"Error while updating cache permissions: {e}")
|
||||
|
||||
def setup(self):
|
||||
for plugin, jobs in self.__jobs.items():
|
||||
for job in jobs:
|
||||
|
|
@ -247,15 +276,7 @@ class JobScheduler(ApiCaller):
|
|||
if pending_jobs:
|
||||
self.__logger.info("All scheduled jobs have been executed")
|
||||
|
||||
# Update cache files permissions to be 660
|
||||
for item in Path(sep, "var", "cache", "bunkerweb").rglob("*"):
|
||||
try:
|
||||
if item.is_dir():
|
||||
item.chmod(0o740)
|
||||
continue
|
||||
item.chmod(0o640)
|
||||
except Exception as e:
|
||||
self.__logger.error(f"Error while changing permissions for '{item}': {e}")
|
||||
self.__update_cache_permissions()
|
||||
|
||||
return success
|
||||
|
||||
|
|
@ -296,15 +317,7 @@ class JobScheduler(ApiCaller):
|
|||
for future in futures:
|
||||
future.result()
|
||||
|
||||
# Update cache files permissions to be 660
|
||||
for item in Path(sep, "var", "cache", "bunkerweb").rglob("*"):
|
||||
try:
|
||||
if item.is_dir():
|
||||
item.chmod(0o740)
|
||||
continue
|
||||
item.chmod(0o640)
|
||||
except Exception as e:
|
||||
self.__logger.error(f"Error while changing permissions for '{item}': {e}")
|
||||
self.__update_cache_permissions()
|
||||
|
||||
return self.__job_success
|
||||
|
||||
|
|
@ -338,15 +351,7 @@ class JobScheduler(ApiCaller):
|
|||
job_to_run["file"],
|
||||
)
|
||||
|
||||
# Update cache files permissions to be 660
|
||||
for item in Path(sep, "var", "cache", "bunkerweb").rglob("*"):
|
||||
try:
|
||||
if item.is_dir():
|
||||
item.chmod(0o740)
|
||||
continue
|
||||
item.chmod(0o640)
|
||||
except Exception as e:
|
||||
self.__logger.error(f"Error while changing permissions for '{item}': {e}")
|
||||
self.__update_cache_permissions()
|
||||
|
||||
if self.__lock:
|
||||
self.__lock.release()
|
||||
|
|
@ -402,7 +407,3 @@ class JobScheduler(ApiCaller):
|
|||
self.db.readonly = True
|
||||
|
||||
return self.db.readonly
|
||||
|
||||
def shutdown(self):
|
||||
"""Shut down the ThreadPoolExecutor."""
|
||||
self.__executor.shutdown(wait=True)
|
||||
|
|
|
|||
Loading…
Reference in a new issue