mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Refactor Job Scheduler for better performance
This commit is contained in:
parent
5989988c93
commit
174e437046
1 changed files with 148 additions and 141 deletions
|
|
@ -1,7 +1,7 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from contextlib import suppress
|
||||
from copy import deepcopy
|
||||
from datetime import datetime
|
||||
from functools import partial
|
||||
from glob import glob
|
||||
|
|
@ -10,19 +10,15 @@ from logging import Logger
|
|||
from os import cpu_count, environ, getenv, sep
|
||||
from os.path import basename, dirname, join
|
||||
from pathlib import Path
|
||||
from re import match
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional
|
||||
from schedule import (
|
||||
Job,
|
||||
clear as schedule_clear,
|
||||
every as schedule_every,
|
||||
jobs as schedule_jobs,
|
||||
)
|
||||
import schedule
|
||||
from schedule import Job
|
||||
from subprocess import DEVNULL, STDOUT, run
|
||||
from sys import path as sys_path
|
||||
from threading import Lock, Semaphore, Thread
|
||||
from traceback import format_exc
|
||||
from threading import Lock, Semaphore
|
||||
|
||||
# Add dependencies to sys.path
|
||||
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("utils",), ("db",))]:
|
||||
if deps_path not in sys_path:
|
||||
sys_path.append(deps_path)
|
||||
|
|
@ -45,14 +41,22 @@ class JobScheduler(ApiCaller):
|
|||
super().__init__(apis or [])
|
||||
self.__logger = logger or setup_logger("Scheduler", getenv("CUSTOM_LOG_LEVEL", getenv("LOG_LEVEL", "INFO")))
|
||||
self.db = db or Database(self.__logger)
|
||||
self.__env = env or {}
|
||||
self.__env.update(environ)
|
||||
self.__jobs = self.__get_jobs()
|
||||
self.__env = {**environ, **(env or {})}
|
||||
self.__lock = lock
|
||||
self.__thread_lock = Lock()
|
||||
self.__job_success = True
|
||||
self.__job_reload = False
|
||||
self.__semaphore = Semaphore(cpu_count() or 1)
|
||||
self.__executor = ThreadPoolExecutor(max_workers=cpu_count() or 1)
|
||||
self.__compiled_regexes = self.__compile_regexes()
|
||||
self.update_jobs()
|
||||
|
||||
def __compile_regexes(self):
|
||||
"""Precompile regular expressions for job validation."""
|
||||
return {
|
||||
"name": re.compile(r"^[\w.-]{1,128}$"),
|
||||
"file": re.compile(r"^[\w./-]{1,256}$"),
|
||||
}
|
||||
|
||||
@property
|
||||
def env(self) -> Dict[str, Any]:
|
||||
|
|
@ -60,98 +64,94 @@ class JobScheduler(ApiCaller):
|
|||
|
||||
@env.setter
|
||||
def env(self, env: Dict[str, Any]):
|
||||
self.__env = env
|
||||
self.__env = environ | env
|
||||
|
||||
def update_jobs(self):
|
||||
self.__jobs = self.__get_jobs()
|
||||
|
||||
def __get_jobs(self):
|
||||
jobs = {}
|
||||
for plugin_file in (
|
||||
glob(join(sep, "usr", "share", "bunkerweb", "core", "*", "plugin.json"))
|
||||
+ glob(join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json"))
|
||||
+ glob(join(sep, "etc", "bunkerweb", "pro", "plugins", "*", "plugin.json"))
|
||||
): # core plugins # external plugins # pro plugins
|
||||
plugin_files = []
|
||||
plugin_dirs = [
|
||||
join(sep, "usr", "share", "bunkerweb", "core", "*", "plugin.json"),
|
||||
join(sep, "etc", "bunkerweb", "plugins", "*", "plugin.json"),
|
||||
join(sep, "etc", "bunkerweb", "pro", "plugins", "*", "plugin.json"),
|
||||
]
|
||||
for pattern in plugin_dirs:
|
||||
plugin_files.extend(glob(pattern))
|
||||
|
||||
for plugin_file in plugin_files:
|
||||
plugin_name = basename(dirname(plugin_file))
|
||||
jobs[plugin_name] = []
|
||||
try:
|
||||
plugin_data = loads(Path(plugin_file).read_text(encoding="utf-8"))
|
||||
if "jobs" not in plugin_data:
|
||||
continue
|
||||
|
||||
plugin_jobs = plugin_data["jobs"]
|
||||
|
||||
for x, job in enumerate(deepcopy(plugin_jobs)):
|
||||
if not all(key in job.keys() for key in ("name", "file", "every", "reload")):
|
||||
self.__logger.warning(
|
||||
f"missing keys for job {job['name']} in plugin {plugin_name}, must have name, file, every and reload, ignoring job"
|
||||
)
|
||||
plugin_jobs.pop(x)
|
||||
continue
|
||||
|
||||
if not match(r"^[\w.-]{1,128}$", job["name"]):
|
||||
self.__logger.warning(
|
||||
f"Invalid name for job {job['name']} in plugin {plugin_name} (Can only contain numbers, letters, underscores and hyphens (min 1 characters and max 128)), ignoring job"
|
||||
)
|
||||
plugin_jobs.pop(x)
|
||||
continue
|
||||
elif not match(r"^[\w./-]{1,256}$", job["file"]):
|
||||
self.__logger.warning(
|
||||
f"Invalid file for job {job['name']} in plugin {plugin_name} (Can only contain numbers, letters, underscores, hyphens and slashes (min 1 characters and max 256)), ignoring job"
|
||||
)
|
||||
plugin_jobs.pop(x)
|
||||
continue
|
||||
elif job["every"] not in ("once", "minute", "hour", "day", "week"):
|
||||
self.__logger.warning(
|
||||
f"Invalid every for job {job['name']} in plugin {plugin_name} (Must be once, minute, hour, day or week), ignoring job"
|
||||
)
|
||||
plugin_jobs.pop(x)
|
||||
continue
|
||||
elif job["reload"] is not True and job["reload"] is not False:
|
||||
self.__logger.warning(f"Invalid reload for job {job['name']} in plugin {plugin_name} (Must be true or false), ignoring job")
|
||||
plugin_jobs.pop(x)
|
||||
continue
|
||||
|
||||
plugin_jobs[x]["path"] = dirname(plugin_file)
|
||||
|
||||
jobs[plugin_name] = plugin_jobs
|
||||
plugin_jobs = plugin_data.get("jobs", [])
|
||||
valid_jobs = self.__validate_jobs(plugin_jobs, plugin_name, plugin_file)
|
||||
jobs[plugin_name] = valid_jobs
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
except:
|
||||
self.__logger.warning(f"Exception while getting jobs for plugin {plugin_name} : {format_exc()}")
|
||||
self.__logger.warning(f"Plugin file not found: {plugin_file}")
|
||||
except Exception as e:
|
||||
self.__logger.warning(f"Exception while getting jobs for plugin {plugin_name}: {e}")
|
||||
return jobs
|
||||
|
||||
def __validate_jobs(self, plugin_jobs, plugin_name, plugin_file):
|
||||
valid_jobs = []
|
||||
for job in plugin_jobs:
|
||||
if not all(k in job for k in ("name", "file", "every", "reload")):
|
||||
self.__logger.warning(f"Missing keys in job definition in plugin {plugin_name}. Required keys: name, file, every, reload. Job: {job}")
|
||||
continue
|
||||
|
||||
name_valid = self.__compiled_regexes["name"].match(job["name"])
|
||||
file_valid = self.__compiled_regexes["file"].match(job["file"])
|
||||
every_valid = job["every"] in ("once", "minute", "hour", "day", "week")
|
||||
reload_valid = isinstance(job["reload"], bool)
|
||||
|
||||
if not (name_valid and file_valid and every_valid and reload_valid):
|
||||
self.__logger.warning(f"Invalid job definition in plugin {plugin_name}. Job: {job}")
|
||||
continue
|
||||
|
||||
job["path"] = dirname(plugin_file)
|
||||
valid_jobs.append(job)
|
||||
return valid_jobs
|
||||
|
||||
def __str_to_schedule(self, every: str) -> Job:
|
||||
if every == "minute":
|
||||
return schedule_every().minute
|
||||
elif every == "hour":
|
||||
return schedule_every().hour
|
||||
elif every == "day":
|
||||
return schedule_every().day
|
||||
elif every == "week":
|
||||
return schedule_every().week
|
||||
raise ValueError(f"can't convert string {every} to schedule")
|
||||
schedule_map = {
|
||||
"minute": schedule.every().minute,
|
||||
"hour": schedule.every().hour,
|
||||
"day": schedule.every().day,
|
||||
"week": schedule.every().week,
|
||||
}
|
||||
try:
|
||||
return schedule_map[every]
|
||||
except KeyError:
|
||||
raise ValueError(f"Can't convert string '{every}' to schedule")
|
||||
|
||||
def __reload(self) -> bool:
|
||||
self.__logger.info("Reloading nginx ...")
|
||||
reload = self.send_to_apis("POST", "/reload")[0]
|
||||
if reload:
|
||||
self.__logger.info("Reloading nginx...")
|
||||
reload_success = self.send_to_apis("POST", "/reload")[0]
|
||||
if reload_success:
|
||||
self.__logger.info("Successfully reloaded nginx")
|
||||
return True
|
||||
self.__logger.error("Error while reloading nginx")
|
||||
return reload
|
||||
return False
|
||||
|
||||
def __job_wrapper(self, path: str, plugin: str, name: str, file: str) -> int:
|
||||
self.__logger.info(f"Executing job {name} from plugin {plugin} ...")
|
||||
self.__logger.info(f"Executing job '{name}' from plugin '{plugin}'...")
|
||||
success = True
|
||||
ret = -1
|
||||
start_date = datetime.now().astimezone()
|
||||
try:
|
||||
proc = run(join(path, "jobs", file), stdin=DEVNULL, stderr=STDOUT, env=self.__env, check=False)
|
||||
proc = run(
|
||||
join(path, "jobs", file),
|
||||
stdin=DEVNULL,
|
||||
stderr=STDOUT,
|
||||
env=self.__env,
|
||||
check=False,
|
||||
)
|
||||
ret = proc.returncode
|
||||
except BaseException:
|
||||
except Exception as e:
|
||||
success = False
|
||||
self.__logger.error(f"Exception while executing job {name} from plugin {plugin} :\n{format_exc()}")
|
||||
self.__logger.error(f"Exception while executing job '{name}' from plugin '{plugin}': {e}")
|
||||
with self.__thread_lock:
|
||||
self.__job_success = False
|
||||
end_date = datetime.now().astimezone()
|
||||
|
|
@ -162,11 +162,12 @@ class JobScheduler(ApiCaller):
|
|||
|
||||
if self.__job_success and (ret < 0 or ret >= 2):
|
||||
success = False
|
||||
self.__logger.error(f"Error while executing job {name} from plugin {plugin}")
|
||||
self.__logger.error(f"Error while executing job '{name}' from plugin '{plugin}'")
|
||||
with self.__thread_lock:
|
||||
self.__job_success = False
|
||||
|
||||
Thread(target=self.__add_job_run, args=(name, success, start_date, end_date)).start()
|
||||
# Use the executor to manage threads
|
||||
self.__executor.submit(self.__add_job_run, name, success, start_date, end_date)
|
||||
|
||||
return ret
|
||||
|
||||
|
|
@ -175,8 +176,9 @@ class JobScheduler(ApiCaller):
|
|||
err = self.db.add_job_run(name, success, start_date, end_date)
|
||||
|
||||
if not err:
|
||||
return self.__logger.info(f"Successfully added job run for the job {name}")
|
||||
self.__logger.warning(f"Failed to add job run for the job {name}: {err}")
|
||||
self.__logger.info(f"Successfully added job run for the job '{name}'")
|
||||
else:
|
||||
self.__logger.warning(f"Failed to add job run for the job '{name}': {err}")
|
||||
|
||||
def setup(self):
|
||||
for plugin, jobs in self.__jobs.items():
|
||||
|
|
@ -188,32 +190,28 @@ class JobScheduler(ApiCaller):
|
|||
every = job["every"]
|
||||
if every != "once":
|
||||
self.__str_to_schedule(every).do(self.__job_wrapper, path, plugin, name, file)
|
||||
except:
|
||||
self.__logger.error(f"Exception while scheduling jobs for plugin {plugin} : {format_exc()}")
|
||||
except Exception as e:
|
||||
self.__logger.error(f"Exception while scheduling job '{name}' for plugin '{plugin}': {e}")
|
||||
|
||||
def run_pending(self) -> bool:
|
||||
threads = []
|
||||
for job in schedule_jobs:
|
||||
if not job.should_run:
|
||||
continue
|
||||
threads.append(Thread(target=self.__run_in_thread, args=((job.run,),)))
|
||||
pending_jobs = [job for job in schedule.jobs if job.should_run]
|
||||
|
||||
if not threads:
|
||||
if not pending_jobs:
|
||||
return True
|
||||
|
||||
err = self.try_database_readonly()
|
||||
if err:
|
||||
if self.try_database_readonly():
|
||||
self.__logger.error("Database is in read-only mode, pending jobs will not be executed")
|
||||
return True
|
||||
|
||||
self.__job_success = True
|
||||
self.__job_reload = False
|
||||
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
# Use ThreadPoolExecutor to run jobs
|
||||
futures = [self.__executor.submit(self.__run_job_with_semaphore, job.run) for job in pending_jobs]
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
# Wait for all jobs to complete
|
||||
for future in futures:
|
||||
future.result()
|
||||
|
||||
success = self.__job_success
|
||||
self.__job_success = True
|
||||
|
|
@ -222,62 +220,63 @@ class JobScheduler(ApiCaller):
|
|||
try:
|
||||
if self.apis:
|
||||
cache_path = join(sep, "var", "cache", "bunkerweb")
|
||||
self.__logger.info(f"Sending {cache_path} folder ...")
|
||||
self.__logger.info(f"Sending '{cache_path}' folder...")
|
||||
if not self.send_files(cache_path, "/cache"):
|
||||
success = False
|
||||
self.__logger.error(f"Error while sending {cache_path} folder")
|
||||
self.__logger.error(f"Error while sending '{cache_path}' folder")
|
||||
else:
|
||||
self.__logger.info(f"Successfully sent {cache_path} folder")
|
||||
self.__logger.info(f"Successfully sent '{cache_path}' folder")
|
||||
|
||||
if not self.__reload():
|
||||
success = False
|
||||
except BaseException:
|
||||
except Exception as e:
|
||||
success = False
|
||||
self.__logger.error(f"Exception while reloading after job scheduling : {format_exc()}")
|
||||
self.__logger.error(f"Exception while reloading after job scheduling: {e}")
|
||||
self.__job_reload = False
|
||||
|
||||
if threads:
|
||||
if pending_jobs:
|
||||
self.__logger.info("All scheduled jobs have been executed")
|
||||
|
||||
return success
|
||||
|
||||
def run_once(self, plugins: Optional[List[str]] = None) -> bool:
|
||||
err = self.try_database_readonly()
|
||||
if err:
|
||||
if self.try_database_readonly():
|
||||
self.__logger.error("Database is in read-only mode, jobs will not be executed")
|
||||
return True
|
||||
|
||||
threads = []
|
||||
self.__job_success = True
|
||||
self.__job_reload = False
|
||||
|
||||
plugins = plugins or []
|
||||
|
||||
# Create a list of all jobs to run
|
||||
jobs_to_run = []
|
||||
for plugin, jobs in self.__jobs.items():
|
||||
if plugins and plugin not in plugins:
|
||||
continue
|
||||
for job in jobs:
|
||||
jobs_to_run.append(
|
||||
partial(
|
||||
self.__job_wrapper,
|
||||
job["path"],
|
||||
plugin,
|
||||
job["name"],
|
||||
job["file"],
|
||||
)
|
||||
)
|
||||
|
||||
# Add job to the list of jobs to run in the order they are defined
|
||||
jobs_jobs = [partial(self.__job_wrapper, job["path"], plugin, job["name"], job["file"]) for job in jobs]
|
||||
# Use ThreadPoolExecutor to run jobs
|
||||
futures = [self.__executor.submit(self.__run_job_with_semaphore, job_func) for job_func in jobs_to_run]
|
||||
|
||||
# Create a thread for each plugin
|
||||
threads.append(Thread(target=self.__run_in_thread, args=(jobs_jobs,)))
|
||||
# Wait for all jobs to complete
|
||||
for future in futures:
|
||||
future.result()
|
||||
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
ret = self.__job_success
|
||||
self.__job_success = True
|
||||
|
||||
return ret
|
||||
return self.__job_success
|
||||
|
||||
def run_single(self, job_name: str) -> bool:
|
||||
err = self.try_database_readonly()
|
||||
if err:
|
||||
self.__logger.error(f"Database is in read-only mode, single job {job_name} will not be executed")
|
||||
if self.try_database_readonly():
|
||||
self.__logger.error(f"Database is in read-only mode, single job '{job_name}' will not be executed")
|
||||
return True
|
||||
|
||||
if self.__lock:
|
||||
|
|
@ -293,37 +292,41 @@ class JobScheduler(ApiCaller):
|
|||
break
|
||||
|
||||
if not job_plugin or not job_to_run:
|
||||
self.__logger.warning(f"Job {job_name} not found")
|
||||
self.__logger.warning(f"Job '{job_name}' not found")
|
||||
if self.__lock:
|
||||
self.__lock.release()
|
||||
return False
|
||||
|
||||
self.__job_wrapper(job_to_run["path"], job_plugin, job_to_run["name"], job_to_run["file"])
|
||||
self.__job_wrapper(
|
||||
job_to_run["path"],
|
||||
job_plugin,
|
||||
job_to_run["name"],
|
||||
job_to_run["file"],
|
||||
)
|
||||
|
||||
if self.__lock:
|
||||
self.__lock.release()
|
||||
return self.__job_success
|
||||
|
||||
def __run_in_thread(self, jobs: list):
|
||||
self.__semaphore.acquire(timeout=60)
|
||||
for job in jobs:
|
||||
job()
|
||||
self.__semaphore.release()
|
||||
def __run_job_with_semaphore(self, job_func):
|
||||
with self.__semaphore:
|
||||
job_func()
|
||||
|
||||
def clear(self):
|
||||
schedule_clear()
|
||||
schedule.clear()
|
||||
|
||||
def reload(self, env: Dict[str, Any], apis: Optional[list] = None, *, changed_plugins: Optional[List[str]] = None) -> bool:
|
||||
ret = True
|
||||
try:
|
||||
self.__env = env
|
||||
self.__env = environ | env
|
||||
super().__init__(apis or self.apis)
|
||||
self.clear()
|
||||
self.__jobs = self.__get_jobs()
|
||||
ret = self.run_once(changed_plugins)
|
||||
self.update_jobs()
|
||||
success = self.run_once(changed_plugins)
|
||||
self.setup()
|
||||
except:
|
||||
self.__logger.error(f"Exception while reloading scheduler {format_exc()}")
|
||||
return success
|
||||
except Exception as e:
|
||||
self.__logger.error(f"Exception while reloading scheduler: {e}")
|
||||
return False
|
||||
return ret
|
||||
|
||||
def try_database_readonly(self, force: bool = False) -> bool:
|
||||
if not self.db.readonly:
|
||||
|
|
@ -331,7 +334,7 @@ class JobScheduler(ApiCaller):
|
|||
self.db.test_write()
|
||||
self.db.readonly = False
|
||||
return False
|
||||
except BaseException:
|
||||
except Exception:
|
||||
self.db.readonly = True
|
||||
return True
|
||||
elif not force and self.db.last_connection_retry and (datetime.now().astimezone() - self.db.last_connection_retry).total_seconds() > 30:
|
||||
|
|
@ -343,15 +346,19 @@ class JobScheduler(ApiCaller):
|
|||
self.db.retry_connection(log=False)
|
||||
self.db.readonly = False
|
||||
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
|
||||
except BaseException:
|
||||
except Exception:
|
||||
try:
|
||||
self.db.retry_connection(readonly=True, pool_timeout=1)
|
||||
self.db.retry_connection(readonly=True, log=False)
|
||||
except BaseException:
|
||||
except Exception:
|
||||
if self.db.database_uri_readonly:
|
||||
with suppress(BaseException):
|
||||
with suppress(Exception):
|
||||
self.db.retry_connection(fallback=True, pool_timeout=1)
|
||||
self.db.retry_connection(fallback=True, log=False)
|
||||
self.db.readonly = True
|
||||
|
||||
return self.db.readonly
|
||||
|
||||
def shutdown(self):
|
||||
"""Shut down the ThreadPoolExecutor."""
|
||||
self.__executor.shutdown(wait=True)
|
||||
|
|
|
|||
Loading…
Reference in a new issue