Add the possibility to run plugins job in async mode to avoid running them in order in the scheduler by setting the async key to true in the plugin job configuration (default is false)

This commit is contained in:
Théophile Diot 2024-11-03 11:58:18 +01:00
parent a7828e04d2
commit 983adf64a1
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
18 changed files with 100 additions and 40 deletions

View file

@ -48,7 +48,8 @@
"name": "backup-data",
"file": "backup-data.py",
"every": "day",
"reload": false
"reload": false,
"async": true
}
],
"bwcli": {

View file

@ -209,7 +209,8 @@
"name": "blacklist-download",
"file": "blacklist-download.py",
"every": "hour",
"reload": true
"reload": true,
"async": true
}
]
}

View file

@ -56,7 +56,8 @@
"name": "custom-cert",
"file": "custom-cert.py",
"every": "day",
"reload": true
"reload": true,
"async": true
}
]
}

View file

@ -48,7 +48,8 @@
"name": "cleanup-excess-jobs-runs",
"file": "cleanup-excess-jobs-runs.py",
"every": "day",
"reload": false
"reload": false,
"async": true
}
]
}

View file

@ -119,7 +119,8 @@
"name": "greylist-download",
"file": "greylist-download.py",
"every": "hour",
"reload": true
"reload": true,
"async": true
}
]
}

View file

@ -10,25 +10,29 @@
"name": "mmdb-country",
"file": "mmdb-country.py",
"every": "day",
"reload": true
"reload": true,
"async": true
},
{
"name": "mmdb-asn",
"file": "mmdb-asn.py",
"every": "day",
"reload": true
"reload": true,
"async": true
},
{
"name": "update-check",
"file": "update-check.py",
"every": "day",
"reload": false
"reload": false,
"async": true
},
{
"name": "failover-backup",
"file": "failover-backup.py",
"every": "once",
"reload": false
"reload": false,
"async": true
}
]
}

View file

@ -201,19 +201,22 @@
"name": "default-server-cert",
"file": "default-server-cert.py",
"every": "once",
"reload": false
"reload": false,
"async": true
},
{
"name": "anonymous-report",
"file": "anonymous-report.py",
"every": "day",
"reload": false
"reload": false,
"async": true
},
{
"name": "download-plugins",
"file": "download-plugins.py",
"every": "once",
"reload": false
"reload": false,
"async": true
}
]
}

View file

@ -86,13 +86,15 @@
"name": "coreruleset-nightly",
"file": "coreruleset-nightly.py",
"every": "day",
"reload": true
"reload": true,
"async": true
},
{
"name": "download-crs-plugins",
"file": "download-crs-plugins.py",
"every": "day",
"reload": true
"reload": true,
"async": true
}
]
}

View file

@ -20,7 +20,8 @@
"name": "download-pro-plugins",
"file": "download-pro-plugins.py",
"every": "day",
"reload": true
"reload": true,
"async": true
}
]
}

View file

@ -65,7 +65,8 @@
"name": "realip-download",
"file": "realip-download.py",
"every": "hour",
"reload": true
"reload": true,
"async": true
}
]
}

View file

@ -38,7 +38,8 @@
"name": "self-signed",
"file": "self-signed.py",
"every": "day",
"reload": true
"reload": true,
"async": true
}
]
}

View file

@ -119,7 +119,8 @@
"name": "whitelist-download",
"file": "whitelist-download.py",
"every": "hour",
"reload": true
"reload": true,
"async": true
}
]
}

View file

@ -720,6 +720,7 @@ class Database:
for job in jobs:
job["file_name"] = job.pop("file")
job["reload"] = job.get("reload", False)
job["run_async"] = job.pop("async", False)
# ? Check if the job already exists and if it has changed
if plugin["id"] in found_plugins:
@ -2578,7 +2579,7 @@ class Database:
for job in jobs:
db_job = (
session.query(Jobs)
.with_entities(Jobs.file_name, Jobs.every, Jobs.reload)
.with_entities(Jobs.file_name, Jobs.every, Jobs.reload, Jobs.run_async)
.filter_by(name=job["name"], plugin_id=plugin["id"])
.first()
)
@ -2587,6 +2588,7 @@ class Database:
changes = True
job["file_name"] = job.pop("file")
job["reload"] = job.get("reload", False)
job["run_async"] = job.pop("async", False)
to_put.append(Jobs(plugin_id=plugin["id"], **job))
else:
updates = {}
@ -2597,9 +2599,12 @@ class Database:
if job["every"] != db_job.every:
updates[Jobs.every] = job["every"]
if job.get("reload", None) != db_job.reload:
if job.get("reload", False) != db_job.reload:
updates[Jobs.reload] = job.get("reload", False)
if job.get("async", False) != db_job.run_async:
updates[Jobs.run_async] = job.get("async", False)
if updates:
changes = True
updates[Jobs.last_run] = None
@ -2959,9 +2964,7 @@ class Database:
plugin_settings.add(setting)
for job in jobs:
db_job = (
session.query(Jobs).with_entities(Jobs.file_name, Jobs.every, Jobs.reload).filter_by(name=job["name"], plugin_id=plugin["id"]).first()
)
db_job = session.query(Jobs).filter_by(name=job["name"], plugin_id=plugin["id"]).first()
if db_job is not None:
self.logger.warning(f"A job with the name {job['name']} already exists in the database, therefore it will not be added.")
@ -2969,6 +2972,7 @@ class Database:
job["file_name"] = job.pop("file")
job["reload"] = job.get("reload", False)
job["run_async"] = job.pop("async", False)
to_put.append(Jobs(plugin_id=plugin["id"], **job))
plugin_path = Path(sep, "var", "tmp", "bunkerweb", "ui", plugin["id"])
@ -3299,6 +3303,7 @@ class Database:
"plugin_id": job.plugin_id,
"every": job.every,
"reload": job.reload,
"async": job.run_async,
"history": [
{
"start_date": job_run.start_date.isoformat(),
@ -3323,7 +3328,7 @@ class Database:
.filter_by(job_name=job.name)
],
}
for job in session.query(Jobs).with_entities(Jobs.name, Jobs.plugin_id, Jobs.every, Jobs.reload)
for job in session.query(Jobs).with_entities(Jobs.name, Jobs.plugin_id, Jobs.every, Jobs.reload, Jobs.run_async)
}
def get_job_cache_file(

View file

@ -138,6 +138,7 @@ class Jobs(Base):
file_name = Column(String(256), nullable=False)
every = Column(SCHEDULES_ENUM, nullable=False)
reload = Column(Boolean, default=False, nullable=False)
run_async = Column(Boolean, default=False, nullable=False)
plugin = relationship("Plugins", back_populates="jobs")
cache = relationship("Jobs_cache", back_populates="job", cascade="all")

View file

@ -357,7 +357,9 @@ class Configurator:
)
elif job["every"] not in ("once", "minute", "hour", "day", "week"):
return (False, f"Invalid every for job {job['name']} in plugin {plugin['id']} (Must be once, minute, hour, day or week)")
elif job["reload"] is not True and job["reload"] is not False:
elif job.get("reload", False) is not True and job.get("reload", False) is not False:
return (False, f"Invalid reload for job {job['name']} in plugin {plugin['id']} (Must be true or false)")
elif job.get("async", False) is not True and job.get("async", False) is not False:
return (False, f"Invalid async for job {job['name']} in plugin {plugin['id']} (Must be true or false)")
return True, "ok"

View file

@ -16,7 +16,7 @@ import schedule
from schedule import Job
from subprocess import DEVNULL, STDOUT, run
from sys import path as sys_path
from threading import Lock, Semaphore
from threading import Lock
# Add dependencies to sys.path
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("utils",), ("db",))]:
@ -46,7 +46,6 @@ class JobScheduler(ApiCaller):
self.__thread_lock = Lock()
self.__job_success = True
self.__job_reload = False
self.__semaphore = Semaphore(cpu_count() or 1)
self.__executor = ThreadPoolExecutor(max_workers=cpu_count() or 1)
self.__compiled_regexes = self.__compile_regexes()
self.update_jobs()
@ -104,9 +103,10 @@ class JobScheduler(ApiCaller):
name_valid = self.__compiled_regexes["name"].match(job["name"])
file_valid = self.__compiled_regexes["file"].match(job["file"])
every_valid = job["every"] in ("once", "minute", "hour", "day", "week")
reload_valid = isinstance(job["reload"], bool)
reload_valid = isinstance(job.get("reload", False), bool)
async_valid = isinstance(job.get("async", False), bool)
if not (name_valid and file_valid and every_valid and reload_valid):
if not all((name_valid, file_valid, every_valid, reload_valid, async_valid)):
self.__logger.warning(f"Invalid job definition in plugin {plugin_name}. Job: {job}")
continue
@ -213,7 +213,7 @@ class JobScheduler(ApiCaller):
self.__job_reload = False
# Use ThreadPoolExecutor to run jobs
futures = [self.__executor.submit(self.__run_jobs_with_semaphore, [job.run]) for job in pending_jobs]
futures = [self.__executor.submit(job.run) for job in pending_jobs]
# Wait for all jobs to complete
for future in futures:
@ -261,6 +261,10 @@ class JobScheduler(ApiCaller):
if plugins and plugin not in plugins:
continue
for job in jobs:
if job.get("async", False):
futures.append(self.__executor.submit(self.__job_wrapper, job["path"], plugin, job["name"], job["file"]))
continue
jobs_to_run.append(
partial(
self.__job_wrapper,
@ -270,7 +274,9 @@ class JobScheduler(ApiCaller):
job["file"],
)
)
futures.append(self.__executor.submit(self.__run_jobs_with_semaphore, jobs_to_run))
if jobs_to_run:
futures.append(self.__executor.submit(self.__run_jobs, jobs_to_run))
# Wait for all jobs to complete
for future in futures:
@ -312,10 +318,9 @@ class JobScheduler(ApiCaller):
self.__lock.release()
return self.__job_success
def __run_jobs_with_semaphore(self, jobs):
with self.__semaphore:
for job in jobs:
job()
def __run_jobs(self, jobs):
for job in jobs:
job()
def clear(self):
schedule.clear()

View file

@ -9,7 +9,7 @@ $(document).ready(function () {
viewTotal: true,
cascadePanes: true,
collapse: false,
columns: [2, 3, 4, 5],
columns: [2, 3, 4, 5, 6],
},
},
topStart: {},
@ -288,17 +288,16 @@ $(document).ready(function () {
{
searchPanes: {
show: true,
header: "Last run state",
header: "Async",
options: [
{
label: '<i class="bx bx-xs bx-x text-danger"></i>&nbsp;Failed',
label: '<i class="bx bx-xs bx-x text-danger"></i>&nbsp;No',
value: function (rowData, rowIdx) {
return rowData[5].includes("bx-x");
},
},
{
label:
'<i class="bx bx-xs bx-check text-success"></i>&nbsp;Success',
label: '<i class="bx bx-xs bx-check text-success"></i>&nbsp;Yes',
value: function (rowData, rowIdx) {
return rowData[5].includes("bx-check");
},
@ -309,6 +308,30 @@ $(document).ready(function () {
},
targets: 5,
},
{
searchPanes: {
show: true,
header: "Last run state",
options: [
{
label: '<i class="bx bx-xs bx-x text-danger"></i>&nbsp;Failed',
value: function (rowData, rowIdx) {
return rowData[6].includes("bx-x");
},
},
{
label:
'<i class="bx bx-xs bx-check text-success"></i>&nbsp;Success',
value: function (rowData, rowIdx) {
return rowData[6].includes("bx-check");
},
},
],
combiner: "or",
orderable: false,
},
targets: 6,
},
],
order: [[2, "asc"]],
autoFill: false,

View file

@ -31,6 +31,9 @@
<th data-bs-toggle="tooltip"
data-bs-placement="bottom"
data-bs-original-title="Does the Job reloads BunkerWeb?">Reload</th>
<th data-bs-toggle="tooltip"
data-bs-placement="bottom"
data-bs-original-title="Does the Job run asynchronously?">Async</th>
<th data-bs-toggle="tooltip"
data-bs-placement="bottom"
data-bs-original-title="Does the last Job's execution was successful?">Last run</th>
@ -53,6 +56,9 @@
<td class="text-center">
<i class="bx bx-sm bx-{% if job_data['reload'] %}check text-success{% else %}x text-danger{% endif %}"></i>
</td>
<td class="text-center">
<i class="bx bx-sm bx-{% if job_data['async'] %}check text-success{% else %}x text-danger{% endif %}"></i>
</td>
<td class="text-center">
{% if job_data['history'] %}
<i class="bx bx-sm bx-{% if job_data['history'][0]['success'] %}check text-success{% else %}x text-danger{% endif %}"></i>