chore: Add HEALTHCHECK_LOGGER and fix shenanigans with instances in scheduler

This commit is contained in:
Théophile Diot 2024-09-06 16:05:38 +02:00
parent 2e5fe99dd8
commit 3e56a2c522
No known key found for this signature in database
GPG key ID: FA995104A0BA376A

View file

@ -91,6 +91,7 @@ if not HEALTHCHECK_INTERVAL.isdigit():
HEALTHCHECK_INTERVAL = int(HEALTHCHECK_INTERVAL)
HEALTHCHECK_EVENT = Event()
HEALTHCHECK_LOGGER = setup_logger("Scheduler.Healthcheck", getenv("CUSTOM_LOG_LEVEL", getenv("LOG_LEVEL", "INFO")))
SLAVE_MODE = getenv("SLAVE_MODE", "no") == "yes"
MASTER_MODE = getenv("MASTER_MODE", "no") == "yes"
@ -425,7 +426,7 @@ def run_in_slave_mode(): # TODO: Refactor this feature
def healthcheck_job():
if HEALTHCHECK_EVENT.is_set():
LOGGER.warning("Healthcheck job is already running, skipping execution ...")
HEALTHCHECK_LOGGER.warning("Healthcheck job is already running, skipping execution ...")
return
try:
@ -436,33 +437,37 @@ def healthcheck_job():
HEALTHCHECK_EVENT.set()
for db_instance in SCHEDULER.db.get_instances():
bw_instance = API(f"http://{db_instance['hostname']}:{db_instance['port']}", db_instance["server_name"])
try:
bw_instance = API(f"http://{db_instance['hostname']}:{db_instance['port']}", db_instance["server_name"])
sent, err, status, resp = bw_instance.request("GET", "ping")
# sent, err, status, resp = bw_instance.request("GET", "health") # TODO: use health instead when the healthcheck endpoint is ready @fl0ppy-d1sk
success = True
if not sent:
LOGGER.warning(
f"instances_healthcheck - Can't send API request to {bw_instance.endpoint}health : {err}, healthcheck will be retried in {HEALTHCHECK_INTERVAL} seconds ..."
HEALTHCHECK_LOGGER.warning(
f"Can't send API request to {bw_instance.endpoint}health : {err}, healthcheck will be retried in {HEALTHCHECK_INTERVAL} seconds ..."
)
success = False
elif status != 200:
LOGGER.warning(
f"instances_healthcheck - Error while sending API request to {bw_instance.endpoint}health : status = {resp['status']}, msg = {resp['msg']}, healthcheck will be retried in {HEALTHCHECK_INTERVAL} seconds ..."
HEALTHCHECK_LOGGER.warning(
f"Error while sending API request to {bw_instance.endpoint}health : status = {resp['status']}, msg = {resp['msg']}, healthcheck will be retried in {HEALTHCHECK_INTERVAL} seconds ..."
)
success = False
if not success:
ret = SCHEDULER.db.update_instance(db_instance["hostname"], "down")
if ret:
LOGGER.error(f"instances_healthcheck - Couldn't update instance {bw_instance.endpoint} status to down in the database: {ret}")
if bw_instance in SCHEDULER.apis:
SCHEDULER.apis.remove(bw_instance)
HEALTHCHECK_LOGGER.error(f"Couldn't update instance {bw_instance.endpoint} status to down in the database: {ret}")
for i, api in enumerate(SCHEDULER.apis):
if api.endpoint == bw_instance.endpoint:
HEALTHCHECK_LOGGER.debug(f"Removing {bw_instance.endpoint} from the list of reachable instances")
del SCHEDULER.apis[i]
break
continue
# if resp["state"] == "loading": # TODO: uncomment when the healthcheck endpoint is ready @fl0ppy-d1sk
# LOGGER.info(f"instances_healthcheck - Instance {bw_instance.endpoint} is loading, sending config ...")
# HEALTHCHECK_LOGGER.info(f"Instance {bw_instance.endpoint} is loading, sending config ...")
# api_caller = ApiCaller([bw_instance])
# api_caller.send_files(CUSTOM_CONFIGS_PATH, "/custom_configs")
# api_caller.send_files(EXTERNAL_PLUGINS_PATH, "/plugins")
@ -470,31 +475,43 @@ def healthcheck_job():
# api_caller.send_files(join(sep, "etc", "nginx"), "/confs")
# api_caller.send_files(CACHE_PATH, "/cache")
# if not api_caller.send_to_apis("POST", "/reload")[0]:
# LOGGER.error(f"Error while reloading instance {bw_instance.endpoint}")
# HEALTHCHECK_LOGGER.error(f"Error while reloading instance {bw_instance.endpoint}")
# ret = SCHEDULER.db.update_instance(db_instance["hostname"], "loading")
# if ret:
# LOGGER.error(f"instances_healthcheck - Couldn't update instance {bw_instance.endpoint} status to loading in the database: {ret}")
# HEALTHCHECK_LOGGER.error(f"Couldn't update instance {bw_instance.endpoint} status to loading in the database: {ret}")
# continue
# LOGGER.info(f"Successfully reloaded instance {bw_instance.endpoint}")
# HEALTHCHECK_LOGGER.info(f"Successfully reloaded instance {bw_instance.endpoint}")
# elif resp["state"] == "down":
# LOGGER.warning(f"instances_healthcheck - Instance {bw_instance.endpoint} is down")
# HEALTHCHECK_LOGGER.warning(f"Instance {bw_instance.endpoint} is down")
# ret = SCHEDULER.db.update_instance(db_instance["hostname"], "down")
# if ret:
# LOGGER.error(f"instances_healthcheck - Couldn't update instance {bw_instance.endpoint} status to down in the database: {ret}")
# if bw_instance in SCHEDULER.apis:
# SCHEDULER.apis.remove(bw_instance)
# HEALTHCHECK_LOGGER.error(f"Couldn't update instance {bw_instance.endpoint} status to down in the database: {ret}")
# for i, api in enumerate(SCHEDULER.apis):
# if api.endpoint == bw_instance.endpoint:
# HEALTHCHECK_LOGGER.debug(f"Removing {bw_instance.endpoint} from the list of reachable instances")
# del SCHEDULER.apis[i]
# break
# continue
ret = SCHEDULER.db.update_instance(db_instance["hostname"], "up")
if ret:
LOGGER.error(f"instances_healthcheck - Couldn't update instance {bw_instance.endpoint} status to down in the database: {ret}")
HEALTHCHECK_LOGGER.error(f"Couldn't update instance {bw_instance.endpoint} status to down in the database: {ret}")
if bw_instance not in SCHEDULER.apis:
found = False
for api in SCHEDULER.apis:
if api.endpoint == bw_instance.endpoint:
found = True
break
if not found:
HEALTHCHECK_LOGGER.debug(f"Adding {bw_instance.endpoint} to the list of reachable instances")
SCHEDULER.apis.append(bw_instance)
except BaseException:
LOGGER.exception(f"instances_healthcheck - Exception while checking instance {bw_instance.endpoint}")
if bw_instance in SCHEDULER.apis:
SCHEDULER.apis.remove(bw_instance)
except BaseException as e:
HEALTHCHECK_LOGGER.error(f"Exception while checking instance {bw_instance.endpoint}: {e}")
for i, api in enumerate(SCHEDULER.apis):
if api.endpoint == bw_instance.endpoint:
HEALTHCHECK_LOGGER.debug(f"Removing {bw_instance.endpoint} from the list of reachable instances")
del SCHEDULER.apis[i]
break
HEALTHCHECK_EVENT.clear()
@ -821,8 +838,22 @@ if __name__ == "__main__":
ret = SCHEDULER.db.update_instance(db_instance["hostname"], "up" if status == "success" else "down")
if ret:
LOGGER.error(f"Couldn't update instance {db_instance['hostname']} status to down in the database: {ret}")
if db_instance["hostname"] in SCHEDULER.apis:
SCHEDULER.apis.remove(db_instance["hostname"])
if status == "success":
found = False
for api in SCHEDULER.apis:
if api.endpoint == f"http://{db_instance['hostname']}:{db_instance['port']}/":
found = True
break
if not found:
LOGGER.debug(f"Adding {db_instance['hostname']}:{db_instance['port']} to the list of reachable instances")
SCHEDULER.apis.append(API(f"http://{db_instance['hostname']}:{db_instance['port']}", db_instance["server_name"]))
continue
for i, api in enumerate(SCHEDULER.apis):
if api.endpoint == f"http://{db_instance['hostname']}:{db_instance['port']}/":
LOGGER.debug(f"Removing {db_instance['hostname']}:{db_instance['port']} from the list of reachable instances")
del SCHEDULER.apis[i]
break
else:
for thread in threads:
thread.join()
@ -1046,7 +1077,29 @@ if __name__ == "__main__":
if CONFIG_NEED_GENERATION:
CHANGES.append("config")
old_env = env.copy()
env = SCHEDULER.db.get_config()
if old_env["API_HTTP_PORT"] != env["API_HTTP_PORT"] or old_env["API_SERVER_NAME"] != env["API_SERVER_NAME"]:
err = SCHEDULER.db.update_instances(
[
{
"hostname": db_instance["hostname"],
"name": db_instance["name"],
"env": {
"API_HTTP_PORT": env["API_HTTP_PORT"],
"API_SERVER_NAME": env["API_SERVER_NAME"],
},
"type": db_instance["type"],
"status": db_instance["status"],
"method": db_instance["method"],
"last_seen": db_instance["last_seen"],
}
for db_instance in SCHEDULER.db.get_instances()
],
method="scheduler",
)
if err:
LOGGER.error(f"Couldn't update instances in the database: {err}")
env["DATABASE_URI"] = SCHEDULER.db.database_uri
tz = getenv("TZ")
if tz: