mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Add new setting DATABASE_URI_READONLY as a fallback in case the database is down or if there are issues with it
This commit is contained in:
parent
b42cdf97cd
commit
eb6c407faa
4 changed files with 221 additions and 73 deletions
|
|
@ -11,7 +11,16 @@
|
|||
"help": "The database URI, following the sqlalchemy format.",
|
||||
"id": "database-uri",
|
||||
"label": "The database URI",
|
||||
"regex": "^(postgresql|mysql|mariadb|sqlite|oracle)(\\+[\\w\\-]+)?:.+$",
|
||||
"regex": "^((postgresql|mysql|mariadb|sqlite)(\\+[\\w\\-]+)?:.+)?$",
|
||||
"type": "text"
|
||||
},
|
||||
"DATABASE_URI_READONLY": {
|
||||
"context": "global",
|
||||
"default": "",
|
||||
"help": "The database URI for read-only operations, it can also serve as a fallback if the main database is down. Following the sqlalchemy format.",
|
||||
"id": "database-uri-readonly",
|
||||
"label": "The database URI for read-only operations",
|
||||
"regex": "^((postgresql|mysql|mariadb|sqlite)(\\+[\\w\\-]+)?:.+)?$",
|
||||
"type": "text"
|
||||
},
|
||||
"DATABASE_LOG_LEVEL": {
|
||||
|
|
|
|||
|
|
@ -68,9 +68,13 @@ def set_sqlite_pragma(dbapi_connection, _):
|
|||
class Database:
|
||||
DB_STRING_RX = re_compile(r"^(?P<database>(mariadb|mysql)(\+pymysql)?|sqlite(\+pysqlite)?|postgresql(\+psycopg)?):/+(?P<path>/[^\s]+)")
|
||||
|
||||
def __init__(self, logger: Logger, sqlalchemy_string: Optional[str] = None, *, ui: bool = False, pool: Optional[bool] = None, log: bool = True) -> None:
|
||||
def __init__(
|
||||
self, logger: Logger, sqlalchemy_string: Optional[str] = None, *, ui: bool = False, pool: Optional[bool] = None, log: bool = True, **kwargs
|
||||
) -> None:
|
||||
"""Initialize the database"""
|
||||
self.logger = logger
|
||||
self.readonly = False
|
||||
self.fallback_readonly = False
|
||||
|
||||
if pool:
|
||||
self.logger.warning("The pool parameter is deprecated, it will be removed in the next version")
|
||||
|
|
@ -81,6 +85,15 @@ class Database:
|
|||
if not sqlalchemy_string:
|
||||
sqlalchemy_string = getenv("DATABASE_URI", "sqlite:////var/lib/bunkerweb/db.sqlite3")
|
||||
|
||||
sqlalchemy_string_readonly = getenv("DATABASE_URI_READONLY", "")
|
||||
|
||||
if not sqlalchemy_string:
|
||||
sqlalchemy_string = sqlalchemy_string_readonly or "sqlite:////var/lib/bunkerweb/db.sqlite3"
|
||||
|
||||
if sqlalchemy_string == sqlalchemy_string_readonly:
|
||||
self.readonly = True
|
||||
self.logger.warning("The database connection is set to read-only, the changes will not be saved")
|
||||
|
||||
match = self.DB_STRING_RX.search(sqlalchemy_string)
|
||||
if not match:
|
||||
self.logger.error(f"Invalid database string provided: {sqlalchemy_string}, exiting...")
|
||||
|
|
@ -105,19 +118,20 @@ class Database:
|
|||
) # ? This is strongly recommended as psycopg is the new way to connect to postgresql
|
||||
|
||||
self.database_uri = sqlalchemy_string
|
||||
self.database_uri_readonly = sqlalchemy_string_readonly
|
||||
error = False
|
||||
|
||||
engine_kwargs = {
|
||||
self._engine_kwargs = {
|
||||
"future": True,
|
||||
"poolclass": QueuePool,
|
||||
"pool_pre_ping": True,
|
||||
"pool_recycle": 1800,
|
||||
"pool_size": 40,
|
||||
"max_overflow": 20,
|
||||
}
|
||||
} | kwargs
|
||||
|
||||
try:
|
||||
self.sql_engine = create_engine(sqlalchemy_string, **engine_kwargs)
|
||||
self.sql_engine = create_engine(sqlalchemy_string, **self._engine_kwargs)
|
||||
except ArgumentError:
|
||||
self.logger.error(f"Invalid database URI: {sqlalchemy_string}")
|
||||
error = True
|
||||
|
|
@ -139,29 +153,37 @@ class Database:
|
|||
|
||||
while not_connected:
|
||||
try:
|
||||
with self.sql_engine.connect() as conn:
|
||||
conn.execute(text("CREATE TABLE IF NOT EXISTS test (id INT)"))
|
||||
conn.execute(text("DROP TABLE test"))
|
||||
if not self.readonly:
|
||||
with self.sql_engine.connect() as conn:
|
||||
conn.execute(text("CREATE TABLE IF NOT EXISTS test (id INT)"))
|
||||
conn.execute(text("DROP TABLE test"))
|
||||
else:
|
||||
with self.sql_engine.connect() as conn:
|
||||
conn.execute(text("SELECT 1"))
|
||||
|
||||
not_connected = False
|
||||
except (OperationalError, DatabaseError) as e:
|
||||
if retries <= 0:
|
||||
self.logger.error(
|
||||
f"Can't connect to database : {format_exc()}",
|
||||
)
|
||||
if not self.readonly and "attempt to write a readonly database" in str(e):
|
||||
self.logger.warning("The database is read-only, trying one last time to connect in read-only mode")
|
||||
self.sql_engine.dispose(close=True)
|
||||
self.sql_engine = create_engine(sqlalchemy_string_readonly, **self._engine_kwargs)
|
||||
self.readonly = True
|
||||
self.fallback_readonly = True
|
||||
continue
|
||||
self.logger.error(f"Can't connect to database : {format_exc()}")
|
||||
_exit(1)
|
||||
|
||||
if "attempt to write a readonly database" in str(e):
|
||||
if log:
|
||||
self.logger.warning("The database is read-only, waiting for it to become writable. Retrying in 5 seconds ...")
|
||||
self.sql_engine.dispose(close=True)
|
||||
self.sql_engine = create_engine(sqlalchemy_string, **engine_kwargs)
|
||||
self.sql_engine = create_engine(sqlalchemy_string, **self._engine_kwargs)
|
||||
if "Unknown table" in str(e):
|
||||
not_connected = False
|
||||
continue
|
||||
elif log:
|
||||
self.logger.warning(
|
||||
"Can't connect to database, retrying in 5 seconds ...",
|
||||
)
|
||||
self.logger.warning("Can't connect to database, retrying in 5 seconds ...")
|
||||
retries -= 1
|
||||
sleep(5)
|
||||
except BaseException:
|
||||
|
|
@ -170,7 +192,7 @@ class Database:
|
|||
|
||||
self.suffix_rx = re_compile(r"_\d+$")
|
||||
if log:
|
||||
self.logger.info("✅ Database connection established")
|
||||
self.logger.info(f"✅ Database connection established{'' if not self.readonly else ' in read-only mode'}")
|
||||
|
||||
def __del__(self) -> None:
|
||||
"""Close the database"""
|
||||
|
|
@ -180,6 +202,25 @@ class Database:
|
|||
if self.sql_engine:
|
||||
self.sql_engine.dispose()
|
||||
|
||||
def retry_connection(self) -> None:
|
||||
"""Retry the connection to the database"""
|
||||
|
||||
assert self.sql_engine is not None
|
||||
|
||||
try:
|
||||
self.sql_engine.dispose(close=True)
|
||||
self.sql_engine = create_engine(self.database_uri, **self._engine_kwargs)
|
||||
self.fallback_readonly = False
|
||||
self.readonly = False
|
||||
except (OperationalError, DatabaseError) as e:
|
||||
if self.database_uri_readonly and "attempt to write a readonly database" in str(e):
|
||||
self.sql_engine.dispose(close=True)
|
||||
self.sql_engine = create_engine(self.database_uri_readonly, **self._engine_kwargs)
|
||||
self.fallback_readonly = True
|
||||
self.readonly = True
|
||||
return
|
||||
raise e
|
||||
|
||||
@contextmanager
|
||||
def __db_session(self) -> Any:
|
||||
try:
|
||||
|
|
@ -188,13 +229,27 @@ class Database:
|
|||
self.logger.error("The database engine is not initialized")
|
||||
_exit(1)
|
||||
|
||||
if self.fallback_readonly:
|
||||
# ? If the database is forced to be read-only, we try to connect as a non read-only user every time until the database is writable
|
||||
with suppress(OperationalError, DatabaseError):
|
||||
self.retry_connection()
|
||||
|
||||
with self.sql_engine.connect() as conn:
|
||||
session_factory = sessionmaker(bind=conn, autoflush=True, expire_on_commit=False)
|
||||
session = scoped_session(session_factory)
|
||||
try:
|
||||
yield session
|
||||
except BaseException:
|
||||
except BaseException as e:
|
||||
session.rollback()
|
||||
|
||||
if self.database_uri_readonly and "attempt to write a readonly database" in str(e):
|
||||
self.sql_engine.dispose(close=True)
|
||||
self.sql_engine = create_engine(self.database_uri_readonly, **self._engine_kwargs)
|
||||
self.fallback_readonly = True
|
||||
self.readonly = True
|
||||
self.logger.warning("The database is read-only, falling back to read-only mode")
|
||||
return
|
||||
|
||||
raise
|
||||
finally:
|
||||
session.remove()
|
||||
|
|
@ -202,6 +257,9 @@ class Database:
|
|||
def set_autoconf_load(self, value: bool = True) -> str:
|
||||
"""Set the autoconf_loaded value"""
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
try:
|
||||
metadata = session.query(Metadata).get(1)
|
||||
|
||||
|
|
@ -227,6 +285,9 @@ class Database:
|
|||
def set_scheduler_first_start(self, value: bool = False) -> str:
|
||||
"""Set the scheduler_first_start value"""
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
try:
|
||||
metadata = session.query(Metadata).get(1)
|
||||
|
||||
|
|
@ -243,6 +304,9 @@ class Database:
|
|||
def set_pro_metadata(self, data: Dict[Literal["is_pro", "pro_expire", "pro_status", "pro_overlapped", "pro_services"], Any] = {}) -> str:
|
||||
"""Set the pro metadata values"""
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
try:
|
||||
metadata = session.query(Metadata).get(1)
|
||||
|
||||
|
|
@ -287,6 +351,9 @@ class Database:
|
|||
def initialize_db(self, version: str, integration: str = "Unknown") -> str:
|
||||
"""Initialize the database"""
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
try:
|
||||
metadata = session.query(Metadata).get(1)
|
||||
|
||||
|
|
@ -400,6 +467,9 @@ class Database:
|
|||
"instances",
|
||||
]
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
try:
|
||||
metadata = session.query(Metadata).get(1)
|
||||
|
||||
|
|
@ -426,6 +496,10 @@ class Database:
|
|||
|
||||
def init_tables(self, default_plugins: List[dict], bunkerweb_version: str) -> Tuple[bool, str]:
|
||||
"""Initialize the database tables and return the result"""
|
||||
|
||||
if self.readonly:
|
||||
return False, "The database is read-only, the changes will not be saved"
|
||||
|
||||
assert self.sql_engine is not None, "The database engine is not initialized"
|
||||
|
||||
inspector = inspect(self.sql_engine)
|
||||
|
|
@ -846,6 +920,9 @@ class Database:
|
|||
"""Save the config in the database"""
|
||||
to_put = []
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
# Delete all the old config
|
||||
session.query(Global_values).filter(Global_values.method == method).delete()
|
||||
session.query(Services_settings).filter(Services_settings.method == method).delete()
|
||||
|
|
@ -1098,6 +1175,9 @@ class Database:
|
|||
"""Save the custom configs in the database"""
|
||||
message = ""
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
# Delete all the old config
|
||||
session.query(Custom_configs).filter(Custom_configs.method == method).delete()
|
||||
|
||||
|
|
@ -1303,6 +1383,9 @@ class Database:
|
|||
def update_job(self, plugin_id: str, job_name: str, success: bool) -> str:
|
||||
"""Update the job last_run in the database"""
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
job = session.query(Jobs).filter_by(plugin_id=plugin_id, name=job_name).first()
|
||||
|
||||
if not job:
|
||||
|
|
@ -1318,7 +1401,7 @@ class Database:
|
|||
|
||||
return ""
|
||||
|
||||
def delete_job_cache(self, file_name: str, *, job_name: Optional[str] = None, service_id: Optional[str] = None):
|
||||
def delete_job_cache(self, file_name: str, *, job_name: Optional[str] = None, service_id: Optional[str] = None) -> str:
|
||||
job_name = job_name or argv[0].replace(".py", "")
|
||||
filters = {"file_name": file_name}
|
||||
if job_name:
|
||||
|
|
@ -1327,7 +1410,15 @@ class Database:
|
|||
filters["service_id"] = service_id
|
||||
|
||||
with self.__db_session() as session:
|
||||
session.query(Jobs_cache).filter_by(**filters).delete()
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
try:
|
||||
session.query(Jobs_cache).filter_by(**filters).delete()
|
||||
except BaseException:
|
||||
return format_exc()
|
||||
|
||||
return ""
|
||||
|
||||
def upsert_job_cache(
|
||||
self,
|
||||
|
|
@ -1342,6 +1433,9 @@ class Database:
|
|||
job_name = job_name or argv[0].replace(".py", "")
|
||||
service_id = service_id or None
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
cache = session.query(Jobs_cache).filter_by(job_name=job_name, service_id=service_id, file_name=file_name).first()
|
||||
|
||||
if not cache:
|
||||
|
|
@ -1372,6 +1466,9 @@ class Database:
|
|||
to_put = []
|
||||
changes = False
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
db_plugins = session.query(Plugins).with_entities(Plugins.id).filter_by(type=_type).all()
|
||||
|
||||
db_ids = []
|
||||
|
|
@ -2062,6 +2159,9 @@ class Database:
|
|||
def add_instance(self, hostname: str, port: int, server_name: str, changed: Optional[bool] = True) -> str:
|
||||
"""Add instance."""
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
db_instance = session.query(Instances).with_entities(Instances.hostname).filter_by(hostname=hostname).first()
|
||||
|
||||
if db_instance is not None:
|
||||
|
|
@ -2086,6 +2186,9 @@ class Database:
|
|||
"""Update instances."""
|
||||
to_put = []
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
session.query(Instances).delete()
|
||||
|
||||
for instance in instances:
|
||||
|
|
@ -2175,7 +2278,11 @@ class Database:
|
|||
def create_ui_user(self, username: str, password: bytes, *, secret_token: Optional[str] = None, method: str = "manual") -> str:
|
||||
"""Create ui user."""
|
||||
with self.__db_session() as session:
|
||||
if self.get_ui_user():
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
user = session.query(Users).filter_by(id=1).first()
|
||||
if user:
|
||||
return "User already exists"
|
||||
|
||||
session.add(Users(id=1, username=username, password=password.decode("utf-8"), secret_token=secret_token, method=method))
|
||||
|
|
@ -2192,6 +2299,9 @@ class Database:
|
|||
) -> str:
|
||||
"""Update ui user."""
|
||||
with self.__db_session() as session:
|
||||
if self.readonly:
|
||||
return "The database is read-only, the changes will not be saved"
|
||||
|
||||
user = session.query(Users).filter_by(id=1).first()
|
||||
if not user:
|
||||
return "User not found"
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from contextlib import suppress
|
||||
from copy import deepcopy
|
||||
from functools import partial
|
||||
from glob import glob
|
||||
|
|
@ -45,7 +46,7 @@ class JobScheduler(ApiCaller):
|
|||
super().__init__(apis or [])
|
||||
self.__logger = logger or setup_logger("Scheduler", getenv("LOG_LEVEL", "INFO"))
|
||||
self.__integration = integration
|
||||
self.__db = db or Database(self.__logger)
|
||||
self.db = db or Database(self.__logger)
|
||||
self.__env = env or {}
|
||||
self.__env.update(environ)
|
||||
self.__jobs = self.__get_jobs()
|
||||
|
|
@ -76,7 +77,7 @@ class JobScheduler(ApiCaller):
|
|||
apis = []
|
||||
try:
|
||||
with self.__thread_lock:
|
||||
instances = self.__db.get_instances()
|
||||
instances = self.db.get_instances()
|
||||
for instance in instances:
|
||||
api = API(f"http://{instance['hostname']}:{instance['port']}", host=instance["server_name"])
|
||||
apis.append(api)
|
||||
|
|
@ -204,7 +205,7 @@ class JobScheduler(ApiCaller):
|
|||
|
||||
def __update_job(self, plugin: str, name: str, success: bool):
|
||||
with self.__thread_lock:
|
||||
err = self.__db.update_job(plugin, name, success)
|
||||
err = self.db.update_job(plugin, name, success)
|
||||
|
||||
if not err:
|
||||
self.__logger.info(f"Successfully updated database for the job {name} from plugin {plugin}")
|
||||
|
|
@ -225,6 +226,15 @@ class JobScheduler(ApiCaller):
|
|||
self.__logger.error(f"Exception while scheduling jobs for plugin {plugin} : {format_exc()}")
|
||||
|
||||
def run_pending(self) -> bool:
|
||||
if self.db.readonly:
|
||||
if self.db.fallback_readonly:
|
||||
with suppress(BaseException):
|
||||
self.db.retry_connection()
|
||||
|
||||
if self.db.readonly:
|
||||
self.__logger.error("Database is in read-only mode, jobs will not be executed")
|
||||
return True
|
||||
|
||||
threads = []
|
||||
self.__job_success = True
|
||||
self.__job_reload = False
|
||||
|
|
@ -267,6 +277,15 @@ class JobScheduler(ApiCaller):
|
|||
return success
|
||||
|
||||
def run_once(self) -> bool:
|
||||
if self.db.readonly:
|
||||
if self.db.fallback_readonly:
|
||||
with suppress(BaseException):
|
||||
self.db.retry_connection()
|
||||
|
||||
if self.db.readonly:
|
||||
self.__logger.error("Database is in read-only mode, jobs will not be executed")
|
||||
return True
|
||||
|
||||
threads = []
|
||||
self.__job_success = True
|
||||
self.__job_reload = False
|
||||
|
|
@ -290,6 +309,15 @@ class JobScheduler(ApiCaller):
|
|||
return ret
|
||||
|
||||
def run_single(self, job_name: str) -> bool:
|
||||
if self.db.readonly:
|
||||
if self.db.fallback_readonly:
|
||||
with suppress(BaseException):
|
||||
self.db.retry_connection()
|
||||
|
||||
if self.db.readonly:
|
||||
self.__logger.error("Database is in read-only mode, jobs will not be executed")
|
||||
return True
|
||||
|
||||
if self.__lock:
|
||||
self.__lock.acquire()
|
||||
|
||||
|
|
|
|||
|
|
@ -272,33 +272,32 @@ def api_to_instance(api):
|
|||
}
|
||||
|
||||
|
||||
def run_in_slave_mode(db: Database, dotenv_env: Dict[str, Any]):
|
||||
# Instantiate db
|
||||
db = Database(logger, sqlalchemy_string=dotenv_env.get("DATABASE_URI", getenv("DATABASE_URI", None)))
|
||||
def run_in_slave_mode():
|
||||
assert SCHEDULER is not None
|
||||
|
||||
# Wait for init
|
||||
while not db.is_initialized():
|
||||
while not SCHEDULER.db.is_initialized():
|
||||
logger.warning("Database is not initialized, retrying in 5s ...")
|
||||
sleep(5)
|
||||
|
||||
# Wait for first config
|
||||
env = db.get_config()
|
||||
while not db.is_first_config_saved() or not env:
|
||||
env = SCHEDULER.db.get_config()
|
||||
while not SCHEDULER.db.is_first_config_saved() or not env:
|
||||
logger.warning("Database doesn't have any config saved yet, retrying in 5s ...")
|
||||
sleep(5)
|
||||
env = db.get_config()
|
||||
env = SCHEDULER.db.get_config()
|
||||
|
||||
# Download plugins
|
||||
pro_plugins = db.get_plugins(_type="pro", with_data=True)
|
||||
pro_plugins = SCHEDULER.db.get_plugins(_type="pro", with_data=True)
|
||||
generate_external_plugins(pro_plugins, original_path=PRO_PLUGINS_PATH)
|
||||
external_plugins = db.get_plugins(_type="external", with_data=True)
|
||||
external_plugins = SCHEDULER.db.get_plugins(_type="external", with_data=True)
|
||||
generate_external_plugins(external_plugins)
|
||||
|
||||
# Download custom configs
|
||||
generate_custom_configs(db.get_custom_configs())
|
||||
generate_custom_configs(SCHEDULER.db.get_custom_configs())
|
||||
|
||||
# Download caches
|
||||
generate_caches(pro_plugins + external_plugins, db)
|
||||
generate_caches(pro_plugins + external_plugins, SCHEDULER.db)
|
||||
|
||||
# Gen config
|
||||
content = ""
|
||||
|
|
@ -353,18 +352,18 @@ if __name__ == "__main__":
|
|||
nginx_variables_path = Path(sep, "etc", "nginx", "variables.env")
|
||||
dotenv_env = dotenv_values(str(tmp_variables_path))
|
||||
|
||||
db = Database(logger, sqlalchemy_string=dotenv_env.get("DATABASE_URI", getenv("DATABASE_URI", None)))
|
||||
SCHEDULER = JobScheduler(environ, logger, INTEGRATION, db=Database(logger, sqlalchemy_string=dotenv_env.get("DATABASE_URI", getenv("DATABASE_URI", None)))) # type: ignore
|
||||
|
||||
if SLAVE_MODE:
|
||||
run_in_slave_mode(db, dotenv_env)
|
||||
run_in_slave_mode()
|
||||
stop(1)
|
||||
|
||||
if INTEGRATION in ("Swarm", "Kubernetes", "Autoconf"):
|
||||
while not db.is_initialized():
|
||||
while not SCHEDULER.db.is_initialized():
|
||||
logger.warning("Database is not initialized, retrying in 5s ...")
|
||||
sleep(5)
|
||||
|
||||
while not db.is_autoconf_loaded():
|
||||
while not SCHEDULER.db.is_autoconf_loaded():
|
||||
logger.warning("Autoconf is not loaded yet in the database, retrying in 5s ...")
|
||||
sleep(5)
|
||||
|
||||
|
|
@ -373,8 +372,8 @@ if __name__ == "__main__":
|
|||
or not tmp_variables_path.exists()
|
||||
or not nginx_variables_path.exists()
|
||||
or (tmp_variables_path.read_text(encoding="utf-8") != nginx_variables_path.read_text(encoding="utf-8"))
|
||||
or db.is_initialized()
|
||||
and db.get_config() != dotenv_env
|
||||
or SCHEDULER.db.is_initialized()
|
||||
and SCHEDULER.db.get_config() != dotenv_env
|
||||
):
|
||||
# run the config saver
|
||||
proc = subprocess_run(
|
||||
|
|
@ -393,19 +392,19 @@ if __name__ == "__main__":
|
|||
logger.error("Config saver failed, configuration will not work as expected...")
|
||||
|
||||
if INTEGRATION not in ("Swarm", "Kubernetes", "Autoconf"):
|
||||
while not db.is_initialized():
|
||||
while not SCHEDULER.db.is_initialized():
|
||||
logger.warning("Database is not initialized, retrying in 5s ...")
|
||||
sleep(5)
|
||||
|
||||
env = db.get_config()
|
||||
while not db.is_first_config_saved() or not env:
|
||||
env = SCHEDULER.db.get_config()
|
||||
while not SCHEDULER.db.is_first_config_saved() or not env:
|
||||
logger.warning("Database doesn't have any config saved yet, retrying in 5s ...")
|
||||
sleep(5)
|
||||
env = db.get_config()
|
||||
env = SCHEDULER.db.get_config()
|
||||
|
||||
env = db.get_config()
|
||||
env = SCHEDULER.db.get_config()
|
||||
|
||||
env["DATABASE_URI"] = db.database_uri
|
||||
env["DATABASE_URI"] = SCHEDULER.db.database_uri
|
||||
|
||||
# Override instances if needed
|
||||
override_instances = env.get("OVERRIDE_INSTANCES", "")
|
||||
|
|
@ -415,7 +414,7 @@ if __name__ == "__main__":
|
|||
apis.append(API(instance))
|
||||
|
||||
# Instantiate scheduler
|
||||
SCHEDULER = JobScheduler(env | environ, logger, INTEGRATION, db=db, apis=apis)
|
||||
SCHEDULER.env = env | environ
|
||||
|
||||
if INTEGRATION in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
|
||||
# Automatically setup the scheduler apis
|
||||
|
|
@ -425,16 +424,16 @@ if __name__ == "__main__":
|
|||
if not SCHEDULER.apis:
|
||||
logger.warning("No BunkerWeb API found, retrying in 5s ...")
|
||||
sleep(5)
|
||||
db.update_instances([api_to_instance(api) for api in SCHEDULER.apis])
|
||||
SCHEDULER.db.update_instances([api_to_instance(api) for api in SCHEDULER.apis])
|
||||
|
||||
scheduler_first_start = db.is_scheduler_first_start()
|
||||
scheduler_first_start = SCHEDULER.db.is_scheduler_first_start()
|
||||
|
||||
logger.info("Scheduler started ...")
|
||||
|
||||
# Checking if any custom config has been created by the user
|
||||
logger.info("Checking if there are any changes in custom configs ...")
|
||||
custom_configs = []
|
||||
db_configs = db.get_custom_configs()
|
||||
db_configs = SCHEDULER.db.get_custom_configs()
|
||||
changes = False
|
||||
for file in CUSTOM_CONFIGS_PATH.rglob("*.conf"):
|
||||
if len(file.parts) > len(CUSTOM_CONFIGS_PATH.parts) + 3:
|
||||
|
|
@ -460,12 +459,12 @@ if __name__ == "__main__":
|
|||
changes = changes or {hash(dict_to_frozenset(d)) for d in custom_configs} != {hash(dict_to_frozenset(d)) for d in db_configs}
|
||||
|
||||
if changes:
|
||||
err = db.save_custom_configs(custom_configs, "manual")
|
||||
err = SCHEDULER.db.save_custom_configs(custom_configs, "manual")
|
||||
if err:
|
||||
logger.error(f"Couldn't save some manually created custom configs to database: {err}")
|
||||
|
||||
if (scheduler_first_start and db_configs) or changes:
|
||||
generate_custom_configs(db.get_custom_configs())
|
||||
generate_custom_configs(SCHEDULER.db.get_custom_configs())
|
||||
|
||||
del custom_configs, db_configs
|
||||
|
||||
|
|
@ -503,7 +502,7 @@ if __name__ == "__main__":
|
|||
| ({"jobs": jobs} if jobs else {})
|
||||
)
|
||||
|
||||
db_plugins = db.get_plugins(_type=_type)
|
||||
db_plugins = SCHEDULER.db.get_plugins(_type=_type)
|
||||
tmp_db_plugins = []
|
||||
for db_plugin in db_plugins.copy():
|
||||
db_plugin.pop("method", None)
|
||||
|
|
@ -512,12 +511,12 @@ if __name__ == "__main__":
|
|||
changes = {hash(dict_to_frozenset(d)) for d in tmp_external_plugins} != {hash(dict_to_frozenset(d)) for d in tmp_db_plugins}
|
||||
|
||||
if changes:
|
||||
err = db.update_external_plugins(external_plugins, _type=_type, delete_missing=True)
|
||||
err = SCHEDULER.db.update_external_plugins(external_plugins, _type=_type, delete_missing=True)
|
||||
if err:
|
||||
logger.error(f"Couldn't save some manually added {_type} plugins to database: {err}")
|
||||
|
||||
if (scheduler_first_start and db_plugins) or changes:
|
||||
generate_external_plugins(db.get_plugins(_type=_type, with_data=True), original_path=plugin_path)
|
||||
generate_external_plugins(SCHEDULER.db.get_plugins(_type=_type, with_data=True), original_path=plugin_path)
|
||||
|
||||
check_plugin_changes("external")
|
||||
check_plugin_changes("pro")
|
||||
|
|
@ -531,12 +530,12 @@ if __name__ == "__main__":
|
|||
if not SCHEDULER.run_single("download-pro-plugins"):
|
||||
logger.warning("download-pro-plugins job failed at first start, pro plugins settings set by the user may not be up to date ...")
|
||||
|
||||
changes = db.check_changes()
|
||||
changes = SCHEDULER.db.check_changes()
|
||||
if INTEGRATION not in ("Swarm", "Kubernetes", "Autoconf") and (changes["pro_plugins_changed"] or changes["external_plugins_changed"]):
|
||||
if changes["pro_plugins_changed"]:
|
||||
generate_external_plugins(db.get_plugins(_type="pro", with_data=True), original_path=PRO_PLUGINS_PATH)
|
||||
generate_external_plugins(SCHEDULER.db.get_plugins(_type="pro", with_data=True), original_path=PRO_PLUGINS_PATH)
|
||||
if changes["external_plugins_changed"]:
|
||||
generate_external_plugins(db.get_plugins(_type="external", with_data=True))
|
||||
generate_external_plugins(SCHEDULER.db.get_plugins(_type="external", with_data=True))
|
||||
|
||||
# run the config saver to save potential ignored external plugins settings
|
||||
logger.info("Running config saver to save potential ignored external plugins settings ...")
|
||||
|
|
@ -557,8 +556,8 @@ if __name__ == "__main__":
|
|||
)
|
||||
|
||||
SCHEDULER.update_jobs()
|
||||
env = db.get_config()
|
||||
env["DATABASE_URI"] = db.database_uri
|
||||
env = SCHEDULER.db.get_config()
|
||||
env["DATABASE_URI"] = SCHEDULER.db.database_uri
|
||||
|
||||
logger.info("Executing scheduler ...")
|
||||
|
||||
|
|
@ -582,7 +581,7 @@ if __name__ == "__main__":
|
|||
else:
|
||||
logger.info(f"Successfully sent {CACHE_PATH} folder")
|
||||
|
||||
def listen_for_instances_reload(db: Database):
|
||||
def listen_for_instances_reload():
|
||||
from docker import DockerClient
|
||||
|
||||
global SCHEDULER
|
||||
|
|
@ -592,12 +591,12 @@ if __name__ == "__main__":
|
|||
if event["Action"] in ("start", "die"):
|
||||
logger.info(f"🐋 Detected {event['Action']} event on container {event['Actor']['Attributes']['name']}")
|
||||
SCHEDULER.auto_setup()
|
||||
db.update_instances([api_to_instance(api) for api in SCHEDULER.apis], changed=event["Action"] == "die")
|
||||
SCHEDULER.db.update_instances([api_to_instance(api) for api in SCHEDULER.apis], changed=event["Action"] == "die")
|
||||
if event["Action"] == "start":
|
||||
db.checked_changes(value=True)
|
||||
SCHEDULER.db.checked_changes(value=True)
|
||||
|
||||
if INTEGRATION == "Docker" and not override_instances:
|
||||
Thread(target=listen_for_instances_reload, args=(db,), name="listen_for_instances_reload").start()
|
||||
Thread(target=listen_for_instances_reload, name="listen_for_instances_reload").start()
|
||||
|
||||
while True:
|
||||
threads.clear()
|
||||
|
|
@ -685,7 +684,7 @@ if __name__ == "__main__":
|
|||
except:
|
||||
logger.error(f"Exception while reloading after running jobs once scheduling : {format_exc()}")
|
||||
|
||||
ret = db.checked_changes(CHANGES)
|
||||
ret = SCHEDULER.db.checked_changes(CHANGES)
|
||||
|
||||
if ret:
|
||||
logger.error(f"An error occurred when setting the changes to checked in the database : {ret}")
|
||||
|
|
@ -700,9 +699,11 @@ if __name__ == "__main__":
|
|||
INSTANCES_NEED_GENERATION = False
|
||||
|
||||
if scheduler_first_start:
|
||||
ret = db.set_scheduler_first_start()
|
||||
ret = SCHEDULER.db.set_scheduler_first_start()
|
||||
|
||||
if ret:
|
||||
if ret == "The database is read-only, the changes will not be saved":
|
||||
logger.warning("The database is read-only, the scheduler first start will not be saved")
|
||||
elif ret:
|
||||
logger.error(f"An error occurred when setting the scheduler first start : {ret}")
|
||||
stop(1)
|
||||
scheduler_first_start = False
|
||||
|
|
@ -725,7 +726,7 @@ if __name__ == "__main__":
|
|||
|
||||
DB_LOCK_FILE.unlink(missing_ok=True)
|
||||
|
||||
changes = db.check_changes()
|
||||
changes = SCHEDULER.db.check_changes()
|
||||
|
||||
if isinstance(changes, str):
|
||||
raise Exception(f"An error occurred when checking for changes in the database : {changes}")
|
||||
|
|
@ -783,22 +784,22 @@ if __name__ == "__main__":
|
|||
|
||||
if CONFIGS_NEED_GENERATION:
|
||||
CHANGES.append("custom_configs")
|
||||
generate_custom_configs(db.get_custom_configs())
|
||||
generate_custom_configs(SCHEDULER.db.get_custom_configs())
|
||||
|
||||
if PLUGINS_NEED_GENERATION:
|
||||
CHANGES.append("external_plugins")
|
||||
generate_external_plugins(db.get_plugins(_type="external", with_data=True))
|
||||
generate_external_plugins(SCHEDULER.db.get_plugins(_type="external", with_data=True))
|
||||
SCHEDULER.update_jobs()
|
||||
|
||||
if PRO_PLUGINS_NEED_GENERATION:
|
||||
CHANGES.append("pro_plugins")
|
||||
generate_external_plugins(db.get_plugins(_type="pro", with_data=True), original_path=PRO_PLUGINS_PATH)
|
||||
generate_external_plugins(SCHEDULER.db.get_plugins(_type="pro", with_data=True), original_path=PRO_PLUGINS_PATH)
|
||||
SCHEDULER.update_jobs()
|
||||
|
||||
if CONFIG_NEED_GENERATION:
|
||||
CHANGES.append("config")
|
||||
env = db.get_config()
|
||||
env["DATABASE_URI"] = db.database_uri
|
||||
env = SCHEDULER.db.get_config()
|
||||
env["DATABASE_URI"] = SCHEDULER.db.database_uri
|
||||
|
||||
except:
|
||||
logger.error(f"Exception while executing scheduler : {format_exc()}")
|
||||
|
|
|
|||
Loading…
Reference in a new issue