mirror of
https://github.com/hyperdxio/hyperdx
synced 2026-04-21 13:37:15 +00:00
perf: Query AlertHistory in bulk (#1157)
This commit is contained in:
parent
bec64bfa7b
commit
fd732a083d
4 changed files with 281 additions and 12 deletions
5
.changeset/mean-pigs-fold.md
Normal file
5
.changeset/mean-pigs-fold.md
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
"@hyperdx/api": patch
|
||||
---
|
||||
|
||||
perf: Query AlertHistory in bulk
|
||||
|
|
@ -1,4 +1,6 @@
|
|||
import { ClickhouseClient } from '@hyperdx/common-utils/dist/clickhouse/node';
|
||||
import { AlertState } from '@hyperdx/common-utils/dist/types';
|
||||
import { create } from 'lodash';
|
||||
import mongoose from 'mongoose';
|
||||
import ms from 'ms';
|
||||
|
||||
|
|
@ -21,7 +23,11 @@ import { SavedSearch } from '@/models/savedSearch';
|
|||
import { Source } from '@/models/source';
|
||||
import Webhook from '@/models/webhook';
|
||||
import * as checkAlert from '@/tasks/checkAlerts';
|
||||
import { doesExceedThreshold, processAlert } from '@/tasks/checkAlerts';
|
||||
import {
|
||||
doesExceedThreshold,
|
||||
getPreviousAlertHistories,
|
||||
processAlert,
|
||||
} from '@/tasks/checkAlerts';
|
||||
import { AlertDetails, AlertTaskType, loadProvider } from '@/tasks/providers';
|
||||
import {
|
||||
AlertMessageTemplateDefaultView,
|
||||
|
|
@ -756,39 +762,55 @@ describe('checkAlerts', () => {
|
|||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
undefined,
|
||||
);
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
// skip since time diff is less than 1 window size
|
||||
const later = new Date('2023-11-16T22:14:00.000Z');
|
||||
const previousAlertsLater = await getPreviousAlertHistories(
|
||||
[enhancedAlert.id],
|
||||
later,
|
||||
);
|
||||
await processAlert(
|
||||
later,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsLater.get(enhancedAlert.id),
|
||||
);
|
||||
// alert should still be in alert state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
|
||||
const previousAlertsNextWindow = await getPreviousAlertHistories(
|
||||
[enhancedAlert.id],
|
||||
nextWindow,
|
||||
);
|
||||
await processAlert(
|
||||
nextWindow,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsNextWindow.get(enhancedAlert.id),
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
const nextNextWindow = new Date('2023-11-16T22:20:00.000Z');
|
||||
const previousAlertsNextNextWindow = await getPreviousAlertHistories(
|
||||
[enhancedAlert.id],
|
||||
nextNextWindow,
|
||||
);
|
||||
await processAlert(
|
||||
nextNextWindow,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsNextNextWindow.get(enhancedAlert.id),
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
|
||||
|
|
@ -996,28 +1018,39 @@ describe('checkAlerts', () => {
|
|||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
undefined,
|
||||
);
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
// skip since time diff is less than 1 window size
|
||||
const later = new Date('2023-11-16T22:14:00.000Z');
|
||||
const previousAlertsLater = await getPreviousAlertHistories(
|
||||
[enhancedAlert.id],
|
||||
later,
|
||||
);
|
||||
await processAlert(
|
||||
later,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsLater.get(enhancedAlert.id),
|
||||
);
|
||||
// alert should still be in alert state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
|
||||
const previousAlertsNextWindow = await getPreviousAlertHistories(
|
||||
[enhancedAlert.id],
|
||||
nextWindow,
|
||||
);
|
||||
await processAlert(
|
||||
nextWindow,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsNextWindow.get(enhancedAlert.id),
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
|
||||
|
|
@ -1226,28 +1259,39 @@ describe('checkAlerts', () => {
|
|||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
undefined,
|
||||
);
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
// skip since time diff is less than 1 window size
|
||||
const later = new Date('2023-11-16T22:14:00.000Z');
|
||||
const previousAlertsLater = await getPreviousAlertHistories(
|
||||
[enhancedAlert.id],
|
||||
later,
|
||||
);
|
||||
await processAlert(
|
||||
later,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsLater.get(enhancedAlert.id),
|
||||
);
|
||||
// alert should still be in alert state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
|
||||
const previousAlertsNextWindow = await getPreviousAlertHistories(
|
||||
[enhancedAlert.id],
|
||||
nextWindow,
|
||||
);
|
||||
await processAlert(
|
||||
nextWindow,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsNextWindow.get(enhancedAlert.id),
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
|
||||
|
|
@ -1436,28 +1480,39 @@ describe('checkAlerts', () => {
|
|||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
undefined,
|
||||
);
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
// skip since time diff is less than 1 window size
|
||||
const later = new Date('2023-11-16T22:14:00.000Z');
|
||||
const previousAlertsLater = await getPreviousAlertHistories(
|
||||
[enhancedAlert.id],
|
||||
later,
|
||||
);
|
||||
await processAlert(
|
||||
later,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsLater.get(enhancedAlert.id),
|
||||
);
|
||||
// alert should still be in alert state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
|
||||
const previousAlertsNextWindow = await getPreviousAlertHistories(
|
||||
[enhancedAlert.id],
|
||||
nextWindow,
|
||||
);
|
||||
await processAlert(
|
||||
nextWindow,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsNextWindow.get(enhancedAlert.id),
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
|
||||
|
|
@ -1506,4 +1561,144 @@ describe('checkAlerts', () => {
|
|||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getPreviousAlertHistories', () => {
|
||||
const server = getServer();
|
||||
|
||||
beforeAll(async () => {
|
||||
await server.start();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await server.clearDBs();
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await server.stop();
|
||||
});
|
||||
|
||||
const saveAlert = (id: mongoose.Types.ObjectId, createdAt: Date) => {
|
||||
return new AlertHistory({
|
||||
alert: id,
|
||||
createdAt,
|
||||
state: AlertState.OK,
|
||||
}).save();
|
||||
};
|
||||
|
||||
it('should return the latest alert history for each alert', async () => {
|
||||
const alert1Id = new mongoose.Types.ObjectId();
|
||||
await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z'));
|
||||
await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z'));
|
||||
|
||||
const alert2Id = new mongoose.Types.ObjectId();
|
||||
await saveAlert(alert2Id, new Date('2025-01-01T00:10:00Z'));
|
||||
await saveAlert(alert2Id, new Date('2025-01-01T00:15:00Z'));
|
||||
|
||||
const result = await getPreviousAlertHistories(
|
||||
[alert1Id.toString(), alert2Id.toString()],
|
||||
new Date('2025-01-01T00:20:00Z'),
|
||||
);
|
||||
|
||||
expect(result.size).toBe(2);
|
||||
expect(result.get(alert1Id.toString())!.createdAt).toEqual(
|
||||
new Date('2025-01-01T00:05:00Z'),
|
||||
);
|
||||
expect(result.get(alert2Id.toString())!.createdAt).toEqual(
|
||||
new Date('2025-01-01T00:15:00Z'),
|
||||
);
|
||||
});
|
||||
|
||||
it('should not return alert histories from the future', async () => {
|
||||
const alert1Id = new mongoose.Types.ObjectId();
|
||||
await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z'));
|
||||
await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z'));
|
||||
|
||||
const alert2Id = new mongoose.Types.ObjectId();
|
||||
await saveAlert(alert2Id, new Date('2025-01-01T00:10:00Z'));
|
||||
await saveAlert(alert2Id, new Date('2025-01-01T00:15:00Z')); // This one is in the future
|
||||
|
||||
const result = await getPreviousAlertHistories(
|
||||
[alert1Id.toString(), alert2Id.toString()],
|
||||
new Date('2025-01-01T00:14:00Z'),
|
||||
);
|
||||
|
||||
expect(result.size).toBe(2);
|
||||
expect(result.get(alert1Id.toString())!.createdAt).toEqual(
|
||||
new Date('2025-01-01T00:05:00Z'),
|
||||
);
|
||||
expect(result.get(alert2Id.toString())!.createdAt).toEqual(
|
||||
new Date('2025-01-01T00:10:00Z'),
|
||||
);
|
||||
});
|
||||
|
||||
it('should not return a history if there are no histories for the given alert', async () => {
|
||||
const alert1Id = new mongoose.Types.ObjectId();
|
||||
await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z'));
|
||||
await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z'));
|
||||
|
||||
const alert2Id = new mongoose.Types.ObjectId();
|
||||
|
||||
const result = await getPreviousAlertHistories(
|
||||
[alert1Id.toString(), alert2Id.toString()],
|
||||
new Date('2025-01-01T00:20:00Z'),
|
||||
);
|
||||
|
||||
expect(result.size).toBe(1);
|
||||
expect(result.get(alert1Id.toString())!.createdAt).toEqual(
|
||||
new Date('2025-01-01T00:05:00Z'),
|
||||
);
|
||||
expect(result.get(alert2Id.toString())).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should not return a history for an alert that is not provided in the argument', async () => {
|
||||
const alert1Id = new mongoose.Types.ObjectId();
|
||||
await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z'));
|
||||
await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z'));
|
||||
|
||||
const alert2Id = new mongoose.Types.ObjectId();
|
||||
await saveAlert(alert2Id, new Date('2025-01-01T00:10:00Z'));
|
||||
await saveAlert(alert2Id, new Date('2025-01-01T00:15:00Z'));
|
||||
|
||||
const result = await getPreviousAlertHistories(
|
||||
[alert1Id.toString()],
|
||||
new Date('2025-01-01T00:20:00Z'),
|
||||
);
|
||||
|
||||
expect(result.size).toBe(1);
|
||||
expect(result.get(alert1Id.toString())!.createdAt).toEqual(
|
||||
new Date('2025-01-01T00:05:00Z'),
|
||||
);
|
||||
});
|
||||
|
||||
it('should batch alert IDs across multiple aggregation queries if necessary', async () => {
|
||||
const alert1Id = new mongoose.Types.ObjectId();
|
||||
await saveAlert(alert1Id, new Date('2025-01-01T00:00:00Z'));
|
||||
await saveAlert(alert1Id, new Date('2025-01-01T00:05:00Z'));
|
||||
|
||||
const alert2Id = new mongoose.Types.ObjectId();
|
||||
await saveAlert(alert2Id, new Date('2025-01-01T00:10:00Z'));
|
||||
await saveAlert(alert2Id, new Date('2025-01-01T00:15:00Z'));
|
||||
|
||||
const aggregateSpy = jest.spyOn(AlertHistory, 'aggregate');
|
||||
|
||||
const result = await getPreviousAlertHistories(
|
||||
[
|
||||
alert1Id.toString(),
|
||||
...Array(150).fill(new mongoose.Types.ObjectId().toString()),
|
||||
alert2Id.toString(),
|
||||
],
|
||||
new Date('2025-01-01T00:20:00Z'),
|
||||
);
|
||||
|
||||
expect(aggregateSpy).toHaveBeenCalledTimes(4); // 152 ids, batch size 50 => 4 batches
|
||||
expect(result.size).toBe(2);
|
||||
expect(result.get(alert1Id.toString())!.createdAt).toEqual(
|
||||
new Date('2025-01-01T00:05:00Z'),
|
||||
);
|
||||
expect(result.get(alert2Id.toString())!.createdAt).toEqual(
|
||||
new Date('2025-01-01T00:15:00Z'),
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ import Dashboard from '@/models/dashboard';
|
|||
import { SavedSearch } from '@/models/savedSearch';
|
||||
import { Source } from '@/models/source';
|
||||
import Webhook from '@/models/webhook';
|
||||
import { processAlert } from '@/tasks/checkAlerts';
|
||||
import { getPreviousAlertHistories, processAlert } from '@/tasks/checkAlerts';
|
||||
import { AlertTaskType, loadProvider } from '@/tasks/providers';
|
||||
import {
|
||||
AlertMessageTemplateDefaultView,
|
||||
|
|
@ -193,12 +193,14 @@ describe('Single Invocation Alert Test', () => {
|
|||
username: connection.username,
|
||||
password: connection.password,
|
||||
});
|
||||
const previousAlerts = await getPreviousAlertHistories([alert.id], now);
|
||||
await processAlert(
|
||||
now,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlerts.get(alert.id),
|
||||
);
|
||||
|
||||
// Verify alert state changed to ALERT (from DB)
|
||||
|
|
@ -429,12 +431,14 @@ describe('Single Invocation Alert Test', () => {
|
|||
}));
|
||||
|
||||
// Process alert - this triggers the webhook with the title
|
||||
const previousAlerts = await getPreviousAlertHistories([alert.id], now);
|
||||
await processAlert(
|
||||
now,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlerts.get(alert.id),
|
||||
);
|
||||
|
||||
// Get the webhook call to inspect the title
|
||||
|
|
|
|||
|
|
@ -10,7 +10,9 @@ import {
|
|||
DisplayType,
|
||||
} from '@hyperdx/common-utils/dist/types';
|
||||
import * as fns from 'date-fns';
|
||||
import { isString } from 'lodash';
|
||||
import { chunk, isString } from 'lodash';
|
||||
import { ObjectId } from 'mongoose';
|
||||
import mongoose from 'mongoose';
|
||||
import ms from 'ms';
|
||||
import { serializeError } from 'serialize-error';
|
||||
|
||||
|
|
@ -142,15 +144,10 @@ export const processAlert = async (
|
|||
clickhouseClient: ClickhouseClient,
|
||||
connectionId: string,
|
||||
alertProvider: AlertProvider,
|
||||
previous: AggregatedAlertHistory | undefined,
|
||||
) => {
|
||||
const { alert, source } = details;
|
||||
try {
|
||||
const previous: IAlertHistory | undefined = (
|
||||
await AlertHistory.find({ alert: alert.id })
|
||||
.sort({ createdAt: -1 })
|
||||
.limit(1)
|
||||
)[0];
|
||||
|
||||
const windowSizeInMins = ms(alert.interval) / 60000;
|
||||
const nowInMinsRoundDown = roundDownToXMinutes(windowSizeInMins)(now);
|
||||
if (
|
||||
|
|
@ -370,6 +367,55 @@ export const processAlert = async (
|
|||
// Re-export handleSendGenericWebhook for testing
|
||||
export { handleSendGenericWebhook };
|
||||
|
||||
interface AggregatedAlertHistory {
|
||||
_id: ObjectId;
|
||||
createdAt: Date;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the most recent AlertHistory value for each of the given alert IDs.
|
||||
*
|
||||
* @param alertIds The list of alert IDs to query the latest history for.
|
||||
* @param now The current date and time. AlertHistory documents that have a createdBy > now are ignored.
|
||||
* @returns A map from Alert IDs to their most recent AlertHistory. If there are no
|
||||
* AlertHistory documents for an Alert ID, that ID will not be a key in the map.
|
||||
*/
|
||||
export const getPreviousAlertHistories = async (
|
||||
alertIds: string[],
|
||||
now: Date,
|
||||
) => {
|
||||
// Group the alert IDs into chunks of 50 to avoid exceeding MongoDB's recommendation that $in lists be on the order of 10s of items
|
||||
const chunkedIds = chunk(
|
||||
alertIds.map(id => new mongoose.Types.ObjectId(id)),
|
||||
50,
|
||||
);
|
||||
|
||||
const resultChunks = await Promise.all(
|
||||
chunkedIds.map(async ids =>
|
||||
AlertHistory.aggregate<AggregatedAlertHistory>([
|
||||
// Filter for the given alerts, and only entries created before "now"
|
||||
{
|
||||
$match: {
|
||||
alert: { $in: ids },
|
||||
createdAt: { $lte: now },
|
||||
},
|
||||
},
|
||||
// Group by alert ID, taking the latest createdAt value for each group
|
||||
{
|
||||
$group: {
|
||||
_id: '$alert',
|
||||
createdAt: { $max: '$createdAt' },
|
||||
},
|
||||
},
|
||||
]),
|
||||
),
|
||||
);
|
||||
|
||||
return new Map<string, AggregatedAlertHistory>(
|
||||
resultChunks.flat().map(history => [history._id.toString(), history]),
|
||||
);
|
||||
};
|
||||
|
||||
export default class CheckAlertTask implements HdxTask<CheckAlertsTaskArgs> {
|
||||
private provider!: AlertProvider;
|
||||
private task_queue: PQueue;
|
||||
|
|
@ -382,7 +428,11 @@ export default class CheckAlertTask implements HdxTask<CheckAlertsTaskArgs> {
|
|||
});
|
||||
}
|
||||
|
||||
async processAlertTask(now: Date, alertTask: AlertTask) {
|
||||
async processAlertTask(
|
||||
now: Date,
|
||||
alertTask: AlertTask,
|
||||
previousAlerts: Map<string, AggregatedAlertHistory>,
|
||||
) {
|
||||
const { alerts, conn } = alertTask;
|
||||
logger.info({
|
||||
message: 'Processing alerts in batch',
|
||||
|
|
@ -405,8 +455,16 @@ export default class CheckAlertTask implements HdxTask<CheckAlertsTaskArgs> {
|
|||
});
|
||||
|
||||
for (const alert of alerts) {
|
||||
const previous = previousAlerts.get(alert.alert.id);
|
||||
await this.task_queue.add(() =>
|
||||
processAlert(now, alert, clickhouseClient, conn.id, this.provider),
|
||||
processAlert(
|
||||
now,
|
||||
alert,
|
||||
clickhouseClient,
|
||||
conn.id,
|
||||
this.provider,
|
||||
previous,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
@ -428,8 +486,15 @@ export default class CheckAlertTask implements HdxTask<CheckAlertsTaskArgs> {
|
|||
taskCount: alertTasks.length,
|
||||
});
|
||||
|
||||
const alertIds = alertTasks.flatMap(({ alerts }) =>
|
||||
alerts.map(({ alert }) => alert.id),
|
||||
);
|
||||
const previousAlerts = await getPreviousAlertHistories(alertIds, now);
|
||||
|
||||
for (const task of alertTasks) {
|
||||
await this.task_queue.add(() => this.processAlertTask(now, task));
|
||||
await this.task_queue.add(() =>
|
||||
this.processAlertTask(now, task, previousAlerts),
|
||||
);
|
||||
}
|
||||
|
||||
await this.task_queue.onIdle();
|
||||
|
|
|
|||
Loading…
Reference in a new issue