chore: Refactor generate_caches function + Add a write test on the database before running the jobs on the scheduler

This commit is contained in:
Théophile Diot 2024-05-24 15:43:19 +01:00
parent f4f68bf635
commit 7118272b9e
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06
4 changed files with 111 additions and 114 deletions

View file

@ -12,7 +12,6 @@ from re import compile as re_compile
from sys import argv, path as sys_path
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
from time import sleep
from traceback import format_exc
from zipfile import ZIP_DEFLATED, ZipFile
from model import (
@ -136,8 +135,8 @@ class Database:
except ArgumentError:
self.logger.error(f"Invalid database URI: {sqlalchemy_string}")
error = True
except SQLAlchemyError:
self.logger.error(f"Error when trying to create the engine: {format_exc()}")
except SQLAlchemyError as e:
self.logger.error(f"Error when trying to create the engine: {e}")
error = True
finally:
if error:
@ -175,10 +174,10 @@ class Database:
sqlalchemy_string = self.database_uri_readonly
self.last_fallback = datetime.now()
else:
self.logger.error(f"Can't connect to database : {format_exc()}")
self.logger.error(f"Can't connect to database : {e}")
_exit(1)
else:
self.logger.error(f"Can't connect to database : {format_exc()}")
self.logger.error(f"Can't connect to database : {e}")
_exit(1)
if "attempt to write a readonly database" in str(e):
@ -193,8 +192,8 @@ class Database:
self.logger.warning("Can't connect to database, retrying in 5 seconds ...")
retries -= 1
sleep(5)
except BaseException:
self.logger.error(f"Error when trying to connect to the database: {format_exc()}")
except BaseException as e:
self.logger.error(f"Error when trying to connect to the database: {e}")
exit(1)
self.suffix_rx = re_compile(r"_\d+$")
@ -209,6 +208,12 @@ class Database:
if self.sql_engine:
self.sql_engine.dispose()
def test_write(self):
"""Test the write access to the database"""
with self.__db_session() as session:
session.execute(text("CREATE TABLE IF NOT EXISTS test (id INT)"))
session.execute(text("DROP TABLE test"))
def retry_connection(self, *, readonly: bool = False, fallback: bool = False, **kwargs) -> None:
"""Retry the connection to the database"""
@ -298,8 +303,8 @@ class Database:
metadata.autoconf_loaded = value
session.commit()
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
return ""
@ -326,8 +331,8 @@ class Database:
metadata.scheduler_first_start = value
session.commit()
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
return ""
@ -346,8 +351,8 @@ class Database:
for key, value in data.items():
setattr(metadata, key, value)
session.commit()
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
return ""
@ -401,8 +406,8 @@ class Database:
)
)
session.commit()
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
return ""
@ -455,8 +460,8 @@ class Database:
"default": False,
}
)
except BaseException:
self.logger.debug(f"Can't get the metadata: {format_exc()}")
except BaseException as e:
self.logger.debug(f"Can't get the metadata: {e}")
return data
@ -484,8 +489,8 @@ class Database:
config_changed=metadata is not None and metadata.config_changed,
instances_changed=metadata is not None and metadata.instances_changed,
)
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
def checked_changes(self, changes: Optional[List[str]] = None, value: Optional[bool] = False) -> str:
"""Set changed bit for config, custom configs, instances and plugins"""
@ -519,8 +524,8 @@ class Database:
if "instances" in changes:
metadata.instances_changed = value
session.commit()
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
return ""
@ -574,8 +579,8 @@ class Database:
try:
Base.metadata.create_all(self.sql_engine, checkfirst=True)
except BaseException:
return False, format_exc()
except BaseException as e:
return False, str(e)
if db_version and db_version != bunkerweb_version:
with self.__db_session() as session:
@ -941,8 +946,8 @@ class Database:
try:
session.add_all(to_put)
session.commit()
except BaseException:
return False, format_exc()
except BaseException as e:
return False, str(e)
return True, ""
@ -1177,8 +1182,8 @@ class Database:
try:
session.add_all(to_put)
session.commit()
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
return ""
@ -1270,8 +1275,8 @@ class Database:
try:
session.add_all(to_put)
session.commit()
except BaseException:
return f"{f'{message}{endl}' if message else ''}{format_exc()}"
except BaseException as e:
return f"{f'{message}{endl}' if message else ''}{e}"
return message
@ -1426,8 +1431,8 @@ class Database:
try:
session.commit()
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
return ""
@ -1445,8 +1450,8 @@ class Database:
try:
session.query(Jobs_cache).filter_by(**filters).delete()
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
return ""
@ -1486,8 +1491,8 @@ class Database:
try:
session.commit()
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
return ""
@ -1998,8 +2003,8 @@ class Database:
try:
session.add_all(to_put)
session.commit()
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
return ""
@ -2212,8 +2217,8 @@ class Database:
try:
session.commit()
except BaseException:
return f"An error occurred while adding the instance {hostname} (port: {port}, server name: {server_name}).\n{format_exc()}"
except BaseException as e:
return f"An error occurred while adding the instance {hostname} (port: {port}, server name: {server_name}).\n{e}"
return ""
@ -2244,8 +2249,8 @@ class Database:
try:
session.add_all(to_put)
session.commit()
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
return ""
@ -2324,8 +2329,8 @@ class Database:
try:
session.commit()
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
return ""
@ -2349,7 +2354,7 @@ class Database:
try:
session.commit()
except BaseException:
return format_exc()
except BaseException as e:
return str(e)
return ""

View file

@ -103,12 +103,7 @@ class Job:
with LOCK:
if not manual and self.job_path.is_dir():
for file in self.job_path.rglob("*"):
skipped = False
if file.as_posix().startswith(tuple(ignored_dirs)):
skipped = True
break
if skipped:
continue
self.logger.debug(f"Checking if {file} should be removed")

View file

@ -235,23 +235,9 @@ class JobScheduler(ApiCaller):
if not threads:
return True
if self.db.database_uri and self.db.readonly:
try:
self.db.retry_connection(pool_timeout=5)
self.db.readonly = False
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
self.db.retry_connection(readonly=True, pool_timeout=5)
except BaseException:
if self.db.database_uri_readonly:
with suppress(BaseException):
self.db.retry_connection(fallback=True, pool_timeout=5)
self.db.readonly = True
if self.db.readonly:
self.__logger.error("Database is in read-only mode, jobs will not be executed")
return True
err = self.try_database_readonly()
if err:
return True
self.__job_success = True
self.__job_reload = False
@ -289,23 +275,9 @@ class JobScheduler(ApiCaller):
return success
def run_once(self) -> bool:
if self.db.database_uri and self.db.readonly:
try:
self.db.retry_connection(pool_timeout=1)
self.db.readonly = False
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
self.db.retry_connection(readonly=True, pool_timeout=1)
except BaseException:
if self.db.database_uri_readonly:
with suppress(BaseException):
self.db.retry_connection(fallback=True, pool_timeout=1)
self.db.readonly = True
if self.db.readonly:
self.__logger.error("Database is in read-only mode, jobs will not be executed")
return True
err = self.try_database_readonly()
if err:
return True
threads = []
self.__job_success = True
@ -330,23 +302,9 @@ class JobScheduler(ApiCaller):
return ret
def run_single(self, job_name: str) -> bool:
if self.db.database_uri and self.db.readonly:
try:
self.db.retry_connection(pool_timeout=1)
self.db.readonly = False
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
self.db.retry_connection(readonly=True, pool_timeout=1)
except BaseException:
if self.db.database_uri_readonly:
with suppress(BaseException):
self.db.retry_connection(fallback=True, pool_timeout=1)
self.db.readonly = True
if self.db.readonly:
self.__logger.error("Database is in read-only mode, jobs will not be executed")
return True
err = self.try_database_readonly()
if err:
return True
if self.__lock:
self.__lock.acquire()
@ -392,3 +350,31 @@ class JobScheduler(ApiCaller):
self.__logger.error(f"Exception while reloading scheduler {format_exc()}")
return False
return ret
def try_database_readonly(self) -> bool:
if not self.db.readonly:
try:
self.db.test_write()
except BaseException:
self.db.readonly = True
return True
if self.db.database_uri and self.db.readonly:
try:
self.db.retry_connection(pool_timeout=1)
self.db.readonly = False
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
self.db.retry_connection(readonly=True, pool_timeout=1)
except BaseException:
if self.db.database_uri_readonly:
with suppress(BaseException):
self.db.retry_connection(fallback=True, pool_timeout=1)
self.db.readonly = True
if self.db.readonly:
self.__logger.error("Database is in read-only mode, jobs will not be executed")
return True
return self.db.readonly

View file

@ -14,7 +14,7 @@ from signal import SIGINT, SIGTERM, signal, SIGHUP
from stat import S_IEXEC
from subprocess import run as subprocess_run, DEVNULL, STDOUT, PIPE
from sys import path as sys_path
from tarfile import open as tar_open
from tarfile import TarFile, open as tar_open
from threading import Thread
from time import sleep
from traceback import format_exc
@ -236,12 +236,15 @@ def generate_external_plugins(plugins: Optional[List[Dict[str, Any]]], *, origin
logger.error(f"Sending {'pro ' if pro else ''}external plugins failed, configuration will not work as expected...")
def generate_caches(plugins: List[Any], db: Database):
def generate_caches(plugins: List[Dict[str, Any]]):
assert SCHEDULER is not None
for plugin in plugins:
job_cache_files = db.get_jobs_cache_files(plugin_id=plugin["id"])
job_cache_files = SCHEDULER.db.get_jobs_cache_files(plugin_id=plugin["id"])
plugin_cache_files = set()
ignored_dirs = set()
job_path = Path(sep, "var", "cache", "bunkerweb", plugin["id"])
for job_cache_file in job_cache_files:
cache_path = job_path.joinpath(job_cache_file["service_id"] or "", job_cache_file["file_name"])
plugin_cache_files.add(cache_path)
@ -255,22 +258,28 @@ def generate_caches(plugins: List[Any], db: Database):
rmtree(extract_path, ignore_errors=True)
extract_path.mkdir(parents=True, exist_ok=True)
with tar_open(fileobj=BytesIO(job_cache_file["data"]), mode="r:gz") as tar:
assert isinstance(tar, TarFile)
try:
tar.extractall(extract_path, filter="fully_trusted")
except TypeError:
tar.extractall(extract_path)
else:
cache_path.parent.mkdir(parents=True, exist_ok=True)
cache_path.write_bytes(job_cache_file["data"])
for member in tar.getmembers():
try:
tar.extract(member, path=extract_path)
except Exception as e:
logger.error(f"Error extracting {member.name}: {e}")
except Exception as e:
logger.error(f"Error extracting tar file: {e}")
logger.debug(f"Restored cache directory {extract_path}")
continue
cache_path.parent.mkdir(parents=True, exist_ok=True)
cache_path.write_bytes(job_cache_file["data"])
logger.debug(f"Restored cache file {job_cache_file['file_name']}")
except BaseException as e:
logger.error(f"Exception while restoring cache file {job_cache_file['file_name']} :\n{e}")
if job_path.is_dir():
for file in job_path.rglob("*"):
skipped = False
if file.as_posix().startswith(tuple(ignored_dirs)):
skipped = True
if skipped:
continue
logger.debug(f"Checking if {file} should be removed")
if file not in plugin_cache_files and file.is_file():
logger.debug(f"Removing non-cached file {file}")
@ -318,7 +327,7 @@ def run_in_slave_mode():
generate_custom_configs(SCHEDULER.db.get_custom_configs())
# Download caches
generate_caches(pro_plugins + external_plugins, SCHEDULER.db)
generate_caches(pro_plugins + external_plugins)
# Gen config
content = ""
@ -634,6 +643,8 @@ if __name__ == "__main__":
logger.error("At least one job in run_once() failed")
else:
logger.info("All jobs in run_once() were successful")
if SCHEDULER.db.readonly:
generate_caches(SCHEDULER.db.get_plugins())
if CONFIG_NEED_GENERATION:
content = ""