mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Add the possibility to run plugins job in async mode to avoid running them in order in the scheduler by setting the async key to true in the plugin job configuration (default is false)
This commit is contained in:
parent
a7828e04d2
commit
983adf64a1
18 changed files with 100 additions and 40 deletions
|
|
@ -48,7 +48,8 @@
|
|||
"name": "backup-data",
|
||||
"file": "backup-data.py",
|
||||
"every": "day",
|
||||
"reload": false
|
||||
"reload": false,
|
||||
"async": true
|
||||
}
|
||||
],
|
||||
"bwcli": {
|
||||
|
|
|
|||
|
|
@ -209,7 +209,8 @@
|
|||
"name": "blacklist-download",
|
||||
"file": "blacklist-download.py",
|
||||
"every": "hour",
|
||||
"reload": true
|
||||
"reload": true,
|
||||
"async": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,7 +56,8 @@
|
|||
"name": "custom-cert",
|
||||
"file": "custom-cert.py",
|
||||
"every": "day",
|
||||
"reload": true
|
||||
"reload": true,
|
||||
"async": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -48,7 +48,8 @@
|
|||
"name": "cleanup-excess-jobs-runs",
|
||||
"file": "cleanup-excess-jobs-runs.py",
|
||||
"every": "day",
|
||||
"reload": false
|
||||
"reload": false,
|
||||
"async": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -119,7 +119,8 @@
|
|||
"name": "greylist-download",
|
||||
"file": "greylist-download.py",
|
||||
"every": "hour",
|
||||
"reload": true
|
||||
"reload": true,
|
||||
"async": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,25 +10,29 @@
|
|||
"name": "mmdb-country",
|
||||
"file": "mmdb-country.py",
|
||||
"every": "day",
|
||||
"reload": true
|
||||
"reload": true,
|
||||
"async": true
|
||||
},
|
||||
{
|
||||
"name": "mmdb-asn",
|
||||
"file": "mmdb-asn.py",
|
||||
"every": "day",
|
||||
"reload": true
|
||||
"reload": true,
|
||||
"async": true
|
||||
},
|
||||
{
|
||||
"name": "update-check",
|
||||
"file": "update-check.py",
|
||||
"every": "day",
|
||||
"reload": false
|
||||
"reload": false,
|
||||
"async": true
|
||||
},
|
||||
{
|
||||
"name": "failover-backup",
|
||||
"file": "failover-backup.py",
|
||||
"every": "once",
|
||||
"reload": false
|
||||
"reload": false,
|
||||
"async": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -201,19 +201,22 @@
|
|||
"name": "default-server-cert",
|
||||
"file": "default-server-cert.py",
|
||||
"every": "once",
|
||||
"reload": false
|
||||
"reload": false,
|
||||
"async": true
|
||||
},
|
||||
{
|
||||
"name": "anonymous-report",
|
||||
"file": "anonymous-report.py",
|
||||
"every": "day",
|
||||
"reload": false
|
||||
"reload": false,
|
||||
"async": true
|
||||
},
|
||||
{
|
||||
"name": "download-plugins",
|
||||
"file": "download-plugins.py",
|
||||
"every": "once",
|
||||
"reload": false
|
||||
"reload": false,
|
||||
"async": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -86,13 +86,15 @@
|
|||
"name": "coreruleset-nightly",
|
||||
"file": "coreruleset-nightly.py",
|
||||
"every": "day",
|
||||
"reload": true
|
||||
"reload": true,
|
||||
"async": true
|
||||
},
|
||||
{
|
||||
"name": "download-crs-plugins",
|
||||
"file": "download-crs-plugins.py",
|
||||
"every": "day",
|
||||
"reload": true
|
||||
"reload": true,
|
||||
"async": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,7 +20,8 @@
|
|||
"name": "download-pro-plugins",
|
||||
"file": "download-pro-plugins.py",
|
||||
"every": "day",
|
||||
"reload": true
|
||||
"reload": true,
|
||||
"async": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,7 +65,8 @@
|
|||
"name": "realip-download",
|
||||
"file": "realip-download.py",
|
||||
"every": "hour",
|
||||
"reload": true
|
||||
"reload": true,
|
||||
"async": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,8 @@
|
|||
"name": "self-signed",
|
||||
"file": "self-signed.py",
|
||||
"every": "day",
|
||||
"reload": true
|
||||
"reload": true,
|
||||
"async": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -119,7 +119,8 @@
|
|||
"name": "whitelist-download",
|
||||
"file": "whitelist-download.py",
|
||||
"every": "hour",
|
||||
"reload": true
|
||||
"reload": true,
|
||||
"async": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -720,6 +720,7 @@ class Database:
|
|||
for job in jobs:
|
||||
job["file_name"] = job.pop("file")
|
||||
job["reload"] = job.get("reload", False)
|
||||
job["run_async"] = job.pop("async", False)
|
||||
|
||||
# ? Check if the job already exists and if it has changed
|
||||
if plugin["id"] in found_plugins:
|
||||
|
|
@ -2578,7 +2579,7 @@ class Database:
|
|||
for job in jobs:
|
||||
db_job = (
|
||||
session.query(Jobs)
|
||||
.with_entities(Jobs.file_name, Jobs.every, Jobs.reload)
|
||||
.with_entities(Jobs.file_name, Jobs.every, Jobs.reload, Jobs.run_async)
|
||||
.filter_by(name=job["name"], plugin_id=plugin["id"])
|
||||
.first()
|
||||
)
|
||||
|
|
@ -2587,6 +2588,7 @@ class Database:
|
|||
changes = True
|
||||
job["file_name"] = job.pop("file")
|
||||
job["reload"] = job.get("reload", False)
|
||||
job["run_async"] = job.pop("async", False)
|
||||
to_put.append(Jobs(plugin_id=plugin["id"], **job))
|
||||
else:
|
||||
updates = {}
|
||||
|
|
@ -2597,9 +2599,12 @@ class Database:
|
|||
if job["every"] != db_job.every:
|
||||
updates[Jobs.every] = job["every"]
|
||||
|
||||
if job.get("reload", None) != db_job.reload:
|
||||
if job.get("reload", False) != db_job.reload:
|
||||
updates[Jobs.reload] = job.get("reload", False)
|
||||
|
||||
if job.get("async", False) != db_job.run_async:
|
||||
updates[Jobs.run_async] = job.get("async", False)
|
||||
|
||||
if updates:
|
||||
changes = True
|
||||
updates[Jobs.last_run] = None
|
||||
|
|
@ -2959,9 +2964,7 @@ class Database:
|
|||
plugin_settings.add(setting)
|
||||
|
||||
for job in jobs:
|
||||
db_job = (
|
||||
session.query(Jobs).with_entities(Jobs.file_name, Jobs.every, Jobs.reload).filter_by(name=job["name"], plugin_id=plugin["id"]).first()
|
||||
)
|
||||
db_job = session.query(Jobs).filter_by(name=job["name"], plugin_id=plugin["id"]).first()
|
||||
|
||||
if db_job is not None:
|
||||
self.logger.warning(f"A job with the name {job['name']} already exists in the database, therefore it will not be added.")
|
||||
|
|
@ -2969,6 +2972,7 @@ class Database:
|
|||
|
||||
job["file_name"] = job.pop("file")
|
||||
job["reload"] = job.get("reload", False)
|
||||
job["run_async"] = job.pop("async", False)
|
||||
to_put.append(Jobs(plugin_id=plugin["id"], **job))
|
||||
|
||||
plugin_path = Path(sep, "var", "tmp", "bunkerweb", "ui", plugin["id"])
|
||||
|
|
@ -3299,6 +3303,7 @@ class Database:
|
|||
"plugin_id": job.plugin_id,
|
||||
"every": job.every,
|
||||
"reload": job.reload,
|
||||
"async": job.run_async,
|
||||
"history": [
|
||||
{
|
||||
"start_date": job_run.start_date.isoformat(),
|
||||
|
|
@ -3323,7 +3328,7 @@ class Database:
|
|||
.filter_by(job_name=job.name)
|
||||
],
|
||||
}
|
||||
for job in session.query(Jobs).with_entities(Jobs.name, Jobs.plugin_id, Jobs.every, Jobs.reload)
|
||||
for job in session.query(Jobs).with_entities(Jobs.name, Jobs.plugin_id, Jobs.every, Jobs.reload, Jobs.run_async)
|
||||
}
|
||||
|
||||
def get_job_cache_file(
|
||||
|
|
|
|||
|
|
@ -138,6 +138,7 @@ class Jobs(Base):
|
|||
file_name = Column(String(256), nullable=False)
|
||||
every = Column(SCHEDULES_ENUM, nullable=False)
|
||||
reload = Column(Boolean, default=False, nullable=False)
|
||||
run_async = Column(Boolean, default=False, nullable=False)
|
||||
|
||||
plugin = relationship("Plugins", back_populates="jobs")
|
||||
cache = relationship("Jobs_cache", back_populates="job", cascade="all")
|
||||
|
|
|
|||
|
|
@ -357,7 +357,9 @@ class Configurator:
|
|||
)
|
||||
elif job["every"] not in ("once", "minute", "hour", "day", "week"):
|
||||
return (False, f"Invalid every for job {job['name']} in plugin {plugin['id']} (Must be once, minute, hour, day or week)")
|
||||
elif job["reload"] is not True and job["reload"] is not False:
|
||||
elif job.get("reload", False) is not True and job.get("reload", False) is not False:
|
||||
return (False, f"Invalid reload for job {job['name']} in plugin {plugin['id']} (Must be true or false)")
|
||||
elif job.get("async", False) is not True and job.get("async", False) is not False:
|
||||
return (False, f"Invalid async for job {job['name']} in plugin {plugin['id']} (Must be true or false)")
|
||||
|
||||
return True, "ok"
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import schedule
|
|||
from schedule import Job
|
||||
from subprocess import DEVNULL, STDOUT, run
|
||||
from sys import path as sys_path
|
||||
from threading import Lock, Semaphore
|
||||
from threading import Lock
|
||||
|
||||
# Add dependencies to sys.path
|
||||
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("utils",), ("db",))]:
|
||||
|
|
@ -46,7 +46,6 @@ class JobScheduler(ApiCaller):
|
|||
self.__thread_lock = Lock()
|
||||
self.__job_success = True
|
||||
self.__job_reload = False
|
||||
self.__semaphore = Semaphore(cpu_count() or 1)
|
||||
self.__executor = ThreadPoolExecutor(max_workers=cpu_count() or 1)
|
||||
self.__compiled_regexes = self.__compile_regexes()
|
||||
self.update_jobs()
|
||||
|
|
@ -104,9 +103,10 @@ class JobScheduler(ApiCaller):
|
|||
name_valid = self.__compiled_regexes["name"].match(job["name"])
|
||||
file_valid = self.__compiled_regexes["file"].match(job["file"])
|
||||
every_valid = job["every"] in ("once", "minute", "hour", "day", "week")
|
||||
reload_valid = isinstance(job["reload"], bool)
|
||||
reload_valid = isinstance(job.get("reload", False), bool)
|
||||
async_valid = isinstance(job.get("async", False), bool)
|
||||
|
||||
if not (name_valid and file_valid and every_valid and reload_valid):
|
||||
if not all((name_valid, file_valid, every_valid, reload_valid, async_valid)):
|
||||
self.__logger.warning(f"Invalid job definition in plugin {plugin_name}. Job: {job}")
|
||||
continue
|
||||
|
||||
|
|
@ -213,7 +213,7 @@ class JobScheduler(ApiCaller):
|
|||
self.__job_reload = False
|
||||
|
||||
# Use ThreadPoolExecutor to run jobs
|
||||
futures = [self.__executor.submit(self.__run_jobs_with_semaphore, [job.run]) for job in pending_jobs]
|
||||
futures = [self.__executor.submit(job.run) for job in pending_jobs]
|
||||
|
||||
# Wait for all jobs to complete
|
||||
for future in futures:
|
||||
|
|
@ -261,6 +261,10 @@ class JobScheduler(ApiCaller):
|
|||
if plugins and plugin not in plugins:
|
||||
continue
|
||||
for job in jobs:
|
||||
if job.get("async", False):
|
||||
futures.append(self.__executor.submit(self.__job_wrapper, job["path"], plugin, job["name"], job["file"]))
|
||||
continue
|
||||
|
||||
jobs_to_run.append(
|
||||
partial(
|
||||
self.__job_wrapper,
|
||||
|
|
@ -270,7 +274,9 @@ class JobScheduler(ApiCaller):
|
|||
job["file"],
|
||||
)
|
||||
)
|
||||
futures.append(self.__executor.submit(self.__run_jobs_with_semaphore, jobs_to_run))
|
||||
|
||||
if jobs_to_run:
|
||||
futures.append(self.__executor.submit(self.__run_jobs, jobs_to_run))
|
||||
|
||||
# Wait for all jobs to complete
|
||||
for future in futures:
|
||||
|
|
@ -312,10 +318,9 @@ class JobScheduler(ApiCaller):
|
|||
self.__lock.release()
|
||||
return self.__job_success
|
||||
|
||||
def __run_jobs_with_semaphore(self, jobs):
|
||||
with self.__semaphore:
|
||||
for job in jobs:
|
||||
job()
|
||||
def __run_jobs(self, jobs):
|
||||
for job in jobs:
|
||||
job()
|
||||
|
||||
def clear(self):
|
||||
schedule.clear()
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ $(document).ready(function () {
|
|||
viewTotal: true,
|
||||
cascadePanes: true,
|
||||
collapse: false,
|
||||
columns: [2, 3, 4, 5],
|
||||
columns: [2, 3, 4, 5, 6],
|
||||
},
|
||||
},
|
||||
topStart: {},
|
||||
|
|
@ -288,17 +288,16 @@ $(document).ready(function () {
|
|||
{
|
||||
searchPanes: {
|
||||
show: true,
|
||||
header: "Last run state",
|
||||
header: "Async",
|
||||
options: [
|
||||
{
|
||||
label: '<i class="bx bx-xs bx-x text-danger"></i> Failed',
|
||||
label: '<i class="bx bx-xs bx-x text-danger"></i> No',
|
||||
value: function (rowData, rowIdx) {
|
||||
return rowData[5].includes("bx-x");
|
||||
},
|
||||
},
|
||||
{
|
||||
label:
|
||||
'<i class="bx bx-xs bx-check text-success"></i> Success',
|
||||
label: '<i class="bx bx-xs bx-check text-success"></i> Yes',
|
||||
value: function (rowData, rowIdx) {
|
||||
return rowData[5].includes("bx-check");
|
||||
},
|
||||
|
|
@ -309,6 +308,30 @@ $(document).ready(function () {
|
|||
},
|
||||
targets: 5,
|
||||
},
|
||||
{
|
||||
searchPanes: {
|
||||
show: true,
|
||||
header: "Last run state",
|
||||
options: [
|
||||
{
|
||||
label: '<i class="bx bx-xs bx-x text-danger"></i> Failed',
|
||||
value: function (rowData, rowIdx) {
|
||||
return rowData[6].includes("bx-x");
|
||||
},
|
||||
},
|
||||
{
|
||||
label:
|
||||
'<i class="bx bx-xs bx-check text-success"></i> Success',
|
||||
value: function (rowData, rowIdx) {
|
||||
return rowData[6].includes("bx-check");
|
||||
},
|
||||
},
|
||||
],
|
||||
combiner: "or",
|
||||
orderable: false,
|
||||
},
|
||||
targets: 6,
|
||||
},
|
||||
],
|
||||
order: [[2, "asc"]],
|
||||
autoFill: false,
|
||||
|
|
|
|||
|
|
@ -31,6 +31,9 @@
|
|||
<th data-bs-toggle="tooltip"
|
||||
data-bs-placement="bottom"
|
||||
data-bs-original-title="Does the Job reloads BunkerWeb?">Reload</th>
|
||||
<th data-bs-toggle="tooltip"
|
||||
data-bs-placement="bottom"
|
||||
data-bs-original-title="Does the Job run asynchronously?">Async</th>
|
||||
<th data-bs-toggle="tooltip"
|
||||
data-bs-placement="bottom"
|
||||
data-bs-original-title="Does the last Job's execution was successful?">Last run</th>
|
||||
|
|
@ -53,6 +56,9 @@
|
|||
<td class="text-center">
|
||||
<i class="bx bx-sm bx-{% if job_data['reload'] %}check text-success{% else %}x text-danger{% endif %}"></i>
|
||||
</td>
|
||||
<td class="text-center">
|
||||
<i class="bx bx-sm bx-{% if job_data['async'] %}check text-success{% else %}x text-danger{% endif %}"></i>
|
||||
</td>
|
||||
<td class="text-center">
|
||||
{% if job_data['history'] %}
|
||||
<i class="bx bx-sm bx-{% if job_data['history'][0]['success'] %}check text-success{% else %}x text-danger{% endif %}"></i>
|
||||
|
|
|
|||
Loading…
Reference in a new issue