DataDesigner/tests_e2e/tests/test_async_engine.py
Andre Manoel 7e812630cf
feat: wire async task-queue scheduler into ColumnWiseDatasetBuilder (#429)
* feat: wire async task-queue scheduler into ColumnWiseDatasetBuilder

* chore: add async benchmark notebook and demo scripts

* fix: address all PR review comments on async builder integration

- Wire on_batch_complete through on_row_group_complete callback
- Mark trailing slots as dropped in replace_dataframe when processor filters rows
- Ensure checkpoint still runs when on_before_checkpoint raises
- Gate non-seed task dispatch on pre-batch completion
- Add public run_pre_batch_on_df to ProcessorRunner (replaces private _run_stage call)
- Add public is_column_complete_for_rg to CompletionTracker (replaces private _completed access)
- Type task_traces as list[TaskTrace] in results.py
- Add async_trace docstring to RunConfig
- Move module-level log into _build_async
- Add replace_dataframe unit tests (same-size, dropped rows, fewer rows)
- Assert on public outcomes in scheduler tests instead of private attributes
- Parametrize allow_resize validation tests
- Cache seed_cols before main loop
- Remove redundant disable_early_shutdown from AsyncTaskScheduler

* style: fix ruff format for lambda expression

* fix: address open review issues on async scheduler

- Flush completed row groups before breaking on early shutdown (data loss)
- Change error rate check from >= to > so disable_early_shutdown sentinel
  (1.0) never triggers at 100% failure rate
- Extract seeds-complete check into helper and call it in salvage rounds
  via _drain_frontier, with pre-batch gating, so pre-batch processor runs
  even when seed tasks succeed only after retry
- Fix is_column_complete_for_rg to check _batch_complete first, then verify
  all non-dropped rows for CELL_BY_CELL columns
- Replace O(|in-flight|) scan in _in_flight_for_rg with per-RG counter

* fix: sync pre-batch row drops to CompletionTracker and restore stderr safely

Pre-batch processors that filter rows marked them as dropped in
RowGroupBufferManager but not in CompletionTracker, causing unnecessary
LLM calls for rows that would be discarded at checkpoint time.

Also wrap the benchmark warmup stderr redirect in try/finally so stderr
is restored if _run_once raises.
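
A minimal sketch of the try/finally pattern described above. The helper name
run_quietly is hypothetical and not taken from the benchmark script; it only
shows stderr being restored whether or not the wrapped call raises.

```python
import io
import sys


def run_quietly(fn):
    """Call fn() with sys.stderr redirected, restoring it even if fn raises."""
    original_stderr = sys.stderr
    sys.stderr = io.StringIO()  # swallow noisy output during the call
    try:
        return fn()
    finally:
        # Runs on both the normal and the exceptional path.
        sys.stderr = original_stderr
```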

* fix: prune _admitted_rg_ids on row group checkpoint

Prevents unbounded growth of the admission set across large runs.

* chore: remove demo/async files from PR

Dev-time benchmarks and manual test scripts - kept locally, not needed in the PR.

* fix: wire disable_early_shutdown into AsyncTaskScheduler

RunConfig.disable_early_shutdown was forwarded to the sync executor
but silently ignored in the async path. Now passed through to the
scheduler's _check_error_rate.

* test: add e2e test for async engine concurrency

Verifies the async scheduler dispatches independent LLM columns
concurrently by checking for overlapping task trace intervals.
Uses a wide DAG (sampler -> 2 parallel LLM columns) with 2 records.
Requires NVIDIA_API_KEY.
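
The overlap check can be sketched as below, assuming each trace is reduced to
a (start, end) pair per column. The function names are illustrative, not part
of the test; two intervals overlap when each one starts before the other ends.

```python
from itertools import product

Interval = tuple[float, float]


def intervals_overlap(a: Interval, b: Interval) -> bool:
    """True when the two (start, end) intervals share some open span of time."""
    return a[0] < b[1] and b[0] < a[1]


def any_cross_column_overlap(by_col: dict[str, list[Interval]]) -> bool:
    """True when any interval of one column overlaps an interval of another."""
    cols = list(by_col)
    for i, col_a in enumerate(cols):
        for col_b in cols[i + 1:]:
            pairs = product(by_col[col_a], by_col[col_b])
            if any(intervals_overlap(a, b) for a, b in pairs):
                return True
    return False
```

Intervals that only touch at an endpoint do not count as overlapping, so
strictly sequential execution is not mistaken for concurrency.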

* fix: drop row group on on_before_checkpoint failure instead of writing unprocessed data

Matches on_seeds_complete failure behavior and avoids silently
checkpointing unfiltered rows when a post-batch processor fails.

* fix: skip on_before_checkpoint when no POST_BATCH processors configured

Avoids unnecessary DataFrame round-trip for every row group in the
common case where no post-batch processors exist.

* fix: address remaining review nits from nabinchha and greptile summary

- Gate on_seeds_complete on PRE_BATCH processors (matches on_before_checkpoint pattern)
- Cache seed_cols as instance attr instead of recomputing in _dispatch_seeds
- Iterate list(self._active_rgs) snapshot in _run_seeds_complete_check
- Add logger.debug to telemetry except block
- Add design comment on on_before_checkpoint failure drop behavior
- Rename row_group param to row_group_index in is_column_complete_for_rg
- Document rg_id as current_batch_number equivalence
- Use mock.patch.object in e2e test instead of direct mutation
- Add max(0, ...) floor guard on _in_flight_counts decrement
- Rename _ensure_async_engine_loop to public ensure_async_engine_loop
- Move AsyncTaskScheduler import to module level in integration tests

* fix: preserve async callback contract and e2e setup

* fix: prune _seeds_dispatched_rgs and _pre_batch_done_rgs on checkpoint

* refactor: consolidate per-RG state into _RowGroupState dataclass

Replace 5 independent collections (_active_rgs, _admitted_rg_ids,
_seeds_dispatched_rgs, _pre_batch_done_rgs, _in_flight_counts) with a
single _rg_states dict keyed by row group ID. Cleanup is now a single
`del` instead of N separate discards, eliminating the class of bugs
where one collection is missed during row group teardown.

* fix: skip checkpoint and callbacks when on_before_checkpoint fails

When on_before_checkpoint raises and all rows are dropped, the code
previously fell through to checkpoint_row_group and on_row_group_complete,
sending a spurious progress notification for a batch with zero records.
Now gates both on a `dropped` flag so they are skipped after failure.

* fix: snapshot dropped rows before await in _run_batch and sync tracker on checkpoint failure

Two fixes:
- _run_batch: snapshot dropped rows before `await agenerate` so the
  row-count expectation matches batch_df. Concurrent tasks can drop rows
  during the await, causing a spurious ValueError that would drop the
  entire row group. Write-back now re-checks is_dropped to skip rows
  dropped mid-flight.
- _checkpoint_completed_row_groups: add tracker.drop_row alongside
  buffer_manager.drop_row when on_before_checkpoint fails, keeping
  both in sync.
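
The snapshot-then-recheck pattern from the first fix can be sketched as
follows. This is a simplified stand-in, not the real _run_batch: run_batch,
rows, dropped, and generate are hypothetical names, and the dropped set plays
the role of the tracker's is_dropped state.

```python
import asyncio


async def run_batch(rows: dict[int, str], dropped: set[int], generate) -> dict[int, str]:
    """Generate outputs for live rows, tolerating drops that happen mid-await."""
    # Snapshot the live row indices BEFORE awaiting, so the expected row
    # count matches the batch that is actually sent to the generator.
    live = [i for i in rows if i not in dropped]
    outputs = await generate([rows[i] for i in live])
    if len(outputs) != len(live):
        raise ValueError(f"expected {len(live)} outputs, got {len(outputs)}")
    # Re-check the dropped set on write-back: other tasks may have dropped
    # rows while this coroutine was suspended.
    return {i: out for i, out in zip(live, outputs) if i not in dropped}
```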

* feat: sliding window error rate and out-of-order row group completion test

Replace cumulative error counters with a deque-based sliding window so
that early transient failures do not permanently inflate the error rate
in long-running jobs. Add tests for the sliding window recovery path
and for deterministic out-of-order row group checkpoint ordering.
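
A minimal sketch of a deque-based sliding window, under stated assumptions:
the class name, the use of the current window length as denominator, and the
>= comparison are illustrative choices, not the scheduler's confirmed logic.

```python
from collections import deque


class SlidingErrorRate:
    """Track the failure rate over the most recent `window` task outcomes."""

    def __init__(self, window: int, threshold: float) -> None:
        if window < 1:
            raise ValueError("window must be >= 1")
        # deque(maxlen=...) silently evicts the oldest outcome when full.
        self._outcomes: deque[bool] = deque(maxlen=window)
        self._threshold = threshold

    def record(self, ok: bool) -> None:
        self._outcomes.append(ok)

    @property
    def rate(self) -> float:
        if not self._outcomes:
            return 0.0
        failures = sum(1 for ok in self._outcomes if not ok)
        return failures / len(self._outcomes)

    def should_shut_down(self) -> bool:
        return self.rate >= self._threshold
```

Because old outcomes fall out of the window, a burst of early failures stops
influencing the rate once enough later tasks succeed.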

* fix: use real time delays in out-of-order completion test

asyncio.sleep(0) interleaving is not deterministic across Python
versions. Switch to asyncio.sleep(num_records * 0.02) so the smaller
row group genuinely finishes seeds first regardless of event loop
scheduling.

* fix: prevent ZeroDivisionError when shutdown_error_window is 0

Change RunConfig.shutdown_error_window constraint from ge=0 to ge=1
so the sliding window denominator is never zero.

* fix: address Greptile review nits in async_scheduler

- Move del _rg_states inside try/finally so semaphore is always released
- Add exc_info=True to pre-batch failure log for consistent tracebacks
- Short-circuit _check_error_rate when _early_shutdown already set

* fix: address Greptile summary findings

- Remove duplicate async engine log in build() (kept in _build_async)
- Guard _run_seeds_complete_check with has_pre_batch at both call sites
- Change error rate comparison from > to >= to match sync path semantics
2026-03-20 13:05:09 -03:00


# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import json
import os
import subprocess
import sys
from pathlib import Path

import pytest

NUM_RECORDS = 2
PARALLEL_COLUMNS = ("summary", "analysis")


def _run_async_engine_concurrency_case(tmp_path: Path) -> dict[str, object]:
    repo_root = Path(__file__).resolve().parents[2]
    script = f"""
from __future__ import annotations

import json
from collections import defaultdict
from pathlib import Path

import data_designer.config as dd
from data_designer.config.run_config import RunConfig
from data_designer.interface import DataDesigner

NUM_RECORDS = {NUM_RECORDS}
PARALLEL_COLUMNS = {PARALLEL_COLUMNS!r}
tmp_path = Path({str(tmp_path)!r})

dd_instance = DataDesigner(artifact_path=str(tmp_path))
dd_instance.set_run_config(RunConfig(buffer_size=NUM_RECORDS, async_trace=True))

config = dd.DataDesignerConfigBuilder()
config.add_column(
    dd.SamplerColumnConfig(
        name="topic",
        sampler_type=dd.SamplerType.CATEGORY,
        params=dd.CategorySamplerParams(values=["science", "history", "art"]),
    )
)
for col in PARALLEL_COLUMNS:
    config.add_column(
        dd.LLMTextColumnConfig(
            name=col,
            model_alias="nvidia-text",
            prompt="Write one sentence about {{{{ topic }}}} (" + col + ").",
        )
    )

result = dd_instance.create(config, num_records=NUM_RECORDS, dataset_name="async_e2e")
df = result.load_dataset()
traces = result.task_traces

by_col: dict[str, list[tuple[float, float]]] = defaultdict(list)
for trace in traces:
    if trace.task_type == "cell" and trace.status == "ok" and trace.slot_acquired_at and trace.completed_at:
        by_col[trace.column].append((trace.slot_acquired_at, trace.completed_at))

overlap_found = False
cols = [col for col in PARALLEL_COLUMNS if by_col[col]]
for i, col_a in enumerate(cols):
    for col_b in cols[i + 1 :]:
        for start_a, end_a in by_col[col_a]:
            for start_b, end_b in by_col[col_b]:
                if start_a < end_b and start_b < end_a:
                    overlap_found = True
                    break
            if overlap_found:
                break
        if overlap_found:
            break
    if overlap_found:
        break

payload = {{
    "rows": len(df),
    "columns": list(df.columns),
    "non_null": {{col: bool(df[col].notna().all()) for col in ("topic", *PARALLEL_COLUMNS)}},
    "trace_count": len(traces),
    "overlap_found": overlap_found,
}}
print("RESULT_JSON=" + json.dumps(payload))
"""

    env = os.environ.copy()
    env["DATA_DESIGNER_ASYNC_ENGINE"] = "1"

    completed = subprocess.run(
        [sys.executable, "-c", script],
        check=True,
        capture_output=True,
        text=True,
        cwd=repo_root,
        env=env,
    )

    for line in completed.stdout.splitlines():
        if line.startswith("RESULT_JSON="):
            return json.loads(line.removeprefix("RESULT_JSON="))

    raise AssertionError(f"Missing RESULT_JSON marker in subprocess output:\n{completed.stdout}")


def test_async_engine_concurrent_columns(tmp_path: Path) -> None:
    """Verify the async engine runs independent LLM columns concurrently."""
    if os.environ.get("NVIDIA_API_KEY") is None:
        pytest.skip("NVIDIA_API_KEY must be set")

    payload = _run_async_engine_concurrency_case(tmp_path)

    assert payload["rows"] == NUM_RECORDS
    for col in ("topic", *PARALLEL_COLUMNS):
        assert col in payload["columns"]
        assert payload["non_null"][col]
    assert payload["trace_count"] > 0
    assert payload["overlap_found"], (
        "No overlapping execution found between parallel columns - async scheduler may not be dispatching concurrently"
    )