Add basic scheduler healthcheck on BunkerWeb instances while waiting for the health endpoint

This commit is contained in:
Théophile Diot 2024-08-15 20:32:48 +01:00
parent ad5b5443d4
commit a6fe96db50
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
3 changed files with 66 additions and 33 deletions

View file

@ -3164,6 +3164,27 @@ class Database:
return ""
def update_instance(self, hostname: str, status: str) -> str:
    """Update the status of an instance and refresh its ``last_seen`` timestamp.

    Args:
        hostname: Hostname identifying the instance row to update.
        status: New status value to store on the instance.

    Returns:
        An empty string on success, otherwise a human-readable message
        explaining why the instance was not updated (read-only database,
        unknown hostname, or a failed commit).
    """
    with self._db_session() as session:
        if self.readonly:
            return "The database is read-only, the changes will not be saved"

        db_instance = session.query(Instances).filter_by(hostname=hostname).first()
        if db_instance is None:
            return f"Instance {hostname} does not exist, will not be updated."

        db_instance.status = status
        db_instance.last_seen = datetime.now()

        try:
            session.commit()
        except Exception as e:
            # Only genuine errors are reported as a message; KeyboardInterrupt
            # and SystemExit must keep propagating instead of being turned
            # into a return value.
            return f"An error occurred while updating the instance {hostname}.\n{e}"

    return ""
def get_instances(self, *, method: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get instances."""
with self._db_session() as session:

View file

@ -1,7 +1,5 @@
#!/usr/bin/env python3
from datetime import datetime, timezone
from functools import partial
from sqlalchemy import TEXT, Boolean, Column, DateTime, Enum, ForeignKey, Identity, Integer, LargeBinary, String, func
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.schema import UniqueConstraint
@ -208,7 +206,7 @@ class Instances(Base):
status = Column(INSTANCE_STATUS_ENUM, nullable=False, default="loading")
method = Column(METHODS_ENUM, nullable=False, default="manual")
creation_date = Column(DateTime, nullable=False, server_default=func.now())
last_seen = Column(DateTime, nullable=True, server_default=func.now(), onupdate=partial(datetime.now, timezone.utc))
last_seen = Column(DateTime, nullable=True, server_default=func.now())
class Bw_cli_commands(Base):

View file

@ -33,7 +33,8 @@ from Database import Database # type: ignore
from JobScheduler import JobScheduler
from jobs import Job # type: ignore
from API import API # type: ignore
from ApiCaller import ApiCaller # type: ignore # type: ignore
# from ApiCaller import ApiCaller # type: ignore # TODO: uncomment when the healthcheck endpoint is ready @fl0ppy-d1sk
APPLYING_CHANGES = Event()
RUN = True
@ -82,11 +83,11 @@ SCHEDULER_TMP_ENV_PATH.touch()
DB_LOCK_FILE = Path(sep, "var", "lib", "bunkerweb", "db.lock")
LOGGER = setup_logger("Scheduler", getenv("CUSTOM_LOG_LEVEL", getenv("LOG_LEVEL", "INFO")))
HEALTHCHECK_INTERVAL = getenv("HEALTHCHECK_INTERVAL", "10")
HEALTHCHECK_INTERVAL = getenv("HEALTHCHECK_INTERVAL", "30")
if not HEALTHCHECK_INTERVAL.isdigit():
LOGGER.error("HEALTHCHECK_INTERVAL must be an integer, defaulting to 10")
HEALTHCHECK_INTERVAL = 10
LOGGER.error("HEALTHCHECK_INTERVAL must be an integer, defaulting to 30")
HEALTHCHECK_INTERVAL = 30
HEALTHCHECK_INTERVAL = int(HEALTHCHECK_INTERVAL)
HEALTHCHECK_EVENT = Event()
@ -419,8 +420,6 @@ def run_in_slave_mode(): # TODO: Refactor this feature
def healthcheck_job():
return # TODO: remove this when the healthcheck endpoint is ready @fl0ppy-d1sk
if HEALTHCHECK_EVENT.is_set():
LOGGER.warning("Healthcheck job is already running, skipping execution ...")
return
@ -435,40 +434,56 @@ def healthcheck_job():
for db_instance in SCHEDULER.db.get_instances():
try:
bw_instance = API(f"http://{db_instance['hostname']}:{db_instance['port']}", db_instance["server_name"])
sent, err, status, resp = bw_instance.request("GET", "health")
sent, err, status, resp = bw_instance.request("GET", "ping")
# sent, err, status, resp = bw_instance.request("GET", "health") # TODO: use health instead when the healthcheck endpoint is ready @fl0ppy-d1sk
success = True
if not sent:
LOGGER.warning(
f"instances_healthcheck - Can't send API request to {bw_instance.endpoint}health : {err}, healthcheck will be retried in {HEALTHCHECK_INTERVAL} seconds ..."
)
if bw_instance in SCHEDULER.apis:
SCHEDULER.apis.remove(bw_instance)
continue
if status != 200:
success = False
elif status != 200:
LOGGER.warning(
f"instances_healthcheck - Error while sending API request to {bw_instance.endpoint}health : status = {resp['status']}, msg = {resp['msg']}, healthcheck will be retried in {HEALTHCHECK_INTERVAL} seconds ..."
)
success = False
if not success:
ret = SCHEDULER.db.update_instance(db_instance["hostname"], "down")
if ret:
LOGGER.error(f"instances_healthcheck - Couldn't update instance {bw_instance.endpoint} status to down in the database: {ret}")
if bw_instance in SCHEDULER.apis:
SCHEDULER.apis.remove(bw_instance)
continue
if resp["state"] == "loading":
LOGGER.info(f"instances_healthcheck - Instance {bw_instance.endpoint} is loading, sending config ...")
api_caller = ApiCaller([bw_instance])
api_caller.send_files(CUSTOM_CONFIGS_PATH, "/custom_configs")
api_caller.send_files(EXTERNAL_PLUGINS_PATH, "/plugins")
api_caller.send_files(PRO_PLUGINS_PATH, "/pro_plugins")
api_caller.send_files(join(sep, "etc", "nginx"), "/confs")
api_caller.send_files(CACHE_PATH, "/cache")
if api_caller.send_to_apis("POST", "/reload")[0]:
LOGGER.info(f"Successfully reloaded instance {bw_instance.endpoint}")
else:
LOGGER.error(f"Error while reloading instance {bw_instance.endpoint}")
elif resp["state"] == "down":
LOGGER.warning(f"instances_healthcheck - Instance {bw_instance.endpoint} is down")
if bw_instance in SCHEDULER.apis:
SCHEDULER.apis.remove(bw_instance)
continue
# if resp["state"] == "loading": # TODO: uncomment when the healthcheck endpoint is ready @fl0ppy-d1sk
# LOGGER.info(f"instances_healthcheck - Instance {bw_instance.endpoint} is loading, sending config ...")
# api_caller = ApiCaller([bw_instance])
# api_caller.send_files(CUSTOM_CONFIGS_PATH, "/custom_configs")
# api_caller.send_files(EXTERNAL_PLUGINS_PATH, "/plugins")
# api_caller.send_files(PRO_PLUGINS_PATH, "/pro_plugins")
# api_caller.send_files(join(sep, "etc", "nginx"), "/confs")
# api_caller.send_files(CACHE_PATH, "/cache")
# if not api_caller.send_to_apis("POST", "/reload")[0]:
# LOGGER.error(f"Error while reloading instance {bw_instance.endpoint}")
# ret = SCHEDULER.db.update_instance(db_instance["hostname"], "loading")
# if ret:
# LOGGER.error(f"instances_healthcheck - Couldn't update instance {bw_instance.endpoint} status to loading in the database: {ret}")
# continue
# LOGGER.info(f"Successfully reloaded instance {bw_instance.endpoint}")
# elif resp["state"] == "down":
# LOGGER.warning(f"instances_healthcheck - Instance {bw_instance.endpoint} is down")
# ret = SCHEDULER.db.update_instance(db_instance["hostname"], "down")
# if ret:
# LOGGER.error(f"instances_healthcheck - Couldn't update instance {bw_instance.endpoint} status to down in the database: {ret}")
# if bw_instance in SCHEDULER.apis:
# SCHEDULER.apis.remove(bw_instance)
# continue
# The instance is being recorded as "up" here, so a failed database update
# must be reported as a failure to set "up" (the message previously said "down").
ret = SCHEDULER.db.update_instance(db_instance["hostname"], "up")
if ret:
    LOGGER.error(f"instances_healthcheck - Couldn't update instance {bw_instance.endpoint} status to up in the database: {ret}")
if bw_instance not in SCHEDULER.apis:
SCHEDULER.apis.append(bw_instance)
@ -511,8 +526,6 @@ if __name__ == "__main__":
run_in_slave_mode()
stop(1)
schedule_every(HEALTHCHECK_INTERVAL).seconds.do(healthcheck_job)
db_metadata = SCHEDULER.db.get_metadata()
APPLYING_CHANGES.set()
@ -888,6 +901,7 @@ if __name__ == "__main__":
HEALTHY_PATH.write_text(datetime.now().isoformat(), encoding="utf-8")
APPLYING_CHANGES.clear()
schedule_every(HEALTHCHECK_INTERVAL).seconds.do(healthcheck_job)
# infinite schedule for the jobs
LOGGER.info("Executing job scheduler ...")