feat: implement histogram linear interpolation quantile function (#253)

This commit is contained in:
Warren 2024-01-23 23:15:59 -08:00 committed by GitHub
parent 52d9b7160b
commit 82640b0af2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 1360 additions and 339 deletions

View file

@ -0,0 +1,6 @@
---
'@hyperdx/api': patch
'@hyperdx/app': patch
---
feat: implement histogram linear interpolation quantile function

View file

@ -871,325 +871,6 @@ Array [
`);
});
// Verifies multi-series metric chart queries against seeded ClickHouse metric
// streams: multi-key group-bys, gauge last-value, combined sum-rate + gauge-avg
// series, and post-group-by filtering. Expected values are pinned via inline
// snapshots, so the seeded points below are the contract for each assertion.
describe('fetches multi series metric table chart correctly', () => {
  // Fixed "now" so bucket boundaries are deterministic across runs.
  const now = new Date('2022-01-05').getTime();
  const runId = Math.random().toString(); // dedup watch mode runs
  const teamId = `test`;
  beforeEach(async () => {
    // Seed two monotonic delta Sum series (test.users) and two Gauge series
    // (test.cpu), one per host, so group-by and rate math can be asserted.
    // Rate: 8, 1, 8, 25
    await clickhouse.bulkInsertTeamMetricStream(
      buildMetricSeries({
        name: 'test.users',
        tags: { host: 'test1', runId, ip: '127.0.0.1' },
        data_type: clickhouse.MetricsDataType.Sum,
        is_monotonic: true,
        is_delta: true,
        unit: 'Users',
        points: [
          { value: 0, timestamp: now - ms('1m') }, // 0
          { value: 1, timestamp: now },
          { value: 8, timestamp: now + ms('4m') }, // 8
          { value: 8, timestamp: now + ms('6m') },
          { value: 9, timestamp: now + ms('9m') }, // 9
          { value: 15, timestamp: now + ms('11m') },
          { value: 17, timestamp: now + ms('14m') }, // 17
          { value: 32, timestamp: now + ms('16m') },
          { value: 42, timestamp: now + ms('19m') }, // 42
        ],
      }),
    );
    // Rate: 11, 78, 5805, 78729
    // Sum: 12, 79, 5813, 78754
    await clickhouse.bulkInsertTeamMetricStream(
      buildMetricSeries({
        name: 'test.users',
        tags: { host: 'test2', runId, ip: '127.0.0.2' },
        data_type: clickhouse.MetricsDataType.Sum,
        is_monotonic: true,
        is_delta: true,
        unit: 'Users',
        points: [
          { value: 3, timestamp: now - ms('1m') }, // 3
          { value: 3, timestamp: now },
          { value: 14, timestamp: now + ms('4m') }, // 14
          { value: 15, timestamp: now + ms('6m') },
          { value: 92, timestamp: now + ms('9m') }, // 92
          { value: 653, timestamp: now + ms('11m') },
          { value: 5897, timestamp: now + ms('14m') }, // 5897
          { value: 9323, timestamp: now + ms('16m') },
          { value: 84626, timestamp: now + ms('19m') }, // 84626
        ],
      }),
    );
    // Gauge series for host test1; the trailing point of each 5-minute
    // window (marked "Last 5min") is what LastValue should surface.
    await clickhouse.bulkInsertTeamMetricStream(
      buildMetricSeries({
        name: 'test.cpu',
        tags: { host: 'test1', runId, ip: '127.0.0.1' },
        data_type: clickhouse.MetricsDataType.Gauge,
        is_monotonic: false,
        is_delta: false,
        unit: 'Percent',
        points: [
          { value: 50, timestamp: now },
          { value: 25, timestamp: now + ms('1m') },
          { value: 12.5, timestamp: now + ms('2m') },
          { value: 6.25, timestamp: now + ms('3m') }, // Last 5min
          { value: 100, timestamp: now + ms('6m') },
          { value: 75, timestamp: now + ms('7m') },
          { value: 10, timestamp: now + ms('8m') },
          { value: 80, timestamp: now + ms('9m') }, // Last 5min
        ],
      }),
    );
    // Gauge series for host test2 (small ascending/descending values so
    // it is easy to distinguish from test1 in grouped results).
    await clickhouse.bulkInsertTeamMetricStream(
      buildMetricSeries({
        name: 'test.cpu',
        tags: { host: 'test2', runId, ip: '127.0.0.2' },
        data_type: clickhouse.MetricsDataType.Gauge,
        is_monotonic: false,
        is_delta: false,
        unit: 'Percent',
        points: [
          { value: 1, timestamp: now },
          { value: 2, timestamp: now + ms('1m') },
          { value: 3, timestamp: now + ms('2m') },
          { value: 4, timestamp: now + ms('3m') }, // Last 5min
          { value: 5, timestamp: now + ms('6m') },
          { value: 6, timestamp: now + ms('7m') },
          { value: 5, timestamp: now + ms('8m') },
          { value: 4, timestamp: now + ms('9m') }, // Last 5min
        ],
      }),
    );
    // Stub the property-type mapping lookup so the query builder treats
    // both grouping keys as string-typed attributes.
    mockSpyMetricPropertyTypeMappingsModel({
      runId: 'string',
      host: 'string',
    });
  });
  it('returns multiple group by labels correctly', async () => {
    // Group by two keys (host, ip) — each result row should carry both
    // labels, in order, in its `group` array.
    const data = (
      await clickhouse.getMultiSeriesChart({
        series: [
          {
            type: 'time',
            table: 'metrics',
            aggFn: clickhouse.AggFn.LastValue,
            field: 'test.cpu',
            where: `runId:${runId}`,
            groupBy: ['host', 'ip'],
            metricDataType: clickhouse.MetricsDataType.Gauge,
          },
        ],
        tableVersion: undefined,
        teamId,
        startTime: now,
        endTime: now + ms('20m'),
        granularity: undefined,
        maxNumGroups: 20,
        seriesReturnType: clickhouse.SeriesReturnType.Column,
      })
    ).data.map(d => {
      return _.pick(d, ['group', 'series_0.data', 'ts_bucket']);
    });
    expect(data).toMatchInlineSnapshot(`
Array [
  Object {
    "group": Array [
      "test1",
      "127.0.0.1",
    ],
    "series_0.data": 80,
    "ts_bucket": "0",
  },
  Object {
    "group": Array [
      "test2",
      "127.0.0.2",
    ],
    "series_0.data": 4,
    "ts_bucket": "0",
  },
]
`);
  });
  it('gauge (last value)', async () => {
    // LastValue over the whole window should pick the final seeded point
    // per host (80 for test1, 4 for test2).
    const data = (
      await clickhouse.getMultiSeriesChart({
        series: [
          {
            type: 'time',
            table: 'metrics',
            aggFn: clickhouse.AggFn.LastValue,
            field: 'test.cpu',
            where: `runId:${runId}`,
            groupBy: ['host'],
            metricDataType: clickhouse.MetricsDataType.Gauge,
          },
        ],
        tableVersion: undefined,
        teamId,
        startTime: now,
        endTime: now + ms('20m'),
        granularity: undefined,
        maxNumGroups: 20,
        seriesReturnType: clickhouse.SeriesReturnType.Column,
      })
    ).data.map(d => {
      return _.pick(d, ['group', 'series_0.data', 'ts_bucket']);
    });
    expect(data).toMatchInlineSnapshot(`
Array [
  Object {
    "group": Array [
      "test1",
    ],
    "series_0.data": 80,
    "ts_bucket": "0",
  },
  Object {
    "group": Array [
      "test2",
    ],
    "series_0.data": 4,
    "ts_bucket": "0",
  },
]
`);
  });
  it('sum (rate) + gauge (avg)', async () => {
    // Two series in one query: series_0 is the total counter increase
    // across both hosts (SumRate), series_1 is the average gauge value.
    const data = (
      await clickhouse.getMultiSeriesChart({
        series: [
          {
            type: 'table',
            table: 'metrics',
            aggFn: clickhouse.AggFn.SumRate,
            field: 'test.users',
            where: `runId:${runId}`,
            groupBy: [],
            metricDataType: clickhouse.MetricsDataType.Sum,
          },
          {
            type: 'time',
            table: 'metrics',
            aggFn: clickhouse.AggFn.Avg,
            field: 'test.cpu',
            where: `runId:${runId}`,
            groupBy: [],
            metricDataType: clickhouse.MetricsDataType.Gauge,
          },
        ],
        tableVersion: undefined,
        teamId,
        startTime: now,
        endTime: now + ms('20m'),
        granularity: undefined,
        maxNumGroups: 20,
        seriesReturnType: clickhouse.SeriesReturnType.Column,
      })
    ).data.map(d => {
      return _.pick(d, [
        'group',
        'series_0.data',
        'series_1.data',
        'ts_bucket',
      ]);
    });
    expect(data).toMatchInlineSnapshot(`
Array [
  Object {
    "group": Array [],
    "series_0.data": 84665,
    "series_1.data": 42,
    "ts_bucket": "0",
  },
]
`);
  });
  it('filters using postGroupWhere properly', async () => {
    // Shared config; `postGroupWhere: 'series_0:4'` should keep only the
    // group whose series_0 value equals 4 (host test2).
    const queryConfig: Parameters<typeof clickhouse.getMultiSeriesChart>[0] =
      {
        series: [
          {
            type: 'time',
            table: 'metrics',
            aggFn: clickhouse.AggFn.LastValue,
            field: 'test.cpu',
            where: `runId:${runId}`,
            groupBy: ['host'],
            metricDataType: clickhouse.MetricsDataType.Gauge,
          },
        ],
        tableVersion: undefined,
        teamId,
        startTime: now,
        endTime: now + ms('20m'),
        granularity: undefined,
        maxNumGroups: 20,
        seriesReturnType: clickhouse.SeriesReturnType.Column,
        postGroupWhere: 'series_0:4',
      };
    // Exclude postGroupWhere to assert we get the test data we expect at first
    const data = (
      await clickhouse.getMultiSeriesChart(
        _.omit(queryConfig, ['postGroupWhere']),
      )
    ).data.map(d => {
      return _.pick(d, ['group', 'series_0.data', 'ts_bucket']);
    });
    expect(data).toMatchInlineSnapshot(`
Array [
  Object {
    "group": Array [
      "test1",
    ],
    "series_0.data": 80,
    "ts_bucket": "0",
  },
  Object {
    "group": Array [
      "test2",
    ],
    "series_0.data": 4,
    "ts_bucket": "0",
  },
]
`);
    // Same query with the filter applied — only the test2 group survives.
    const filteredData = (
      await clickhouse.getMultiSeriesChart(queryConfig)
    ).data.map(d => {
      return _.pick(d, ['group', 'series_0.data', 'ts_bucket']);
    });
    expect(filteredData).toMatchInlineSnapshot(`
Array [
  Object {
    "group": Array [
      "test2",
    ],
    "series_0.data": 4,
    "ts_bucket": "0",
  },
]
`);
  });
});
it('limits groups and sorts multi series charts properly', async () => {
const now = new Date('2022-01-05').getTime();
const runId = Math.random().toString(); // dedup watch mode runs

File diff suppressed because it is too large Load diff

View file

@ -898,7 +898,8 @@ export const buildMetricSeriesQuery = async ({
const hasGroupBy = groupByColumnNames.length > 0;
const shouldModifyStartTime = isRate;
const shouldModifyStartTime =
isRate || dataType === MetricsDataType.Histogram;
// If it's a rate function, then we'll need to look 1 window back to calculate
// the initial rate value.
@ -956,8 +957,12 @@ export const buildMetricSeriesQuery = async ({
: '0.99'
})(${isRate ? 'rate' : 'value'}) as data`,
);
} else if (dataType === MetricsDataType.Histogram) {
if (!['p50', 'p90', 'p95', 'p99'].includes(aggFn)) {
throw new Error(`Unsupported aggFn for Histogram: ${aggFn}`);
}
} else {
logger.error(`Unsupported data type: ${dataType}`);
throw new Error(`Unsupported data type: ${dataType}`);
}
const startTimeUnixTs = Math.floor(startTime / 1000);
@ -1019,8 +1024,125 @@ export const buildMetricSeriesQuery = async ({
],
);
const query = SqlString.format(
const quantile =
aggFn === AggFn.P50
? '0.5'
: aggFn === AggFn.P90
? '0.90'
: aggFn === AggFn.P95
? '0.95'
: '0.99';
// TODO:
// 1. handle -Inf
// 2. handle counter reset (https://prometheus.io/docs/prometheus/latest/querying/functions/#resets)
// - Any increase/decrease in bucket resolution, etc (NOT IMPLEMENTED)
const histogramQuery = SqlString.format(
`
WITH source AS (
SELECT
?,
timestamp,
name,
toFloat64(sumIf(rate, rate > 0)) AS value,
toFloat64OrDefault(_string_attributes['le'], inf) AS bucket
FROM (?)
WHERE mapContains(_string_attributes, 'le')
GROUP BY name, group, bucket, timestamp
), points AS (
SELECT
timestamp,
name,
group,
arraySort((x) -> x[2],
groupArray([
value,
bucket
])
) AS point,
length(point) AS n
FROM source
GROUP BY timestamp, name, group
), metrics AS (
SELECT
timestamp,
name,
group,
point[n][1] AS total,
toFloat64(?) * total AS rank,
arrayFirstIndex(x -> x[1] > rank, point) AS upper_idx,
point[upper_idx][1] AS upper_count,
point[upper_idx][2] AS upper_bound,
if (
upper_idx = 1,
if (
point[upper_idx][2] > 0,
0,
inf
),
point[upper_idx - 1][2]
) AS lower_bound,
if (
lower_bound = 0,
0,
point[upper_idx - 1][1]
) AS lower_count,
if (
upper_bound = inf,
point[upper_idx - 1][2],
if (
lower_bound = inf,
point[1][2],
lower_bound + (upper_bound - lower_bound) * (
(rank - lower_count) / (upper_count - lower_count)
)
)
) AS value
FROM points
WHERE length(point) > 1
AND total > 0
)
SELECT
toUnixTimestamp(timestamp) AS ts_bucket,
group,
value AS data
FROM metrics
ORDER BY ts_bucket ASC
${
granularity != null
? `WITH FILL
FROM toUnixTimestamp(toStartOfInterval(toDateTime(?), INTERVAL ?))
TO toUnixTimestamp(toStartOfInterval(toDateTime(?), INTERVAL ?))
STEP ?`
: ''
}`.trim(),
[
SqlString.raw(
hasGroupBy
? `[${groupByColumnNames.join(',')}] as group`
: `[] as group`,
),
SqlString.raw(rateMetricSource),
quantile,
...(granularity != null
? [
startTime / 1000,
granularity,
endTime / 1000,
granularity,
ms(granularity) / 1000,
]
: []),
],
);
// TODO: merge this with the histogram query
const source = isRate ? rateMetricSource : gaugeMetricSource;
const query =
dataType === MetricsDataType.Histogram
? histogramQuery
: SqlString.format(
`
WITH metrics AS (?)
SELECT ?
FROM metrics
@ -1035,20 +1157,20 @@ export const buildMetricSeriesQuery = async ({
: ''
}
`,
[
SqlString.raw(isRate ? rateMetricSource : gaugeMetricSource),
SqlString.raw(selectClause.join(',')),
...(granularity != null
? [
startTime / 1000,
granularity,
endTime / 1000,
granularity,
ms(granularity) / 1000,
]
: []),
],
);
[
SqlString.raw(source),
SqlString.raw(selectClause.join(',')),
...(granularity != null
? [
startTime / 1000,
granularity,
endTime / 1000,
granularity,
ms(granularity) / 1000,
]
: []),
],
);
return {
query,

View file

@ -250,15 +250,15 @@ export function buildMetricSeries({
}: {
tags: Record<string, string>;
name: string;
points: { value: number; timestamp: number }[];
points: { value: number; timestamp: number; le?: string }[];
data_type: clickhouse.MetricsDataType;
is_monotonic: boolean;
is_delta: boolean;
unit: string;
}): MetricModel[] {
// @ts-ignore TODO: Fix Timestamp types
return points.map(({ value, timestamp }) => ({
_string_attributes: tags,
return points.map(({ value, timestamp, le }) => ({
_string_attributes: { ...tags, ...(le && { le }) },
name,
value,
timestamp: `${timestamp}000000`,