Preserves tree from previous docs-website head: 5e47d33ea8. This branch is a CI-managed publish artifact like gh-pages; source provenance is tracked in commit messages rather than Git ancestry.
6.7 KiB
Custom Columns
Custom columns let you implement your own generation logic using Python functions. Use them for multi-step LLM workflows, external API integration, or any scenario requiring full programmatic control. For reusable, distributable components, see Plugins instead.
Quick Start
import data_designer.config as dd
@dd.custom_column_generator(required_columns=["name"])
def create_greeting(row: dict) -> dict:
row["greeting"] = f"Hello, {row['name']}!"
return row
config_builder.add_column(
dd.CustomColumnConfig(
name="greeting",
generator_function=create_greeting,
)
)
Function Signatures
Three signatures are supported. Parameter names are validated:
| Args | Signature | Use Case |
|---|---|---|
| 1 | fn(row) -> dict |
Simple transforms |
| 2 | fn(row, generator_params) -> dict |
With typed params |
| 3 | fn(row, generator_params, models) -> dict |
LLM access via models dict |
For full_column strategy, use df instead of row.
For LLM access without params, use generator_params: None:
@dd.custom_column_generator(required_columns=["name"], model_aliases=["my-model"])
def generate_message(row: dict, generator_params: None, models: dict) -> dict:
response, _ = models["my-model"].generate(prompt=f"Greet {row['name']}")
row["greeting"] = response
return row
Model aliases are validated before generation starts. If an alias doesn't exist in your config, an error is raised during the health check.
Generation Strategies
| Strategy | Input | Use Case |
|---|---|---|
cell_by_cell (default) |
row: dict |
LLM calls, row-by-row logic |
full_column |
df: DataFrame |
Vectorized DataFrame operations |
Recommendation: Use cell_by_cell for LLM calls. The framework handles parallelization automatically. Use full_column only for vectorized operations that don't involve LLM calls.
For full_column, set generation_strategy=dd.GenerationStrategy.FULL_COLUMN.
!!! note "Concurrent dispatch"
Sync cell_by_cell generators are dispatched concurrently across rows under the async engine. Module-level mutable state (counters, caches, non-thread-safe HTTP clients) needs synchronization or per-row instantiation. For network-bound work, prefer async def fn(row) — the engine runs it directly on its event loop and skips the thread bridge.
The Decorator
@dd.custom_column_generator(
required_columns=["col1"], # DAG ordering
side_effect_columns=["extra"], # Additional columns created
model_aliases=["model1"], # Required for LLM access
)
Models Dict
The third argument is a dict of ModelFacade instances, keyed by alias. You must declare all models required in your custom column generator in model_aliases - this populates the models dict and enables health checks before generation starts.
@dd.custom_column_generator(model_aliases=["my-model"])
def my_generator(row: dict, generator_params: None, models: dict) -> dict:
model = models["my-model"]
response, trace = model.generate(
prompt="...",
parser=my_custom_parser, # optional, defaults to identity
system_prompt="...",
max_correction_steps=3,
)
row["result"] = response
return row
This gives you direct access to all ModelFacade capabilities: custom parsers, correction loops, structured output, tool use, etc.
Configuration
| Parameter | Type | Required | Description |
|---|---|---|---|
name |
str | Yes | Column name |
generator_function |
Callable | Yes | Decorated function |
generation_strategy |
GenerationStrategy | No | CELL_BY_CELL or FULL_COLUMN |
generator_params |
BaseModel | No | Typed params passed to function |
allow_resize |
bool | No | Allow 1:N or N:1 generation |
Resizing (1:N and N:1)
FULL_COLUMN: Set allow_resize=True and return a DataFrame with more or fewer rows than the input:
@dd.custom_column_generator(
required_columns=["topic"],
side_effect_columns=["variation_id"],
)
def expand_topics(df: pd.DataFrame, params: None, models: dict) -> pd.DataFrame:
rows = []
for _, row in df.iterrows():
for i in range(3): # Generate 3 variations per input
rows.append({
"topic": row["topic"],
"question": f"Question {i+1} about {row['topic']}",
"variation_id": i,
})
return pd.DataFrame(rows)
dd.CustomColumnConfig(
name="question",
generator_function=expand_topics,
generation_strategy=dd.GenerationStrategy.FULL_COLUMN,
allow_resize=True,
)
CELL_BY_CELL: With allow_resize=True, your function may return a single row (dict) or multiple rows (list[dict]). Return [] to drop that input row.
@dd.custom_column_generator(required_columns=["id"])
def expand_row(row: dict) -> list[dict]:
return [
{**row, "variant": "a"},
{**row, "variant": "b"},
]
dd.CustomColumnConfig(
name="variant",
generator_function=expand_row,
generation_strategy=dd.GenerationStrategy.CELL_BY_CELL,
allow_resize=True,
)
Use cases:
- Expansion (1:N): Generate multiple variations per input
- Retraction (N:1): Filter, aggregate, or deduplicate records (FULL_COLUMN) or return
[]per row (CELL_BY_CELL)
Multi-Turn Example
@dd.custom_column_generator(
required_columns=["topic"],
side_effect_columns=["draft", "critique"],
model_aliases=["writer", "editor"],
)
def writer_editor(row: dict, generator_params: None, models: dict) -> dict:
draft, _ = models["writer"].generate(prompt=f"Write about '{row['topic']}'")
critique, _ = models["editor"].generate(prompt=f"Critique: {draft}")
revised, _ = models["writer"].generate(prompt=f"Revise based on: {critique}\n\nOriginal: {draft}")
row["final_text"] = revised
row["draft"] = draft
row["critique"] = critique
return row
Development Testing
Test generators with real LLM calls without running the full pipeline:
data_designer = DataDesigner()
models = data_designer.get_models(["my-model"])
result = my_generator({"name": "Alice"}, None, models)
In unit tests that mock model clients, use MagicMock(spec=ModelFacade) so async methods are auto-detected:
from unittest.mock import MagicMock
from data_designer.engine.models.facade import ModelFacade
mock_model = MagicMock(spec=ModelFacade)
Mocking only generate() will silently no-op under the async engine because the bridge routes through agenerate().