Optimized scheduler execution

This commit is contained in:
Théophile Diot 2024-05-28 13:23:58 +01:00
parent 5f1cb7bde9
commit ed0283737d
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06

View file

@ -124,7 +124,7 @@ def stop(status):
_exit(status)
def generate_custom_configs(configs: List[Dict[str, Any]], *, original_path: Union[Path, str] = CUSTOM_CONFIGS_PATH):
def generate_custom_configs(configs: Optional[List[Dict[str, Any]]] = None, *, original_path: Union[Path, str] = CUSTOM_CONFIGS_PATH):
if not isinstance(original_path, Path):
original_path = Path(original_path)
@ -138,6 +138,10 @@ def generate_custom_configs(configs: List[Dict[str, Any]], *, original_path: Uni
elif file.is_dir():
rmtree(file, ignore_errors=True)
if configs is None:
assert SCHEDULER is not None
configs = SCHEDULER.db.get_custom_configs()
if configs:
logger.info("Generating new custom configs ...")
original_path.mkdir(parents=True, exist_ok=True)
@ -171,7 +175,7 @@ def generate_custom_configs(configs: List[Dict[str, Any]], *, original_path: Uni
logger.error("Sending custom configs failed, configuration will not work as expected...")
def generate_external_plugins(plugins: Optional[List[Dict[str, Any]]], *, original_path: Union[Path, str] = EXTERNAL_PLUGINS_PATH):
def generate_external_plugins(plugins: Optional[List[Dict[str, Any]]] = None, *, original_path: Union[Path, str] = EXTERNAL_PLUGINS_PATH):
if not isinstance(original_path, Path):
original_path = Path(original_path)
pro = "pro" in original_path.parts
@ -240,62 +244,61 @@ def generate_external_plugins(plugins: Optional[List[Dict[str, Any]]], *, origin
logger.error(f"Sending {'pro ' if pro else ''}external plugins failed, configuration will not work as expected...")
def generate_caches(plugins: List[Dict[str, Any]]):
def generate_caches():
assert SCHEDULER is not None
for plugin in plugins:
job_cache_files = SCHEDULER.db.get_jobs_cache_files(plugin_id=plugin["id"])
plugin_cache_files = set()
ignored_dirs = set()
job_path = Path(sep, "var", "cache", "bunkerweb", plugin["id"])
job_cache_files = SCHEDULER.db.get_jobs_cache_files()
plugin_cache_files = set()
ignored_dirs = set()
for job_cache_file in job_cache_files:
cache_path = job_path.joinpath(job_cache_file["service_id"] or "", job_cache_file["file_name"])
plugin_cache_files.add(cache_path)
for job_cache_file in job_cache_files:
job_path = Path(sep, "var", "cache", "bunkerweb", job_cache_file["plugin_id"])
cache_path = job_path.joinpath(job_cache_file["service_id"] or "", job_cache_file["file_name"])
plugin_cache_files.add(cache_path)
try:
if job_cache_file["file_name"].endswith(".tgz"):
extract_path = cache_path.parent
if job_cache_file["file_name"].startswith("folder:"):
extract_path = Path(job_cache_file["file_name"].split("folder:", 1)[1].rsplit(".tgz", 1)[0])
ignored_dirs.add(extract_path.as_posix())
rmtree(extract_path, ignore_errors=True)
extract_path.mkdir(parents=True, exist_ok=True)
with tar_open(fileobj=BytesIO(job_cache_file["data"]), mode="r:gz") as tar:
assert isinstance(tar, TarFile)
try:
for member in tar.getmembers():
try:
tar.extract(member, path=extract_path)
except Exception as e:
logger.error(f"Error extracting {member.name}: {e}")
except Exception as e:
logger.error(f"Error extracting tar file: {e}")
logger.debug(f"Restored cache directory {extract_path}")
continue
cache_path.parent.mkdir(parents=True, exist_ok=True)
cache_path.write_bytes(job_cache_file["data"])
logger.debug(f"Restored cache file {job_cache_file['file_name']}")
except BaseException as e:
logger.error(f"Exception while restoring cache file {job_cache_file['file_name']} :\n{e}")
try:
if job_cache_file["file_name"].endswith(".tgz"):
extract_path = cache_path.parent
if job_cache_file["file_name"].startswith("folder:"):
extract_path = Path(job_cache_file["file_name"].split("folder:", 1)[1].rsplit(".tgz", 1)[0])
ignored_dirs.add(extract_path.as_posix())
rmtree(extract_path, ignore_errors=True)
extract_path.mkdir(parents=True, exist_ok=True)
with tar_open(fileobj=BytesIO(job_cache_file["data"]), mode="r:gz") as tar:
assert isinstance(tar, TarFile)
try:
for member in tar.getmembers():
try:
tar.extract(member, path=extract_path)
except Exception as e:
logger.error(f"Error extracting {member.name}: {e}")
except Exception as e:
logger.error(f"Error extracting tar file: {e}")
logger.debug(f"Restored cache directory {extract_path}")
continue
cache_path.parent.mkdir(parents=True, exist_ok=True)
cache_path.write_bytes(job_cache_file["data"])
logger.debug(f"Restored cache file {job_cache_file['file_name']}")
except BaseException as e:
logger.error(f"Exception while restoring cache file {job_cache_file['file_name']} :\n{e}")
if job_path.is_dir():
for file in job_path.rglob("*"):
if file.as_posix().startswith(tuple(ignored_dirs)):
continue
if job_path.is_dir():
for file in job_path.rglob("*"):
if file.as_posix().startswith(tuple(ignored_dirs)):
continue
logger.debug(f"Checking if {file} should be removed")
if file not in plugin_cache_files and file.is_file():
logger.debug(f"Removing non-cached file {file}")
file.unlink(missing_ok=True)
if file.parent.is_dir() and not list(file.parent.iterdir()):
logger.debug(f"Removing empty directory {file.parent}")
rmtree(file.parent, ignore_errors=True)
if file.parent == job_path:
break
elif file.is_dir() and not list(file.iterdir()):
logger.debug(f"Removing empty directory {file}")
rmtree(file, ignore_errors=True)
logger.debug(f"Checking if {file} should be removed")
if file not in plugin_cache_files and file.is_file():
logger.debug(f"Removing non-cached file {file}")
file.unlink(missing_ok=True)
if file.parent.is_dir() and not list(file.parent.iterdir()):
logger.debug(f"Removing empty directory {file.parent}")
rmtree(file.parent, ignore_errors=True)
if file.parent == job_path:
break
elif file.is_dir() and not list(file.iterdir()):
logger.debug(f"Removing empty directory {file}")
rmtree(file, ignore_errors=True)
def api_to_instance(api):
@ -321,17 +324,18 @@ def run_in_slave_mode():
sleep(5)
env = SCHEDULER.db.get_config()
# Download plugins
pro_plugins = SCHEDULER.db.get_plugins(_type="pro", with_data=True)
generate_external_plugins(pro_plugins, original_path=PRO_PLUGINS_PATH)
external_plugins = SCHEDULER.db.get_plugins(_type="external", with_data=True)
generate_external_plugins(external_plugins)
threads = [
Thread(target=generate_custom_configs),
Thread(target=generate_external_plugins),
Thread(target=generate_external_plugins, kwargs={"original_path": PRO_PLUGINS_PATH}),
Thread(target=generate_caches),
]
# Download custom configs
generate_custom_configs(SCHEDULER.db.get_custom_configs())
for thread in threads:
thread.start()
# Download caches
generate_caches(pro_plugins + external_plugins)
for thread in threads:
thread.join()
# Gen config
content = ""
@ -577,9 +581,9 @@ if __name__ == "__main__":
threads.clear()
if changes["pro_plugins_changed"]:
threads.append(Thread(target=generate_external_plugins, args=(None,), kwargs={"original_path": PRO_PLUGINS_PATH}))
threads.append(Thread(target=generate_external_plugins, kwargs={"original_path": PRO_PLUGINS_PATH}))
if changes["external_plugins_changed"]:
threads.append(Thread(target=generate_external_plugins, args=(None,)))
threads.append(Thread(target=generate_external_plugins))
for thread in threads:
thread.start()
@ -670,7 +674,7 @@ if __name__ == "__main__":
else:
logger.info("All jobs in run_once() were successful")
if SCHEDULER.db.readonly:
generate_caches(SCHEDULER.db.get_plugins())
generate_caches()
if CONFIG_NEED_GENERATION:
content = ""