perf: Query AlertHistory in bulk (#1157)

This commit is contained in:
Drew Davis 2025-09-11 11:07:17 -04:00 committed by GitHub
parent bec64bfa7b
commit fd732a083d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 281 additions and 12 deletions

View file

@ -0,0 +1,5 @@
---
"@hyperdx/api": patch
---
perf: Query AlertHistory in bulk

View file

@ -1,4 +1,6 @@
import { ClickhouseClient } from '@hyperdx/common-utils/dist/clickhouse/node';
import { AlertState } from '@hyperdx/common-utils/dist/types';
import { create } from 'lodash';
import mongoose from 'mongoose';
import ms from 'ms';
@ -21,7 +23,11 @@ import { SavedSearch } from '@/models/savedSearch';
import { Source } from '@/models/source';
import Webhook from '@/models/webhook';
import * as checkAlert from '@/tasks/checkAlerts';
import { doesExceedThreshold, processAlert } from '@/tasks/checkAlerts';
import {
doesExceedThreshold,
getPreviousAlertHistories,
processAlert,
} from '@/tasks/checkAlerts';
import { AlertDetails, AlertTaskType, loadProvider } from '@/tasks/providers';
import {
AlertMessageTemplateDefaultView,
@ -756,39 +762,55 @@ describe('checkAlerts', () => {
clickhouseClient,
connection.id,
alertProvider,
undefined,
);
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
// skip since time diff is less than 1 window size
const later = new Date('2023-11-16T22:14:00.000Z');
const previousAlertsLater = await getPreviousAlertHistories(
[enhancedAlert.id],
later,
);
await processAlert(
later,
details,
clickhouseClient,
connection.id,
alertProvider,
previousAlertsLater.get(enhancedAlert.id),
);
// alert should still be in alert state
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
const previousAlertsNextWindow = await getPreviousAlertHistories(
[enhancedAlert.id],
nextWindow,
);
await processAlert(
nextWindow,
details,
clickhouseClient,
connection.id,
alertProvider,
previousAlertsNextWindow.get(enhancedAlert.id),
);
// alert should be in ok state
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
const nextNextWindow = new Date('2023-11-16T22:20:00.000Z');
const previousAlertsNextNextWindow = await getPreviousAlertHistories(
[enhancedAlert.id],
nextNextWindow,
);
await processAlert(
nextNextWindow,
details,
clickhouseClient,
connection.id,
alertProvider,
previousAlertsNextNextWindow.get(enhancedAlert.id),
);
// alert should be in ok state
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
@ -996,28 +1018,39 @@ describe('checkAlerts', () => {
clickhouseClient,
connection.id,
alertProvider,
undefined,
);
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
// skip since time diff is less than 1 window size
const later = new Date('2023-11-16T22:14:00.000Z');
const previousAlertsLater = await getPreviousAlertHistories(
[enhancedAlert.id],
later,
);
await processAlert(
later,
details,
clickhouseClient,
connection.id,
alertProvider,
previousAlertsLater.get(enhancedAlert.id),
);
// alert should still be in alert state
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
const previousAlertsNextWindow = await getPreviousAlertHistories(
[enhancedAlert.id],
nextWindow,
);
await processAlert(
nextWindow,
details,
clickhouseClient,
connection.id,
alertProvider,
previousAlertsNextWindow.get(enhancedAlert.id),
);
// alert should be in ok state
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
@ -1226,28 +1259,39 @@ describe('checkAlerts', () => {
clickhouseClient,
connection.id,
alertProvider,
undefined,
);
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
// skip since time diff is less than 1 window size
const later = new Date('2023-11-16T22:14:00.000Z');
const previousAlertsLater = await getPreviousAlertHistories(
[enhancedAlert.id],
later,
);
await processAlert(
later,
details,
clickhouseClient,
connection.id,
alertProvider,
previousAlertsLater.get(enhancedAlert.id),
);
// alert should still be in alert state
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
const previousAlertsNextWindow = await getPreviousAlertHistories(
[enhancedAlert.id],
nextWindow,
);
await processAlert(
nextWindow,
details,
clickhouseClient,
connection.id,
alertProvider,
previousAlertsNextWindow.get(enhancedAlert.id),
);
// alert should be in ok state
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
@ -1436,28 +1480,39 @@ describe('checkAlerts', () => {
clickhouseClient,
connection.id,
alertProvider,
undefined,
);
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
// skip since time diff is less than 1 window size
const later = new Date('2023-11-16T22:14:00.000Z');
const previousAlertsLater = await getPreviousAlertHistories(
[enhancedAlert.id],
later,
);
await processAlert(
later,
details,
clickhouseClient,
connection.id,
alertProvider,
previousAlertsLater.get(enhancedAlert.id),
);
// alert should still be in alert state
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
const previousAlertsNextWindow = await getPreviousAlertHistories(
[enhancedAlert.id],
nextWindow,
);
await processAlert(
nextWindow,
details,
clickhouseClient,
connection.id,
alertProvider,
previousAlertsNextWindow.get(enhancedAlert.id),
);
// alert should be in ok state
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
@ -1506,4 +1561,144 @@ describe('checkAlerts', () => {
);
});
});
describe('getPreviousAlertHistories', () => {
const server = getServer();
beforeAll(async () => {
await server.start();
});
afterEach(async () => {
await server.clearDBs();
jest.clearAllMocks();
});
afterAll(async () => {
await server.stop();
});
const saveAlert = (id: mongoose.Types.ObjectId, createdAt: Date) => {
return new AlertHistory({
alert: id,
createdAt,
state: AlertState.OK,
}).save();
};
it('should return the latest alert history for each alert', async () => {
const alert1Id = new mongoose.Types.ObjectId();
await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z'));
await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z'));
const alert2Id = new mongoose.Types.ObjectId();
await saveAlert(alert2Id, new Date('2025-01-01T00:10:00Z'));
await saveAlert(alert2Id, new Date('2025-01-01T00:15:00Z'));
const result = await getPreviousAlertHistories(
[alert1Id.toString(), alert2Id.toString()],
new Date('2025-01-01T00:20:00Z'),
);
expect(result.size).toBe(2);
expect(result.get(alert1Id.toString())!.createdAt).toEqual(
new Date('2025-01-01T00:05:00Z'),
);
expect(result.get(alert2Id.toString())!.createdAt).toEqual(
new Date('2025-01-01T00:15:00Z'),
);
});
it('should not return alert histories from the future', async () => {
const alert1Id = new mongoose.Types.ObjectId();
await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z'));
await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z'));
const alert2Id = new mongoose.Types.ObjectId();
await saveAlert(alert2Id, new Date('2025-01-01T00:10:00Z'));
await saveAlert(alert2Id, new Date('2025-01-01T00:15:00Z')); // This one is in the future
const result = await getPreviousAlertHistories(
[alert1Id.toString(), alert2Id.toString()],
new Date('2025-01-01T00:14:00Z'),
);
expect(result.size).toBe(2);
expect(result.get(alert1Id.toString())!.createdAt).toEqual(
new Date('2025-01-01T00:05:00Z'),
);
expect(result.get(alert2Id.toString())!.createdAt).toEqual(
new Date('2025-01-01T00:10:00Z'),
);
});
it('should not return a history if there are no histories for the given alert', async () => {
const alert1Id = new mongoose.Types.ObjectId();
await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z'));
await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z'));
const alert2Id = new mongoose.Types.ObjectId();
const result = await getPreviousAlertHistories(
[alert1Id.toString(), alert2Id.toString()],
new Date('2025-01-01T00:20:00Z'),
);
expect(result.size).toBe(1);
expect(result.get(alert1Id.toString())!.createdAt).toEqual(
new Date('2025-01-01T00:05:00Z'),
);
expect(result.get(alert2Id.toString())).toBeUndefined();
});
it('should not return a history for an alert that is not provided in the argument', async () => {
const alert1Id = new mongoose.Types.ObjectId();
await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z'));
await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z'));
const alert2Id = new mongoose.Types.ObjectId();
await saveAlert(alert2Id, new Date('2025-01-01T00:10:00Z'));
await saveAlert(alert2Id, new Date('2025-01-01T00:15:00Z'));
const result = await getPreviousAlertHistories(
[alert1Id.toString()],
new Date('2025-01-01T00:20:00Z'),
);
expect(result.size).toBe(1);
expect(result.get(alert1Id.toString())!.createdAt).toEqual(
new Date('2025-01-01T00:05:00Z'),
);
});
it('should batch alert IDs across multiple aggregation queries if necessary', async () => {
const alert1Id = new mongoose.Types.ObjectId();
await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z'));
await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z'));
const alert2Id = new mongoose.Types.ObjectId();
await saveAlert(alert2Id, new Date('2025-01-01T00:10:00Z'));
await saveAlert(alert2Id, new Date('2025-01-01T00:15:00Z'));
const aggregateSpy = jest.spyOn(AlertHistory, 'aggregate');
const result = await getPreviousAlertHistories(
[
alert1Id.toString(),
...Array(150).fill(new mongoose.Types.ObjectId().toString()),
alert2Id.toString(),
],
new Date('2025-01-01T00:20:00Z'),
);
expect(aggregateSpy).toHaveBeenCalledTimes(4); // 152 ids, batch size 50 => 4 batches
expect(result.size).toBe(2);
expect(result.get(alert1Id.toString())!.createdAt).toEqual(
new Date('2025-01-01T00:05:00Z'),
);
expect(result.get(alert2Id.toString())!.createdAt).toEqual(
new Date('2025-01-01T00:15:00Z'),
);
});
});
});

View file

@ -14,7 +14,7 @@ import Dashboard from '@/models/dashboard';
import { SavedSearch } from '@/models/savedSearch';
import { Source } from '@/models/source';
import Webhook from '@/models/webhook';
import { processAlert } from '@/tasks/checkAlerts';
import { getPreviousAlertHistories, processAlert } from '@/tasks/checkAlerts';
import { AlertTaskType, loadProvider } from '@/tasks/providers';
import {
AlertMessageTemplateDefaultView,
@ -193,12 +193,14 @@ describe('Single Invocation Alert Test', () => {
username: connection.username,
password: connection.password,
});
const previousAlerts = await getPreviousAlertHistories([alert.id], now);
await processAlert(
now,
details,
clickhouseClient,
connection.id,
alertProvider,
previousAlerts.get(alert.id),
);
// Verify alert state changed to ALERT (from DB)
@ -429,12 +431,14 @@ describe('Single Invocation Alert Test', () => {
}));
// Process alert - this triggers the webhook with the title
const previousAlerts = await getPreviousAlertHistories([alert.id], now);
await processAlert(
now,
details,
clickhouseClient,
connection.id,
alertProvider,
previousAlerts.get(alert.id),
);
// Get the webhook call to inspect the title

View file

@ -10,7 +10,9 @@ import {
DisplayType,
} from '@hyperdx/common-utils/dist/types';
import * as fns from 'date-fns';
import { isString } from 'lodash';
import { chunk, isString } from 'lodash';
import { ObjectId } from 'mongoose';
import mongoose from 'mongoose';
import ms from 'ms';
import { serializeError } from 'serialize-error';
@ -142,15 +144,10 @@ export const processAlert = async (
clickhouseClient: ClickhouseClient,
connectionId: string,
alertProvider: AlertProvider,
previous: AggregatedAlertHistory | undefined,
) => {
const { alert, source } = details;
try {
const previous: IAlertHistory | undefined = (
await AlertHistory.find({ alert: alert.id })
.sort({ createdAt: -1 })
.limit(1)
)[0];
const windowSizeInMins = ms(alert.interval) / 60000;
const nowInMinsRoundDown = roundDownToXMinutes(windowSizeInMins)(now);
if (
@ -370,6 +367,55 @@ export const processAlert = async (
// Re-export handleSendGenericWebhook for testing
export { handleSendGenericWebhook };
interface AggregatedAlertHistory {
_id: ObjectId;
createdAt: Date;
}
/**
* Fetch the most recent AlertHistory value for each of the given alert IDs.
*
* @param alertIds The list of alert IDs to query the latest history for.
* @param now The current date and time. AlertHistory documents that have a createdBy > now are ignored.
* @returns A map from Alert IDs to their most recent AlertHistory. If there are no
* AlertHistory documents for an Alert ID, that ID will not be a key in the map.
*/
export const getPreviousAlertHistories = async (
alertIds: string[],
now: Date,
) => {
// Group the alert IDs into chunks of 50 to avoid exceeding MongoDB's recommendation that $in lists be on the order of 10s of items
const chunkedIds = chunk(
alertIds.map(id => new mongoose.Types.ObjectId(id)),
50,
);
const resultChunks = await Promise.all(
chunkedIds.map(async ids =>
AlertHistory.aggregate<AggregatedAlertHistory>([
// Filter for the given alerts, and only entries created before "now"
{
$match: {
alert: { $in: ids },
createdAt: { $lte: now },
},
},
// Group by alert ID, taking the latest createdAt value for each group
{
$group: {
_id: '$alert',
createdAt: { $max: '$createdAt' },
},
},
]),
),
);
return new Map<string, AggregatedAlertHistory>(
resultChunks.flat().map(history => [history._id.toString(), history]),
);
};
export default class CheckAlertTask implements HdxTask<CheckAlertsTaskArgs> {
private provider!: AlertProvider;
private task_queue: PQueue;
@ -382,7 +428,11 @@ export default class CheckAlertTask implements HdxTask<CheckAlertsTaskArgs> {
});
}
async processAlertTask(now: Date, alertTask: AlertTask) {
async processAlertTask(
now: Date,
alertTask: AlertTask,
previousAlerts: Map<string, AggregatedAlertHistory>,
) {
const { alerts, conn } = alertTask;
logger.info({
message: 'Processing alerts in batch',
@ -405,8 +455,16 @@ export default class CheckAlertTask implements HdxTask<CheckAlertsTaskArgs> {
});
for (const alert of alerts) {
const previous = previousAlerts.get(alert.alert.id);
await this.task_queue.add(() =>
processAlert(now, alert, clickhouseClient, conn.id, this.provider),
processAlert(
now,
alert,
clickhouseClient,
conn.id,
this.provider,
previous,
),
);
}
}
@ -428,8 +486,15 @@ export default class CheckAlertTask implements HdxTask<CheckAlertsTaskArgs> {
taskCount: alertTasks.length,
});
const alertIds = alertTasks.flatMap(({ alerts }) =>
alerts.map(({ alert }) => alert.id),
);
const previousAlerts = await getPreviousAlertHistories(alertIds, now);
for (const task of alertTasks) {
await this.task_queue.add(() => this.processAlertTask(now, task));
await this.task_queue.add(() =>
this.processAlertTask(now, task, previousAlerts),
);
}
await this.task_queue.onIdle();