feat: Update ApiCaller.py to use ThreadPoolExecutor for sending API requests

The `ApiCaller.py` file has been updated to use the `ThreadPoolExecutor` class from the `concurrent.futures` module for sending API requests concurrently. This change improves the performance and responsiveness of the API calls by allowing multiple requests to be sent simultaneously.
This commit is contained in:
Théophile Diot 2024-08-07 13:22:02 +01:00
parent faf4706bf5
commit ffaaf503a0
No known key found for this signature in database
GPG key ID: FA995104A0BA376A

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import BytesIO
from os import getenv, sep
from os.path import join
@ -28,36 +29,44 @@ class ApiCaller:
data: Optional[Dict[str, Any]] = None,
response: bool = False,
) -> Tuple[bool, Tuple[bool, Optional[Dict[str, Any]]]]:
def send_request(api):
if files is not None:
for buffer in files.values():
buffer.seek(0)
sent, err, status, resp = api.request(method, url, files=files, data=data)
return api, sent, err, status, resp
ret = True
url = url if not url.startswith("/") else url[1:]
responses = {}
for api in self.apis:
if files is not None:
for buffer in files.values():
buffer.seek(0, 0)
sent, err, status, resp = api.request(method, url, files=files, data=data)
if not sent:
ret = False
self.__logger.error(
f"Can't send API request to {api.endpoint}{url} : {err}",
)
else:
if status != 200:
ret = False
self.__logger.error(
f"Error while sending API request to {api.endpoint}{url} : status = {resp['status']}, msg = {resp['msg']}",
)
else:
self.__logger.info(
f"Successfully sent API request to {api.endpoint}{url}",
)
if response:
instance = api.endpoint.replace("http://", "").split(":")[0]
if isinstance(resp, dict):
responses[instance] = resp
with ThreadPoolExecutor() as executor:
future_to_api = {executor.submit(send_request, api): api for api in self.apis}
for future in as_completed(future_to_api):
api = future_to_api[future]
try:
api, sent, err, status, resp = future.result()
if not sent:
ret = False
self.__logger.error(f"Can't send API request to {api.endpoint}{url} : {err}")
else:
if status != 200:
ret = False
self.__logger.error(f"Error while sending API request to {api.endpoint}{url} : status = {resp['status']}, msg = {resp['msg']}")
else:
responses[instance] = resp.json()
self.__logger.info(
f"Successfully sent API request to {api.endpoint}{url}",
)
if response:
instance = api.endpoint.replace("http://", "").split(":")[0]
if isinstance(resp, dict):
responses[instance] = resp
else:
responses[instance] = resp.json()
except Exception as exc:
ret = False
self.__logger.error(f"API request generated an exception: {exc}")
if response:
return ret, responses
@ -68,7 +77,7 @@ class ApiCaller:
with BytesIO() as tgz:
with tar_open(mode="w:gz", fileobj=tgz, dereference=True, compresslevel=3) as tf:
tf.add(path, arcname=".")
tgz.seek(0, 0)
tgz.seek(0)
files = {"archive.tar.gz": tgz}
if not self.send_to_apis("POST", url, files=files):
ret = False