ultralytics 8.4.11 Platform model uploads retries (#23538)

Signed-off-by: Glenn Jocher <glenn.jocher@ultralytics.com>
Co-authored-by: UltralyticsAssistant <web@ultralytics.com>
This commit is contained in:
Glenn Jocher 2026-02-03 12:54:14 +00:00 committed by GitHub
parent 07a2e19ebc
commit c6fcc51d2c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 187 additions and 39 deletions

View file

@ -0,0 +1,20 @@
---
description: Explore the Ultralytics upload utilities with retry logic, progress bars, and signed URL support for cloud storage.
keywords: Ultralytics, upload utilities, file upload, retry logic, progress bar, cloud storage, GCS, YOLO, machine learning
---
# Reference for `ultralytics/utils/uploads.py`
!!! success "Improvements"
This page is sourced from [https://github.com/ultralytics/ultralytics/blob/main/ultralytics/utils/uploads.py](https://github.com/ultralytics/ultralytics/blob/main/ultralytics/utils/uploads.py). Have an improvement or example to add? Open a [Pull Request](https://docs.ultralytics.com/help/contributing/) — thank you! 🙏
<br>
## ::: ultralytics.utils.uploads._ProgressReader
<br><br><hr><br>
## ::: ultralytics.utils.uploads.safe_upload
<br><br>

View file

@ -824,6 +824,7 @@ nav:
- tqdm: reference/utils/tqdm.md
- triton: reference/utils/triton.md
- tuner: reference/utils/tuner.md
- uploads: reference/utils/uploads.md
- Help:
- Help: help/index.md

View file

@ -1,6 +1,6 @@
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
__version__ = "8.4.10"
__version__ = "8.4.11"
import importlib
import os

View file

@ -9,7 +9,7 @@ from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from time import time
from ultralytics.utils import ENVIRONMENT, GIT, LOGGER, PYTHON_VERSION, RANK, SETTINGS, TESTS_RUNNING, colorstr
from ultralytics.utils import ENVIRONMENT, GIT, LOGGER, PYTHON_VERSION, RANK, SETTINGS, TESTS_RUNNING, Retry, colorstr
PREFIX = colorstr("Platform: ")
@ -148,22 +148,30 @@ def _interp_plot(plot, n=101):
return result
def _send(event, data, project, name, model_id=None):
"""Send event to Platform endpoint. Returns response JSON on success."""
try:
payload = {"event": event, "project": project, "name": name, "data": data}
if model_id:
payload["modelId"] = model_id
def _send(event, data, project, name, model_id=None, retry=2):
"""Send event to Platform endpoint with retry logic."""
payload = {"event": event, "project": project, "name": name, "data": data}
if model_id:
payload["modelId"] = model_id
@Retry(times=retry, delay=1)
def post():
r = requests.post(
f"{PLATFORM_API_URL}/training/metrics",
json=payload,
headers={"Authorization": f"Bearer {_api_key}"},
timeout=10,
timeout=30,
)
if 400 <= r.status_code < 500 and r.status_code not in {408, 429}:
LOGGER.warning(f"{PREFIX}Failed to send {event}: {r.status_code} {r.reason}")
return None # Don't retry client errors (except 408 timeout, 429 rate limit)
r.raise_for_status()
return r.json()
try:
return post()
except Exception as e:
LOGGER.debug(f"Platform: Failed to send {event}: {e}")
LOGGER.debug(f"{PREFIX}Failed to send {event}: {e}")
return None
@ -172,40 +180,38 @@ def _send_async(event, data, project, name, model_id=None):
_executor.submit(_send, event, data, project, name, model_id)
def _upload_model(model_path, project, name):
def _upload_model(model_path, project, name, progress=False, retry=1):
"""Upload model checkpoint to Platform via signed URL."""
try:
model_path = Path(model_path)
if not model_path.exists():
return None
from ultralytics.utils.uploads import safe_upload
# Get signed upload URL
response = requests.post(
model_path = Path(model_path)
if not model_path.exists():
LOGGER.warning(f"{PREFIX}Model file not found: {model_path}")
return None
# Get signed upload URL from Platform
@Retry(times=3, delay=2)
def get_signed_url():
r = requests.post(
f"{PLATFORM_API_URL}/models/upload",
json={"project": project, "name": name, "filename": model_path.name},
headers={"Authorization": f"Bearer {_api_key}"},
timeout=10,
timeout=30,
)
response.raise_for_status()
data = response.json()
# Upload to GCS
with open(model_path, "rb") as f:
requests.put(
data["uploadUrl"],
data=f,
headers={"Content-Type": "application/octet-stream"},
timeout=600, # 10 min timeout for large models
).raise_for_status()
# url = f"{PLATFORM_URL}/{project}/{name}"
# LOGGER.info(f"{PREFIX}Model uploaded to {url}")
return data.get("gcsPath")
r.raise_for_status()
return r.json()
try:
data = get_signed_url()
except Exception as e:
LOGGER.debug(f"Platform: Failed to upload model: {e}")
LOGGER.warning(f"{PREFIX}Failed to get upload URL: {e}")
return None
# Upload to GCS using safe_upload with retry logic and optional progress bar
if safe_upload(file=model_path, url=data["uploadUrl"], retry=retry, progress=progress):
return data.get("gcsPath")
return None
def _upload_model_async(model_path, project, name):
"""Upload model asynchronously using bounded thread pool."""
@ -306,7 +312,7 @@ def on_pretrain_routine_start(trainer):
# Note: model_info is sent later in on_fit_epoch_end (epoch 0) when the model is actually loaded
train_args = {k: str(v) for k, v in vars(trainer.args).items()}
# Send synchronously to get modelId for subsequent webhooks
# Send synchronously to get modelId for subsequent webhooks (critical, more retries)
response = _send(
"training_started",
{
@ -317,9 +323,12 @@ def on_pretrain_routine_start(trainer):
},
project,
name,
retry=4,
)
if response and response.get("modelId"):
trainer._platform_model_id = response["modelId"]
else:
LOGGER.warning(f"{PREFIX}Failed to register training session - metrics may not sync to Platform")
def on_fit_epoch_end(trainer):
@ -404,12 +413,14 @@ def on_train_end(trainer):
trainer._platform_console_logger.stop_capture()
trainer._platform_console_logger = None
# Upload best model (blocking to ensure it completes)
model_path = None
# Upload best model (blocking with progress bar to ensure it completes)
gcs_path = None
model_size = None
if trainer.best and Path(trainer.best).exists():
model_size = Path(trainer.best).stat().st_size
model_path = _upload_model(trainer.best, project, name)
gcs_path = _upload_model(trainer.best, project, name, progress=True, retry=3)
if not gcs_path:
LOGGER.warning(f"{PREFIX}Model will not be available for download on Platform (upload failed)")
# Collect plots from trainer and validator, deduplicating by type
plots_by_type = {}
@ -432,7 +443,7 @@ def on_train_end(trainer):
"metrics": {**trainer.metrics, "fitness": trainer.fitness},
"bestEpoch": getattr(trainer, "best_epoch", trainer.epoch),
"bestFitness": trainer.best_fitness,
"modelPath": model_path or (str(trainer.best) if trainer.best else None),
"modelPath": gcs_path, # Only send GCS path, not local path
"modelSize": model_size,
},
"classNames": class_names,
@ -441,6 +452,7 @@ def on_train_end(trainer):
project,
name,
getattr(trainer, "_platform_model_id", None),
retry=4, # Critical, more retries
)
url = f"{PLATFORM_URL}/{project}/{name}"
LOGGER.info(f"{PREFIX}View results at {url}")

View file

@ -0,0 +1,115 @@
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
"""Upload utilities for Ultralytics, mirroring downloads.py patterns."""
from __future__ import annotations
import os
from pathlib import Path
from time import sleep
from ultralytics.utils import LOGGER, TQDM
class _ProgressReader:
    """File wrapper that reports read progress for upload monitoring.

    Wraps a binary file handle so each read() updates an optional progress bar, and exposes
    __len__ so HTTP clients (e.g. requests) can derive the Content-Length header from the
    file size. Usable as a context manager to guarantee the handle is closed.
    """

    def __init__(self, file_path, pbar):
        """Open file_path for binary reading; pbar may be None to disable progress updates."""
        self.file = open(file_path, "rb")
        self.pbar = pbar  # object with an .update(n) method (e.g. TQDM), or None
        self._size = os.path.getsize(file_path)

    def __enter__(self):
        """Return self for use in a with-statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the underlying file when leaving a with-block, even on error."""
        self.close()

    def read(self, size=-1):
        """Read data and update progress bar."""
        data = self.file.read(size)
        if data and self.pbar:
            self.pbar.update(len(data))
        return data

    def __len__(self):
        """Return file size in bytes for the Content-Length header."""
        return self._size

    def close(self):
        """Close the underlying file handle."""
        self.file.close()
def safe_upload(
    file: str | Path,
    url: str,
    headers: dict | None = None,
    retry: int = 2,
    timeout: int = 600,
    progress: bool = False,
) -> bool:
    """Upload a file to a URL with retry logic and optional progress bar.

    Args:
        file (str | Path): Path to the file to upload.
        url (str): The URL endpoint to upload the file to (e.g., signed GCS URL).
        headers (dict, optional): Additional headers to include in the request.
        retry (int, optional): Number of retry attempts on failure (default: 2 for 3 total attempts).
        timeout (int, optional): Request timeout in seconds.
        progress (bool, optional): Whether to display a progress bar during upload.

    Returns:
        (bool): True if upload succeeded, False otherwise.

    Raises:
        FileNotFoundError: If `file` does not exist.

    Examples:
        >>> from ultralytics.utils.uploads import safe_upload
        >>> success = safe_upload("model.pt", "https://storage.googleapis.com/...", progress=True)
    """
    # Validate input before importing the HTTP client so a bad path fails fast with the
    # documented FileNotFoundError rather than a potential ImportError
    file = Path(file)
    if not file.exists():
        raise FileNotFoundError(f"File not found: {file}")

    import requests  # scoped import, only needed when an upload is actually attempted

    file_size = file.stat().st_size
    desc = f"Uploading {file.name}"

    # Prepare headers (Content-Length is derived automatically from the reader's __len__)
    upload_headers = {"Content-Type": "application/octet-stream"}
    if headers:
        upload_headers.update(headers)

    last_error = None
    for attempt in range(retry + 1):
        pbar = None
        reader = None
        try:
            if progress:
                pbar = TQDM(total=file_size, desc=desc, unit="B", unit_scale=True, unit_divisor=1024)
            reader = _ProgressReader(file, pbar)
            r = requests.put(url, data=reader, headers=upload_headers, timeout=timeout)
            r.raise_for_status()
            reader.close()
            reader = None  # Prevent double-close in finally
            if pbar:
                pbar.close()
                pbar = None
            LOGGER.info(f"Uploaded {file.name}")
            return True
        except requests.exceptions.HTTPError as e:
            status = e.response.status_code if e.response is not None else 0
            # Client errors (4xx) are permanent, except 408 (request timeout) and 429 (rate limit)
            if 400 <= status < 500 and status not in {408, 429}:
                LOGGER.warning(f"{desc} failed: {status} {getattr(e.response, 'reason', '')}")
                return False
            last_error = f"HTTP {status}"
        except Exception as e:  # connection errors, timeouts, etc. are considered retryable
            last_error = str(e)
        finally:
            if reader:
                reader.close()
            if pbar:
                pbar.close()
        if attempt < retry:
            wait_time = 2 ** (attempt + 1)  # exponential backoff: 2s, 4s, 8s, ...
            LOGGER.warning(f"{desc} failed ({last_error}), retrying {attempt + 1}/{retry} in {wait_time}s...")
            sleep(wait_time)
    LOGGER.warning(f"{desc} failed after {retry + 1} attempts: {last_error}")
    return False