mirror of
https://github.com/NVIDIA-NeMo/DataDesigner
synced 2026-05-24 09:48:29 +00:00
plan: centralize __skipped__ handling in skip_provenance
- Document new skip_provenance.py (key constant, read/write/strip API)
- Point sync builder, async scheduler, and batch buffers at shared helpers
- Strip metadata before every DataFrame from buffer dicts, including FULL_COLUMN active subsets
- Split §3 into skip_evaluator vs skip_provenance; extend verification

Refs #479
Made-with: Cursor
parent
9790f95acc
commit
70463789f7
1 changed files with 81 additions and 48 deletions
|
|
@@ -86,7 +86,7 @@ flowchart TD
|
|||
end
|
||||
|
||||
subgraph runtime ["3 - Runtime (both engines)"]
|
||||
skip_eval["skip_evaluator.py (NEW)"]
|
||||
skip_eval["skip_evaluator.py +\nskip_provenance.py (NEW)"]
|
||||
builder["dataset_builder.py\n(sync)"]
|
||||
sched["async_scheduler.py\n(async)"]
|
||||
batch_buf["dataset_batch_manager.py\n(sync buffer)"]
|
||||
|
|
@@ -263,7 +263,9 @@ Store metadata on the graph for runtime access:
|
|||
- Populate during first pass: `if sub.skip is not None: graph._skip_configs[name] = sub.skip` and `graph._propagate_skip[name] = sub.propagate_skip` (for all columns, not just those with `SkipConfig`)
|
||||
- Add accessors: `get_skip_config(column) -> SkipConfig | None`, `should_propagate_skip(column) -> bool` (defaults to `True` if column not in dict)
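
The accessor contract above can be sketched as follows. This is a minimal illustration rather than the real execution graph: `SkipConfigStub` and `GraphSkipMetadata` are hypothetical stand-ins, and only the two dictionaries, the accessor names, and the default-to-`True` behavior are taken from the plan.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class SkipConfigStub:
    """Hypothetical stand-in for SkipConfig (only the fields used here)."""

    when: str
    value: Any = None


@dataclass
class GraphSkipMetadata:
    """Hypothetical holder for the skip metadata the plan stores on the graph."""

    _skip_configs: dict[str, SkipConfigStub] = field(default_factory=dict)
    _propagate_skip: dict[str, bool] = field(default_factory=dict)

    def register(self, name: str, skip: SkipConfigStub | None, propagate_skip: bool) -> None:
        # propagate_skip is recorded for every column; skip only when configured.
        if skip is not None:
            self._skip_configs[name] = skip
        self._propagate_skip[name] = propagate_skip

    def get_skip_config(self, column: str) -> SkipConfigStub | None:
        return self._skip_configs.get(column)

    def should_propagate_skip(self, column: str) -> bool:
        # Columns missing from the dict default to True, as the plan specifies.
        return self._propagate_skip.get(column, True)


meta = GraphSkipMetadata()
meta.register("summary", SkipConfigStub(when="{{ score < 3 }}", value="n/a"), propagate_skip=True)
meta.register("audit", None, propagate_skip=False)
```

Keeping `_propagate_skip` populated for all columns means a column without a `SkipConfig` can still opt out of propagation.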
|
||||
|
||||
### 3. New utility: `skip_evaluator.py`
|
||||
### 3. New utilities: `skip_evaluator.py` and `skip_provenance.py`
|
||||
|
||||
#### 3a. Expression evaluation — `skip_evaluator.py`
|
||||
|
||||
**New file:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/skip_evaluator.py`
|
||||
|
||||
|
|
@@ -325,6 +327,42 @@ The module-level `_env` singleton and `lru_cache` on `_compile_skip_template` av
|
|||
|
||||
**Plugins (`CustomColumnConfig`):** Propagation uses each column's `required_columns` (Jinja-derived for LLM columns, `@custom_column_generator` metadata for custom columns). If plugin metadata omits a dependency, the column may **not** auto-skip when an upstream was skipped — authors should keep `required_columns` accurate and add a verification case for a custom column with `propagate_skip=True`.
|
||||
|
||||
#### 3b. Record provenance — `skip_provenance.py`
|
||||
|
||||
**New file:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/skip_provenance.py`
|
||||
|
||||
Keep the magic string and all read/write/strip logic for record-inline skip metadata in **one module** so sync, async, and buffer code do not diverge. **Do not** spell `"__skipped__"` elsewhere except tests mirroring the constant.
|
||||
|
||||
```python
from collections.abc import Sequence
from typing import Final

SKIPPED_COLUMNS_RECORD_KEY: Final[str] = "__skipped__"


def get_skipped_column_names(record: dict) -> set[str]:
    """Return a copy of skipped producer column names for this row (empty if unset)."""


def apply_skip_to_record(
    record: dict,
    *,
    column_name: str,
    cell_value: bool | int | float | str | None,
    side_effect_columns: Sequence[str],
) -> None:
    """Mutate *record* in place: provenance, primary cell value, side effects cleared."""


def strip_skip_metadata_for_dataframe_row(record: dict) -> dict:
    """Shallow copy of *record* without skip provenance; safe for pd.DataFrame(rows)."""


def strip_skip_metadata_from_records(records: Sequence[dict]) -> list[dict]:
    """Apply strip_skip_metadata_for_dataframe_row to each record and return a new list."""
```
|
||||
|
||||
**Usage contract:**
|
||||
|
||||
- **`DatasetBuilder._should_skip_cell`** and **async skip checks** call **`get_skipped_column_names(record)`** instead of `record.get("__skipped__", set())`.
|
||||
- **`DatasetBuilder._write_skip_to_record`** and **async `_run_cell` skip path** call **`apply_skip_to_record`** with `cell_value` from `SkipConfig.value` if a `SkipConfig` exists else `None`, and `side_effect_columns=self._graph.get_side_effect_columns(column_name)` (async: same via graph).
|
||||
- **`DatasetBatchManager`**, **`RowGroupBufferManager`**, and **any** code that builds a **`pd.DataFrame` from buffer dicts** (including **FULL_COLUMN** `active_df` in Step 4d / async Step 5c) call **`strip_skip_metadata_from_records`** (or the single-row helper) — **never** inline `{k: v for k, v in row.items() if k != "__skipped__"}` outside this module.
|
||||
|
||||
Optional thin wrappers on **`DatasetBuilder`** (`_should_skip_cell`, `_write_skip_to_record`) remain for graph access but **delegate** to `skip_provenance` + `skip_evaluator` for the mechanics.
|
||||
|
||||
### 4. Sync engine: `DatasetBuilder`
|
||||
|
||||
**File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py`
|
||||
|
|
@@ -339,7 +377,7 @@ Add a private method on `DatasetBuilder` that centralizes the skip decision for
|
|||
def _should_skip_cell(
|
||||
self, column_name: str, record: dict
|
||||
) -> bool:
|
||||
skipped_cols: set[str] = record.get("__skipped__", set())
|
||||
skipped_cols = get_skipped_column_names(record)
|
||||
|
||||
# 1. Propagation — independent of SkipConfig
|
||||
propagate = self._graph.should_propagate_skip(column_name)
|
||||
|
|
@@ -366,10 +404,12 @@ def _write_skip_to_record(
|
|||
) -> None:
|
||||
skip_config = self._graph.get_skip_config(column_name)
|
||||
skip_value = skip_config.value if skip_config is not None else None
|
||||
record.setdefault("__skipped__", set()).add(column_name)
|
||||
record[column_name] = skip_value
|
||||
for se_col in self._graph.get_side_effect_columns(column_name):
|
||||
record[se_col] = None
|
||||
apply_skip_to_record(
|
||||
record,
|
||||
column_name=column_name,
|
||||
cell_value=skip_value,
|
||||
side_effect_columns=self._graph.get_side_effect_columns(column_name),
|
||||
)
|
||||
```
|
||||
|
||||
#### 4c. Modify `_fan_out_with_threads()` (line 631)
|
||||
|
|
@@ -414,7 +454,9 @@ def _run_full_column_generator(self, generator: ColumnGenerator) -> None:
|
|||
active_records = [r for i, r in enumerate(batch) if i not in skip_indices]
|
||||
|
||||
if active_records:
|
||||
active_df = lazy.pd.DataFrame(active_records)
|
||||
active_df = lazy.pd.DataFrame(
|
||||
strip_skip_metadata_from_records(active_records)
|
||||
)
|
||||
result_df = generator.generate(active_df)
|
||||
result_records = result_df.to_dict(orient="records")
|
||||
|
||||
|
|
@@ -444,27 +486,22 @@ The merge-back loop preserves row order: skipped rows keep their skip provenance
|
|||
|
||||
#### 5a. Skip provenance: record-inline `__skipped__`
|
||||
|
||||
Skip provenance is stored directly in each record dict under a hidden `__skipped__` key (a `set[str]` of skipped column names). When a cell is skipped, the column name is added to the record's `__skipped__` set:
|
||||
Skip provenance is stored on each record dict under **`SKIPPED_COLUMNS_RECORD_KEY`** (see Step 3b). **Reads, writes, and DataFrame stripping** go through **`skip_provenance.py`** — do not duplicate inline patterns here.
|
||||
|
||||
```python
|
||||
record.setdefault("__skipped__", set()).add(column_name)
|
||||
```
|
||||
The set travels with the record through the buffer — no separate tracking state is needed on `CompletionTracker` or elsewhere. The async engine reads skip state via **`get_skipped_column_names(buffer_manager.get_row(rg, ri))`** (after ensuring the row is a `dict`).
|
||||
|
||||
The `__skipped__` set travels with the record through the buffer — no separate tracking state is needed on `CompletionTracker` or elsewhere. The async engine reads skip state from the buffer via `buffer_manager.get_row(rg, ri).get("__skipped__", set())`.
|
||||
|
||||
The `__skipped__` key is stripped at the serialization boundary before converting records to DataFrames (see Step 8).
|
||||
The key is stripped whenever records become DataFrames (see Step 8 and **`strip_skip_metadata_from_records`**).
|
||||
|
||||
#### 5b. Modify `_run_cell()` (line 767 of `async_scheduler.py`)
|
||||
|
||||
After the `is_dropped` guard (line 772), add skip evaluation:
|
||||
|
||||
1. Get `skipped_cols` from `row_data.get("__skipped__", set())` — the row data is already read from the buffer at line 777, so no tracker query is needed.
|
||||
1. Get `skipped_cols = get_skipped_column_names(row_data)` — the row data is already read from the buffer at line 777, so no tracker query is needed.
|
||||
2. Check propagation first (independent of `SkipConfig`): `should_skip_by_propagation(self._graph.get_required_columns(task.column), skipped_cols, self._graph.should_propagate_skip(task.column))` — same list as sync (`config.required_columns` duplicated on the graph; **do not** substitute `get_upstream_columns()`).
|
||||
3. If not propagation-skipped, get `skip_config = self._graph.get_skip_config(task.column)`. If not None, check `evaluate_skip_when(skip_config.when, row_data)`.
|
||||
4. If skip (by either path), write to the buffer record via `buffer_manager.get_row(rg, ri)`:
|
||||
- `record.setdefault("__skipped__", set()).add(task.column)` — records skip provenance.
|
||||
- `record[task.column] = skip_config.value if skip_config else None` — the **column key must be present** in the record dict, not absent. Downstream `skip.when` expressions and Jinja2 templates may reference skipped columns (e.g., `{{ col is none }}`); an absent key would cause `UndefinedError`. Propagation-only skips (no `SkipConfig`) use `None`.
|
||||
- `record[se_col] = None` for each side-effect column — side-effect columns (`__trace`, `__reasoning_content`, etc.) are always written as `None`, regardless of the parent column's `skip.value`. A skipped cell has no trace or reasoning.
|
||||
4. If skip (by either path), write to the buffer record via `buffer_manager.get_row(rg, ri)` using **`apply_skip_to_record`**:
|
||||
- `cell_value=skip_config.value if skip_config else None` — the **primary column key must be present** in the record dict, not absent. Downstream `skip.when` expressions and Jinja2 templates may reference skipped columns (e.g., `{{ col is none }}`); an absent key would cause `UndefinedError`. Propagation-only skips (no `SkipConfig`) use `None`.
|
||||
- `side_effect_columns` from `self._graph.get_side_effect_columns(task.column)` — always cleared to `None` on skip.
|
||||
- Return `None`.
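
The decision order in the steps above (propagation first, then the column's own `skip.when`) can be sketched as below. The body of `should_skip_by_propagation` is an assumed reading of its name and arguments, `decide_skip` is a hypothetical helper, and a plain callable stands in for `evaluate_skip_when` so the example does not depend on Jinja2.

```python
from __future__ import annotations

from collections.abc import Callable, Sequence


def should_skip_by_propagation(
    required_columns: Sequence[str],
    skipped_cols: set[str],
    propagate_skip: bool,
) -> bool:
    """Assumed semantics: skip if propagation is on and any required column was skipped."""
    return propagate_skip and any(col in skipped_cols for col in required_columns)


def decide_skip(
    *,
    required_columns: Sequence[str],
    skipped_cols: set[str],
    propagate_skip: bool,
    when_predicate: Callable[[dict], bool] | None,
    row_data: dict,
) -> bool:
    """Hypothetical helper: propagation first, then the column's own skip.when."""
    if should_skip_by_propagation(required_columns, skipped_cols, propagate_skip):
        return True
    if when_predicate is None:  # no SkipConfig on this column
        return False
    return bool(when_predicate(row_data))


# Propagation-only skip: upstream "draft" was skipped and this column requires it.
propagated = decide_skip(
    required_columns=["draft"],
    skipped_cols={"draft"},
    propagate_skip=True,
    when_predicate=None,
    row_data={"draft": None},
)
```

Passing only `required_columns` (not every upstream edge) is what keeps a column gated by `skip.when` on some column from being propagation-skipped merely because that gating column was itself skipped.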
|
||||
|
||||
The caller (`_execute_task_inner_impl`) still marks the task complete — skipped cells ARE complete (they produced a value). Downstream tasks get unblocked and will themselves check propagation (respecting their own `propagate_skip` setting). Note: `_execute_task_inner_impl` also calls `_check_error_rate(success=True)` and `_reporter.record_success()` — skipped cells count as successes in metrics. This is acceptable for v1 (a skip is a successful outcome, not a failure), but consider adding a separate skip counter to the reporter for observability.
|
||||
|
|
@@ -477,7 +514,7 @@ The caller (`_execute_task_inner_impl`) still marks the task complete — skippe
|
|||
|
||||
Pre-filter the DataFrame to exclude skipped rows before calling `generator.agenerate()`, then merge results back. Specific adjustments to the existing code:
|
||||
|
||||
1. After computing `pre_dropped` (line 799), build `pre_skipped: set[int]` by evaluating `_should_skip_cell()` for each non-dropped row. Write `skip_config.value` and `__skipped__` provenance into the buffer record for each skipped row.
|
||||
1. After computing `pre_dropped` (line 799), build `pre_skipped: set[int]` by evaluating skip logic per row (same semantics as sync **`_should_skip_cell`**, using **`get_skipped_column_names`**, **`should_skip_by_propagation`**, **`evaluate_skip_when`**, and **`apply_skip_to_record`** on the buffer record for each skipped row).
|
||||
2. Filter `batch_df` to exclude both dropped and skipped rows before calling `generator.agenerate()`.
|
||||
3. **Adjust the row-count assertion** (line 811): change `active_rows = rg_size - len(pre_dropped)` to `active_rows = rg_size - len(pre_dropped) - len(pre_skipped)`. The generator only receives non-dropped, non-skipped rows, so the result must match that count.
|
||||
4. **Adjust the merge-back loop** (lines 816-825): skip both `pre_dropped` and `pre_skipped` indices when writing results back from `result_df`.
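
The four adjustments above amount to a pre-filter, a count check, and an index-aware merge-back. The sketch below shows that shape on plain lists of dicts so it runs without pandas; `run_full_column` and `fake_generate` are hypothetical, and the real code path goes through a DataFrame and `generator.agenerate()`.

```python
from collections.abc import Callable

SKIPPED_KEY = "__skipped__"  # mirrors SKIPPED_COLUMNS_RECORD_KEY


def run_full_column(
    buffer: list[dict],
    pre_dropped: set[int],
    pre_skipped: set[int],
    generate: Callable[[list[dict]], list[dict]],
) -> None:
    """Hypothetical sketch: generate only for active rows, then merge back by index."""
    rg_size = len(buffer)
    excluded = pre_dropped | pre_skipped
    active_indices = [i for i in range(rg_size) if i not in excluded]
    # Strip provenance before handing rows to the generator.
    active_rows = [
        {k: v for k, v in buffer[i].items() if k != SKIPPED_KEY} for i in active_indices
    ]
    results = generate(active_rows) if active_rows else []

    # pre_skipped is built from non-dropped rows only, so the two sets are disjoint.
    expected = rg_size - len(pre_dropped) - len(pre_skipped)
    if len(results) != expected:
        raise ValueError(f"generator returned {len(results)} rows, expected {expected}")

    # Merge back in original row order; dropped and skipped rows are left untouched.
    for index, result in zip(active_indices, results):
        buffer[index].update(result)


seen: list[dict] = []


def fake_generate(rows: list[dict]) -> list[dict]:
    seen.extend(rows)
    return [{**row, "out": row["text"].upper()} for row in rows]


buffer = [
    {"text": "a"},                                      # dropped
    {"text": "b"},                                      # active
    {"text": "c", "out": None, SKIPPED_KEY: {"out"}},   # skipped for this column
    {"text": "d", SKIPPED_KEY: {"other"}},              # active, unrelated provenance
]
run_full_column(buffer, pre_dropped={0}, pre_skipped={2}, generate=fake_generate)
```

Using `update` on the existing buffer record keeps any provenance already on an active row, while the generator itself never sees the metadata key.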
|
||||
|
|
@@ -503,46 +540,40 @@ This avoids the design problem of threading engine-internal skip state into the
|
|||
|
||||
### 8. Serialization boundary: strip `__skipped__` before DataFrame conversion
|
||||
|
||||
The `__skipped__` key is a `set` object stored in the record dict. `pd.DataFrame(records)` would serialize it into a column, which is incorrect. Strip `__skipped__` at every point where records are converted to DataFrames — in both engines.
|
||||
The value under **`SKIPPED_COLUMNS_RECORD_KEY`** is a `set` stored on the record dict. `pd.DataFrame(records)` would materialize it as an object column, which is incorrect. **Every** conversion from buffer dicts to a DataFrame must use **`strip_skip_metadata_from_records`** (Step 3b) — including **`get_dataframe`**, **`get_current_batch(as_dataframe=True)`**, **`write()`**, and **internal** FULL_COLUMN subsets (Steps 4d / 5c).
|
||||
|
||||
**Async — `RowGroupBufferManager`**
|
||||
|
||||
**File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py`
|
||||
|
||||
In `get_dataframe()` (lines 68-72), filter out `__skipped__`:
|
||||
In `get_dataframe()` (lines 68-72), build rows with the shared helper:
|
||||
|
||||
```python
|
||||
from data_designer.engine.dataset_builders.utils.skip_provenance import strip_skip_metadata_from_records
|
||||
|
||||
def get_dataframe(self, row_group: int) -> pd.DataFrame:
|
||||
dropped = self._dropped.get(row_group, set())
|
||||
rows = [
|
||||
{k: v for k, v in row.items() if k != "__skipped__"}
|
||||
for i, row in enumerate(self._buffers[row_group])
|
||||
if i not in dropped
|
||||
]
|
||||
return lazy.pd.DataFrame(rows)
|
||||
raw = [row for i, row in enumerate(self._buffers[row_group]) if i not in dropped]
|
||||
return lazy.pd.DataFrame(strip_skip_metadata_from_records(raw))
|
||||
```
|
||||
|
||||
**Sync — `DatasetBatchManager`**
|
||||
|
||||
**File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py`
|
||||
|
||||
In `get_current_batch()` (line 134), strip `__skipped__` when returning a DataFrame:
|
||||
In `get_current_batch()` (line 134), when returning a DataFrame:
|
||||
|
||||
```python
|
||||
def get_current_batch(self, *, as_dataframe: bool = False) -> pd.DataFrame | list[dict]:
|
||||
if as_dataframe:
|
||||
return lazy.pd.DataFrame(
|
||||
[{k: v for k, v in row.items() if k != "__skipped__"} for row in self._buffer]
|
||||
)
|
||||
return lazy.pd.DataFrame(strip_skip_metadata_from_records(self._buffer))
|
||||
return self._buffer
|
||||
```
|
||||
|
||||
Also in `write()` (line 172), which converts `self._buffer` to a DataFrame for parquet serialization:
|
||||
|
||||
```python
|
||||
dataframe=lazy.pd.DataFrame(
|
||||
[{k: v for k, v in row.items() if k != "__skipped__"} for row in self._buffer]
|
||||
),
|
||||
dataframe=lazy.pd.DataFrame(strip_skip_metadata_from_records(self._buffer)),
|
||||
```
|
||||
|
||||
The `__skipped__` key remains on **in-memory record dicts** during generation for propagation. It is stripped when building **DataFrames** for `get_dataframe()` / `get_current_batch(as_dataframe=True)` / `write()` so it never appears as a Parquet column.
|
||||
|
|
@@ -559,10 +590,11 @@ The `__skipped__` key remains on **in-memory record dicts** during generation fo
|
|||
| `engine/.../dag.py` | Add `skip.columns` edges in topological sort (guarded by `if col.skip is not None`) |
|
||||
| `engine/.../execution_graph.py` | Add `skip.columns` edges (matching existing `required_columns` pattern) + `_required_columns: dict[str, list[str]]` + `get_required_columns(column) -> list[str]` (**do not** use `get_upstream_columns()` for propagation — it mixes `skip.when` edges with data deps) + `_skip_configs` / `_propagate_skip` + `get_skip_config()` / `should_propagate_skip()` |
|
||||
| `engine/.../skip_evaluator.py` | **NEW** — `NativeSandboxedEnvironment`, `_compile_skip_template()` (cached), `evaluate_skip_when()`, `should_skip_by_propagation()` |
|
||||
| `engine/.../dataset_builder.py` | `_should_skip_cell()` + `_write_skip_to_record()` helpers; skip pre-check in `_fan_out_with_threads()`; pre-filter + merge-back in `_run_full_column_generator()` |
|
||||
| `engine/.../async_scheduler.py` | Skip checks in `_run_cell()` and `_run_batch()` with FULL_COLUMN pre-filtering + adjusted row-count assertion (reads `__skipped__` from buffer records, `propagate_skip` + `SkipConfig` from graph) |
|
||||
| `engine/.../dataset_batch_manager.py` | Strip `__skipped__` key in `get_current_batch(as_dataframe=True)` and `write()` |
|
||||
| `engine/.../row_group_buffer.py` | Strip `__skipped__` key in `get_dataframe()` |
|
||||
| `engine/.../skip_provenance.py` | **NEW** — `SKIPPED_COLUMNS_RECORD_KEY`, `get_skipped_column_names()`, `apply_skip_to_record()`, `strip_skip_metadata_for_dataframe_row()`, `strip_skip_metadata_from_records()` (sole owners of `__skipped__` string and strip logic) |
|
||||
| `engine/.../dataset_builder.py` | `_should_skip_cell()` + `_write_skip_to_record()` delegate to `skip_provenance` + `skip_evaluator`; skip pre-check in `_fan_out_with_threads()`; pre-filter + merge-back in `_run_full_column_generator()` (strip before `DataFrame(active_records)`) |
|
||||
| `engine/.../async_scheduler.py` | Skip checks in `_run_cell()` / `_run_batch()` using same helpers as sync; FULL_COLUMN pre-filter builds DataFrame only via `strip_skip_metadata_from_records` |
|
||||
| `engine/.../dataset_batch_manager.py` | `get_current_batch(as_dataframe=True)` and `write()` use `strip_skip_metadata_from_records` |
|
||||
| `engine/.../row_group_buffer.py` | `get_dataframe()` uses `strip_skip_metadata_from_records` |
|
||||
| `engine/.../expression.py` | No changes — skip handling is done at the engine dispatch layer (Step 5c / Step 4d pre-filtering) |
|
||||
| `engine/validation.py` | `validate_skip_references()` (ERROR level) + sampler/seed + allow_resize checks |
|
||||
|
||||
|
|
@@ -582,7 +614,7 @@ The `__skipped__` key remains on **in-memory record dicts** during generation fo
|
|||
|
||||
**Resolution (unchanged):** Start with a `DropSkippedRowsProcessorConfig` processor — it's opt-in, composable with other processors, and doesn't require new parameters on `create()` or column configs.
|
||||
|
||||
Implementation must align with Step 8: either the processor runs on **record dicts** while `__skipped__` is still present, or it uses a **derived column** produced before `__skipped__` is stripped from DataFrame views — not `record.get("__skipped__")` on a post-strip DataFrame (that key will not exist there).
|
||||
Implementation must align with Step 8: either the processor runs on **record dicts** while **`get_skipped_column_names`** is non-empty, or it uses a **derived column** produced before `strip_skip_metadata_from_records` — not provenance reads on a post-strip DataFrame (metadata will not be there).
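
For the first option (running on record dicts while provenance is still present), a filter might look like the sketch below. `drop_skipped_rows` and its `columns` parameter are hypothetical; the plan does not specify the interface of `DropSkippedRowsProcessorConfig`.

```python
from __future__ import annotations

from collections.abc import Iterable

SKIPPED_KEY = "__skipped__"  # mirrors SKIPPED_COLUMNS_RECORD_KEY


def drop_skipped_rows(records: list[dict], columns: Iterable[str] | None = None) -> list[dict]:
    """Hypothetical filter: keep rows with no skipped columns (or none among *columns*)."""
    watched = None if columns is None else set(columns)
    kept = []
    for record in records:
        skipped = set(record.get(SKIPPED_KEY, ()))
        relevant = skipped if watched is None else skipped & watched
        if not relevant:
            kept.append(record)
    return kept


records = [
    {"id": 1, "answer": "42"},
    {"id": 2, "answer": None, SKIPPED_KEY: {"answer"}},
    {"id": 3, "answer": "7", "note": None, SKIPPED_KEY: {"note"}},
]
all_clean = drop_skipped_rows(records)
answered = drop_skipped_rows(records, columns=["answer"])
```

This only works before the metadata is stripped; once rows have become a DataFrame, the filter would need a derived column instead.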
|
||||
|
||||
## Open Questions
|
||||
|
||||
|
|
@@ -595,7 +627,7 @@ Implementation must align with Step 8: either the processor runs on **record dic
|
|||
|
||||
The record-inline `__skipped__` infrastructure designed here could serve both use cases — #362 could write to the same `__skipped__` set (or a parallel `__failed__` set) in the record dict, and downstream propagation would work identically. Consider designing one shared propagation path instead of building two separate cascades. Not blocking for this implementation, but worth keeping in mind before the implementation PRs land to avoid a second refactor.
|
||||
|
||||
The shared propagation path needs to exist in both `DatasetBuilder` (sync — `_should_skip_cell()` / `_fan_out_with_threads()` / `_run_full_column_generator()`) and `AsyncTaskScheduler` (async — `_run_cell()` / `_run_batch()`). The `skip_evaluator.py` utility module is engine-agnostic and already serves both.
|
||||
The shared propagation path needs to exist in both `DatasetBuilder` (sync — `_should_skip_cell()` / `_fan_out_with_threads()` / `_run_full_column_generator()`) and `AsyncTaskScheduler` (async — `_run_cell()` / `_run_batch()`). The `skip_evaluator.py` and `skip_provenance.py` utility modules are engine-agnostic and serve both.
|
||||
|
||||
---
|
||||
|
||||
|
|
@@ -603,9 +635,10 @@ The shared propagation path needs to exist in both `DatasetBuilder` (sync — `_
|
|||
|
||||
1. **Unit tests — config:** `SkipConfig` field defaults (`value=None`), Jinja2 syntax validation on `when`, `columns` extraction (cached), `SingleColumnConfig.skip` defaults to `None`, `SingleColumnConfig.propagate_skip` defaults to `True`, rejection of `skip` on sampler/seed types, rejection of `skip` + `allow_resize`, rejection of self-referencing `skip.when` (column references itself)
|
||||
2. **Unit tests — skip evaluator:** `NativeSandboxedEnvironment` returns native Python types; `evaluate_skip_when` with truthy/falsy expressions (including `False`/`None`/`0` — the case-sensitivity bug); `evaluate_skip_when` against deserialized JSON records; `should_skip_by_propagation` with `propagate_skip=True` and `propagate_skip=False`; `StrictUndefined` raises `UndefinedError` for missing variables (not silently truthy); error handling in `evaluate_skip_when` (graceful failure on sandbox violations)
|
||||
3. **Unit tests — DAG/graph:** `skip.columns` become edges; unknown column in `skip.when` raises `ValueError` (same behavior as unknown `required_columns`); `skip.when` referencing sampler/seed columns (available via `MultiColumnConfig` flattening) resolves correctly; **`get_required_columns()`** returns only config `required_columns` (not the same as `get_upstream_columns()` after `skip` edges exist) — regression case: column with `skip.when` on `gating_col` and empty `required_columns` is **not** propagation-skipped solely because `gating_col` ∈ `__skipped__`
|
||||
4. **Integration tests (sync):** Column with `skip` produces `skip.value` for matching rows in `_fan_out_with_threads`; FULL_COLUMN generators pre-filter and merge-back correctly in `_run_full_column_generator`; downstream with no `SkipConfig` auto-skips via `propagate_skip=True`; downstream with `propagate_skip=False` does NOT auto-skip; row count preserved; skipped cells never submitted to thread pool (verify generator not called for skipped rows); **custom column** with declared `required_columns` propagates skip when upstream is skipped (`propagate_skip=True`)
|
||||
5. **Integration tests (async):** Column with `skip` produces `skip.value` for matching rows; downstream with no `SkipConfig` auto-skips via `propagate_skip=True`; downstream with `propagate_skip=False` does NOT auto-skip; **chained propagation** (A skipped -> B auto-skips -> C auto-skips, where B and C have no `SkipConfig`) works transitively; row count preserved; FULL_COLUMN generators pre-filter (verify LLM calls not made for skipped rows); custom `skip.value` written correctly; propagation-only skips write `None` (not a configured value); side-effect columns get `None`; skip count is logged per-column
|
||||
6. **Validation tests:** Unknown column in `skip.when` produces ERROR violation; sampler/seed + `skip` produces violation; `allow_resize` + `skip` produces violation
|
||||
7. **Serialization tests:** Verify `__skipped__` is stripped from DataFrame output in both `RowGroupBufferManager.get_dataframe()` (async) and `DatasetBatchManager.get_current_batch(as_dataframe=True)` / `write()` (sync) — the key must not appear as a column in the resulting DataFrame
|
||||
8. **Run:** `make check-all-fix` + `make test` + `make update-license-headers`
|
||||
3. **Unit tests — skip provenance:** `get_skipped_column_names` empty vs populated and copy semantics; `apply_skip_to_record` adds key, sets cell and side effects; `strip_skip_metadata_*` removes only `SKIPPED_COLUMNS_RECORD_KEY` and preserves other keys; no other module hard-codes `"__skipped__"` (grep / lint rule optional)
|
||||
4. **Unit tests — DAG/graph:** `skip.columns` become edges; unknown column in `skip.when` raises `ValueError` (same behavior as unknown `required_columns`); `skip.when` referencing sampler/seed columns (available via `MultiColumnConfig` flattening) resolves correctly; **`get_required_columns()`** returns only config `required_columns` (not the same as `get_upstream_columns()` after `skip` edges exist) — regression case: column with `skip.when` on `gating_col` and empty `required_columns` is **not** propagation-skipped solely because `gating_col` ∈ `__skipped__`
|
||||
5. **Integration tests (sync):** Column with `skip` produces `skip.value` for matching rows in `_fan_out_with_threads`; FULL_COLUMN generators pre-filter and merge-back correctly in `_run_full_column_generator`; downstream with no `SkipConfig` auto-skips via `propagate_skip=True`; downstream with `propagate_skip=False` does NOT auto-skip; row count preserved; skipped cells never submitted to thread pool (verify generator not called for skipped rows); **custom column** with declared `required_columns` propagates skip when upstream is skipped (`propagate_skip=True`)
|
||||
6. **Integration tests (async):** Column with `skip` produces `skip.value` for matching rows; downstream with no `SkipConfig` auto-skips via `propagate_skip=True`; downstream with `propagate_skip=False` does NOT auto-skip; **chained propagation** (A skipped -> B auto-skips -> C auto-skips, where B and C have no `SkipConfig`) works transitively; row count preserved; FULL_COLUMN generators pre-filter (verify LLM calls not made for skipped rows); custom `skip.value` written correctly; propagation-only skips write `None` (not a configured value); side-effect columns get `None`; skip count is logged per-column
|
||||
7. **Validation tests:** Unknown column in `skip.when` produces ERROR violation; sampler/seed + `skip` produces violation; `allow_resize` + `skip` produces violation
|
||||
8. **Serialization tests:** Verify `SKIPPED_COLUMNS_RECORD_KEY` is stripped from DataFrame output (via `strip_skip_metadata_from_records`) in both `RowGroupBufferManager.get_dataframe()` (async) and `DatasetBatchManager.get_current_batch(as_dataframe=True)` / `write()` (sync) — it must not appear as a column in the resulting DataFrame; FULL_COLUMN `active_df` path also stripped
|
||||
9. **Run:** `make check-all-fix` + `make test` + `make update-license-headers`
|
||||