Fix shenanigans with jobs cache restoration

This commit is contained in:
Théophile Diot 2024-03-12 20:10:39 +00:00
parent ac3d965360
commit 7272fe95be
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06
3 changed files with 43 additions and 17 deletions

View file

@@ -1762,13 +1762,13 @@ class Database:
filters = {"plugin_id": plugin_id}
if job_name:
filters["name"] = job_name
job_names = [name for name in session.query(Jobs).with_entities(Jobs.name).filter_by(**filters)]
job_names = [job.name for job in session.query(Jobs).with_entities(Jobs.name).filter_by(**filters)]
if not job_names:
return []
cache_files = []
for cache in db_cache:
if cache.job_name not in job_names:
if job_name and cache.job_name not in job_names:
continue
cache_files.append(
{

View file

@@ -45,22 +45,34 @@ class Job:
self.logger = logger or self.db.logger
if not deprecated:
self.restore_cache()
with LOCK:
is_scheduler_first_start = self.db.is_scheduler_first_start()
self.restore_cache(manual=is_scheduler_first_start)
def restore_cache(self, *, job_name: str = "", plugin_id: str = "") -> bool:
def restore_cache(self, *, job_name: str = "", plugin_id: str = "", manual: bool = True) -> bool:
"""Restore job cache files from database."""
ret = True
with LOCK:
job_cache_files = self.db.get_jobs_cache_files(job_name=job_name or self.job_name, plugin_id=plugin_id or self.job_path.name, with_data=True) # type: ignore
job_cache_files = self.db.get_jobs_cache_files(plugin_id=plugin_id or self.job_path.name, with_data=True) # type: ignore
job_name = job_name or self.job_name
plugin_cache_files = []
for job_cache_file in job_cache_files:
cache_path = self.job_path.joinpath(job_cache_file["service_id"] or "", job_cache_file["file_name"])
plugin_cache_files.append(cache_path)
if job_cache_file["job_name"] != job_name:
continue
try:
cache_path = self.job_path.joinpath(job_cache_file["service_id"] or "", job_cache_file["file_name"])
if job_cache_file["file_name"].endswith(".tgz"):
rmtree(cache_path.parent, ignore_errors=True)
cache_path.parent.mkdir(parents=True, exist_ok=True)
extract_path = cache_path.parent
if job_cache_file["file_name"].startswith("path:"):
extract_path = Path(job_cache_file["file_name"].split(":", 1)[1].replace("_", "/").replace(".tgz", ""))
rmtree(extract_path, ignore_errors=True)
extract_path.mkdir(parents=True, exist_ok=True)
with tar_open(fileobj=BytesIO(job_cache_file["data"]), mode="r:gz") as tar:
tar.extractall(cache_path.parent)
tar.extractall(extract_path)
else:
cache_path.parent.mkdir(parents=True, exist_ok=True)
cache_path.write_bytes(job_cache_file["data"])
@@ -68,6 +80,19 @@ class Job:
self.logger.error(f"Exception while restoring cache file {job_cache_file['file_name']} :\n{e}")
ret = False
if not manual:
for file in self.job_path.glob("**/*"):
self.logger.debug(f"file : {file}")
if file not in plugin_cache_files and file.is_file():
self.logger.debug(f"Removing non-cached file {file}")
file.unlink(missing_ok=True)
if file.parent.is_dir() and not list(file.parent.iterdir()):
self.logger.debug(f"Removing empty directory {file.parent}")
rmtree(file.parent, ignore_errors=True)
elif file.is_dir() and not list(file.iterdir()):
self.logger.debug(f"Removing empty directory {file}")
rmtree(file, ignore_errors=True)
return ret
def get_cache(
@@ -152,7 +177,7 @@ class Job:
dir_path = Path(dir_path)
assert isinstance(dir_path, Path)
file_name = f"{dir_path.name}.tgz"
file_name = f"path:{dir_path.as_posix().replace('/', '_')}.tgz"
content = BytesIO()
with tar_open(file_name, mode="w:gz", fileobj=content, compresslevel=9) as tgz:
tgz.add(dir_path, arcname=".")

View file

@@ -406,13 +406,6 @@ if __name__ == "__main__":
del dotenv_env
if scheduler_first_start:
ret = db.set_scheduler_first_start()
if ret:
logger.error(f"An error occurred when setting the scheduler first start : {ret}")
stop(1)
CONFIG_NEED_GENERATION = True
RUN_JOBS_ONCE = True
CHANGES = []
@@ -548,6 +541,14 @@ if __name__ == "__main__":
PRO_PLUGINS_NEED_GENERATION = False
INSTANCES_NEED_GENERATION = False
if scheduler_first_start:
ret = db.set_scheduler_first_start()
if ret:
logger.error(f"An error occurred when setting the scheduler first start : {ret}")
stop(1)
scheduler_first_start = False
if not HEALTHY_PATH.is_file():
HEALTHY_PATH.write_text(datetime.now().isoformat(), encoding="utf-8")