mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Refactor how we run scheduled jobs to thread them
This commit is contained in:
parent
02d6f59d8c
commit
8316f113ac
1 changed files with 35 additions and 30 deletions
|
|
@ -52,6 +52,7 @@ class JobScheduler(ApiCaller):
|
|||
self.__lock = lock
|
||||
self.__thread_lock = Lock()
|
||||
self.__job_success = True
|
||||
self.__job_reload = False
|
||||
self.__semaphore = Semaphore(cpu_count() or 1)
|
||||
|
||||
@property
|
||||
|
|
@ -187,7 +188,11 @@ class JobScheduler(ApiCaller):
|
|||
with self.__thread_lock:
|
||||
self.__job_success = False
|
||||
|
||||
if self.__job_success and ret >= 2:
|
||||
if ret == 1:
|
||||
with self.__thread_lock:
|
||||
self.__job_reload = True
|
||||
|
||||
if self.__job_success and (ret < 0 or ret >= 2):
|
||||
success = False
|
||||
self.__logger.error(f"Error while executing job {name} from plugin {plugin}")
|
||||
with self.__thread_lock:
|
||||
|
|
@ -220,24 +225,25 @@ class JobScheduler(ApiCaller):
|
|||
self.__logger.error(f"Exception while scheduling jobs for plugin {plugin} : {format_exc()}")
|
||||
|
||||
def run_pending(self) -> bool:
|
||||
if self.__lock:
|
||||
self.__lock.acquire()
|
||||
threads = []
|
||||
self.__job_success = True
|
||||
self.__job_reload = False
|
||||
|
||||
jobs = [job for job in schedule_jobs if job.should_run]
|
||||
success = True
|
||||
reload = False
|
||||
for job in jobs:
|
||||
ret = job.run()
|
||||
for job in schedule_jobs:
|
||||
if not job.should_run:
|
||||
continue
|
||||
threads.append(Thread(target=self.__run_in_thread, args=((job.run,),)))
|
||||
|
||||
if not isinstance(ret, int):
|
||||
ret = -1
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
|
||||
if ret == 1:
|
||||
reload = True
|
||||
elif ret < 0 or ret >= 2:
|
||||
success = False
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
if reload:
|
||||
success = self.__job_success
|
||||
self.__job_success = True
|
||||
|
||||
if self.__job_reload:
|
||||
try:
|
||||
if self.apis:
|
||||
cache_path = join(sep, "var", "cache", "bunkerweb")
|
||||
|
|
@ -247,28 +253,27 @@ class JobScheduler(ApiCaller):
|
|||
self.__logger.error(f"Error while sending {cache_path} folder")
|
||||
else:
|
||||
self.__logger.info(f"Successfully sent {cache_path} folder")
|
||||
|
||||
if not self.__reload():
|
||||
success = False
|
||||
except:
|
||||
except BaseException:
|
||||
success = False
|
||||
self.__logger.error(f"Exception while reloading after job scheduling : {format_exc()}")
|
||||
self.__job_reload = False
|
||||
|
||||
if threads:
|
||||
self.__logger.info("All scheduled jobs have been executed")
|
||||
|
||||
if self.__lock:
|
||||
self.__lock.release()
|
||||
return success
|
||||
|
||||
def run_once(self) -> bool:
|
||||
threads = []
|
||||
self.__job_success = True
|
||||
self.__job_reload = False
|
||||
|
||||
for plugin, jobs in self.__jobs.items():
|
||||
jobs_jobs = []
|
||||
|
||||
for job in jobs:
|
||||
path = job["path"]
|
||||
name = job["name"]
|
||||
file = job["file"]
|
||||
|
||||
# Add job to the list of jobs to run in the order they are defined
|
||||
jobs_jobs.append(partial(self.__job_wrapper, path, plugin, name, file))
|
||||
# Add job to the list of jobs to run in the order they are defined
|
||||
jobs_jobs = [partial(self.__job_wrapper, job["path"], plugin, job["name"], job["file"]) for job in jobs]
|
||||
|
||||
# Create a thread for each plugin
|
||||
threads.append(Thread(target=self.__run_in_thread, args=(jobs_jobs,)))
|
||||
|
|
@ -279,7 +284,7 @@ class JobScheduler(ApiCaller):
|
|||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
ret = self.__job_success is True
|
||||
ret = self.__job_success
|
||||
self.__job_success = True
|
||||
|
||||
return ret
|
||||
|
|
@ -288,7 +293,7 @@ class JobScheduler(ApiCaller):
|
|||
if self.__lock:
|
||||
self.__lock.acquire()
|
||||
|
||||
job_plugin = None
|
||||
job_plugin = ""
|
||||
job_to_run = None
|
||||
for plugin, jobs in self.__jobs.items():
|
||||
for job in jobs:
|
||||
|
|
@ -297,7 +302,7 @@ class JobScheduler(ApiCaller):
|
|||
job_to_run = job
|
||||
break
|
||||
|
||||
if not job_to_run:
|
||||
if not job_plugin or not job_to_run:
|
||||
self.__logger.warning(f"Job {job_name} not found")
|
||||
return False
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue