Refactor how we run scheduled jobs to thread them

This commit is contained in:
Théophile Diot 2024-03-20 13:59:13 +00:00
parent 02d6f59d8c
commit 8316f113ac
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06

View file

@ -52,6 +52,7 @@ class JobScheduler(ApiCaller):
self.__lock = lock
self.__thread_lock = Lock()
self.__job_success = True
self.__job_reload = False
self.__semaphore = Semaphore(cpu_count() or 1)
@property
@ -187,7 +188,11 @@ class JobScheduler(ApiCaller):
with self.__thread_lock:
self.__job_success = False
if self.__job_success and ret >= 2:
if ret == 1:
with self.__thread_lock:
self.__job_reload = True
if self.__job_success and (ret < 0 or ret >= 2):
success = False
self.__logger.error(f"Error while executing job {name} from plugin {plugin}")
with self.__thread_lock:
@ -220,24 +225,25 @@ class JobScheduler(ApiCaller):
self.__logger.error(f"Exception while scheduling jobs for plugin {plugin} : {format_exc()}")
def run_pending(self) -> bool:
if self.__lock:
self.__lock.acquire()
threads = []
self.__job_success = True
self.__job_reload = False
jobs = [job for job in schedule_jobs if job.should_run]
success = True
reload = False
for job in jobs:
ret = job.run()
for job in schedule_jobs:
if not job.should_run:
continue
threads.append(Thread(target=self.__run_in_thread, args=((job.run,),)))
if not isinstance(ret, int):
ret = -1
for thread in threads:
thread.start()
if ret == 1:
reload = True
elif ret < 0 or ret >= 2:
success = False
for thread in threads:
thread.join()
if reload:
success = self.__job_success
self.__job_success = True
if self.__job_reload:
try:
if self.apis:
cache_path = join(sep, "var", "cache", "bunkerweb")
@ -247,28 +253,27 @@ class JobScheduler(ApiCaller):
self.__logger.error(f"Error while sending {cache_path} folder")
else:
self.__logger.info(f"Successfully sent {cache_path} folder")
if not self.__reload():
success = False
except:
except BaseException:
success = False
self.__logger.error(f"Exception while reloading after job scheduling : {format_exc()}")
self.__job_reload = False
if threads:
self.__logger.info("All scheduled jobs have been executed")
if self.__lock:
self.__lock.release()
return success
def run_once(self) -> bool:
threads = []
self.__job_success = True
self.__job_reload = False
for plugin, jobs in self.__jobs.items():
jobs_jobs = []
for job in jobs:
path = job["path"]
name = job["name"]
file = job["file"]
# Add job to the list of jobs to run in the order they are defined
jobs_jobs.append(partial(self.__job_wrapper, path, plugin, name, file))
# Add job to the list of jobs to run in the order they are defined
jobs_jobs = [partial(self.__job_wrapper, job["path"], plugin, job["name"], job["file"]) for job in jobs]
# Create a thread for each plugin
threads.append(Thread(target=self.__run_in_thread, args=(jobs_jobs,)))
@ -279,7 +284,7 @@ class JobScheduler(ApiCaller):
for thread in threads:
thread.join()
ret = self.__job_success is True
ret = self.__job_success
self.__job_success = True
return ret
@ -288,7 +293,7 @@ class JobScheduler(ApiCaller):
if self.__lock:
self.__lock.acquire()
job_plugin = None
job_plugin = ""
job_to_run = None
for plugin, jobs in self.__jobs.items():
for job in jobs:
@ -297,7 +302,7 @@ class JobScheduler(ApiCaller):
job_to_run = job
break
if not job_to_run:
if not job_plugin or not job_to_run:
self.__logger.warning(f"Job {job_name} not found")
return False