feat: Add a run history to jobs and the possibility to control the size of the history via the DATABASE_MAX_JOBS_RUNS setting

This commit is contained in:
Théophile Diot 2024-08-02 09:12:15 +01:00
parent a84826c75f
commit 14b5af3f27
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
5 changed files with 129 additions and 39 deletions

View file

@ -0,0 +1,30 @@
#!/usr/bin/env python3
from os import getenv, sep
from os.path import join
from sys import exit as sys_exit, path as sys_path
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",))]:
if deps_path not in sys_path:
sys_path.append(deps_path)
from Database import Database # type: ignore
from logger import setup_logger # type: ignore
LOGGER = setup_logger("DB.CLEANUP-EXCESS-JOBS-RUNS", getenv("LOG_LEVEL", "INFO"))
status = 0
try:
DB = Database(LOGGER, sqlalchemy_string=getenv("DATABASE_URI"))
ret = DB.cleanup_jobs_runs_excess(int(getenv("DATABASE_MAX_JOBS_RUNS", "10000")))
if not ret.startswith("Removed"):
LOGGER.error(ret)
sys_exit(1)
LOGGER.info(ret)
except SystemExit as e:
status = e.code
except BaseException as e:
status = 2
LOGGER.error(f"Exception while running cleanup-excess-jobs-runs.py :\n{e}")
sys_exit(status)

View file

@ -32,6 +32,23 @@
"regex": "^(debug|info|warn|warning|error)$",
"type": "select",
"select": ["debug", "info", "warn", "warning", "error"]
},
"DATABASE_MAX_JOBS_RUNS": {
"context": "global",
"default": "10000",
"help": "The maximum number of jobs runs to keep in the database.",
"id": "database-max-jobs-runs",
"label": "The maximum number of jobs runs",
"regex": "^\\d+$",
"type": "text"
}
}
},
"jobs": [
{
"name": "cleanup-excess-jobs-runs",
"file": "cleanup-excess-jobs-runs.py",
"every": "day",
"reload": false
}
]
}

View file

@ -27,6 +27,7 @@ from model import (
Jobs,
Plugin_pages,
Jobs_cache,
Jobs_runs,
Custom_configs,
Selects,
BwcliCommands,
@ -40,7 +41,7 @@ for deps_path in [os_join(sep, "usr", "share", "bunkerweb", *paths) for paths in
from common_utils import bytes_hash # type: ignore
from pymysql import install_as_MySQLdb
from sqlalchemy import create_engine, event, MetaData as sql_metadata, join, select as db_select, text, inspect
from sqlalchemy import create_engine, event, MetaData as sql_metadata, func, join, select as db_select, text, inspect
from sqlalchemy.engine import Engine
from sqlalchemy.exc import (
ArgumentError,
@ -577,6 +578,7 @@ class Database:
session.query(BwcliCommands).filter(BwcliCommands.plugin_id.in_(missing_ids)).delete()
for plugin_job in session.query(Jobs).with_entities(Jobs.name).filter(Jobs.plugin_id.in_(missing_ids)):
session.query(Jobs_runs).filter(Jobs_runs.job_name == plugin_job.name).delete()
session.query(Jobs_cache).filter(Jobs_cache.job_name == plugin_job.name).delete()
session.query(Jobs).filter(Jobs.name == plugin_job.name).delete()
@ -605,6 +607,7 @@ class Database:
session.query(BwcliCommands).filter(BwcliCommands.plugin_id.in_(missing_values)).delete()
for plugin_job in session.query(Jobs).with_entities(Jobs.name).filter(Jobs.plugin_id.in_(missing_values)):
session.query(Jobs_runs).filter(Jobs_runs.job_name == plugin_job.name).delete()
session.query(Jobs_cache).filter(Jobs_cache.job_name == plugin_job.name).delete()
session.query(Jobs).filter(Jobs.name == plugin_job.name).delete()
@ -816,6 +819,7 @@ class Database:
self.logger.warning(f'Removing {len(missing_names)} jobs from plugin "{plugin["id"]}" as they are no longer in the list')
session.query(Jobs).filter(Jobs.name.in_(missing_names), Jobs.plugin_id == plugin["id"]).delete()
session.query(Jobs_cache).filter(Jobs_cache.job_name.in_(missing_names)).delete()
session.query(Jobs_runs).filter(Jobs_runs.job_name.in_(missing_names)).delete()
if "bw_jobs" in old_data:
indexes = [i for i, job in enumerate(old_data["bw_jobs"]) if job.plugin_id == plugin["id"]]
@ -862,6 +866,7 @@ class Database:
if updates:
self.logger.warning(f'Job "{job["name"]}" already exists, updating it with the new values')
updates[Jobs.last_run] = None
session.query(Jobs_runs).filter(Jobs_runs.job_name == job["name"]).delete()
session.query(Jobs_cache).filter(Jobs_cache.job_name == job["name"]).delete()
session.query(Jobs).filter(Jobs.name == job["name"]).update(updates)
@ -1627,27 +1632,36 @@ class Database:
return services
def update_job(self, plugin_id: str, job_name: str, success: bool) -> str:
"""Update the job last_run in the database"""
def add_job_run(self, job_name: str, success: bool, start_date: datetime, end_date: Optional[datetime] = None) -> str:
"""Add a job run."""
with self._db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
job = session.query(Jobs).filter_by(plugin_id=plugin_id, name=job_name).first()
if not job:
return "Job not found"
job.last_run = datetime.now()
job.success = success
session.add(Jobs_runs(job_name=job_name, success=success, start_date=start_date, end_date=end_date or datetime.now()))
try:
session.commit()
except BaseException as e:
return str(e)
return ""
def cleanup_jobs_runs_excess(self, max_runs: int) -> str:
"""Remove excess jobs runs."""
result = 0
with self._db_session() as session:
rows_count = session.query(Jobs_runs).count()
if rows_count > max_runs:
result = (
session.query(Jobs_runs).order_by(Jobs_runs.end_date.asc()).limit(rows_count - max_runs).with_for_update().delete(synchronize_session=False)
)
try:
session.commit()
except BaseException as e:
return str(e)
return f"Removed {result} excess jobs runs"
def delete_job_cache(self, file_name: str, *, job_name: Optional[str] = None, service_id: Optional[str] = None) -> str:
job_name = job_name or argv[0].replace(".py", "")
filters = {"file_name": file_name}
@ -1732,6 +1746,7 @@ class Database:
session.query(BwcliCommands).filter(BwcliCommands.plugin_id.in_(missing_ids)).delete()
for plugin_job in session.query(Jobs).with_entities(Jobs.name).filter(Jobs.plugin_id.in_(missing_ids)):
session.query(Jobs_runs).filter(Jobs_runs.job_name == plugin_job.name).delete()
session.query(Jobs_cache).filter(Jobs_cache.job_name == plugin_job.name).delete()
session.query(Jobs).filter(Jobs.name == plugin_job.name).delete()
@ -1903,6 +1918,7 @@ class Database:
# Remove jobs that are no longer in the list
session.query(Jobs).filter(Jobs.name.in_(missing_names)).delete()
session.query(Jobs_cache).filter(Jobs_cache.job_name.in_(missing_names)).delete()
session.query(Jobs_runs).filter(Jobs_runs.job_name.in_(missing_names)).delete()
for job in jobs:
db_job = (
@ -1932,6 +1948,7 @@ class Database:
if updates:
changes = True
updates[Jobs.last_run] = None
session.query(Jobs_runs).filter(Jobs_runs.job_name == job["name"]).delete()
session.query(Jobs_cache).filter(Jobs_cache.job_name == job["name"]).delete()
session.query(Jobs).filter(Jobs.name == job["name"]).update(updates)
@ -2292,7 +2309,22 @@ class Database:
def get_plugins_errors(self) -> int:
"""Get plugins errors."""
with self._db_session() as session:
return session.query(Jobs).filter(Jobs.success == False).count() # noqa: E712
# Subquery to get the latest run for each job
latest_runs_subquery = (
session.query(Jobs_runs.job_name, func.max(Jobs_runs.end_date).label("latest_end_date")).group_by(Jobs_runs.job_name).subquery()
)
# Main query to fetch latest job runs and count errors
latest_job_runs = (
session.query(Jobs_runs.job_name, Jobs_runs.success)
.join(
latest_runs_subquery,
(Jobs_runs.job_name == latest_runs_subquery.c.job_name) & (Jobs_runs.end_date == latest_runs_subquery.c.latest_end_date),
)
.all()
)
return sum(1 for job_run in latest_job_runs if not job_run.success)
def get_jobs(self) -> Dict[str, Dict[str, Any]]:
"""Get jobs."""
@ -2302,33 +2334,31 @@ class Database:
"plugin_id": job.plugin_id,
"every": job.every,
"reload": job.reload,
"success": job.success,
"last_run": job.last_run.strftime("%Y/%m/%d, %I:%M:%S %p") if job.last_run is not None else "Never",
"history": [
{
"start_date": job_run.start_date.strftime("%d/%m/%Y, %I:%M:%S %p"),
"end_date": job_run.end_date.strftime("%d/%m/%Y, %I:%M:%S %p"),
"success": job_run.success,
}
for job_run in session.query(Jobs_runs)
.with_entities(Jobs_runs.success, Jobs_runs.start_date, Jobs_runs.end_date)
.filter_by(job_name=job.name)
.order_by(Jobs_runs.end_date.desc())
.limit(10)
],
"cache": [
{
"service_id": cache.service_id,
"file_name": cache.file_name,
"last_update": cache.last_update.strftime("%Y/%m/%d, %I:%M:%S %p") if cache.last_update is not None else "Never",
"last_update": cache.last_update.strftime("%d/%m/%Y, %I:%M:%S %p") if cache.last_update else "Never",
"checksum": cache.checksum,
}
for cache in session.query(Jobs_cache)
.with_entities(
Jobs_cache.service_id,
Jobs_cache.file_name,
Jobs_cache.last_update,
)
.with_entities(Jobs_cache.service_id, Jobs_cache.file_name, Jobs_cache.last_update, Jobs_cache.checksum)
.filter_by(job_name=job.name)
],
}
for job in (
session.query(Jobs).with_entities(
Jobs.name,
Jobs.plugin_id,
Jobs.every,
Jobs.reload,
Jobs.success,
Jobs.last_run,
)
)
for job in session.query(Jobs).with_entities(Jobs.name, Jobs.plugin_id, Jobs.every, Jobs.reload)
}
def get_job_cache_file(

View file

@ -152,11 +152,10 @@ class Jobs(Base):
file_name = Column(String(256), nullable=False)
every = Column(SCHEDULES_ENUM, nullable=False)
reload = Column(Boolean, default=False, nullable=False)
success = Column(Boolean, nullable=True)
last_run = Column(DateTime, nullable=True)
plugin = relationship("Plugins", back_populates="jobs")
cache = relationship("Jobs_cache", back_populates="job", cascade="all")
runs = relationship("Jobs_runs", back_populates="job", cascade="all")
class Plugin_pages(Base):
@ -164,6 +163,7 @@ class Plugin_pages(Base):
id = Column(Integer, Identity(start=1, increment=1), primary_key=True)
plugin_id = Column(String(64), ForeignKey("bw_plugins.id", onupdate="cascade", ondelete="cascade"), nullable=False)
# TODO: replace with a raw data that gets extracted by the plugin
template_file = Column(LargeBinary(length=(2**32) - 1), nullable=False)
template_checksum = Column(String(128), nullable=False)
actions_file = Column(LargeBinary(length=(2**32) - 1), nullable=False)
@ -189,6 +189,18 @@ class Jobs_cache(Base):
service = relationship("Services", back_populates="jobs_cache")
class Jobs_runs(Base):
__tablename__ = "bw_jobs_runs"
id = Column(Integer, Identity(start=1, increment=1), primary_key=True)
job_name = Column(String(128), ForeignKey("bw_jobs.name", onupdate="cascade", ondelete="cascade"), nullable=False)
success = Column(Boolean, nullable=True, default=False)
start_date = Column(DateTime(), nullable=False)
end_date = Column(DateTime(), nullable=True, server_default=func.now())
job = relationship("Jobs", back_populates="runs")
class Custom_configs(Base):
__tablename__ = "bw_custom_configs"
__table_args__ = (UniqueConstraint("service_id", "type", "name"),)

View file

@ -162,6 +162,7 @@ class JobScheduler(ApiCaller):
self.__logger.info(f"Executing job {name} from plugin {plugin} ...")
success = True
ret = -1
start_date = datetime.now()
try:
proc = run(join(path, "jobs", file), stdin=DEVNULL, stderr=STDOUT, env=self.__env, check=False)
ret = proc.returncode
@ -170,6 +171,7 @@ class JobScheduler(ApiCaller):
self.__logger.error(f"Exception while executing job {name} from plugin {plugin} :\n{format_exc()}")
with self.__thread_lock:
self.__job_success = False
end_date = datetime.now()
if ret == 1:
with self.__thread_lock:
@ -181,18 +183,17 @@ class JobScheduler(ApiCaller):
with self.__thread_lock:
self.__job_success = False
Thread(target=self.__update_job, args=(plugin, name, success)).start()
Thread(target=self.__add_job_run, args=(name, success, start_date, end_date)).start()
return ret
def __update_job(self, plugin: str, name: str, success: bool):
def __add_job_run(self, name: str, success: bool, start_date: datetime, end_date: datetime = None):
with self.__thread_lock:
err = self.db.update_job(plugin, name, success)
err = self.db.add_job_run(name, success, start_date, end_date)
if not err:
self.__logger.info(f"Successfully updated database for the job {name} from plugin {plugin}")
else:
self.__logger.warning(f"Failed to update database for the job {name} from plugin {plugin}: {err}")
return self.__logger.info(f"Successfully added job run for the job {name}")
self.__logger.warning(f"Failed to add job run for the job {name}: {err}")
def setup(self):
for plugin, jobs in self.__jobs.items():