mirror of
https://github.com/hyperdxio/hyperdx
synced 2026-04-21 13:37:15 +00:00
[HDX-3840] Optimize alerthistories aggregation queries to reduce DocDB CPU load (#1997)
## Summary - Replace chunked `$in` aggregations with per-alert queries to leverage compound indexes for index-backed sorting in DocumentDB - Eliminate N+1 query pattern from the alerts API endpoint by adding a concurrency-controlled batch function Linear: https://linear.app/clickhouse/issue/HDX-3840/docdb-ro-instances-high-cpu ## Problem Two aggregation queries on the `alerthistories` collection were causing sustained high CPU on DocDB read-only instances: 1. **`getPreviousAlertHistories`** (runs every minute via check-alerts cron) — used `$match: {alert: {$in: [50 ids]}}` which breaks index-backed sort optimization on the compound index `{alert: 1, group: 1, createdAt: -1}`. With a 7-day lookback, each chunk scanned ~500K documents and required an in-memory sort. Additionally, `$first: '$$ROOT'` prevented projection pushdown, forcing full document fetches. 2. **`getRecentAlertHistories`** (called from `GET /alerts` API) — fired one aggregation query per alert (N+1 pattern), multiplying load on every page view. ## Changes ### `getPreviousAlertHistories` (`packages/api/src/tasks/checkAlerts/index.ts`) - Replaced chunked `$in` batches (50 IDs per chunk) with **per-alert queries** using `PQueue({ concurrency: 20 })` - Each query matches on a single `alert` value, so the compound index `{alert: 1, group: 1, createdAt: -1}` delivers results already sorted — no in-memory sort needed - Replaced `$first: '$$ROOT'` with `$first: '$createdAt'` / `$first: '$state'` to allow projection pushdown (DocumentDB can avoid full document fetches) ### `getRecentAlertHistories` (`packages/api/src/controllers/alertHistory.ts`) - Added `getRecentAlertHistoriesBatch()` — runs per-alert queries with `PQueue({ concurrency: 20 })` to control parallelism - Each per-alert query uses the `{alert: 1, createdAt: -1}` index for a single-range scan ### Alerts API route (`packages/api/src/routers/api/alerts.ts`) - Replaced N individual `getRecentAlertHistories` calls with a single `getRecentAlertHistoriesBatch` call ## Why per-alert queries instead of `$in`? The `$in` operator on the leading field of a compound index produces multiple index range scans. The merged results are **not** globally sorted, forcing DocumentDB to perform an expensive in-memory sort. With per-alert queries, each query walks a single contiguous range in the index — sort is free, and `$group` + `$first` can short-circuit immediately per group. ## Testing - All 14 integration tests pass (`make dev-int FILE=alertHistory.test`) - Updated the batching test in `checkAlerts.test.ts` to verify per-alert query behavior - Added 5 new tests for `getRecentAlertHistoriesBatch` covering batch results, empty histories, per-alert limits, and ALERT state detection
This commit is contained in:
parent
56e60ef8c8
commit
acd117abcf
6 changed files with 348 additions and 121 deletions
5
.changeset/silly-pugs-beam.md
Normal file
5
.changeset/silly-pugs-beam.md
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
"@hyperdx/api": patch
|
||||
---
|
||||
|
||||
perf: Optimize alerthistories aggregation queries to reduce DocDB CPU load
|
||||
|
|
@ -1,6 +1,9 @@
|
|||
import { ObjectId } from 'mongodb';
|
||||
|
||||
import { getRecentAlertHistories } from '@/controllers/alertHistory';
|
||||
import {
|
||||
getRecentAlertHistories,
|
||||
getRecentAlertHistoriesBatch,
|
||||
} from '@/controllers/alertHistory';
|
||||
import { clearDBCollections, closeDB, connectDB } from '@/fixtures';
|
||||
import Alert, { AlertState } from '@/models/alert';
|
||||
import AlertHistory from '@/models/alertHistory';
|
||||
|
|
@ -365,4 +368,154 @@ describe('alertHistory controller', () => {
|
|||
expect(histories[0].counts).toBe(5);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getRecentAlertHistoriesBatch', () => {
|
||||
it('should return empty map when no alerts are provided', async () => {
|
||||
const result = await getRecentAlertHistoriesBatch([], 20);
|
||||
expect(result.size).toBe(0);
|
||||
});
|
||||
|
||||
it('should return histories for multiple alerts in a single batch call', async () => {
|
||||
const team = await Team.create({ name: 'Test Team' });
|
||||
const alert1 = await Alert.create({
|
||||
team: team._id,
|
||||
threshold: 100,
|
||||
interval: '5m',
|
||||
channel: { type: null },
|
||||
});
|
||||
const alert2 = await Alert.create({
|
||||
team: team._id,
|
||||
threshold: 200,
|
||||
interval: '5m',
|
||||
channel: { type: null },
|
||||
});
|
||||
|
||||
const now = new Date(Date.now() - 60000);
|
||||
const earlier = new Date(Date.now() - 120000);
|
||||
|
||||
await AlertHistory.create({
|
||||
alert: alert1._id,
|
||||
createdAt: now,
|
||||
state: AlertState.ALERT,
|
||||
counts: 5,
|
||||
lastValues: [{ startTime: now, count: 5 }],
|
||||
});
|
||||
await AlertHistory.create({
|
||||
alert: alert1._id,
|
||||
createdAt: earlier,
|
||||
state: AlertState.OK,
|
||||
counts: 0,
|
||||
lastValues: [{ startTime: earlier, count: 0 }],
|
||||
});
|
||||
await AlertHistory.create({
|
||||
alert: alert2._id,
|
||||
createdAt: now,
|
||||
state: AlertState.OK,
|
||||
counts: 1,
|
||||
lastValues: [{ startTime: now, count: 1 }],
|
||||
});
|
||||
|
||||
const result = await getRecentAlertHistoriesBatch(
|
||||
[
|
||||
{ alertId: new ObjectId(alert1._id), interval: '5m' },
|
||||
{ alertId: new ObjectId(alert2._id), interval: '5m' },
|
||||
],
|
||||
20,
|
||||
);
|
||||
|
||||
expect(result.size).toBe(2);
|
||||
|
||||
const alert1Histories = result.get(alert1._id.toString());
|
||||
expect(alert1Histories).toHaveLength(2);
|
||||
expect(alert1Histories![0].createdAt).toEqual(now);
|
||||
expect(alert1Histories![0].state).toBe(AlertState.ALERT);
|
||||
expect(alert1Histories![1].createdAt).toEqual(earlier);
|
||||
expect(alert1Histories![1].state).toBe(AlertState.OK);
|
||||
|
||||
const alert2Histories = result.get(alert2._id.toString());
|
||||
expect(alert2Histories).toHaveLength(1);
|
||||
expect(alert2Histories![0].state).toBe(AlertState.OK);
|
||||
expect(alert2Histories![0].counts).toBe(1);
|
||||
});
|
||||
|
||||
it('should return empty array for alerts with no history', async () => {
|
||||
const alertId = new ObjectId();
|
||||
|
||||
const result = await getRecentAlertHistoriesBatch(
|
||||
[{ alertId, interval: '5m' }],
|
||||
20,
|
||||
);
|
||||
|
||||
expect(result.size).toBe(1);
|
||||
expect(result.get(alertId.toString())).toEqual([]);
|
||||
});
|
||||
|
||||
it('should respect the limit parameter per alert', async () => {
|
||||
const team = await Team.create({ name: 'Test Team' });
|
||||
const alert = await Alert.create({
|
||||
team: team._id,
|
||||
threshold: 100,
|
||||
interval: '5m',
|
||||
channel: { type: null },
|
||||
});
|
||||
|
||||
// Create 5 histories
|
||||
for (let i = 0; i < 5; i++) {
|
||||
await AlertHistory.create({
|
||||
alert: alert._id,
|
||||
createdAt: new Date(Date.now() - i * 60000),
|
||||
state: AlertState.OK,
|
||||
counts: 0,
|
||||
lastValues: [
|
||||
{ startTime: new Date(Date.now() - i * 60000), count: 0 },
|
||||
],
|
||||
});
|
||||
}
|
||||
|
||||
const result = await getRecentAlertHistoriesBatch(
|
||||
[{ alertId: new ObjectId(alert._id), interval: '5m' }],
|
||||
3,
|
||||
);
|
||||
|
||||
expect(result.get(alert._id.toString())).toHaveLength(3);
|
||||
});
|
||||
|
||||
it('should detect ALERT state when any grouped history has ALERT state', async () => {
|
||||
const team = await Team.create({ name: 'Test Team' });
|
||||
const alert = await Alert.create({
|
||||
team: team._id,
|
||||
threshold: 100,
|
||||
interval: '5m',
|
||||
channel: { type: null },
|
||||
});
|
||||
|
||||
const timestamp = new Date(Date.now() - 60000);
|
||||
|
||||
// Create histories with mixed states at the same timestamp
|
||||
await AlertHistory.create({
|
||||
alert: alert._id,
|
||||
createdAt: timestamp,
|
||||
state: AlertState.OK,
|
||||
counts: 0,
|
||||
lastValues: [{ startTime: timestamp, count: 0 }],
|
||||
});
|
||||
await AlertHistory.create({
|
||||
alert: alert._id,
|
||||
createdAt: timestamp,
|
||||
state: AlertState.ALERT,
|
||||
counts: 3,
|
||||
lastValues: [{ startTime: timestamp, count: 3 }],
|
||||
});
|
||||
|
||||
const result = await getRecentAlertHistoriesBatch(
|
||||
[{ alertId: new ObjectId(alert._id), interval: '5m' }],
|
||||
20,
|
||||
);
|
||||
|
||||
const histories = result.get(alert._id.toString());
|
||||
expect(histories).toHaveLength(1);
|
||||
expect(histories![0].state).toBe(AlertState.ALERT);
|
||||
expect(histories![0].counts).toBe(3);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import PQueue from '@esm2cjs/p-queue';
|
||||
import {
|
||||
ALERT_INTERVAL_TO_MINUTES,
|
||||
AlertInterval,
|
||||
|
|
@ -7,6 +8,9 @@ import { ObjectId } from 'mongodb';
|
|||
import { AlertState } from '@/models/alert';
|
||||
import AlertHistory, { IAlertHistory } from '@/models/alertHistory';
|
||||
|
||||
// Max parallel per-alert queries to avoid overwhelming the DB connection pool
|
||||
export const ALERT_HISTORY_QUERY_CONCURRENCY = 20;
|
||||
|
||||
type GroupedAlertHistory = {
|
||||
_id: Date;
|
||||
states: string[];
|
||||
|
|
@ -14,6 +18,21 @@ type GroupedAlertHistory = {
|
|||
lastValues: IAlertHistory['lastValues'][];
|
||||
};
|
||||
|
||||
function mapGroupedHistories(
|
||||
groupedHistories: GroupedAlertHistory[],
|
||||
): Omit<IAlertHistory, 'alert'>[] {
|
||||
return groupedHistories.map(group => ({
|
||||
createdAt: group._id,
|
||||
state: group.states.includes(AlertState.ALERT)
|
||||
? AlertState.ALERT
|
||||
: AlertState.OK,
|
||||
counts: group.counts,
|
||||
lastValues: group.lastValues
|
||||
.flat()
|
||||
.sort((a, b) => a.startTime.getTime() - b.startTime.getTime()),
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the most recent alert histories for a given alert ID,
|
||||
* limiting to the given number of entries.
|
||||
|
|
@ -61,14 +80,42 @@ export async function getRecentAlertHistories({
|
|||
},
|
||||
]);
|
||||
|
||||
return groupedHistories.map(group => ({
|
||||
createdAt: group._id,
|
||||
state: group.states.includes(AlertState.ALERT)
|
||||
? AlertState.ALERT
|
||||
: AlertState.OK,
|
||||
counts: group.counts,
|
||||
lastValues: group.lastValues
|
||||
.flat()
|
||||
.sort((a, b) => a.startTime.getTime() - b.startTime.getTime()),
|
||||
}));
|
||||
return mapGroupedHistories(groupedHistories);
|
||||
}
|
||||
|
||||
/**
|
||||
* Batch-fetch recent alert histories for multiple alerts in parallel.
|
||||
*
|
||||
* Uses per-alert queries with concurrency control instead of a single
|
||||
* $in-based aggregation. This avoids the $in + $sort anti-pattern that
|
||||
* breaks index-backed sorting in DocumentDB, while eliminating the N+1
|
||||
* query pattern from the caller.
|
||||
*
|
||||
* Each per-alert query uses the compound index {alert: 1, createdAt: -1}
|
||||
* for an efficient single-range index scan.
|
||||
*/
|
||||
export async function getRecentAlertHistoriesBatch(
|
||||
alerts: { alertId: ObjectId; interval: AlertInterval }[],
|
||||
limit: number,
|
||||
): Promise<Map<string, Omit<IAlertHistory, 'alert'>[]>> {
|
||||
const queue = new PQueue({ concurrency: ALERT_HISTORY_QUERY_CONCURRENCY });
|
||||
|
||||
const entries = await Promise.all(
|
||||
alerts.map(({ alertId, interval }) =>
|
||||
queue.add(async () => {
|
||||
const histories = await getRecentAlertHistories({
|
||||
alertId,
|
||||
interval,
|
||||
limit,
|
||||
});
|
||||
return [alertId.toString(), histories] as const;
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
return new Map(
|
||||
entries.filter(
|
||||
(e): e is [string, Omit<IAlertHistory, 'alert'>[]] => e !== undefined,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import { ObjectId } from 'mongodb';
|
|||
import { z } from 'zod';
|
||||
import { processRequest, validateRequest } from 'zod-express-middleware';
|
||||
|
||||
import { getRecentAlertHistories } from '@/controllers/alertHistory';
|
||||
import { getRecentAlertHistoriesBatch } from '@/controllers/alertHistory';
|
||||
import {
|
||||
createAlert,
|
||||
deleteAlert,
|
||||
|
|
@ -29,65 +29,67 @@ router.get('/', async (req, res: AlertsExpRes, next) => {
|
|||
|
||||
const alerts = await getAlertsEnhanced(teamId);
|
||||
|
||||
const data = await Promise.all(
|
||||
alerts.map(async alert => {
|
||||
const history = await getRecentAlertHistories({
|
||||
alertId: new ObjectId(alert._id),
|
||||
interval: alert.interval,
|
||||
limit: 20,
|
||||
});
|
||||
|
||||
return {
|
||||
history,
|
||||
silenced: alert.silenced
|
||||
? {
|
||||
by: alert.silenced.by?.email,
|
||||
at: alert.silenced.at,
|
||||
until: alert.silenced.until,
|
||||
}
|
||||
: undefined,
|
||||
createdBy: alert.createdBy
|
||||
? pick(alert.createdBy, ['email', 'name'])
|
||||
: undefined,
|
||||
channel: pick(alert.channel, ['type']),
|
||||
...(alert.dashboard && {
|
||||
dashboardId: alert.dashboard._id,
|
||||
dashboard: {
|
||||
tiles: alert.dashboard.tiles
|
||||
.filter(tile => tile.id === alert.tileId)
|
||||
.map(tile => ({
|
||||
id: tile.id,
|
||||
config: { name: tile.config.name },
|
||||
})),
|
||||
...pick(alert.dashboard, ['_id', 'updatedAt', 'name', 'tags']),
|
||||
},
|
||||
}),
|
||||
...(alert.savedSearch && {
|
||||
savedSearchId: alert.savedSearch._id,
|
||||
savedSearch: pick(alert.savedSearch, [
|
||||
'_id',
|
||||
'createdAt',
|
||||
'name',
|
||||
'updatedAt',
|
||||
'tags',
|
||||
]),
|
||||
}),
|
||||
...pick(alert, [
|
||||
'_id',
|
||||
'interval',
|
||||
'scheduleOffsetMinutes',
|
||||
'scheduleStartAt',
|
||||
'threshold',
|
||||
'thresholdType',
|
||||
'state',
|
||||
'source',
|
||||
'tileId',
|
||||
'createdAt',
|
||||
'updatedAt',
|
||||
]),
|
||||
};
|
||||
}),
|
||||
const historyMap = await getRecentAlertHistoriesBatch(
|
||||
alerts.map(alert => ({
|
||||
alertId: new ObjectId(alert._id),
|
||||
interval: alert.interval,
|
||||
})),
|
||||
20,
|
||||
);
|
||||
|
||||
const data = alerts.map(alert => {
|
||||
const history = historyMap.get(alert._id.toString()) ?? [];
|
||||
|
||||
return {
|
||||
history,
|
||||
silenced: alert.silenced
|
||||
? {
|
||||
by: alert.silenced.by?.email,
|
||||
at: alert.silenced.at,
|
||||
until: alert.silenced.until,
|
||||
}
|
||||
: undefined,
|
||||
createdBy: alert.createdBy
|
||||
? pick(alert.createdBy, ['email', 'name'])
|
||||
: undefined,
|
||||
channel: pick(alert.channel, ['type']),
|
||||
...(alert.dashboard && {
|
||||
dashboardId: alert.dashboard._id,
|
||||
dashboard: {
|
||||
tiles: alert.dashboard.tiles
|
||||
.filter(tile => tile.id === alert.tileId)
|
||||
.map(tile => ({
|
||||
id: tile.id,
|
||||
config: { name: tile.config.name },
|
||||
})),
|
||||
...pick(alert.dashboard, ['_id', 'updatedAt', 'name', 'tags']),
|
||||
},
|
||||
}),
|
||||
...(alert.savedSearch && {
|
||||
savedSearchId: alert.savedSearch._id,
|
||||
savedSearch: pick(alert.savedSearch, [
|
||||
'_id',
|
||||
'createdAt',
|
||||
'name',
|
||||
'updatedAt',
|
||||
'tags',
|
||||
]),
|
||||
}),
|
||||
...pick(alert, [
|
||||
'_id',
|
||||
'interval',
|
||||
'scheduleOffsetMinutes',
|
||||
'scheduleStartAt',
|
||||
'threshold',
|
||||
'thresholdType',
|
||||
'state',
|
||||
'source',
|
||||
'tileId',
|
||||
'createdAt',
|
||||
'updatedAt',
|
||||
]),
|
||||
};
|
||||
});
|
||||
sendJson(res, { data });
|
||||
} catch (e) {
|
||||
next(e);
|
||||
|
|
|
|||
|
|
@ -5137,7 +5137,7 @@ describe('checkAlerts', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('should batch alert IDs across multiple aggregation queries if necessary', async () => {
|
||||
it('should issue one aggregation per alert ID (per-alert queries)', async () => {
|
||||
const alert1Id = new mongoose.Types.ObjectId();
|
||||
await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z'));
|
||||
await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z'));
|
||||
|
|
@ -5148,16 +5148,22 @@ describe('checkAlerts', () => {
|
|||
|
||||
const aggregateSpy = jest.spyOn(AlertHistory, 'aggregate');
|
||||
|
||||
const fakeAlertIds = Array(150)
|
||||
.fill(null)
|
||||
.map(() => new mongoose.Types.ObjectId().toString());
|
||||
const allIds = [
|
||||
alert1Id.toString(),
|
||||
...fakeAlertIds,
|
||||
alert2Id.toString(),
|
||||
];
|
||||
|
||||
const result = await getPreviousAlertHistories(
|
||||
[
|
||||
alert1Id.toString(),
|
||||
...Array(150).fill(new mongoose.Types.ObjectId().toString()),
|
||||
alert2Id.toString(),
|
||||
],
|
||||
allIds,
|
||||
new Date('2025-01-01T00:20:00Z'),
|
||||
);
|
||||
|
||||
expect(aggregateSpy).toHaveBeenCalledTimes(4); // 152 ids, batch size 50 => 4 batches
|
||||
// One aggregation per alert ID (no chunking)
|
||||
expect(aggregateSpy).toHaveBeenCalledTimes(allIds.length);
|
||||
expect(result.size).toBe(2);
|
||||
expect(result.get(alert1Id.toString())!.createdAt).toEqual(
|
||||
new Date('2025-01-01T00:05:00Z'),
|
||||
|
|
|
|||
|
|
@ -26,12 +26,13 @@ import {
|
|||
SourceKind,
|
||||
} from '@hyperdx/common-utils/dist/types';
|
||||
import * as fns from 'date-fns';
|
||||
import { chunk, isString } from 'lodash';
|
||||
import { isString } from 'lodash';
|
||||
import { ObjectId } from 'mongoose';
|
||||
import mongoose from 'mongoose';
|
||||
import ms from 'ms';
|
||||
import { serializeError } from 'serialize-error';
|
||||
|
||||
import { ALERT_HISTORY_QUERY_CONCURRENCY } from '@/controllers/alertHistory';
|
||||
import { AlertState, AlertThresholdType, IAlert } from '@/models/alert';
|
||||
import AlertHistory, { IAlertHistory } from '@/models/alertHistory';
|
||||
import { IDashboard } from '@/models/dashboard';
|
||||
|
|
@ -974,8 +975,13 @@ export interface AggregatedAlertHistory {
|
|||
* Fetch the most recent AlertHistory value for each of the given alert IDs.
|
||||
* For group-by alerts, returns the latest history for each group within each alert.
|
||||
*
|
||||
* Uses per-alert queries instead of batched $in to leverage the compound index
|
||||
* {alert: 1, group: 1, createdAt: -1} for index-backed sorting. With a single
|
||||
* alert value, the index delivers results already sorted by {group, createdAt desc},
|
||||
* so the $sort is a no-op and $group + $first can short-circuit per group.
|
||||
*
|
||||
* @param alertIds The list of alert IDs to query the latest history for.
|
||||
* @param now The current date and time. AlertHistory documents that have a createdBy > now are ignored.
|
||||
* @param now The current date and time. AlertHistory documents that have a createdAt > now are ignored.
|
||||
* @returns A map from Alert IDs (or Alert ID + group) to their most recent AlertHistory.
|
||||
* For non-grouped alerts, the key is just the alert ID.
|
||||
* For grouped alerts, the key is "alertId||group" to track per-group state.
|
||||
|
|
@ -984,58 +990,66 @@ export const getPreviousAlertHistories = async (
|
|||
alertIds: string[],
|
||||
now: Date,
|
||||
) => {
|
||||
// Group the alert IDs into chunks of 50 to avoid exceeding MongoDB's recommendation that $in lists be on the order of 10s of items
|
||||
const chunkedIds = chunk(
|
||||
alertIds.map(id => new mongoose.Types.ObjectId(id)),
|
||||
50,
|
||||
);
|
||||
|
||||
const lookbackDate = new Date(now.getTime() - ms('7d'));
|
||||
|
||||
const resultChunks = await Promise.all(
|
||||
chunkedIds.map(async ids =>
|
||||
AlertHistory.aggregate<AggregatedAlertHistory>([
|
||||
{
|
||||
$match: {
|
||||
alert: { $in: ids },
|
||||
createdAt: { $lte: now, $gte: lookbackDate },
|
||||
},
|
||||
},
|
||||
{
|
||||
$sort: { alert: 1, group: 1, createdAt: -1 },
|
||||
},
|
||||
// Group by alert ID AND group (if present), taking the first (latest) document for each combination
|
||||
{
|
||||
$group: {
|
||||
_id: {
|
||||
alert: '$alert',
|
||||
group: '$group',
|
||||
// Use a concurrency-limited queue to avoid overwhelming the connection pool
|
||||
// when there are many alerts (e.g., 200+ alert IDs).
|
||||
const queue = new PQueue({ concurrency: ALERT_HISTORY_QUERY_CONCURRENCY });
|
||||
|
||||
const results = await Promise.all(
|
||||
alertIds.map(alertId =>
|
||||
queue.add(async () => {
|
||||
const id = new mongoose.Types.ObjectId(alertId);
|
||||
return AlertHistory.aggregate<AggregatedAlertHistory>([
|
||||
{
|
||||
$match: {
|
||||
alert: id,
|
||||
createdAt: { $lte: now, $gte: lookbackDate },
|
||||
},
|
||||
latestDoc: { $first: '$$ROOT' },
|
||||
},
|
||||
},
|
||||
// Reshape and extract fields from the latest document
|
||||
{
|
||||
$project: {
|
||||
_id: '$_id.alert',
|
||||
createdAt: '$latestDoc.createdAt',
|
||||
state: '$latestDoc.state',
|
||||
group: '$_id.group',
|
||||
// With a single alert value, the compound index {alert: 1, group: 1, createdAt: -1}
|
||||
// delivers results already in this sort order — this is an index-backed no-op sort.
|
||||
{
|
||||
$sort: { alert: 1, group: 1, createdAt: -1 },
|
||||
},
|
||||
},
|
||||
]),
|
||||
// Group by {alert, group}, taking the first (latest) document's fields.
|
||||
// Using $first on individual fields instead of $first: '$$ROOT' allows
|
||||
// DocumentDB to avoid fetching full documents when not needed.
|
||||
{
|
||||
$group: {
|
||||
_id: {
|
||||
alert: '$alert',
|
||||
group: '$group',
|
||||
},
|
||||
createdAt: { $first: '$createdAt' },
|
||||
state: { $first: '$state' },
|
||||
},
|
||||
},
|
||||
{
|
||||
$project: {
|
||||
_id: '$_id.alert',
|
||||
createdAt: 1,
|
||||
state: 1,
|
||||
group: '$_id.group',
|
||||
},
|
||||
},
|
||||
]);
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
// Create a map with composite keys for grouped alerts (alertId||group) or simple keys for non-grouped alerts
|
||||
return new Map<string, AggregatedAlertHistory>(
|
||||
resultChunks.flat().map(history => {
|
||||
const key = computeHistoryMapKey(
|
||||
history._id.toString(),
|
||||
history.group || '',
|
||||
);
|
||||
return [key, history];
|
||||
}),
|
||||
results
|
||||
.flat()
|
||||
.filter((h): h is AggregatedAlertHistory => h !== undefined)
|
||||
.map(history => {
|
||||
const key = computeHistoryMapKey(
|
||||
history._id.toString(),
|
||||
history.group || '',
|
||||
);
|
||||
return [key, history];
|
||||
}),
|
||||
);
|
||||
};
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue