2026-01-29 18:06:21 +00:00
|
|
|
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
|
|
|
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
|
|
|
|
2026-05-21 19:29:49 +00:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
2026-01-29 18:06:21 +00:00
|
|
|
import logging
|
|
|
|
|
import threading
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
2026-05-21 19:29:49 +00:00
|
|
|
from data_designer.engine.progress.tracker import ProgressTracker
|
2026-01-29 18:06:21 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def tracker() -> ProgressTracker:
    """Provide a fresh 100-record tracker labelled ``test column 'name'``."""
    shared_tracker = ProgressTracker(total_records=100, label="test column 'name'")
    return shared_tracker
|
|
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_initializes_with_correct_values() -> None:
    """A new tracker exposes its constructor arguments and zeroed counters."""
    fresh = ProgressTracker(total_records=100, label="test label")

    # Constructor arguments are stored as given.
    assert fresh.total_records == 100
    assert fresh.label == "test label"

    # No counter has moved before any record_* call.
    for counter_name in ("completed", "success", "failed", "skipped"):
        assert getattr(fresh, counter_name) == 0
|
2026-01-29 18:06:21 +00:00
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_calculates_log_interval_from_percentage() -> None:
    """``log_interval`` matches the expected value for each (total, percent) pair."""
    # (total_records, log_interval_percent, expected log_interval)
    cases = (
        (100, 10, 10),
        (100, 25, 25),
        (1000, 5, 50),
    )
    for total, percent, expected_interval in cases:
        current = ProgressTracker(total_records=total, label="test", log_interval_percent=percent)
        assert current.log_interval == expected_interval
|
2026-01-29 18:06:21 +00:00
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_log_interval_minimum_is_one() -> None:
    """The log interval is at least 1 for 5 records at a 10% interval."""
    small = ProgressTracker(total_records=5, label="test", log_interval_percent=10)
    interval = small.log_interval
    assert interval >= 1
|
2026-01-29 18:06:21 +00:00
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
|
|
|
|
|
def test_handles_zero_total_records() -> None:
    """A tracker built with zero total records reports interval 1 and total 0."""
    empty = ProgressTracker(total_records=0, label="test")
    observed = (empty.log_interval, empty.total_records)
    assert observed == (1, 0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_record_success_increments_completed_and_success(tracker: ProgressTracker) -> None:
    """One success bumps ``completed`` and ``success`` and leaves ``failed`` at 0."""
    tracker.record_success()

    counts = (tracker.completed, tracker.success, tracker.failed)
    assert counts == (1, 1, 0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_record_success_multiple_times(tracker: ProgressTracker) -> None:
    """Five successes are all counted as completed and successful."""
    remaining = 5
    while remaining > 0:
        tracker.record_success()
        remaining -= 1

    assert tracker.completed == 5
    assert tracker.success == 5
    assert tracker.failed == 0
|
2026-01-29 18:06:21 +00:00
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_record_failure_increments_completed_and_failed(tracker: ProgressTracker) -> None:
    """One failure bumps ``completed`` and ``failed`` and leaves ``success`` at 0."""
    tracker.record_failure()

    counts = (tracker.completed, tracker.success, tracker.failed)
    assert counts == (1, 0, 1)
|
2026-01-29 18:06:21 +00:00
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
|
|
|
|
|
def test_record_failure_multiple_times(tracker: ProgressTracker) -> None:
    """Five failures are all counted as completed and failed."""
    for _attempt in range(5):
        tracker.record_failure()

    # Checked in the same order as listed: completed, success, failed.
    expected_counts = {"completed": 5, "success": 0, "failed": 5}
    for counter_name, expected_value in expected_counts.items():
        assert getattr(tracker, counter_name) == expected_value
|
|
|
|
|
|
|
|
|
|
|
chore: async engine follow-up - rename, preview, lifecycle, progress (#456)
* chore: rename ColumnWiseDatasetBuilder, wire async preview, unify row-group lifecycle
- Rename ColumnWiseDatasetBuilder to DatasetBuilder and
column_wise_builder.py to dataset_builder.py, update all references
- Extract _prepare_async_run() factory shared by build and preview paths
- Add _build_async_preview() for async preview with no disk checkpoints
- Replace on_row_group_complete/on_checkpoint_complete with single
on_finalize_row_group callback; caller handles checkpointing
- Add free_row_group() on RowGroupBufferManager for discard-without-write
- Free fully-dropped row groups instead of finalizing them
- Add consolidated AsyncProgressReporter for async generation logging
Closes #437, closes #442, closes #444
* feat: add consolidated async progress reporting with row group context
- Add AsyncProgressReporter: groups per-column progress into a single
log block emitted at configurable intervals (default 5s)
- Add quiet mode to ProgressTracker to suppress per-tracker logging
when used with the consolidated reporter
- Add ContextVar-based row group tagging (RG1, RG2, ...) for log
messages emitted inside async tasks (samplers, expressions, seeds)
- Add progress_interval to RunConfig for user-configurable reporting
- Remove log_start_async from ProgressTracker (superseded by reporter)
Closes #443
* fix async preview and progress reporting
* feat: add opt-in sticky ANSI progress bars for generation
Add RunConfig.progress_bar setting that replaces periodic log-line
progress with sticky terminal bars that stay at the bottom while
logs scroll above. Pure ANSI escape codes, no new dependencies.
Disabled by default - existing log-based output unchanged.
* docs: add missing progress_interval docstring in RunConfig
* fix: update progress bar on every completion in async path
Skip the time gate when the progress bar is active so the bar
redraws on every record instead of every progress_interval seconds.
* fix: resolve row-group semaphore deadlock when all tasks are deferred
When all tasks for admitted row groups fail with transient errors,
the row-group semaphore never releases, blocking admission of new
row groups. Fix by salvaging stalled row groups inline - retrying
deferred tasks immediately so row groups can checkpoint and free
their semaphore slots.
Also updates row group log format to (x/X) with leading zeros.
* fix: eagerly salvage stalled row groups to avoid wasting semaphore slots
Run inline salvage after every checkpoint pass instead of only when
globally stalled. Row groups with 0 in-flight and only deferred tasks
are salvaged immediately, freeing their semaphore slot for new work.
* fix: address review findings from greptile and codex
- Use `with` statement for progress bar context (safe __exit__ on error)
- Check bar.is_active instead of bar is not None (non-TTY fallback)
- Record failures (not skips) for tasks that exhaust salvage retries
- Record skipped tasks when pre-batch filtering drops rows
* fix: stable progress bar width and accurate failure counts
- Pre-compute fixed stats width at bar creation to prevent bar
resizing when failed count appears
- Cap displayed completed at total to avoid >100% on retries
- Exclude already-failed columns from skip recording to prevent
double-counting in progress reporter
* fix: address Nabin's review - exclude_columns, dead code, docstring
- Add exclude_columns={task.column} on non-retryable batch/from_scratch
drop path to prevent double-counting (same pattern as cell path)
- Simplify salvage drop to per-task exclude (is_dropped guard handles
multi-column case)
- Remove dead _in_flight_for_rg method
- Fix context.py docstring to match actual (x/X) format
* fix: _drain_frontier exits before dispatching ready salvage tasks
After salvage discards a cell task from dispatched (making it
available in the frontier), _drain_frontier broke immediately
because nothing was in-flight yet. The task and its downstream
were never re-dispatched, leaving the row group incomplete.
Fix: only break when both ready and in-flight are empty.
* fix: salvage edge cases found by code review
- _drain_frontier: only break when both ready and in-flight are empty
- _salvage_rounds: re-mark sibling columns as dispatched after
from_scratch retry to prevent duplicate dispatch
- _salvage_stalled_row_groups: separate exhausted tasks from new
drain failures to avoid treating non-stalled tasks as permanent
- _checkpoint_completed_row_groups: clean up deferred tasks for
checkpointed row groups
- Early shutdown: salvage stalled row groups before exiting
* fix: skip record_failure for already-dropped rows in salvage
When multiple columns fail for the same row, the first drop records
a skip for the other column. Without this guard, record_failure
fires again for the second column, double-counting it.
2026-03-25 17:50:48 +00:00
|
|
|
def test_record_skipped_increments_completed_and_skipped(tracker: ProgressTracker) -> None:
    """One skip bumps ``completed`` and ``skipped`` only."""
    tracker.record_skipped()

    # The skip counts toward completion...
    assert tracker.completed == 1
    assert tracker.skipped == 1
    # ...but is neither a success nor a failure.
    untouched = (tracker.success, tracker.failed)
    assert untouched == (0, 0)
|
|
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_tracks_mixed_successes_and_failures(tracker: ProgressTracker) -> None:
    """Interleaved successes and failures are tallied into separate counters."""
    # True -> record_success, False -> record_failure, in call order.
    outcomes = (True, True, False, True, False)
    for succeeded in outcomes:
        if succeeded:
            tracker.record_success()
        else:
            tracker.record_failure()

    assert tracker.completed == 5
    assert tracker.success == 3
    assert tracker.failed == 2
|
|
|
|
|
|
|
|
|
|
|
chore: async engine follow-up - rename, preview, lifecycle, progress (#456)
* chore: rename ColumnWiseDatasetBuilder, wire async preview, unify row-group lifecycle
- Rename ColumnWiseDatasetBuilder to DatasetBuilder and
column_wise_builder.py to dataset_builder.py, update all references
- Extract _prepare_async_run() factory shared by build and preview paths
- Add _build_async_preview() for async preview with no disk checkpoints
- Replace on_row_group_complete/on_checkpoint_complete with single
on_finalize_row_group callback; caller handles checkpointing
- Add free_row_group() on RowGroupBufferManager for discard-without-write
- Free fully-dropped row groups instead of finalizing them
- Add consolidated AsyncProgressReporter for async generation logging
Closes #437, closes #442, closes #444
* feat: add consolidated async progress reporting with row group context
- Add AsyncProgressReporter: groups per-column progress into a single
log block emitted at configurable intervals (default 5s)
- Add quiet mode to ProgressTracker to suppress per-tracker logging
when used with the consolidated reporter
- Add ContextVar-based row group tagging (RG1, RG2, ...) for log
messages emitted inside async tasks (samplers, expressions, seeds)
- Add progress_interval to RunConfig for user-configurable reporting
- Remove log_start_async from ProgressTracker (superseded by reporter)
Closes #443
* fix async preview and progress reporting
* feat: add opt-in sticky ANSI progress bars for generation
Add RunConfig.progress_bar setting that replaces periodic log-line
progress with sticky terminal bars that stay at the bottom while
logs scroll above. Pure ANSI escape codes, no new dependencies.
Disabled by default - existing log-based output unchanged.
* docs: add missing progress_interval docstring in RunConfig
* fix: update progress bar on every completion in async path
Skip the time gate when the progress bar is active so the bar
redraws on every record instead of every progress_interval seconds.
* fix: resolve row-group semaphore deadlock when all tasks are deferred
When all tasks for admitted row groups fail with transient errors,
the row-group semaphore never releases, blocking admission of new
row groups. Fix by salvaging stalled row groups inline - retrying
deferred tasks immediately so row groups can checkpoint and free
their semaphore slots.
Also updates row group log format to (x/X) with leading zeros.
* fix: eagerly salvage stalled row groups to avoid wasting semaphore slots
Run inline salvage after every checkpoint pass instead of only when
globally stalled. Row groups with 0 in-flight and only deferred tasks
are salvaged immediately, freeing their semaphore slot for new work.
* fix: address review findings from greptile and codex
- Use `with` statement for progress bar context (safe __exit__ on error)
- Check bar.is_active instead of bar is not None (non-TTY fallback)
- Record failures (not skips) for tasks that exhaust salvage retries
- Record skipped tasks when pre-batch filtering drops rows
* fix: stable progress bar width and accurate failure counts
- Pre-compute fixed stats width at bar creation to prevent bar
resizing when failed count appears
- Cap displayed completed at total to avoid >100% on retries
- Exclude already-failed columns from skip recording to prevent
double-counting in progress reporter
* fix: address Nabin's review - exclude_columns, dead code, docstring
- Add exclude_columns={task.column} on non-retryable batch/from_scratch
drop path to prevent double-counting (same pattern as cell path)
- Simplify salvage drop to per-task exclude (is_dropped guard handles
multi-column case)
- Remove dead _in_flight_for_rg method
- Fix context.py docstring to match actual (x/X) format
* fix: _drain_frontier exits before dispatching ready salvage tasks
After salvage discards a cell task from dispatched (making it
available in the frontier), _drain_frontier broke immediately
because nothing was in-flight yet. The task and its downstream
were never re-dispatched, leaving the row group incomplete.
Fix: only break when both ready and in-flight are empty.
* fix: salvage edge cases found by code review
- _drain_frontier: only break when both ready and in-flight are empty
- _salvage_rounds: re-mark sibling columns as dispatched after
from_scratch retry to prevent duplicate dispatch
- _salvage_stalled_row_groups: separate exhausted tasks from new
drain failures to avoid treating non-stalled tasks as permanent
- _checkpoint_completed_row_groups: clean up deferred tasks for
checkpointed row groups
- Early shutdown: salvage stalled row groups before exiting
* fix: skip record_failure for already-dropped rows in salvage
When multiple columns fail for the same row, the first drop records
a skip for the other column. Without this guard, record_failure
fires again for the second column, double-counting it.
2026-03-25 17:50:48 +00:00
|
|
|
def test_get_snapshot_includes_skipped_counts() -> None:
    """``get_snapshot`` returns eight fields, including the skipped count."""
    subject = ProgressTracker(total_records=10, label="test")
    # One record of each outcome, in the order success, failure, skip.
    subject.record_success()
    subject.record_failure()
    subject.record_skipped()

    snapshot = subject.get_snapshot(elapsed=2.0)
    completed, total_records, success, failed, skipped, percent, rate, emoji = snapshot

    assert completed == 3
    assert total_records == 10
    assert (success, failed, skipped) == (1, 1, 1)
    assert percent == 30.0
    assert rate == 1.5
    # Only truthiness is checked for the emoji field.
    assert emoji
|
|
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_log_start_logs_worker_info(tracker: ProgressTracker, caplog: pytest.LogCaptureFixture) -> None:
    """``log_start`` output mentions the worker count and the tracker label."""
    with caplog.at_level(logging.INFO):
        tracker.log_start(max_workers=8)

    for expected_fragment in ("8 concurrent workers", "test column 'name'"):
        assert expected_fragment in caplog.text
|
2026-01-29 18:06:21 +00:00
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
|
|
|
|
|
def test_logs_progress_at_interval(caplog: pytest.LogCaptureFixture) -> None:
    """Reaching the 50% interval logs both the ``5/10`` count and ``50%``."""
    subject = ProgressTracker(total_records=10, label="test", log_interval_percent=50)

    with caplog.at_level(logging.INFO):
        completed_so_far = 0
        while completed_so_far < 5:
            subject.record_success()
            completed_so_far += 1

    assert "5/10" in caplog.text
    assert "50%" in caplog.text
|
2026-01-29 18:06:21 +00:00
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_log_final_logs_remaining_progress(caplog: pytest.LogCaptureFixture) -> None:
    """``log_final`` reports ``3/10`` after three records at a 50% interval."""
    subject = ProgressTracker(total_records=10, label="test", log_interval_percent=50)

    # These records are made outside the INFO capture window below.
    for _record in range(3):
        subject.record_success()

    with caplog.at_level(logging.INFO):
        subject.log_final()

    captured = caplog.text
    assert "3/10" in captured
|
2026-01-29 18:06:21 +00:00
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_progress_log_includes_rate_and_eta(caplog: pytest.LogCaptureFixture) -> None:
    """The interval progress log contains the ``rec/s`` and ``eta`` markers."""
    subject = ProgressTracker(total_records=10, label="test", log_interval_percent=50)

    with caplog.at_level(logging.INFO):
        for _record in range(5):
            subject.record_success()

    for marker in ("rec/s", "eta"):
        assert marker in caplog.text
|
2026-01-29 18:06:21 +00:00
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_progress_log_shows_ok_and_failed_counts(caplog: pytest.LogCaptureFixture) -> None:
    """The progress log reports ``3 ok`` and ``2 failed`` after a mixed run."""
    subject = ProgressTracker(total_records=10, label="test", log_interval_percent=50)

    # Three successes followed by two failures, all inside the capture window.
    outcomes = (True,) * 3 + (False,) * 2
    with caplog.at_level(logging.INFO):
        for succeeded in outcomes:
            if succeeded:
                subject.record_success()
            else:
                subject.record_failure()

    assert "3 ok" in caplog.text
    assert "2 failed" in caplog.text
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_concurrent_record_calls_are_thread_safe() -> None:
    """Counters add up exactly when ten threads record outcomes concurrently."""
    shared = ProgressTracker(total_records=1000, label="test", log_interval_percent=100)
    num_threads = 10
    records_per_thread = 100

    def record_many_successes() -> None:
        """Record ``records_per_thread`` successes on the shared tracker."""
        for _ in range(records_per_thread):
            shared.record_success()

    def record_many_failures() -> None:
        """Record ``records_per_thread`` failures on the shared tracker."""
        for _ in range(records_per_thread):
            shared.record_failure()

    # Even-indexed workers record successes, odd-indexed workers record failures.
    workers = [
        threading.Thread(target=record_many_successes if index % 2 == 0 else record_many_failures)
        for index in range(num_threads)
    ]

    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    success_threads = num_threads // 2
    failure_threads = num_threads - success_threads
    expected_success = success_threads * records_per_thread
    expected_failed = failure_threads * records_per_thread

    assert shared.completed == num_threads * records_per_thread
    assert shared.success == expected_success
    assert shared.failed == expected_failed
|
2026-01-29 18:06:21 +00:00
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_handles_very_small_total_records() -> None:
    """A single-record tracker counts its one success."""
    single = ProgressTracker(total_records=1, label="test")
    single.record_success()

    counts = (single.completed, single.success)
    assert counts == (1, 1)
|
2026-01-29 18:06:21 +00:00
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_handles_log_interval_larger_than_total() -> None:
    """Recording all 5 records with a 50% log interval yields 5 completions."""
    subject = ProgressTracker(total_records=5, label="test", log_interval_percent=50)

    recorded = 0
    while recorded < 5:
        subject.record_success()
        recorded += 1

    assert subject.completed == 5
|
2026-01-29 18:06:21 +00:00
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_log_final_handles_zero_records_processed(caplog: pytest.LogCaptureFixture) -> None:
    """Calling ``log_final`` before any record is processed must not raise."""
    untouched = ProgressTracker(total_records=10, label="test")

    with caplog.at_level(logging.INFO):
        # Deliberately no assertion on the output: whether anything is
        # logged here depends on the implementation.
        untouched.log_final()
|
2026-01-29 18:06:21 +00:00
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_progress_percentage_with_zero_total() -> None:
    """Recording against a zero-total tracker still counts the completion."""
    empty = ProgressTracker(total_records=0, label="test")

    # Guards against a division by zero when total_records is 0.
    empty.record_success()

    completed_count = empty.completed
    assert completed_count == 1
|
2026-01-29 18:06:21 +00:00
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
@pytest.mark.parametrize(
    "total_records,log_interval_percent",
    [
        (10, 10),  # Exact divisibility
        (10, 30),  # Non-exact divisibility: logs at 3, 6, 9
        (1, 10),  # Single record edge case
        (10, 100),  # Interval equals total
    ],
)
def test_100_percent_logged_exactly_once(
    total_records: int, log_interval_percent: int, caplog: pytest.LogCaptureFixture
) -> None:
    """Completing every record and then calling ``log_final`` logs ``100%`` once."""
    subject = ProgressTracker(
        total_records=total_records,
        label="test",
        log_interval_percent=log_interval_percent,
    )

    with caplog.at_level(logging.INFO):
        for _record in range(total_records):
            subject.record_success()
        # log_final runs inside the capture window so its output is recorded.
        subject.log_final()

    occurrences = caplog.text.count("100%")
    assert occurrences == 1
|
2026-01-29 18:06:21 +00:00
|
|
|
|
|
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
def test_record_completion_never_logs_100_percent(caplog: pytest.LogCaptureFixture) -> None:
    """Recording all records logs no ``100%``; ``log_final`` then logs it once."""
    subject = ProgressTracker(total_records=10, label="test", log_interval_percent=10)

    # Phase 1: every record completes, but 100% must not appear yet.
    with caplog.at_level(logging.INFO):
        for _record in range(10):
            subject.record_success()
    before_final = caplog.text.count("100%")
    assert before_final == 0

    # Phase 2: the final log line is the only place 100% appears.
    with caplog.at_level(logging.INFO):
        subject.log_final()
    after_final = caplog.text.count("100%")
    assert after_final == 1
|
2026-01-29 18:06:21 +00:00
|
|
|
|
2026-01-31 02:03:18 +00:00
|
|
|
|
|
|
|
|
def test_partial_completion_logs_correct_percentage(caplog: pytest.LogCaptureFixture) -> None:
    """After 7 of 10 records, ``log_final`` reports ``70%`` and never ``100%``."""
    subject = ProgressTracker(total_records=10, label="test", log_interval_percent=10)

    # These records are made outside the INFO capture window below.
    for _record in range(7):
        subject.record_success()

    with caplog.at_level(logging.INFO):
        subject.log_final()

    assert "70%" in caplog.text
    full_mentions = caplog.text.count("100%")
    assert full_mentions == 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_concurrent_completion_logs_100_percent_once(caplog: pytest.LogCaptureFixture) -> None:
    """With ten threads completing all records, ``100%`` is logged exactly once."""
    shared = ProgressTracker(total_records=100, label="test", log_interval_percent=100)

    def record_successes() -> None:
        """Record ten successes on the shared tracker."""
        for _ in range(10):
            shared.record_success()

    workers = []
    for _ in range(10):
        workers.append(threading.Thread(target=record_successes))

    with caplog.at_level(logging.INFO):
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        # Called only after every worker has joined.
        shared.log_final()

    assert shared.completed == 100
    full_mentions = caplog.text.count("100%")
    assert full_mentions == 1
|