Fix and optimize database connection fallback logic in JobScheduler and UI

This commit is contained in:
Théophile Diot 2024-05-22 15:09:31 +01:00
parent 5ee348c4a3
commit 3437d8610e
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06
3 changed files with 58 additions and 58 deletions

View file

@ -74,6 +74,7 @@ class Database:
"""Initialize the database"""
self.logger = logger
self.readonly = False
self.last_fallback = None
if pool:
self.logger.warning("The pool parameter is deprecated, it will be removed in the next version")
@ -168,9 +169,11 @@ class Database:
if not self.readonly:
self.logger.warning("The database is read-only, trying one last time to connect in read-only mode")
self.readonly = True
self.last_fallback = datetime.now()
elif self.database_uri_readonly and sqlalchemy_string != self.database_uri_readonly:
self.logger.warning("Can't connect to the database in read-only mode, falling back to read-only one")
sqlalchemy_string = self.database_uri_readonly
self.last_fallback = datetime.now()
else:
self.logger.error(f"Can't connect to database : {format_exc()}")
_exit(1)
@ -206,7 +209,7 @@ class Database:
if self.sql_engine:
self.sql_engine.dispose()
def retry_connection(self, *, readonly: bool = False, fallback: bool = False) -> None:
def retry_connection(self, *, readonly: bool = False, fallback: bool = False, **kwargs) -> None:
"""Retry the connection to the database"""
assert self.sql_engine is not None
@ -215,7 +218,7 @@ class Database:
raise ValueError("The fallback parameter is set to True but the read-only database URI is not set")
self.sql_engine.dispose(close=True)
self.sql_engine = create_engine(self.database_uri_readonly if fallback else self.database_uri, **self._engine_kwargs)
self.sql_engine = create_engine(self.database_uri_readonly if fallback else self.database_uri, **self._engine_kwargs | kwargs)
if fallback or readonly:
with self.sql_engine.connect() as conn:
@ -234,47 +237,51 @@ class Database:
self.logger.error("The database engine is not initialized")
_exit(1)
if self.database_uri and self.readonly:
if self.database_uri and self.readonly and self.last_fallback and (datetime.now() - self.last_fallback).total_seconds() > 30:
# ? If the database is forced to be read-only, we try to connect as a non read-only user every time until the database is writable
try:
self.retry_connection()
self.retry_connection(pool_timeout=1)
self.readonly = False
self.logger.info("The database is no longer read-only, defaulting to read-write mode")
except (OperationalError, DatabaseError):
try:
self.retry_connection(readonly=True)
self.retry_connection(readonly=True, pool_timeout=1)
except (OperationalError, DatabaseError):
if self.database_uri_readonly:
with suppress(OperationalError, DatabaseError):
self.retry_connection(fallback=True)
self.retry_connection(fallback=True, pool_timeout=1)
self.readonly = True
with self.sql_engine.connect() as conn:
session_factory = sessionmaker(bind=conn, autoflush=True, expire_on_commit=False)
session = scoped_session(session_factory)
try:
session = None
try:
with self.sql_engine.connect() as conn:
session_factory = sessionmaker(bind=conn, autoflush=True, expire_on_commit=False)
session = scoped_session(session_factory)
yield session
except BaseException as e:
except BaseException as e:
if session:
session.rollback()
if "attempt to write a readonly database" in str(e):
self.logger.warning("The database is read-only, retrying in read-only mode ...")
try:
self.retry_connection(readonly=True)
except (OperationalError, DatabaseError):
if self.database_uri_readonly:
self.logger.warning("Can't connect to the database in read-only mode, falling back to read-only one")
with suppress(OperationalError, DatabaseError):
self.retry_connection(fallback=True)
if "attempt to write a readonly database" in str(e):
self.logger.warning("The database is read-only, retrying in read-only mode ...")
try:
self.retry_connection(readonly=True, pool_timeout=1)
except (OperationalError, DatabaseError):
if self.database_uri_readonly:
self.logger.warning("Can't connect to the database in read-only mode, falling back to read-only one")
with suppress(OperationalError, DatabaseError):
self.retry_connection(fallback=True, pool_timeout=1)
self.readonly = True
self.last_fallback = datetime.now()
elif isinstance(e, (ConnectionRefusedError, OperationalError)) and self.database_uri_readonly:
self.logger.warning("Can't connect to the database, falling back to read-only one ...")
with suppress(OperationalError, DatabaseError):
self.retry_connection(fallback=True, pool_timeout=1)
self.readonly = True
elif isinstance(e, ConnectionRefusedError) and self.database_uri_readonly:
self.logger.warning("Can't connect to the database, falling back to read-only one ...")
with suppress(OperationalError, DatabaseError):
self.retry_connection(fallback=True)
raise
finally:
self.last_fallback = datetime.now()
raise
finally:
if session:
session.remove()
def set_autoconf_load(self, value: bool = True) -> str:

View file

@ -226,33 +226,36 @@ class JobScheduler(ApiCaller):
self.__logger.error(f"Exception while scheduling jobs for plugin {plugin} : {format_exc()}")
def run_pending(self) -> bool:
threads = []
for job in schedule_jobs:
if not job.should_run:
continue
threads.append(Thread(target=self.__run_in_thread, args=((job.run,),)))
if not threads:
return True
if self.db.database_uri and self.db.readonly:
try:
self.db.retry_connection()
self.db.retry_connection(pool_timeout=5)
self.db.readonly = False
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
self.db.retry_connection(readonly=True)
self.db.retry_connection(readonly=True, pool_timeout=5)
except BaseException:
if self.db.database_uri_readonly:
with suppress(BaseException):
self.db.retry_connection(fallback=True)
self.db.retry_connection(fallback=True, pool_timeout=5)
self.db.readonly = True
if self.db.readonly:
self.__logger.error("Database is in read-only mode, jobs will not be executed")
return True
threads = []
self.__job_success = True
self.__job_reload = False
for job in schedule_jobs:
if not job.should_run:
continue
threads.append(Thread(target=self.__run_in_thread, args=((job.run,),)))
for thread in threads:
thread.start()
@ -288,16 +291,16 @@ class JobScheduler(ApiCaller):
def run_once(self) -> bool:
if self.db.database_uri and self.db.readonly:
try:
self.db.retry_connection()
self.db.retry_connection(pool_timeout=1)
self.db.readonly = False
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
self.db.retry_connection(readonly=True)
self.db.retry_connection(readonly=True, pool_timeout=1)
except BaseException:
if self.db.database_uri_readonly:
with suppress(BaseException):
self.db.retry_connection(fallback=True)
self.db.retry_connection(fallback=True, pool_timeout=1)
self.db.readonly = True
if self.db.readonly:
@ -329,16 +332,16 @@ class JobScheduler(ApiCaller):
def run_single(self, job_name: str) -> bool:
if self.db.database_uri and self.db.readonly:
try:
self.db.retry_connection()
self.db.retry_connection(pool_timeout=1)
self.db.readonly = False
self.__logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
self.db.retry_connection(readonly=True)
self.db.retry_connection(readonly=True, pool_timeout=1)
except BaseException:
if self.db.database_uri_readonly:
with suppress(BaseException):
self.db.retry_connection(fallback=True)
self.db.retry_connection(fallback=True, pool_timeout=1)
self.db.readonly = True
if self.db.readonly:

View file

@ -392,20 +392,6 @@ def inject_variables():
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
if db.database_uri and db.readonly:
try:
db.retry_connection()
db.readonly = False
app.logger.info("The database is no longer read-only, defaulting to read-write mode")
except BaseException:
try:
db.retry_connection(readonly=True)
except BaseException:
if db.database_uri_readonly:
with suppress(BaseException):
db.retry_connection(fallback=True)
db.readonly = True
# check that the value is in the tuple
return dict(
script_nonce=app.config["SCRIPT_NONCE"],
@ -464,7 +450,11 @@ def handle_csrf_error(_):
@app.before_request
def before_request():
db_user = db.get_ui_user()
try:
db_user = db.get_ui_user()
except BaseException:
db_user = db.get_ui_user()
if db_user:
app.config["USER"] = User(**db_user)