diff --git a/src/common/utils/ApiCaller.py b/src/common/utils/ApiCaller.py index 6708227f7..7158f7e4a 100644 --- a/src/common/utils/ApiCaller.py +++ b/src/common/utils/ApiCaller.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +from concurrent.futures import ThreadPoolExecutor, as_completed from io import BytesIO from os import getenv, sep from os.path import join @@ -28,36 +29,44 @@ class ApiCaller: data: Optional[Dict[str, Any]] = None, response: bool = False, ) -> Tuple[bool, Tuple[bool, Optional[Dict[str, Any]]]]: + def send_request(api): + if files is not None: + for buffer in files.values(): + buffer.seek(0) + sent, err, status, resp = api.request(method, url, files=files, data=data) + return api, sent, err, status, resp + ret = True url = url if not url.startswith("/") else url[1:] responses = {} - for api in self.apis: - if files is not None: - for buffer in files.values(): - buffer.seek(0, 0) - sent, err, status, resp = api.request(method, url, files=files, data=data) - if not sent: - ret = False - self.__logger.error( - f"Can't send API request to {api.endpoint}{url} : {err}", - ) - else: - if status != 200: - ret = False - self.__logger.error( - f"Error while sending API request to {api.endpoint}{url} : status = {resp['status']}, msg = {resp['msg']}", - ) - else: - self.__logger.info( - f"Successfully sent API request to {api.endpoint}{url}", - ) - if response: - instance = api.endpoint.replace("http://", "").split(":")[0] - if isinstance(resp, dict): - responses[instance] = resp + with ThreadPoolExecutor() as executor: + future_to_api = {executor.submit(send_request, api): api for api in self.apis} + for future in as_completed(future_to_api): + api = future_to_api[future] + try: + api, sent, err, status, resp = future.result() + if not sent: + ret = False + self.__logger.error(f"Can't send API request to {api.endpoint}{url} : {err}") + else: + if status != 200: + ret = False + self.__logger.error(f"Error while sending API request to {api.endpoint}{url} : status = {resp['status']}, msg = {resp['msg']}") else: - responses[instance] = resp.json() + self.__logger.info( + f"Successfully sent API request to {api.endpoint}{url}", + ) + + if response: + instance = api.endpoint.replace("http://", "").split(":")[0] + if isinstance(resp, dict): + responses[instance] = resp + else: + responses[instance] = resp.json() + except Exception as exc: + ret = False + self.__logger.error(f"API request generated an exception: {exc}") if response: return ret, responses @@ -68,7 +77,7 @@ class ApiCaller: with BytesIO() as tgz: with tar_open(mode="w:gz", fileobj=tgz, dereference=True, compresslevel=3) as tf: tf.add(path, arcname=".") - tgz.seek(0, 0) + tgz.seek(0) files = {"archive.tar.gz": tgz} if not self.send_to_apis("POST", url, files=files): ret = False