# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations import contextlib import json import logging import warnings from datetime import datetime from pathlib import Path from typing import Any from unittest.mock import MagicMock, PropertyMock, patch import pytest from pydantic import ValidationError import data_designer.interface.data_designer as dd_mod import data_designer.lazy_heavy_imports as lazy from data_designer.config.column_configs import CustomColumnConfig, ExpressionColumnConfig, SamplerColumnConfig from data_designer.config.config_builder import DataDesignerConfigBuilder from data_designer.config.custom_column import custom_column_generator from data_designer.config.errors import InvalidConfigError from data_designer.config.models import ModelProvider from data_designer.config.processors import DropColumnsProcessorConfig from data_designer.config.run_config import JinjaRenderingEngine, RequestAdmissionTuningConfig, RunConfig from data_designer.config.sampler_params import CategorySamplerParams, DatetimeSamplerParams, SamplerType from data_designer.config.seed import IndexRange, PartitionBlock, SamplingStrategy from data_designer.config.seed_source import ( AgentRolloutFormat, AgentRolloutSeedSource, DirectorySeedSource, FileContentsSeedSource, HuggingFaceSeedSource, ) from data_designer.engine.resources.seed_reader import ( FileSystemSeedReader, SeedReaderError, SeedReaderFileSystemContext, ) from data_designer.engine.secret_resolver import CompositeResolver, EnvironmentResolver, PlaintextResolver from data_designer.engine.testing.seed_readers import LineFanoutDirectorySeedReader from data_designer.engine.testing.stubs import StubHuggingFaceSeedReader from data_designer.interface.data_designer import DataDesigner from data_designer.interface.errors import ( DataDesignerEarlyShutdownError, DataDesignerGenerationError, DataDesignerProfilingError, ) class CustomDirectorySeedReader(FileSystemSeedReader[DirectorySeedSource]): output_columns = ["relative_path", "file_name", "decorated_path"] def build_manifest(self, *, context: SeedReaderFileSystemContext) -> lazy.pd.DataFrame | list[dict[str, str]]: matched_paths = self.get_matching_relative_paths( context=context, file_pattern=self.source.file_pattern, recursive=self.source.recursive, ) return [ { "relative_path": relative_path, "file_name": Path(relative_path).name, } for relative_path in matched_paths ] def hydrate_row( self, *, manifest_row: dict[str, Any], context: SeedReaderFileSystemContext, ) -> dict[str, str]: del context return { "relative_path": str(manifest_row["relative_path"]), "file_name": str(manifest_row["file_name"]), "decorated_path": f"custom::{manifest_row['relative_path']}", } def _add_irrelevant_sampler_column(builder: DataDesignerConfigBuilder) -> None: builder.add_column( SamplerColumnConfig( name="irrelevant", sampler_type=SamplerType.CATEGORY, params=CategorySamplerParams(values=["irrelevant"]), ) ) def _write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as file: for row in rows: file.write(f"{json.dumps(row)}\n") def _write_json(path: Path, payload: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload), encoding="utf-8") def _write_empty_jsonl(path: Path) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text("", encoding="utf-8") def _write_claude_trace_directory(root_path: Path) -> None: session_dir = root_path / "project-a" subagents_dir = session_dir / "subagents" subagents_dir.mkdir(parents=True) _write_jsonl( session_dir / "session-1.jsonl", [ {"type": "user", "sessionId": "session-1", "message": {"content": "Inspect the repo"}}, { "type": "assistant", "sessionId": "session-1", "message": { "content": [ {"type": "thinking", "thinking": "Need to inspect"}, {"type": "tool_use", "id": "toolu_1", "name": "ReadFile", "input": {"path": "README.md"}}, ], }, }, { "type": "user", "sessionId": "session-1", "message": { "content": [{"type": "tool_result", "tool_use_id": "toolu_1", "content": "README contents"}], }, }, { "type": "assistant", "sessionId": "session-1", "message": {"content": [{"type": "text", "text": "Repo inspected"}]}, }, ], ) _write_jsonl( subagents_dir / "agent-a.jsonl", [ { "type": "user", "sessionId": "session-1", "agentId": "agent-a", "isSidechain": True, "message": {"content": "Check tests"}, }, { "type": "assistant", "sessionId": "session-1", "agentId": "agent-a", "isSidechain": True, "message": {"content": [{"type": "text", "text": "Tests checked"}]}, }, ], ) (session_dir / "sessions-index.json").write_text( json.dumps( { "version": 1, "entries": [ { "sessionId": "session-1", "projectPath": "/repo-from-index", "summary": "Investigate repository", } ], } ), encoding="utf-8", ) def _write_codex_trace_directory(root_path: Path) -> None: codex_dir = root_path / "sessions" / "2026" / "03" / "10" codex_dir.mkdir(parents=True) _write_jsonl( codex_dir / "rollout-2026-03-10T00-00-00-session.jsonl", [ { "type": "session_meta", "payload": {"id": "codex-session", "cwd": "/workspace"}, }, { "type": "response_item", "payload": { "type": "message", "role": "user", "content": [{"type": "input_text", "text": "List files"}], }, }, { "type": "response_item", "payload": { "type": "function_call", "name": "exec_command", "arguments": '{"cmd":"ls"}', "call_id": "call_1", }, }, { "type": "response_item", "payload": { "type": "function_call_output", "call_id": "call_1", "output": "README.md\nsrc", }, }, { "type": "response_item", "payload": { "type": "message", "role": "assistant", "content": [{"type": "output_text", "text": "Listed files"}], }, }, ], ) def _write_atif_trace_directory(root_path: Path) -> None: trace_dir = root_path / "sessions" trace_dir.mkdir(parents=True, exist_ok=True) (trace_dir / "session-1.json").write_text( json.dumps( { "schema_version": "ATIF-v1.6", "session_id": "atif-session-1", "agent": { "name": "harbor-agent", "model_name": "gpt-5", "extra": {"cwd": "/workspace/project", "git_branch": "main"}, }, "steps": [ { "step_id": 1, "timestamp": "2026-04-06T12:00:00Z", "source": "user", "message": "Inspect the repository.", }, { "step_id": 2, "timestamp": "2026-04-06T12:00:04Z", "source": "agent", "message": [{"type": "text", "text": "Repository inspected"}], }, ], } ), encoding="utf-8", ) def _write_hermes_trace_directory(root_path: Path) -> None: _write_json( root_path / "request_dump_20260407_092759_baeaac_20260407_093000_000000.json", { "session_id": "20260407_092759_baeaac", "timestamp": "2026-04-07T09:30:00", "reason": "debug_dump", "error": None, "request": {"messages": []}, }, ) _write_json( root_path / "session_20260407_092759_baeaac.json", { "session_id": "20260407_092759_baeaac", "model": "aws/anthropic/bedrock-claude-opus-4-6", "base_url": "https://inference-api.nvidia.com/v1", "platform": "cli", "session_start": "2026-04-07T09:39:07.028463", "last_updated": "2026-04-07T09:51:07.905570", "system_prompt": "You are Hermes.", "tools": [ { "type": "function", "function": { "name": "terminal", "description": "Run shell commands.", "parameters": {"type": "object", "properties": {}, "required": []}, }, } ], "messages": [ {"role": "user", "content": "Set up a uv project."}, { "role": "assistant", "content": "I'll initialize the project.", "finish_reason": "tool_calls", "tool_calls": [ { "id": "tooluse_init", "call_id": "tooluse_init", "type": "function", "function": { "name": "terminal", "arguments": '{"command":"uv init"}', }, } ], }, { "role": "tool", "tool_call_id": "tooluse_init", "content": '{"output":"Initialized project","exit_code":0,"error":null}', }, { "role": "assistant", "content": "Done.", "finish_reason": "stop", "tool_calls": [], }, ], }, ) _write_json( root_path / "sessions.json", {"slack:thread-1": "gateway-session-1"}, ) _write_jsonl( root_path / "gateway-session-1.jsonl", [ {"role": "user", "content": "Check the deployment status."}, { "role": "assistant", "content": "I'll inspect the logs.", "finish_reason": "tool_calls", "tool_calls": [ { "id": "tooluse_logs", "type": "function", "function": { "name": "terminal", "arguments": '{"command":"kubectl logs deploy/app"}', }, } ], }, { "role": "tool", "tool_call_id": "tooluse_logs", "content": '{"output":"healthy","exit_code":0,"error":null}', }, ], ) def _write_claude_trace_directory_with_unhandled_files(root_path: Path) -> None: _write_claude_trace_directory(root_path) _write_jsonl(root_path / "project-a" / "history.jsonl", [{"type": "system"}]) _write_jsonl(root_path / "project-a" / "tool-results" / "ignored.jsonl", [{"type": "system"}]) @pytest.fixture def stub_artifact_path(tmp_path): """Temporary directory for artifacts.""" return tmp_path / "artifacts" @pytest.fixture def stub_managed_assets_path(tmp_path): """Temporary directory for managed assets.""" managed_path = tmp_path / "managed-assets" managed_path.mkdir(parents=True, exist_ok=True) return managed_path @pytest.fixture def stub_model_providers(): return [ ModelProvider( name="stub-model-provider", endpoint="https://api.stub-model-provider.com/v1", api_key="stub-model-provider-api-key", ) ] @pytest.fixture def stub_seed_reader(): return StubHuggingFaceSeedReader() def _builder_with_allow_resize() -> DataDesignerConfigBuilder: """Config with one allow_resize=True column — forces sync-engine fallback.""" @custom_column_generator() def _expander(row: dict) -> list[dict]: return [{**row, "item": i} for i in range(2)] builder = DataDesignerConfigBuilder() builder.add_column( SamplerColumnConfig( name="seed", sampler_type=SamplerType.CATEGORY, params=CategorySamplerParams(values=["a"]), ) ) builder.add_column( CustomColumnConfig( name="item", generator_function=_expander, allow_resize=True, ) ) return builder @pytest.mark.parametrize( "env_value,with_allow_resize,expected,expect_deprecation", [ ("1", False, "async", False), ("1", True, "sync", False), ("0", False, "sync", True), ], ids=[ "async-on-no-fallback-uses-async-clients", "async-on-allow-resize-falls-back-to-sync-clients", "async-off-uses-sync-clients-and-warns", ], ) def test_resolve_client_concurrency_mode_matches_engine_choice( env_value: str, with_allow_resize: bool, expected: str, expect_deprecation: bool, monkeypatch: pytest.MonkeyPatch, ) -> None: """Client mode must match the engine the run will actually use. Without this alignment, a sync-fallback run (e.g. ``allow_resize=True``) would be left with async-only clients and call sync methods on them, raising ``SyncClientUnavailableError`` from inside the sync engine. The ``DATA_DESIGNER_ASYNC_ENGINE=0`` opt-out path also emits a ``DeprecationWarning`` so users on the legacy sync engine see a pre-removal signal in their logs. The auto-fallback path (``allow_resize=True``) does not double-warn here; the builder layer emits its own warning when the run actually executes. """ monkeypatch.setattr(dd_mod, "DATA_DESIGNER_ASYNC_ENGINE", env_value == "1") builder = _builder_with_allow_resize() if with_allow_resize else DataDesignerConfigBuilder() if not with_allow_resize: builder.add_column( SamplerColumnConfig( name="seed", sampler_type=SamplerType.CATEGORY, params=CategorySamplerParams(values=["a"]), ) ) if expect_deprecation: with pytest.warns(DeprecationWarning, match="legacy sync engine"): mode = DataDesigner._resolve_client_concurrency_mode(builder) else: with warnings.catch_warnings(): warnings.simplefilter("error", DeprecationWarning) mode = DataDesigner._resolve_client_concurrency_mode(builder) assert mode.value == expected def test_init_with_custom_secret_resolver(stub_artifact_path, stub_model_providers): """Test DataDesigner initialization with custom secret resolver.""" designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), ) assert designer is not None def test_init_with_default_composite_secret_resolver(stub_artifact_path, stub_model_providers): """Test DataDesigner initialization with default composite secret resolver.""" designer = DataDesigner(artifact_path=stub_artifact_path, model_providers=stub_model_providers) assert designer is not None assert isinstance(designer.secret_resolver, CompositeResolver) # Verify the composite resolver is properly configured with the expected resolvers resolvers = designer.secret_resolver.resolvers assert len(resolvers) == 2 assert isinstance(resolvers[0], EnvironmentResolver) assert isinstance(resolvers[1], PlaintextResolver) def test_init_with_string_path(stub_artifact_path, stub_model_providers): """Test DataDesigner accepts string paths.""" designer = DataDesigner(artifact_path=str(stub_artifact_path), model_providers=stub_model_providers) assert designer is not None assert isinstance(designer.artifact_path, Path) assert designer.artifact_path == stub_artifact_path def test_init_with_path_object(stub_artifact_path, stub_model_providers): """Test DataDesigner accepts Path objects.""" designer = DataDesigner(artifact_path=stub_artifact_path, model_providers=stub_model_providers) assert designer is not None def test_init_user_supplied_providers_ignore_unrelated_yaml_default( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, ) -> None: """Regression for #588: a YAML ``default:`` that names a provider absent from a user-supplied ``model_providers`` list must not leak into construction. Pre-fix this raised ``ValidationError: Specified default 'unrelated' not found in providers list``. """ with patch.object(dd_mod, "get_default_provider_name", return_value="unrelated"): data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) assert data_designer.model_provider_registry.get_default_provider_name() == "stub-model-provider" def test_init_user_supplied_providers_preserve_first_wins_over_yaml_default( stub_artifact_path: Path, stub_managed_assets_path: Path, ) -> None: """Regression for #588: when the YAML ``default:`` matches a user-supplied provider that isn't first in the list, the documented ``model_providers[0]`` "first wins" behavior must not be silently overridden. """ user_providers = [ ModelProvider( name="first-provider", endpoint="https://first.example.com/v1", api_key="FIRST_API_KEY", ), ModelProvider( name="second-provider", endpoint="https://second.example.com/v1", api_key="SECOND_API_KEY", ), ] # Multi-provider construction (user-supplied list of length > 1) still # passes ``default=`` to ``ModelProviderRegistry`` — that's the deprecated # path under #589 — so the registry-level deprecation fires here. with ( patch.object(dd_mod, "get_default_provider_name", return_value="second-provider"), pytest.warns(DeprecationWarning, match="ModelProviderRegistry.default is deprecated"), ): data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=user_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) assert data_designer.model_provider_registry.get_default_provider_name() == "first-provider" def test_init_no_user_providers_uses_yaml_default( stub_artifact_path: Path, stub_managed_assets_path: Path, ) -> None: """Pin the unchanged YAML-fallback path: when the caller omits ``model_providers``, DataDesigner consults both ``providers:`` and ``default:`` from the YAML. The fix in #588 only changes the user-supplied branch; this test locks the YAML-fallback branch's contract so a future refactor can't silently regress it. """ yaml_providers = [ ModelProvider( name="yaml-first", endpoint="https://yaml-first.example.com/v1", api_key="yaml-first-key", ), ModelProvider( name="yaml-second", endpoint="https://yaml-second.example.com/v1", api_key="yaml-second-key", ), ] with ( patch.object(dd_mod, "get_default_providers", return_value=yaml_providers), patch.object(dd_mod, "get_default_provider_name", return_value="yaml-second"), ): data_designer = DataDesigner( artifact_path=stub_artifact_path, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) assert data_designer.model_provider_registry.get_default_provider_name() == "yaml-second" def test_init_yaml_default_emits_single_deprecation_warning( stub_artifact_path: Path, stub_managed_assets_path: Path, ) -> None: """Regression for PR #594 review: when ``DataDesigner()`` falls back to the YAML's ``providers:`` and ``default:``, the user should see a single ``DeprecationWarning`` (the YAML one) rather than a duplicate cascade where ``ModelProviderRegistry._warn_on_explicit_default`` also fires for the same root cause. See issue #589. """ yaml_providers = [ ModelProvider( name="yaml-first", endpoint="https://yaml-first.example.com/v1", api_key="yaml-first-key", ), ModelProvider( name="yaml-second", endpoint="https://yaml-second.example.com/v1", api_key="yaml-second-key", ), ] with warnings.catch_warnings(record=True) as caught: warnings.simplefilter("always", DeprecationWarning) with ( patch.object(dd_mod, "get_default_providers", return_value=yaml_providers), patch.object(dd_mod, "get_default_provider_name") as mock_get_default, ): mock_get_default.side_effect = lambda: ( warnings.warn( "The 'default:' key in /fake/path is deprecated and will " "be removed in a future release. Remove it and specify provider= " "explicitly on each ModelConfig instead. See issue #589.", DeprecationWarning, stacklevel=2, ) or "yaml-second" ) DataDesigner( artifact_path=stub_artifact_path, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) deprecation_messages = [str(w.message) for w in caught if issubclass(w.category, DeprecationWarning)] yaml_default_warnings = [m for m in deprecation_messages if "'default:' key" in m] registry_default_warnings = [m for m in deprecation_messages if "ModelProviderRegistry.default is deprecated" in m] assert len(yaml_default_warnings) == 1, deprecation_messages assert registry_default_warnings == [], ( "Registry-level deprecation should be suppressed in the YAML-fallback path " "to avoid two warnings for the same root cause." ) def test_init_no_user_providers_no_yaml_default_stays_quiet( stub_artifact_path: Path, stub_managed_assets_path: Path, ) -> None: """Pin the bare-``DataDesigner()`` happy path: when the caller passes nothing and the YAML carries multiple ``providers:`` but no ``default:`` key, ``resolve_model_provider_registry`` synthesises ``default=providers[0].name`` to satisfy ``check_implicit_default``. The user did not opt into the deprecated registry-level default — the library filled it in on their behalf — so ``_warn_on_explicit_default`` must stay quiet. The fresh-install YAML ships exactly this shape (3 providers, no ``default:``), so a regression here is what every user sees on their first ``DataDesigner()`` call. Counterpart to ``test_init_user_supplied_providers_preserve_first_wins_over_yaml_default``, which pins that the warning *does* fire when the caller hand-builds a multi-provider list themselves (they wrote the multi-provider intent, so the deprecation nudge applies). """ yaml_providers = [ ModelProvider( name="yaml-first", endpoint="https://yaml-first.example.com/v1", api_key="yaml-first-key", ), ModelProvider( name="yaml-second", endpoint="https://yaml-second.example.com/v1", api_key="yaml-second-key", ), ] with warnings.catch_warnings(record=True) as caught: warnings.simplefilter("always", DeprecationWarning) with ( patch.object(dd_mod, "get_default_providers", return_value=yaml_providers), patch.object(dd_mod, "get_default_provider_name", return_value=None), ): data_designer = DataDesigner( artifact_path=stub_artifact_path, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) deprecation_messages = [str(w.message) for w in caught if issubclass(w.category, DeprecationWarning)] registry_default_warnings = [m for m in deprecation_messages if "ModelProviderRegistry.default is deprecated" in m] assert registry_default_warnings == [], ( "Library-synthesised default must not emit the registry-level deprecation; " f"the user did not opt into it. Saw: {deprecation_messages}" ) # Behavioral pin: first-wins still resolves correctly. assert data_designer.model_provider_registry.get_default_provider_name() == "yaml-first" def test_run_config_setting_persists(stub_artifact_path, stub_model_providers): """Test that run config setting persists across multiple calls.""" data_designer = DataDesigner(artifact_path=stub_artifact_path, model_providers=stub_model_providers) original_request_admission = data_designer._request_admission # Test default values assert data_designer.run_config.disable_early_shutdown is False assert data_designer.run_config.shutdown_error_rate == 0.5 assert data_designer.run_config.shutdown_error_window == 10 assert data_designer.run_config.buffer_size == 1000 assert data_designer.run_config.max_conversation_restarts == 5 assert data_designer.run_config.max_conversation_correction_steps == 0 # Test setting custom values data_designer.set_run_config( RunConfig( disable_early_shutdown=True, shutdown_error_rate=0.8, shutdown_error_window=25, buffer_size=500, max_conversation_restarts=7, max_conversation_correction_steps=2, request_admission=RequestAdmissionTuningConfig(successes_until_increase=7), ) ) assert data_designer.run_config.disable_early_shutdown is True assert data_designer.run_config.shutdown_error_rate == 1.0 # normalized when disabled assert data_designer.run_config.shutdown_error_window == 25 assert data_designer.run_config.buffer_size == 500 assert data_designer.run_config.max_conversation_restarts == 7 assert data_designer.run_config.max_conversation_correction_steps == 2 assert data_designer._request_admission is not original_request_admission assert data_designer._request_admission.config.successes_until_increase == 7 # Test updating values data_designer.set_run_config( RunConfig( disable_early_shutdown=False, shutdown_error_rate=0.3, shutdown_error_window=5, buffer_size=750, max_conversation_restarts=9, max_conversation_correction_steps=1, ) ) assert data_designer.run_config.disable_early_shutdown is False assert data_designer.run_config.shutdown_error_rate == 0.3 assert data_designer.run_config.shutdown_error_window == 5 assert data_designer.run_config.buffer_size == 750 assert data_designer.run_config.max_conversation_restarts == 9 assert data_designer.run_config.max_conversation_correction_steps == 1 def test_run_config_normalizes_error_rate_when_disabled(stub_artifact_path, stub_model_providers): """Test that shutdown_error_rate is normalized to 1.0 when disabled.""" data_designer = DataDesigner(artifact_path=stub_artifact_path, model_providers=stub_model_providers) # When enabled (default), shutdown_error_rate should use the configured value data_designer.set_run_config( RunConfig( disable_early_shutdown=False, shutdown_error_rate=0.7, ) ) assert data_designer.run_config.shutdown_error_rate == 0.7 # When disabled, shutdown_error_rate should be normalized to 1.0 data_designer.set_run_config( RunConfig( disable_early_shutdown=True, shutdown_error_rate=0.7, ) ) assert data_designer.run_config.shutdown_error_rate == 1.0 def test_create_forwards_on_batch_complete_callback( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_sampler_only_config_builder: DataDesignerConfigBuilder, stub_managed_assets_path: Path, ) -> None: data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) def on_batch_complete(path: Path) -> None: del path with ( patch.object(data_designer, "_create_resource_provider") as mock_resource_provider_method, patch.object(data_designer, "_create_dataset_builder") as mock_builder_method, patch.object(data_designer, "_create_dataset_profiler") as mock_profiler_method, ): mock_resource_provider = MagicMock() mock_resource_provider.get_dataset_metadata.return_value = {} mock_resource_provider_method.return_value = mock_resource_provider mock_builder = MagicMock() mock_builder.build.return_value = None mock_builder.task_traces = [] mock_builder.artifact_storage.load_dataset_with_dropped_columns.return_value = lazy.pd.DataFrame({"col": [1]}) mock_builder_method.return_value = mock_builder mock_profiler = MagicMock() mock_profiler.profile_dataset.return_value = None mock_profiler_method.return_value = mock_profiler data_designer.create(stub_sampler_only_config_builder, num_records=1, on_batch_complete=on_batch_complete) _, build_kwargs = mock_builder.build.call_args assert build_kwargs["num_records"] == 1 assert build_kwargs["on_batch_complete"] is on_batch_complete def test_run_config_rejects_invalid_buffer_size() -> None: with pytest.raises(ValidationError, match="buffer_size"): RunConfig(buffer_size=0) def test_create_dataset_e2e_using_only_sampler_columns( stub_sampler_only_config_builder, stub_artifact_path, stub_model_providers, stub_managed_assets_path ): column_names = [config.name for config in stub_sampler_only_config_builder.get_column_configs()] num_records = 3 data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) results = data_designer.create(stub_sampler_only_config_builder, num_records=num_records) df = results.load_dataset() assert len(df) == num_records assert set(df.columns) == set(column_names) # cycle through with no errors for _ in range(num_records + 2): results.display_sample_record() analysis = results.load_analysis() assert analysis.target_num_records == num_records # display report with no errors analysis.to_report() def test_create_with_drop_true_can_skip_dropped_column_artifacts( stub_artifact_path, stub_model_providers, stub_model_configs, stub_managed_assets_path, ): config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) config_builder.add_column( SamplerColumnConfig( name="uuid", sampler_type="uuid", params={"prefix": "id_", "short_form": True, "uppercase": False}, ) ) config_builder.add_column( SamplerColumnConfig( name="hidden_category", sampler_type="category", params={"values": ["private"]}, drop=True, ) ) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) data_designer.set_run_config(RunConfig(preserve_dropped_columns=False)) results = data_designer.create(config_builder, num_records=3) df = results.load_dataset() assert "uuid" in df.columns assert "hidden_category" not in df.columns assert not results.artifact_storage.dropped_columns_dataset_path.exists() metadata = json.loads(results.artifact_storage.metadata_file_path.read_text()) assert metadata["preserve_dropped_columns"] is False def test_create_with_drop_true_preserves_columns_only_in_dropped_artifacts( stub_artifact_path, stub_model_providers, stub_model_configs, stub_managed_assets_path, ): config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) config_builder.add_column( SamplerColumnConfig( name="uuid", sampler_type="uuid", params={"prefix": "id_", "short_form": True, "uppercase": False}, ) ) config_builder.add_column( SamplerColumnConfig( name="hidden_category", sampler_type="category", params={"values": ["private"]}, drop=True, ) ) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) results = data_designer.create(config_builder, num_records=3) main_df = results.load_dataset() dropped_df = lazy.pd.read_parquet(results.artifact_storage.dropped_columns_dataset_path) assert "uuid" in main_df.columns assert "hidden_category" not in main_df.columns assert "hidden_category" in dropped_df.columns assert "uuid" not in dropped_df.columns metadata = json.loads(results.artifact_storage.metadata_file_path.read_text()) assert metadata["preserve_dropped_columns"] is True def test_create_raises_error_when_builder_fails( stub_artifact_path, stub_model_providers, stub_sampler_only_config_builder, stub_managed_assets_path ): """Test that create method raises DataDesignerCreateError when builder.build fails.""" data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) with patch.object(data_designer, "_create_dataset_builder") as mock_builder_method: mock_builder = MagicMock() mock_builder.build.side_effect = RuntimeError("Builder failed") mock_builder_method.return_value = mock_builder with pytest.raises( DataDesignerGenerationError, match="🛑 Error generating dataset: Builder failed" ) as exc_info: data_designer.create(stub_sampler_only_config_builder, num_records=3) assert isinstance(exc_info.value.__cause__, RuntimeError) def test_create_raises_error_when_profiler_fails( stub_artifact_path, stub_model_providers, stub_sampler_only_config_builder, stub_managed_assets_path ): """Test that create method raises DataDesignerCreateError when profiler.profile_dataset fails.""" data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) with ( patch.object(data_designer, "_create_dataset_builder") as mock_builder_method, patch.object(data_designer, "_create_dataset_profiler") as mock_profiler_method, ): # Mock builder to succeed mock_builder = MagicMock() mock_builder.build.return_value = None mock_builder.artifact_storage.load_dataset_with_dropped_columns.return_value = lazy.pd.DataFrame( {"col": [1, 2, 3]} ) mock_builder_method.return_value = mock_builder # Mock profiler to fail mock_profiler = MagicMock() mock_profiler.profile_dataset.side_effect = ValueError("Profiler failed") mock_profiler_method.return_value = mock_profiler with pytest.raises(DataDesignerProfilingError, match="🛑 Error profiling dataset: Profiler failed") as exc_info: data_designer.create(stub_sampler_only_config_builder, num_records=3) assert isinstance(exc_info.value.__cause__, ValueError) def test_preview_raises_error_when_builder_fails( stub_artifact_path, stub_model_providers, stub_sampler_only_config_builder, stub_managed_assets_path ): """Test that preview method raises DataDesignerPreviewError when builder.build_preview fails.""" data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) with patch.object(data_designer, "_create_dataset_builder") as mock_builder_method: mock_builder = MagicMock() mock_builder.build_preview.side_effect = RuntimeError("Builder preview failed") mock_builder_method.return_value = mock_builder with pytest.raises( DataDesignerGenerationError, match="🛑 Error generating preview dataset: Builder preview failed" ) as exc_info: data_designer.preview(stub_sampler_only_config_builder, num_records=3) assert isinstance(exc_info.value.__cause__, RuntimeError) def test_preview_raises_error_when_profiler_fails( stub_artifact_path, stub_model_providers, stub_sampler_only_config_builder, stub_managed_assets_path ): """Test that preview method raises DataDesignerPreviewError when profiler.profile_dataset fails.""" data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) with ( patch.object(data_designer, "_create_dataset_builder") as mock_builder_method, patch.object(data_designer, "_create_dataset_profiler") as mock_profiler_method, ): # Mock builder to succeed mock_builder = MagicMock() mock_builder.build_preview.return_value = lazy.pd.DataFrame({"col": [1, 2, 3]}) mock_builder.process_preview.return_value = lazy.pd.DataFrame({"col": [1, 2, 3]}) mock_builder_method.return_value = mock_builder # Mock profiler to fail mock_profiler = MagicMock() mock_profiler.profile_dataset.side_effect = ValueError("Profiler failed in preview") mock_profiler_method.return_value = mock_profiler with pytest.raises( DataDesignerProfilingError, match="🛑 Error profiling preview dataset: Profiler failed in preview" ) as exc_info: data_designer.preview(stub_sampler_only_config_builder, num_records=3) assert isinstance(exc_info.value.__cause__, ValueError) def _patch_builder_state( *, early_shutdown: bool, actual_num_records: int = 0, first_non_retryable_error: Exception | None = None, ) -> contextlib.ExitStack: """Patch DatasetBuilder.early_shutdown / actual_num_records / first_non_retryable_error.""" stack = contextlib.ExitStack() stack.enter_context( patch( "data_designer.engine.dataset_builders.dataset_builder.DatasetBuilder.early_shutdown", new_callable=PropertyMock, return_value=early_shutdown, ) ) stack.enter_context( patch( "data_designer.engine.dataset_builders.dataset_builder.DatasetBuilder.actual_num_records", new_callable=PropertyMock, return_value=actual_num_records, ) ) stack.enter_context( patch( "data_designer.engine.dataset_builders.dataset_builder.DatasetBuilder.first_non_retryable_error", new_callable=PropertyMock, return_value=first_non_retryable_error, ) ) return stack def _make_data_designer( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, ) -> DataDesigner: return DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) # Matrix of error-dispatch behavior in create() when the load step doesn't return a # usable dataset. Two side-effect modes (load raises FileNotFoundError vs. load # returns an empty DF) crossed with three builder states (no shutdown, shutdown # with zero records, shutdown with partial salvage). Each case asserts exactly # which error type create() surfaces and the message it carries. @pytest.mark.parametrize( "load_side_effect,early_shutdown,actual_num_records,expected_exc,match,expect_filenotfound_cause", [ # Load raises FileNotFoundError → "Failed to load generated dataset" path. pytest.param( "raises", False, -1, DataDesignerGenerationError, "Failed to load generated dataset", True, id="load_fails_no_shutdown", ), pytest.param( "raises", True, 0, DataDesignerEarlyShutdownError, "early shutdown was triggered", True, id="load_fails_shutdown_zero_records", ), pytest.param( "raises", True, 7, DataDesignerGenerationError, "Failed to load generated dataset", True, id="load_fails_shutdown_partial_salvage", ), # Load returns empty DF → "Dataset is empty" defensive guard. pytest.param( "empty_df", False, -1, DataDesignerGenerationError, "Dataset is empty", False, id="empty_df_no_shutdown", ), pytest.param( "empty_df", True, 0, DataDesignerEarlyShutdownError, "early shutdown was triggered", False, id="empty_df_shutdown_zero_records", ), pytest.param( "empty_df", True, 7, DataDesignerGenerationError, "Dataset is empty", False, id="empty_df_shutdown_partial_salvage", ), ], ) def test_create_error_dispatch_on_load_outcome( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_sampler_only_config_builder: DataDesignerConfigBuilder, stub_managed_assets_path: Path, load_side_effect: str, early_shutdown: bool, actual_num_records: int, expected_exc: type[Exception], match: str, expect_filenotfound_cause: bool, ) -> None: """create() picks the right error type based on (load outcome × builder state). The typed ``DataDesignerEarlyShutdownError`` only fires when the gate tripped AND zero records were produced. Partial-salvage runs that fail to load (or return empty for unrelated reasons) fall through to the generic error so the real cause isn't masked. """ data_designer = _make_data_designer(stub_artifact_path, stub_model_providers, stub_managed_assets_path) if load_side_effect == "raises": load_patch = patch( "data_designer.engine.storage.artifact_storage.ArtifactStorage.load_dataset_with_dropped_columns", side_effect=FileNotFoundError("No parquet files found"), ) else: load_patch = patch( "data_designer.engine.storage.artifact_storage.ArtifactStorage.load_dataset_with_dropped_columns", return_value=lazy.pd.DataFrame(), ) state_patch = ( _patch_builder_state(early_shutdown=early_shutdown, actual_num_records=actual_num_records) if early_shutdown else contextlib.nullcontext() ) with load_patch, state_patch: with pytest.raises(expected_exc, match=match) as exc_info: data_designer.create(stub_sampler_only_config_builder, num_records=10) # Subclass relationship is the contract callers depend on - existing handlers # for DataDesignerGenerationError must still catch the typed subclass. if expected_exc is DataDesignerEarlyShutdownError: assert isinstance(exc_info.value, DataDesignerGenerationError) else: assert not isinstance(exc_info.value, DataDesignerEarlyShutdownError) if expect_filenotfound_cause: assert isinstance(exc_info.value.__cause__, FileNotFoundError) @pytest.mark.parametrize( "load_side_effect", ["raises", "empty_df"], ids=["load-raises-filenotfound", "load-returns-empty-df"], ) def test_create_surfaces_first_non_retryable_error_when_zero_records( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_sampler_only_config_builder: DataDesignerConfigBuilder, stub_managed_assets_path: Path, load_side_effect: str, ) -> None: """When 0 records were produced due to a deterministic non-retryable error (no early-shutdown), surface that error's message instead of a wrapped FileNotFoundError on the parquet path. The interface chains the original exception via ``__cause__`` so callers still have full context. """ data_designer = _make_data_designer(stub_artifact_path, stub_model_providers, stub_managed_assets_path) root_cause = ValueError("invalid seed source: no rows after hydration") if load_side_effect == "raises": load_patch = patch( "data_designer.engine.storage.artifact_storage.ArtifactStorage.load_dataset_with_dropped_columns", side_effect=FileNotFoundError("No parquet files found"), ) else: load_patch = patch( "data_designer.engine.storage.artifact_storage.ArtifactStorage.load_dataset_with_dropped_columns", return_value=lazy.pd.DataFrame(), ) with ( load_patch, _patch_builder_state( early_shutdown=False, actual_num_records=0, first_non_retryable_error=root_cause, ), ): with pytest.raises(DataDesignerGenerationError, match="invalid seed source") as exc_info: data_designer.create(stub_sampler_only_config_builder, num_records=10) # Original cause is preserved via __cause__, not lost behind the parquet error. assert exc_info.value.__cause__ is root_cause # The typed DataDesignerEarlyShutdownError must NOT fire here — the gate didn't trip. assert not isinstance(exc_info.value, DataDesignerEarlyShutdownError) def test_preview_raises_generation_error_when_dataset_is_empty( stub_artifact_path, stub_model_providers, stub_sampler_only_config_builder, stub_managed_assets_path ): """When all records are dropped during generation, preview should raise DataDesignerGenerationError with a clear message instead of a misleading profiler error. """ data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) with patch( "data_designer.engine.dataset_builders.dataset_builder.DatasetBuilder.process_preview", return_value=lazy.pd.DataFrame(), ): with pytest.raises(DataDesignerGenerationError, match="Dataset is empty"): data_designer.preview(stub_sampler_only_config_builder, num_records=1) def test_preview_raises_early_shutdown_error_on_empty_after_shutdown( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_sampler_only_config_builder: DataDesignerConfigBuilder, stub_managed_assets_path: Path, ) -> None: """Preview mirrors create(): typed early-shutdown error fires when shutdown produced zero records.""" data_designer = _make_data_designer(stub_artifact_path, stub_model_providers, stub_managed_assets_path) with ( patch( "data_designer.engine.dataset_builders.dataset_builder.DatasetBuilder.process_preview", return_value=lazy.pd.DataFrame(), ), _patch_builder_state(early_shutdown=True, actual_num_records=0), ): with pytest.raises(DataDesignerEarlyShutdownError, match="early shutdown was triggered"): data_designer.preview(stub_sampler_only_config_builder, num_records=1) def test_preview_raises_generic_error_when_partial_then_empty( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_sampler_only_config_builder: DataDesignerConfigBuilder, stub_managed_assets_path: Path, ) -> None: """Preview falls through to the generic error when records were salvaged.""" data_designer = _make_data_designer(stub_artifact_path, stub_model_providers, stub_managed_assets_path) with ( patch( "data_designer.engine.dataset_builders.dataset_builder.DatasetBuilder.process_preview", return_value=lazy.pd.DataFrame(), ), _patch_builder_state(early_shutdown=True, actual_num_records=3), ): with pytest.raises(DataDesignerGenerationError, match="Dataset is empty") as exc_info: data_designer.preview(stub_sampler_only_config_builder, num_records=10) assert not isinstance(exc_info.value, DataDesignerEarlyShutdownError) def test_create_logs_secure_jinja_rendering_mode( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_sampler_only_config_builder: DataDesignerConfigBuilder, stub_managed_assets_path: Path, ) -> None: data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) data_designer.set_run_config(RunConfig(jinja_rendering_engine=JinjaRenderingEngine.SECURE)) with ( patch.object(dd_mod.logger, "info") as mock_info, patch.object(data_designer, "_create_resource_provider") as mock_resource_provider_method, patch.object(data_designer, "_create_dataset_builder") as mock_builder_method, patch.object(data_designer, "_create_dataset_profiler") as mock_profiler_method, ): mock_resource_provider = MagicMock() mock_resource_provider.get_dataset_metadata.return_value = {} mock_resource_provider_method.return_value = mock_resource_provider mock_builder = MagicMock() mock_builder.build.return_value = None mock_builder.task_traces = [] mock_builder.artifact_storage.load_dataset_with_dropped_columns.return_value = lazy.pd.DataFrame({"col": [1]}) mock_builder_method.return_value = mock_builder mock_profiler = MagicMock() mock_profiler.profile_dataset.return_value = None mock_profiler_method.return_value = mock_profiler data_designer.create(stub_sampler_only_config_builder, num_records=1) assert any("🔒 Jinja rendering engine: secure" in call.args[0] for call in mock_info.call_args_list) def test_preview_logs_native_jinja_rendering_mode( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_sampler_only_config_builder: DataDesignerConfigBuilder, stub_managed_assets_path: Path, ) -> None: data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) data_designer.set_run_config(RunConfig(jinja_rendering_engine=JinjaRenderingEngine.NATIVE)) with ( patch.object(dd_mod.logger, "info") as mock_info, patch.object(data_designer, "_create_resource_provider") as mock_resource_provider_method, patch.object(data_designer, "_create_dataset_builder") as mock_builder_method, patch.object(data_designer, "_create_dataset_profiler") as mock_profiler_method, ): mock_resource_provider = MagicMock() mock_resource_provider.get_dataset_metadata.return_value = {} mock_resource_provider_method.return_value = mock_resource_provider mock_builder = MagicMock() mock_builder.build_preview.return_value = lazy.pd.DataFrame({"col": [1]}) mock_builder.process_preview.return_value = lazy.pd.DataFrame({"col": [1]}) mock_builder.artifact_storage.list_processor_names.return_value = [] mock_builder_method.return_value = mock_builder mock_profiler = MagicMock() mock_profiler.profile_dataset.return_value = None mock_profiler_method.return_value = mock_profiler data_designer.preview(stub_sampler_only_config_builder, num_records=1) assert any("🏠 Jinja rendering engine: native" in call.args[0] for call in mock_info.call_args_list) def test_preview_datetime_single_record_returns_iso8601( stub_artifact_path, stub_model_providers, stub_model_configs, stub_managed_assets_path ): """Regression test for #484: single-record datetime preview must return ISO-8601, not a bare year.""" config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) config_builder.add_column( SamplerColumnConfig( name="ts", sampler_type=SamplerType.DATETIME, params=DatetimeSamplerParams(start="2024-01-01", end="2026-06-30", unit="h"), ), ) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) result = data_designer.preview(config_builder, num_records=1) ts_value = result.dataset["ts"].iloc[0] # Must be a full ISO-8601 timestamp, not a bare year like "2025". assert "T" in ts_value, f"Expected ISO-8601 timestamp, got: {ts_value!r}" parsed = datetime.fromisoformat(ts_value) assert parsed.year >= 2024 assert parsed.year <= 2026 def test_preview_with_dropped_columns( stub_artifact_path, stub_model_providers, stub_model_configs, stub_managed_assets_path ): """Test that preview correctly handles dropped columns and maintains consistency.""" config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) config_builder.add_column( SamplerColumnConfig( name="uuid", sampler_type="uuid", params={"prefix": "id_", "short_form": True, "uppercase": False} ) ) config_builder.add_column( SamplerColumnConfig(name="category", sampler_type="category", params={"values": ["a", "b", "c"]}) ) config_builder.add_column( SamplerColumnConfig(name="uniform", sampler_type="uniform", params={"low": 1, "high": 100}) ) config_builder.add_processor(DropColumnsProcessorConfig(name="drop_columns_processor", column_names=["category"])) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) num_records = 5 preview_results = data_designer.preview(config_builder, num_records=num_records) preview_dataset = preview_results.dataset assert "category" not in preview_dataset.columns, "Dropped column 'category' should not be in preview dataset" assert "uuid" in preview_dataset.columns, "Column 'uuid' should be in preview dataset" assert "uniform" in preview_dataset.columns, "Column 'uniform' should be in preview dataset" assert len(preview_dataset) == num_records, f"Preview dataset should have {num_records} records" analysis = preview_results.analysis assert analysis is not None, "Analysis should be generated" column_names_in_analysis = [stat.column_name for stat in analysis.column_statistics] assert "uuid" in column_names_in_analysis, "Column 'uuid' should be in analysis" assert "uniform" in column_names_in_analysis, "Column 'uniform' should be in analysis" assert "category" not in column_names_in_analysis, "Dropped column 'category' should not be in analysis statistics" assert analysis.side_effect_column_names is not None, "Side effect column names should be tracked" assert "category" in analysis.side_effect_column_names, ( "Dropped column 'category' should be tracked in side_effect_column_names" ) def test_validate_raises_error_when_seed_collides( stub_artifact_path, stub_model_providers, stub_model_configs, stub_managed_assets_path, stub_seed_reader, ): config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) config_builder.with_seed_dataset(HuggingFaceSeedSource(path="hf://datasets/test/data.csv")) config_builder.add_column( SamplerColumnConfig( name="city", sampler_type=SamplerType.CATEGORY, params=CategorySamplerParams(values=["new york", "los angeles"]), ) ) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, seed_readers=[stub_seed_reader], ) with pytest.raises(InvalidConfigError): data_designer.validate(config_builder) def test_initialize_interface_runtime_runs_once(monkeypatch: pytest.MonkeyPatch) -> None: """_initialize_interface_runtime only runs initialization once.""" monkeypatch.setattr(dd_mod, "_interface_runtime_initialized", False) with ( patch("data_designer.interface.data_designer.configure_logging") as mock_logging, patch("data_designer.interface.data_designer.resolve_seed_default_model_settings") as mock_resolve, ): dd_mod._initialize_interface_runtime() dd_mod._initialize_interface_runtime() mock_logging.assert_called_once() mock_resolve.assert_called_once() def test_create_dataset_e2e_with_directory_seed_source( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "directory-seed" (seed_dir / "subdir").mkdir(parents=True) (seed_dir / "alpha.txt").write_text("alpha", encoding="utf-8") (seed_dir / "subdir" / "beta.md").write_text("beta", encoding="utf-8") builder = DataDesignerConfigBuilder() builder.with_seed_dataset(DirectorySeedSource(path=str(seed_dir))) builder.add_column(ExpressionColumnConfig(name="path_label", expr="{{ source_kind }}::{{ relative_path }}")) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) results = data_designer.create(builder, num_records=2, dataset_name="directory-seed-test") df = results.load_dataset().sort_values("relative_path").reset_index(drop=True) assert list(df["source_kind"]) == ["directory_file", "directory_file"] assert list(df["relative_path"]) == ["alpha.txt", "subdir/beta.md"] assert list(df["file_name"]) == ["alpha.txt", "beta.md"] assert list(df["path_label"]) == [ "directory_file::alpha.txt", "directory_file::subdir/beta.md", ] def test_preview_dataset_e2e_with_directory_seed_source( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "directory-preview-seed" (seed_dir / "subdir").mkdir(parents=True) (seed_dir / "alpha.txt").write_text("alpha", encoding="utf-8") (seed_dir / "subdir" / "beta.txt").write_text("beta", encoding="utf-8") (seed_dir / "subdir" / "gamma.md").write_text("gamma", encoding="utf-8") builder = DataDesignerConfigBuilder() builder.with_seed_dataset(DirectorySeedSource(path=str(seed_dir), file_pattern="*.txt")) builder.add_column(ExpressionColumnConfig(name="path_label", expr="{{ source_kind }}::{{ relative_path }}")) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) preview_results = data_designer.preview(builder, num_records=2) df = preview_results.dataset.sort_values("relative_path").reset_index(drop=True) assert list(df["source_kind"]) == ["directory_file", "directory_file"] assert list(df["relative_path"]) == ["alpha.txt", "subdir/beta.txt"] assert list(df["path_label"]) == [ "directory_file::alpha.txt", "directory_file::subdir/beta.txt", ] def test_create_dataset_e2e_with_file_contents_seed_source( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "file-contents-seed" seed_dir.mkdir(parents=True) (seed_dir / "alpha.txt").write_text("alpha", encoding="utf-8") (seed_dir / "beta.txt").write_text("beta", encoding="utf-8") builder = DataDesignerConfigBuilder() builder.with_seed_dataset(FileContentsSeedSource(path=str(seed_dir), file_pattern="*.txt")) builder.add_column(ExpressionColumnConfig(name="content_label", expr="{{ file_name }}::{{ content }}")) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) results = data_designer.create(builder, num_records=2, dataset_name="file-contents-seed-test") df = results.load_dataset().sort_values("file_name").reset_index(drop=True) assert list(df["source_kind"]) == ["file_contents", "file_contents"] assert list(df["file_name"]) == ["alpha.txt", "beta.txt"] assert list(df["content"]) == ["alpha", "beta"] assert list(df["content_label"]) == [ "alpha.txt::alpha", "beta.txt::beta", ] def test_preview_dataset_e2e_with_file_contents_seed_source( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "file-contents-preview-seed" seed_dir.mkdir(parents=True) (seed_dir / "alpha.txt").write_text("alpha", encoding="utf-8") (seed_dir / "beta.txt").write_text("beta", encoding="utf-8") builder = DataDesignerConfigBuilder() builder.with_seed_dataset(FileContentsSeedSource(path=str(seed_dir), file_pattern="*.txt")) builder.add_column(ExpressionColumnConfig(name="content_label", expr="{{ file_name }}::{{ content }}")) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) preview_results = data_designer.preview(builder, num_records=2) df = preview_results.dataset.sort_values("file_name").reset_index(drop=True) assert list(df["source_kind"]) == ["file_contents", "file_contents"] assert list(df["file_name"]) == ["alpha.txt", "beta.txt"] assert list(df["content_label"]) == [ "alpha.txt::alpha", "beta.txt::beta", ] def test_create_dataset_e2e_with_directory_seed_source_index_range_cycles_within_selection( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "directory-index-range-seed" seed_dir.mkdir(parents=True) for index in range(4): (seed_dir / f"file-{index}.txt").write_text(f"value-{index}", encoding="utf-8") builder = DataDesignerConfigBuilder() builder.with_seed_dataset( DirectorySeedSource(path=str(seed_dir), file_pattern="*.txt"), selection_strategy=IndexRange(start=1, end=2), ) _add_irrelevant_sampler_column(builder) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) results = data_designer.create(builder, num_records=5, dataset_name="directory-index-range-test") df = results.load_dataset().reset_index(drop=True) assert list(df["relative_path"]) == [ "file-1.txt", "file-2.txt", "file-1.txt", "file-2.txt", "file-1.txt", ] def test_create_dataset_e2e_with_file_contents_seed_source_partition_block_cycles_within_selection( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "file-contents-partition-seed" seed_dir.mkdir(parents=True) for index in range(6): (seed_dir / f"file-{index}.txt").write_text(f"value-{index}", encoding="utf-8") builder = DataDesignerConfigBuilder() builder.with_seed_dataset( FileContentsSeedSource(path=str(seed_dir), file_pattern="*.txt"), selection_strategy=PartitionBlock(index=1, num_partitions=3), ) _add_irrelevant_sampler_column(builder) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) results = data_designer.create(builder, num_records=5, dataset_name="file-contents-partition-test") df = results.load_dataset().reset_index(drop=True) assert list(df["relative_path"]) == [ "file-2.txt", "file-3.txt", "file-2.txt", "file-3.txt", "file-2.txt", ] assert list(df["content"]) == [ "value-2", "value-3", "value-2", "value-3", "value-2", ] def test_create_dataset_e2e_with_file_contents_seed_source_shuffle_within_selection( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "file-contents-shuffle-seed" seed_dir.mkdir(parents=True) for index in range(6): (seed_dir / f"file-{index}.txt").write_text(f"value-{index}", encoding="utf-8") builder = DataDesignerConfigBuilder() builder.with_seed_dataset( FileContentsSeedSource(path=str(seed_dir), file_pattern="*.txt"), sampling_strategy=SamplingStrategy.SHUFFLE, selection_strategy=IndexRange(start=0, end=4), ) _add_irrelevant_sampler_column(builder) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) results = data_designer.create(builder, num_records=15, dataset_name="file-contents-shuffle-test") df = results.load_dataset().reset_index(drop=True) expected_paths = [f"file-{index}.txt" for index in range(5)] assert len(df) == 15 assert set(df["relative_path"]) == set(expected_paths) assert list(df["relative_path"]) != expected_paths * 3 def test_preview_dataset_e2e_with_custom_filesystem_seed_reader_via_seed_readers_argument( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "custom-directory-reader" seed_dir.mkdir(parents=True) (seed_dir / "alpha.txt").write_text("alpha", encoding="utf-8") (seed_dir / "beta.txt").write_text("beta", encoding="utf-8") builder = DataDesignerConfigBuilder() builder.with_seed_dataset(DirectorySeedSource(path=str(seed_dir), file_pattern="*.txt")) _add_irrelevant_sampler_column(builder) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, seed_readers=[CustomDirectorySeedReader()], ) preview_results = data_designer.preview(builder, num_records=2) df = preview_results.dataset.sort_values("relative_path").reset_index(drop=True) assert list(df["decorated_path"]) == [ "custom::alpha.txt", "custom::beta.txt", ] def test_create_dataset_e2e_with_custom_filesystem_seed_reader_fanout_partition_block_selection( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "custom-fanout-directory-reader" seed_dir.mkdir(parents=True) (seed_dir / "alpha.txt").write_text("alpha-0\nalpha-1", encoding="utf-8") (seed_dir / "beta.txt").write_text("beta-0\nbeta-1", encoding="utf-8") builder = DataDesignerConfigBuilder() builder.with_seed_dataset( DirectorySeedSource(path=str(seed_dir), file_pattern="*.txt"), selection_strategy=PartitionBlock(index=1, num_partitions=2), ) _add_irrelevant_sampler_column(builder) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, seed_readers=[LineFanoutDirectorySeedReader()], ) results = data_designer.create(builder, num_records=3, dataset_name="custom-fanout-directory-reader-test") df = results.load_dataset().reset_index(drop=True) assert list(df["relative_path"]) == ["beta.txt", "beta.txt", "beta.txt"] assert list(df["line_index"]) == [0, 1, 0] assert list(df["line"]) == ["beta-0", "beta-1", "beta-0"] def test_create_dataset_e2e_with_custom_filesystem_seed_reader_selected_empty_fanout_raises_generation_error( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "custom-fanout-empty-selection-directory-reader" seed_dir.mkdir(parents=True) (seed_dir / "alpha.txt").write_text("alpha-0", encoding="utf-8") (seed_dir / "beta.txt").write_text("", encoding="utf-8") builder = DataDesignerConfigBuilder() builder.with_seed_dataset( DirectorySeedSource(path=str(seed_dir), file_pattern="*.txt"), selection_strategy=PartitionBlock(index=1, num_partitions=2), ) _add_irrelevant_sampler_column(builder) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, seed_readers=[LineFanoutDirectorySeedReader()], ) with pytest.raises( DataDesignerGenerationError, match="Selected manifest rows for seed source at .* did not produce any rows after hydration", ) as exc_info: data_designer.create(builder, num_records=1, dataset_name="custom-fanout-empty-selection-directory-reader") assert exc_info.value.__cause__ is not None assert "Selected manifest rows for seed source at" in str(exc_info.value.__cause__) def test_create_dataset_e2e_with_directory_seed_source_no_matches_raises_generation_error( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "directory-no-matches-seed" seed_dir.mkdir(parents=True) (seed_dir / "alpha.txt").write_text("alpha", encoding="utf-8") builder = DataDesignerConfigBuilder() builder.with_seed_dataset(DirectorySeedSource(path=str(seed_dir), file_pattern="*.md")) _add_irrelevant_sampler_column(builder) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) with pytest.raises(DataDesignerGenerationError, match="No files matched file_pattern '\\*\\.md'") as exc_info: data_designer.create(builder, num_records=1, dataset_name="directory-no-matches-test") assert isinstance(exc_info.value.__cause__, SeedReaderError) def test_preview_dataset_e2e_with_directory_seed_source_no_matches_raises_generation_error( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "directory-preview-no-matches-seed" seed_dir.mkdir(parents=True) (seed_dir / "alpha.txt").write_text("alpha", encoding="utf-8") builder = DataDesignerConfigBuilder() builder.with_seed_dataset(DirectorySeedSource(path=str(seed_dir), file_pattern="*.md")) _add_irrelevant_sampler_column(builder) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) with pytest.raises(DataDesignerGenerationError, match="No files matched file_pattern '\\*\\.md'") as exc_info: data_designer.preview(builder, num_records=1) assert isinstance(exc_info.value.__cause__, SeedReaderError) def test_create_dataset_e2e_with_file_contents_seed_source_decode_failure_raises_generation_error( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "file-contents-decode-error-seed" seed_dir.mkdir(parents=True) (seed_dir / "latin1.txt").write_bytes("café".encode("latin-1")) builder = DataDesignerConfigBuilder() builder.with_seed_dataset(FileContentsSeedSource(path=str(seed_dir), file_pattern="*.txt")) _add_irrelevant_sampler_column(builder) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) with pytest.raises(DataDesignerGenerationError, match="Failed to decode file"): data_designer.create(builder, num_records=1, dataset_name="file-contents-decode-error-test") def test_create_dataset_e2e_with_file_contents_seed_source_unreadable_file_raises_generation_error( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: seed_dir = tmp_path / "file-contents-permissions-seed" seed_dir.mkdir(parents=True) unreadable_path = seed_dir / "blocked.txt" unreadable_path.write_text("blocked", encoding="utf-8") unreadable_path.chmod(0) builder = DataDesignerConfigBuilder() builder.with_seed_dataset(FileContentsSeedSource(path=str(seed_dir), file_pattern="*.txt")) _add_irrelevant_sampler_column(builder) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) try: with pytest.raises(DataDesignerGenerationError, match="Failed to read file"): data_designer.create(builder, num_records=1, dataset_name="file-contents-permissions-test") finally: unreadable_path.chmod(0o644) @pytest.mark.parametrize( ("dir_name", "seed_source_factory", "writer", "expected_trace_ids", "expected_messages", "expected_tool_counts"), [ ( "atif", lambda path: AgentRolloutSeedSource(path=str(path), format=AgentRolloutFormat.ATIF), _write_atif_trace_directory, ["atif-session-1"], ["Repository inspected"], [0], ), ( "claude-code", lambda path: AgentRolloutSeedSource( path=str(path), format=AgentRolloutFormat.CLAUDE_CODE, ), _write_claude_trace_directory, ["session-1", "session-1:agent-a"], ["Repo inspected", "Tests checked"], [1, 0], ), ( "codex", lambda path: AgentRolloutSeedSource(path=str(path), format=AgentRolloutFormat.CODEX), _write_codex_trace_directory, ["codex-session"], ["Listed files"], [1], ), ( "hermes-agent", lambda path: AgentRolloutSeedSource(path=str(path), format=AgentRolloutFormat.HERMES_AGENT), _write_hermes_trace_directory, ["20260407_092759_baeaac", "gateway-session-1"], ["Done.", "I'll inspect the logs."], [1, 1], ), ], ids=["atif", "claude-code", "codex", "hermes-agent"], ) def test_create_dataset_e2e_with_trace_seed_sources( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, dir_name: str, seed_source_factory: Any, writer: Any, expected_trace_ids: list[str], expected_messages: list[str], expected_tool_counts: list[int], ) -> None: trace_dir = tmp_path / dir_name writer(trace_dir) builder = DataDesignerConfigBuilder() builder.with_seed_dataset(seed_source_factory(trace_dir)) builder.add_column(ExpressionColumnConfig(name="assistant_copy", expr="{{ final_assistant_message }}")) builder.add_column(ExpressionColumnConfig(name="trace_label", expr="{{ source_kind }}::{{ trace_id }}")) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) results = data_designer.create( builder, num_records=len(expected_trace_ids), dataset_name=f"trace-{dir_name}", ) df = results.load_dataset().sort_values("trace_id").reset_index(drop=True) assert list(df["trace_id"]) == expected_trace_ids assert list(df["assistant_copy"]) == expected_messages assert list(df["tool_call_count"]) == expected_tool_counts assert list(df["trace_label"]) == [ f"{source_kind}::{trace_id}" for source_kind, trace_id in df[["source_kind", "trace_id"]].itertuples(index=False) ] assert "messages" in df.columns assert "_internal_row_id" not in df.columns if dir_name == "claude-code": assert list(df["source_kind"]) == ["claude_code", "claude_code"] assert lazy.pd.isna(df.iloc[0]["agent_id"]) assert df.iloc[1]["agent_id"] == "agent-a" assert list(df["project_path"]) == ["/repo-from-index", "/repo-from-index"] assert list(df["is_sidechain"]) == [False, True] elif dir_name == "codex": assert list(df["source_kind"]) == ["codex"] assert list(df["cwd"]) == ["/workspace"] elif dir_name == "atif": assert list(df["source_kind"]) == ["atif"] assert list(df["cwd"]) == ["/workspace/project"] assert list(df["git_branch"]) == ["main"] elif dir_name == "hermes-agent": assert list(df["source_kind"]) == ["hermes_agent", "hermes_agent"] assert df.iloc[0]["started_at"] == "2026-04-07T09:39:07.028463" assert df.iloc[0]["ended_at"] == "2026-04-07T09:51:07.905570" assert lazy.pd.isna(df.iloc[1]["started_at"]) assert lazy.pd.isna(df.iloc[1]["ended_at"]) assert list(df["source_meta"].map(lambda meta: meta["session_format"])) == [ "cli_session_log", "gateway_transcript", ] def test_create_dataset_warns_for_unhandled_transform_files( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, caplog: pytest.LogCaptureFixture, ) -> None: trace_dir = tmp_path / "claude-code-with-unhandled" _write_claude_trace_directory_with_unhandled_files(trace_dir) caplog.set_level(logging.WARNING) builder = DataDesignerConfigBuilder() builder.with_seed_dataset(AgentRolloutSeedSource(path=str(trace_dir), format=AgentRolloutFormat.CLAUDE_CODE)) builder.add_column(ExpressionColumnConfig(name="assistant_copy", expr="{{ final_assistant_message }}")) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) results = data_designer.create(builder, num_records=2, dataset_name="trace-unhandled-test") df = results.load_dataset().sort_values("trace_id").reset_index(drop=True) assert list(df["trace_id"]) == ["session-1", "session-1:agent-a"] assert "Skipping unhandled claude_code file" in caplog.text def test_create_raises_error_when_all_trace_files_are_skipped( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_managed_assets_path: Path, tmp_path: Path, ) -> None: trace_dir = tmp_path / "invalid-traces" session_dir = trace_dir / "project-a" _write_empty_jsonl(session_dir / "empty-1.jsonl") _write_empty_jsonl(session_dir / "empty-2.jsonl") builder = DataDesignerConfigBuilder() builder.with_seed_dataset( AgentRolloutSeedSource( path=str(trace_dir), format=AgentRolloutFormat.CLAUDE_CODE, ) ) builder.add_column(ExpressionColumnConfig(name="assistant_copy", expr="{{ final_assistant_message }}")) data_designer = DataDesigner( artifact_path=stub_artifact_path, model_providers=stub_model_providers, secret_resolver=PlaintextResolver(), managed_assets_path=stub_managed_assets_path, ) with pytest.raises(DataDesignerGenerationError, match="did not produce any rows"): data_designer.create(builder, num_records=1, dataset_name="invalid-trace-seed")