mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Optimized scheduler execution
This commit is contained in:
parent
5f1cb7bde9
commit
ed0283737d
1 changed files with 68 additions and 64 deletions
|
|
@ -124,7 +124,7 @@ def stop(status):
|
|||
_exit(status)
|
||||
|
||||
|
||||
def generate_custom_configs(configs: List[Dict[str, Any]], *, original_path: Union[Path, str] = CUSTOM_CONFIGS_PATH):
|
||||
def generate_custom_configs(configs: Optional[List[Dict[str, Any]]] = None, *, original_path: Union[Path, str] = CUSTOM_CONFIGS_PATH):
|
||||
if not isinstance(original_path, Path):
|
||||
original_path = Path(original_path)
|
||||
|
||||
|
|
@ -138,6 +138,10 @@ def generate_custom_configs(configs: List[Dict[str, Any]], *, original_path: Uni
|
|||
elif file.is_dir():
|
||||
rmtree(file, ignore_errors=True)
|
||||
|
||||
if configs is None:
|
||||
assert SCHEDULER is not None
|
||||
configs = SCHEDULER.db.get_custom_configs()
|
||||
|
||||
if configs:
|
||||
logger.info("Generating new custom configs ...")
|
||||
original_path.mkdir(parents=True, exist_ok=True)
|
||||
|
|
@ -171,7 +175,7 @@ def generate_custom_configs(configs: List[Dict[str, Any]], *, original_path: Uni
|
|||
logger.error("Sending custom configs failed, configuration will not work as expected...")
|
||||
|
||||
|
||||
def generate_external_plugins(plugins: Optional[List[Dict[str, Any]]], *, original_path: Union[Path, str] = EXTERNAL_PLUGINS_PATH):
|
||||
def generate_external_plugins(plugins: Optional[List[Dict[str, Any]]] = None, *, original_path: Union[Path, str] = EXTERNAL_PLUGINS_PATH):
|
||||
if not isinstance(original_path, Path):
|
||||
original_path = Path(original_path)
|
||||
pro = "pro" in original_path.parts
|
||||
|
|
@ -240,62 +244,61 @@ def generate_external_plugins(plugins: Optional[List[Dict[str, Any]]], *, origin
|
|||
logger.error(f"Sending {'pro ' if pro else ''}external plugins failed, configuration will not work as expected...")
|
||||
|
||||
|
||||
def generate_caches(plugins: List[Dict[str, Any]]):
|
||||
def generate_caches():
|
||||
assert SCHEDULER is not None
|
||||
|
||||
for plugin in plugins:
|
||||
job_cache_files = SCHEDULER.db.get_jobs_cache_files(plugin_id=plugin["id"])
|
||||
plugin_cache_files = set()
|
||||
ignored_dirs = set()
|
||||
job_path = Path(sep, "var", "cache", "bunkerweb", plugin["id"])
|
||||
job_cache_files = SCHEDULER.db.get_jobs_cache_files()
|
||||
plugin_cache_files = set()
|
||||
ignored_dirs = set()
|
||||
|
||||
for job_cache_file in job_cache_files:
|
||||
cache_path = job_path.joinpath(job_cache_file["service_id"] or "", job_cache_file["file_name"])
|
||||
plugin_cache_files.add(cache_path)
|
||||
for job_cache_file in job_cache_files:
|
||||
job_path = Path(sep, "var", "cache", "bunkerweb", job_cache_file["plugin_id"])
|
||||
cache_path = job_path.joinpath(job_cache_file["service_id"] or "", job_cache_file["file_name"])
|
||||
plugin_cache_files.add(cache_path)
|
||||
|
||||
try:
|
||||
if job_cache_file["file_name"].endswith(".tgz"):
|
||||
extract_path = cache_path.parent
|
||||
if job_cache_file["file_name"].startswith("folder:"):
|
||||
extract_path = Path(job_cache_file["file_name"].split("folder:", 1)[1].rsplit(".tgz", 1)[0])
|
||||
ignored_dirs.add(extract_path.as_posix())
|
||||
rmtree(extract_path, ignore_errors=True)
|
||||
extract_path.mkdir(parents=True, exist_ok=True)
|
||||
with tar_open(fileobj=BytesIO(job_cache_file["data"]), mode="r:gz") as tar:
|
||||
assert isinstance(tar, TarFile)
|
||||
try:
|
||||
for member in tar.getmembers():
|
||||
try:
|
||||
tar.extract(member, path=extract_path)
|
||||
except Exception as e:
|
||||
logger.error(f"Error extracting {member.name}: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error extracting tar file: {e}")
|
||||
logger.debug(f"Restored cache directory {extract_path}")
|
||||
continue
|
||||
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
cache_path.write_bytes(job_cache_file["data"])
|
||||
logger.debug(f"Restored cache file {job_cache_file['file_name']}")
|
||||
except BaseException as e:
|
||||
logger.error(f"Exception while restoring cache file {job_cache_file['file_name']} :\n{e}")
|
||||
try:
|
||||
if job_cache_file["file_name"].endswith(".tgz"):
|
||||
extract_path = cache_path.parent
|
||||
if job_cache_file["file_name"].startswith("folder:"):
|
||||
extract_path = Path(job_cache_file["file_name"].split("folder:", 1)[1].rsplit(".tgz", 1)[0])
|
||||
ignored_dirs.add(extract_path.as_posix())
|
||||
rmtree(extract_path, ignore_errors=True)
|
||||
extract_path.mkdir(parents=True, exist_ok=True)
|
||||
with tar_open(fileobj=BytesIO(job_cache_file["data"]), mode="r:gz") as tar:
|
||||
assert isinstance(tar, TarFile)
|
||||
try:
|
||||
for member in tar.getmembers():
|
||||
try:
|
||||
tar.extract(member, path=extract_path)
|
||||
except Exception as e:
|
||||
logger.error(f"Error extracting {member.name}: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error extracting tar file: {e}")
|
||||
logger.debug(f"Restored cache directory {extract_path}")
|
||||
continue
|
||||
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
cache_path.write_bytes(job_cache_file["data"])
|
||||
logger.debug(f"Restored cache file {job_cache_file['file_name']}")
|
||||
except BaseException as e:
|
||||
logger.error(f"Exception while restoring cache file {job_cache_file['file_name']} :\n{e}")
|
||||
|
||||
if job_path.is_dir():
|
||||
for file in job_path.rglob("*"):
|
||||
if file.as_posix().startswith(tuple(ignored_dirs)):
|
||||
continue
|
||||
if job_path.is_dir():
|
||||
for file in job_path.rglob("*"):
|
||||
if file.as_posix().startswith(tuple(ignored_dirs)):
|
||||
continue
|
||||
|
||||
logger.debug(f"Checking if {file} should be removed")
|
||||
if file not in plugin_cache_files and file.is_file():
|
||||
logger.debug(f"Removing non-cached file {file}")
|
||||
file.unlink(missing_ok=True)
|
||||
if file.parent.is_dir() and not list(file.parent.iterdir()):
|
||||
logger.debug(f"Removing empty directory {file.parent}")
|
||||
rmtree(file.parent, ignore_errors=True)
|
||||
if file.parent == job_path:
|
||||
break
|
||||
elif file.is_dir() and not list(file.iterdir()):
|
||||
logger.debug(f"Removing empty directory {file}")
|
||||
rmtree(file, ignore_errors=True)
|
||||
logger.debug(f"Checking if {file} should be removed")
|
||||
if file not in plugin_cache_files and file.is_file():
|
||||
logger.debug(f"Removing non-cached file {file}")
|
||||
file.unlink(missing_ok=True)
|
||||
if file.parent.is_dir() and not list(file.parent.iterdir()):
|
||||
logger.debug(f"Removing empty directory {file.parent}")
|
||||
rmtree(file.parent, ignore_errors=True)
|
||||
if file.parent == job_path:
|
||||
break
|
||||
elif file.is_dir() and not list(file.iterdir()):
|
||||
logger.debug(f"Removing empty directory {file}")
|
||||
rmtree(file, ignore_errors=True)
|
||||
|
||||
|
||||
def api_to_instance(api):
|
||||
|
|
@ -321,17 +324,18 @@ def run_in_slave_mode():
|
|||
sleep(5)
|
||||
env = SCHEDULER.db.get_config()
|
||||
|
||||
# Download plugins
|
||||
pro_plugins = SCHEDULER.db.get_plugins(_type="pro", with_data=True)
|
||||
generate_external_plugins(pro_plugins, original_path=PRO_PLUGINS_PATH)
|
||||
external_plugins = SCHEDULER.db.get_plugins(_type="external", with_data=True)
|
||||
generate_external_plugins(external_plugins)
|
||||
threads = [
|
||||
Thread(target=generate_custom_configs),
|
||||
Thread(target=generate_external_plugins),
|
||||
Thread(target=generate_external_plugins, kwargs={"original_path": PRO_PLUGINS_PATH}),
|
||||
Thread(target=generate_caches),
|
||||
]
|
||||
|
||||
# Download custom configs
|
||||
generate_custom_configs(SCHEDULER.db.get_custom_configs())
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
|
||||
# Download caches
|
||||
generate_caches(pro_plugins + external_plugins)
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
# Gen config
|
||||
content = ""
|
||||
|
|
@ -577,9 +581,9 @@ if __name__ == "__main__":
|
|||
threads.clear()
|
||||
|
||||
if changes["pro_plugins_changed"]:
|
||||
threads.append(Thread(target=generate_external_plugins, args=(None,), kwargs={"original_path": PRO_PLUGINS_PATH}))
|
||||
threads.append(Thread(target=generate_external_plugins, kwargs={"original_path": PRO_PLUGINS_PATH}))
|
||||
if changes["external_plugins_changed"]:
|
||||
threads.append(Thread(target=generate_external_plugins, args=(None,)))
|
||||
threads.append(Thread(target=generate_external_plugins))
|
||||
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
|
|
@ -670,7 +674,7 @@ if __name__ == "__main__":
|
|||
else:
|
||||
logger.info("All jobs in run_once() were successful")
|
||||
if SCHEDULER.db.readonly:
|
||||
generate_caches(SCHEDULER.db.get_plugins())
|
||||
generate_caches()
|
||||
|
||||
if CONFIG_NEED_GENERATION:
|
||||
content = ""
|
||||
|
|
|
|||
Loading…
Reference in a new issue