feat: robust library sync with adaptive watchdog, resume-on-failure &… (#22)

* feat: robust library sync with adaptive watchdog, resume-on-failure & parallel pre-warming

* update copy
This commit is contained in:
Harvey 2026-04-05 15:36:42 +01:00 committed by GitHub
parent e76ed59b44
commit e84f2d6127
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 819 additions and 136 deletions

View file

@ -103,8 +103,19 @@ backend-test-local-files-fallback: $(BACKEND_VENV_STAMP) ## Run local files stal
backend-test-jellyfin-proxy: $(BACKEND_VENV_STAMP) ## Run Jellyfin stream proxy tests
cd "$(BACKEND_DIR)" && .venv/bin/python -m pytest tests/routes/test_stream_routes.py -v
backend-test-sync-watchdog: $(BACKEND_VENV_STAMP) ## Run adaptive watchdog timeout tests
cd "$(BACKEND_DIR)" && .venv/bin/python -m pytest tests/test_sync_watchdog.py -v
backend-test-sync-resume: $(BACKEND_VENV_STAMP) ## Run sync resume-on-failure tests
cd "$(BACKEND_DIR)" && .venv/bin/python -m pytest tests/test_sync_resume.py -v
backend-test-audiodb-parallel: $(BACKEND_VENV_STAMP) ## Run AudioDB parallel prewarm tests
cd "$(BACKEND_DIR)" && .venv/bin/python -m pytest tests/test_audiodb_parallel.py -v
test-audiodb-all: backend-test-audiodb backend-test-audiodb-prewarm backend-test-audiodb-settings backend-test-coverart-audiodb backend-test-audiodb-phase8 backend-test-audiodb-phase9 frontend-test-audiodb-images ## Run every AudioDB test target
test-sync-all: backend-test-sync-watchdog backend-test-sync-resume backend-test-audiodb-parallel ## Run all sync robustness tests
frontend-install: ## Install frontend npm dependencies
cd "$(FRONTEND_DIR)" && $(NPM) install

View file

@ -1,6 +1,6 @@
import asyncio
import logging
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, Query
from api.v1.schemas.library import (
LibraryResponse,
LibraryArtistsResponse,
@ -90,10 +90,11 @@ async def get_recently_added(
@router.post("/sync", response_model=SyncLibraryResponse)
async def sync_library(
force_full: bool = Query(default=False, description="Clear resume checkpoint and start a full sync from scratch"),
library_service: LibraryService = Depends(get_library_service)
):
try:
return await library_service.sync_library(is_manual=True)
return await library_service.sync_library(is_manual=True, force_full=force_full)
except ExternalServiceError as e:
if "cooldown" in str(e).lower():
raise HTTPException(status_code=429, detail="Sync is on cooldown, please wait")

View file

@ -56,13 +56,14 @@ class AdvancedSettings(AppStruct):
http_timeout: int = 10
http_connect_timeout: int = 5
http_max_connections: int = 200
batch_artist_images: int = 5
batch_albums: int = 3
batch_artist_images: int = 10
batch_albums: int = 8
delay_artist: float = 0.5
delay_albums: float = 1.0
delay_albums: float = 0.3
artist_discovery_warm_interval: int = 14400
artist_discovery_warm_delay: float = 0.5
artist_discovery_precache_delay: float = 0.3
artist_discovery_precache_concurrency: int = 3
memory_cache_max_entries: int = 10000
memory_cache_cleanup_interval: int = 300
cover_memory_cache_max_entries: int = 128
@ -100,6 +101,10 @@ class AdvancedSettings(AppStruct):
cache_ttl_audiodb_not_found: int = 86400
cache_ttl_audiodb_library: int = 1209600
cache_ttl_recently_viewed_bytes: int = 172800
sync_stall_timeout_minutes: int = 10
sync_max_timeout_hours: int = 8
audiodb_prewarm_concurrency: int = 4
audiodb_prewarm_delay: float = 0.3
genre_section_ttl: int = 21600
request_history_retention_days: int = 180
ignored_releases_retention_days: int = 365
@ -149,6 +154,11 @@ class AdvancedSettings(AppStruct):
"recent_covers_max_size_mb": (100, 10000),
"persistent_metadata_ttl_hours": (1, 168),
"musicbrainz_concurrent_searches": (2, 10),
"artist_discovery_precache_concurrency": (1, 8),
"sync_stall_timeout_minutes": (2, 30),
"sync_max_timeout_hours": (1, 48),
"audiodb_prewarm_concurrency": (1, 8),
"audiodb_prewarm_delay": (0.0, 5.0),
"discover_queue_size": (1, 20),
"discover_queue_ttl": (3600, 604800),
"discover_queue_polling_interval": (1000, 30000),
@ -219,10 +229,10 @@ class AdvancedSettingsFrontend(AppStruct):
http_timeout: int = 10
http_connect_timeout: int = 5
http_max_connections: int = 200
batch_artist_images: int = 5
batch_albums: int = 3
batch_artist_images: int = 10
batch_albums: int = 8
delay_artist: float = 0.5
delay_albums: float = 1.0
delay_albums: float = 0.3
artist_discovery_warm_interval: int = 240
artist_discovery_warm_delay: float = 0.5
artist_discovery_precache_delay: float = 0.3
@ -268,6 +278,11 @@ class AdvancedSettingsFrontend(AppStruct):
ignored_releases_retention_days: int = 365
orphan_cover_demote_interval_hours: int = 24
store_prune_interval_hours: int = 6
sync_stall_timeout_minutes: int = 10
sync_max_timeout_hours: int = 8
audiodb_prewarm_concurrency: int = 4
audiodb_prewarm_delay: float = 0.3
artist_discovery_precache_concurrency: int = 3
def __post_init__(self) -> None:
int_coerce_fields = [
@ -371,6 +386,11 @@ class AdvancedSettingsFrontend(AppStruct):
"ignored_releases_retention_days": (30, 3650),
"orphan_cover_demote_interval_hours": (1, 168),
"store_prune_interval_hours": (1, 168),
"sync_stall_timeout_minutes": (2, 30),
"sync_max_timeout_hours": (1, 48),
"audiodb_prewarm_concurrency": (1, 8),
"audiodb_prewarm_delay": (0.0, 5.0),
"artist_discovery_precache_concurrency": (1, 8),
}
for field_name, (minimum, maximum) in ranges.items():
_validate_range(getattr(self, field_name), field_name, minimum, maximum)
@ -450,6 +470,11 @@ class AdvancedSettingsFrontend(AppStruct):
ignored_releases_retention_days=settings.ignored_releases_retention_days,
orphan_cover_demote_interval_hours=settings.orphan_cover_demote_interval_hours,
store_prune_interval_hours=settings.store_prune_interval_hours,
sync_stall_timeout_minutes=settings.sync_stall_timeout_minutes,
sync_max_timeout_hours=settings.sync_max_timeout_hours,
audiodb_prewarm_concurrency=settings.audiodb_prewarm_concurrency,
audiodb_prewarm_delay=settings.audiodb_prewarm_delay,
artist_discovery_precache_concurrency=settings.artist_discovery_precache_concurrency,
)
def to_backend(self) -> AdvancedSettings:
@ -526,4 +551,9 @@ class AdvancedSettingsFrontend(AppStruct):
ignored_releases_retention_days=self.ignored_releases_retention_days,
orphan_cover_demote_interval_hours=self.orphan_cover_demote_interval_hours,
store_prune_interval_hours=self.store_prune_interval_hours,
sync_stall_timeout_minutes=self.sync_stall_timeout_minutes,
sync_max_timeout_hours=self.sync_max_timeout_hours,
audiodb_prewarm_concurrency=self.audiodb_prewarm_concurrency,
audiodb_prewarm_delay=self.audiodb_prewarm_delay,
artist_discovery_precache_concurrency=self.artist_discovery_precache_concurrency,
)

View file

@ -1,4 +1,4 @@
"""Domain 1 Library state persistence (artists, albums, metadata)."""
"""Domain 1: Library state persistence (artists, albums, metadata)."""
import json
import logging
@ -27,7 +27,6 @@ def _escape_like(term: str) -> str:
_CROSS_DOMAIN_CLEAR_TABLES = (
"artist_genres",
"artist_genre_lookup",
"processed_items",
)
_FULL_CLEAR_EXTRA_TABLES = (

View file

@ -1,4 +1,4 @@
"""Domain 5 Sync lifecycle persistence."""
"""Domain 5: Sync lifecycle persistence."""
import logging
import sqlite3
@ -87,13 +87,15 @@ class SyncStateStore(PersistenceBase):
return await self._read(operation)
async def mark_items_processed_batch(self, item_type: str, mbids: list[str]) -> None:
normalized = [mbid for mbid in mbids if isinstance(mbid, str) and mbid]
normalized = [(item_type, _normalize(mbid), mbid) for mbid in mbids if isinstance(mbid, str) and mbid]
def operation(conn: sqlite3.Connection) -> None:
for mbid in normalized:
conn.execute(
"INSERT OR REPLACE INTO processed_items (item_type, mbid_lower, mbid) VALUES (?, ?, ?)",
(item_type, _normalize(mbid), mbid),
)
conn.executemany(
"INSERT OR REPLACE INTO processed_items (item_type, mbid_lower, mbid) VALUES (?, ?, ?)",
normalized,
)
await self._write(operation)
async def clear_processed_items(self) -> None:
await self._write(lambda conn: conn.execute("DELETE FROM processed_items"))

View file

@ -422,58 +422,88 @@ class ArtistDiscoveryService:
cached_count = 0
source_fetches = 0
for i, mbid in enumerate(artist_mbids):
advanced = self._preferences_service.get_advanced_settings() if self._preferences_service else None
discovery_concurrency = getattr(advanced, 'artist_discovery_precache_concurrency', 3) if advanced else 3
sem = asyncio.Semaphore(discovery_concurrency)
counter_lock = asyncio.Lock()
progress_counter = 0
async def process_artist(idx: int, mbid: str) -> bool:
nonlocal cached_count, source_fetches, progress_counter
try:
for source in sources:
similar_key = self._build_cache_key(
"similar", mbid, DEFAULT_SIMILAR_COUNT, source
)
songs_key = self._build_cache_key(
"top_songs", mbid, DEFAULT_TOP_SONGS_COUNT, source
)
albums_key = self._build_cache_key(
"top_albums", mbid, DEFAULT_TOP_ALBUMS_COUNT, source
)
async with sem:
for source in sources:
similar_key = self._build_cache_key(
"similar", mbid, DEFAULT_SIMILAR_COUNT, source
)
songs_key = self._build_cache_key(
"top_songs", mbid, DEFAULT_TOP_SONGS_COUNT, source
)
albums_key = self._build_cache_key(
"top_albums", mbid, DEFAULT_TOP_ALBUMS_COUNT, source
)
has_all = (
await self._cache.get(similar_key) is not None
and await self._cache.get(songs_key) is not None
and await self._cache.get(albums_key) is not None
)
if has_all:
continue
has_all = (
await self._cache.get(similar_key) is not None
and await self._cache.get(songs_key) is not None
and await self._cache.get(albums_key) is not None
)
if has_all:
continue
results = await asyncio.gather(
self.get_similar_artists(
mbid, count=DEFAULT_SIMILAR_COUNT, source=source
),
self.get_top_songs(
mbid, count=DEFAULT_TOP_SONGS_COUNT, source=source
),
self.get_top_albums(
mbid, count=DEFAULT_TOP_ALBUMS_COUNT, source=source
),
return_exceptions=True,
)
errors = [r for r in results if isinstance(r, Exception)]
if errors:
logger.debug("Discovery precache errors for %s: %s", mbid[:8], errors)
source_fetches += 1
results = await asyncio.gather(
self.get_similar_artists(
mbid, count=DEFAULT_SIMILAR_COUNT, source=source
),
self.get_top_songs(
mbid, count=DEFAULT_TOP_SONGS_COUNT, source=source
),
self.get_top_albums(
mbid, count=DEFAULT_TOP_ALBUMS_COUNT, source=source
),
return_exceptions=True,
)
errors = [r for r in results if isinstance(r, Exception)]
if errors:
logger.debug("Discovery precache errors for %s: %s", mbid[:8], errors)
async with counter_lock:
source_fetches += 1
cached_count += 1
if delay > 0:
await asyncio.sleep(delay)
async with counter_lock:
cached_count += 1
progress_counter += 1
local_progress = progress_counter
except Exception as e: # noqa: BLE001
logger.warning("Failed to precache discovery for %s: %s", mbid[:8], e)
finally:
if status_service:
artist_name = (mbid_to_name or {}).get(mbid, mbid[:8])
await status_service.update_progress(i + 1, current_item=artist_name)
await status_service.update_progress(local_progress, current_item=artist_name)
if (i + 1) % 10 == 0:
logger.info("Discovery precache progress: %d/%d artists", i + 1, len(artist_mbids))
if local_progress % 10 == 0:
logger.info("Discovery precache progress: %d/%d artists", local_progress, len(artist_mbids))
if delay > 0 and i < len(artist_mbids) - 1:
await asyncio.sleep(delay)
return True
except Exception as e: # noqa: BLE001
logger.warning("Failed to precache discovery for %s: %s", mbid[:8], e)
async with counter_lock:
progress_counter += 1
local_progress = progress_counter
if status_service:
artist_name = (mbid_to_name or {}).get(mbid, mbid[:8])
await status_service.update_progress(local_progress, current_item=artist_name)
return False
chunk = max(discovery_concurrency * 4, 20)
for i in range(0, len(artist_mbids), chunk):
if status_service and status_service.is_cancelled():
logger.info("Discovery precache cancelled by user")
break
batch = artist_mbids[i:i + chunk]
batch_tasks = [asyncio.create_task(process_artist(i + j, mbid)) for j, mbid in enumerate(batch)]
if batch_tasks:
await asyncio.gather(*batch_tasks, return_exceptions=True)
logger.info(
"Discovery precache complete: %d/%d artists refreshed (%d source fetches)",

View file

@ -69,6 +69,7 @@ class CacheStatusService:
self._last_persist_time: float = 0.0
self._last_broadcast_time: float = 0.0
self._persist_item_counter: int = 0
self._last_progress_at: float = time.time()
def set_sync_state_store(self, sync_state_store: 'SyncStateStore'):
self._sync_state_store = sync_state_store
@ -120,6 +121,7 @@ class CacheStatusService:
self._last_persist_time = 0.0
self._last_broadcast_time = 0.0
self._persist_item_counter = 0
self._last_progress_at = time.time()
started_at = time.time()
self._progress = CacheSyncProgress(
is_syncing=True,
@ -167,6 +169,7 @@ class CacheStatusService:
self._progress.processed_artists = processed_artists
if processed_albums is not None:
self._progress.processed_albums = processed_albums
self._last_progress_at = time.time()
now = time.time()
is_final = processed >= self._progress.total_items
@ -180,8 +183,9 @@ class CacheStatusService:
self._progress.total_items = total_items
self._progress.processed_items = 0
self._progress.current_item = None
self._last_progress_at = time.time()
if self._sync_state_store:
if self._sync_state_store and self._progress.is_syncing:
try:
await self._sync_state_store.save_sync_state(
status='running',
@ -208,6 +212,9 @@ class CacheStatusService:
logger.info(f"Phase skipped (already cached): {phase}")
await asyncio.sleep(0.5)
def get_last_progress_at(self) -> float:
return self._last_progress_at
_PERSIST_INTERVAL_SECONDS = 5.0
_PERSIST_ITEM_INTERVAL = 10
@ -244,7 +251,10 @@ class CacheStatusService:
async def complete_sync(self, error_message: Optional[str] = None):
async with self._state_lock:
status = 'failed' if error_message else 'completed'
if not self._progress.is_syncing:
return
is_success = error_message is None
status = 'completed' if is_success else 'failed'
logger.info(f"Cache sync {status}: {self._progress.phase}")
if self._sync_state_store:
@ -259,7 +269,9 @@ class CacheStatusService:
error_message=error_message,
started_at=self._progress.started_at
)
await self._sync_state_store.clear_sync_state()
if is_success:
await self._sync_state_store.clear_sync_state()
await self._sync_state_store.clear_processed_items()
except Exception as e: # noqa: BLE001
logger.warning(f"Failed to persist completion: {e}")

View file

@ -71,6 +71,7 @@ class LibraryService:
self._local_files_service = local_files_service
self._jellyfin_library_service = jellyfin_library_service
self._navidrome_library_service = navidrome_library_service
self._sync_state_store = sync_state_store
self._can_precache = sync_state_store is not None and genre_index is not None
self._precache_service: LibraryPrecacheService | None = None
if self._can_precache:
@ -287,11 +288,11 @@ class LibraryService:
logger.error(f"Failed to fetch recently added: {e}")
raise ExternalServiceError(f"Failed to fetch recently added: {e}")
async def sync_library(self, is_manual: bool = False) -> SyncLibraryResponse:
async def sync_library(self, is_manual: bool = False, force_full: bool = False) -> SyncLibraryResponse:
from services.cache_status_service import CacheStatusService
if not self._lidarr_repo.is_configured():
raise ExternalServiceError("Lidarr is not configured — set a Lidarr API key in Settings to sync your library.")
raise ExternalServiceError("Lidarr is not configured. Set a Lidarr API key in Settings to sync your library.")
try:
status_service = CacheStatusService()
@ -368,13 +369,31 @@ class LibraryService:
self._last_manual_sync = now
if self._precache_service is None:
logger.warning("Precache skipped sync_state_store/genre_index not provided")
logger.warning("Precache skipped: sync_state_store/genre_index not provided")
self._update_last_sync_timestamp()
result = SyncLibraryResponse(status='success', artists=len(artists), albums=len(albums))
self._sync_future.set_result(result)
return result
task = asyncio.create_task(self._precache_service.precache_library_resources(artists, albums))
resume = False
if not force_full and self._sync_state_store:
try:
last_state = await self._sync_state_store.get_sync_state()
if last_state and last_state.get('status') == 'failed':
resume = True
logger.info("Previous sync failed, resuming from checkpoint")
except Exception as e: # noqa: BLE001
logger.warning("Failed to check sync state for resume: %s", e)
if force_full and self._sync_state_store:
try:
await self._sync_state_store.clear_processed_items()
await self._sync_state_store.clear_sync_state()
logger.info("Force full sync: cleared previous progress")
except Exception as e: # noqa: BLE001
logger.warning("Failed to clear sync state for force_full: %s", e)
task = asyncio.create_task(self._precache_service.precache_library_resources(artists, albums, resume=resume))
def on_task_done(t: asyncio.Task):
try:
@ -572,8 +591,6 @@ class LibraryService:
except Exception: # noqa: BLE001
logger.warning("Failed to clean up cover images after removal", exc_info=True)
# Track resolution — extracted from routes/library.py
async def _resolve_album_tracks(
self,
album_mbid: str,

View file

@ -69,7 +69,7 @@ class AlbumPhase:
advanced_settings = self._preferences_service.get_advanced_settings()
batch_size = advanced_settings.batch_albums
min_batch = max(1, advanced_settings.batch_albums - 2)
max_batch = min(20, advanced_settings.batch_albums + 7)
max_batch = min(20, advanced_settings.batch_albums + 12)
metadata_fetched = 0
covers_fetched = 0
consecutive_slow_batches = 0

View file

@ -19,7 +19,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
_AUDIODB_PREWARM_INTER_ITEM_DELAY = 2.0
_AUDIODB_PREWARM_LOG_INTERVAL = 100
@ -34,6 +33,8 @@ class AudioDBPhase:
self._preferences_service = preferences_service
self._audiodb_image_service = audiodb_image_service
_CACHE_CHECK_CHUNK = 200
async def check_cache_needs(
self,
artists: list[dict],
@ -56,14 +57,21 @@ class AudioDBPhase:
cached = await svc.get_cached_album_images(mbid)
return None if cached is not None else album
artist_results = await asyncio.gather(
*(check_artist(a) for a in artists), return_exceptions=True
)
album_results = await asyncio.gather(
*(check_album(a) for a in albums), return_exceptions=True
)
needed_artists = [r for r in artist_results if r is not None and not isinstance(r, Exception)]
needed_albums = [r for r in album_results if r is not None and not isinstance(r, Exception)]
needed_artists: list[dict] = []
chunk = self._CACHE_CHECK_CHUNK
for i in range(0, len(artists), chunk):
results = await asyncio.gather(
*(check_artist(a) for a in artists[i:i + chunk]), return_exceptions=True
)
needed_artists.extend(r for r in results if r is not None and not isinstance(r, Exception))
needed_albums: list[Any] = []
for i in range(0, len(albums), chunk):
results = await asyncio.gather(
*(check_album(a) for a in albums[i:i + chunk]), return_exceptions=True
)
needed_albums.extend(r for r in results if r is not None and not isinstance(r, Exception))
return needed_artists, needed_albums
async def download_bytes(self, url: str, entity_type: str, mbid: str) -> bool:
@ -182,6 +190,9 @@ class AudioDBPhase:
await status_service.skip_phase('audiodb_prewarm')
return
concurrency = settings.audiodb_prewarm_concurrency
inter_item_delay = settings.audiodb_prewarm_delay
needed_artists, needed_albums = await self.check_cache_needs(artists, albums)
total = len(needed_artists) + len(needed_albums)
if total == 0:
@ -190,10 +201,10 @@ class AudioDBPhase:
return
original_total = len(artists) + len(albums)
hit_rate = ((original_total - total) / original_total * 100) if original_total > 0 else 100
initial_hit_rate = ((original_total - total) / original_total * 100) if original_total > 0 else 100
logger.info(
"Phase 5 (AudioDB): Pre-warming %d items (%d artists, %d albums)%.0f%% already cached",
total, len(needed_artists), len(needed_albums), hit_rate,
"Phase 5 (AudioDB): Pre-warming %d items (%d artists, %d albums), %.0f%% already cached, concurrency=%d delay=%.1fs",
total, len(needed_artists), len(needed_albums), initial_hit_rate, concurrency, inter_item_delay,
)
await status_service.update_phase('audiodb_prewarm', total)
@ -204,71 +215,102 @@ class AudioDBPhase:
bytes_ok = 0
bytes_fail = 0
svc = self._audiodb_image_service
sem = asyncio.Semaphore(concurrency)
counter_lock = asyncio.Lock()
for artist in needed_artists:
async def process_artist(artist: dict) -> None:
nonlocal processed, bytes_ok, bytes_fail
if status_service.is_cancelled():
logger.info("AudioDB pre-warming cancelled during artist phase")
break
return
if not self._preferences_service.get_advanced_settings().audiodb_enabled:
logger.info("AudioDB disabled during pre-warming, stopping")
break
return
mbid = artist.get('mbid')
name = artist.get('name', 'Unknown')
processed += 1
try:
result = await svc.fetch_and_cache_artist_images(mbid, name, is_monitored=True)
if result and not result.is_negative and result.thumb_url:
if await self.download_bytes(result.thumb_url, "artist", mbid):
async with sem:
if inter_item_delay > 0:
await asyncio.sleep(inter_item_delay)
try:
result = await svc.fetch_and_cache_artist_images(mbid, name, is_monitored=True)
except Exception as e: # noqa: BLE001
result = None
logger.warning("audiodb.prewarm action=artist_error mbid=%s error=%s", mbid[:8] if mbid else '?', e)
if result and not result.is_negative and result.thumb_url:
ok = await self.download_bytes(result.thumb_url, "artist", mbid)
async with counter_lock:
if ok:
bytes_ok += 1
else:
bytes_fail += 1
except Exception as e: # noqa: BLE001
logger.warning("audiodb.prewarm action=artist_error mbid=%s error=%s", mbid[:8] if mbid else '?', e)
await status_service.update_progress(processed, f"AudioDB: {name}")
async with counter_lock:
processed += 1
local_processed = processed
snap_ok, snap_fail = bytes_ok, bytes_fail
await status_service.update_progress(local_processed, f"AudioDB: {name}")
if processed % _AUDIODB_PREWARM_LOG_INTERVAL == 0:
if local_processed % _AUDIODB_PREWARM_LOG_INTERVAL == 0:
logger.info(
"audiodb.prewarm processed=%d total=%d hit_rate=%.0f%% bytes_ok=%d bytes_fail=%d remaining=%d",
processed, total, hit_rate, bytes_ok, bytes_fail, total - processed,
"audiodb.prewarm processed=%d total=%d initial_hit=%.0f%% bytes_ok=%d bytes_fail=%d remaining=%d",
local_processed, total, initial_hit_rate, snap_ok, snap_fail, total - local_processed,
)
await asyncio.sleep(_AUDIODB_PREWARM_INTER_ITEM_DELAY)
for album in needed_albums:
async def process_album(album: Any) -> None:
nonlocal processed, bytes_ok, bytes_fail
if status_service.is_cancelled():
logger.info("AudioDB pre-warming cancelled during album phase")
break
return
if not self._preferences_service.get_advanced_settings().audiodb_enabled:
logger.info("AudioDB disabled during pre-warming, stopping")
break
return
mbid = getattr(album, 'musicbrainz_id', None) if hasattr(album, 'musicbrainz_id') else album.get('mbid') if isinstance(album, dict) else None
artist_name = getattr(album, 'artist_name', None) if hasattr(album, 'artist_name') else album.get('artist_name') if isinstance(album, dict) else None
album_name = getattr(album, 'title', None) if hasattr(album, 'title') else album.get('title') if isinstance(album, dict) else None
processed += 1
try:
result = await svc.fetch_and_cache_album_images(
mbid, artist_name=artist_name, album_name=album_name, is_monitored=True,
)
if result and not result.is_negative and result.album_thumb_url:
if await self.download_bytes(result.album_thumb_url, "album", mbid):
async with sem:
if inter_item_delay > 0:
await asyncio.sleep(inter_item_delay)
try:
result = await svc.fetch_and_cache_album_images(
mbid, artist_name=artist_name, album_name=album_name, is_monitored=True,
)
except Exception as e: # noqa: BLE001
result = None
logger.warning("audiodb.prewarm action=album_error mbid=%s error=%s", mbid[:8] if mbid else '?', e)
if result and not result.is_negative and result.album_thumb_url:
ok = await self.download_bytes(result.album_thumb_url, "album", mbid)
async with counter_lock:
if ok:
bytes_ok += 1
else:
bytes_fail += 1
except Exception as e: # noqa: BLE001
logger.warning("audiodb.prewarm action=album_error mbid=%s error=%s", mbid[:8] if mbid else '?', e)
await status_service.update_progress(processed, f"AudioDB: {album_name or 'Unknown'}")
async with counter_lock:
processed += 1
local_processed = processed
snap_ok, snap_fail = bytes_ok, bytes_fail
await status_service.update_progress(local_processed, f"AudioDB: {album_name or 'Unknown'}")
if processed % _AUDIODB_PREWARM_LOG_INTERVAL == 0:
if local_processed % _AUDIODB_PREWARM_LOG_INTERVAL == 0:
logger.info(
"audiodb.prewarm processed=%d total=%d hit_rate=%.0f%% bytes_ok=%d bytes_fail=%d remaining=%d",
processed, total, hit_rate, bytes_ok, bytes_fail, total - processed,
"audiodb.prewarm processed=%d total=%d initial_hit=%.0f%% bytes_ok=%d bytes_fail=%d remaining=%d",
local_processed, total, initial_hit_rate, snap_ok, snap_fail, total - local_processed,
)
await asyncio.sleep(_AUDIODB_PREWARM_INTER_ITEM_DELAY)
chunk = max(concurrency * 4, 20)
for i in range(0, len(needed_artists), chunk):
if status_service.is_cancelled():
break
batch = needed_artists[i:i + chunk]
await asyncio.gather(*(process_artist(a) for a in batch), return_exceptions=True)
for i in range(0, len(needed_albums), chunk):
if status_service.is_cancelled():
break
batch = needed_albums[i:i + chunk]
await asyncio.gather(*(process_album(a) for a in batch), return_exceptions=True)
logger.info(
"audiodb.prewarm action=complete processed=%d total=%d bytes_ok=%d bytes_fail=%d",

View file

@ -1,9 +1,10 @@
"""Pre-cache orchestrator delegates to phase sub-services."""
"""Pre-cache orchestrator: delegates to phase sub-services."""
from __future__ import annotations
import logging
import asyncio
import time
from typing import Any, TYPE_CHECKING
from repositories.protocols import LidarrRepositoryProtocol, CoverArtRepositoryProtocol
@ -63,32 +64,81 @@ class LibraryPrecacheService:
async def precache_library_resources(self, artists: list[dict], albums: list[Any], resume: bool = False) -> None:
status_service = CacheStatusService(self._sync_state_store)
task = None
advanced_settings = self._preferences_service.get_advanced_settings()
stall_timeout_s = advanced_settings.sync_stall_timeout_minutes * 60
max_timeout_s = advanced_settings.sync_max_timeout_hours * 3600
try:
task = asyncio.create_task(self._do_precache(artists, albums, status_service, resume))
from core.task_registry import TaskRegistry
TaskRegistry.get_instance().register("precache-library", task)
await asyncio.wait_for(task, timeout=1800.0)
except asyncio.TimeoutError:
logger.error("Library pre-cache operation timed out after 30 minutes")
if task and not task.done():
task.cancel()
watchdog = asyncio.create_task(
self._watchdog(task, status_service, stall_timeout_s, max_timeout_s)
)
done, _ = await asyncio.wait(
{task, watchdog}, return_when=asyncio.FIRST_COMPLETED
)
# Always prioritise the main task result; if it completed
# successfully we don't care about a simultaneous watchdog error.
if task in done:
watchdog.cancel()
try:
await task
await watchdog
except asyncio.CancelledError:
logger.info("Pre-cache task successfully cancelled after timeout")
except Exception as e: # noqa: BLE001
logger.error(f"Error during task cancellation: {e}")
await status_service.complete_sync("Sync timed out after 30 minutes")
raise ExternalServiceError("Library sync timed out - too many items or slow network")
pass
if task.exception():
raise task.exception()
elif watchdog in done:
exc = watchdog.exception() if watchdog.done() and not watchdog.cancelled() else None
if exc:
if not task.done():
task.cancel()
try:
await task
except (asyncio.CancelledError, Exception):
pass
await status_service.complete_sync(str(exc))
raise ExternalServiceError(str(exc))
except asyncio.CancelledError:
logger.warning("Pre-cache was cancelled")
await status_service.complete_sync()
raise
except ExternalServiceError:
raise
except Exception as e:
logger.error(f"Pre-cache failed: {e}")
await status_service.complete_sync(str(e))
raise
async def _watchdog(
self,
task: asyncio.Task,
status_service: CacheStatusService,
stall_timeout_s: float,
max_timeout_s: float,
) -> None:
start = time.time()
while not task.done():
await asyncio.sleep(30)
elapsed = time.time() - start
if elapsed > max_timeout_s:
msg = f"Sync exceeded maximum timeout ({max_timeout_s / 3600:.1f}h)"
logger.error(msg)
raise ExternalServiceError(msg)
stall_duration = time.time() - status_service.get_last_progress_at()
if stall_duration > stall_timeout_s:
msg = (
f"Sync stalled: no progress for {stall_duration / 60:.0f} minutes "
f"during {status_service.get_progress().phase or 'unknown'} phase"
)
logger.error(msg)
raise ExternalServiceError(msg)
async def _do_precache(self, artists: list[dict], albums: list[Any], status_service: CacheStatusService, resume: bool = False) -> None:
from core.dependencies import get_album_service
try:
@ -209,12 +259,23 @@ class LibraryPrecacheService:
await status_service.skip_phase('albums')
if not status_service.is_cancelled():
try:
await self._audiodb_phase.precache_audiodb_data(artists, albums, status_service)
except Exception as e: # noqa: BLE001
logger.error(f"AudioDB pre-warming failed (non-fatal): {e}")
await status_service.complete_sync()
logger.info("Library resource pre-caching complete (core phases done)")
logger.info("Library resource pre-caching complete")
try:
audiodb_timeout = self._preferences_service.get_advanced_settings().sync_max_timeout_hours * 3600
logger.info("Starting AudioDB image prewarm as background enhancement...")
await asyncio.wait_for(
self._audiodb_phase.precache_audiodb_data(artists, albums, status_service),
timeout=audiodb_timeout,
)
logger.info("AudioDB image prewarm complete")
except asyncio.TimeoutError:
logger.warning("AudioDB pre-warming timed out (non-fatal)")
except Exception as e: # noqa: BLE001
logger.warning(f"AudioDB pre-warming failed (non-fatal): {e}")
else:
logger.info("Library resource pre-caching complete (cancelled)")
except Exception as e:
logger.error(f"Error during pre-cache: {e}")
raise

View file

@ -14,6 +14,8 @@ def _make_settings(audiodb_enabled: bool = True, name_search_fallback: bool = Fa
s = MagicMock()
s.audiodb_enabled = audiodb_enabled
s.audiodb_name_search_fallback = name_search_fallback
s.audiodb_prewarm_concurrency = 4
s.audiodb_prewarm_delay = 0.0
return s

View file

@ -185,3 +185,34 @@ class TestAudiodbEnabledRoundTrip:
assert frontend.audiodb_enabled is True
restored = frontend.to_backend()
assert restored.audiodb_enabled is True
class TestSyncSettingsRoundTrip:
"""Round-trip tests for the sync robustness settings."""
@pytest.mark.parametrize("field,backend_val,frontend_val", [
("sync_stall_timeout_minutes", 15, 15),
("sync_max_timeout_hours", 6, 6),
("audiodb_prewarm_concurrency", 6, 6),
("audiodb_prewarm_delay", 1.5, 1.5),
("artist_discovery_precache_concurrency", 5, 5),
])
def test_roundtrip_preserves_value(self, field: str, backend_val, frontend_val) -> None:
backend = AdvancedSettings(**{field: backend_val})
frontend = AdvancedSettingsFrontend.from_backend(backend)
assert getattr(frontend, field) == frontend_val
restored = frontend.to_backend()
assert getattr(restored, field) == backend_val
@pytest.mark.parametrize("field,default_val", [
("sync_stall_timeout_minutes", 10),
("sync_max_timeout_hours", 8),
("audiodb_prewarm_concurrency", 4),
("audiodb_prewarm_delay", 0.3),
("artist_discovery_precache_concurrency", 3),
])
def test_defaults_match(self, field: str, default_val) -> None:
backend = AdvancedSettings()
frontend = AdvancedSettingsFrontend()
assert getattr(backend, field) == default_val
assert getattr(frontend, field) == default_val

View file

@ -0,0 +1,153 @@
"""Tests for AudioDB parallel prewarm with semaphore gating."""
import asyncio
import tempfile
from pathlib import Path
import pytest
from unittest.mock import AsyncMock, MagicMock
from services.precache.audiodb_phase import AudioDBPhase
def _make_settings(concurrency=4, delay=0.0, enabled=True):
s = MagicMock()
s.audiodb_enabled = enabled
s.audiodb_name_search_fallback = False
s.audiodb_prewarm_concurrency = concurrency
s.audiodb_prewarm_delay = delay
return s
def _make_prefs(settings=None):
if settings is None:
settings = _make_settings()
prefs = MagicMock()
prefs.get_advanced_settings.return_value = settings
return prefs
def _make_status_service():
status = MagicMock()
status.update_phase = AsyncMock()
status.update_progress = AsyncMock()
status.persist_progress = AsyncMock()
status.skip_phase = AsyncMock()
status.is_cancelled.return_value = False
return status
def _make_cover_repo(tmpdir):
repo = AsyncMock()
repo.cache_dir = Path(tmpdir)
return repo
class TestAudioDBParallel:
    """Behavioural tests for the semaphore-gated parallel AudioDB prewarm phase.

    Note: the redundant trailing ``assert True`` statements were removed — they
    can never fail and only obscure the real assertions.
    """

    @pytest.mark.asyncio
    async def test_concurrent_processing(self):
        """Multiple artists should be processed concurrently up to the semaphore limit."""
        concurrency = 2
        prefs = _make_prefs(_make_settings(concurrency=concurrency, delay=0.0))
        audiodb_svc = AsyncMock()
        # Nothing is cached, so every artist requires a fetch.
        audiodb_svc.get_cached_artist_images = AsyncMock(return_value=None)
        audiodb_svc.get_cached_album_images = AsyncMock(return_value=None)
        audiodb_svc.fetch_and_cache_artist_images = AsyncMock(return_value=None)
        with tempfile.TemporaryDirectory() as tmpdir:
            phase = AudioDBPhase(
                cover_repo=_make_cover_repo(tmpdir),
                preferences_service=prefs,
                audiodb_image_service=audiodb_svc,
            )
            artists = [{"mbid": f"mbid-{i:04d}", "name": f"Artist {i}"} for i in range(6)]
            status = _make_status_service()
            await phase.precache_audiodb_data(artists, [], status)
            # One fetch and one progress tick per artist.
            assert audiodb_svc.fetch_and_cache_artist_images.call_count == 6
            assert status.update_progress.call_count == 6

    @pytest.mark.asyncio
    async def test_concurrency_respects_setting(self):
        """The concurrency semaphore should limit parallel execution."""
        max_concurrent = 0
        current_concurrent = 0
        lock = asyncio.Lock()
        prefs = _make_prefs(_make_settings(concurrency=2, delay=0.0))
        audiodb_svc = AsyncMock()
        audiodb_svc.get_cached_artist_images = AsyncMock(return_value=None)
        audiodb_svc.get_cached_album_images = AsyncMock(return_value=None)

        async def track_concurrency(*args, **kwargs):
            # Record the high-water mark of simultaneous in-flight fetches.
            nonlocal max_concurrent, current_concurrent
            async with lock:
                current_concurrent += 1
                if current_concurrent > max_concurrent:
                    max_concurrent = current_concurrent
            await asyncio.sleep(0.05)
            async with lock:
                current_concurrent -= 1
            return None

        audiodb_svc.fetch_and_cache_artist_images = AsyncMock(side_effect=track_concurrency)
        with tempfile.TemporaryDirectory() as tmpdir:
            phase = AudioDBPhase(
                cover_repo=_make_cover_repo(tmpdir),
                preferences_service=prefs,
                audiodb_image_service=audiodb_svc,
            )
            artists = [{"mbid": f"mbid-{i:04d}", "name": f"Artist {i}"} for i in range(10)]
            status = _make_status_service()
            await phase.precache_audiodb_data(artists, [], status)
            # Parallelism must be capped at the configured limit of 2,
            # while at least one fetch actually ran.
            assert max_concurrent <= 2
            assert max_concurrent >= 1

    @pytest.mark.asyncio
    async def test_disabled_audiodb_skips(self):
        """When audiodb_enabled is False, phase should be skipped."""
        prefs = _make_prefs(_make_settings(enabled=False))
        phase = AudioDBPhase(
            cover_repo=AsyncMock(),
            preferences_service=prefs,
            audiodb_image_service=AsyncMock(),
        )
        status = _make_status_service()
        await phase.precache_audiodb_data([], [], status)
        status.skip_phase.assert_called_once_with('audiodb_prewarm')

    @pytest.mark.asyncio
    async def test_all_cached_skips(self):
        """When all items are cached, phase should be skipped."""
        prefs = _make_prefs(_make_settings())
        audiodb_svc = AsyncMock()
        # Both artist and album lookups report a cache hit.
        audiodb_svc.get_cached_artist_images = AsyncMock(return_value={"some": "data"})
        audiodb_svc.get_cached_album_images = AsyncMock(return_value={"some": "data"})
        phase = AudioDBPhase(
            cover_repo=AsyncMock(),
            preferences_service=prefs,
            audiodb_image_service=audiodb_svc,
        )
        artists = [{"mbid": "mbid-0001", "name": "Artist 1"}]
        status = _make_status_service()
        await phase.precache_audiodb_data(artists, [], status)
        status.skip_phase.assert_called_once_with('audiodb_prewarm')

View file

@ -46,6 +46,8 @@ def _make_settings(enabled=True, name_search_fallback=False):
s.cache_ttl_audiodb_found = 604800
s.cache_ttl_audiodb_not_found = 86400
s.cache_ttl_audiodb_library = 1209600
s.audiodb_prewarm_concurrency = 4
s.audiodb_prewarm_delay = 0.0
return s
@ -276,6 +278,8 @@ class TestPrewarmLogContract:
settings = MagicMock()
settings.audiodb_enabled = True
settings.audiodb_name_search_fallback = False
settings.audiodb_prewarm_concurrency = 4
settings.audiodb_prewarm_delay = 0.0
prefs = MagicMock()
prefs.get_advanced_settings.return_value = settings
return LibraryPrecacheService(

View file

@ -0,0 +1,114 @@
"""Tests for sync resume-on-failure behaviour."""
import asyncio
import sqlite3
import tempfile
import os
import threading
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from infrastructure.persistence.sync_state_store import SyncStateStore
from services.cache_status_service import CacheStatusService
@pytest.fixture(autouse=True)
def _reset_singleton():
    """Reset the CacheStatusService singleton before and after every test.

    CacheStatusService keeps a class-level ``_instance``; without this autouse
    fixture, state created by one test would leak into the next.
    """
    # Drop any instance a previous test may have left behind.
    CacheStatusService._instance = None
    yield
    # Discard the instance this test created so the next test starts clean.
    CacheStatusService._instance = None
class TestResumeOnFailure:
    """Sync failures must leave resumable state behind; successes must clear it.

    Note: the redundant trailing ``assert True`` statements were removed — they
    can never fail and only obscure the real assertions.
    """

    @pytest.mark.asyncio
    async def test_complete_sync_preserves_state_on_failure(self):
        """On failure, sync state should be saved but NOT cleared."""
        store = AsyncMock()
        store.save_sync_state = AsyncMock()
        store.clear_sync_state = AsyncMock()
        store.clear_processed_items = AsyncMock()
        svc = CacheStatusService(store)
        await svc.start_sync('artists', 100)
        await svc.update_progress(50, "some artist")
        await svc.complete_sync("Sync stalled: no progress")
        # The final persisted snapshot must carry a 'failed' status ...
        store.save_sync_state.assert_called()
        last_call = store.save_sync_state.call_args
        assert last_call.kwargs.get('status') == 'failed'
        # ... and nothing may be wiped, so a later run can resume from it.
        store.clear_sync_state.assert_not_called()
        store.clear_processed_items.assert_not_called()

    @pytest.mark.asyncio
    async def test_complete_sync_clears_state_on_success(self):
        """On success, both sync_state and processed_items should be cleared."""
        store = AsyncMock()
        store.save_sync_state = AsyncMock()
        store.clear_sync_state = AsyncMock()
        store.clear_processed_items = AsyncMock()
        svc = CacheStatusService(store)
        await svc.start_sync('artists', 10)
        await svc.update_progress(10, "done")
        await svc.complete_sync(None)
        store.clear_sync_state.assert_called_once()
        store.clear_processed_items.assert_called_once()

    @pytest.mark.asyncio
    async def test_last_progress_at_updates_on_progress(self):
        """get_last_progress_at should reflect the latest update_progress call."""
        store = AsyncMock()
        store.save_sync_state = AsyncMock()
        svc = CacheStatusService(store)
        await svc.start_sync('artists', 10)
        t1 = svc.get_last_progress_at()
        await asyncio.sleep(0.05)  # ensure a measurable timestamp gap
        await svc.update_progress(5, "artist5")
        t2 = svc.get_last_progress_at()
        assert t2 > t1

    @pytest.mark.asyncio
    async def test_last_progress_at_updates_on_phase_change(self):
        """get_last_progress_at should refresh when phase changes."""
        store = AsyncMock()
        store.save_sync_state = AsyncMock()
        svc = CacheStatusService(store)
        await svc.start_sync('artists', 10)
        t1 = svc.get_last_progress_at()
        await asyncio.sleep(0.05)  # ensure a measurable timestamp gap
        await svc.update_phase('albums', 50)
        t2 = svc.get_last_progress_at()
        assert t2 > t1
class TestSyncStateStoreClear:
    """Integration test for SyncStateStore.clear_processed_items against a real SQLite file.

    Note: the redundant trailing ``assert True`` was removed — it can never
    fail and only obscures the real assertions.
    """

    @pytest.mark.asyncio
    async def test_clear_processed_items_deletes_all(self):
        """clear_processed_items should execute a DELETE on processed_items."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = os.path.join(tmpdir, "test.db")
            write_lock = threading.Lock()
            store = SyncStateStore(db_path, write_lock)
            # Seed two processed items and confirm they were persisted ...
            await store.mark_items_processed_batch("artist", ["mbid1", "mbid2"])
            items = await store.get_processed_items("artist")
            assert len(items) == 2
            # ... then verify that clearing removes every row.
            await store.clear_processed_items()
            items = await store.get_processed_items("artist")
            assert len(items) == 0

View file

@ -0,0 +1,126 @@
"""Tests for the adaptive watchdog timeout in the orchestrator."""
import asyncio
import time
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from services.precache.orchestrator import LibraryPrecacheService
from services.cache_status_service import CacheStatusService
from core.exceptions import ExternalServiceError
@pytest.fixture(autouse=True)
def _reset_singleton():
    """Reset the CacheStatusService singleton before and after every test.

    CacheStatusService keeps a class-level ``_instance``; without this autouse
    fixture, state created by one test would leak into the next.
    """
    # Drop any instance a previous test may have left behind.
    CacheStatusService._instance = None
    yield
    # Discard the instance this test created so the next test starts clean.
    CacheStatusService._instance = None
def _make_settings():
s = MagicMock()
s.sync_stall_timeout_minutes = 0.05 # 3 seconds for test speed
s.sync_max_timeout_hours = 0.01 # 36 seconds
s.audiodb_enabled = False
s.audiodb_prewarm_concurrency = 4
s.audiodb_prewarm_delay = 0.0
s.batch_artist_images = 10
s.batch_albums = 8
s.delay_albums = 0.0
s.delay_artists = 0.0
s.artist_discovery_precache_delay = 0.0
return s
def _make_prefs(settings=None):
if settings is None:
settings = _make_settings()
prefs = MagicMock()
prefs.get_advanced_settings.return_value = settings
return prefs
def _make_service(prefs=None):
    """Build a LibraryPrecacheService whose collaborators are all async mocks."""
    preferences = _make_prefs() if prefs is None else prefs
    return LibraryPrecacheService(
        lidarr_repo=AsyncMock(),
        cover_repo=AsyncMock(),
        preferences_service=preferences,
        sync_state_store=AsyncMock(),
        genre_index=AsyncMock(),
        library_db=AsyncMock(),
    )
class TestAdaptiveWatchdog:
    """Stall detection, max-timeout enforcement, and error propagation.

    Note: the redundant trailing ``assert True`` statements were removed — they
    can never fail and only obscure the real assertions.
    """

    @pytest.mark.asyncio
    async def test_stall_detection_cancels_sync(self):
        """Sync that stops making progress should be cancelled by the watchdog."""
        settings = _make_settings()
        settings.sync_stall_timeout_minutes = 0.02  # 1.2 seconds
        svc = _make_service(_make_prefs(settings))

        async def stalling_precache(artists, albums, status_service, resume=False):
            await status_service.start_sync('artists', 1)
            await asyncio.sleep(30)  # stall far beyond the watchdog window

        with patch.object(svc, '_do_precache', side_effect=stalling_precache):
            with pytest.raises(ExternalServiceError, match="stalled"):
                await svc.precache_library_resources([], [])

    @pytest.mark.asyncio
    async def test_progressing_sync_completes(self):
        """A sync that makes steady progress should complete without watchdog interference."""
        settings = _make_settings()
        settings.sync_stall_timeout_minutes = 0.1  # 6 seconds
        svc = _make_service(_make_prefs(settings))

        async def fast_precache(artists, albums, status_service, resume=False):
            await status_service.start_sync('artists', 2)
            await status_service.update_progress(1, "artist1")
            await asyncio.sleep(0.1)
            await status_service.update_progress(2, "artist2")

        with patch.object(svc, '_do_precache', side_effect=fast_precache):
            await svc.precache_library_resources([], [])

    @pytest.mark.asyncio
    async def test_max_timeout_cancels_even_with_progress(self):
        """Max timeout should cancel even if progress is being made."""
        settings = _make_settings()
        settings.sync_stall_timeout_minutes = 10  # generous, so only the hard cap can fire
        settings.sync_max_timeout_hours = 0.0003  # ~1 second
        svc = _make_service(_make_prefs(settings))

        async def slow_but_progressing(artists, albums, status_service, resume=False):
            await status_service.start_sync('artists', 100)
            for i in range(100):
                await status_service.update_progress(i + 1, f"artist{i}")
                await asyncio.sleep(0.5)

        with patch.object(svc, '_do_precache', side_effect=slow_but_progressing):
            with pytest.raises(ExternalServiceError, match="maximum timeout"):
                await svc.precache_library_resources([], [])

    @pytest.mark.asyncio
    async def test_error_in_precache_propagates(self):
        """Errors in the precache task should propagate through the watchdog."""
        svc = _make_service()

        async def failing_precache(artists, albums, status_service, resume=False):
            raise ValueError("something broke")

        with patch.object(svc, '_do_precache', side_effect=failing_precache):
            with pytest.raises(ValueError, match="something broke"):
                await svc.precache_library_resources([], [])

View file

@ -101,3 +101,46 @@
max={5}
/>
</div>
<!-- Library Sync section: watchdog timeouts plus prewarm/discovery throttling. -->
<div class="divider my-4"></div>
<h4 class="font-medium text-sm text-base-content/70 mb-3">Library Sync</h4>
<div class="grid grid-cols-1 sm:grid-cols-2 gap-x-6 gap-y-4">
<!-- Watchdog: cancel the sync when no progress is observed for this many minutes. -->
<SettingsNumberField
label="Stall Timeout"
description="Cancel sync if no progress (default: 10 min)"
bind:value={data.sync_stall_timeout_minutes}
min={2}
max={30}
unit="min"
/>
<!-- Hard upper bound on total sync runtime, regardless of progress. -->
<SettingsNumberField
label="Max Timeout"
description="Hard sync timeout (default: 8 hrs)"
bind:value={data.sync_max_timeout_hours}
min={1}
max={48}
unit="hrs"
/>
<!-- How many AudioDB prewarm requests run in parallel. -->
<SettingsNumberField
label="AudioDB Prewarm Concurrency"
description="Parallel AudioDB requests (default: 4)"
bind:value={data.audiodb_prewarm_concurrency}
min={1}
max={8}
/>
<!-- Pause between prewarm items; step 0.1 allows sub-second tuning. -->
<SettingsNumberField
label="AudioDB Prewarm Delay"
description="Delay between items (default: 0.3s)"
bind:value={data.audiodb_prewarm_delay}
min={0}
max={5}
step={0.1}
unit="sec"
/>
<!-- How many artist-discovery precache fetches run in parallel. -->
<SettingsNumberField
label="Discovery Precache Concurrency"
description="Parallel discovery fetches (default: 3)"
bind:value={data.artist_discovery_precache_concurrency}
min={1}
max={8}
/>
</div>

View file

@ -59,4 +59,9 @@ export interface AdvancedSettingsForm {
cache_ttl_audiodb_not_found: number;
cache_ttl_audiodb_library: number;
cache_ttl_recently_viewed_bytes: number;
/** Minutes without sync progress before the watchdog cancels the sync (default: 10). */
sync_stall_timeout_minutes: number;
/** Hard upper bound, in hours, on total sync runtime (default: 8). */
sync_max_timeout_hours: number;
/** Number of parallel AudioDB prewarm requests (default: 4). */
audiodb_prewarm_concurrency: number;
/** Delay in seconds between AudioDB prewarm items (default: 0.3). */
audiodb_prewarm_delay: number;
/** Number of parallel artist-discovery precache fetches (default: 3). */
artist_discovery_precache_concurrency: number;
}