Refactor Job Scheduler for better performance

This commit is contained in:
Théophile Diot 2024-09-30 16:00:56 +02:00
parent 5989988c93
commit 174e437046
No known key found for this signature in database
GPG key ID: FA995104A0BA376A

View file

@ -1,7 +1,7 @@
#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor
from contextlib import suppress
from copy import deepcopy
from datetime import datetime
from functools import partial
from glob import glob
@ -10,19 +10,15 @@ from logging import Logger
from os import cpu_count, environ, getenv, sep
from os.path import basename, dirname, join
from pathlib import Path
from re import match
import re
from typing import Any, Dict, List, Optional
from schedule import (
Job,
clear as schedule_clear,
every as schedule_every,
jobs as schedule_jobs,
)
import schedule
from schedule import Job
from subprocess import DEVNULL, STDOUT, run
from sys import path as sys_path
from threading import Lock, Semaphore, Thread
from traceback import format_exc
from threading import Lock, Semaphore
# Make the bundled BunkerWeb "utils" and "db" directories importable.
for deps_path in (join(sep, "usr", "share", "bunkerweb", subdir) for subdir in ("utils", "db")):
    if deps_path in sys_path:
        # Already registered: never add the same entry twice.
        continue
    sys_path.append(deps_path)
@ -45,14 +41,22 @@ class JobScheduler(ApiCaller):
super().__init__(apis or [])
self.__logger = logger or setup_logger("Scheduler", getenv("CUSTOM_LOG_LEVEL", getenv("LOG_LEVEL", "INFO")))
self.db = db or Database(self.__logger)
self.__env = env or {}
self.__env.update(environ)
self.__jobs = self.__get_jobs()
self.__env = {**environ, **(env or {})}
self.__lock = lock
self.__thread_lock = Lock()
self.__job_success = True
self.__job_reload = False
self.__semaphore = Semaphore(cpu_count() or 1)
self.__executor = ThreadPoolExecutor(max_workers=cpu_count() or 1)
self.__compiled_regexes = self.__compile_regexes()
self.update_jobs()
def __compile_regexes(self) -> Dict[str, re.Pattern]:
    """Precompile the regular expressions used to validate job definitions.

    Returns:
        A dict with two compiled patterns:
        ``"name"`` accepts 1 to 128 word characters, dots or hyphens;
        ``"file"`` accepts 1 to 256 word characters, dots, slashes or hyphens.
    """
    # The patterns end with \Z rather than $ because $ also matches just
    # before a trailing newline, which would let a value such as "job\n"
    # pass validation when the pattern is applied with .match().
    return {
        "name": re.compile(r"^[\w.-]{1,128}\Z"),
        "file": re.compile(r"^[\w./-]{1,256}\Z"),
    }
@property
def env(self) -> Dict[str, Any]:
@ -60,98 +64,94 @@ class JobScheduler(ApiCaller):
@env.setter
def env(self, env: Dict[str, Any]):
    """Replace the job environment with the process environment overlaid by *env*.

    Args:
        env: Variables to expose to jobs; on a name clash they take
            precedence over the process environment.
    """
    # Single assignment building a fresh dict (the same merge __init__
    # performs), so neither os.environ nor the caller's mapping is aliased.
    self.__env = {**environ, **env}
def update_jobs(self):
    """Re-read the plugin job definitions and replace the cached ones."""
    refreshed_jobs = self.__get_jobs()
    self.__jobs = refreshed_jobs
def __get_jobs(self):
    """Collect the validated job definitions of every plugin found on disk.

    Every ``plugin.json`` matching the core, external and pro plugin glob
    patterns is parsed and its ``jobs`` entry is filtered through
    ``__validate_jobs``.

    Returns:
        A dict mapping each plugin directory name to its list of valid jobs.
        A plugin whose file is missing, unreadable, malformed or without
        jobs maps to an empty list.
    """
    plugin_patterns = (
        join(sep, "usr", "share", "bunkerweb", "core", "*", "plugin.json"),  # core plugins
        join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json"),  # external plugins
        join(sep, "etc", "bunkerweb", "pro", "plugins", "*", "plugin.json"),  # pro plugins
    )

    jobs = {}
    for pattern in plugin_patterns:
        for plugin_file in glob(pattern):
            plugin_name = basename(dirname(plugin_file))
            # Register the plugin up front so it is present even when
            # loading its definition fails below.
            jobs[plugin_name] = []
            try:
                plugin_data = loads(Path(plugin_file).read_text(encoding="utf-8"))
                # "or []" also covers an explicit `"jobs": null`.
                plugin_jobs = plugin_data.get("jobs") or []
                # Validation is delegated entirely to __validate_jobs, which
                # builds a new list instead of popping from the list being
                # enumerated.
                jobs[plugin_name] = self.__validate_jobs(plugin_jobs, plugin_name, plugin_file)
            except FileNotFoundError:
                # The file can disappear between the glob and the read.
                self.__logger.warning(f"Plugin file not found: {plugin_file}")
            except Exception as e:
                self.__logger.warning(f"Exception while getting jobs for plugin {plugin_name}: {e}")
    return jobs
def __validate_jobs(self, plugin_jobs, plugin_name, plugin_file):
    """Filter a plugin's raw job definitions down to the valid ones.

    A job is kept only when it is a dict holding the ``name``, ``file``,
    ``every`` and ``reload`` keys, its name and file are strings fully
    matching the precompiled patterns, ``every`` is one of ``once``,
    ``minute``, ``hour``, ``day`` or ``week`` and ``reload`` is a boolean.
    Rejected entries are logged as warnings and skipped.

    Args:
        plugin_jobs: Job definitions read from the plugin's ``plugin.json``
            (``None`` is treated as no jobs).
        plugin_name: Plugin name, used in the log messages.
        plugin_file: Path of the ``plugin.json``; its directory becomes the
            ``path`` entry of every kept job.

    Returns:
        A new list of job dicts, each a copy of the input with ``path`` added.
    """
    required_keys = ("name", "file", "every", "reload")
    allowed_every = ("once", "minute", "hour", "day", "week")
    plugin_path = dirname(plugin_file)  # identical for every job of the plugin
    name_regex = self.__compiled_regexes["name"]
    file_regex = self.__compiled_regexes["file"]

    valid_jobs = []
    for job in plugin_jobs or []:
        if not isinstance(job, dict) or not all(k in job for k in required_keys):
            self.__logger.warning(f"Missing keys in job definition in plugin {plugin_name}. Required keys: name, file, every, reload. Job: {job}")
            continue

        # Check the type before matching: a non-string value would make the
        # regex call raise TypeError instead of rejecting just this job.
        # fullmatch() also refuses a trailing newline, which "$" tolerates.
        name_valid = isinstance(job["name"], str) and name_regex.fullmatch(job["name"])
        file_valid = isinstance(job["file"], str) and file_regex.fullmatch(job["file"])
        every_valid = job["every"] in allowed_every
        reload_valid = isinstance(job["reload"], bool)

        if not (name_valid and file_valid and every_valid and reload_valid):
            self.__logger.warning(f"Invalid job definition in plugin {plugin_name}. Job: {job}")
            continue

        # Store a copy so the caller's parsed data is left untouched.
        valid_jobs.append({**job, "path": plugin_path})

    return valid_jobs
def __str_to_schedule(self, every: str) -> Job:
    """Return a ``schedule`` job configured for the given interval name.

    Args:
        every: One of ``"minute"``, ``"hour"``, ``"day"`` or ``"week"``.

    Returns:
        The object obtained from ``schedule.every()`` with the matching unit
        attribute selected.

    Raises:
        ValueError: If *every* is not one of the supported interval names.
    """
    # Validate first and build only the requested job: a name -> job dict
    # would call schedule.every() four times on every lookup.
    if every not in ("minute", "hour", "day", "week"):
        raise ValueError(f"Can't convert string '{every}' to schedule")
    return getattr(schedule.every(), every)
def __reload(self) -> bool:
    """Send a reload request to the configured APIs and report the outcome.

    Returns:
        True when the first element returned by ``send_to_apis`` is truthy,
        False otherwise.
    """
    self.__logger.info("Reloading nginx...")
    # Issue the request exactly once and normalise the outcome to a real
    # bool so the declared return type holds.
    reload_success = bool(self.send_to_apis("POST", "/reload")[0])
    if reload_success:
        self.__logger.info("Successfully reloaded nginx")
    else:
        self.__logger.error("Error while reloading nginx")
    return reload_success
def __job_wrapper(self, path: str, plugin: str, name: str, file: str) -> int:
self.__logger.info(f"Executing job {name} from plugin {plugin} ...")
self.__logger.info(f"Executing job '{name}' from plugin '{plugin}'...")
success = True
ret = -1
start_date = datetime.now().astimezone()
try:
proc = run(join(path, "jobs", file), stdin=DEVNULL, stderr=STDOUT, env=self.__env, check=False)
proc = run(
join(path, "jobs", file),
stdin=DEVNULL,
stderr=STDOUT,
env=self.__env,
check=False,
)
ret = proc.returncode
except BaseException:
except Exception as e:
success = False
self.__logger.error(f"Exception while executing job {name} from plugin {plugin} :\n{format_exc()}")
self.__logger.error(f"Exception while executing job '{name}' from plugin '{plugin}': {e}")
with self.__thread_lock:
self.__job_success = False
end_date = datetime.now().astimezone()
@ -162,11 +162,12 @@ class JobScheduler(ApiCaller):
if self.__job_success and (ret < 0 or ret >= 2):
success = False
self.__logger.error(f"Error while executing job {name} from plugin {plugin}")
self.__logger.error(f"Error while executing job '{name}' from plugin '{plugin}'")
with self.__thread_lock:
self.__job_success = False
Thread(target=self.__add_job_run, args=(name, success, start_date, end_date)).start()
# Use the executor to manage threads
self.__executor.submit(self.__add_job_run, name, success, start_date, end_date)
return ret
@ -175,8 +176,9 @@ class JobScheduler(ApiCaller):
err = self.db.add_job_run(name, success, start_date, end_date)
if not err:
return self.__logger.info(f"Successfully added job run for the job {name}")
self.__logger.warning(f"Failed to add job run for the job {name}: {err}")
self.__logger.info(f"Successfully added job run for the job '{name}'")
else:
self.__logger.warning(f"Failed to add job run for the job '{name}': {err}")
def setup(self):
for plugin, jobs in self.__jobs.items():
@ -188,32 +190,28 @@ class JobScheduler(ApiCaller):
every = job["every"]
if every != "once":
self.__str_to_schedule(every).do(self.__job_wrapper, path, plugin, name, file)
except:
self.__logger.error(f"Exception while scheduling jobs for plugin {plugin} : {format_exc()}")
except Exception as e:
self.__logger.error(f"Exception while scheduling job '{name}' for plugin '{plugin}': {e}")
def run_pending(self) -> bool:
threads = []
for job in schedule_jobs:
if not job.should_run:
continue
threads.append(Thread(target=self.__run_in_thread, args=((job.run,),)))
pending_jobs = [job for job in schedule.jobs if job.should_run]
if not threads:
if not pending_jobs:
return True
err = self.try_database_readonly()
if err:
if self.try_database_readonly():
self.__logger.error("Database is in read-only mode, pending jobs will not be executed")
return True
self.__job_success = True
self.__job_reload = False
for thread in threads:
thread.start()
# Use ThreadPoolExecutor to run jobs
futures = [self.__executor.submit(self.__run_job_with_semaphore, job.run) for job in pending_jobs]
for thread in threads:
thread.join()
# Wait for all jobs to complete
for future in futures:
future.result()
success = self.__job_success
self.__job_success = True
@ -222,62 +220,63 @@ class JobScheduler(ApiCaller):
try:
if self.apis:
cache_path = join(sep, "var", "cache", "bunkerweb")
self.__logger.info(f"Sending {cache_path} folder ...")
self.__logger.info(f"Sending '{cache_path}' folder...")
if not self.send_files(cache_path, "/cache"):
success = False
self.__logger.error(f"Error while sending {cache_path} folder")
self.__logger.error(f"Error while sending '{cache_path}' folder")
else:
self.__logger.info(f"Successfully sent {cache_path} folder")
self.__logger.info(f"Successfully sent '{cache_path}' folder")
if not self.__reload():
success = False
except BaseException:
except Exception as e:
success = False
self.__logger.error(f"Exception while reloading after job scheduling : {format_exc()}")
self.__logger.error(f"Exception while reloading after job scheduling: {e}")
self.__job_reload = False
if threads:
if pending_jobs:
self.__logger.info("All scheduled jobs have been executed")
return success
def run_once(self, plugins: Optional[List[str]] = None) -> bool:
    """Run every known job once, optionally restricted to some plugins.

    Plugins are processed concurrently through the shared executor, while
    the jobs of a single plugin run one after another in the order they are
    defined.

    Args:
        plugins: Names of the plugins whose jobs must run; ``None`` or an
            empty list selects every plugin.

    Returns:
        True when the database is read-only (nothing is executed in that
        case); otherwise the value of the job success flag after all
        submitted work has finished.
    """
    if self.try_database_readonly():
        self.__logger.error("Database is in read-only mode, jobs will not be executed")
        return True

    self.__job_success = True
    self.__job_reload = False
    selected_plugins = set(plugins or ())

    def run_in_order(job_funcs):
        # Keeps the definition order within one plugin.
        for job_func in job_funcs:
            job_func()

    futures = []
    for plugin, jobs in self.__jobs.items():
        if selected_plugins and plugin not in selected_plugins:
            continue

        job_funcs = [partial(self.__job_wrapper, job["path"], plugin, job["name"], job["file"]) for job in jobs]
        if not job_funcs:
            continue

        # One task per plugin rather than one per job: submitting each job
        # separately would let jobs of the same plugin run concurrently and
        # out of order.
        futures.append(self.__executor.submit(self.__run_job_with_semaphore, partial(run_in_order, job_funcs)))

    # Wait for every plugin to finish; result() re-raises unexpected errors.
    for future in futures:
        future.result()

    return self.__job_success
def run_single(self, job_name: str) -> bool:
err = self.try_database_readonly()
if err:
self.__logger.error(f"Database is in read-only mode, single job {job_name} will not be executed")
if self.try_database_readonly():
self.__logger.error(f"Database is in read-only mode, single job '{job_name}' will not be executed")
return True
if self.__lock:
@ -293,37 +292,41 @@ class JobScheduler(ApiCaller):
break
if not job_plugin or not job_to_run:
self.__logger.warning(f"Job {job_name} not found")
self.__logger.warning(f"Job '{job_name}' not found")
if self.__lock:
self.__lock.release()
return False
self.__job_wrapper(job_to_run["path"], job_plugin, job_to_run["name"], job_to_run["file"])
self.__job_wrapper(
job_to_run["path"],
job_plugin,
job_to_run["name"],
job_to_run["file"],
)
if self.__lock:
self.__lock.release()
return self.__job_success
def __run_in_thread(self, jobs: list):
    """Run the given callables in order while holding a semaphore slot.

    Args:
        jobs: Zero-argument callables, executed one after another.
    """
    # Wait at most 60 seconds for a slot. On timeout the jobs still run
    # (best effort), but the semaphore must then not be released, otherwise
    # its counter would end up higher than before the call.
    acquired = self.__semaphore.acquire(timeout=60)
    try:
        for job in jobs:
            job()
    finally:
        # Give the slot back even when a job raises.
        if acquired:
            self.__semaphore.release()
def __run_job_with_semaphore(self, job_func):
    """Call *job_func* while holding one slot of the shared job semaphore."""
    slot = self.__semaphore
    slot.acquire()
    try:
        job_func()
    finally:
        # Always hand the slot back, even if the job raised.
        slot.release()
def clear(self):
    """Remove the jobs registered through the ``schedule`` module."""
    # A single call is enough; clearing again through the legacy
    # ``schedule_clear`` alias would only repeat the same operation.
    schedule.clear()
def reload(self, env: Dict[str, Any], apis: Optional[list] = None, *, changed_plugins: Optional[List[str]] = None) -> bool:
    """Reload the scheduler with a new environment and, optionally, new APIs.

    The job environment is rebuilt, the ``ApiCaller`` base is re-initialised,
    the current schedule is cleared, the job definitions are re-read, the
    selected jobs run once and the schedule is set up again.

    Args:
        env: Variables overlaid on the process environment for the jobs.
        apis: New API list; when falsy the current ``self.apis`` is kept.
        changed_plugins: Plugins whose jobs must run once; ``None`` is
            passed through to ``run_once``.

    Returns:
        The result of ``run_once``, or False when any step raised.
    """
    try:
        self.__env = environ | env
        super().__init__(apis or self.apis)
        self.clear()
        # Each step runs exactly once: the jobs are re-read a single time
        # and run_once is not invoked twice.
        self.update_jobs()
        success = self.run_once(changed_plugins)
        self.setup()
        return success
    except Exception as e:
        self.__logger.error(f"Exception while reloading scheduler: {e}")
        return False
def try_database_readonly(self, force: bool = False) -> bool:
if not self.db.readonly:
@ -331,7 +334,7 @@ class JobScheduler(ApiCaller):
self.db.test_write()
self.db.readonly = False
return False
except BaseException:
except Exception:
self.db.readonly = True
return True
elif not force and self.db.last_connection_retry and (datetime.now().astimezone() - self.db.last_connection_retry).total_seconds() > 30:
@ -343,15 +346,19 @@ class JobScheduler(ApiCaller):
self.db.retry_connection(log=False)
self.db.readonly = False
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
except Exception:
try:
self.db.retry_connection(readonly=True, pool_timeout=1)
self.db.retry_connection(readonly=True, log=False)
except BaseException:
except Exception:
if self.db.database_uri_readonly:
with suppress(BaseException):
with suppress(Exception):
self.db.retry_connection(fallback=True, pool_timeout=1)
self.db.retry_connection(fallback=True, log=False)
self.db.readonly = True
return self.db.readonly
def shutdown(self):
    """Shut down the scheduler's executor, waiting for it to finish."""
    executor = self.__executor
    executor.shutdown(wait=True)