diff --git a/.changeset/mean-pigs-fold.md b/.changeset/mean-pigs-fold.md new file mode 100644 index 00000000..32e60723 --- /dev/null +++ b/.changeset/mean-pigs-fold.md @@ -0,0 +1,5 @@ +--- +"@hyperdx/api": patch +--- + +perf: Query AlertHistory in bulk diff --git a/packages/api/src/tasks/__tests__/checkAlerts.test.ts b/packages/api/src/tasks/__tests__/checkAlerts.test.ts index 91107492..d32c05aa 100644 --- a/packages/api/src/tasks/__tests__/checkAlerts.test.ts +++ b/packages/api/src/tasks/__tests__/checkAlerts.test.ts @@ -1,4 +1,5 @@ import { ClickhouseClient } from '@hyperdx/common-utils/dist/clickhouse/node'; +import { AlertState } from '@hyperdx/common-utils/dist/types'; import mongoose from 'mongoose'; import ms from 'ms'; @@ -21,7 +23,11 @@ import { SavedSearch } from '@/models/savedSearch'; import { Source } from '@/models/source'; import Webhook from '@/models/webhook'; import * as checkAlert from '@/tasks/checkAlerts'; -import { doesExceedThreshold, processAlert } from '@/tasks/checkAlerts'; +import { + doesExceedThreshold, + getPreviousAlertHistories, + processAlert, +} from '@/tasks/checkAlerts'; import { AlertDetails, AlertTaskType, loadProvider } from '@/tasks/providers'; import { AlertMessageTemplateDefaultView, @@ -756,39 +762,55 @@ describe('checkAlerts', () => { clickhouseClient, connection.id, alertProvider, + undefined, ); expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT'); // skip since time diff is less than 1 window size const later = new Date('2023-11-16T22:14:00.000Z'); + const previousAlertsLater = await getPreviousAlertHistories( + [enhancedAlert.id], + later, + ); await processAlert( later, details, clickhouseClient, connection.id, alertProvider, + previousAlertsLater.get(enhancedAlert.id), ); // alert should still be in alert state expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT'); const nextWindow = new Date('2023-11-16T22:16:00.000Z'); + const 
previousAlertsNextWindow = await getPreviousAlertHistories( + [enhancedAlert.id], + nextWindow, + ); await processAlert( nextWindow, details, clickhouseClient, connection.id, alertProvider, + previousAlertsNextWindow.get(enhancedAlert.id), ); // alert should be in ok state expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT'); const nextNextWindow = new Date('2023-11-16T22:20:00.000Z'); + const previousAlertsNextNextWindow = await getPreviousAlertHistories( + [enhancedAlert.id], + nextNextWindow, + ); await processAlert( nextNextWindow, details, clickhouseClient, connection.id, alertProvider, + previousAlertsNextNextWindow.get(enhancedAlert.id), ); // alert should be in ok state expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK'); @@ -996,28 +1018,39 @@ describe('checkAlerts', () => { clickhouseClient, connection.id, alertProvider, + undefined, ); expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT'); // skip since time diff is less than 1 window size const later = new Date('2023-11-16T22:14:00.000Z'); + const previousAlertsLater = await getPreviousAlertHistories( + [enhancedAlert.id], + later, + ); await processAlert( later, details, clickhouseClient, connection.id, alertProvider, + previousAlertsLater.get(enhancedAlert.id), ); // alert should still be in alert state expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT'); const nextWindow = new Date('2023-11-16T22:16:00.000Z'); + const previousAlertsNextWindow = await getPreviousAlertHistories( + [enhancedAlert.id], + nextWindow, + ); await processAlert( nextWindow, details, clickhouseClient, connection.id, alertProvider, + previousAlertsNextWindow.get(enhancedAlert.id), ); // alert should be in ok state expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK'); @@ -1226,28 +1259,39 @@ describe('checkAlerts', () => { clickhouseClient, connection.id, alertProvider, + undefined, ); expect((await 
Alert.findById(enhancedAlert.id))!.state).toBe('ALERT'); // skip since time diff is less than 1 window size const later = new Date('2023-11-16T22:14:00.000Z'); + const previousAlertsLater = await getPreviousAlertHistories( + [enhancedAlert.id], + later, + ); await processAlert( later, details, clickhouseClient, connection.id, alertProvider, + previousAlertsLater.get(enhancedAlert.id), ); // alert should still be in alert state expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT'); const nextWindow = new Date('2023-11-16T22:16:00.000Z'); + const previousAlertsNextWindow = await getPreviousAlertHistories( + [enhancedAlert.id], + nextWindow, + ); await processAlert( nextWindow, details, clickhouseClient, connection.id, alertProvider, + previousAlertsNextWindow.get(enhancedAlert.id), ); // alert should be in ok state expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK'); @@ -1436,28 +1480,39 @@ describe('checkAlerts', () => { clickhouseClient, connection.id, alertProvider, + undefined, ); expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT'); // skip since time diff is less than 1 window size const later = new Date('2023-11-16T22:14:00.000Z'); + const previousAlertsLater = await getPreviousAlertHistories( + [enhancedAlert.id], + later, + ); await processAlert( later, details, clickhouseClient, connection.id, alertProvider, + previousAlertsLater.get(enhancedAlert.id), ); // alert should still be in alert state expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT'); const nextWindow = new Date('2023-11-16T22:16:00.000Z'); + const previousAlertsNextWindow = await getPreviousAlertHistories( + [enhancedAlert.id], + nextWindow, + ); await processAlert( nextWindow, details, clickhouseClient, connection.id, alertProvider, + previousAlertsNextWindow.get(enhancedAlert.id), ); // alert should be in ok state expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK'); @@ -1506,4 +1561,144 @@ describe('checkAlerts', 
() => { ); }); }); + + describe('getPreviousAlertHistories', () => { + const server = getServer(); + + beforeAll(async () => { + await server.start(); + }); + + afterEach(async () => { + await server.clearDBs(); + jest.clearAllMocks(); + }); + + afterAll(async () => { + await server.stop(); + }); + + const saveAlert = (id: mongoose.Types.ObjectId, createdAt: Date) => { + return new AlertHistory({ + alert: id, + createdAt, + state: AlertState.OK, + }).save(); + }; + + it('should return the latest alert history for each alert', async () => { + const alert1Id = new mongoose.Types.ObjectId(); + await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z')); + await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z')); + + const alert2Id = new mongoose.Types.ObjectId(); + await saveAlert(alert2Id, new Date('2025-01-01T00:10:00Z')); + await saveAlert(alert2Id, new Date('2025-01-01T00:15:00Z')); + + const result = await getPreviousAlertHistories( + [alert1Id.toString(), alert2Id.toString()], + new Date('2025-01-01T00:20:00Z'), + ); + + expect(result.size).toBe(2); + expect(result.get(alert1Id.toString())!.createdAt).toEqual( + new Date('2025-01-01T00:05:00Z'), + ); + expect(result.get(alert2Id.toString())!.createdAt).toEqual( + new Date('2025-01-01T00:15:00Z'), + ); + }); + + it('should not return alert histories from the future', async () => { + const alert1Id = new mongoose.Types.ObjectId(); + await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z')); + await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z')); + + const alert2Id = new mongoose.Types.ObjectId(); + await saveAlert(alert2Id, new Date('2025-01-01T00:10:00Z')); + await saveAlert(alert2Id, new Date('2025-01-01T00:15:00Z')); // This one is in the future + + const result = await getPreviousAlertHistories( + [alert1Id.toString(), alert2Id.toString()], + new Date('2025-01-01T00:14:00Z'), + ); + + expect(result.size).toBe(2); + expect(result.get(alert1Id.toString())!.createdAt).toEqual( + new 
Date('2025-01-01T00:05:00Z'), + ); + expect(result.get(alert2Id.toString())!.createdAt).toEqual( + new Date('2025-01-01T00:10:00Z'), + ); + }); + + it('should not return a history if there are no histories for the given alert', async () => { + const alert1Id = new mongoose.Types.ObjectId(); + await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z')); + await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z')); + + const alert2Id = new mongoose.Types.ObjectId(); + + const result = await getPreviousAlertHistories( + [alert1Id.toString(), alert2Id.toString()], + new Date('2025-01-01T00:20:00Z'), + ); + + expect(result.size).toBe(1); + expect(result.get(alert1Id.toString())!.createdAt).toEqual( + new Date('2025-01-01T00:05:00Z'), + ); + expect(result.get(alert2Id.toString())).toBeUndefined(); + }); + + it('should not return a history for an alert that is not provided in the argument', async () => { + const alert1Id = new mongoose.Types.ObjectId(); + await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z')); + await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z')); + + const alert2Id = new mongoose.Types.ObjectId(); + await saveAlert(alert2Id, new Date('2025-01-01T00:10:00Z')); + await saveAlert(alert2Id, new Date('2025-01-01T00:15:00Z')); + + const result = await getPreviousAlertHistories( + [alert1Id.toString()], + new Date('2025-01-01T00:20:00Z'), + ); + + expect(result.size).toBe(1); + expect(result.get(alert1Id.toString())!.createdAt).toEqual( + new Date('2025-01-01T00:05:00Z'), + ); + }); + + it('should batch alert IDs across multiple aggregation queries if necessary', async () => { + const alert1Id = new mongoose.Types.ObjectId(); + await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z')); + await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z')); + + const alert2Id = new mongoose.Types.ObjectId(); + await saveAlert(alert2Id, new Date('2025-01-01T00:10:00Z')); + await saveAlert(alert2Id, new Date('2025-01-01T00:15:00Z')); + + const aggregateSpy 
= jest.spyOn(AlertHistory, 'aggregate'); + + const result = await getPreviousAlertHistories( + [ + alert1Id.toString(), + ...Array(150).fill(new mongoose.Types.ObjectId().toString()), + alert2Id.toString(), + ], + new Date('2025-01-01T00:20:00Z'), + ); + + expect(aggregateSpy).toHaveBeenCalledTimes(4); // 152 ids, batch size 50 => 4 batches + expect(result.size).toBe(2); + expect(result.get(alert1Id.toString())!.createdAt).toEqual( + new Date('2025-01-01T00:05:00Z'), + ); + expect(result.get(alert2Id.toString())!.createdAt).toEqual( + new Date('2025-01-01T00:15:00Z'), + ); + }); + }); }); diff --git a/packages/api/src/tasks/__tests__/singleInvocationAlert.test.ts b/packages/api/src/tasks/__tests__/singleInvocationAlert.test.ts index 99090daf..e8850169 100644 --- a/packages/api/src/tasks/__tests__/singleInvocationAlert.test.ts +++ b/packages/api/src/tasks/__tests__/singleInvocationAlert.test.ts @@ -14,7 +14,7 @@ import Dashboard from '@/models/dashboard'; import { SavedSearch } from '@/models/savedSearch'; import { Source } from '@/models/source'; import Webhook from '@/models/webhook'; -import { processAlert } from '@/tasks/checkAlerts'; +import { getPreviousAlertHistories, processAlert } from '@/tasks/checkAlerts'; import { AlertTaskType, loadProvider } from '@/tasks/providers'; import { AlertMessageTemplateDefaultView, @@ -193,12 +193,14 @@ describe('Single Invocation Alert Test', () => { username: connection.username, password: connection.password, }); + const previousAlerts = await getPreviousAlertHistories([alert.id], now); await processAlert( now, details, clickhouseClient, connection.id, alertProvider, + previousAlerts.get(alert.id), ); // Verify alert state changed to ALERT (from DB) @@ -429,12 +431,14 @@ describe('Single Invocation Alert Test', () => { })); // Process alert - this triggers the webhook with the title + const previousAlerts = await getPreviousAlertHistories([alert.id], now); await processAlert( now, details, clickhouseClient, connection.id, 
alertProvider, + previousAlerts.get(alert.id), ); // Get the webhook call to inspect the title diff --git a/packages/api/src/tasks/checkAlerts.ts b/packages/api/src/tasks/checkAlerts.ts index 43524f47..65c7928b 100644 --- a/packages/api/src/tasks/checkAlerts.ts +++ b/packages/api/src/tasks/checkAlerts.ts @@ -10,7 +10,9 @@ import { DisplayType, } from '@hyperdx/common-utils/dist/types'; import * as fns from 'date-fns'; -import { isString } from 'lodash'; +import { chunk, isString } from 'lodash'; +import { ObjectId } from 'mongoose'; +import mongoose from 'mongoose'; import ms from 'ms'; import { serializeError } from 'serialize-error'; @@ -142,15 +144,10 @@ export const processAlert = async ( clickhouseClient: ClickhouseClient, connectionId: string, alertProvider: AlertProvider, + previous: AggregatedAlertHistory | undefined, ) => { const { alert, source } = details; try { - const previous: IAlertHistory | undefined = ( - await AlertHistory.find({ alert: alert.id }) - .sort({ createdAt: -1 }) - .limit(1) - )[0]; - const windowSizeInMins = ms(alert.interval) / 60000; const nowInMinsRoundDown = roundDownToXMinutes(windowSizeInMins)(now); if ( @@ -370,6 +367,55 @@ export const processAlert = async ( // Re-export handleSendGenericWebhook for testing export { handleSendGenericWebhook }; +interface AggregatedAlertHistory { + _id: ObjectId; + createdAt: Date; +} + +/** + * Fetch the most recent AlertHistory value for each of the given alert IDs. + * + * @param alertIds The list of alert IDs to query the latest history for. + * @param now The current date and time. AlertHistory documents that have a createdAt > now are ignored. + * @returns A map from Alert IDs to their most recent AlertHistory. If there are no + * AlertHistory documents for an Alert ID, that ID will not be a key in the map. 
+ */ +export const getPreviousAlertHistories = async ( + alertIds: string[], + now: Date, +) => { + // Group the alert IDs into chunks of 50 to avoid exceeding MongoDB's recommendation that $in lists be on the order of 10s of items + const chunkedIds = chunk( + alertIds.map(id => new mongoose.Types.ObjectId(id)), + 50, + ); + + const resultChunks = await Promise.all( + chunkedIds.map(async ids => + AlertHistory.aggregate([ + // Filter for the given alerts, and only entries created before "now" + { + $match: { + alert: { $in: ids }, + createdAt: { $lte: now }, + }, + }, + // Group by alert ID, taking the latest createdAt value for each group + { + $group: { + _id: '$alert', + createdAt: { $max: '$createdAt' }, + }, + }, + ]), + ), + ); + + return new Map( + resultChunks.flat().map(history => [history._id.toString(), history]), + ); +}; + export default class CheckAlertTask implements HdxTask { private provider!: AlertProvider; private task_queue: PQueue; @@ -382,7 +428,11 @@ export default class CheckAlertTask implements HdxTask { }); } - async processAlertTask(now: Date, alertTask: AlertTask) { + async processAlertTask( + now: Date, + alertTask: AlertTask, + previousAlerts: Map, + ) { const { alerts, conn } = alertTask; logger.info({ message: 'Processing alerts in batch', @@ -405,8 +455,16 @@ export default class CheckAlertTask implements HdxTask { }); for (const alert of alerts) { + const previous = previousAlerts.get(alert.alert.id); await this.task_queue.add(() => - processAlert(now, alert, clickhouseClient, conn.id, this.provider), + processAlert( + now, + alert, + clickhouseClient, + conn.id, + this.provider, + previous, + ), ); } } @@ -428,8 +486,15 @@ export default class CheckAlertTask implements HdxTask { taskCount: alertTasks.length, }); + const alertIds = alertTasks.flatMap(({ alerts }) => + alerts.map(({ alert }) => alert.id), + ); + const previousAlerts = await getPreviousAlertHistories(alertIds, now); + for (const task of alertTasks) { - await 
this.task_queue.add(() => this.processAlertTask(now, task)); + await this.task_queue.add(() => + this.processAlertTask(now, task, previousAlerts), + ); } await this.task_queue.onIdle();