Refactor Database to avoid unnecessary functions

This commit is contained in:
Théophile Diot 2024-05-22 11:56:53 +01:00
parent 28aefbba3f
commit fb5d56a071
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06
19 changed files with 281 additions and 413 deletions

View file

@ -1,5 +1,7 @@
#!/usr/bin/env python3
from contextlib import suppress
from datetime import datetime
from os import getenv
from time import sleep
from copy import deepcopy
@ -62,20 +64,24 @@ class Config(ConfigCaller):
return False
def wait_applying(self):
i = 0
while i < 10:
curr_changes = self._db.check_changes()
if isinstance(curr_changes, str):
self.__logger.error(f"An error occurred when checking for changes in the database : {curr_changes}")
elif not any(curr_changes.values()):
break
current_time = datetime.now()
ready = False
while not ready and (datetime.now() - current_time).seconds < 240:
db_metadata = self._db.get_metadata()
if isinstance(db_metadata, str):
self.__logger.error(f"An error occurred when checking for changes in the database : {db_metadata}")
elif not any(
v
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "config_changed", "instances_changed")
):
ready = True
continue
else:
self.__logger.warning(
"Scheduler is already applying a configuration, retrying in 5 seconds ...",
)
i += 1
self.__logger.warning("Scheduler is already applying a configuration, retrying in 5 seconds ...")
sleep(5)
if i >= 10:
if not ready:
raise Exception("Too many retries while waiting for scheduler to apply configuration...")
def apply(self, instances, services, configs={}, first=False) -> bool:
@ -119,24 +125,28 @@ class Config(ConfigCaller):
}
)
while not self._db.is_initialized():
self.__logger.warning(
"Database is not initialized, retrying in 5 seconds ...",
)
sleep(5)
current_time = datetime.now()
ready = False
while not ready and (datetime.now() - current_time).seconds < 120:
db_metadata = self._db.get_metadata()
if isinstance(db_metadata, str) or not db_metadata["is_initialized"]:
self.__logger.warning("Database is not initialized, retrying in 5s ...")
sleep(5)
continue
ready = True
if not ready:
self.__logger.error(f"Timeout while waiting for database to be initialized, ignoring changes ...\ndb data: {db_metadata}")
return False
# wait until changes are applied
while True:
curr_changes = self._db.check_changes()
if isinstance(curr_changes, str):
self.__logger.error(f"An error occurred when checking for changes in the database : {curr_changes}")
elif not any(curr_changes.values()):
break
else:
self.__logger.warning(
"Scheduler is already applying a configuration, retrying in 5 seconds ...",
)
sleep(5)
ready = False
with suppress(BaseException):
self.wait_applying()
ready = True
if not ready:
self.__logger.warning("Timeout while waiting for scheduler to apply configuration, continuing anyway...")
# update instances in database
if "instances" in changes:

View file

@ -72,11 +72,9 @@ class Controller(Config):
def _set_autoconf_load_db(self):
if not self._loaded:
ret = self._db.set_autoconf_load(True)
ret = self._db.set_metadata({"autoconf_loaded": True})
if ret:
self._logger.warning(
f"Can't set autoconf loaded metadata to true in database: {ret}",
)
self._logger.warning(f"Can't set autoconf loaded metadata to true in database: {ret}")
else:
self._loaded = True

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python3
from contextlib import suppress
from time import sleep
from traceback import format_exc
from typing import List
@ -272,41 +273,29 @@ class IngressController(Controller):
error = False
try:
for event in w.stream(what):
self.__internal_lock.acquire()
locked = True
if not self.__process_event(event):
self.__internal_lock.release()
locked = False
continue
self.wait_applying()
self._update_settings()
self._instances = self.get_instances()
self._services = self.get_services()
self._configs = self.get_configs()
if not self.update_needed(self._instances, self._services, configs=self._configs):
self.__internal_lock.release()
locked = False
continue
self._logger.info(
f"Caught kubernetes event ({watch_type}), deploying new configuration ...",
)
try:
ret = self.apply_config()
if not ret:
self._logger.error(
"Error while deploying new configuration ...",
)
else:
self._logger.info(
"Successfully deployed new configuration 🚀",
)
self._set_autoconf_load_db()
except:
self._logger.error(
f"Exception while deploying new configuration :\n{format_exc()}",
)
self.__internal_lock.release()
with self.__internal_lock:
locked = True
if not self.__process_event(event):
locked = False
continue
self.wait_applying()
self._update_settings()
self._instances = self.get_instances()
self._services = self.get_services()
self._configs = self.get_configs()
if not self.update_needed(self._instances, self._services, configs=self._configs):
locked = False
continue
self._logger.info(f"Caught kubernetes event ({watch_type}), deploying new configuration ...")
try:
ret = self.apply_config()
if not ret:
self._logger.error("Error while deploying new configuration ...")
else:
self._logger.info("Successfully deployed new configuration 🚀")
self._set_autoconf_load_db()
except:
self._logger.error(f"Exception while deploying new configuration :\n{format_exc()}")
locked = False
except ApiException as e:
if e.status != 410:
@ -321,7 +310,8 @@ class IngressController(Controller):
error = True
finally:
if locked:
self.__internal_lock.release()
with suppress(BaseException):
self.__internal_lock.release()
locked = False
if error is True:

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python3
from contextlib import suppress
from time import sleep
from traceback import format_exc
from threading import Thread, Lock
@ -150,33 +151,28 @@ class SwarmController(Controller):
error = False
try:
for event in self.__client.events(decode=True, filters={"type": event_type}):
self.__internal_lock.acquire()
locked = True
if not self.__process_event(event):
self.__internal_lock.release()
locked = False
continue
try:
self.wait_applying()
self._update_settings()
self._instances = self.get_instances()
self._services = self.get_services()
self._configs = self.get_configs()
if not self.update_needed(self._instances, self._services, configs=self._configs):
self.__internal_lock.release()
with self.__internal_lock:
locked = True
if not self.__process_event(event):
locked = False
continue
self._logger.info(f"Caught Swarm event ({event_type}), deploying new configuration ...")
if not self.apply_config():
self._logger.error("Error while deploying new configuration")
else:
self._logger.info(
"Successfully deployed new configuration 🚀",
)
self._set_autoconf_load_db()
except:
self._logger.error(f"Exception while processing Swarm event ({event_type}) :\n{format_exc()}")
self.__internal_lock.release()
try:
self.wait_applying()
self._update_settings()
self._instances = self.get_instances()
self._services = self.get_services()
self._configs = self.get_configs()
if not self.update_needed(self._instances, self._services, configs=self._configs):
locked = False
continue
self._logger.info(f"Caught Swarm event ({event_type}), deploying new configuration ...")
if not self.apply_config():
self._logger.error("Error while deploying new configuration")
else:
self._logger.info("Successfully deployed new configuration 🚀")
self._set_autoconf_load_db()
except:
self._logger.error(f"Exception while processing Swarm event ({event_type}) :\n{format_exc()}")
locked = False
except:
self._logger.error(
@ -185,7 +181,8 @@ class SwarmController(Controller):
error = True
finally:
if locked:
self.__internal_lock.release()
with suppress(BaseException):
self.__internal_lock.release()
locked = False
if error is True:
self._logger.warning("Got exception, retrying in 10 seconds ...")

View file

@ -6,7 +6,6 @@ from os import getenv, sep
from os.path import join
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",), ("core", "backup"))]:
@ -48,10 +47,9 @@ try:
LOGGER.info(f"Backup already done within the last {backup_period} period, skipping backup ...")
sys_exit(0)
with Lock():
is_scheduler_first_start = JOB.db.is_scheduler_first_start()
db_metadata = JOB.db.get_metadata()
if is_scheduler_first_start:
if isinstance(db_metadata, str) or db_metadata["scheduler_first_start"]:
LOGGER.info("First start of the scheduler, skipping backup ...")
sys_exit(0)

View file

@ -7,7 +7,6 @@ from os.path import join
from pathlib import Path
from stat import S_IEXEC
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from uuid import uuid4
from json import JSONDecodeError, load as json_load, loads
from shutil import copytree, rmtree
@ -198,14 +197,11 @@ try:
external_plugins.append(plugin_data)
external_plugins_ids.append(plugin_data["id"])
lock = Lock()
for plugin in db.get_plugins(_type="external", with_data=True):
if plugin["method"] != "scheduler" and plugin["id"] not in external_plugins_ids:
external_plugins.append(plugin)
with lock:
err = db.update_external_plugins(external_plugins)
err = db.update_external_plugins(external_plugins)
if err:
LOGGER.error(f"Couldn't update external plugins to database: {err}")

View file

@ -7,7 +7,6 @@ from os import getenv, sep
from os.path import join
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
from typing import Optional
@ -24,7 +23,6 @@ from jobs import Job # type: ignore
LOGGER = setup_logger("JOBS.mmdb-asn", getenv("LOG_LEVEL", "INFO"))
status = 0
LOCK = Lock()
def request_mmdb() -> Optional[Response]:

View file

@ -7,7 +7,6 @@ from os import getenv, sep
from os.path import join
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
from typing import Optional
@ -24,7 +23,6 @@ from jobs import Job # type: ignore
LOGGER = setup_logger("JOBS.mmdb-country", getenv("LOG_LEVEL", "INFO"))
status = 0
LOCK = Lock()
def request_mmdb() -> Optional[Response]:

View file

@ -4,7 +4,6 @@ from os import getenv, sep
from os.path import join
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("api",), ("db",))]:
@ -30,10 +29,8 @@ try:
# Cluster case
if integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
db = Database(LOGGER, sqlalchemy_string=getenv("DATABASE_URI", None))
lock = Lock()
with lock:
instances = db.get_instances()
instances = db.get_instances()
LOGGER.info(f"Sending challenge to {len(instances)} instances")
for instance in instances:

View file

@ -4,7 +4,6 @@ from os import getenv, sep
from os.path import join
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("api",), ("db",))]:
@ -29,9 +28,7 @@ try:
# Cluster case
if integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
db = Database(LOGGER, sqlalchemy_string=getenv("DATABASE_URI", None))
lock = Lock()
with lock:
instances = db.get_instances()
instances = db.get_instances()
LOGGER.info(f"Cleaning challenge from {len(instances)} instances")
for instance in instances:

View file

@ -6,7 +6,6 @@ from os.path import join
from subprocess import DEVNULL, STDOUT, run
from sys import exit as sys_exit, path as sys_path
from tarfile import open as tar_open
from threading import Lock
from traceback import format_exc
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("api",), ("db",))]:
@ -38,10 +37,8 @@ try:
files = {"archive.tar.gz": tgz}
db = Database(LOGGER, sqlalchemy_string=getenv("DATABASE_URI", None))
lock = Lock()
with lock:
instances = db.get_instances()
instances = db.get_instances()
for instance in instances:
endpoint = f"http://{instance['hostname']}:{instance['port']}"

View file

@ -8,7 +8,6 @@ from re import MULTILINE, search
from shutil import rmtree
from sys import exit as sys_exit, path as sys_path
from tarfile import open as tar_open
from threading import Lock
from traceback import format_exc
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("db",))]:
@ -22,7 +21,6 @@ from jobs import Job # type: ignore
LOGGER = setup_logger("MODSECURITY.coreruleset-nightly", getenv("LOG_LEVEL", "INFO"))
status = 0
LOCK = Lock()
CRS_PATH = Path(sep, "var", "cache", "bunkerweb", "modsecurity", "crs")

View file

@ -8,7 +8,6 @@ from os.path import join
from pathlib import Path
from stat import S_IEXEC
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from uuid import uuid4
from json import JSONDecodeError, load as json_load, loads
from shutil import copytree, rmtree
@ -168,7 +167,7 @@ try:
default_metadata["last_pro_check"] = current_date
metadata = default_metadata | metadata
db.set_pro_metadata(metadata)
db.set_metadata(metadata)
if metadata["is_pro"] != db_metadata["is_pro"]:
clean_pro_plugins(db)
@ -194,7 +193,7 @@ try:
if clean:
metadata = default_metadata.copy()
db.set_pro_metadata(metadata)
db.set_metadata(metadata)
clean_pro_plugins(db)
else:
LOGGER.warning("Skipping the check for new or updated Pro plugins...")
@ -302,14 +301,11 @@ try:
pro_plugins.append(plugin_data)
pro_plugins_ids.append(plugin_data["id"])
lock = Lock()
for plugin in db.get_plugins(_type="pro", with_data=True):
if plugin["method"] != "scheduler" and plugin["id"] not in pro_plugins_ids:
pro_plugins.append(plugin)
with lock:
err = db.update_external_plugins(pro_plugins, _type="pro")
err = db.update_external_plugins(pro_plugins, _type="pro")
if err:
LOGGER.error(f"Couldn't update Pro plugins to database: {err}")

View file

@ -10,6 +10,7 @@ from os.path import join
from pathlib import Path
from re import compile as re_compile
from sys import argv, path as sys_path
from threading import Lock
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
from time import sleep
from traceback import format_exc
@ -55,6 +56,8 @@ from sqlite3 import Connection as SQLiteConnection
install_as_MySQLdb()
LOCK = Lock()
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, _):
@ -235,119 +238,26 @@ class Database:
with suppress(OperationalError, DatabaseError):
self.retry_connection()
with self.sql_engine.connect() as conn:
session_factory = sessionmaker(bind=conn, autoflush=True, expire_on_commit=False)
session = scoped_session(session_factory)
try:
yield session
except BaseException as e:
session.rollback()
with LOCK:
with self.sql_engine.connect() as conn:
session_factory = sessionmaker(bind=conn, autoflush=True, expire_on_commit=False)
session = scoped_session(session_factory)
try:
yield session
except BaseException as e:
session.rollback()
if self.database_uri_readonly and "attempt to write a readonly database" in str(e):
self.sql_engine.dispose(close=True)
self.sql_engine = create_engine(self.database_uri_readonly, **self._engine_kwargs)
self.fallback_readonly = True
self.readonly = True
self.logger.warning("The database is read-only, falling back to read-only mode")
return
if self.database_uri_readonly and "attempt to write a readonly database" in str(e):
self.sql_engine.dispose(close=True)
self.sql_engine = create_engine(self.database_uri_readonly, **self._engine_kwargs)
self.fallback_readonly = True
self.readonly = True
self.logger.warning("The database is read-only, falling back to read-only mode")
return
raise
finally:
session.remove()
def set_autoconf_load(self, value: bool = True) -> str:
"""Set the autoconf_loaded value"""
with self.__db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
try:
metadata = session.query(Metadata).get(1)
if not metadata:
return "The metadata are not set yet, try again"
metadata.autoconf_loaded = value
session.commit()
except BaseException:
return format_exc()
return ""
def is_autoconf_loaded(self) -> bool:
"""Check if the autoconf is loaded"""
with self.__db_session() as session:
try:
metadata = session.query(Metadata).with_entities(Metadata.autoconf_loaded).filter_by(id=1).first()
return metadata is not None and metadata.autoconf_loaded
except (ProgrammingError, OperationalError):
return False
def set_scheduler_first_start(self, value: bool = False) -> str:
"""Set the scheduler_first_start value"""
with self.__db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
try:
metadata = session.query(Metadata).get(1)
if not metadata:
return "The metadata are not set yet, try again"
metadata.scheduler_first_start = value
session.commit()
except BaseException:
return format_exc()
return ""
def set_pro_metadata(self, data: Dict[Literal["is_pro", "pro_expire", "pro_status", "pro_overlapped", "pro_services"], Any] = {}) -> str:
"""Set the pro metadata values"""
with self.__db_session() as session:
if self.readonly:
return "The database is read-only, the changes will not be saved"
try:
metadata = session.query(Metadata).get(1)
if not metadata:
return "The metadata are not set yet, try again"
for key, value in data.items():
setattr(metadata, key, value)
session.commit()
except BaseException:
return format_exc()
return ""
def is_scheduler_first_start(self) -> bool:
"""Check if it's the scheduler's first start"""
with self.__db_session() as session:
try:
metadata = session.query(Metadata).with_entities(Metadata.scheduler_first_start).filter_by(id=1).first()
return metadata is not None and metadata.scheduler_first_start
except (ProgrammingError, OperationalError):
return True
def is_first_config_saved(self) -> bool:
"""Check if the first configuration has been saved"""
with self.__db_session() as session:
try:
metadata = session.query(Metadata).with_entities(Metadata.first_config_saved).filter_by(id=1).first()
return metadata is not None and metadata.first_config_saved
except (ProgrammingError, OperationalError):
return False
def is_initialized(self) -> bool:
"""Check if the database is initialized"""
with self.__db_session() as session:
try:
metadata = session.query(Metadata).with_entities(Metadata.is_initialized).filter_by(id=1).first()
return metadata is not None and metadata.is_initialized
except (ProgrammingError, OperationalError, DatabaseError):
return False
raise
finally:
session.remove()
def initialize_db(self, version: str, integration: str = "Unknown") -> str:
"""Initialize the database"""
@ -380,16 +290,25 @@ class Database:
def get_metadata(self) -> Dict[str, str]:
"""Get the metadata from the database"""
data = {
"version": "1.5.7",
"integration": "unknown",
"database_version": "Unknown",
"is_initialized": False,
"is_pro": "no",
"pro_expire": None,
"pro_status": "invalid",
"pro_services": 0,
"pro_overlapped": False,
"pro_status": "invalid",
"last_pro_check": None,
"default": True,
"first_config_saved": False,
"autoconf_loaded": False,
"scheduler_first_start": True,
"custom_configs_changed": False,
"external_plugins_changed": False,
"pro_plugins_changed": False,
"config_changed": False,
"instances_changed": False,
"integration": "unknown",
"version": "1.5.7",
"database_version": "Unknown", # ? Extracted from the database
"default": True, # ? Extra field to know if the returned data is the default one
}
with self.__db_session() as session:
try:
@ -397,67 +316,41 @@ class Database:
data["database_version"] = (
session.execute(text("SELECT sqlite_version()" if database == "sqlite" else "SELECT VERSION()")).first() or ["unknown"]
)[0]
metadata = (
session.query(Metadata)
.with_entities(
Metadata.version,
Metadata.integration,
Metadata.is_pro,
Metadata.pro_expire,
Metadata.pro_services,
Metadata.pro_overlapped,
Metadata.pro_status,
Metadata.last_pro_check,
)
.filter_by(id=1)
.first()
)
metadata = session.query(Metadata).filter_by(id=1).first()
if metadata:
data.update(
{
"version": metadata.version,
"integration": metadata.integration,
"is_pro": metadata.is_pro,
"pro_expire": metadata.pro_expire,
"pro_services": metadata.pro_services,
"pro_overlapped": metadata.pro_overlapped,
"pro_status": metadata.pro_status,
"last_pro_check": metadata.last_pro_check,
"default": False,
}
)
for key in data.copy():
if hasattr(metadata, key) and key not in ("database_version", "default"):
data[key] = getattr(metadata, key)
data["default"] = False
except BaseException:
self.logger.debug(f"Can't get the metadata: {format_exc()}")
return data
def check_changes(self) -> Union[Dict[str, bool], bool, str]:
"""Check if either the config, the custom configs, plugins or instances have changed inside the database"""
def set_metadata(self, data: Dict[str, Any]) -> str:
"""Set the metadata values"""
with self.__db_session() as session:
try:
metadata = (
session.query(Metadata)
.with_entities(
Metadata.custom_configs_changed,
Metadata.external_plugins_changed,
Metadata.pro_plugins_changed,
Metadata.config_changed,
Metadata.instances_changed,
)
.filter_by(id=1)
.first()
)
if self.readonly:
return "The database is read-only, the changes will not be saved"
return dict(
custom_configs_changed=metadata is not None and metadata.custom_configs_changed,
external_plugins_changed=metadata is not None and metadata.external_plugins_changed,
pro_plugins_changed=metadata is not None and metadata.pro_plugins_changed,
config_changed=metadata is not None and metadata.config_changed,
instances_changed=metadata is not None and metadata.instances_changed,
)
try:
metadata = session.query(Metadata).get(1)
if not metadata:
return "The metadata are not set yet, try again"
for key, value in data.items():
if not hasattr(metadata, key):
self.logger.warning(f"Metadata key {key} does not exist")
continue
setattr(metadata, key, value)
session.commit()
except BaseException:
return format_exc()
return ""
def checked_changes(self, changes: Optional[List[str]] = None, value: Optional[bool] = False) -> str:
"""Set changed bit for config, custom configs, instances and plugins"""
changes = changes or [

View file

@ -227,35 +227,28 @@ if __name__ == "__main__":
config_files = config.get_config()
bunkerweb_version = Path(sep, "usr", "share", "bunkerweb", "VERSION").read_text().strip()
db_initialized = db.is_initialized()
db_metadata = db.get_metadata()
db_initialized = isinstance(db_metadata, str) or not db_metadata["is_initialized"]
if not db_initialized:
logger.info(
"Database not initialized, initializing ...",
)
logger.info("Database not initialized, initializing ...")
ret, err = db.init_tables(
[config.get_settings(), config.get_plugins("core"), config.get_plugins("external"), config.get_plugins("pro")],
bunkerweb_version,
[config.get_settings(), config.get_plugins("core"), config.get_plugins("external"), config.get_plugins("pro")], bunkerweb_version
)
# Initialize database tables
if err:
logger.error(
f"Exception while initializing database : {err}",
)
logger.error(f"Exception while initializing database : {err}")
sys_exit(1)
elif not ret:
logger.info(
"Database tables are already initialized, skipping creation ...",
)
logger.info("Database tables are already initialized, skipping creation ...")
else:
logger.info("Database tables initialized")
else:
logger.info("Database is already initialized, checking for changes ...")
ret, err = db.init_tables(
[config.get_settings(), config.get_plugins("core"), config.get_plugins("external"), config.get_plugins("pro")],
bunkerweb_version,
[config.get_settings(), config.get_plugins("core"), config.get_plugins("external"), config.get_plugins("pro")], bunkerweb_version
)
if not ret and err:
@ -298,12 +291,7 @@ if __name__ == "__main__":
if apis:
for api in apis:
endpoint_data = api.endpoint.replace("http://", "").split(":")
err = db.add_instance(
endpoint_data[0],
endpoint_data[1].replace("/", ""),
api.host,
changed=False,
)
err = db.add_instance(endpoint_data[0], endpoint_data[1].replace("/", ""), api.host, changed=False)
if err:
logger.warning(err)
@ -312,12 +300,7 @@ if __name__ == "__main__":
changes.append("instances")
logger.info(f"Instance {endpoint_data[0]} successfully saved to database")
else:
err = db.add_instance(
"127.0.0.1",
config_files.get("API_HTTP_PORT", 5000),
config_files.get("API_SERVER_NAME", "bwapi"),
changed=False,
)
err = db.add_instance("127.0.0.1", config_files.get("API_HTTP_PORT", 5000), config_files.get("API_SERVER_NAME", "bwapi"), changed=False)
if err:
logger.warning(err)
@ -333,9 +316,7 @@ if __name__ == "__main__":
except SystemExit as e:
sys_exit(e.code)
except:
logger.error(
f"Exception while executing config saver : {format_exc()}",
)
logger.error(f"Exception while executing config saver : {format_exc()}")
sys_exit(1)
# We're done

View file

@ -49,16 +49,14 @@ class Job:
self.logger = logger or self.db.logger
if not deprecated:
with LOCK:
is_scheduler_first_start = self.db.is_scheduler_first_start()
if not is_scheduler_first_start:
db_metadata = self.db.get_metadata()
if not isinstance(db_metadata, str) and not db_metadata["scheduler_first_start"]:
self.restore_cache(manual=False)
def restore_cache(self, *, job_name: str = "", plugin_id: str = "", manual: bool = True) -> bool:
"""Restore job cache files from database."""
ret = True
with LOCK:
job_cache_files = self.db.get_jobs_cache_files(plugin_id=plugin_id or self.job_path.name) # type: ignore
job_cache_files = self.db.get_jobs_cache_files(plugin_id=plugin_id or self.job_path.name) # type: ignore
job_name = job_name or self.job_name
plugin_cache_files = set()
@ -131,10 +129,9 @@ class Job:
if with_data:
ret_data["data"] = cache_path.read_bytes()
with LOCK:
if not ret_data:
return self.db.get_job_cache_file(job_name or self.job_name, name, service_id=service_id, plugin_id=plugin_id or self.job_path.name, with_info=with_info, with_data=with_data) # type: ignore
ret_data.update(self.db.get_job_cache_file(job_name or self.job_name, name, service_id=service_id, plugin_id=plugin_id or self.job_path.name, with_info=True, with_data=False) or {}) # type: ignore
if not ret_data:
return self.db.get_job_cache_file(job_name or self.job_name, name, service_id=service_id, plugin_id=plugin_id or self.job_path.name, with_info=with_info, with_data=with_data) # type: ignore
ret_data.update(self.db.get_job_cache_file(job_name or self.job_name, name, service_id=service_id, plugin_id=plugin_id or self.job_path.name, with_info=True, with_data=False) or {}) # type: ignore
return ret_data
def is_cached_file(
@ -184,10 +181,9 @@ class Job:
checksum = bytes_hash(content)
try:
with LOCK:
err = self.db.upsert_job_cache(service_id, name, content, job_name=job_name or self.job_name, checksum=checksum) # type: ignore
if err:
ret = False
err = self.db.upsert_job_cache(service_id, name, content, job_name=job_name or self.job_name, checksum=checksum) # type: ignore
if err:
ret = False
if ret and isinstance(file_cache, Path) and delete_file and file_cache != cache_path:
file_cache.unlink(missing_ok=True)
@ -223,8 +219,7 @@ class Job:
rmtree(job_path, ignore_errors=True)
try:
with LOCK:
self.db.delete_job_cache(name, job_name=job_name, service_id=service_id) # type: ignore
self.db.delete_job_cache(name, job_name=job_name, service_id=service_id) # type: ignore
except:
return False, f"exception :\n{format_exc()}"
return ret, err

View file

@ -289,17 +289,21 @@ def api_to_instance(api):
def run_in_slave_mode():
assert SCHEDULER is not None
# Wait for init
while not SCHEDULER.db.is_initialized():
logger.warning("Database is not initialized, retrying in 5s ...")
ready = False
while not ready:
db_metadata = SCHEDULER.db.get_metadata()
env = SCHEDULER.db.get_config()
if isinstance(db_metadata, str) or not db_metadata["is_initialized"]:
logger.warning("Database is not initialized, retrying in 5s ...")
elif not db_metadata["first_config_saved"] or not env:
logger.warning("Database doesn't have any config saved yet, retrying in 5s ...")
else:
ready = True
continue
sleep(5)
# Wait for first config
env = SCHEDULER.db.get_config()
while not SCHEDULER.db.is_first_config_saved() or not env:
logger.warning("Database doesn't have any config saved yet, retrying in 5s ...")
sleep(5)
env = SCHEDULER.db.get_config()
# Instantiate scheduler environment
SCHEDULER.env = env | environ
# Download plugins
pro_plugins = SCHEDULER.db.get_plugins(_type="pro", with_data=True)
@ -372,22 +376,14 @@ if __name__ == "__main__":
run_in_slave_mode()
stop(1)
if INTEGRATION in ("Swarm", "Kubernetes", "Autoconf"):
while not SCHEDULER.db.is_initialized():
logger.warning("Database is not initialized, retrying in 5s ...")
sleep(5)
while not SCHEDULER.db.is_autoconf_loaded():
logger.warning("Autoconf is not loaded yet in the database, retrying in 5s ...")
sleep(5)
db_metadata = SCHEDULER.db.get_metadata()
if (
INTEGRATION in ("Swarm", "Kubernetes", "Autoconf")
isinstance(db_metadata, str)
or (db_metadata["is_initialized"] and SCHEDULER.db.get_config() != dotenv_env)
or not tmp_variables_path.exists()
or not nginx_variables_path.exists()
or (tmp_variables_path.read_text(encoding="utf-8") != nginx_variables_path.read_text(encoding="utf-8"))
or SCHEDULER.db.is_initialized()
and SCHEDULER.db.get_config() != dotenv_env
):
# run the config saver
proc = subprocess_run(
@ -405,19 +401,19 @@ if __name__ == "__main__":
if proc.returncode != 0:
logger.error("Config saver failed, configuration will not work as expected...")
if INTEGRATION not in ("Swarm", "Kubernetes", "Autoconf"):
while not SCHEDULER.db.is_initialized():
logger.warning("Database is not initialized, retrying in 5s ...")
sleep(5)
env = SCHEDULER.db.get_config()
while not SCHEDULER.db.is_first_config_saved() or not env:
logger.warning("Database doesn't have any config saved yet, retrying in 5s ...")
sleep(5)
env = SCHEDULER.db.get_config()
ready = False
while not ready:
db_metadata = SCHEDULER.db.get_metadata()
if isinstance(db_metadata, str) or not db_metadata["is_initialized"]:
logger.warning("Database is not initialized, retrying in 5s ...")
elif INTEGRATION in ("Swarm", "Kubernetes", "Autoconf") and not db_metadata["autoconf_loaded"]:
logger.warning("Autoconf is not loaded yet in the database, retrying in 5s ...")
else:
ready = True
continue
sleep(5)
env = SCHEDULER.db.get_config()
env["DATABASE_URI"] = SCHEDULER.db.database_uri
# Override instances if needed
@ -427,7 +423,7 @@ if __name__ == "__main__":
for instance in override_instances.split(" "):
apis.append(API(instance))
# Instantiate scheduler
# Instantiate scheduler environment
SCHEDULER.env = env | environ
if INTEGRATION in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
@ -440,7 +436,7 @@ if __name__ == "__main__":
sleep(5)
SCHEDULER.db.update_instances([api_to_instance(api) for api in SCHEDULER.apis])
scheduler_first_start = SCHEDULER.db.is_scheduler_first_start()
scheduler_first_start = db_metadata["scheduler_first_start"]
logger.info("Scheduler started ...")
@ -548,11 +544,11 @@ if __name__ == "__main__":
if not SCHEDULER.run_single("download-pro-plugins"):
logger.warning("download-pro-plugins job failed at first start, pro plugins settings set by the user may not be up to date ...")
changes = SCHEDULER.db.check_changes()
if INTEGRATION not in ("Swarm", "Kubernetes", "Autoconf") and (changes["pro_plugins_changed"] or changes["external_plugins_changed"]):
if changes["pro_plugins_changed"]:
db_metadata = SCHEDULER.db.get_metadata()
if INTEGRATION not in ("Swarm", "Kubernetes", "Autoconf") and (db_metadata["pro_plugins_changed"] or db_metadata["external_plugins_changed"]):
if db_metadata["pro_plugins_changed"]:
generate_external_plugins(SCHEDULER.db.get_plugins(_type="pro", with_data=True), original_path=PRO_PLUGINS_PATH)
if changes["external_plugins_changed"]:
if db_metadata["external_plugins_changed"]:
generate_external_plugins(SCHEDULER.db.get_plugins(_type="external", with_data=True))
# run the config saver to save potential ignored external plugins settings
@ -717,7 +713,7 @@ if __name__ == "__main__":
INSTANCES_NEED_GENERATION = False
if scheduler_first_start:
ret = SCHEDULER.db.set_scheduler_first_start()
ret = SCHEDULER.db.set_metadata({"scheduler_first_start": False})
if ret == "The database is read-only, the changes will not be saved":
logger.warning("The database is read-only, the scheduler first start will not be saved")
@ -744,20 +740,20 @@ if __name__ == "__main__":
DB_LOCK_FILE.unlink(missing_ok=True)
changes = SCHEDULER.db.check_changes()
db_metadata = SCHEDULER.db.get_metadata()
if isinstance(changes, str):
raise Exception(f"An error occurred when checking for changes in the database : {changes}")
if isinstance(db_metadata, str):
raise Exception(f"An error occurred when checking for changes in the database : {db_metadata}")
# check if the plugins have changed since last time
if changes["pro_plugins_changed"]:
if db_metadata["pro_plugins_changed"]:
logger.info("Pro plugins changed, generating ...")
PRO_PLUGINS_NEED_GENERATION = True
CONFIG_NEED_GENERATION = True
RUN_JOBS_ONCE = True
NEED_RELOAD = True
if changes["external_plugins_changed"]:
if db_metadata["external_plugins_changed"]:
logger.info("External plugins changed, generating ...")
PLUGINS_NEED_GENERATION = True
CONFIG_NEED_GENERATION = True
@ -765,21 +761,21 @@ if __name__ == "__main__":
NEED_RELOAD = True
# check if the custom configs have changed since last time
if changes["custom_configs_changed"]:
if db_metadata["custom_configs_changed"]:
logger.info("Custom configs changed, generating ...")
CONFIGS_NEED_GENERATION = True
CONFIG_NEED_GENERATION = True
NEED_RELOAD = True
# check if the config have changed since last time
if changes["config_changed"]:
if db_metadata["config_changed"]:
logger.info("Config changed, generating ...")
CONFIG_NEED_GENERATION = True
RUN_JOBS_ONCE = True
NEED_RELOAD = True
# check if the instances have changed since last time
if changes["instances_changed"]:
if db_metadata["instances_changed"]:
logger.info("Instances changed, generating ...")
INSTANCES_NEED_GENERATION = True
CONFIGS_NEED_GENERATION = True

View file

@ -58,13 +58,16 @@ def on_starting(server):
INTEGRATION = get_integration()
if INTEGRATION in ("Swarm", "Kubernetes", "Autoconf"):
while not db.is_autoconf_loaded():
ready = False
while not ready:
db_metadata = db.get_metadata()
if isinstance(db_metadata, str) or not db_metadata["is_initialized"]:
LOGGER.warning("Database is not initialized, retrying in 5s ...")
elif INTEGRATION in ("Swarm", "Kubernetes", "Autoconf") and not db_metadata["autoconf_loaded"]:
LOGGER.warning("Autoconf is not loaded yet in the database, retrying in 5s ...")
sleep(5)
while not db.is_initialized():
LOGGER.warning("Database is not initialized, retrying in 5s ...")
else:
ready = True
continue
sleep(5)
USER_PASSWORD_RX = re_compile(r"^(?=.*?\p{Lowercase_Letter})(?=.*?\p{Uppercase_Letter})(?=.*?\d)(?=.*?[ !\"#$%&'()*+,./:;<=>?@[\\\]^_`{|}~-]).{8,}$")

View file

@ -174,18 +174,24 @@ REVERSE_PROXY_PATH = re_compile(r"^(?P<host>https?://.{1,255}(:((6553[0-5])|(655
def wait_applying():
for i in range(31):
curr_changes = db.check_changes()
if isinstance(curr_changes, str):
app.logger.error(f"An error occurred when checking for changes in the database : {curr_changes}")
elif not any(curr_changes.values()):
break
current_time = datetime.now()
ready = False
while not ready and (datetime.now() - current_time).seconds < 120:
db_metadata = db.get_metadata()
if isinstance(db_metadata, str):
app.logger.error(f"An error occurred when checking for changes in the database : {db_metadata}")
elif not any(
v
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "config_changed", "instances_changed")
):
ready = True
continue
else:
app.logger.warning(
"Scheduler is already applying a configuration, retrying in 1 seconds ...",
)
app.logger.warning("Scheduler is already applying a configuration, retrying in 1s ...")
sleep(1)
if i >= 30:
if not ready:
app.logger.error("Too many retries while waiting for scheduler to apply configuration...")
@ -384,9 +390,13 @@ def inject_variables():
ui_data = get_ui_data()
metadata = db.get_metadata()
curr_changes = db.check_changes()
db_metadata = db.get_metadata()
if ui_data.get("PRO_LOADING") and not any(curr_changes.values()):
if ui_data.get("PRO_LOADING") and not any(
v
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "config_changed", "instances_changed")
):
ui_data["PRO_LOADING"] = False
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
@ -716,11 +726,11 @@ def account():
# by setting the last check to None
metadata = db.get_metadata()
metadata["last_pro_check"] = None
db.set_pro_metadata(metadata)
db.set_metadata(metadata)
flash("Checking license key to upgrade.", "success")
curr_changes = db.check_changes()
db_metadata = db.get_metadata()
# Reload instances
def update_global_config(threaded: bool = False):
@ -731,7 +741,11 @@ def account():
ui_data = get_ui_data()
ui_data["PRO_LOADING"] = True
if any(curr_changes.values()):
if any(
v
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "config_changed", "instances_changed")
):
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
Thread(target=update_global_config, args=(True,)).start()
@ -993,7 +1007,7 @@ def services():
error = 0
curr_changes = db.check_changes()
db_metadata = db.get_metadata()
old_server_name = request.form.get("OLD_SERVER_NAME", "")
operation = request.form["operation"]
@ -1012,7 +1026,11 @@ def services():
threaded=threaded,
)
if any(curr_changes.values()):
if any(
v
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "config_changed", "instances_changed")
):
ui_data = get_ui_data()
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
@ -1123,7 +1141,7 @@ def global_config():
if setting and setting["global"] and (setting["value"] != value or setting["value"] == config.get(variable, {"value": None})["value"]):
variables[f"{service}_{variable}"] = value
curr_changes = db.check_changes()
db_metadata = db.get_metadata()
def update_global_config(threaded: bool = False):
wait_applying()
@ -1135,7 +1153,11 @@ def global_config():
if "PRO_LICENSE_KEY" in variables:
ui_data["PRO_LOADING"] = True
if any(curr_changes.values()):
if any(
v
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "config_changed", "instances_changed")
):
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
Thread(target=update_global_config, args=(True,)).start()
@ -1303,7 +1325,7 @@ def plugins():
if variables["type"] in ("core", "pro"):
return redirect_flash_error(f"Can't delete {variables['type']} plugin {variables['name']}", "plugins", True)
curr_changes = db.check_changes()
db_metadata = db.get_metadata()
def update_plugins(threaded: bool = False): # type: ignore
wait_applying()
@ -1333,7 +1355,11 @@ def plugins():
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
if any(curr_changes.values()):
if any(
v
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "config_changed", "instances_changed")
):
ui_data = get_ui_data()
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()
@ -1512,7 +1538,7 @@ def plugins():
if errors >= files_count:
return redirect(url_for("loading", next=url_for("plugins")))
curr_changes = db.check_changes()
db_metadata = db.get_metadata()
def update_plugins(threaded: bool = False):
wait_applying()
@ -1543,7 +1569,11 @@ def plugins():
with LOCK:
TMP_DATA_FILE.write_text(dumps(ui_data), encoding="utf-8")
if any(curr_changes.values()):
if any(
v
for k, v in db_metadata.items()
if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "config_changed", "instances_changed")
):
ui_data = get_ui_data()
ui_data["RELOADING"] = True
ui_data["LAST_RELOAD"] = time()