feat: support sample-weighted aggregations for sampled trace data (#1963)

## Problem

High-throughput services can produce millions of spans per second. Storing every span is expensive, so we run the OpenTelemetry Collector's tail-sampling processor to keep only 1-in-N spans. Each kept span carries a `SampleRate` attribute recording N.

Once data is sampled, naive aggregations are wrong: count() undercounts by roughly a factor of N, sum() and avg() are biased, and percentiles shift. Dashboards show misleadingly low request counts, throughput, and error rates, making capacity planning and alerting unreliable.
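The bias is easy to see with a toy simulation. The sketch below uses hypothetical data (`durationMs` and `sampleRate` are illustrative field names, not the actual schema): three spans kept at a 1-in-10 rate stand in for roughly thirty real events, and weighting each span by its sample rate recovers the true totals.

```typescript
// Hypothetical kept spans from 1-in-10 tail sampling; each records its rate.
type Span = { durationMs: number; sampleRate: number };

const kept: Span[] = [
  { durationMs: 120, sampleRate: 10 },
  { durationMs: 80, sampleRate: 10 },
  { durationMs: 300, sampleRate: 10 },
];

// Naive count sees only the kept spans; the weighted count estimates
// the true number of events that occurred before sampling.
const naiveCount = kept.length; // 3
const weightedCount = kept.reduce((acc, s) => acc + s.sampleRate, 0); // 30

// Weighted average: sum(x * w) / sum(w). With uniform rates this equals the
// naive average; with mixed per-span rates the naive average is biased.
const weightedAvg =
  kept.reduce((acc, s) => acc + s.durationMs * s.sampleRate, 0) /
  weightedCount; // ≈ 166.67
```

The same identities (count → sum of weights, avg → weighted mean) are what the query builder pushes down into SQL.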

### Why Materialized Views Cannot Solve This Alone

A materialized view that pre-aggregates sampled spans is a useful performance optimization for known dashboard queries, but it cannot replace a sampling-aware query engine.

**Fixed dimensions.** A materialized view pre-aggregates by a fixed set of GROUP BY keys (e.g. `ServiceName`, `SpanName`, `StatusCode`, `TimestampBucket`). Trace exploration requires slicing by arbitrary span attributes -- `http.target`, `k8s.pod.name`, custom business tags -- in combinations that cannot be predicted at view creation time. Grouping by a different dimension requires either going back to the raw table or maintaining a separate materialized view for every possible dimension combination. If you try to work around the fixed-dimensions problem by adding high-cardinality span attributes to the GROUP BY, the materialized table approaches a 1:1 row ratio with the raw table: you end up doubling storage without meaningful compression.

**Fixed aggregation fields.** A typical MV only aggregates a single numeric column like `Duration`. Users want weighted aggregations over any numeric attribute: request body sizes, queue depths, retry counts, custom metrics attached to spans. Each new field requires adding more `AggregateFunction` columns and recreating the view.

**Industry precedent.** Platforms that rely solely on pre-aggregation (Datadog, Splunk, New Relic, Elastic) get accurate RED dashboards but cannot correct ad-hoc queries over sampled span data. Only query-engine weighting (Honeycomb) produces correct results for arbitrary ad-hoc queries, including weighted percentiles and heatmaps.

A better solution is to make the query engine itself sampling-aware, so that every query -- from dashboards, alerts, and ad-hoc searches -- automatically weights by `SampleRate` regardless of which dimensions or fields the user picks. Materialized views remain a useful complement for accelerating known, fixed-dimension dashboard panels, but they are not a substitute for correct query-time weighting.

## Summary

TraceSourceSchema gets a new optional field `sampleRateExpression` - the ClickHouse expression that evaluates to the per-span sample rate (e.g. `SpanAttributes['SampleRate']`). When not configured, all queries are unchanged.
When set, the query builder rewrites SQL aggregations to weight each span by its sample rate:

| aggFn          | Before               | After (sample-corrected)                            | Overhead |
| -------------- | -------------------- | --------------------------------------------------- | -------- |
| count          | `count()`            | `sum(weight)`                                       | ~1x      |
| count + cond   | `countIf(cond)`      | `sumIf(weight, cond)`                               | ~1x      |
| avg            | `avg(col)`           | `sum(col * weight) / sum(weight)`                   | ~2x      |
| sum            | `sum(col)`           | `sum(col * weight)`                                 | ~1x      |
| quantile(p)    | `quantile(p)(col)`   | `quantileTDigestWeighted(p)(col, toUInt32(weight))` | ~1.5x    |
| min/max        | unchanged            | unchanged                                           | 1x       |
| count_distinct | unchanged            | unchanged (cannot correct)                          | 1x       |
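As a simplified sketch of the mapping in the table above (hypothetical helper name; the real rewrite in `renderChartConfig.ts` also handles NULL values, aggregation conditions, and the `nullIf` divide-by-zero guard shown in the snapshots below):

```typescript
// Maps an aggFn to its sample-weighted SQL form. `weight` is assumed to be
// the already-wrapped sample-rate expression, `col` the numeric column.
function weightedAggExpr(
  aggFn: string,
  col: string,
  weight: string,
  level?: number,
): string {
  switch (aggFn) {
    case 'count':
      return `sum(${weight})`;
    case 'sum':
      return `sum(${col} * ${weight})`;
    case 'avg':
      // weighted mean: sum(x * w) / sum(w)
      return `sum(${col} * ${weight}) / sum(${weight})`;
    case 'quantile':
      // TDigest accepts integer weights, hence the toUInt32 cast
      return `quantileTDigestWeighted(${level})(${col}, toUInt32(${weight}))`;
    default:
      // min, max, count_distinct pass through unchanged
      return `${aggFn}(${col})`;
  }
}
```

For example, `weightedAggExpr('count', '', 'weight')` yields `sum(weight)`, matching the first row of the table.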

**Types**:
- Add `sampleRateExpression` to `TraceSourceSchema` and the Mongoose model
- Add `sampleWeightExpression` to the `ChartConfig` schema

**Query builder:**
- `sampleWeightExpression` is wrapped as `greatest(toUInt64OrZero(toString(expr)), 1)`, so spans without a `SampleRate` attribute default to weight 1 (unsampled data produces results identical to the original queries).
- Rewrite `aggFnExpr` in `renderChartConfig.ts` when `sampleWeightExpression` is set, applying the safe default-to-1 wrapping.
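The default-to-1 wrapping can be sketched as a one-line helper (hypothetical name): `toUInt64OrZero` maps missing or non-numeric `SampleRate` values to 0, and `greatest(..., 1)` clamps them back up to 1, so unsampled spans are weighted exactly once.

```typescript
// Wraps a raw sample-rate expression so it always yields a weight >= 1.
function wrapSampleWeight(expr: string): string {
  return `greatest(toUInt64OrZero(toString(${expr})), 1)`;
}

// wrapSampleWeight("SpanAttributes['SampleRate']")
//   -> "greatest(toUInt64OrZero(toString(SpanAttributes['SampleRate'])), 1)"
```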

**Integration** (propagate sampleWeightExpression to all chart configs):
- ChartEditor/utils.ts, DBSearchPage, ServicesDashboardPage, sessions
- DBDashboardPage (raw SQL + builder branches)
- AlertPreviewChart
- SessionSubpanel
- ServiceDashboardEndpointPerformanceChart
- ServiceDashboardSlowestEventsTile (p95 query + events table)
- ServiceDashboardEndpointSidePanel (error rate + throughput)
- ServiceDashboardDbQuerySidePanel (total query time + throughput)
- External API v2 charts, AI controller, alerts (index + template)

**UI**:
- Add Sample Rate Expression field to trace source admin form

### Screenshots or video



| Before | After |
| :----- | :---- |
|        |       |

### How to test locally or on Vercel



1.
2.
3.

### References



- Linear Issue:
- Related PRs:
This commit is contained in:
Vineet Ahirkar 2026-03-30 12:52:18 -07:00 committed by GitHub
parent 853da16ad3
commit 941d045077
27 changed files with 1046 additions and 2 deletions


@ -0,0 +1,7 @@
---
'@hyperdx/common-utils': minor
'@hyperdx/api': minor
'@hyperdx/app': minor
---
feat: support sample-weighted aggregations for sampled trace data


@ -81,6 +81,7 @@ CREATE TABLE IF NOT EXISTS ${DATABASE}.e2e_otel_traces
\`Links.TraceState\` Array(String) CODEC(ZSTD(1)),
\`Links.Attributes\` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)),
\`__hdx_materialized_rum.sessionId\` String MATERIALIZED ResourceAttributes['rum.sessionId'] CODEC(ZSTD(1)),
\`SampleRate\` UInt64 MATERIALIZED greatest(toUInt64OrZero(SpanAttributes['SampleRate']), 1) CODEC(T64, ZSTD(1)),
INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_rum_session_id __hdx_materialized_rum.sessionId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,


@ -24,6 +24,7 @@ CREATE TABLE IF NOT EXISTS ${DATABASE}.otel_traces
`Links.TraceState` Array(String) CODEC(ZSTD(1)),
`Links.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)),
`__hdx_materialized_rum.sessionId` String MATERIALIZED ResourceAttributes['rum.sessionId'] CODEC(ZSTD(1)),
`SampleRate` UInt64 MATERIALIZED greatest(toUInt64OrZero(SpanAttributes['SampleRate']), 1) CODEC(T64, ZSTD(1)),
INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_rum_session_id __hdx_materialized_rum.sessionId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,


@ -0,0 +1 @@
ALTER TABLE ${DATABASE}.otel_traces DROP COLUMN IF EXISTS `SampleRate`;


@ -0,0 +1,4 @@
ALTER TABLE ${DATABASE}.otel_traces
ADD COLUMN IF NOT EXISTS `SampleRate` UInt64
MATERIALIZED greatest(toUInt64OrZero(SpanAttributes['SampleRate']), 1)
CODEC(T64, ZSTD(1));


@ -9,6 +9,8 @@ import {
AILineTableResponse,
AssistantLineTableConfigSchema,
ChartConfigWithDateRange,
pickSampleWeightExpressionProps,
SourceKind,
} from '@hyperdx/common-utils/dist/types';
import type { LanguageModel } from 'ai';
import * as chrono from 'chrono-node';
@ -271,6 +273,7 @@ export function getChartConfigFromResolvedConfig(
connection: source.connection.toString(),
groupBy: resObject.groupBy,
timestampValueExpression: source.timestampValueExpression,
...pickSampleWeightExpressionProps(source),
dateRange: [dateRange[0].toString(), dateRange[1].toString()],
markdown: resObject.markdown,
granularity: 'auto',


@ -163,6 +163,7 @@ export const TraceSource = Source.discriminator<ITraceSource>(
parentSpanIdExpression: String,
spanNameExpression: String,
spanKindExpression: String,
sampleRateExpression: String,
logSourceId: String,
sessionSourceId: String,
metricSourceId: String,


@ -4,6 +4,7 @@ import { Granularity } from '@hyperdx/common-utils/dist/core/utils';
import {
ChartConfigWithOptDateRange,
DisplayType,
pickSampleWeightExpressionProps,
SourceKind,
} from '@hyperdx/common-utils/dist/types';
import opentelemetry, { SpanStatusCode } from '@opentelemetry/api';
@ -280,6 +281,7 @@ const buildChartConfigFromRequest = async (
],
where: '',
timestampValueExpression: source.timestampValueExpression,
...pickSampleWeightExpressionProps(source),
dateRange: [new Date(params.startTime), new Date(params.endTime)] as [
Date,
Date,


@ -23,6 +23,8 @@ import {
import {
BuilderChartConfigWithOptDateRange,
DisplayType,
getSampleWeightExpression,
pickSampleWeightExpressionProps,
SourceKind,
} from '@hyperdx/common-utils/dist/types';
import * as fns from 'date-fns';
@ -108,6 +110,7 @@ export async function computeAliasWithClauses(
source.kind === SourceKind.Log || source.kind === SourceKind.Trace
? source.implicitColumnExpression
: undefined,
...pickSampleWeightExpressionProps(source),
timestampValueExpression: source.timestampValueExpression,
};
const query = await renderChartConfig(config, metadata, source.querySettings);
@ -454,6 +457,7 @@ const getChartConfigFromAlert = (
source.kind === SourceKind.Log || source.kind === SourceKind.Trace
? source.implicitColumnExpression
: undefined,
...pickSampleWeightExpressionProps(source),
timestampValueExpression: source.timestampValueExpression,
};
} else if (details.taskType === AlertTaskType.TILE) {
@ -475,6 +479,7 @@ const getChartConfigFromAlert = (
source.kind === SourceKind.Log || source.kind === SourceKind.Trace
? source.implicitColumnExpression
: undefined;
const sampleWeightExpression = getSampleWeightExpression(source);
const metricTables =
source.kind === SourceKind.Metric ? source.metricTables : undefined;
return {
@ -487,6 +492,7 @@ const getChartConfigFromAlert = (
granularity: `${windowSizeInMins} minute`,
groupBy: tile.config.groupBy,
implicitColumnExpression,
sampleWeightExpression,
metricTables,
select: tile.config.select,
timestampValueExpression: source.timestampValueExpression,


@ -10,6 +10,7 @@ import {
AlertChannelType,
ChartConfigWithOptDateRange,
DisplayType,
pickSampleWeightExpressionProps,
SourceKind,
WebhookService,
zAlertChannelType,
@ -598,6 +599,7 @@ ${targetTemplate}`;
where: savedSearch.where,
whereLanguage: savedSearch.whereLanguage,
implicitColumnExpression: source.implicitColumnExpression,
...pickSampleWeightExpressionProps(source),
timestampValueExpression: source.timestampValueExpression,
orderBy: savedSearch.orderBy,
limit: {


@ -33,6 +33,7 @@ import {
DashboardFilter,
DisplayType,
Filter,
getSampleWeightExpression,
isLogSource,
isTraceSource,
SearchCondition,
@ -235,6 +236,7 @@ const Tile = forwardRef(
...chart.config,
// Populate these two columns from the source to support Lucene-based filters
...pick(source, ['implicitColumnExpression', 'from']),
sampleWeightExpression: getSampleWeightExpression(source),
dateRange,
granularity,
filters,
@ -269,6 +271,7 @@ const Tile = forwardRef(
isLogSource(source) || isTraceSource(source)
? source.implicitColumnExpression
: undefined,
sampleWeightExpression: getSampleWeightExpression(source),
filters,
metricTables: isMetricSource ? source.metricTables : undefined,
});


@ -36,6 +36,7 @@ import {
Filter,
isLogSource,
isTraceSource,
pickSampleWeightExpressionProps,
SourceKind,
TSource,
} from '@hyperdx/common-utils/dist/types';
@ -687,6 +688,7 @@ function useSearchedConfigToChartConfig(
whereLanguage: whereLanguage ?? 'sql',
timestampValueExpression: sourceObj.timestampValueExpression,
implicitColumnExpression: sourceObj.implicitColumnExpression,
...pickSampleWeightExpressionProps(sourceObj),
connection: sourceObj.connection,
displayType: DisplayType.Search,
orderBy: orderBy || defaultSearchConfig?.orderBy || defaultOrderBy,


@ -19,6 +19,7 @@ import {
Filter,
isLogSource,
isTraceSource,
pickSampleWeightExpressionProps,
PresetDashboard,
SourceKind,
TTraceSource,
@ -34,6 +35,7 @@ function pickSourceConfigFields(source: TSource) {
...(isLogSource(source) || isTraceSource(source)
? { implicitColumnExpression: source.implicitColumnExpression }
: {}),
...pickSampleWeightExpressionProps(source),
};
}
import {


@ -8,6 +8,7 @@ import { tcFromSource } from '@hyperdx/common-utils/dist/core/metadata';
import {
ChartConfigWithOptDateRange,
DateRange,
pickSampleWeightExpressionProps,
SearchCondition,
SearchConditionLanguage,
TSessionSource,
@ -188,6 +189,7 @@ function useSessionChartConfigs({
where,
timestampValueExpression: traceSource.timestampValueExpression,
implicitColumnExpression: traceSource.implicitColumnExpression,
...pickSampleWeightExpressionProps(traceSource),
connection: traceSource.connection,
orderBy: `${traceSource.timestampValueExpression} ASC`,
limit: {


@ -3,6 +3,7 @@ import { aliasMapToWithClauses } from '@hyperdx/common-utils/dist/core/utils';
import {
AlertInterval,
Filter,
getSampleWeightExpression,
isLogSource,
isTraceSource,
SearchCondition,
@ -74,6 +75,7 @@ export const AlertPreviewChart = ({
isLogSource(source) || isTraceSource(source)
? source.implicitColumnExpression
: undefined,
sampleWeightExpression: getSampleWeightExpression(source),
groupBy,
with: aliasWith,
select: [


@ -8,6 +8,7 @@ import {
BuilderSavedChartConfig,
ChartConfigWithDateRange,
DisplayType,
getSampleWeightExpression,
isLogSource,
isMetricSource,
isTraceSource,
@ -142,6 +143,7 @@ export function convertFormStateToChartConfig(
isLogSource(source) || isTraceSource(source)
? source.implicitColumnExpression
: undefined,
sampleWeightExpression: getSampleWeightExpression(source),
metricTables: isMetricSource(source) ? source.metricTables : undefined,
where: form.where ?? '',
select: isSelectEmpty


@ -4,6 +4,7 @@ import { parseAsString, useQueryState } from 'nuqs';
import {
DisplayType,
type Filter,
pickSampleWeightExpressionProps,
SourceKind,
} from '@hyperdx/common-utils/dist/types';
import { Drawer, Grid, Text } from '@mantine/core';
@ -109,6 +110,7 @@ export default function ServiceDashboardDbQuerySidePanel({
'connection',
'from',
]),
...pickSampleWeightExpressionProps(source),
where: '',
whereLanguage: 'sql',
select: [
@ -146,6 +148,7 @@ export default function ServiceDashboardDbQuerySidePanel({
'connection',
'from',
]),
...pickSampleWeightExpressionProps(source),
where: '',
whereLanguage: 'sql',
select: [


@ -1,5 +1,8 @@
import { pick } from 'lodash';
import { TTraceSource } from '@hyperdx/common-utils/dist/types';
import {
pickSampleWeightExpressionProps,
TTraceSource,
} from '@hyperdx/common-utils/dist/types';
import { MS_NUMBER_FORMAT } from '@/ChartUtils';
import { ChartBox } from '@/components/ChartBox';
@ -95,6 +98,7 @@ export default function ServiceDashboardEndpointPerformanceChart({
config={{
source: source.id,
...pick(source, ['timestampValueExpression', 'connection', 'from']),
...pickSampleWeightExpressionProps(source),
where: '',
whereLanguage: 'sql',
select: [


@ -4,6 +4,7 @@ import { parseAsString, useQueryState } from 'nuqs';
import {
DisplayType,
type Filter,
pickSampleWeightExpressionProps,
SourceKind,
} from '@hyperdx/common-utils/dist/types';
import { Drawer, Grid, Text } from '@mantine/core';
@ -116,6 +117,7 @@ export default function ServiceDashboardEndpointSidePanel({
'connection',
'from',
]),
...pickSampleWeightExpressionProps(source),
where: '',
whereLanguage: 'sql',
select: [
@ -159,6 +161,7 @@ export default function ServiceDashboardEndpointSidePanel({
'connection',
'from',
]),
...pickSampleWeightExpressionProps(source),
where: '',
whereLanguage: 'sql',
select: [


@ -1,6 +1,10 @@
import { pick } from 'lodash';
import { ClickHouseQueryError } from '@hyperdx/common-utils/dist/clickhouse';
import type { Filter, TTraceSource } from '@hyperdx/common-utils/dist/types';
import {
type Filter,
pickSampleWeightExpressionProps,
type TTraceSource,
} from '@hyperdx/common-utils/dist/types';
import { Box, Code, Group, Text } from '@mantine/core';
import { ChartBox } from '@/components/ChartBox';
@ -33,6 +37,7 @@ export default function SlowestEventsTile({
{
source: source.id,
...pick(source, ['timestampValueExpression', 'connection', 'from']),
...pickSampleWeightExpressionProps(source),
where: '',
whereLanguage: 'sql',
select: [
@ -117,6 +122,7 @@ export default function SlowestEventsTile({
'connection',
'from',
]),
...pickSampleWeightExpressionProps(source),
where: '',
whereLanguage: 'sql',
select: [


@ -1545,6 +1545,21 @@ function TraceTableModelForm(props: TableModelProps) {
placeholder="SpanAttributes"
/>
</FormRow>
<FormRow
label={'Sample Rate Expression'}
helpText="Column or expression for upstream sampling weight (1/N). When set, aggregations (count, avg, sum, quantile) are corrected for sampling. Percentiles use quantileTDigestWeighted, which is an approximation -- exact values may differ slightly. Leave empty if spans are not sampled."
>
<SQLInlineEditorControlled
tableConnection={{
databaseName,
tableName,
connectionId,
}}
control={control}
name="sampleRateExpression"
placeholder="SampleRate"
/>
</FormRow>
<FormRow
label={'Span Events Expression'}
helpText="Expression to extract span events. Used to capture events associated with spans. Expected to be Nested ( Timestamp DateTime64(9), Name LowCardinality(String), Attributes Map(LowCardinality(String), String)"


@ -5,6 +5,7 @@ import { chSql } from '@hyperdx/common-utils/dist/clickhouse';
import { renderChartConfig } from '@hyperdx/common-utils/dist/core/renderChartConfig';
import {
DateRange,
pickSampleWeightExpressionProps,
SearchCondition,
SearchConditionLanguage,
TSessionSource,
@ -162,6 +163,7 @@ export function useSessions(
}),
timestampValueExpression: traceSource.timestampValueExpression,
implicitColumnExpression: traceSource.implicitColumnExpression,
...pickSampleWeightExpressionProps(traceSource),
connection: traceSource.connection,
groupBy: 'serviceName, sessionId',
},


@ -657,6 +657,40 @@ exports[`renderChartConfig k8s semantic convention migrations should handle metr
),toStartOfInterval(toDateTime(__hdx_time_bucket2), INTERVAL 1 minute) AS \`__hdx_time_bucket\` FROM Bucketed WHERE (__hdx_time_bucket2 >= fromUnixTimestamp64Milli(1739318400000) AND __hdx_time_bucket2 <= fromUnixTimestamp64Milli(1765670400000)) GROUP BY toStartOfInterval(toDateTime(__hdx_time_bucket2), INTERVAL 1 minute) AS \`__hdx_time_bucket\` ORDER BY toStartOfInterval(toDateTime(__hdx_time_bucket2), INTERVAL 1 minute) AS \`__hdx_time_bucket\` LIMIT 10 SETTINGS short_circuit_function_evaluation = 'force_enable', optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"
`;
exports[`renderChartConfig sample-weighted aggregations should handle complex sampleWeightExpression like SpanAttributes map access 1`] = `"SELECT sum(greatest(toUInt64OrZero(toString(SpanAttributes['SampleRate'])), 1)) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`;
exports[`renderChartConfig sample-weighted aggregations should handle mixed weighted and passthrough aggregations 1`] = `
"SELECT sum(greatest(toUInt64OrZero(toString(SampleRate)), 1)) AS \\"weighted_count\\",sumIf(toFloat64OrDefault(toString(Duration)) * greatest(toUInt64OrZero(toString(SampleRate)), 1), toFloat64OrDefault(toString(Duration)) IS NOT NULL) / nullIf(sumIf(greatest(toUInt64OrZero(toString(SampleRate)), 1), toFloat64OrDefault(toString(Duration)) IS NOT NULL), 0) AS \\"weighted_avg\\",min(
toFloat64OrDefault(toString(Duration))
) AS \\"min_duration\\",count(DISTINCT TraceId) AS \\"unique_traces\\" FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"
`;
exports[`renderChartConfig sample-weighted aggregations should leave count_distinct unchanged with sampleWeightExpression 1`] = `"SELECT count(DISTINCT TraceId) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`;
exports[`renderChartConfig sample-weighted aggregations should leave min/max unchanged with sampleWeightExpression 1`] = `
"SELECT min(
toFloat64OrDefault(toString(Duration))
),max(
toFloat64OrDefault(toString(Duration))
) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"
`;
exports[`renderChartConfig sample-weighted aggregations should rewrite avg to weighted average 1`] = `"SELECT sumIf(toFloat64OrDefault(toString(Duration)) * greatest(toUInt64OrZero(toString(SampleRate)), 1), toFloat64OrDefault(toString(Duration)) IS NOT NULL) / nullIf(sumIf(greatest(toUInt64OrZero(toString(SampleRate)), 1), toFloat64OrDefault(toString(Duration)) IS NOT NULL), 0) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`;
exports[`renderChartConfig sample-weighted aggregations should rewrite avg with where condition 1`] = `"SELECT sumIf(toFloat64OrDefault(toString(Duration)) * greatest(toUInt64OrZero(toString(SampleRate)), 1), ServiceName = 'api' AND toFloat64OrDefault(toString(Duration)) IS NOT NULL) / nullIf(sumIf(greatest(toUInt64OrZero(toString(SampleRate)), 1), ServiceName = 'api' AND toFloat64OrDefault(toString(Duration)) IS NOT NULL), 0) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) AND (ServiceName = 'api') SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`;
exports[`renderChartConfig sample-weighted aggregations should rewrite count() to sum(greatest(...)) 1`] = `"SELECT sum(greatest(toUInt64OrZero(toString(SampleRate)), 1)) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`;
exports[`renderChartConfig sample-weighted aggregations should rewrite countIf to sumIf(greatest(...), cond) 1`] = `"SELECT sumIf(greatest(toUInt64OrZero(toString(SampleRate)), 1), StatusCode = 'Error') FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) AND (StatusCode = 'Error') SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`;
exports[`renderChartConfig sample-weighted aggregations should rewrite quantile to quantileTDigestWeighted 1`] = `"SELECT quantileTDigestWeighted(0.99)(toFloat64OrDefault(toString(Duration)), toUInt32(greatest(toUInt64OrZero(toString(SampleRate)), 1))) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`;
exports[`renderChartConfig sample-weighted aggregations should rewrite quantile with where condition 1`] = `"SELECT quantileTDigestWeightedIf(0.95)(toFloat64OrDefault(toString(Duration)), toUInt32(greatest(toUInt64OrZero(toString(SampleRate)), 1)), ServiceName = 'api' AND toFloat64OrDefault(toString(Duration)) IS NOT NULL) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) AND (ServiceName = 'api') SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`;
exports[`renderChartConfig sample-weighted aggregations should rewrite sum to weighted sum 1`] = `"SELECT sum(toFloat64OrDefault(toString(Duration)) * greatest(toUInt64OrZero(toString(SampleRate)), 1)) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`;
exports[`renderChartConfig sample-weighted aggregations should rewrite sum with where condition 1`] = `"SELECT sumIf(toFloat64OrDefault(toString(Duration)) * greatest(toUInt64OrZero(toString(SampleRate)), 1), ServiceName = 'api' AND toFloat64OrDefault(toString(Duration)) IS NOT NULL) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) AND (ServiceName = 'api') SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`;
exports[`renderChartConfig should generate sql for a single gauge metric 1`] = `
"WITH Source AS (
SELECT


@ -1911,4 +1911,374 @@ describe('renderChartConfig', () => {
);
});
});
describe('sample-weighted aggregations', () => {
const baseSampledConfig: ChartConfigWithOptDateRange = {
displayType: DisplayType.Table,
connection: 'test-connection',
from: {
databaseName: 'default',
tableName: 'otel_traces',
},
select: [],
where: '',
whereLanguage: 'sql',
timestampValueExpression: 'Timestamp',
sampleWeightExpression: 'SampleRate',
dateRange: [new Date('2025-02-12'), new Date('2025-02-14')],
};
it('should rewrite count() to sum(greatest(...))', async () => {
const config: ChartConfigWithOptDateRange = {
...baseSampledConfig,
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: '',
},
],
};
const generatedSql = await renderChartConfig(
config,
mockMetadata,
querySettings,
);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain(
'greatest(toUInt64OrZero(toString(SampleRate)), 1)',
);
expect(actual).toContain('sum(');
expect(actual).not.toContain('count()');
expect(actual).toMatchSnapshot();
});
it('should rewrite countIf to sumIf(greatest(...), cond)', async () => {
const config: ChartConfigWithOptDateRange = {
...baseSampledConfig,
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: "StatusCode = 'Error'",
aggConditionLanguage: 'sql',
},
],
};
const generatedSql = await renderChartConfig(
config,
mockMetadata,
querySettings,
);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain(
'sumIf(greatest(toUInt64OrZero(toString(SampleRate)), 1)',
);
expect(actual).not.toContain('countIf');
expect(actual).toMatchSnapshot();
});
it('should rewrite avg to weighted average', async () => {
const config: ChartConfigWithOptDateRange = {
...baseSampledConfig,
select: [
{
aggFn: 'avg',
valueExpression: 'Duration',
aggCondition: '',
},
],
};
const generatedSql = await renderChartConfig(
config,
mockMetadata,
querySettings,
);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain(
'* greatest(toUInt64OrZero(toString(SampleRate)), 1)',
);
expect(actual).toContain(
'/ nullIf(sumIf(greatest(toUInt64OrZero(toString(SampleRate)), 1), toFloat64OrDefault(toString(Duration)) IS NOT NULL), 0)',
);
expect(actual).not.toContain('avg(');
expect(actual).toMatchSnapshot();
});
it('should rewrite sum to weighted sum', async () => {
const config: ChartConfigWithOptDateRange = {
...baseSampledConfig,
select: [
{
aggFn: 'sum',
valueExpression: 'Duration',
aggCondition: '',
},
],
};
const generatedSql = await renderChartConfig(
config,
mockMetadata,
querySettings,
);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain(
'* greatest(toUInt64OrZero(toString(SampleRate)), 1)',
);
expect(actual).toMatchSnapshot();
});
it('should rewrite quantile to quantileTDigestWeighted', async () => {
const config: ChartConfigWithOptDateRange = {
...baseSampledConfig,
select: [
{
aggFn: 'quantile',
valueExpression: 'Duration',
aggCondition: '',
level: 0.99,
},
],
};
const generatedSql = await renderChartConfig(
config,
mockMetadata,
querySettings,
);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain('quantileTDigestWeighted(0.99)');
expect(actual).toContain(
'toUInt32(greatest(toUInt64OrZero(toString(SampleRate)), 1))',
);
expect(actual).not.toContain('quantile(0.99)');
expect(actual).toMatchSnapshot();
});
it('should leave min/max unchanged with sampleWeightExpression', async () => {
const config: ChartConfigWithOptDateRange = {
...baseSampledConfig,
select: [
{
aggFn: 'min',
valueExpression: 'Duration',
aggCondition: '',
},
{
aggFn: 'max',
valueExpression: 'Duration',
aggCondition: '',
},
],
};
const generatedSql = await renderChartConfig(
config,
mockMetadata,
querySettings,
);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain('min(');
expect(actual).toContain('max(');
expect(actual).not.toContain('SampleRate');
expect(actual).toMatchSnapshot();
});
it('should leave count_distinct unchanged with sampleWeightExpression', async () => {
const config: ChartConfigWithOptDateRange = {
...baseSampledConfig,
select: [
{
aggFn: 'count_distinct',
valueExpression: 'TraceId',
aggCondition: '',
},
],
};
const generatedSql = await renderChartConfig(
config,
mockMetadata,
querySettings,
);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain('count(DISTINCT');
expect(actual).not.toContain('SampleRate');
expect(actual).toMatchSnapshot();
});
it('should handle complex sampleWeightExpression like SpanAttributes map access', async () => {
const config: ChartConfigWithOptDateRange = {
...baseSampledConfig,
sampleWeightExpression: "SpanAttributes['SampleRate']",
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: '',
},
],
};
const generatedSql = await renderChartConfig(
config,
mockMetadata,
querySettings,
);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain(
"greatest(toUInt64OrZero(toString(SpanAttributes['SampleRate'])), 1)",
);
expect(actual).toContain('sum(');
expect(actual).not.toContain('count()');
expect(actual).toMatchSnapshot();
});
it('should rewrite avg with where condition', async () => {
const config: ChartConfigWithOptDateRange = {
...baseSampledConfig,
select: [
{
aggFn: 'avg',
valueExpression: 'Duration',
aggCondition: "ServiceName = 'api'",
aggConditionLanguage: 'sql',
},
],
};
const generatedSql = await renderChartConfig(
config,
mockMetadata,
querySettings,
);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain('sumIf(');
expect(actual).toContain("ServiceName = 'api'");
expect(actual).not.toContain('avg(');
expect(actual).toMatchSnapshot();
});
it('should rewrite sum with where condition', async () => {
const config: ChartConfigWithOptDateRange = {
...baseSampledConfig,
select: [
{
aggFn: 'sum',
valueExpression: 'Duration',
aggCondition: "ServiceName = 'api'",
aggConditionLanguage: 'sql',
},
],
};
const generatedSql = await renderChartConfig(
config,
mockMetadata,
querySettings,
);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain('sumIf(');
expect(actual).toContain("ServiceName = 'api'");
expect(actual).toMatchSnapshot();
});
it('should rewrite quantile with where condition', async () => {
const config: ChartConfigWithOptDateRange = {
...baseSampledConfig,
select: [
{
aggFn: 'quantile',
valueExpression: 'Duration',
aggCondition: "ServiceName = 'api'",
aggConditionLanguage: 'sql',
level: 0.95,
},
],
};
const generatedSql = await renderChartConfig(
config,
mockMetadata,
querySettings,
);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain('quantileTDigestWeightedIf(0.95)');
expect(actual).toContain("ServiceName = 'api'");
expect(actual).not.toContain('quantile(0.95)');
expect(actual).toMatchSnapshot();
});
it('should handle mixed weighted and passthrough aggregations', async () => {
const config: ChartConfigWithOptDateRange = {
...baseSampledConfig,
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: '',
alias: 'weighted_count',
},
{
aggFn: 'avg',
valueExpression: 'Duration',
aggCondition: '',
alias: 'weighted_avg',
},
{
aggFn: 'min',
valueExpression: 'Duration',
aggCondition: '',
alias: 'min_duration',
},
{
aggFn: 'count_distinct',
valueExpression: 'TraceId',
aggCondition: '',
alias: 'unique_traces',
},
],
};
const generatedSql = await renderChartConfig(
config,
mockMetadata,
querySettings,
);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain('sum(');
expect(actual).toContain('min(');
expect(actual).toContain('count(DISTINCT');
expect(actual).not.toContain('count()');
expect(actual).not.toContain('avg(');
expect(actual).toMatchSnapshot();
});
it('should not rewrite aggregations without sampleWeightExpression', async () => {
const config: ChartConfigWithOptDateRange = {
...baseSampledConfig,
sampleWeightExpression: undefined,
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: '',
},
],
};
const generatedSql = await renderChartConfig(
config,
mockMetadata,
querySettings,
);
const actual = parameterizedQueryToSql(generatedSql);
expect(actual).toContain('count()');
expect(actual).not.toContain('SampleRate');
});
});
});


@@ -0,0 +1,465 @@
import type { ClickHouseClient } from '@clickhouse/client';
import { createClient } from '@clickhouse/client';
import { parameterizedQueryToSql } from '@/clickhouse';
import { ClickhouseClient as HdxClickhouseClient } from '@/clickhouse/node';
import { Metadata, MetadataCache } from '@/core/metadata';
import {
ChartConfigWithOptDateRange,
DisplayType,
QuerySettings,
} from '@/types';
import { renderChartConfig } from '../core/renderChartConfig';
describe('sample-weighted aggregations (integration)', () => {
let client: ClickHouseClient;
let hdxClient: HdxClickhouseClient;
let metadata: Metadata;
const DB = 'default';
const MAIN_TABLE = 'test_sample_weighted_main';
const EDGE_TABLE = 'test_sample_weighted_edge';
const querySettings: QuerySettings = [
{ setting: 'optimize_read_in_order', value: '0' },
{ setting: 'cast_keep_nullable', value: '1' },
];
const baseConfig: ChartConfigWithOptDateRange = {
displayType: DisplayType.Table,
connection: 'test-connection',
from: { databaseName: DB, tableName: MAIN_TABLE },
select: [],
where: '',
whereLanguage: 'sql',
timestampValueExpression: 'Timestamp',
sampleWeightExpression: 'SampleRate',
dateRange: [new Date('2025-01-01'), new Date('2025-12-31')],
};
async function executeChartConfig(
config: ChartConfigWithOptDateRange,
): Promise<Record<string, string>> {
const generatedSql = await renderChartConfig(
config,
metadata,
querySettings,
);
const sql = parameterizedQueryToSql(generatedSql);
const result = await client.query({ query: sql, format: 'JSONEachRow' });
const rows = (await result.json()) as Record<string, string>[];
expect(rows.length).toBeGreaterThanOrEqual(1);
return rows[0]!;
}
async function executeChartConfigAllRows(
config: ChartConfigWithOptDateRange,
): Promise<Record<string, string>[]> {
const generatedSql = await renderChartConfig(
config,
metadata,
querySettings,
);
const sql = parameterizedQueryToSql(generatedSql);
const result = await client.query({ query: sql, format: 'JSONEachRow' });
return (await result.json()) as Record<string, string>[];
}
beforeAll(async () => {
const host = process.env.CLICKHOUSE_HOST || 'http://localhost:8123';
const username = process.env.CLICKHOUSE_USER || 'default';
const password = process.env.CLICKHOUSE_PASSWORD || '';
client = createClient({ url: host, username, password });
hdxClient = new HdxClickhouseClient({ host, username, password });
await client.command({
query: `
CREATE OR REPLACE TABLE ${DB}.${MAIN_TABLE} (
Timestamp DateTime64(9) CODEC(Delta(8), ZSTD(1)),
Duration Float64,
ServiceName LowCardinality(String),
SampleRate UInt64
)
ENGINE = MergeTree()
ORDER BY (Timestamp)
`,
});
await client.command({
query: `
INSERT INTO ${DB}.${MAIN_TABLE}
(Timestamp, Duration, ServiceName, SampleRate)
VALUES
('2025-06-01 00:00:01', 100, 'api', 1),
('2025-06-01 00:00:02', 200, 'api', 5),
('2025-06-01 00:00:03', 150, 'api', 10),
('2025-06-01 00:00:04', 250, 'api', 1),
('2025-06-01 00:00:05', 80, 'api', 1),
('2025-06-01 00:00:06', 120, 'api', 5),
('2025-06-01 00:00:07', 300, 'web', 1),
('2025-06-01 00:00:08', 50, 'web', 5),
('2025-06-01 00:00:09', 175, 'web', 10),
('2025-06-01 00:00:10', 400, 'web', 1)
`,
});
await client.command({
query: `
CREATE OR REPLACE TABLE ${DB}.${EDGE_TABLE} (
Timestamp DateTime64(9) CODEC(Delta(8), ZSTD(1)),
Duration Float64,
SampleRate UInt64,
ServiceName LowCardinality(String),
SpanAttributes Map(LowCardinality(String), String)
)
ENGINE = MergeTree()
ORDER BY (Timestamp)
`,
});
await client.command({
query: `
INSERT INTO ${DB}.${EDGE_TABLE}
(Timestamp, Duration, SampleRate, ServiceName, SpanAttributes)
VALUES
('2025-06-01 00:00:01', 100, 1, 'api', map('SampleRate', '1')),
('2025-06-01 00:00:02', 200, 1, 'api', map('SampleRate', '1')),
('2025-06-01 00:00:03', 300, 1, 'web', map('SampleRate', '1')),
('2025-06-01 00:00:04', 400, 1, 'web', map('SampleRate', 'abc')),
('2025-06-01 00:00:05', 50, 1000000, 'api', map('SampleRate', '1000000'))
`,
});
});
beforeEach(() => {
metadata = new Metadata(hdxClient, new MetadataCache());
});
afterAll(async () => {
await client.command({
query: `DROP TABLE IF EXISTS ${DB}.${EDGE_TABLE}`,
});
await client.command({
query: `DROP TABLE IF EXISTS ${DB}.${MAIN_TABLE}`,
});
await client.close();
});
it('weighted avg when no rows match aggCondition: returns NULL, not a division error', async () => {
const rows = await executeChartConfigAllRows({
...baseConfig,
select: [
{
aggFn: 'avg',
valueExpression: 'Duration',
aggCondition: "ServiceName = 'nonexistent'",
aggConditionLanguage: 'sql',
alias: 'weighted_avg',
},
],
});
expect(rows).toHaveLength(1);
const raw = rows[0]!['weighted_avg'];
expect(
raw === undefined ||
raw === null ||
raw === '' ||
String(raw).toLowerCase() === 'null',
).toBe(true);
});
it('weighted sum when no rows match aggCondition: should return 0', async () => {
const result = await executeChartConfig({
...baseConfig,
select: [
{
aggFn: 'sum',
valueExpression: 'Duration',
aggCondition: "ServiceName = 'nonexistent'",
aggConditionLanguage: 'sql',
alias: 'weighted_sum',
},
],
});
expect(Number(result['weighted_sum'])).toBe(0);
});
it('weighted count when no rows match aggCondition: should return 0', async () => {
const result = await executeChartConfig({
...baseConfig,
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: "ServiceName = 'nonexistent'",
aggConditionLanguage: 'sql',
alias: 'weighted_count',
},
],
});
expect(Number(result['weighted_count'])).toBe(0);
});
it('groupBy ServiceName: weighted count per group', async () => {
const rows = await executeChartConfigAllRows({
...baseConfig,
groupBy: 'ServiceName',
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: '',
alias: 'weighted_count',
},
],
});
const byService = Object.fromEntries(
rows.map(r => [r['ServiceName'] as string, Number(r['weighted_count'])]),
);
expect(byService['api']).toBe(23);
expect(byService['web']).toBe(17);
});
it('groupBy ServiceName: weighted avg(Duration) per group', async () => {
const rows = await executeChartConfigAllRows({
...baseConfig,
groupBy: 'ServiceName',
select: [
{
aggFn: 'avg',
valueExpression: 'Duration',
aggCondition: '',
alias: 'weighted_avg',
},
],
});
const byService = Object.fromEntries(
rows.map(r => [r['ServiceName'] as string, Number(r['weighted_avg'])]),
);
expect(byService['api']).toBeCloseTo(3530 / 23, 2);
expect(byService['web']).toBeCloseTo(2700 / 17, 2);
});
it('groupBy ServiceName: weighted sum(Duration) per group', async () => {
const rows = await executeChartConfigAllRows({
...baseConfig,
groupBy: 'ServiceName',
select: [
{
aggFn: 'sum',
valueExpression: 'Duration',
aggCondition: '',
alias: 'weighted_sum',
},
],
});
const byService = Object.fromEntries(
rows.map(r => [r['ServiceName'] as string, Number(r['weighted_sum'])]),
);
expect(byService['api']).toBe(3530);
expect(byService['web']).toBe(2700);
});
it('time-series with granularity: weighted count per time bucket', async () => {
const rows = await executeChartConfigAllRows({
...baseConfig,
displayType: DisplayType.Line,
granularity: '1 minute',
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: '',
alias: 'weighted_count',
},
],
});
expect(rows.length).toBeGreaterThanOrEqual(1);
const totalCount = rows.reduce(
(acc, r) => acc + Number(r['weighted_count']),
0,
);
expect(totalCount).toBe(40);
});
it('time-series with groupBy: weighted count per service per time bucket', async () => {
const rows = await executeChartConfigAllRows({
...baseConfig,
displayType: DisplayType.Line,
granularity: '1 minute',
groupBy: 'ServiceName',
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: '',
alias: 'weighted_count',
},
],
});
const byService = new Map<string, number>();
for (const r of rows) {
const svc = r['ServiceName'] as string;
byService.set(
svc,
(byService.get(svc) ?? 0) + Number(r['weighted_count']),
);
}
expect(byService.get('api')).toBe(23);
expect(byService.get('web')).toBe(17);
});
describe('additional edge cases', () => {
const edgeConfig: ChartConfigWithOptDateRange = {
displayType: DisplayType.Table,
connection: 'test-connection',
from: { databaseName: DB, tableName: EDGE_TABLE },
select: [],
where: '',
whereLanguage: 'sql',
timestampValueExpression: 'Timestamp',
sampleWeightExpression: 'SampleRate',
dateRange: [new Date('2025-01-01'), new Date('2025-12-31')],
};
it('all SampleRate=1: weighted results should equal unweighted results', async () => {
const filterConfig = {
...edgeConfig,
where: 'SampleRate = 1',
whereLanguage: 'sql' as const,
};
const weightedResult = await executeChartConfig({
...filterConfig,
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: '',
alias: 'wcount',
},
{
aggFn: 'avg',
valueExpression: 'Duration',
aggCondition: '',
alias: 'wavg',
},
{
aggFn: 'sum',
valueExpression: 'Duration',
aggCondition: '',
alias: 'wsum',
},
],
});
const unweightedResult = await executeChartConfig({
...filterConfig,
sampleWeightExpression: undefined,
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: '',
alias: 'count',
},
{
aggFn: 'avg',
valueExpression: 'Duration',
aggCondition: '',
alias: 'avg',
},
{
aggFn: 'sum',
valueExpression: 'Duration',
aggCondition: '',
alias: 'sum',
},
],
});
expect(Number(weightedResult['wcount'])).toBe(
Number(unweightedResult['count']),
);
expect(Number(weightedResult['wavg'])).toBeCloseTo(
Number(unweightedResult['avg']),
5,
);
expect(Number(weightedResult['wsum'])).toBeCloseTo(
Number(unweightedResult['sum']),
5,
);
});
it('non-numeric SampleRate in SpanAttributes: should clamp to weight 1', async () => {
const result = await executeChartConfig({
...edgeConfig,
sampleWeightExpression: "SpanAttributes['SampleRate']",
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: "ServiceName = 'web'",
aggConditionLanguage: 'sql',
alias: 'wcount',
},
],
});
expect(Number(result['wcount'])).toBe(2);
});
it('very large SampleRate: should handle without overflow', async () => {
const result = await executeChartConfig({
...edgeConfig,
select: [
{
aggFn: 'count',
valueExpression: '',
aggCondition: '',
alias: 'wcount',
},
{
aggFn: 'sum',
valueExpression: 'Duration',
aggCondition: '',
alias: 'wsum',
},
{
aggFn: 'avg',
valueExpression: 'Duration',
aggCondition: '',
alias: 'wavg',
},
],
});
expect(Number(result['wcount'])).toBe(1000004);
expect(Number(result['wsum'])).toBe(50001000);
expect(Number(result['wavg'])).toBeCloseTo(50001000 / 1000004, 2);
});
it('very large SampleRate: weighted avg dominated by high-weight row', async () => {
const result = await executeChartConfig({
...edgeConfig,
select: [
{
aggFn: 'avg',
valueExpression: 'Duration',
aggCondition: '',
alias: 'wavg',
},
],
});
const value = Number(result['wavg']);
expect(value).toBeGreaterThan(49);
expect(value).toBeLessThan(51);
});
});
});
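The grouped expectations above (23/17 for counts, 3530/2700 for sums) follow directly from the fixture rows: each row contributes `SampleRate` to the weighted count and `Duration * SampleRate` to the weighted sum, and the weighted average is their ratio. A standalone sketch of that arithmetic, using the same fixture values:

```typescript
// Sanity-check of the main-table fixture arithmetic behind the assertions
// above: weighted count = sum of rates, weighted sum = sum of
// duration * rate, weighted avg = weighted sum / weighted count.
type Row = { duration: number; service: string; rate: number };

const rows: Row[] = [
  { duration: 100, service: 'api', rate: 1 },
  { duration: 200, service: 'api', rate: 5 },
  { duration: 150, service: 'api', rate: 10 },
  { duration: 250, service: 'api', rate: 1 },
  { duration: 80, service: 'api', rate: 1 },
  { duration: 120, service: 'api', rate: 5 },
  { duration: 300, service: 'web', rate: 1 },
  { duration: 50, service: 'web', rate: 5 },
  { duration: 175, service: 'web', rate: 10 },
  { duration: 400, service: 'web', rate: 1 },
];

function weighted(service: string) {
  const matching = rows.filter(r => r.service === service);
  const count = matching.reduce((acc, r) => acc + r.rate, 0);
  const sum = matching.reduce((acc, r) => acc + r.duration * r.rate, 0);
  return { count, sum, avg: sum / count };
}

console.log(weighted('api')); // count 23, sum 3530, avg = 3530 / 23
console.log(weighted('web')); // count 17, sum 2700, avg = 2700 / 17
```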


@@ -325,11 +325,13 @@ const aggFnExpr = ({
expr,
level,
where,
sampleWeightExpression,
}: {
fn: AggregateFunction | AggregateFunctionWithCombinators;
expr?: string;
level?: number;
where?: string;
sampleWeightExpression?: string;
}) => {
const isAny = fn === 'any';
const isNone = fn === 'none';
@@ -371,6 +373,79 @@
})`;
}
// Sample-weighted aggregations: when sampleWeightExpression is set,
// each row carries a weight (defaults to 1 for unsampled spans).
// Corrected formulas account for upstream sampling (1-in-N).
// The greatest(..., 1) ensures unsampled rows (missing/empty/zero)
// are counted at weight 1 rather than dropped.
if (
sampleWeightExpression &&
!fn.endsWith('Merge') &&
!fn.endsWith('State')
) {
const sampleWeightExpr = `greatest(toUInt64OrZero(toString(${sampleWeightExpression})), 1)`;
const w = { UNSAFE_RAW_SQL: sampleWeightExpr };
if (fn === 'count') {
return isWhereUsed
? chSql`sumIf(${w}, ${{ UNSAFE_RAW_SQL: where }})`
: chSql`sum(${w})`;
}
if (fn === 'none') {
return chSql`${{ UNSAFE_RAW_SQL: expr ?? '' }}`;
}
if (expr != null) {
if (fn === 'count_distinct' || fn === 'min' || fn === 'max') {
// These cannot be corrected by weighting (the dropped rows' values
// are simply absent from the data); pass through unchanged
if (fn === 'count_distinct') {
return chSql`count${isWhereUsed ? 'If' : ''}(DISTINCT ${{
UNSAFE_RAW_SQL: expr,
}}${isWhereUsed ? chSql`, ${{ UNSAFE_RAW_SQL: where }}` : ''})`;
}
return chSql`${{ UNSAFE_RAW_SQL: fn }}${isWhereUsed ? 'If' : ''}(
${unsafeExpr}${isWhereUsed ? chSql`, ${{ UNSAFE_RAW_SQL: whereWithExtraNullCheck }}` : ''}
)`;
}
if (fn === 'avg') {
const weightedVal = {
UNSAFE_RAW_SQL: `${unsafeExpr.UNSAFE_RAW_SQL} * ${sampleWeightExpr}`,
};
const nullCheck = `${unsafeExpr.UNSAFE_RAW_SQL} IS NOT NULL`;
if (isWhereUsed) {
const cond = { UNSAFE_RAW_SQL: `${where} AND ${nullCheck}` };
return chSql`sumIf(${weightedVal}, ${cond}) / nullIf(sumIf(${w}, ${cond}), 0)`;
}
return chSql`sumIf(${weightedVal}, ${{ UNSAFE_RAW_SQL: nullCheck }}) / nullIf(sumIf(${w}, ${{ UNSAFE_RAW_SQL: nullCheck }}), 0)`;
}
if (fn === 'sum') {
const weightedVal = {
UNSAFE_RAW_SQL: `${unsafeExpr.UNSAFE_RAW_SQL} * ${sampleWeightExpr}`,
};
if (isWhereUsed) {
return chSql`sumIf(${weightedVal}, ${{ UNSAFE_RAW_SQL: whereWithExtraNullCheck }})`;
}
return chSql`sum(${weightedVal})`;
}
if (level != null && fn.startsWith('quantile')) {
const levelStr = Number.isFinite(level) ? `${level}` : '0';
const weightArg = {
UNSAFE_RAW_SQL: `toUInt32(${sampleWeightExpr})`,
};
if (isWhereUsed) {
return chSql`quantileTDigestWeightedIf(${{ UNSAFE_RAW_SQL: levelStr }})(${unsafeExpr}, ${weightArg}, ${{ UNSAFE_RAW_SQL: whereWithExtraNullCheck }})`;
}
return chSql`quantileTDigestWeighted(${{ UNSAFE_RAW_SQL: levelStr }})(${unsafeExpr}, ${weightArg})`;
}
// For any other fn (last_value, any, etc.), fall through to default
}
}
if (fn === 'count') {
if (isWhereUsed) {
return chSql`${fn}If(${{ UNSAFE_RAW_SQL: where }})`;
@@ -484,12 +559,14 @@ async function renderSelectList(
// @ts-expect-error (TS doesn't know that we've already checked for quantile)
level: select.level,
where: whereClause.sql,
sampleWeightExpression: chartConfig.sampleWeightExpression,
});
} else {
expr = aggFnExpr({
fn: select.aggFn,
expr: select.valueExpression,
where: whereClause.sql,
sampleWeightExpression: chartConfig.sampleWeightExpression,
});
}
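For reference, the SQL shapes the weighted branches of `aggFnExpr` emit for `sampleWeightExpression: 'SampleRate'` with no aggCondition can be tabulated as plain strings (illustrative only, mirroring the branches above and the unit-test expectations):

```typescript
// Illustrative mapping of aggFn -> generated SQL shape when
// sampleWeightExpression = 'SampleRate' and no aggCondition is set.
// W is the clamped per-row weight shared by all weighted branches.
const W = 'greatest(toUInt64OrZero(toString(SampleRate)), 1)';

const rewrites: Record<string, string> = {
  // count() becomes a sum of per-row weights
  count: `sum(${W})`,
  // sum(x) becomes a sum of weighted values
  sum: `sum(Duration * ${W})`,
  // avg(x) becomes weighted sum / weighted count with a NULL-safe denominator
  avg: `sumIf(Duration * ${W}, Duration IS NOT NULL) / nullIf(sumIf(${W}, Duration IS NOT NULL), 0)`,
  // quantile(p)(x) becomes a weighted t-digest quantile
  quantile: `quantileTDigestWeighted(0.95)(Duration, toUInt32(${W}))`,
  // min/max and count_distinct pass through unchanged
  min: 'min(Duration)',
};
```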


@@ -520,6 +520,7 @@ const SharedChartDisplaySettingsSchema = z.object({
export const _ChartConfigSchema = SharedChartDisplaySettingsSchema.extend({
timestampValueExpression: z.string(),
implicitColumnExpression: z.string().optional(),
sampleWeightExpression: z.string().optional(),
markdown: z.string().optional(),
filtersLogicalOperator: z.enum(['AND', 'OR']).optional(),
filters: z.array(FilterSchema).optional(),
@@ -929,6 +930,7 @@ export const TraceSourceSchema = BaseSourceSchema.extend({
spanKindExpression: z.string().min(1, 'Span Kind Expression is required'),
// Optional fields for traces
sampleRateExpression: z.string().optional(),
logSourceId: z.string().optional().nullable(),
sessionSourceId: z.string().optional(),
metricSourceId: z.string().optional(),
@@ -1018,6 +1020,28 @@ export function isMetricSource(source: TSource): source is TMetricSource {
return source.kind === SourceKind.Metric;
}
type SourceLikeForSampleWeight = {
kind: SourceKind;
sampleRateExpression?: string | null;
};
/** Returns a trace source's sampleRateExpression for use as a chart's sampleWeightExpression, or undefined if unset. */
export function getSampleWeightExpression(
source: SourceLikeForSampleWeight,
): string | undefined {
return source.kind === SourceKind.Trace && source.sampleRateExpression
? source.sampleRateExpression
: undefined;
}
/** For object spread: { ...pickSampleWeightExpressionProps(source) } */
export function pickSampleWeightExpressionProps(
source: SourceLikeForSampleWeight,
): { sampleWeightExpression: string } | undefined {
const w = getSampleWeightExpression(source);
return w ? { sampleWeightExpression: w } : undefined;
}
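The `undefined` return value is what makes the spread pattern in the doc comment safe: spreading `undefined` into an object literal is a no-op, so non-trace sources simply add no key. A self-contained sketch (the helper body is copied from above; the `SourceKind` enum values here are stand-ins, not the real ones):

```typescript
// Sketch of the intended spread usage. SourceKind is stubbed with
// hypothetical string values for self-containment.
enum SourceKind {
  Trace = 'trace',
  Log = 'log',
}

type SourceLikeForSampleWeight = {
  kind: SourceKind;
  sampleRateExpression?: string | null;
};

function pickSampleWeightExpressionProps(
  source: SourceLikeForSampleWeight,
): { sampleWeightExpression: string } | undefined {
  const w =
    source.kind === SourceKind.Trace && source.sampleRateExpression
      ? source.sampleRateExpression
      : undefined;
  return w ? { sampleWeightExpression: w } : undefined;
}

const traceSource = { kind: SourceKind.Trace, sampleRateExpression: 'SampleRate' };
const logSource = { kind: SourceKind.Log };

// Trace source: the key is added to the config.
const traceChartConfig = {
  timestampValueExpression: 'Timestamp',
  ...pickSampleWeightExpressionProps(traceSource),
};
// Log source: spreading undefined adds nothing.
const logChartConfig = {
  timestampValueExpression: 'Timestamp',
  ...pickSampleWeightExpressionProps(logSource),
};
```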
export const AssistantLineTableConfigSchema = z.object({
displayType: z.enum([DisplayType.Line, DisplayType.Table]),
markdown: z.string().optional(),