DataDesigner/packages/data-designer-engine/tests/engine/progress/test_tracker.py

321 lines
9.6 KiB
Python
Raw Permalink Normal View History

# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
import threading
import pytest
from data_designer.engine.progress.tracker import ProgressTracker
@pytest.fixture
def tracker() -> ProgressTracker:
return ProgressTracker(total_records=100, label="test column 'name'")
def test_initializes_with_correct_values() -> None:
tracker = ProgressTracker(total_records=100, label="test label")
assert tracker.total_records == 100
assert tracker.label == "test label"
assert tracker.completed == 0
assert tracker.success == 0
assert tracker.failed == 0
chore: async engine follow-up - rename, preview, lifecycle, progress (#456) * chore: rename ColumnWiseDatasetBuilder, wire async preview, unify row-group lifecycle - Rename ColumnWiseDatasetBuilder to DatasetBuilder and column_wise_builder.py to dataset_builder.py, update all references - Extract _prepare_async_run() factory shared by build and preview paths - Add _build_async_preview() for async preview with no disk checkpoints - Replace on_row_group_complete/on_checkpoint_complete with single on_finalize_row_group callback; caller handles checkpointing - Add free_row_group() on RowGroupBufferManager for discard-without-write - Free fully-dropped row groups instead of finalizing them - Add consolidated AsyncProgressReporter for async generation logging Closes #437, closes #442, closes #444 * feat: add consolidated async progress reporting with row group context - Add AsyncProgressReporter: groups per-column progress into a single log block emitted at configurable intervals (default 5s) - Add quiet mode to ProgressTracker to suppress per-tracker logging when used with the consolidated reporter - Add ContextVar-based row group tagging (RG1, RG2, ...) for log messages emitted inside async tasks (samplers, expressions, seeds) - Add progress_interval to RunConfig for user-configurable reporting - Remove log_start_async from ProgressTracker (superseded by reporter) Closes #443 * fix async preview and progress reporting * feat: add opt-in sticky ANSI progress bars for generation Add RunConfig.progress_bar setting that replaces periodic log-line progress with sticky terminal bars that stay at the bottom while logs scroll above. Pure ANSI escape codes, no new dependencies. Disabled by default - existing log-based output unchanged. * docs: add missing progress_interval docstring in RunConfig * fix: update progress bar on every completion in async path Skip the time gate when the progress bar is active so the bar redraws on every record instead of every progress_interval seconds. * fix: resolve row-group semaphore deadlock when all tasks are deferred When all tasks for admitted row groups fail with transient errors, the row-group semaphore never releases, blocking admission of new row groups. Fix by salvaging stalled row groups inline - retrying deferred tasks immediately so row groups can checkpoint and free their semaphore slots. Also updates row group log format to (x/X) with leading zeros. * fix: eagerly salvage stalled row groups to avoid wasting semaphore slots Run inline salvage after every checkpoint pass instead of only when globally stalled. Row groups with 0 in-flight and only deferred tasks are salvaged immediately, freeing their semaphore slot for new work. * fix: address review findings from greptile and codex - Use `with` statement for progress bar context (safe __exit__ on error) - Check bar.is_active instead of bar is not None (non-TTY fallback) - Record failures (not skips) for tasks that exhaust salvage retries - Record skipped tasks when pre-batch filtering drops rows * fix: stable progress bar width and accurate failure counts - Pre-compute fixed stats width at bar creation to prevent bar resizing when failed count appears - Cap displayed completed at total to avoid >100% on retries - Exclude already-failed columns from skip recording to prevent double-counting in progress reporter * fix: address Nabin's review - exclude_columns, dead code, docstring - Add exclude_columns={task.column} on non-retryable batch/from_scratch drop path to prevent double-counting (same pattern as cell path) - Simplify salvage drop to per-task exclude (is_dropped guard handles multi-column case) - Remove dead _in_flight_for_rg method - Fix context.py docstring to match actual (x/X) format * fix: _drain_frontier exits before dispatching ready salvage tasks After salvage discards a cell task from dispatched (making it available in the frontier), _drain_frontier broke immediately because nothing was in-flight yet. The task and its downstream were never re-dispatched, leaving the row group incomplete. Fix: only break when both ready and in-flight are empty. * fix: salvage edge cases found by code review - _drain_frontier: only break when both ready and in-flight are empty - _salvage_rounds: re-mark sibling columns as dispatched after from_scratch retry to prevent duplicate dispatch - _salvage_stalled_row_groups: separate exhausted tasks from new drain failures to avoid treating non-stalled tasks as permanent - _checkpoint_completed_row_groups: clean up deferred tasks for checkpointed row groups - Early shutdown: salvage stalled row groups before exiting * fix: skip record_failure for already-dropped rows in salvage When multiple columns fail for the same row, the first drop records a skip for the other column. Without this guard, record_failure fires again for the second column, double-counting it.
2026-03-25 17:50:48 +00:00
assert tracker.skipped == 0
def test_calculates_log_interval_from_percentage() -> None:
tracker = ProgressTracker(total_records=100, label="test", log_interval_percent=10)
assert tracker.log_interval == 10
tracker = ProgressTracker(total_records=100, label="test", log_interval_percent=25)
assert tracker.log_interval == 25
tracker = ProgressTracker(total_records=1000, label="test", log_interval_percent=5)
assert tracker.log_interval == 50
def test_log_interval_minimum_is_one() -> None:
tracker = ProgressTracker(total_records=5, label="test", log_interval_percent=10)
assert tracker.log_interval >= 1
def test_handles_zero_total_records() -> None:
tracker = ProgressTracker(total_records=0, label="test")
assert tracker.log_interval == 1
assert tracker.total_records == 0
def test_record_success_increments_completed_and_success(tracker: ProgressTracker) -> None:
tracker.record_success()
assert tracker.completed == 1
assert tracker.success == 1
assert tracker.failed == 0
def test_record_success_multiple_times(tracker: ProgressTracker) -> None:
for _ in range(5):
tracker.record_success()
assert tracker.completed == 5
assert tracker.success == 5
assert tracker.failed == 0
def test_record_failure_increments_completed_and_failed(tracker: ProgressTracker) -> None:
tracker.record_failure()
assert tracker.completed == 1
assert tracker.success == 0
assert tracker.failed == 1
def test_record_failure_multiple_times(tracker: ProgressTracker) -> None:
for _ in range(5):
tracker.record_failure()
assert tracker.completed == 5
assert tracker.success == 0
assert tracker.failed == 5
chore: async engine follow-up - rename, preview, lifecycle, progress (#456) * chore: rename ColumnWiseDatasetBuilder, wire async preview, unify row-group lifecycle - Rename ColumnWiseDatasetBuilder to DatasetBuilder and column_wise_builder.py to dataset_builder.py, update all references - Extract _prepare_async_run() factory shared by build and preview paths - Add _build_async_preview() for async preview with no disk checkpoints - Replace on_row_group_complete/on_checkpoint_complete with single on_finalize_row_group callback; caller handles checkpointing - Add free_row_group() on RowGroupBufferManager for discard-without-write - Free fully-dropped row groups instead of finalizing them - Add consolidated AsyncProgressReporter for async generation logging Closes #437, closes #442, closes #444 * feat: add consolidated async progress reporting with row group context - Add AsyncProgressReporter: groups per-column progress into a single log block emitted at configurable intervals (default 5s) - Add quiet mode to ProgressTracker to suppress per-tracker logging when used with the consolidated reporter - Add ContextVar-based row group tagging (RG1, RG2, ...) for log messages emitted inside async tasks (samplers, expressions, seeds) - Add progress_interval to RunConfig for user-configurable reporting - Remove log_start_async from ProgressTracker (superseded by reporter) Closes #443 * fix async preview and progress reporting * feat: add opt-in sticky ANSI progress bars for generation Add RunConfig.progress_bar setting that replaces periodic log-line progress with sticky terminal bars that stay at the bottom while logs scroll above. Pure ANSI escape codes, no new dependencies. Disabled by default - existing log-based output unchanged. * docs: add missing progress_interval docstring in RunConfig * fix: update progress bar on every completion in async path Skip the time gate when the progress bar is active so the bar redraws on every record instead of every progress_interval seconds. * fix: resolve row-group semaphore deadlock when all tasks are deferred When all tasks for admitted row groups fail with transient errors, the row-group semaphore never releases, blocking admission of new row groups. Fix by salvaging stalled row groups inline - retrying deferred tasks immediately so row groups can checkpoint and free their semaphore slots. Also updates row group log format to (x/X) with leading zeros. * fix: eagerly salvage stalled row groups to avoid wasting semaphore slots Run inline salvage after every checkpoint pass instead of only when globally stalled. Row groups with 0 in-flight and only deferred tasks are salvaged immediately, freeing their semaphore slot for new work. * fix: address review findings from greptile and codex - Use `with` statement for progress bar context (safe __exit__ on error) - Check bar.is_active instead of bar is not None (non-TTY fallback) - Record failures (not skips) for tasks that exhaust salvage retries - Record skipped tasks when pre-batch filtering drops rows * fix: stable progress bar width and accurate failure counts - Pre-compute fixed stats width at bar creation to prevent bar resizing when failed count appears - Cap displayed completed at total to avoid >100% on retries - Exclude already-failed columns from skip recording to prevent double-counting in progress reporter * fix: address Nabin's review - exclude_columns, dead code, docstring - Add exclude_columns={task.column} on non-retryable batch/from_scratch drop path to prevent double-counting (same pattern as cell path) - Simplify salvage drop to per-task exclude (is_dropped guard handles multi-column case) - Remove dead _in_flight_for_rg method - Fix context.py docstring to match actual (x/X) format * fix: _drain_frontier exits before dispatching ready salvage tasks After salvage discards a cell task from dispatched (making it available in the frontier), _drain_frontier broke immediately because nothing was in-flight yet. The task and its downstream were never re-dispatched, leaving the row group incomplete. Fix: only break when both ready and in-flight are empty. * fix: salvage edge cases found by code review - _drain_frontier: only break when both ready and in-flight are empty - _salvage_rounds: re-mark sibling columns as dispatched after from_scratch retry to prevent duplicate dispatch - _salvage_stalled_row_groups: separate exhausted tasks from new drain failures to avoid treating non-stalled tasks as permanent - _checkpoint_completed_row_groups: clean up deferred tasks for checkpointed row groups - Early shutdown: salvage stalled row groups before exiting * fix: skip record_failure for already-dropped rows in salvage When multiple columns fail for the same row, the first drop records a skip for the other column. Without this guard, record_failure fires again for the second column, double-counting it.
2026-03-25 17:50:48 +00:00
def test_record_skipped_increments_completed_and_skipped(tracker: ProgressTracker) -> None:
tracker.record_skipped()
assert tracker.completed == 1
assert tracker.success == 0
assert tracker.failed == 0
assert tracker.skipped == 1
def test_tracks_mixed_successes_and_failures(tracker: ProgressTracker) -> None:
tracker.record_success()
tracker.record_success()
tracker.record_failure()
tracker.record_success()
tracker.record_failure()
assert tracker.completed == 5
assert tracker.success == 3
assert tracker.failed == 2
chore: async engine follow-up - rename, preview, lifecycle, progress (#456) * chore: rename ColumnWiseDatasetBuilder, wire async preview, unify row-group lifecycle - Rename ColumnWiseDatasetBuilder to DatasetBuilder and column_wise_builder.py to dataset_builder.py, update all references - Extract _prepare_async_run() factory shared by build and preview paths - Add _build_async_preview() for async preview with no disk checkpoints - Replace on_row_group_complete/on_checkpoint_complete with single on_finalize_row_group callback; caller handles checkpointing - Add free_row_group() on RowGroupBufferManager for discard-without-write - Free fully-dropped row groups instead of finalizing them - Add consolidated AsyncProgressReporter for async generation logging Closes #437, closes #442, closes #444 * feat: add consolidated async progress reporting with row group context - Add AsyncProgressReporter: groups per-column progress into a single log block emitted at configurable intervals (default 5s) - Add quiet mode to ProgressTracker to suppress per-tracker logging when used with the consolidated reporter - Add ContextVar-based row group tagging (RG1, RG2, ...) for log messages emitted inside async tasks (samplers, expressions, seeds) - Add progress_interval to RunConfig for user-configurable reporting - Remove log_start_async from ProgressTracker (superseded by reporter) Closes #443 * fix async preview and progress reporting * feat: add opt-in sticky ANSI progress bars for generation Add RunConfig.progress_bar setting that replaces periodic log-line progress with sticky terminal bars that stay at the bottom while logs scroll above. Pure ANSI escape codes, no new dependencies. Disabled by default - existing log-based output unchanged. * docs: add missing progress_interval docstring in RunConfig * fix: update progress bar on every completion in async path Skip the time gate when the progress bar is active so the bar redraws on every record instead of every progress_interval seconds. * fix: resolve row-group semaphore deadlock when all tasks are deferred When all tasks for admitted row groups fail with transient errors, the row-group semaphore never releases, blocking admission of new row groups. Fix by salvaging stalled row groups inline - retrying deferred tasks immediately so row groups can checkpoint and free their semaphore slots. Also updates row group log format to (x/X) with leading zeros. * fix: eagerly salvage stalled row groups to avoid wasting semaphore slots Run inline salvage after every checkpoint pass instead of only when globally stalled. Row groups with 0 in-flight and only deferred tasks are salvaged immediately, freeing their semaphore slot for new work. * fix: address review findings from greptile and codex - Use `with` statement for progress bar context (safe __exit__ on error) - Check bar.is_active instead of bar is not None (non-TTY fallback) - Record failures (not skips) for tasks that exhaust salvage retries - Record skipped tasks when pre-batch filtering drops rows * fix: stable progress bar width and accurate failure counts - Pre-compute fixed stats width at bar creation to prevent bar resizing when failed count appears - Cap displayed completed at total to avoid >100% on retries - Exclude already-failed columns from skip recording to prevent double-counting in progress reporter * fix: address Nabin's review - exclude_columns, dead code, docstring - Add exclude_columns={task.column} on non-retryable batch/from_scratch drop path to prevent double-counting (same pattern as cell path) - Simplify salvage drop to per-task exclude (is_dropped guard handles multi-column case) - Remove dead _in_flight_for_rg method - Fix context.py docstring to match actual (x/X) format * fix: _drain_frontier exits before dispatching ready salvage tasks After salvage discards a cell task from dispatched (making it available in the frontier), _drain_frontier broke immediately because nothing was in-flight yet. The task and its downstream were never re-dispatched, leaving the row group incomplete. Fix: only break when both ready and in-flight are empty. * fix: salvage edge cases found by code review - _drain_frontier: only break when both ready and in-flight are empty - _salvage_rounds: re-mark sibling columns as dispatched after from_scratch retry to prevent duplicate dispatch - _salvage_stalled_row_groups: separate exhausted tasks from new drain failures to avoid treating non-stalled tasks as permanent - _checkpoint_completed_row_groups: clean up deferred tasks for checkpointed row groups - Early shutdown: salvage stalled row groups before exiting * fix: skip record_failure for already-dropped rows in salvage When multiple columns fail for the same row, the first drop records a skip for the other column. Without this guard, record_failure fires again for the second column, double-counting it.
2026-03-25 17:50:48 +00:00
def test_get_snapshot_includes_skipped_counts() -> None:
tracker = ProgressTracker(total_records=10, label="test")
tracker.record_success()
tracker.record_failure()
tracker.record_skipped()
completed, total_records, success, failed, skipped, percent, rate, emoji = tracker.get_snapshot(elapsed=2.0)
assert completed == 3
assert total_records == 10
assert success == 1
assert failed == 1
assert skipped == 1
assert percent == 30.0
assert rate == 1.5
assert emoji
def test_log_start_logs_worker_info(tracker: ProgressTracker, caplog: pytest.LogCaptureFixture) -> None:
with caplog.at_level(logging.INFO):
tracker.log_start(max_workers=8)
assert "8 concurrent workers" in caplog.text
assert "test column 'name'" in caplog.text
def test_logs_progress_at_interval(caplog: pytest.LogCaptureFixture) -> None:
tracker = ProgressTracker(total_records=10, label="test", log_interval_percent=50)
with caplog.at_level(logging.INFO):
for _ in range(5):
tracker.record_success()
assert "5/10" in caplog.text
assert "50%" in caplog.text
def test_log_final_logs_remaining_progress(caplog: pytest.LogCaptureFixture) -> None:
tracker = ProgressTracker(total_records=10, label="test", log_interval_percent=50)
for _ in range(3):
tracker.record_success()
with caplog.at_level(logging.INFO):
tracker.log_final()
assert "3/10" in caplog.text
def test_progress_log_includes_rate_and_eta(caplog: pytest.LogCaptureFixture) -> None:
tracker = ProgressTracker(total_records=10, label="test", log_interval_percent=50)
with caplog.at_level(logging.INFO):
for _ in range(5):
tracker.record_success()
assert "rec/s" in caplog.text
assert "eta" in caplog.text
def test_progress_log_shows_ok_and_failed_counts(caplog: pytest.LogCaptureFixture) -> None:
tracker = ProgressTracker(total_records=10, label="test", log_interval_percent=50)
with caplog.at_level(logging.INFO):
for _ in range(3):
tracker.record_success()
for _ in range(2):
tracker.record_failure()
assert "3 ok" in caplog.text
assert "2 failed" in caplog.text
def test_concurrent_record_calls_are_thread_safe() -> None:
tracker = ProgressTracker(total_records=1000, label="test", log_interval_percent=100)
num_threads = 10
records_per_thread = 100
def record_many_successes() -> None:
for _ in range(records_per_thread):
tracker.record_success()
def record_many_failures() -> None:
for _ in range(records_per_thread):
tracker.record_failure()
threads = []
for i in range(num_threads):
if i % 2 == 0:
thread = threading.Thread(target=record_many_successes)
else:
thread = threading.Thread(target=record_many_failures)
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
expected_success = (num_threads // 2) * records_per_thread
expected_failed = (num_threads - num_threads // 2) * records_per_thread
assert tracker.completed == num_threads * records_per_thread
assert tracker.success == expected_success
assert tracker.failed == expected_failed
def test_handles_very_small_total_records() -> None:
tracker = ProgressTracker(total_records=1, label="test")
tracker.record_success()
assert tracker.completed == 1
assert tracker.success == 1
def test_handles_log_interval_larger_than_total() -> None:
tracker = ProgressTracker(total_records=5, label="test", log_interval_percent=50)
for _ in range(5):
tracker.record_success()
assert tracker.completed == 5
def test_log_final_handles_zero_records_processed(caplog: pytest.LogCaptureFixture) -> None:
tracker = ProgressTracker(total_records=10, label="test")
with caplog.at_level(logging.INFO):
tracker.log_final()
# Should not raise, may or may not log depending on implementation
def test_progress_percentage_with_zero_total() -> None:
tracker = ProgressTracker(total_records=0, label="test")
# Should not raise division by zero
tracker.record_success()
assert tracker.completed == 1
@pytest.mark.parametrize(
"total_records,log_interval_percent",
[
(10, 10), # Exact divisibility
(10, 30), # Non-exact divisibility: logs at 3, 6, 9
(1, 10), # Single record edge case
(10, 100), # Interval equals total
],
)
def test_100_percent_logged_exactly_once(
total_records: int, log_interval_percent: int, caplog: pytest.LogCaptureFixture
) -> None:
"""100% should be logged exactly once after completing all records + log_final."""
tracker = ProgressTracker(total_records=total_records, label="test", log_interval_percent=log_interval_percent)
with caplog.at_level(logging.INFO):
for _ in range(total_records):
tracker.record_success()
tracker.log_final()
assert caplog.text.count("100%") == 1
def test_record_completion_never_logs_100_percent(caplog: pytest.LogCaptureFixture) -> None:
"""_record_completion should never log 100%; that's log_final's job."""
tracker = ProgressTracker(total_records=10, label="test", log_interval_percent=10)
with caplog.at_level(logging.INFO):
for _ in range(10):
tracker.record_success()
assert caplog.text.count("100%") == 0
with caplog.at_level(logging.INFO):
tracker.log_final()
assert caplog.text.count("100%") == 1
def test_partial_completion_logs_correct_percentage(caplog: pytest.LogCaptureFixture) -> None:
"""Partial progress should show actual percentage, not 100%."""
tracker = ProgressTracker(total_records=10, label="test", log_interval_percent=10)
for _ in range(7):
tracker.record_success()
with caplog.at_level(logging.INFO):
tracker.log_final()
assert "70%" in caplog.text
assert caplog.text.count("100%") == 0
def test_concurrent_completion_logs_100_percent_once(caplog: pytest.LogCaptureFixture) -> None:
"""Thread safety: 100% logged exactly once even with concurrent completions."""
tracker = ProgressTracker(total_records=100, label="test", log_interval_percent=100)
def record_successes() -> None:
for _ in range(10):
tracker.record_success()
threads = [threading.Thread(target=record_successes) for _ in range(10)]
with caplog.at_level(logging.INFO):
for thread in threads:
thread.start()
for thread in threads:
thread.join()
tracker.log_final()
assert tracker.completed == 100
assert caplog.text.count("100%") == 1