mirror of
https://github.com/hyperdxio/hyperdx
synced 2026-04-21 13:37:15 +00:00
feat: implement histogram linear interpolation quantile function (#253)
This commit is contained in:
parent
52d9b7160b
commit
82640b0af2
5 changed files with 1360 additions and 339 deletions
6
.changeset/bright-dancers-float.md
Normal file
6
.changeset/bright-dancers-float.md
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
---
|
||||
'@hyperdx/api': patch
|
||||
'@hyperdx/app': patch
|
||||
---
|
||||
|
||||
feat: implement histogram linear interpolation quantile function
|
||||
|
|
@ -871,325 +871,6 @@ Array [
|
|||
`);
|
||||
});
|
||||
|
||||
describe('fetches multi series metric table chart correctly', () => {
|
||||
const now = new Date('2022-01-05').getTime();
|
||||
const runId = Math.random().toString(); // dedup watch mode runs
|
||||
const teamId = `test`;
|
||||
|
||||
beforeEach(async () => {
|
||||
// Rate: 8, 1, 8, 25
|
||||
await clickhouse.bulkInsertTeamMetricStream(
|
||||
buildMetricSeries({
|
||||
name: 'test.users',
|
||||
tags: { host: 'test1', runId, ip: '127.0.0.1' },
|
||||
data_type: clickhouse.MetricsDataType.Sum,
|
||||
is_monotonic: true,
|
||||
is_delta: true,
|
||||
unit: 'Users',
|
||||
points: [
|
||||
{ value: 0, timestamp: now - ms('1m') }, // 0
|
||||
{ value: 1, timestamp: now },
|
||||
{ value: 8, timestamp: now + ms('4m') }, // 8
|
||||
{ value: 8, timestamp: now + ms('6m') },
|
||||
{ value: 9, timestamp: now + ms('9m') }, // 9
|
||||
{ value: 15, timestamp: now + ms('11m') },
|
||||
{ value: 17, timestamp: now + ms('14m') }, // 17
|
||||
{ value: 32, timestamp: now + ms('16m') },
|
||||
{ value: 42, timestamp: now + ms('19m') }, // 42
|
||||
],
|
||||
}),
|
||||
);
|
||||
|
||||
// Rate: 11, 78, 5805, 78729
|
||||
// Sum: 12, 79, 5813, 78754
|
||||
await clickhouse.bulkInsertTeamMetricStream(
|
||||
buildMetricSeries({
|
||||
name: 'test.users',
|
||||
tags: { host: 'test2', runId, ip: '127.0.0.2' },
|
||||
data_type: clickhouse.MetricsDataType.Sum,
|
||||
is_monotonic: true,
|
||||
is_delta: true,
|
||||
unit: 'Users',
|
||||
points: [
|
||||
{ value: 3, timestamp: now - ms('1m') }, // 3
|
||||
{ value: 3, timestamp: now },
|
||||
{ value: 14, timestamp: now + ms('4m') }, // 14
|
||||
{ value: 15, timestamp: now + ms('6m') },
|
||||
{ value: 92, timestamp: now + ms('9m') }, // 92
|
||||
{ value: 653, timestamp: now + ms('11m') },
|
||||
{ value: 5897, timestamp: now + ms('14m') }, // 5897
|
||||
{ value: 9323, timestamp: now + ms('16m') },
|
||||
{ value: 84626, timestamp: now + ms('19m') }, // 84626
|
||||
],
|
||||
}),
|
||||
);
|
||||
|
||||
await clickhouse.bulkInsertTeamMetricStream(
|
||||
buildMetricSeries({
|
||||
name: 'test.cpu',
|
||||
tags: { host: 'test1', runId, ip: '127.0.0.1' },
|
||||
data_type: clickhouse.MetricsDataType.Gauge,
|
||||
is_monotonic: false,
|
||||
is_delta: false,
|
||||
unit: 'Percent',
|
||||
points: [
|
||||
{ value: 50, timestamp: now },
|
||||
{ value: 25, timestamp: now + ms('1m') },
|
||||
{ value: 12.5, timestamp: now + ms('2m') },
|
||||
{ value: 6.25, timestamp: now + ms('3m') }, // Last 5min
|
||||
{ value: 100, timestamp: now + ms('6m') },
|
||||
{ value: 75, timestamp: now + ms('7m') },
|
||||
{ value: 10, timestamp: now + ms('8m') },
|
||||
{ value: 80, timestamp: now + ms('9m') }, // Last 5min
|
||||
],
|
||||
}),
|
||||
);
|
||||
|
||||
await clickhouse.bulkInsertTeamMetricStream(
|
||||
buildMetricSeries({
|
||||
name: 'test.cpu',
|
||||
tags: { host: 'test2', runId, ip: '127.0.0.2' },
|
||||
data_type: clickhouse.MetricsDataType.Gauge,
|
||||
is_monotonic: false,
|
||||
is_delta: false,
|
||||
unit: 'Percent',
|
||||
points: [
|
||||
{ value: 1, timestamp: now },
|
||||
{ value: 2, timestamp: now + ms('1m') },
|
||||
{ value: 3, timestamp: now + ms('2m') },
|
||||
{ value: 4, timestamp: now + ms('3m') }, // Last 5min
|
||||
{ value: 5, timestamp: now + ms('6m') },
|
||||
{ value: 6, timestamp: now + ms('7m') },
|
||||
{ value: 5, timestamp: now + ms('8m') },
|
||||
{ value: 4, timestamp: now + ms('9m') }, // Last 5min
|
||||
],
|
||||
}),
|
||||
);
|
||||
|
||||
mockSpyMetricPropertyTypeMappingsModel({
|
||||
runId: 'string',
|
||||
host: 'string',
|
||||
});
|
||||
});
|
||||
|
||||
it('returns multiple group by labels correctly', async () => {
|
||||
const data = (
|
||||
await clickhouse.getMultiSeriesChart({
|
||||
series: [
|
||||
{
|
||||
type: 'time',
|
||||
table: 'metrics',
|
||||
aggFn: clickhouse.AggFn.LastValue,
|
||||
field: 'test.cpu',
|
||||
where: `runId:${runId}`,
|
||||
groupBy: ['host', 'ip'],
|
||||
metricDataType: clickhouse.MetricsDataType.Gauge,
|
||||
},
|
||||
],
|
||||
tableVersion: undefined,
|
||||
teamId,
|
||||
startTime: now,
|
||||
endTime: now + ms('20m'),
|
||||
granularity: undefined,
|
||||
maxNumGroups: 20,
|
||||
seriesReturnType: clickhouse.SeriesReturnType.Column,
|
||||
})
|
||||
).data.map(d => {
|
||||
return _.pick(d, ['group', 'series_0.data', 'ts_bucket']);
|
||||
});
|
||||
|
||||
expect(data).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"group": Array [
|
||||
"test1",
|
||||
"127.0.0.1",
|
||||
],
|
||||
"series_0.data": 80,
|
||||
"ts_bucket": "0",
|
||||
},
|
||||
Object {
|
||||
"group": Array [
|
||||
"test2",
|
||||
"127.0.0.2",
|
||||
],
|
||||
"series_0.data": 4,
|
||||
"ts_bucket": "0",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('gauge (last value)', async () => {
|
||||
const data = (
|
||||
await clickhouse.getMultiSeriesChart({
|
||||
series: [
|
||||
{
|
||||
type: 'time',
|
||||
table: 'metrics',
|
||||
aggFn: clickhouse.AggFn.LastValue,
|
||||
field: 'test.cpu',
|
||||
where: `runId:${runId}`,
|
||||
groupBy: ['host'],
|
||||
metricDataType: clickhouse.MetricsDataType.Gauge,
|
||||
},
|
||||
],
|
||||
tableVersion: undefined,
|
||||
teamId,
|
||||
startTime: now,
|
||||
endTime: now + ms('20m'),
|
||||
granularity: undefined,
|
||||
maxNumGroups: 20,
|
||||
seriesReturnType: clickhouse.SeriesReturnType.Column,
|
||||
})
|
||||
).data.map(d => {
|
||||
return _.pick(d, ['group', 'series_0.data', 'ts_bucket']);
|
||||
});
|
||||
|
||||
expect(data).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"group": Array [
|
||||
"test1",
|
||||
],
|
||||
"series_0.data": 80,
|
||||
"ts_bucket": "0",
|
||||
},
|
||||
Object {
|
||||
"group": Array [
|
||||
"test2",
|
||||
],
|
||||
"series_0.data": 4,
|
||||
"ts_bucket": "0",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('sum (rate) + gauge (avg)', async () => {
|
||||
const data = (
|
||||
await clickhouse.getMultiSeriesChart({
|
||||
series: [
|
||||
{
|
||||
type: 'table',
|
||||
table: 'metrics',
|
||||
aggFn: clickhouse.AggFn.SumRate,
|
||||
field: 'test.users',
|
||||
where: `runId:${runId}`,
|
||||
groupBy: [],
|
||||
metricDataType: clickhouse.MetricsDataType.Sum,
|
||||
},
|
||||
{
|
||||
type: 'time',
|
||||
table: 'metrics',
|
||||
aggFn: clickhouse.AggFn.Avg,
|
||||
field: 'test.cpu',
|
||||
where: `runId:${runId}`,
|
||||
groupBy: [],
|
||||
metricDataType: clickhouse.MetricsDataType.Gauge,
|
||||
},
|
||||
],
|
||||
tableVersion: undefined,
|
||||
teamId,
|
||||
startTime: now,
|
||||
endTime: now + ms('20m'),
|
||||
granularity: undefined,
|
||||
maxNumGroups: 20,
|
||||
seriesReturnType: clickhouse.SeriesReturnType.Column,
|
||||
})
|
||||
).data.map(d => {
|
||||
return _.pick(d, [
|
||||
'group',
|
||||
'series_0.data',
|
||||
'series_1.data',
|
||||
'ts_bucket',
|
||||
]);
|
||||
});
|
||||
|
||||
expect(data).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"group": Array [],
|
||||
"series_0.data": 84665,
|
||||
"series_1.data": 42,
|
||||
"ts_bucket": "0",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it('filters using postGroupWhere properly', async () => {
|
||||
const queryConfig: Parameters<typeof clickhouse.getMultiSeriesChart>[0] =
|
||||
{
|
||||
series: [
|
||||
{
|
||||
type: 'time',
|
||||
table: 'metrics',
|
||||
aggFn: clickhouse.AggFn.LastValue,
|
||||
field: 'test.cpu',
|
||||
where: `runId:${runId}`,
|
||||
groupBy: ['host'],
|
||||
metricDataType: clickhouse.MetricsDataType.Gauge,
|
||||
},
|
||||
],
|
||||
tableVersion: undefined,
|
||||
teamId,
|
||||
startTime: now,
|
||||
endTime: now + ms('20m'),
|
||||
granularity: undefined,
|
||||
maxNumGroups: 20,
|
||||
seriesReturnType: clickhouse.SeriesReturnType.Column,
|
||||
postGroupWhere: 'series_0:4',
|
||||
};
|
||||
|
||||
// Exclude postGroupWhere to assert we get the test data we expect at first
|
||||
const data = (
|
||||
await clickhouse.getMultiSeriesChart(
|
||||
_.omit(queryConfig, ['postGroupWhere']),
|
||||
)
|
||||
).data.map(d => {
|
||||
return _.pick(d, ['group', 'series_0.data', 'ts_bucket']);
|
||||
});
|
||||
|
||||
expect(data).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"group": Array [
|
||||
"test1",
|
||||
],
|
||||
"series_0.data": 80,
|
||||
"ts_bucket": "0",
|
||||
},
|
||||
Object {
|
||||
"group": Array [
|
||||
"test2",
|
||||
],
|
||||
"series_0.data": 4,
|
||||
"ts_bucket": "0",
|
||||
},
|
||||
]
|
||||
`);
|
||||
|
||||
const filteredData = (
|
||||
await clickhouse.getMultiSeriesChart(queryConfig)
|
||||
).data.map(d => {
|
||||
return _.pick(d, ['group', 'series_0.data', 'ts_bucket']);
|
||||
});
|
||||
|
||||
expect(filteredData).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"group": Array [
|
||||
"test2",
|
||||
],
|
||||
"series_0.data": 4,
|
||||
"ts_bucket": "0",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
});
|
||||
|
||||
it('limits groups and sorts multi series charts properly', async () => {
|
||||
const now = new Date('2022-01-05').getTime();
|
||||
const runId = Math.random().toString(); // dedup watch mode runs
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -898,7 +898,8 @@ export const buildMetricSeriesQuery = async ({
|
|||
|
||||
const hasGroupBy = groupByColumnNames.length > 0;
|
||||
|
||||
const shouldModifyStartTime = isRate;
|
||||
const shouldModifyStartTime =
|
||||
isRate || dataType === MetricsDataType.Histogram;
|
||||
|
||||
// If it's a rate function or a histogram (whose quantiles are computed from
|
||||
// per-bucket rates), then we'll need to look 1 window back to calculate the initial rate value.
|
||||
|
|
@ -956,8 +957,12 @@ export const buildMetricSeriesQuery = async ({
|
|||
: '0.99'
|
||||
})(${isRate ? 'rate' : 'value'}) as data`,
|
||||
);
|
||||
} else if (dataType === MetricsDataType.Histogram) {
|
||||
if (!['p50', 'p90', 'p95', 'p99'].includes(aggFn)) {
|
||||
throw new Error(`Unsupported aggFn for Histogram: ${aggFn}`);
|
||||
}
|
||||
} else {
|
||||
logger.error(`Unsupported data type: ${dataType}`);
|
||||
throw new Error(`Unsupported data type: ${dataType}`);
|
||||
}
|
||||
|
||||
const startTimeUnixTs = Math.floor(startTime / 1000);
|
||||
|
|
@ -1019,8 +1024,125 @@ export const buildMetricSeriesQuery = async ({
|
|||
],
|
||||
);
|
||||
|
||||
const query = SqlString.format(
|
||||
const quantile =
|
||||
aggFn === AggFn.P50
|
||||
? '0.5'
|
||||
: aggFn === AggFn.P90
|
||||
? '0.90'
|
||||
: aggFn === AggFn.P95
|
||||
? '0.95'
|
||||
: '0.99';
|
||||
|
||||
// TODO:
|
||||
// 1. handle -Inf
|
||||
// 2. handle counter reset (https://prometheus.io/docs/prometheus/latest/querying/functions/#resets)
|
||||
// - Any increase/decrease in bucket resolution, etc (NOT IMPLEMENTED)
|
||||
const histogramQuery = SqlString.format(
|
||||
`
|
||||
WITH source AS (
|
||||
SELECT
|
||||
?,
|
||||
timestamp,
|
||||
name,
|
||||
toFloat64(sumIf(rate, rate > 0)) AS value,
|
||||
toFloat64OrDefault(_string_attributes['le'], inf) AS bucket
|
||||
FROM (?)
|
||||
WHERE mapContains(_string_attributes, 'le')
|
||||
GROUP BY name, group, bucket, timestamp
|
||||
), points AS (
|
||||
SELECT
|
||||
timestamp,
|
||||
name,
|
||||
group,
|
||||
arraySort((x) -> x[2],
|
||||
groupArray([
|
||||
value,
|
||||
bucket
|
||||
])
|
||||
) AS point,
|
||||
length(point) AS n
|
||||
FROM source
|
||||
GROUP BY timestamp, name, group
|
||||
), metrics AS (
|
||||
SELECT
|
||||
timestamp,
|
||||
name,
|
||||
group,
|
||||
point[n][1] AS total,
|
||||
toFloat64(?) * total AS rank,
|
||||
arrayFirstIndex(x -> x[1] > rank, point) AS upper_idx,
|
||||
point[upper_idx][1] AS upper_count,
|
||||
point[upper_idx][2] AS upper_bound,
|
||||
if (
|
||||
upper_idx = 1,
|
||||
if (
|
||||
point[upper_idx][2] > 0,
|
||||
0,
|
||||
inf
|
||||
),
|
||||
point[upper_idx - 1][2]
|
||||
) AS lower_bound,
|
||||
if (
|
||||
lower_bound = 0,
|
||||
0,
|
||||
point[upper_idx - 1][1]
|
||||
) AS lower_count,
|
||||
if (
|
||||
upper_bound = inf,
|
||||
point[upper_idx - 1][2],
|
||||
if (
|
||||
lower_bound = inf,
|
||||
point[1][2],
|
||||
lower_bound + (upper_bound - lower_bound) * (
|
||||
(rank - lower_count) / (upper_count - lower_count)
|
||||
)
|
||||
)
|
||||
) AS value
|
||||
FROM points
|
||||
WHERE length(point) > 1
|
||||
AND total > 0
|
||||
)
|
||||
SELECT
|
||||
toUnixTimestamp(timestamp) AS ts_bucket,
|
||||
group,
|
||||
value AS data
|
||||
FROM metrics
|
||||
ORDER BY ts_bucket ASC
|
||||
${
|
||||
granularity != null
|
||||
? `WITH FILL
|
||||
FROM toUnixTimestamp(toStartOfInterval(toDateTime(?), INTERVAL ?))
|
||||
TO toUnixTimestamp(toStartOfInterval(toDateTime(?), INTERVAL ?))
|
||||
STEP ?`
|
||||
: ''
|
||||
}`.trim(),
|
||||
[
|
||||
SqlString.raw(
|
||||
hasGroupBy
|
||||
? `[${groupByColumnNames.join(',')}] as group`
|
||||
: `[] as group`,
|
||||
),
|
||||
SqlString.raw(rateMetricSource),
|
||||
quantile,
|
||||
...(granularity != null
|
||||
? [
|
||||
startTime / 1000,
|
||||
granularity,
|
||||
endTime / 1000,
|
||||
granularity,
|
||||
ms(granularity) / 1000,
|
||||
]
|
||||
: []),
|
||||
],
|
||||
);
|
||||
|
||||
// TODO: merge this with the histogram query
|
||||
const source = isRate ? rateMetricSource : gaugeMetricSource;
|
||||
const query =
|
||||
dataType === MetricsDataType.Histogram
|
||||
? histogramQuery
|
||||
: SqlString.format(
|
||||
`
|
||||
WITH metrics AS (?)
|
||||
SELECT ?
|
||||
FROM metrics
|
||||
|
|
@ -1035,20 +1157,20 @@ export const buildMetricSeriesQuery = async ({
|
|||
: ''
|
||||
}
|
||||
`,
|
||||
[
|
||||
SqlString.raw(isRate ? rateMetricSource : gaugeMetricSource),
|
||||
SqlString.raw(selectClause.join(',')),
|
||||
...(granularity != null
|
||||
? [
|
||||
startTime / 1000,
|
||||
granularity,
|
||||
endTime / 1000,
|
||||
granularity,
|
||||
ms(granularity) / 1000,
|
||||
]
|
||||
: []),
|
||||
],
|
||||
);
|
||||
[
|
||||
SqlString.raw(source),
|
||||
SqlString.raw(selectClause.join(',')),
|
||||
...(granularity != null
|
||||
? [
|
||||
startTime / 1000,
|
||||
granularity,
|
||||
endTime / 1000,
|
||||
granularity,
|
||||
ms(granularity) / 1000,
|
||||
]
|
||||
: []),
|
||||
],
|
||||
);
|
||||
|
||||
return {
|
||||
query,
|
||||
|
|
|
|||
|
|
@ -250,15 +250,15 @@ export function buildMetricSeries({
|
|||
}: {
|
||||
tags: Record<string, string>;
|
||||
name: string;
|
||||
points: { value: number; timestamp: number }[];
|
||||
points: { value: number; timestamp: number; le?: string }[];
|
||||
data_type: clickhouse.MetricsDataType;
|
||||
is_monotonic: boolean;
|
||||
is_delta: boolean;
|
||||
unit: string;
|
||||
}): MetricModel[] {
|
||||
// @ts-ignore TODO: Fix Timestamp types
|
||||
return points.map(({ value, timestamp }) => ({
|
||||
_string_attributes: tags,
|
||||
return points.map(({ value, timestamp, le }) => ({
|
||||
_string_attributes: { ...tags, ...(le && { le }) },
|
||||
name,
|
||||
value,
|
||||
timestamp: `${timestamp}000000`,
|
||||
|
|
|
|||
Loading…
Reference in a new issue