mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
feat: Add a run history to jobs and the possibility to control the size of the history via the DATABASE_MAX_JOBS_RUNS setting
This commit is contained in:
parent
a84826c75f
commit
14b5af3f27
5 changed files with 129 additions and 39 deletions
30
src/common/core/db/jobs/cleanup-excess-jobs-runs.py
Normal file
30
src/common/core/db/jobs/cleanup-excess-jobs-runs.py
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from os import getenv, sep
|
||||
from os.path import join
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
|
||||
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",))]:
|
||||
if deps_path not in sys_path:
|
||||
sys_path.append(deps_path)
|
||||
|
||||
from Database import Database # type: ignore
|
||||
from logger import setup_logger # type: ignore
|
||||
|
||||
LOGGER = setup_logger("DB.CLEANUP-EXCESS-JOBS-RUNS", getenv("LOG_LEVEL", "INFO"))
|
||||
status = 0
|
||||
|
||||
try:
|
||||
DB = Database(LOGGER, sqlalchemy_string=getenv("DATABASE_URI"))
|
||||
ret = DB.cleanup_jobs_runs_excess(int(getenv("DATABASE_MAX_JOBS_RUNS", "10000")))
|
||||
if not ret.startswith("Removed"):
|
||||
LOGGER.error(ret)
|
||||
sys_exit(1)
|
||||
LOGGER.info(ret)
|
||||
except SystemExit as e:
|
||||
status = e.code
|
||||
except BaseException as e:
|
||||
status = 2
|
||||
LOGGER.error(f"Exception while running cleanup-excess-jobs-runs.py :\n{e}")
|
||||
|
||||
sys_exit(status)
|
||||
|
|
@ -32,6 +32,23 @@
|
|||
"regex": "^(debug|info|warn|warning|error)$",
|
||||
"type": "select",
|
||||
"select": ["debug", "info", "warn", "warning", "error"]
|
||||
},
|
||||
"DATABASE_MAX_JOBS_RUNS": {
|
||||
"context": "global",
|
||||
"default": "10000",
|
||||
"help": "The maximum number of jobs runs to keep in the database.",
|
||||
"id": "database-max-jobs-runs",
|
||||
"label": "The maximum number of jobs runs",
|
||||
"regex": "^\\d+$",
|
||||
"type": "text"
|
||||
}
|
||||
}
|
||||
},
|
||||
"jobs": [
|
||||
{
|
||||
"name": "cleanup-excess-jobs-runs",
|
||||
"file": "cleanup-excess-jobs-runs.py",
|
||||
"every": "day",
|
||||
"reload": false
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ from model import (
|
|||
Jobs,
|
||||
Plugin_pages,
|
||||
Jobs_cache,
|
||||
Jobs_runs,
|
||||
Custom_configs,
|
||||
Selects,
|
||||
BwcliCommands,
|
||||
|
|
@ -40,7 +41,7 @@ for deps_path in [os_join(sep, "usr", "share", "bunkerweb", *paths) for paths in
|
|||
from common_utils import bytes_hash # type: ignore
|
||||
|
||||
from pymysql import install_as_MySQLdb
|
||||
from sqlalchemy import create_engine, event, MetaData as sql_metadata, join, select as db_select, text, inspect
|
||||
from sqlalchemy import create_engine, event, MetaData as sql_metadata, func, join, select as db_select, text, inspect
|
||||
from sqlalchemy.engine import Engine
|
||||
from sqlalchemy.exc import (
|
||||
ArgumentError,
|
||||
|
|
@ -577,6 +578,7 @@ class Database:
|
|||
session.query(BwcliCommands).filter(BwcliCommands.plugin_id.in_(missing_ids)).delete()
|
||||
|
||||
for plugin_job in session.query(Jobs).with_entities(Jobs.name).filter(Jobs.plugin_id.in_(missing_ids)):
|
||||
session.query(Jobs_runs).filter(Jobs_runs.job_name == plugin_job.name).delete()
|
||||
session.query(Jobs_cache).filter(Jobs_cache.job_name == plugin_job.name).delete()
|
||||
session.query(Jobs).filter(Jobs.name == plugin_job.name).delete()
|
||||
|
||||
|
|
@ -605,6 +607,7 @@ class Database:
|
|||
session.query(BwcliCommands).filter(BwcliCommands.plugin_id.in_(missing_values)).delete()
|
||||
|
||||
for plugin_job in session.query(Jobs).with_entities(Jobs.name).filter(Jobs.plugin_id.in_(missing_values)):
|
||||
session.query(Jobs_runs).filter(Jobs_runs.job_name == plugin_job.name).delete()
|
||||
session.query(Jobs_cache).filter(Jobs_cache.job_name == plugin_job.name).delete()
|
||||
session.query(Jobs).filter(Jobs.name == plugin_job.name).delete()
|
||||
|
||||
|
|
@ -816,6 +819,7 @@ class Database:
|
|||
self.logger.warning(f'Removing {len(missing_names)} jobs from plugin "{plugin["id"]}" as they are no longer in the list')
|
||||
session.query(Jobs).filter(Jobs.name.in_(missing_names), Jobs.plugin_id == plugin["id"]).delete()
|
||||
session.query(Jobs_cache).filter(Jobs_cache.job_name.in_(missing_names)).delete()
|
||||
session.query(Jobs_runs).filter(Jobs_runs.job_name.in_(missing_names)).delete()
|
||||
|
||||
if "bw_jobs" in old_data:
|
||||
indexes = [i for i, job in enumerate(old_data["bw_jobs"]) if job.plugin_id == plugin["id"]]
|
||||
|
|
@ -862,6 +866,7 @@ class Database:
|
|||
if updates:
|
||||
self.logger.warning(f'Job "{job["name"]}" already exists, updating it with the new values')
|
||||
updates[Jobs.last_run] = None
|
||||
session.query(Jobs_runs).filter(Jobs_runs.job_name == job["name"]).delete()
|
||||
session.query(Jobs_cache).filter(Jobs_cache.job_name == job["name"]).delete()
|
||||
session.query(Jobs).filter(Jobs.name == job["name"]).update(updates)
|
||||
|
||||
|
|
@ -1627,27 +1632,36 @@ class Database:
|
|||
|
||||
return services
|
||||
|
||||
def update_job(self, plugin_id: str, job_name: str, success: bool) -> str:
|
||||
"""Update the job last_run in the database"""
|
||||
def add_job_run(self, job_name: str, success: bool, start_date: datetime, end_date: Optional[datetime] = None) -> str:
|
||||
"""Add a job run."""
|
||||
with self._db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
job = session.query(Jobs).filter_by(plugin_id=plugin_id, name=job_name).first()
|
||||
|
||||
if not job:
|
||||
return "Job not found"
|
||||
|
||||
job.last_run = datetime.now()
|
||||
job.success = success
|
||||
session.add(Jobs_runs(job_name=job_name, success=success, start_date=start_date, end_date=end_date or datetime.now()))
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
def cleanup_jobs_runs_excess(self, max_runs: int) -> str:
|
||||
"""Remove excess jobs runs."""
|
||||
result = 0
|
||||
with self._db_session() as session:
|
||||
rows_count = session.query(Jobs_runs).count()
|
||||
if rows_count > max_runs:
|
||||
result = (
|
||||
session.query(Jobs_runs).order_by(Jobs_runs.end_date.asc()).limit(rows_count - max_runs).with_for_update().delete(synchronize_session=False)
|
||||
)
|
||||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
return f"Removed {result} excess jobs runs"
|
||||
|
||||
def delete_job_cache(self, file_name: str, *, job_name: Optional[str] = None, service_id: Optional[str] = None) -> str:
|
||||
job_name = job_name or argv[0].replace(".py", "")
|
||||
filters = {"file_name": file_name}
|
||||
|
|
@ -1732,6 +1746,7 @@ class Database:
|
|||
session.query(BwcliCommands).filter(BwcliCommands.plugin_id.in_(missing_ids)).delete()
|
||||
|
||||
for plugin_job in session.query(Jobs).with_entities(Jobs.name).filter(Jobs.plugin_id.in_(missing_ids)):
|
||||
session.query(Jobs_runs).filter(Jobs_runs.job_name == plugin_job.name).delete()
|
||||
session.query(Jobs_cache).filter(Jobs_cache.job_name == plugin_job.name).delete()
|
||||
session.query(Jobs).filter(Jobs.name == plugin_job.name).delete()
|
||||
|
||||
|
|
@ -1903,6 +1918,7 @@ class Database:
|
|||
# Remove jobs that are no longer in the list
|
||||
session.query(Jobs).filter(Jobs.name.in_(missing_names)).delete()
|
||||
session.query(Jobs_cache).filter(Jobs_cache.job_name.in_(missing_names)).delete()
|
||||
session.query(Jobs_runs).filter(Jobs_runs.job_name.in_(missing_names)).delete()
|
||||
|
||||
for job in jobs:
|
||||
db_job = (
|
||||
|
|
@ -1932,6 +1948,7 @@ class Database:
|
|||
if updates:
|
||||
changes = True
|
||||
updates[Jobs.last_run] = None
|
||||
session.query(Jobs_runs).filter(Jobs_runs.job_name == job["name"]).delete()
|
||||
session.query(Jobs_cache).filter(Jobs_cache.job_name == job["name"]).delete()
|
||||
session.query(Jobs).filter(Jobs.name == job["name"]).update(updates)
|
||||
|
||||
|
|
@ -2292,7 +2309,22 @@ class Database:
|
|||
def get_plugins_errors(self) -> int:
|
||||
"""Get plugins errors."""
|
||||
with self._db_session() as session:
|
||||
return session.query(Jobs).filter(Jobs.success == False).count() # noqa: E712
|
||||
# Subquery to get the latest run for each job
|
||||
latest_runs_subquery = (
|
||||
session.query(Jobs_runs.job_name, func.max(Jobs_runs.end_date).label("latest_end_date")).group_by(Jobs_runs.job_name).subquery()
|
||||
)
|
||||
|
||||
# Main query to fetch latest job runs and count errors
|
||||
latest_job_runs = (
|
||||
session.query(Jobs_runs.job_name, Jobs_runs.success)
|
||||
.join(
|
||||
latest_runs_subquery,
|
||||
(Jobs_runs.job_name == latest_runs_subquery.c.job_name) & (Jobs_runs.end_date == latest_runs_subquery.c.latest_end_date),
|
||||
)
|
||||
.all()
|
||||
)
|
||||
|
||||
return sum(1 for job_run in latest_job_runs if not job_run.success)
|
||||
|
||||
def get_jobs(self) -> Dict[str, Dict[str, Any]]:
|
||||
"""Get jobs."""
|
||||
|
|
@ -2302,33 +2334,31 @@ class Database:
|
|||
"plugin_id": job.plugin_id,
|
||||
"every": job.every,
|
||||
"reload": job.reload,
|
||||
"success": job.success,
|
||||
"last_run": job.last_run.strftime("%Y/%m/%d, %I:%M:%S %p") if job.last_run is not None else "Never",
|
||||
"history": [
|
||||
{
|
||||
"start_date": job_run.start_date.strftime("%d/%m/%Y, %I:%M:%S %p"),
|
||||
"end_date": job_run.end_date.strftime("%d/%m/%Y, %I:%M:%S %p"),
|
||||
"success": job_run.success,
|
||||
}
|
||||
for job_run in session.query(Jobs_runs)
|
||||
.with_entities(Jobs_runs.success, Jobs_runs.start_date, Jobs_runs.end_date)
|
||||
.filter_by(job_name=job.name)
|
||||
.order_by(Jobs_runs.end_date.desc())
|
||||
.limit(10)
|
||||
],
|
||||
"cache": [
|
||||
{
|
||||
"service_id": cache.service_id,
|
||||
"file_name": cache.file_name,
|
||||
"last_update": cache.last_update.strftime("%Y/%m/%d, %I:%M:%S %p") if cache.last_update is not None else "Never",
|
||||
"last_update": cache.last_update.strftime("%d/%m/%Y, %I:%M:%S %p") if cache.last_update else "Never",
|
||||
"checksum": cache.checksum,
|
||||
}
|
||||
for cache in session.query(Jobs_cache)
|
||||
.with_entities(
|
||||
Jobs_cache.service_id,
|
||||
Jobs_cache.file_name,
|
||||
Jobs_cache.last_update,
|
||||
)
|
||||
.with_entities(Jobs_cache.service_id, Jobs_cache.file_name, Jobs_cache.last_update, Jobs_cache.checksum)
|
||||
.filter_by(job_name=job.name)
|
||||
],
|
||||
}
|
||||
for job in (
|
||||
session.query(Jobs).with_entities(
|
||||
Jobs.name,
|
||||
Jobs.plugin_id,
|
||||
Jobs.every,
|
||||
Jobs.reload,
|
||||
Jobs.success,
|
||||
Jobs.last_run,
|
||||
)
|
||||
)
|
||||
for job in session.query(Jobs).with_entities(Jobs.name, Jobs.plugin_id, Jobs.every, Jobs.reload)
|
||||
}
|
||||
|
||||
def get_job_cache_file(
|
||||
|
|
|
|||
|
|
@ -152,11 +152,10 @@ class Jobs(Base):
|
|||
file_name = Column(String(256), nullable=False)
|
||||
every = Column(SCHEDULES_ENUM, nullable=False)
|
||||
reload = Column(Boolean, default=False, nullable=False)
|
||||
success = Column(Boolean, nullable=True)
|
||||
last_run = Column(DateTime, nullable=True)
|
||||
|
||||
plugin = relationship("Plugins", back_populates="jobs")
|
||||
cache = relationship("Jobs_cache", back_populates="job", cascade="all")
|
||||
runs = relationship("Jobs_runs", back_populates="job", cascade="all")
|
||||
|
||||
|
||||
class Plugin_pages(Base):
|
||||
|
|
@ -164,6 +163,7 @@ class Plugin_pages(Base):
|
|||
|
||||
id = Column(Integer, Identity(start=1, increment=1), primary_key=True)
|
||||
plugin_id = Column(String(64), ForeignKey("bw_plugins.id", onupdate="cascade", ondelete="cascade"), nullable=False)
|
||||
# TODO: replace with a raw data that gets extracted by the plugin
|
||||
template_file = Column(LargeBinary(length=(2**32) - 1), nullable=False)
|
||||
template_checksum = Column(String(128), nullable=False)
|
||||
actions_file = Column(LargeBinary(length=(2**32) - 1), nullable=False)
|
||||
|
|
@ -189,6 +189,18 @@ class Jobs_cache(Base):
|
|||
service = relationship("Services", back_populates="jobs_cache")
|
||||
|
||||
|
||||
class Jobs_runs(Base):
|
||||
__tablename__ = "bw_jobs_runs"
|
||||
|
||||
id = Column(Integer, Identity(start=1, increment=1), primary_key=True)
|
||||
job_name = Column(String(128), ForeignKey("bw_jobs.name", onupdate="cascade", ondelete="cascade"), nullable=False)
|
||||
success = Column(Boolean, nullable=True, default=False)
|
||||
start_date = Column(DateTime(), nullable=False)
|
||||
end_date = Column(DateTime(), nullable=True, server_default=func.now())
|
||||
|
||||
job = relationship("Jobs", back_populates="runs")
|
||||
|
||||
|
||||
class Custom_configs(Base):
|
||||
__tablename__ = "bw_custom_configs"
|
||||
__table_args__ = (UniqueConstraint("service_id", "type", "name"),)
|
||||
|
|
|
|||
|
|
@ -162,6 +162,7 @@ class JobScheduler(ApiCaller):
|
|||
self.__logger.info(f"Executing job {name} from plugin {plugin} ...")
|
||||
success = True
|
||||
ret = -1
|
||||
start_date = datetime.now()
|
||||
try:
|
||||
proc = run(join(path, "jobs", file), stdin=DEVNULL, stderr=STDOUT, env=self.__env, check=False)
|
||||
ret = proc.returncode
|
||||
|
|
@ -170,6 +171,7 @@ class JobScheduler(ApiCaller):
|
|||
self.__logger.error(f"Exception while executing job {name} from plugin {plugin} :\n{format_exc()}")
|
||||
with self.__thread_lock:
|
||||
self.__job_success = False
|
||||
end_date = datetime.now()
|
||||
|
||||
if ret == 1:
|
||||
with self.__thread_lock:
|
||||
|
|
@ -181,18 +183,17 @@ class JobScheduler(ApiCaller):
|
|||
with self.__thread_lock:
|
||||
self.__job_success = False
|
||||
|
||||
Thread(target=self.__update_job, args=(plugin, name, success)).start()
|
||||
Thread(target=self.__add_job_run, args=(name, success, start_date, end_date)).start()
|
||||
|
||||
return ret
|
||||
|
||||
def __update_job(self, plugin: str, name: str, success: bool):
|
||||
def __add_job_run(self, name: str, success: bool, start_date: datetime, end_date: datetime = None):
|
||||
with self.__thread_lock:
|
||||
err = self.db.update_job(plugin, name, success)
|
||||
err = self.db.add_job_run(name, success, start_date, end_date)
|
||||
|
||||
if not err:
|
||||
self.__logger.info(f"Successfully updated database for the job {name} from plugin {plugin}")
|
||||
else:
|
||||
self.__logger.warning(f"Failed to update database for the job {name} from plugin {plugin}: {err}")
|
||||
return self.__logger.info(f"Successfully added job run for the job {name}")
|
||||
self.__logger.warning(f"Failed to add job run for the job {name}: {err}")
|
||||
|
||||
def setup(self):
|
||||
for plugin, jobs in self.__jobs.items():
|
||||
|
|
|
|||
Loading…
Reference in a new issue