mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
feat: Update ApiCaller.py to use ThreadPoolExecutor for sending API requests
The `ApiCaller.py` file has been updated to use the `ThreadPoolExecutor` class from the `concurrent.futures` module for sending API requests concurrently. This change improves the performance and responsiveness of the API calls by allowing multiple requests to be sent simultaneously.
This commit is contained in:
parent
faf4706bf5
commit
ffaaf503a0
1 changed files with 35 additions and 26 deletions
|
|
@ -1,5 +1,6 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from io import BytesIO
|
||||
from os import getenv, sep
|
||||
from os.path import join
|
||||
|
|
@ -28,36 +29,44 @@ class ApiCaller:
|
|||
data: Optional[Dict[str, Any]] = None,
|
||||
response: bool = False,
|
||||
) -> Tuple[bool, Tuple[bool, Optional[Dict[str, Any]]]]:
|
||||
def send_request(api):
|
||||
if files is not None:
|
||||
for buffer in files.values():
|
||||
buffer.seek(0)
|
||||
sent, err, status, resp = api.request(method, url, files=files, data=data)
|
||||
return api, sent, err, status, resp
|
||||
|
||||
ret = True
|
||||
url = url if not url.startswith("/") else url[1:]
|
||||
responses = {}
|
||||
for api in self.apis:
|
||||
if files is not None:
|
||||
for buffer in files.values():
|
||||
buffer.seek(0, 0)
|
||||
sent, err, status, resp = api.request(method, url, files=files, data=data)
|
||||
if not sent:
|
||||
ret = False
|
||||
self.__logger.error(
|
||||
f"Can't send API request to {api.endpoint}{url} : {err}",
|
||||
)
|
||||
else:
|
||||
if status != 200:
|
||||
ret = False
|
||||
self.__logger.error(
|
||||
f"Error while sending API request to {api.endpoint}{url} : status = {resp['status']}, msg = {resp['msg']}",
|
||||
)
|
||||
else:
|
||||
self.__logger.info(
|
||||
f"Successfully sent API request to {api.endpoint}{url}",
|
||||
)
|
||||
|
||||
if response:
|
||||
instance = api.endpoint.replace("http://", "").split(":")[0]
|
||||
if isinstance(resp, dict):
|
||||
responses[instance] = resp
|
||||
with ThreadPoolExecutor() as executor:
|
||||
future_to_api = {executor.submit(send_request, api): api for api in self.apis}
|
||||
for future in as_completed(future_to_api):
|
||||
api = future_to_api[future]
|
||||
try:
|
||||
api, sent, err, status, resp = future.result()
|
||||
if not sent:
|
||||
ret = False
|
||||
self.__logger.error(f"Can't send API request to {api.endpoint}{url} : {err}")
|
||||
else:
|
||||
if status != 200:
|
||||
ret = False
|
||||
self.__logger.error(f"Error while sending API request to {api.endpoint}{url} : status = {resp['status']}, msg = {resp['msg']}")
|
||||
else:
|
||||
responses[instance] = resp.json()
|
||||
self.__logger.info(
|
||||
f"Successfully sent API request to {api.endpoint}{url}",
|
||||
)
|
||||
|
||||
if response:
|
||||
instance = api.endpoint.replace("http://", "").split(":")[0]
|
||||
if isinstance(resp, dict):
|
||||
responses[instance] = resp
|
||||
else:
|
||||
responses[instance] = resp.json()
|
||||
except Exception as exc:
|
||||
ret = False
|
||||
self.__logger.error(f"API request generated an exception: {exc}")
|
||||
|
||||
if response:
|
||||
return ret, responses
|
||||
|
|
@ -68,7 +77,7 @@ class ApiCaller:
|
|||
with BytesIO() as tgz:
|
||||
with tar_open(mode="w:gz", fileobj=tgz, dereference=True, compresslevel=3) as tf:
|
||||
tf.add(path, arcname=".")
|
||||
tgz.seek(0, 0)
|
||||
tgz.seek(0)
|
||||
files = {"archive.tar.gz": tgz}
|
||||
if not self.send_to_apis("POST", url, files=files):
|
||||
ret = False
|
||||
|
|
|
|||
Loading…
Reference in a new issue