# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 """Benchmark DataDesigner engine performance with mock LLMs.""" from __future__ import annotations import argparse import asyncio import contextlib import hashlib import json import math import os import random import statistics import subprocess import sys import tempfile import time from collections.abc import Iterator from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING, Any from data_designer.config.column_configs import LLMTextColumnConfig, SamplerColumnConfig, ValidationColumnConfig from data_designer.config.config_builder import DataDesignerConfigBuilder from data_designer.config.mcp import MCPProvider, ToolConfig from data_designer.config.models import ChatCompletionInferenceParams, ModelConfig, ModelProvider from data_designer.config.run_config import RunConfig from data_designer.config.sampler_params import SamplerType, UniformSamplerParams from data_designer.config.validator_params import LocalCallableValidatorParams, ValidatorType from data_designer.engine.mcp.registry import MCPToolDefinition, MCPToolResult from data_designer.engine.models.clients.types import AssistantMessage, ChatCompletionResponse, ToolCall from data_designer.lazy_heavy_imports import np, pd if TYPE_CHECKING: import numpy as np import pandas as pd RESULT_PREFIX = "BENCHMARK_RESULT=" DEFAULT_NUM_RECORDS = 1024 DEFAULT_BUFFER_SIZE = 1024 DEFAULT_SEED = 11 DEFAULT_MAX_PARALLEL_REQUESTS = 16 DEFAULT_VALIDATOR_BATCH_SIZE = 256 DEFAULT_ITERATIONS = 5 MOCK_MCP_PROVIDER_NAME = "mock-mcp" MOCK_TOOL_ALIAS = "mock-tools" MOCK_TOOL_NAME = "mock_lookup" MOCK_TOOL_DESCRIPTION = "Mock lookup tool for benchmark runs." MOCK_TOOL_SCHEMA = { "type": "object", "properties": { "query": {"type": "string"}, "limit": {"type": "integer"}, }, "required": ["query"], } @dataclass(frozen=True) class BenchmarkSettings: num_records: int buffer_size: int seed: int max_parallel_requests: int validator_batch_size: int simulated_latency: bool = False def to_cli_args(self) -> list[str]: args = [ "--num-records", str(self.num_records), "--buffer-size", str(self.buffer_size), "--seed", str(self.seed), "--max-parallel-requests", str(self.max_parallel_requests), "--validator-batch-size", str(self.validator_batch_size), ] if self.simulated_latency: args.append("--simulated-latency") return args @dataclass(frozen=True) class BenchmarkResult: engine_mode: str num_records: int buffer_size: int build_time_sec: float total_time_sec: float dataset_hash: str row_count: int column_count: int def to_dict(self) -> dict[str, Any]: return { "engine_mode": self.engine_mode, "num_records": self.num_records, "buffer_size": self.buffer_size, "build_time_sec": self.build_time_sec, "total_time_sec": self.total_time_sec, "dataset_hash": self.dataset_hash, "row_count": self.row_count, "column_count": self.column_count, } @classmethod def from_dict(cls, payload: dict[str, Any]) -> BenchmarkResult: return cls( engine_mode=str(payload["engine_mode"]), num_records=int(payload["num_records"]), buffer_size=int(payload["buffer_size"]), build_time_sec=float(payload["build_time_sec"]), total_time_sec=float(payload["total_time_sec"]), dataset_hash=str(payload["dataset_hash"]), row_count=int(payload["row_count"]), column_count=int(payload["column_count"]), ) @dataclass(frozen=True) class MetricStats: mean: float stdev: float ci_half_width: float n: int @property def ci_low(self) -> float: return self.mean - self.ci_half_width @property def ci_high(self) -> float: return self.mean + self.ci_half_width @dataclass(frozen=True) class ResponseProfile: label: str score_mu: float score_sigma: float latency_alpha: float latency_beta: float volatility_sigma: float categories: tuple[str, ...] category_weights: tuple[float, ...] MODEL_PROFILES: dict[str, ResponseProfile] = { "mock-alpha": ResponseProfile( label="alpha", score_mu=0.1, score_sigma=0.35, latency_alpha=2.2, latency_beta=6.0, volatility_sigma=0.25, categories=("low", "mid", "high"), category_weights=(0.25, 0.55, 0.2), ), "mock-beta": ResponseProfile( label="beta", score_mu=0.3, score_sigma=0.45, latency_alpha=2.6, latency_beta=4.8, volatility_sigma=0.3, categories=("low", "mid", "high"), category_weights=(0.2, 0.5, 0.3), ), "mock-gamma": ResponseProfile( label="gamma", score_mu=0.5, score_sigma=0.5, latency_alpha=3.0, latency_beta=3.6, volatility_sigma=0.35, categories=("low", "mid", "high"), category_weights=(0.15, 0.45, 0.4), ), } DEFAULT_PROFILE = ResponseProfile( label="default", score_mu=0.2, score_sigma=0.4, latency_alpha=2.4, latency_beta=5.0, volatility_sigma=0.3, categories=("low", "mid", "high"), category_weights=(0.3, 0.5, 0.2), ) @dataclass(frozen=True) class FakeCompletionResponse: """Wraps a ChatCompletionResponse with benchmark-specific latency metadata.""" response: ChatCompletionResponse latency_ms: float = 0.0 def _distinct_parallel_requests(base: int) -> tuple[int, int, int]: if base < 3: raise ValueError("max_parallel_requests must be >= 3 to create distinct per-model limits.") high = base mid = max(1, int(round(high / 2))) low = max(1, int(round(high / 5))) if mid >= high: mid = high - 1 if low >= mid: low = max(1, mid - 1) return high, mid, low def _t_critical_95(df: int) -> float: table = { 1: 12.706, 2: 4.303, 3: 3.182, 4: 2.776, 5: 2.571, 6: 2.447, 7: 2.365, 8: 2.306, 9: 2.262, 10: 2.228, 11: 2.201, 12: 2.179, 13: 2.160, 14: 2.145, 15: 2.131, 16: 2.120, 17: 2.110, 18: 2.101, 19: 2.093, 20: 2.086, 21: 2.080, 22: 2.074, 23: 2.069, 24: 2.064, 25: 2.060, 26: 2.056, 27: 2.052, 28: 2.048, 29: 2.045, 30: 2.042, } return table.get(df, 1.96) def _compute_stats(values: list[float]) -> MetricStats: if not values: return MetricStats(mean=0.0, stdev=0.0, ci_half_width=0.0, n=0) if len(values) == 1: return MetricStats(mean=values[0], stdev=0.0, ci_half_width=0.0, n=1) stdev = statistics.stdev(values) mean = statistics.mean(values) t_value = _t_critical_95(len(values) - 1) ci_half_width = t_value * stdev / math.sqrt(len(values)) return MetricStats(mean=mean, stdev=stdev, ci_half_width=ci_half_width, n=len(values)) def _format_stats(stats: MetricStats, *, unit: str, precision: int = 3) -> str: fmt = f"{{:.{precision}f}}" mean = fmt.format(stats.mean) ci = fmt.format(stats.ci_half_width) stdev = fmt.format(stats.stdev) return f"{mean}{unit} ± {ci}{unit} (stdev {stdev}{unit}, n={stats.n})" def _format_speed_stats(stats: MetricStats, *, precision: int = 2) -> str: fmt = f"{{:.{precision}f}}" mean = fmt.format(stats.mean) ci = fmt.format(stats.ci_half_width) stdev = fmt.format(stats.stdev) return f"{mean}x ± {ci}x (stdev {stdev}x, n={stats.n})" def _significant_diff(stats: MetricStats) -> bool: return stats.n > 1 and abs(stats.mean) > stats.ci_half_width def _json_default(value: Any) -> Any: if isinstance(value, np.generic): return value.item() if isinstance(value, np.ndarray): return value.tolist() if isinstance(value, (pd.Timestamp, pd.Timedelta)): return value.isoformat() if isinstance(value, set): return sorted(value) if isinstance(value, bytes): return value.decode("utf-8", errors="replace") return str(value) def _stable_seed(model: str, messages: list[dict[str, Any]]) -> int: payload = json.dumps( {"model": model, "messages": messages}, sort_keys=True, separators=(",", ":"), ensure_ascii=True, default=_json_default, ) digest = hashlib.sha256(payload.encode("utf-8")).digest() return int.from_bytes(digest[:8], "big") def _profile_for_model(model: str) -> ResponseProfile: for key, profile in MODEL_PROFILES.items(): if key in model: return profile return DEFAULT_PROFILE def _mock_response_text(model: str, messages: list[dict[str, Any]]) -> tuple[str, float]: profile = _profile_for_model(model) rng = random.Random(_stable_seed(model, messages)) category = rng.choices(profile.categories, weights=profile.category_weights, k=1)[0] score = rng.lognormvariate(profile.score_mu, profile.score_sigma) latency_ms = int(rng.betavariate(profile.latency_alpha, profile.latency_beta) * 900.0) volatility = rng.gauss(0.0, profile.volatility_sigma) text = f"{profile.label}:{category}|score={score:.3f}|latency_ms={latency_ms}|vol={volatility:.3f}" return text, float(latency_ms) def _tool_call_id(model: str, messages: list[dict[str, Any]]) -> str: call_seed = _stable_seed(model, messages) return f"tool-{call_seed:016x}" def _tool_call_arguments(model: str, messages: list[dict[str, Any]]) -> dict[str, Any]: rng = random.Random(_stable_seed(model, messages)) return { "query": f"{model}-lookup-{rng.randint(1000, 9999)}", "limit": rng.randint(1, 3), } def _build_tool_call(model: str, messages: list[dict[str, Any]]) -> dict[str, Any]: arguments = _tool_call_arguments(model, messages) return { "id": _tool_call_id(model, messages), "type": "function", "function": {"name": MOCK_TOOL_NAME, "arguments": json.dumps(arguments)}, } def _should_request_tool(messages: list[dict[str, Any]]) -> bool: return not any(message.get("role") == "tool" for message in messages) def _mock_tool_definition() -> MCPToolDefinition: return MCPToolDefinition( name=MOCK_TOOL_NAME, description=MOCK_TOOL_DESCRIPTION, input_schema=MOCK_TOOL_SCHEMA, ) def _mock_tool_result(tool_name: str, arguments: dict[str, Any], provider_name: str) -> MCPToolResult: payload = { "tool": tool_name, "provider": provider_name, "query": arguments.get("query", ""), "limit": arguments.get("limit", 0), "status": "ok", } return MCPToolResult(content=json.dumps(payload)) def _fake_response(model: str, messages: list[dict[str, Any]], **kwargs: Any) -> FakeCompletionResponse: if kwargs.get("tools") and _should_request_tool(messages): raw_call = _build_tool_call(model, messages) profile = _profile_for_model(model) rng = random.Random(_stable_seed(model, messages)) latency_ms = float(int(rng.betavariate(profile.latency_alpha, profile.latency_beta) * 900.0)) return FakeCompletionResponse( response=ChatCompletionResponse( message=AssistantMessage( content="Using tool.", tool_calls=[ ToolCall( id=raw_call["id"], name=raw_call["function"]["name"], arguments_json=raw_call["function"]["arguments"], ) ], ), ), latency_ms=latency_ms, ) response_text, latency_ms = _mock_response_text(model, messages) return FakeCompletionResponse( response=ChatCompletionResponse(message=AssistantMessage(content=response_text)), latency_ms=latency_ms, ) @contextlib.contextmanager def _patch_llm_responses(*, simulated_latency: bool = False) -> Iterator[None]: from data_designer.engine.models.clients.adapters.openai_compatible import OpenAICompatibleClient original_completion = OpenAICompatibleClient.completion original_acompletion = OpenAICompatibleClient.acompletion def fake_completion(self: Any, request: Any) -> ChatCompletionResponse: _ = self fake = _fake_response(request.model, request.messages, tools=request.tools) if simulated_latency and fake.latency_ms > 0: time.sleep(fake.latency_ms / 1000.0) return fake.response async def fake_acompletion(self: Any, request: Any) -> ChatCompletionResponse: _ = self fake = _fake_response(request.model, request.messages, tools=request.tools) if simulated_latency and fake.latency_ms > 0: await asyncio.sleep(fake.latency_ms / 1000.0) return fake.response OpenAICompatibleClient.completion = fake_completion # type: ignore[assignment] OpenAICompatibleClient.acompletion = fake_acompletion # type: ignore[assignment] try: yield finally: OpenAICompatibleClient.completion = original_completion # type: ignore[assignment] OpenAICompatibleClient.acompletion = original_acompletion # type: ignore[assignment] @contextlib.contextmanager def _patch_mcp_io() -> Iterator[None]: import data_designer.engine.mcp.io as mcp_io original_list_tools = mcp_io.list_tools original_call_tools = mcp_io.call_tools def fake_list_tools(provider: Any, timeout_sec: float | None = None) -> tuple[MCPToolDefinition, ...]: if getattr(provider, "name", None) != MOCK_MCP_PROVIDER_NAME: return original_list_tools(provider, timeout_sec=timeout_sec) return (_mock_tool_definition(),) def fake_call_tools( calls: list[tuple[Any, str, dict[str, Any]]], *, timeout_sec: float | None = None, ) -> list[MCPToolResult]: if any(getattr(call[0], "name", None) != MOCK_MCP_PROVIDER_NAME for call in calls): return original_call_tools(calls, timeout_sec=timeout_sec) return [_mock_tool_result(tool_name, arguments, provider.name) for provider, tool_name, arguments in calls] mcp_io.list_tools = fake_list_tools mcp_io.call_tools = fake_call_tools try: yield finally: mcp_io.list_tools = original_list_tools mcp_io.call_tools = original_call_tools def _extract_metric(text: str, key: str) -> float | None: marker = f"{key}=" start = text.find(marker) if start == -1: return None start += len(marker) end = start while end < len(text) and (text[end].isdigit() or text[end] in {".", "-"}): end += 1 try: return float(text[start:end]) except ValueError: return None def _validate_recommendation(df: pd.DataFrame) -> pd.DataFrame: series = df["llm_stage3"].astype(str) scores = series.map(lambda text: _extract_metric(text, "score")) latencies = series.map(lambda text: _extract_metric(text, "latency_ms")) scores_numeric = pd.to_numeric(scores, errors="coerce") latency_numeric = pd.to_numeric(latencies, errors="coerce") is_valid = scores_numeric.between(0.0, 10.0) & latency_numeric.between(0.0, 900.0) return pd.DataFrame( { "is_valid": is_valid.fillna(False).astype(bool), "score": scores_numeric, "latency_ms": latency_numeric, } ) def _build_config(settings: BenchmarkSettings) -> DataDesignerConfigBuilder: high_parallel, mid_parallel, low_parallel = _distinct_parallel_requests(settings.max_parallel_requests) model_configs = [ ModelConfig( alias="mock-alpha", model="mock-alpha", provider="mock-provider", inference_parameters=ChatCompletionInferenceParams(max_parallel_requests=high_parallel), skip_health_check=True, ), ModelConfig( alias="mock-beta", model="mock-beta", provider="mock-provider", inference_parameters=ChatCompletionInferenceParams(max_parallel_requests=low_parallel), skip_health_check=True, ), ModelConfig( alias="mock-gamma", model="mock-gamma", provider="mock-provider", inference_parameters=ChatCompletionInferenceParams(max_parallel_requests=mid_parallel), skip_health_check=True, ), ] builder = DataDesignerConfigBuilder(model_configs=model_configs) builder.add_tool_config( ToolConfig( tool_alias=MOCK_TOOL_ALIAS, providers=[MOCK_MCP_PROVIDER_NAME], allow_tools=[MOCK_TOOL_NAME], max_tool_call_turns=1, timeout_sec=1.0, ) ) builder.add_column( SamplerColumnConfig( name="seed_value", sampler_type=SamplerType.UNIFORM, params=UniformSamplerParams(low=0.0, high=100.0, decimal_places=3), ) ) builder.add_column( LLMTextColumnConfig( name="llm_stage1", model_alias="mock-alpha", prompt="Summarize the signal for seed {{ seed_value }}.", ) ) builder.add_column( LLMTextColumnConfig( name="llm_stage2", model_alias="mock-beta", tool_alias=MOCK_TOOL_ALIAS, prompt="Analyze {{ llm_stage1 }} and produce a risk assessment.", ) ) builder.add_column( LLMTextColumnConfig( name="llm_stage3", model_alias="mock-gamma", prompt="Generate a recommendation from {{ llm_stage2 }} with seed {{ seed_value }}.", ) ) builder.add_column( ValidationColumnConfig( name="llm_stage3_validation", target_columns=["llm_stage3"], validator_type=ValidatorType.LOCAL_CALLABLE, validator_params=LocalCallableValidatorParams(validation_function=_validate_recommendation), batch_size=settings.validator_batch_size, ) ) return builder def _dataset_fingerprint(df: pd.DataFrame) -> str: normalized = df.reset_index(drop=True) normalized = normalized.reindex(sorted(normalized.columns), axis=1) records = normalized.to_dict(orient="records") payload = json.dumps( records, sort_keys=True, separators=(",", ":"), ensure_ascii=True, default=_json_default, ) return hashlib.sha256(payload.encode("utf-8")).hexdigest() def _run_single_benchmark(settings: BenchmarkSettings, engine_mode: str) -> BenchmarkResult: # Imports are deferred so engine selection respects DATA_DESIGNER_ASYNC_ENGINE. from data_designer.engine.dataset_builders.artifact_storage import ArtifactStorage from data_designer.engine.dataset_builders.dataset_builder import DatasetBuilder from data_designer.engine.model_provider import resolve_model_provider_registry from data_designer.engine.resources.resource_provider import create_resource_provider from data_designer.engine.resources.seed_reader import SeedReaderRegistry from data_designer.engine.secret_resolver import CompositeResolver, EnvironmentResolver, PlaintextResolver random.seed(settings.seed) np.random.seed(settings.seed) run_config = RunConfig( buffer_size=settings.buffer_size, disable_early_shutdown=True, max_conversation_restarts=0, max_conversation_correction_steps=0, ) builder = _build_config(settings) provider = ModelProvider( name="mock-provider", endpoint="https://mock.local", provider_type="openai", api_key="mock-key", ) mcp_provider = MCPProvider( name=MOCK_MCP_PROVIDER_NAME, endpoint="https://mock.local/mcp", api_key="mock-mcp-key", ) model_provider_registry = resolve_model_provider_registry([provider], default_provider_name=provider.name) secret_resolver = CompositeResolver([EnvironmentResolver(), PlaintextResolver()]) with tempfile.TemporaryDirectory() as temp_dir: artifact_storage = ArtifactStorage(artifact_path=temp_dir, dataset_name=f"benchmark-{engine_mode}") resource_provider = create_resource_provider( artifact_storage=artifact_storage, model_configs=builder.model_configs, secret_resolver=secret_resolver, model_provider_registry=model_provider_registry, seed_reader_registry=SeedReaderRegistry(readers=[]), blob_storage=None, seed_dataset_source=None, run_config=run_config, mcp_providers=[mcp_provider], tool_configs=builder.tool_configs, ) dataset_builder = DatasetBuilder( data_designer_config=builder.build(), resource_provider=resource_provider, ) total_start = time.perf_counter() with _patch_llm_responses(simulated_latency=settings.simulated_latency), _patch_mcp_io(): build_start = time.perf_counter() dataset_builder.build(num_records=settings.num_records) build_time = time.perf_counter() - build_start dataset = dataset_builder.artifact_storage.load_dataset_with_dropped_columns() dataset_hash = _dataset_fingerprint(dataset) total_time = time.perf_counter() - total_start return BenchmarkResult( engine_mode=engine_mode, num_records=settings.num_records, buffer_size=settings.buffer_size, build_time_sec=build_time, total_time_sec=total_time, dataset_hash=dataset_hash, row_count=int(dataset.shape[0]), column_count=int(dataset.shape[1]), ) def _run_subprocess(settings: BenchmarkSettings, engine_mode: str) -> BenchmarkResult: env = os.environ.copy() if engine_mode == "async": env["DATA_DESIGNER_ASYNC_ENGINE"] = "1" else: env.pop("DATA_DESIGNER_ASYNC_ENGINE", None) script_path = Path(__file__).resolve() cmd = [sys.executable, str(script_path), "--mode", "run", "--engine", engine_mode, *settings.to_cli_args()] completed = subprocess.run(cmd, capture_output=True, text=True, env=env, check=False) if completed.returncode != 0: raise RuntimeError(f"Benchmark subprocess failed.\nstdout:\n{completed.stdout}\nstderr:\n{completed.stderr}") for line in reversed(completed.stdout.splitlines()): if line.startswith(RESULT_PREFIX): payload = json.loads(line.removeprefix(RESULT_PREFIX)) return BenchmarkResult.from_dict(payload) raise RuntimeError( f"Benchmark subprocess did not emit a result payload.\nstdout:\n{completed.stdout}\nstderr:\n{completed.stderr}" ) def _format_speedup(sync_time: float, async_time: float) -> str: if async_time <= 0: return "n/a" return f"{(sync_time / async_time):.2f}x" def _run_with_progress(settings: BenchmarkSettings, engine_mode: str, iteration: int, total: int) -> BenchmarkResult: print(f"[{iteration}/{total}] Running {engine_mode} benchmark...", end="", flush=True) result = _run_subprocess(settings, engine_mode) print(f" done ({result.total_time_sec:.3f}s)") return result def _compare_runs(settings: BenchmarkSettings, iterations: int) -> int: sync_results: list[BenchmarkResult] = [] async_results: list[BenchmarkResult] = [] expected_hash: str | None = None for iteration in range(1, iterations + 1): sync_result = _run_with_progress(settings, "sync", iteration, iterations) async_result = _run_with_progress(settings, "async", iteration, iterations) if sync_result.dataset_hash != async_result.dataset_hash: print( "Content mismatch detected: " f"sync hash {sync_result.dataset_hash} vs async hash {async_result.dataset_hash}" ) return 1 if expected_hash is None: expected_hash = sync_result.dataset_hash elif expected_hash != sync_result.dataset_hash or expected_hash != async_result.dataset_hash: print("Content mismatch detected across iterations.") return 1 sync_results.append(sync_result) async_results.append(async_result) build_sync = [result.build_time_sec for result in sync_results] build_async = [result.build_time_sec for result in async_results] total_sync = [result.total_time_sec for result in sync_results] total_async = [result.total_time_sec for result in async_results] build_speedups = [sync / async_ for sync, async_ in zip(build_sync, build_async)] total_speedups = [sync / async_ for sync, async_ in zip(total_sync, total_async)] build_diffs = [sync - async_ for sync, async_ in zip(build_sync, build_async)] total_diffs = [sync - async_ for sync, async_ in zip(total_sync, total_async)] build_sync_stats = _compute_stats(build_sync) build_async_stats = _compute_stats(build_async) total_sync_stats = _compute_stats(total_sync) total_async_stats = _compute_stats(total_async) build_speed_stats = _compute_stats(build_speedups) total_speed_stats = _compute_stats(total_speedups) build_diff_stats = _compute_stats(build_diffs) total_diff_stats = _compute_stats(total_diffs) latency_label = "on" if settings.simulated_latency else "off" print("\nEngine benchmark summary (95% CI)") print(f"- runs: {iterations} | content match: yes | hash {expected_hash}") print(f"- simulated latency: {latency_label}") print(f"- build time sync: {_format_stats(build_sync_stats, unit='s')}") print(f"- build time async: {_format_stats(build_async_stats, unit='s')}") print( f"- build speedup: {_format_speed_stats(build_speed_stats)} | " f"paired diff {_format_stats(build_diff_stats, unit='s')} | " f"significant: {'yes' if _significant_diff(build_diff_stats) else 'no'}" ) print(f"- total time sync: {_format_stats(total_sync_stats, unit='s')}") print(f"- total time async: {_format_stats(total_async_stats, unit='s')}") print( f"- total speedup: {_format_speed_stats(total_speed_stats)} | " f"paired diff {_format_stats(total_diff_stats, unit='s')} | " f"significant: {'yes' if _significant_diff(total_diff_stats) else 'no'}" ) return 0 def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Benchmark DataDesigner engine with mock LLMs and compare async execution." ) parser.add_argument( "--mode", type=str, choices=("compare", "run"), default="compare", help="Run both engines in subprocesses, or run once in the current process.", ) parser.add_argument( "--engine", type=str, choices=("sync", "async"), default="sync", help="Engine mode for --mode run.", ) parser.add_argument("--num-records", type=int, default=DEFAULT_NUM_RECORDS, help="Records to generate.") parser.add_argument("--buffer-size", type=int, default=DEFAULT_BUFFER_SIZE, help="Batch buffer size.") parser.add_argument("--seed", type=int, default=DEFAULT_SEED, help="Random seed for determinism.") parser.add_argument( "--iterations", type=int, default=DEFAULT_ITERATIONS, help="Number of sync/async runs to include in the compare mode.", ) parser.add_argument( "--max-parallel-requests", type=int, default=DEFAULT_MAX_PARALLEL_REQUESTS, help="Max parallel LLM requests per model.", ) parser.add_argument( "--validator-batch-size", type=int, default=DEFAULT_VALIDATOR_BATCH_SIZE, help="Batch size for the local validator.", ) parser.add_argument( "--simulated-latency", action="store_true", default=False, help="Simulate LLM response latency using beta-distributed delays.", ) return parser.parse_args() def main() -> None: args = _parse_args() settings = BenchmarkSettings( num_records=args.num_records, buffer_size=args.buffer_size, seed=args.seed, max_parallel_requests=args.max_parallel_requests, validator_batch_size=args.validator_batch_size, simulated_latency=args.simulated_latency, ) if args.mode == "compare": sys.exit(_compare_runs(settings, args.iterations)) if args.engine == "async": os.environ["DATA_DESIGNER_ASYNC_ENGINE"] = "1" else: os.environ.pop("DATA_DESIGNER_ASYNC_ENGINE", None) print(f"Running {args.engine} benchmark...") result = _run_single_benchmark(settings, args.engine) print(f"{RESULT_PREFIX}{json.dumps(result.to_dict(), sort_keys=True)}") if __name__ == "__main__": main()