mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
chore: Refactor generate_caches function + Add a write test on the database before running the jobs on the scheduler
This commit is contained in:
parent
f4f68bf635
commit
7118272b9e
4 changed files with 111 additions and 114 deletions
|
|
@ -12,7 +12,6 @@ from re import compile as re_compile
|
|||
from sys import argv, path as sys_path
|
||||
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
|
||||
from time import sleep
|
||||
from traceback import format_exc
|
||||
from zipfile import ZIP_DEFLATED, ZipFile
|
||||
|
||||
from model import (
|
||||
|
|
@ -136,8 +135,8 @@ class Database:
|
|||
except ArgumentError:
|
||||
self.logger.error(f"Invalid database URI: {sqlalchemy_string}")
|
||||
error = True
|
||||
except SQLAlchemyError:
|
||||
self.logger.error(f"Error when trying to create the engine: {format_exc()}")
|
||||
except SQLAlchemyError as e:
|
||||
self.logger.error(f"Error when trying to create the engine: {e}")
|
||||
error = True
|
||||
finally:
|
||||
if error:
|
||||
|
|
@ -175,10 +174,10 @@ class Database:
|
|||
sqlalchemy_string = self.database_uri_readonly
|
||||
self.last_fallback = datetime.now()
|
||||
else:
|
||||
self.logger.error(f"Can't connect to database : {format_exc()}")
|
||||
self.logger.error(f"Can't connect to database : {e}")
|
||||
_exit(1)
|
||||
else:
|
||||
self.logger.error(f"Can't connect to database : {format_exc()}")
|
||||
self.logger.error(f"Can't connect to database : {e}")
|
||||
_exit(1)
|
||||
|
||||
if "attempt to write a readonly database" in str(e):
|
||||
|
|
@ -193,8 +192,8 @@ class Database:
|
|||
self.logger.warning("Can't connect to database, retrying in 5 seconds ...")
|
||||
retries -= 1
|
||||
sleep(5)
|
||||
except BaseException:
|
||||
self.logger.error(f"Error when trying to connect to the database: {format_exc()}")
|
||||
except BaseException as e:
|
||||
self.logger.error(f"Error when trying to connect to the database: {e}")
|
||||
exit(1)
|
||||
|
||||
self.suffix_rx = re_compile(r"_\d+$")
|
||||
|
|
@ -209,6 +208,12 @@ class Database:
|
|||
if self.sql_engine:
|
||||
self.sql_engine.dispose()
|
||||
|
||||
def test_write(self):
|
||||
"""Test the write access to the database"""
|
||||
with self.__db_session() as session:
|
||||
session.execute(text("CREATE TABLE IF NOT EXISTS test (id INT)"))
|
||||
session.execute(text("DROP TABLE test"))
|
||||
|
||||
def retry_connection(self, *, readonly: bool = False, fallback: bool = False, **kwargs) -> None:
|
||||
"""Retry the connection to the database"""
|
||||
|
||||
|
|
@ -298,8 +303,8 @@ class Database:
|
|||
|
||||
metadata.autoconf_loaded = value
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
|
|
@ -326,8 +331,8 @@ class Database:
|
|||
|
||||
metadata.scheduler_first_start = value
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
|
|
@ -346,8 +351,8 @@ class Database:
|
|||
for key, value in data.items():
|
||||
setattr(metadata, key, value)
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
|
|
@ -401,8 +406,8 @@ class Database:
|
|||
)
|
||||
)
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
|
|
@ -455,8 +460,8 @@ class Database:
|
|||
"default": False,
|
||||
}
|
||||
)
|
||||
except BaseException:
|
||||
self.logger.debug(f"Can't get the metadata: {format_exc()}")
|
||||
except BaseException as e:
|
||||
self.logger.debug(f"Can't get the metadata: {e}")
|
||||
|
||||
return data
|
||||
|
||||
|
|
@ -484,8 +489,8 @@ class Database:
|
|||
config_changed=metadata is not None and metadata.config_changed,
|
||||
instances_changed=metadata is not None and metadata.instances_changed,
|
||||
)
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
def checked_changes(self, changes: Optional[List[str]] = None, value: Optional[bool] = False) -> str:
|
||||
"""Set changed bit for config, custom configs, instances and plugins"""
|
||||
|
|
@ -519,8 +524,8 @@ class Database:
|
|||
if "instances" in changes:
|
||||
metadata.instances_changed = value
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
|
|
@ -574,8 +579,8 @@ class Database:
|
|||
|
||||
try:
|
||||
Base.metadata.create_all(self.sql_engine, checkfirst=True)
|
||||
except BaseException:
|
||||
return False, format_exc()
|
||||
except BaseException as e:
|
||||
return False, str(e)
|
||||
|
||||
if db_version and db_version != bunkerweb_version:
|
||||
with self.__db_session() as session:
|
||||
|
|
@ -941,8 +946,8 @@ class Database:
|
|||
try:
|
||||
session.add_all(to_put)
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return False, format_exc()
|
||||
except BaseException as e:
|
||||
return False, str(e)
|
||||
|
||||
return True, ""
|
||||
|
||||
|
|
@ -1177,8 +1182,8 @@ class Database:
|
|||
try:
|
||||
session.add_all(to_put)
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
|
|
@ -1270,8 +1275,8 @@ class Database:
|
|||
try:
|
||||
session.add_all(to_put)
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return f"{f'{message}{endl}' if message else ''}{format_exc()}"
|
||||
except BaseException as e:
|
||||
return f"{f'{message}{endl}' if message else ''}{e}"
|
||||
|
||||
return message
|
||||
|
||||
|
|
@ -1426,8 +1431,8 @@ class Database:
|
|||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
|
|
@ -1445,8 +1450,8 @@ class Database:
|
|||
|
||||
try:
|
||||
session.query(Jobs_cache).filter_by(**filters).delete()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
|
|
@ -1486,8 +1491,8 @@ class Database:
|
|||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
|
|
@ -1998,8 +2003,8 @@ class Database:
|
|||
try:
|
||||
session.add_all(to_put)
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
|
|
@ -2212,8 +2217,8 @@ class Database:
|
|||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return f"An error occurred while adding the instance {hostname} (port: {port}, server name: {server_name}).\n{format_exc()}"
|
||||
except BaseException as e:
|
||||
return f"An error occurred while adding the instance {hostname} (port: {port}, server name: {server_name}).\n{e}"
|
||||
|
||||
return ""
|
||||
|
||||
|
|
@ -2244,8 +2249,8 @@ class Database:
|
|||
try:
|
||||
session.add_all(to_put)
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
|
|
@ -2324,8 +2329,8 @@ class Database:
|
|||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
||||
|
|
@ -2349,7 +2354,7 @@ class Database:
|
|||
|
||||
try:
|
||||
session.commit()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
except BaseException as e:
|
||||
return str(e)
|
||||
|
||||
return ""
|
||||
|
|
|
|||
|
|
@ -103,12 +103,7 @@ class Job:
|
|||
with LOCK:
|
||||
if not manual and self.job_path.is_dir():
|
||||
for file in self.job_path.rglob("*"):
|
||||
skipped = False
|
||||
if file.as_posix().startswith(tuple(ignored_dirs)):
|
||||
skipped = True
|
||||
break
|
||||
|
||||
if skipped:
|
||||
continue
|
||||
|
||||
self.logger.debug(f"Checking if {file} should be removed")
|
||||
|
|
|
|||
|
|
@ -235,23 +235,9 @@ class JobScheduler(ApiCaller):
|
|||
if not threads:
|
||||
return True
|
||||
|
||||
if self.db.database_uri and self.db.readonly:
|
||||
try:
|
||||
self.db.retry_connection(pool_timeout=5)
|
||||
self.db.readonly = False
|
||||
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
|
||||
except BaseException:
|
||||
try:
|
||||
self.db.retry_connection(readonly=True, pool_timeout=5)
|
||||
except BaseException:
|
||||
if self.db.database_uri_readonly:
|
||||
with suppress(BaseException):
|
||||
self.db.retry_connection(fallback=True, pool_timeout=5)
|
||||
self.db.readonly = True
|
||||
|
||||
if self.db.readonly:
|
||||
self.__logger.error("Database is in read-only mode, jobs will not be executed")
|
||||
return True
|
||||
err = self.try_database_readonly()
|
||||
if err:
|
||||
return True
|
||||
|
||||
self.__job_success = True
|
||||
self.__job_reload = False
|
||||
|
|
@ -289,23 +275,9 @@ class JobScheduler(ApiCaller):
|
|||
return success
|
||||
|
||||
def run_once(self) -> bool:
|
||||
if self.db.database_uri and self.db.readonly:
|
||||
try:
|
||||
self.db.retry_connection(pool_timeout=1)
|
||||
self.db.readonly = False
|
||||
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
|
||||
except BaseException:
|
||||
try:
|
||||
self.db.retry_connection(readonly=True, pool_timeout=1)
|
||||
except BaseException:
|
||||
if self.db.database_uri_readonly:
|
||||
with suppress(BaseException):
|
||||
self.db.retry_connection(fallback=True, pool_timeout=1)
|
||||
self.db.readonly = True
|
||||
|
||||
if self.db.readonly:
|
||||
self.__logger.error("Database is in read-only mode, jobs will not be executed")
|
||||
return True
|
||||
err = self.try_database_readonly()
|
||||
if err:
|
||||
return True
|
||||
|
||||
threads = []
|
||||
self.__job_success = True
|
||||
|
|
@ -330,23 +302,9 @@ class JobScheduler(ApiCaller):
|
|||
return ret
|
||||
|
||||
def run_single(self, job_name: str) -> bool:
|
||||
if self.db.database_uri and self.db.readonly:
|
||||
try:
|
||||
self.db.retry_connection(pool_timeout=1)
|
||||
self.db.readonly = False
|
||||
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
|
||||
except BaseException:
|
||||
try:
|
||||
self.db.retry_connection(readonly=True, pool_timeout=1)
|
||||
except BaseException:
|
||||
if self.db.database_uri_readonly:
|
||||
with suppress(BaseException):
|
||||
self.db.retry_connection(fallback=True, pool_timeout=1)
|
||||
self.db.readonly = True
|
||||
|
||||
if self.db.readonly:
|
||||
self.__logger.error("Database is in read-only mode, jobs will not be executed")
|
||||
return True
|
||||
err = self.try_database_readonly()
|
||||
if err:
|
||||
return True
|
||||
|
||||
if self.__lock:
|
||||
self.__lock.acquire()
|
||||
|
|
@ -392,3 +350,31 @@ class JobScheduler(ApiCaller):
|
|||
self.__logger.error(f"Exception while reloading scheduler {format_exc()}")
|
||||
return False
|
||||
return ret
|
||||
|
||||
def try_database_readonly(self) -> bool:
|
||||
if not self.db.readonly:
|
||||
try:
|
||||
self.db.test_write()
|
||||
except BaseException:
|
||||
self.db.readonly = True
|
||||
return True
|
||||
|
||||
if self.db.database_uri and self.db.readonly:
|
||||
try:
|
||||
self.db.retry_connection(pool_timeout=1)
|
||||
self.db.readonly = False
|
||||
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
|
||||
except BaseException:
|
||||
try:
|
||||
self.db.retry_connection(readonly=True, pool_timeout=1)
|
||||
except BaseException:
|
||||
if self.db.database_uri_readonly:
|
||||
with suppress(BaseException):
|
||||
self.db.retry_connection(fallback=True, pool_timeout=1)
|
||||
self.db.readonly = True
|
||||
|
||||
if self.db.readonly:
|
||||
self.__logger.error("Database is in read-only mode, jobs will not be executed")
|
||||
return True
|
||||
|
||||
return self.db.readonly
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ from signal import SIGINT, SIGTERM, signal, SIGHUP
|
|||
from stat import S_IEXEC
|
||||
from subprocess import run as subprocess_run, DEVNULL, STDOUT, PIPE
|
||||
from sys import path as sys_path
|
||||
from tarfile import open as tar_open
|
||||
from tarfile import TarFile, open as tar_open
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
from traceback import format_exc
|
||||
|
|
@ -236,12 +236,15 @@ def generate_external_plugins(plugins: Optional[List[Dict[str, Any]]], *, origin
|
|||
logger.error(f"Sending {'pro ' if pro else ''}external plugins failed, configuration will not work as expected...")
|
||||
|
||||
|
||||
def generate_caches(plugins: List[Any], db: Database):
|
||||
def generate_caches(plugins: List[Dict[str, Any]]):
|
||||
assert SCHEDULER is not None
|
||||
|
||||
for plugin in plugins:
|
||||
job_cache_files = db.get_jobs_cache_files(plugin_id=plugin["id"])
|
||||
job_cache_files = SCHEDULER.db.get_jobs_cache_files(plugin_id=plugin["id"])
|
||||
plugin_cache_files = set()
|
||||
ignored_dirs = set()
|
||||
job_path = Path(sep, "var", "cache", "bunkerweb", plugin["id"])
|
||||
|
||||
for job_cache_file in job_cache_files:
|
||||
cache_path = job_path.joinpath(job_cache_file["service_id"] or "", job_cache_file["file_name"])
|
||||
plugin_cache_files.add(cache_path)
|
||||
|
|
@ -255,22 +258,28 @@ def generate_caches(plugins: List[Any], db: Database):
|
|||
rmtree(extract_path, ignore_errors=True)
|
||||
extract_path.mkdir(parents=True, exist_ok=True)
|
||||
with tar_open(fileobj=BytesIO(job_cache_file["data"]), mode="r:gz") as tar:
|
||||
assert isinstance(tar, TarFile)
|
||||
try:
|
||||
tar.extractall(extract_path, filter="fully_trusted")
|
||||
except TypeError:
|
||||
tar.extractall(extract_path)
|
||||
else:
|
||||
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
cache_path.write_bytes(job_cache_file["data"])
|
||||
for member in tar.getmembers():
|
||||
try:
|
||||
tar.extract(member, path=extract_path)
|
||||
except Exception as e:
|
||||
logger.error(f"Error extracting {member.name}: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error extracting tar file: {e}")
|
||||
logger.debug(f"Restored cache directory {extract_path}")
|
||||
continue
|
||||
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
cache_path.write_bytes(job_cache_file["data"])
|
||||
logger.debug(f"Restored cache file {job_cache_file['file_name']}")
|
||||
except BaseException as e:
|
||||
logger.error(f"Exception while restoring cache file {job_cache_file['file_name']} :\n{e}")
|
||||
|
||||
if job_path.is_dir():
|
||||
for file in job_path.rglob("*"):
|
||||
skipped = False
|
||||
if file.as_posix().startswith(tuple(ignored_dirs)):
|
||||
skipped = True
|
||||
if skipped:
|
||||
continue
|
||||
|
||||
logger.debug(f"Checking if {file} should be removed")
|
||||
if file not in plugin_cache_files and file.is_file():
|
||||
logger.debug(f"Removing non-cached file {file}")
|
||||
|
|
@ -318,7 +327,7 @@ def run_in_slave_mode():
|
|||
generate_custom_configs(SCHEDULER.db.get_custom_configs())
|
||||
|
||||
# Download caches
|
||||
generate_caches(pro_plugins + external_plugins, SCHEDULER.db)
|
||||
generate_caches(pro_plugins + external_plugins)
|
||||
|
||||
# Gen config
|
||||
content = ""
|
||||
|
|
@ -634,6 +643,8 @@ if __name__ == "__main__":
|
|||
logger.error("At least one job in run_once() failed")
|
||||
else:
|
||||
logger.info("All jobs in run_once() were successful")
|
||||
if SCHEDULER.db.readonly:
|
||||
generate_caches(SCHEDULER.db.get_plugins())
|
||||
|
||||
if CONFIG_NEED_GENERATION:
|
||||
content = ""
|
||||
|
|
|
|||
Loading…
Reference in a new issue