mirror of
https://github.com/NVIDIA-NeMo/DataDesigner
synced 2026-05-24 09:48:29 +00:00
843 lines
29 KiB
Python
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Benchmark DataDesigner engine performance with mock LLMs."""

from __future__ import annotations

import argparse
import asyncio
import contextlib
import hashlib
import json
import math
import os
import random
import statistics
import subprocess
import sys
import tempfile
import time
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any

from data_designer.config.column_configs import LLMTextColumnConfig, SamplerColumnConfig, ValidationColumnConfig
from data_designer.config.config_builder import DataDesignerConfigBuilder
from data_designer.config.mcp import MCPProvider, ToolConfig
from data_designer.config.models import ChatCompletionInferenceParams, ModelConfig, ModelProvider
from data_designer.config.run_config import RunConfig
from data_designer.config.sampler_params import SamplerType, UniformSamplerParams
from data_designer.config.validator_params import LocalCallableValidatorParams, ValidatorType
from data_designer.engine.mcp.registry import MCPToolDefinition, MCPToolResult
from data_designer.engine.models.clients.types import AssistantMessage, ChatCompletionResponse, ToolCall
from data_designer.lazy_heavy_imports import np, pd

if TYPE_CHECKING:
    import numpy as np
    import pandas as pd


RESULT_PREFIX = "BENCHMARK_RESULT="
DEFAULT_NUM_RECORDS = 1024
DEFAULT_BUFFER_SIZE = 1024
DEFAULT_SEED = 11
DEFAULT_MAX_PARALLEL_REQUESTS = 16
DEFAULT_VALIDATOR_BATCH_SIZE = 256
DEFAULT_ITERATIONS = 5

MOCK_MCP_PROVIDER_NAME = "mock-mcp"
MOCK_TOOL_ALIAS = "mock-tools"
MOCK_TOOL_NAME = "mock_lookup"
MOCK_TOOL_DESCRIPTION = "Mock lookup tool for benchmark runs."
MOCK_TOOL_SCHEMA = {
    "type": "object",
    "properties": {
        "query": {"type": "string"},
        "limit": {"type": "integer"},
    },
    "required": ["query"],
}


@dataclass(frozen=True)
class BenchmarkSettings:
    num_records: int
    buffer_size: int
    seed: int
    max_parallel_requests: int
    validator_batch_size: int
    simulated_latency: bool = False

    def to_cli_args(self) -> list[str]:
        args = [
            "--num-records",
            str(self.num_records),
            "--buffer-size",
            str(self.buffer_size),
            "--seed",
            str(self.seed),
            "--max-parallel-requests",
            str(self.max_parallel_requests),
            "--validator-batch-size",
            str(self.validator_batch_size),
        ]
        if self.simulated_latency:
            args.append("--simulated-latency")
        return args


@dataclass(frozen=True)
class BenchmarkResult:
    engine_mode: str
    num_records: int
    buffer_size: int
    build_time_sec: float
    total_time_sec: float
    dataset_hash: str
    row_count: int
    column_count: int

    def to_dict(self) -> dict[str, Any]:
        return {
            "engine_mode": self.engine_mode,
            "num_records": self.num_records,
            "buffer_size": self.buffer_size,
            "build_time_sec": self.build_time_sec,
            "total_time_sec": self.total_time_sec,
            "dataset_hash": self.dataset_hash,
            "row_count": self.row_count,
            "column_count": self.column_count,
        }

    @classmethod
    def from_dict(cls, payload: dict[str, Any]) -> BenchmarkResult:
        return cls(
            engine_mode=str(payload["engine_mode"]),
            num_records=int(payload["num_records"]),
            buffer_size=int(payload["buffer_size"]),
            build_time_sec=float(payload["build_time_sec"]),
            total_time_sec=float(payload["total_time_sec"]),
            dataset_hash=str(payload["dataset_hash"]),
            row_count=int(payload["row_count"]),
            column_count=int(payload["column_count"]),
        )


@dataclass(frozen=True)
class MetricStats:
    mean: float
    stdev: float
    ci_half_width: float
    n: int

    @property
    def ci_low(self) -> float:
        return self.mean - self.ci_half_width

    @property
    def ci_high(self) -> float:
        return self.mean + self.ci_half_width


@dataclass(frozen=True)
class ResponseProfile:
    label: str
    score_mu: float
    score_sigma: float
    latency_alpha: float
    latency_beta: float
    volatility_sigma: float
    categories: tuple[str, ...]
    category_weights: tuple[float, ...]


MODEL_PROFILES: dict[str, ResponseProfile] = {
    "mock-alpha": ResponseProfile(
        label="alpha",
        score_mu=0.1,
        score_sigma=0.35,
        latency_alpha=2.2,
        latency_beta=6.0,
        volatility_sigma=0.25,
        categories=("low", "mid", "high"),
        category_weights=(0.25, 0.55, 0.2),
    ),
    "mock-beta": ResponseProfile(
        label="beta",
        score_mu=0.3,
        score_sigma=0.45,
        latency_alpha=2.6,
        latency_beta=4.8,
        volatility_sigma=0.3,
        categories=("low", "mid", "high"),
        category_weights=(0.2, 0.5, 0.3),
    ),
    "mock-gamma": ResponseProfile(
        label="gamma",
        score_mu=0.5,
        score_sigma=0.5,
        latency_alpha=3.0,
        latency_beta=3.6,
        volatility_sigma=0.35,
        categories=("low", "mid", "high"),
        category_weights=(0.15, 0.45, 0.4),
    ),
}

DEFAULT_PROFILE = ResponseProfile(
    label="default",
    score_mu=0.2,
    score_sigma=0.4,
    latency_alpha=2.4,
    latency_beta=5.0,
    volatility_sigma=0.3,
    categories=("low", "mid", "high"),
    category_weights=(0.3, 0.5, 0.2),
)


@dataclass(frozen=True)
class FakeCompletionResponse:
    """Wraps a ChatCompletionResponse with benchmark-specific latency metadata."""

    response: ChatCompletionResponse
    latency_ms: float = 0.0


def _distinct_parallel_requests(base: int) -> tuple[int, int, int]:
    if base < 3:
        raise ValueError("max_parallel_requests must be >= 3 to create distinct per-model limits.")
    high = base
    mid = max(1, int(round(high / 2)))
    low = max(1, int(round(high / 5)))

    if mid >= high:
        mid = high - 1
    if low >= mid:
        low = max(1, mid - 1)

    return high, mid, low


def _t_critical_95(df: int) -> float:
    table = {
        1: 12.706,
        2: 4.303,
        3: 3.182,
        4: 2.776,
        5: 2.571,
        6: 2.447,
        7: 2.365,
        8: 2.306,
        9: 2.262,
        10: 2.228,
        11: 2.201,
        12: 2.179,
        13: 2.160,
        14: 2.145,
        15: 2.131,
        16: 2.120,
        17: 2.110,
        18: 2.101,
        19: 2.093,
        20: 2.086,
        21: 2.080,
        22: 2.074,
        23: 2.069,
        24: 2.064,
        25: 2.060,
        26: 2.056,
        27: 2.052,
        28: 2.048,
        29: 2.045,
        30: 2.042,
    }
    return table.get(df, 1.96)


def _compute_stats(values: list[float]) -> MetricStats:
    if not values:
        return MetricStats(mean=0.0, stdev=0.0, ci_half_width=0.0, n=0)
    if len(values) == 1:
        return MetricStats(mean=values[0], stdev=0.0, ci_half_width=0.0, n=1)
    stdev = statistics.stdev(values)
    mean = statistics.mean(values)
    t_value = _t_critical_95(len(values) - 1)
    ci_half_width = t_value * stdev / math.sqrt(len(values))
    return MetricStats(mean=mean, stdev=stdev, ci_half_width=ci_half_width, n=len(values))


def _format_stats(stats: MetricStats, *, unit: str, precision: int = 3) -> str:
    fmt = f"{{:.{precision}f}}"
    mean = fmt.format(stats.mean)
    ci = fmt.format(stats.ci_half_width)
    stdev = fmt.format(stats.stdev)
    return f"{mean}{unit} ± {ci}{unit} (stdev {stdev}{unit}, n={stats.n})"


def _format_speed_stats(stats: MetricStats, *, precision: int = 2) -> str:
    fmt = f"{{:.{precision}f}}"
    mean = fmt.format(stats.mean)
    ci = fmt.format(stats.ci_half_width)
    stdev = fmt.format(stats.stdev)
    return f"{mean}x ± {ci}x (stdev {stdev}x, n={stats.n})"


def _significant_diff(stats: MetricStats) -> bool:
    return stats.n > 1 and abs(stats.mean) > stats.ci_half_width


def _json_default(value: Any) -> Any:
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, (pd.Timestamp, pd.Timedelta)):
        return value.isoformat()
    if isinstance(value, set):
        return sorted(value)
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return str(value)


def _stable_seed(model: str, messages: list[dict[str, Any]]) -> int:
    payload = json.dumps(
        {"model": model, "messages": messages},
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=True,
        default=_json_default,
    )
    digest = hashlib.sha256(payload.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def _profile_for_model(model: str) -> ResponseProfile:
    for key, profile in MODEL_PROFILES.items():
        if key in model:
            return profile
    return DEFAULT_PROFILE


def _mock_response_text(model: str, messages: list[dict[str, Any]]) -> tuple[str, float]:
    profile = _profile_for_model(model)
    rng = random.Random(_stable_seed(model, messages))
    category = rng.choices(profile.categories, weights=profile.category_weights, k=1)[0]
    score = rng.lognormvariate(profile.score_mu, profile.score_sigma)
    latency_ms = int(rng.betavariate(profile.latency_alpha, profile.latency_beta) * 900.0)
    volatility = rng.gauss(0.0, profile.volatility_sigma)
    text = f"{profile.label}:{category}|score={score:.3f}|latency_ms={latency_ms}|vol={volatility:.3f}"
    return text, float(latency_ms)


def _tool_call_id(model: str, messages: list[dict[str, Any]]) -> str:
    call_seed = _stable_seed(model, messages)
    return f"tool-{call_seed:016x}"


def _tool_call_arguments(model: str, messages: list[dict[str, Any]]) -> dict[str, Any]:
    rng = random.Random(_stable_seed(model, messages))
    return {
        "query": f"{model}-lookup-{rng.randint(1000, 9999)}",
        "limit": rng.randint(1, 3),
    }


def _build_tool_call(model: str, messages: list[dict[str, Any]]) -> dict[str, Any]:
    arguments = _tool_call_arguments(model, messages)
    return {
        "id": _tool_call_id(model, messages),
        "type": "function",
        "function": {"name": MOCK_TOOL_NAME, "arguments": json.dumps(arguments)},
    }


def _should_request_tool(messages: list[dict[str, Any]]) -> bool:
    return not any(message.get("role") == "tool" for message in messages)


def _mock_tool_definition() -> MCPToolDefinition:
    return MCPToolDefinition(
        name=MOCK_TOOL_NAME,
        description=MOCK_TOOL_DESCRIPTION,
        input_schema=MOCK_TOOL_SCHEMA,
    )


def _mock_tool_result(tool_name: str, arguments: dict[str, Any], provider_name: str) -> MCPToolResult:
    payload = {
        "tool": tool_name,
        "provider": provider_name,
        "query": arguments.get("query", ""),
        "limit": arguments.get("limit", 0),
        "status": "ok",
    }
    return MCPToolResult(content=json.dumps(payload))


def _fake_response(model: str, messages: list[dict[str, Any]], **kwargs: Any) -> FakeCompletionResponse:
    if kwargs.get("tools") and _should_request_tool(messages):
        raw_call = _build_tool_call(model, messages)
        profile = _profile_for_model(model)
        rng = random.Random(_stable_seed(model, messages))
        latency_ms = float(int(rng.betavariate(profile.latency_alpha, profile.latency_beta) * 900.0))
        return FakeCompletionResponse(
            response=ChatCompletionResponse(
                message=AssistantMessage(
                    content="Using tool.",
                    tool_calls=[
                        ToolCall(
                            id=raw_call["id"],
                            name=raw_call["function"]["name"],
                            arguments_json=raw_call["function"]["arguments"],
                        )
                    ],
                ),
            ),
            latency_ms=latency_ms,
        )
    response_text, latency_ms = _mock_response_text(model, messages)
    return FakeCompletionResponse(
        response=ChatCompletionResponse(message=AssistantMessage(content=response_text)),
        latency_ms=latency_ms,
    )


@contextlib.contextmanager
def _patch_llm_responses(*, simulated_latency: bool = False) -> Iterator[None]:
    from data_designer.engine.models.clients.adapters.openai_compatible import OpenAICompatibleClient

    original_completion = OpenAICompatibleClient.completion
    original_acompletion = OpenAICompatibleClient.acompletion

    def fake_completion(self: Any, request: Any) -> ChatCompletionResponse:
        _ = self
        fake = _fake_response(request.model, request.messages, tools=request.tools)
        if simulated_latency and fake.latency_ms > 0:
            time.sleep(fake.latency_ms / 1000.0)
        return fake.response

    async def fake_acompletion(self: Any, request: Any) -> ChatCompletionResponse:
        _ = self
        fake = _fake_response(request.model, request.messages, tools=request.tools)
        if simulated_latency and fake.latency_ms > 0:
            await asyncio.sleep(fake.latency_ms / 1000.0)
        return fake.response

    OpenAICompatibleClient.completion = fake_completion  # type: ignore[assignment]
    OpenAICompatibleClient.acompletion = fake_acompletion  # type: ignore[assignment]
    try:
        yield
    finally:
        OpenAICompatibleClient.completion = original_completion  # type: ignore[assignment]
        OpenAICompatibleClient.acompletion = original_acompletion  # type: ignore[assignment]


@contextlib.contextmanager
def _patch_mcp_io() -> Iterator[None]:
    import data_designer.engine.mcp.io as mcp_io

    original_list_tools = mcp_io.list_tools
    original_call_tools = mcp_io.call_tools

    def fake_list_tools(provider: Any, timeout_sec: float | None = None) -> tuple[MCPToolDefinition, ...]:
        if getattr(provider, "name", None) != MOCK_MCP_PROVIDER_NAME:
            return original_list_tools(provider, timeout_sec=timeout_sec)
        return (_mock_tool_definition(),)

    def fake_call_tools(
        calls: list[tuple[Any, str, dict[str, Any]]],
        *,
        timeout_sec: float | None = None,
    ) -> list[MCPToolResult]:
        if any(getattr(call[0], "name", None) != MOCK_MCP_PROVIDER_NAME for call in calls):
            return original_call_tools(calls, timeout_sec=timeout_sec)
        return [_mock_tool_result(tool_name, arguments, provider.name) for provider, tool_name, arguments in calls]

    mcp_io.list_tools = fake_list_tools
    mcp_io.call_tools = fake_call_tools
    try:
        yield
    finally:
        mcp_io.list_tools = original_list_tools
        mcp_io.call_tools = original_call_tools


def _extract_metric(text: str, key: str) -> float | None:
    marker = f"{key}="
    start = text.find(marker)
    if start == -1:
        return None
    start += len(marker)
    end = start
    while end < len(text) and (text[end].isdigit() or text[end] in {".", "-"}):
        end += 1
    try:
        return float(text[start:end])
    except ValueError:
        return None


def _validate_recommendation(df: pd.DataFrame) -> pd.DataFrame:
    series = df["llm_stage3"].astype(str)
    scores = series.map(lambda text: _extract_metric(text, "score"))
    latencies = series.map(lambda text: _extract_metric(text, "latency_ms"))
    scores_numeric = pd.to_numeric(scores, errors="coerce")
    latency_numeric = pd.to_numeric(latencies, errors="coerce")
    is_valid = scores_numeric.between(0.0, 10.0) & latency_numeric.between(0.0, 900.0)
    return pd.DataFrame(
        {
            "is_valid": is_valid.fillna(False).astype(bool),
            "score": scores_numeric,
            "latency_ms": latency_numeric,
        }
    )


def _build_config(settings: BenchmarkSettings) -> DataDesignerConfigBuilder:
    high_parallel, mid_parallel, low_parallel = _distinct_parallel_requests(settings.max_parallel_requests)
    model_configs = [
        ModelConfig(
            alias="mock-alpha",
            model="mock-alpha",
            provider="mock-provider",
            inference_parameters=ChatCompletionInferenceParams(max_parallel_requests=high_parallel),
            skip_health_check=True,
        ),
        ModelConfig(
            alias="mock-beta",
            model="mock-beta",
            provider="mock-provider",
            inference_parameters=ChatCompletionInferenceParams(max_parallel_requests=low_parallel),
            skip_health_check=True,
        ),
        ModelConfig(
            alias="mock-gamma",
            model="mock-gamma",
            provider="mock-provider",
            inference_parameters=ChatCompletionInferenceParams(max_parallel_requests=mid_parallel),
            skip_health_check=True,
        ),
    ]

    builder = DataDesignerConfigBuilder(model_configs=model_configs)
    builder.add_tool_config(
        ToolConfig(
            tool_alias=MOCK_TOOL_ALIAS,
            providers=[MOCK_MCP_PROVIDER_NAME],
            allow_tools=[MOCK_TOOL_NAME],
            max_tool_call_turns=1,
            timeout_sec=1.0,
        )
    )
    builder.add_column(
        SamplerColumnConfig(
            name="seed_value",
            sampler_type=SamplerType.UNIFORM,
            params=UniformSamplerParams(low=0.0, high=100.0, decimal_places=3),
        )
    )
    builder.add_column(
        LLMTextColumnConfig(
            name="llm_stage1",
            model_alias="mock-alpha",
            prompt="Summarize the signal for seed {{ seed_value }}.",
        )
    )
    builder.add_column(
        LLMTextColumnConfig(
            name="llm_stage2",
            model_alias="mock-beta",
            tool_alias=MOCK_TOOL_ALIAS,
            prompt="Analyze {{ llm_stage1 }} and produce a risk assessment.",
        )
    )
    builder.add_column(
        LLMTextColumnConfig(
            name="llm_stage3",
            model_alias="mock-gamma",
            prompt="Generate a recommendation from {{ llm_stage2 }} with seed {{ seed_value }}.",
        )
    )
    builder.add_column(
        ValidationColumnConfig(
            name="llm_stage3_validation",
            target_columns=["llm_stage3"],
            validator_type=ValidatorType.LOCAL_CALLABLE,
            validator_params=LocalCallableValidatorParams(validation_function=_validate_recommendation),
            batch_size=settings.validator_batch_size,
        )
    )
    return builder


def _dataset_fingerprint(df: pd.DataFrame) -> str:
    normalized = df.reset_index(drop=True)
    normalized = normalized.reindex(sorted(normalized.columns), axis=1)
    records = normalized.to_dict(orient="records")
    payload = json.dumps(
        records,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=True,
        default=_json_default,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def _run_single_benchmark(settings: BenchmarkSettings, engine_mode: str) -> BenchmarkResult:
    # Imports are deferred so engine selection respects DATA_DESIGNER_ASYNC_ENGINE.
    from data_designer.engine.dataset_builders.artifact_storage import ArtifactStorage
    from data_designer.engine.dataset_builders.dataset_builder import DatasetBuilder
    from data_designer.engine.model_provider import resolve_model_provider_registry
    from data_designer.engine.resources.resource_provider import create_resource_provider
    from data_designer.engine.resources.seed_reader import SeedReaderRegistry
    from data_designer.engine.secret_resolver import CompositeResolver, EnvironmentResolver, PlaintextResolver

    random.seed(settings.seed)
    np.random.seed(settings.seed)

    run_config = RunConfig(
        buffer_size=settings.buffer_size,
        disable_early_shutdown=True,
        max_conversation_restarts=0,
        max_conversation_correction_steps=0,
    )
    builder = _build_config(settings)

    provider = ModelProvider(
        name="mock-provider",
        endpoint="https://mock.local",
        provider_type="openai",
        api_key="mock-key",
    )
    mcp_provider = MCPProvider(
        name=MOCK_MCP_PROVIDER_NAME,
        endpoint="https://mock.local/mcp",
        api_key="mock-mcp-key",
    )
    model_provider_registry = resolve_model_provider_registry([provider], default_provider_name=provider.name)
    secret_resolver = CompositeResolver([EnvironmentResolver(), PlaintextResolver()])

    with tempfile.TemporaryDirectory() as temp_dir:
        artifact_storage = ArtifactStorage(artifact_path=temp_dir, dataset_name=f"benchmark-{engine_mode}")
        resource_provider = create_resource_provider(
            artifact_storage=artifact_storage,
            model_configs=builder.model_configs,
            secret_resolver=secret_resolver,
            model_provider_registry=model_provider_registry,
            seed_reader_registry=SeedReaderRegistry(readers=[]),
            blob_storage=None,
            seed_dataset_source=None,
            run_config=run_config,
            mcp_providers=[mcp_provider],
            tool_configs=builder.tool_configs,
        )
        dataset_builder = DatasetBuilder(
            data_designer_config=builder.build(),
            resource_provider=resource_provider,
        )

        total_start = time.perf_counter()
        with _patch_llm_responses(simulated_latency=settings.simulated_latency), _patch_mcp_io():
            build_start = time.perf_counter()
            dataset_builder.build(num_records=settings.num_records)
            build_time = time.perf_counter() - build_start
            dataset = dataset_builder.artifact_storage.load_dataset_with_dropped_columns()

        dataset_hash = _dataset_fingerprint(dataset)
        total_time = time.perf_counter() - total_start

    return BenchmarkResult(
        engine_mode=engine_mode,
        num_records=settings.num_records,
        buffer_size=settings.buffer_size,
        build_time_sec=build_time,
        total_time_sec=total_time,
        dataset_hash=dataset_hash,
        row_count=int(dataset.shape[0]),
        column_count=int(dataset.shape[1]),
    )


def _run_subprocess(settings: BenchmarkSettings, engine_mode: str) -> BenchmarkResult:
    env = os.environ.copy()
    if engine_mode == "async":
        env["DATA_DESIGNER_ASYNC_ENGINE"] = "1"
    else:
        env.pop("DATA_DESIGNER_ASYNC_ENGINE", None)

    script_path = Path(__file__).resolve()
    cmd = [sys.executable, str(script_path), "--mode", "run", "--engine", engine_mode, *settings.to_cli_args()]
    completed = subprocess.run(cmd, capture_output=True, text=True, env=env, check=False)

    if completed.returncode != 0:
        raise RuntimeError(f"Benchmark subprocess failed.\nstdout:\n{completed.stdout}\nstderr:\n{completed.stderr}")

    for line in reversed(completed.stdout.splitlines()):
        if line.startswith(RESULT_PREFIX):
            payload = json.loads(line.removeprefix(RESULT_PREFIX))
            return BenchmarkResult.from_dict(payload)

    raise RuntimeError(
        f"Benchmark subprocess did not emit a result payload.\nstdout:\n{completed.stdout}\nstderr:\n{completed.stderr}"
    )


def _format_speedup(sync_time: float, async_time: float) -> str:
    if async_time <= 0:
        return "n/a"
    return f"{(sync_time / async_time):.2f}x"


def _run_with_progress(settings: BenchmarkSettings, engine_mode: str, iteration: int, total: int) -> BenchmarkResult:
    print(f"[{iteration}/{total}] Running {engine_mode} benchmark...", end="", flush=True)
    result = _run_subprocess(settings, engine_mode)
    print(f" done ({result.total_time_sec:.3f}s)")
    return result


def _compare_runs(settings: BenchmarkSettings, iterations: int) -> int:
    sync_results: list[BenchmarkResult] = []
    async_results: list[BenchmarkResult] = []
    expected_hash: str | None = None

    for iteration in range(1, iterations + 1):
        sync_result = _run_with_progress(settings, "sync", iteration, iterations)
        async_result = _run_with_progress(settings, "async", iteration, iterations)

        if sync_result.dataset_hash != async_result.dataset_hash:
            print(
                "Content mismatch detected: "
                f"sync hash {sync_result.dataset_hash} vs async hash {async_result.dataset_hash}"
            )
            return 1

        if expected_hash is None:
            expected_hash = sync_result.dataset_hash
        elif expected_hash != sync_result.dataset_hash or expected_hash != async_result.dataset_hash:
            print("Content mismatch detected across iterations.")
            return 1

        sync_results.append(sync_result)
        async_results.append(async_result)

    build_sync = [result.build_time_sec for result in sync_results]
    build_async = [result.build_time_sec for result in async_results]
    total_sync = [result.total_time_sec for result in sync_results]
    total_async = [result.total_time_sec for result in async_results]

    build_speedups = [sync / async_ for sync, async_ in zip(build_sync, build_async)]
    total_speedups = [sync / async_ for sync, async_ in zip(total_sync, total_async)]
    build_diffs = [sync - async_ for sync, async_ in zip(build_sync, build_async)]
    total_diffs = [sync - async_ for sync, async_ in zip(total_sync, total_async)]

    build_sync_stats = _compute_stats(build_sync)
    build_async_stats = _compute_stats(build_async)
    total_sync_stats = _compute_stats(total_sync)
    total_async_stats = _compute_stats(total_async)

    build_speed_stats = _compute_stats(build_speedups)
    total_speed_stats = _compute_stats(total_speedups)
    build_diff_stats = _compute_stats(build_diffs)
    total_diff_stats = _compute_stats(total_diffs)

    latency_label = "on" if settings.simulated_latency else "off"
    print("\nEngine benchmark summary (95% CI)")
    print(f"- runs: {iterations} | content match: yes | hash {expected_hash}")
    print(f"- simulated latency: {latency_label}")
    print(f"- build time sync: {_format_stats(build_sync_stats, unit='s')}")
    print(f"- build time async: {_format_stats(build_async_stats, unit='s')}")
    print(
        f"- build speedup: {_format_speed_stats(build_speed_stats)} | "
        f"paired diff {_format_stats(build_diff_stats, unit='s')} | "
        f"significant: {'yes' if _significant_diff(build_diff_stats) else 'no'}"
    )
    print(f"- total time sync: {_format_stats(total_sync_stats, unit='s')}")
    print(f"- total time async: {_format_stats(total_async_stats, unit='s')}")
    print(
        f"- total speedup: {_format_speed_stats(total_speed_stats)} | "
        f"paired diff {_format_stats(total_diff_stats, unit='s')} | "
        f"significant: {'yes' if _significant_diff(total_diff_stats) else 'no'}"
    )

    return 0


def _parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Benchmark DataDesigner engine with mock LLMs and compare async execution."
    )
    parser.add_argument(
        "--mode",
        type=str,
        choices=("compare", "run"),
        default="compare",
        help="Run both engines in subprocesses, or run once in the current process.",
    )
    parser.add_argument(
        "--engine",
        type=str,
        choices=("sync", "async"),
        default="sync",
        help="Engine mode for --mode run.",
    )
    parser.add_argument("--num-records", type=int, default=DEFAULT_NUM_RECORDS, help="Records to generate.")
    parser.add_argument("--buffer-size", type=int, default=DEFAULT_BUFFER_SIZE, help="Batch buffer size.")
    parser.add_argument("--seed", type=int, default=DEFAULT_SEED, help="Random seed for determinism.")
    parser.add_argument(
        "--iterations",
        type=int,
        default=DEFAULT_ITERATIONS,
        help="Number of sync/async runs to include in the compare mode.",
    )
    parser.add_argument(
        "--max-parallel-requests",
        type=int,
        default=DEFAULT_MAX_PARALLEL_REQUESTS,
        help="Max parallel LLM requests per model.",
    )
    parser.add_argument(
        "--validator-batch-size",
        type=int,
        default=DEFAULT_VALIDATOR_BATCH_SIZE,
        help="Batch size for the local validator.",
    )
    parser.add_argument(
        "--simulated-latency",
        action="store_true",
        default=False,
        help="Simulate LLM response latency using beta-distributed delays.",
    )
    return parser.parse_args()


def main() -> None:
    args = _parse_args()
    settings = BenchmarkSettings(
        num_records=args.num_records,
        buffer_size=args.buffer_size,
        seed=args.seed,
        max_parallel_requests=args.max_parallel_requests,
        validator_batch_size=args.validator_batch_size,
        simulated_latency=args.simulated_latency,
    )

    if args.mode == "compare":
        sys.exit(_compare_runs(settings, args.iterations))

    if args.engine == "async":
        os.environ["DATA_DESIGNER_ASYNC_ENGINE"] = "1"
    else:
        os.environ.pop("DATA_DESIGNER_ASYNC_ENGINE", None)

    print(f"Running {args.engine} benchmark...")
    result = _run_single_benchmark(settings, args.engine)
    print(f"{RESULT_PREFIX}{json.dumps(result.to_dict(), sort_keys=True)}")


if __name__ == "__main__":
    main()