mirror of
https://github.com/hyperdxio/hyperdx
synced 2026-04-21 13:37:15 +00:00
refactor: Decouple alerts processing from Mongo (#1176)
Closes HDX-2281 This PR adds two additional functions to the `AlertProvider` interface, and implements them for the default provider. The intention behind these changes is to eliminate all direct Mongo access from the alert check loop, and instead handle the connection to Mongo through the `AlertProvider` interface. The new functions are as follows: 1. `getWebhooks(teamId): Map<string, IWebhook>`: This function is used to retrieve the webhooks that may be used to send alerts. While it would be nice to just attach the webhook information to the AlertDetails, we have (unfinished?) support for referencing arbitrary wehbooks within message templates, and this pattern better supports that future use-case without having to process message templates while loading alerts. 3. `updateAlertState(AlertHistory)`: This function is used to update the state of an Alert document and save the given AlertHistory document. The AlertDetails and AlertTask interfaces have also been updated to reduce the number of parameters passed to some functions while still shifting Mongo accesses to the AlertProvider: 1. AlertDetails now includes the `previous` AlertHistory value 2. AlertTask now includes the value of `now`
This commit is contained in:
parent
0cf8556d47
commit
825452fe86
10 changed files with 623 additions and 177 deletions
6
.changeset/shiny-pugs-shave.md
Normal file
6
.changeset/shiny-pugs-shave.md
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
---
|
||||
"@hyperdx/api": patch
|
||||
"@hyperdx/app": patch
|
||||
---
|
||||
|
||||
refactor: Decouple alerts processing from Mongo
|
||||
|
|
@ -1,6 +1,5 @@
|
|||
import { ClickhouseClient } from '@hyperdx/common-utils/dist/clickhouse/node';
|
||||
import { AlertState } from '@hyperdx/common-utils/dist/types';
|
||||
import { create } from 'lodash';
|
||||
import mongoose from 'mongoose';
|
||||
import ms from 'ms';
|
||||
|
||||
|
|
@ -39,18 +38,6 @@ import {
|
|||
} from '@/tasks/template';
|
||||
import * as slack from '@/utils/slack';
|
||||
|
||||
const MOCK_DASHBOARD = {
|
||||
name: 'Test Dashboard',
|
||||
tiles: [makeTile(), makeTile()],
|
||||
tags: ['test'],
|
||||
};
|
||||
|
||||
const MOCK_SOURCE = {};
|
||||
|
||||
const MOCK_SAVED_SEARCH: any = {
|
||||
id: 'fake-saved-search-id',
|
||||
};
|
||||
|
||||
// Create provider instance for tests
|
||||
let alertProvider: any;
|
||||
|
||||
|
|
@ -337,9 +324,9 @@ describe('checkAlerts', () => {
|
|||
},
|
||||
},
|
||||
title: 'Alert for "My Search" - 10 lines found',
|
||||
team: {
|
||||
id: team._id.toString(),
|
||||
},
|
||||
teamWebhooksById: new Map<string, typeof webhook>([
|
||||
[webhook._id.toString(), webhook],
|
||||
]),
|
||||
});
|
||||
|
||||
expect(slack.postMessageToWebhook).toHaveBeenCalledTimes(2);
|
||||
|
|
@ -352,7 +339,7 @@ describe('checkAlerts', () => {
|
|||
.mockResolvedValueOnce(null as any);
|
||||
|
||||
const team = await createTeam({ name: 'My Team' });
|
||||
await new Webhook({
|
||||
const webhook = await new Webhook({
|
||||
team: team._id,
|
||||
service: 'slack',
|
||||
url: 'https://hooks.slack.com/services/123',
|
||||
|
|
@ -374,9 +361,9 @@ describe('checkAlerts', () => {
|
|||
},
|
||||
},
|
||||
title: 'Alert for "My Search" - 10 lines found',
|
||||
team: {
|
||||
id: team._id.toString(),
|
||||
},
|
||||
teamWebhooksById: new Map<string, typeof webhook>([
|
||||
[webhook._id.toString(), webhook],
|
||||
]),
|
||||
});
|
||||
|
||||
expect(slack.postMessageToWebhook).toHaveBeenNthCalledWith(
|
||||
|
|
@ -412,7 +399,7 @@ describe('checkAlerts', () => {
|
|||
.mockResolvedValueOnce(null as any);
|
||||
|
||||
const team = await createTeam({ name: 'My Team' });
|
||||
await new Webhook({
|
||||
const webhook = await new Webhook({
|
||||
team: team._id,
|
||||
service: 'slack',
|
||||
url: 'https://hooks.slack.com/services/123',
|
||||
|
|
@ -437,9 +424,9 @@ describe('checkAlerts', () => {
|
|||
},
|
||||
},
|
||||
title: 'Alert for "My Search" - 10 lines found',
|
||||
team: {
|
||||
id: team._id.toString(),
|
||||
},
|
||||
teamWebhooksById: new Map<string, typeof webhook>([
|
||||
[webhook._id.toString(), webhook],
|
||||
]),
|
||||
});
|
||||
|
||||
expect(slack.postMessageToWebhook).toHaveBeenNthCalledWith(
|
||||
|
|
@ -473,18 +460,22 @@ describe('checkAlerts', () => {
|
|||
jest.spyOn(slack, 'postMessageToWebhook').mockResolvedValue(null as any);
|
||||
|
||||
const team = await createTeam({ name: 'My Team' });
|
||||
await new Webhook({
|
||||
const myWebhook = await new Webhook({
|
||||
team: team._id,
|
||||
service: 'slack',
|
||||
url: 'https://hooks.slack.com/services/123',
|
||||
name: 'My_Webhook',
|
||||
}).save();
|
||||
await new Webhook({
|
||||
const anotherWebhook = await new Webhook({
|
||||
team: team._id,
|
||||
service: 'slack',
|
||||
url: 'https://hooks.slack.com/services/456',
|
||||
name: 'Another_Webhook',
|
||||
}).save();
|
||||
const teamWebhooksById = new Map<string, typeof anotherWebhook>([
|
||||
[anotherWebhook._id.toString(), anotherWebhook],
|
||||
[myWebhook._id.toString(), myWebhook],
|
||||
]);
|
||||
|
||||
await renderAlertTemplate({
|
||||
alertProvider,
|
||||
|
|
@ -519,9 +510,7 @@ describe('checkAlerts', () => {
|
|||
},
|
||||
},
|
||||
title: 'Alert for "My Search" - 10 lines found',
|
||||
team: {
|
||||
id: team._id.toString(),
|
||||
},
|
||||
teamWebhooksById,
|
||||
});
|
||||
|
||||
// @webhook should not be called
|
||||
|
|
@ -544,9 +533,7 @@ describe('checkAlerts', () => {
|
|||
},
|
||||
},
|
||||
title: 'Alert for "My Search" - 10 lines found',
|
||||
team: {
|
||||
id: team._id.toString(),
|
||||
},
|
||||
teamWebhooksById,
|
||||
});
|
||||
|
||||
expect(slack.postMessageToWebhook).toHaveBeenCalledTimes(2);
|
||||
|
|
@ -673,6 +660,9 @@ describe('checkAlerts', () => {
|
|||
url: 'https://hooks.slack.com/services/123',
|
||||
name: 'My Webhook',
|
||||
}).save();
|
||||
const teamWebhooksById = new Map<string, typeof webhook>([
|
||||
[webhook._id.toString(), webhook],
|
||||
]);
|
||||
const connection = await Connection.create({
|
||||
team: team._id,
|
||||
name: 'Default',
|
||||
|
|
@ -723,13 +713,13 @@ describe('checkAlerts', () => {
|
|||
'savedSearch',
|
||||
]);
|
||||
|
||||
const details: any = {
|
||||
const details = {
|
||||
alert: enhancedAlert,
|
||||
source,
|
||||
conn: connection,
|
||||
taskType: AlertTaskType.SAVED_SEARCH,
|
||||
savedSearch,
|
||||
};
|
||||
previous: undefined,
|
||||
} satisfies AlertDetails;
|
||||
|
||||
const clickhouseClient = new ClickhouseClient({
|
||||
host: connection.host,
|
||||
|
|
@ -762,7 +752,7 @@ describe('checkAlerts', () => {
|
|||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
undefined,
|
||||
teamWebhooksById,
|
||||
);
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
|
|
@ -774,11 +764,11 @@ describe('checkAlerts', () => {
|
|||
);
|
||||
await processAlert(
|
||||
later,
|
||||
details,
|
||||
{ ...details, previous: previousAlertsLater.get(enhancedAlert.id) },
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsLater.get(enhancedAlert.id),
|
||||
teamWebhooksById,
|
||||
);
|
||||
// alert should still be in alert state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
|
@ -790,11 +780,14 @@ describe('checkAlerts', () => {
|
|||
);
|
||||
await processAlert(
|
||||
nextWindow,
|
||||
details,
|
||||
{
|
||||
...details,
|
||||
previous: previousAlertsNextWindow.get(enhancedAlert.id),
|
||||
},
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsNextWindow.get(enhancedAlert.id),
|
||||
teamWebhooksById,
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
|
@ -806,11 +799,14 @@ describe('checkAlerts', () => {
|
|||
);
|
||||
await processAlert(
|
||||
nextNextWindow,
|
||||
details,
|
||||
{
|
||||
...details,
|
||||
previous: previousAlertsNextNextWindow.get(enhancedAlert.id),
|
||||
},
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsNextNextWindow.get(enhancedAlert.id),
|
||||
teamWebhooksById,
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
|
||||
|
|
@ -906,6 +902,9 @@ describe('checkAlerts', () => {
|
|||
url: 'https://hooks.slack.com/services/123',
|
||||
name: 'My Webhook',
|
||||
}).save();
|
||||
const teamWebhooksById = new Map<string, typeof webhook>([
|
||||
[webhook._id.toString(), webhook],
|
||||
]);
|
||||
const connection = await Connection.create({
|
||||
team: team._id,
|
||||
name: 'Default',
|
||||
|
|
@ -978,14 +977,14 @@ describe('checkAlerts', () => {
|
|||
|
||||
const tile = dashboard.tiles?.find((t: any) => t.id === '17quud');
|
||||
if (!tile) throw new Error('tile not found for dashboard test case');
|
||||
const details: any = {
|
||||
const details = {
|
||||
alert: enhancedAlert,
|
||||
source,
|
||||
conn: connection,
|
||||
taskType: AlertTaskType.TILE,
|
||||
tile,
|
||||
dashboard,
|
||||
};
|
||||
previous: undefined,
|
||||
} satisfies AlertDetails;
|
||||
|
||||
const clickhouseClient = new ClickhouseClient({
|
||||
host: connection.host,
|
||||
|
|
@ -1018,7 +1017,7 @@ describe('checkAlerts', () => {
|
|||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
undefined,
|
||||
teamWebhooksById,
|
||||
);
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
|
|
@ -1030,11 +1029,14 @@ describe('checkAlerts', () => {
|
|||
);
|
||||
await processAlert(
|
||||
later,
|
||||
details,
|
||||
{
|
||||
...details,
|
||||
previous: previousAlertsLater.get(enhancedAlert.id),
|
||||
},
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsLater.get(enhancedAlert.id),
|
||||
teamWebhooksById,
|
||||
);
|
||||
// alert should still be in alert state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
|
@ -1046,11 +1048,14 @@ describe('checkAlerts', () => {
|
|||
);
|
||||
await processAlert(
|
||||
nextWindow,
|
||||
details,
|
||||
{
|
||||
...details,
|
||||
previous: previousAlertsNextWindow.get(enhancedAlert.id),
|
||||
},
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsNextWindow.get(enhancedAlert.id),
|
||||
teamWebhooksById,
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
|
||||
|
|
@ -1146,6 +1151,10 @@ describe('checkAlerts', () => {
|
|||
}),
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
}).save();
|
||||
const webhooks = await Webhook.find({});
|
||||
const teamWebhooksById = new Map<string, typeof webhook>(
|
||||
webhooks.map(w => [w._id.toString(), w]),
|
||||
);
|
||||
const connection = await Connection.create({
|
||||
team: team._id,
|
||||
name: 'Default',
|
||||
|
|
@ -1219,14 +1228,14 @@ describe('checkAlerts', () => {
|
|||
const tile = dashboard.tiles?.find((t: any) => t.id === '17quud');
|
||||
if (!tile)
|
||||
throw new Error('tile not found for dashboard generic webhook');
|
||||
const details: any = {
|
||||
const details = {
|
||||
alert: enhancedAlert,
|
||||
source,
|
||||
conn: connection,
|
||||
taskType: AlertTaskType.TILE,
|
||||
tile,
|
||||
dashboard,
|
||||
};
|
||||
previous: undefined,
|
||||
} satisfies AlertDetails;
|
||||
|
||||
const clickhouseClient = new ClickhouseClient({
|
||||
host: connection.host,
|
||||
|
|
@ -1259,7 +1268,7 @@ describe('checkAlerts', () => {
|
|||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
undefined,
|
||||
teamWebhooksById,
|
||||
);
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
|
|
@ -1271,11 +1280,14 @@ describe('checkAlerts', () => {
|
|||
);
|
||||
await processAlert(
|
||||
later,
|
||||
details,
|
||||
{
|
||||
...details,
|
||||
previous: previousAlertsLater.get(enhancedAlert.id),
|
||||
},
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsLater.get(enhancedAlert.id),
|
||||
teamWebhooksById,
|
||||
);
|
||||
// alert should still be in alert state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
|
@ -1287,11 +1299,14 @@ describe('checkAlerts', () => {
|
|||
);
|
||||
await processAlert(
|
||||
nextWindow,
|
||||
details,
|
||||
{
|
||||
...details,
|
||||
previous: previousAlertsNextWindow.get(enhancedAlert.id),
|
||||
},
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsNextWindow.get(enhancedAlert.id),
|
||||
teamWebhooksById,
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
|
||||
|
|
@ -1362,6 +1377,9 @@ describe('checkAlerts', () => {
|
|||
url: 'https://hooks.slack.com/services/123',
|
||||
name: 'My Webhook',
|
||||
}).save();
|
||||
const teamWebhooksById = new Map<string, typeof webhook>([
|
||||
[webhook._id.toString(), webhook],
|
||||
]);
|
||||
const connection = await Connection.create({
|
||||
team: team._id,
|
||||
name: 'Default',
|
||||
|
|
@ -1439,14 +1457,14 @@ describe('checkAlerts', () => {
|
|||
const tile = dashboard.tiles?.find((t: any) => t.id === '17quud');
|
||||
if (!tile)
|
||||
throw new Error('tile not found for dashboard metrics webhook');
|
||||
const details: any = {
|
||||
const details = {
|
||||
alert: enhancedAlert,
|
||||
source,
|
||||
conn: connection,
|
||||
taskType: AlertTaskType.TILE,
|
||||
tile,
|
||||
dashboard,
|
||||
};
|
||||
previous: undefined,
|
||||
} satisfies AlertDetails;
|
||||
|
||||
const clickhouseClient = new ClickhouseClient({
|
||||
host: connection.host,
|
||||
|
|
@ -1480,7 +1498,7 @@ describe('checkAlerts', () => {
|
|||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
undefined,
|
||||
teamWebhooksById,
|
||||
);
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
|
|
@ -1492,11 +1510,11 @@ describe('checkAlerts', () => {
|
|||
);
|
||||
await processAlert(
|
||||
later,
|
||||
details,
|
||||
{ ...details, previous: previousAlertsLater.get(enhancedAlert.id) },
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsLater.get(enhancedAlert.id),
|
||||
teamWebhooksById,
|
||||
);
|
||||
// alert should still be in alert state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
|
@ -1508,11 +1526,14 @@ describe('checkAlerts', () => {
|
|||
);
|
||||
await processAlert(
|
||||
nextWindow,
|
||||
details,
|
||||
{
|
||||
...details,
|
||||
previous: previousAlertsNextWindow.get(enhancedAlert.id),
|
||||
},
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlertsNextWindow.get(enhancedAlert.id),
|
||||
teamWebhooksById,
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
|
||||
|
|
|
|||
323
packages/api/src/tasks/__tests__/checkAlertsTask.test.ts
Normal file
323
packages/api/src/tasks/__tests__/checkAlertsTask.test.ts
Normal file
|
|
@ -0,0 +1,323 @@
|
|||
import { ClickhouseClient } from '@hyperdx/common-utils/dist/clickhouse/node';
|
||||
import mongoose from 'mongoose';
|
||||
|
||||
import * as config from '@/config';
|
||||
import { ObjectId } from '@/models';
|
||||
import { AlertSource, AlertThresholdType, IAlert } from '@/models/alert';
|
||||
import { ISource } from '@/models/source';
|
||||
import { IWebhook } from '@/models/webhook';
|
||||
import CheckAlertTask from '@/tasks/checkAlerts';
|
||||
import * as checkAlerts from '@/tasks/checkAlerts';
|
||||
import {
|
||||
AlertDetails,
|
||||
AlertProvider,
|
||||
AlertTaskType,
|
||||
loadProvider,
|
||||
} from '@/tasks/providers';
|
||||
|
||||
import { CheckAlertsTaskArgs } from '../types';
|
||||
|
||||
jest.mock('@/tasks/providers', () => {
|
||||
return {
|
||||
...jest.requireActual('@/tasks/providers'),
|
||||
isValidProvider: jest.fn().mockReturnValue(true),
|
||||
loadProvider: jest.fn(),
|
||||
};
|
||||
});
|
||||
|
||||
describe('CheckAlertTask', () => {
|
||||
describe('execute', () => {
|
||||
let mockAlertProvider: jest.Mocked<AlertProvider>;
|
||||
let mockProcessAlert: jest.SpyInstance;
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
|
||||
mockAlertProvider = {
|
||||
init: jest.fn(),
|
||||
getAlertTasks: jest.fn(),
|
||||
getWebhooks: jest.fn(),
|
||||
updateAlertState: jest.fn(),
|
||||
asyncDispose: jest.fn(),
|
||||
buildChartLink: jest.fn(),
|
||||
buildLogSearchLink: jest.fn(),
|
||||
};
|
||||
|
||||
jest.mocked(loadProvider).mockResolvedValue(mockAlertProvider);
|
||||
|
||||
mockProcessAlert = jest
|
||||
.spyOn(checkAlerts, 'processAlert')
|
||||
.mockResolvedValue(undefined);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
it('should throw error for invalid task name', async () => {
|
||||
const args = { taskName: 'invalid-task' } as any;
|
||||
const task = new CheckAlertTask(args);
|
||||
|
||||
await expect(task.execute()).rejects.toThrow(
|
||||
`CheckAlertTask can only handle 'check-alerts' tasks, received: invalid-task`,
|
||||
);
|
||||
});
|
||||
|
||||
it('should execute successfully with no alert tasks', async () => {
|
||||
const args: CheckAlertsTaskArgs = { taskName: 'check-alerts' };
|
||||
const task = new CheckAlertTask(args);
|
||||
|
||||
mockAlertProvider.getAlertTasks.mockResolvedValue([]);
|
||||
mockAlertProvider.getWebhooks.mockResolvedValue(new Map());
|
||||
|
||||
await task.execute();
|
||||
|
||||
const mockLoadProvider = jest.mocked(loadProvider);
|
||||
expect(mockLoadProvider).toHaveBeenCalledWith(undefined);
|
||||
expect(mockAlertProvider.init).toHaveBeenCalled();
|
||||
expect(mockAlertProvider.getAlertTasks).toHaveBeenCalled();
|
||||
expect(mockAlertProvider.getWebhooks).not.toHaveBeenCalled();
|
||||
expect(mockProcessAlert).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should execute successfully with custom provider', async () => {
|
||||
const args: CheckAlertsTaskArgs = {
|
||||
taskName: 'check-alerts',
|
||||
provider: 'custom-provider',
|
||||
};
|
||||
const task = new CheckAlertTask(args);
|
||||
|
||||
mockAlertProvider.getAlertTasks.mockResolvedValue([]);
|
||||
mockAlertProvider.getWebhooks.mockResolvedValue(new Map());
|
||||
|
||||
await task.execute();
|
||||
|
||||
const mockLoadProvider = jest.mocked(loadProvider);
|
||||
expect(mockLoadProvider).toHaveBeenCalledWith('custom-provider');
|
||||
expect(mockAlertProvider.init).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should process alert tasks', async () => {
|
||||
const args: CheckAlertsTaskArgs = { taskName: 'check-alerts' };
|
||||
const task = new CheckAlertTask(args);
|
||||
|
||||
const mockAlert = {
|
||||
id: 'alert-123',
|
||||
team: { _id: new mongoose.Types.ObjectId() },
|
||||
source: AlertSource.SAVED_SEARCH,
|
||||
threshold: 10,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '5m',
|
||||
channel: { type: 'webhook', webhookId: 'webhook-123' },
|
||||
} as IAlert;
|
||||
|
||||
const mockSource = {
|
||||
id: 'source-123',
|
||||
from: { databaseName: 'default', tableName: 'otel_logs' },
|
||||
timestampValueExpression: 'Timestamp',
|
||||
} as ISource;
|
||||
|
||||
const mockAlertTask = {
|
||||
alerts: [
|
||||
{
|
||||
alert: mockAlert,
|
||||
source: mockSource,
|
||||
taskType: AlertTaskType.SAVED_SEARCH,
|
||||
previous: undefined,
|
||||
} as AlertDetails,
|
||||
],
|
||||
conn: {
|
||||
id: 'conn-123',
|
||||
_id: new mongoose.Types.ObjectId(),
|
||||
host: config.CLICKHOUSE_HOST,
|
||||
username: config.CLICKHOUSE_USER,
|
||||
password: config.CLICKHOUSE_PASSWORD,
|
||||
name: '',
|
||||
team: new mongoose.Types.ObjectId(),
|
||||
},
|
||||
now: new Date(),
|
||||
};
|
||||
|
||||
const teamWebhooksById = new Map<string, IWebhook>([
|
||||
[
|
||||
'webhook-123',
|
||||
{
|
||||
_id: 'webhook-123',
|
||||
url: 'http://example.com/webhook',
|
||||
} as unknown as IWebhook,
|
||||
],
|
||||
]);
|
||||
|
||||
mockAlertProvider.getAlertTasks.mockResolvedValue([mockAlertTask]);
|
||||
mockAlertProvider.getWebhooks.mockResolvedValue(teamWebhooksById);
|
||||
|
||||
await task.execute();
|
||||
|
||||
expect(mockAlertProvider.getAlertTasks).toHaveBeenCalled();
|
||||
expect(mockProcessAlert).toHaveBeenCalledWith(
|
||||
mockAlertTask.now,
|
||||
mockAlertTask.alerts[0],
|
||||
expect.any(ClickhouseClient),
|
||||
'conn-123',
|
||||
mockAlertProvider,
|
||||
teamWebhooksById,
|
||||
);
|
||||
|
||||
mockProcessAlert.mockRestore();
|
||||
});
|
||||
|
||||
it("should ensure that the correct team's webhooks are passed to processAlert", async () => {
|
||||
const args: CheckAlertsTaskArgs = { taskName: 'check-alerts' };
|
||||
const task = new CheckAlertTask(args);
|
||||
|
||||
// Create two teams
|
||||
const team1Id = new mongoose.Types.ObjectId();
|
||||
const team2Id = new mongoose.Types.ObjectId();
|
||||
|
||||
const mockAlert1 = {
|
||||
id: 'alert-123',
|
||||
team: { _id: team1Id },
|
||||
source: AlertSource.SAVED_SEARCH,
|
||||
threshold: 10,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '5m',
|
||||
channel: { type: 'webhook', webhookId: 'webhook-team1' },
|
||||
} as IAlert;
|
||||
|
||||
const mockAlert2 = {
|
||||
id: 'alert-456',
|
||||
team: { _id: team2Id },
|
||||
source: AlertSource.SAVED_SEARCH,
|
||||
threshold: 5,
|
||||
thresholdType: AlertThresholdType.BELOW,
|
||||
interval: '1m',
|
||||
channel: { type: 'webhook', webhookId: 'webhook-team2' },
|
||||
} as IAlert;
|
||||
|
||||
const mockSource = {
|
||||
id: 'source-123',
|
||||
from: { databaseName: 'default', tableName: 'otel_logs' },
|
||||
timestampValueExpression: 'Timestamp',
|
||||
} as ISource;
|
||||
|
||||
const mockAlertTask1 = {
|
||||
alerts: [
|
||||
{
|
||||
alert: mockAlert1,
|
||||
source: mockSource,
|
||||
taskType: AlertTaskType.SAVED_SEARCH,
|
||||
previous: undefined,
|
||||
} as AlertDetails,
|
||||
],
|
||||
conn: {
|
||||
id: 'conn-123',
|
||||
_id: new mongoose.Types.ObjectId(),
|
||||
host: config.CLICKHOUSE_HOST,
|
||||
username: config.CLICKHOUSE_USER,
|
||||
password: config.CLICKHOUSE_PASSWORD,
|
||||
name: 'Team1 Connection',
|
||||
team: team1Id,
|
||||
},
|
||||
now: new Date(),
|
||||
};
|
||||
|
||||
const mockAlertTask2 = {
|
||||
alerts: [
|
||||
{
|
||||
alert: mockAlert2,
|
||||
source: mockSource,
|
||||
taskType: AlertTaskType.SAVED_SEARCH,
|
||||
previous: undefined,
|
||||
} as AlertDetails,
|
||||
],
|
||||
conn: {
|
||||
id: 'conn-456',
|
||||
_id: new mongoose.Types.ObjectId(),
|
||||
host: config.CLICKHOUSE_HOST,
|
||||
username: config.CLICKHOUSE_USER,
|
||||
password: config.CLICKHOUSE_PASSWORD,
|
||||
name: 'Team2 Connection',
|
||||
team: team2Id,
|
||||
},
|
||||
now: new Date(),
|
||||
};
|
||||
|
||||
// Create team-specific webhooks
|
||||
const team1WebhooksById = new Map<string, IWebhook>([
|
||||
[
|
||||
'webhook-team1',
|
||||
{
|
||||
_id: 'webhook-team1',
|
||||
name: 'Team1 Webhook',
|
||||
url: 'http://team1.example.com/webhook',
|
||||
team: team1Id,
|
||||
} as unknown as IWebhook,
|
||||
],
|
||||
]);
|
||||
|
||||
const team2WebhooksById = new Map<string, IWebhook>([
|
||||
[
|
||||
'webhook-team2',
|
||||
{
|
||||
_id: 'webhook-team2',
|
||||
name: 'Team2 Webhook',
|
||||
url: 'http://team2.example.com/webhook',
|
||||
team: team2Id,
|
||||
} as unknown as IWebhook,
|
||||
],
|
||||
]);
|
||||
|
||||
mockAlertProvider.getAlertTasks.mockResolvedValue([
|
||||
mockAlertTask1,
|
||||
mockAlertTask2,
|
||||
]);
|
||||
|
||||
// Mock getWebhooks to return team-specific webhooks
|
||||
mockAlertProvider.getWebhooks.mockImplementation(
|
||||
(teamId: string | ObjectId): Promise<Map<string, IWebhook>> => {
|
||||
if (teamId === team1Id.toString()) {
|
||||
return Promise.resolve(team1WebhooksById);
|
||||
} else if (teamId === team2Id.toString()) {
|
||||
return Promise.resolve(team2WebhooksById);
|
||||
}
|
||||
return Promise.resolve(new Map());
|
||||
},
|
||||
);
|
||||
|
||||
await task.execute();
|
||||
|
||||
// Verify processAlert was called twice with correct team-specific webhooks
|
||||
expect(mockProcessAlert).toHaveBeenCalledTimes(2);
|
||||
|
||||
// First call should use team1's webhooks
|
||||
expect(mockProcessAlert).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
mockAlertTask1.now,
|
||||
mockAlertTask1.alerts[0],
|
||||
expect.any(ClickhouseClient),
|
||||
'conn-123',
|
||||
mockAlertProvider,
|
||||
team1WebhooksById,
|
||||
);
|
||||
|
||||
// Second call should use team2's webhooks
|
||||
expect(mockProcessAlert).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
mockAlertTask2.now,
|
||||
mockAlertTask2.alerts[0],
|
||||
expect.any(ClickhouseClient),
|
||||
'conn-456',
|
||||
mockAlertProvider,
|
||||
team2WebhooksById,
|
||||
);
|
||||
|
||||
// Verify getWebhooks was called for each team
|
||||
expect(mockAlertProvider.getWebhooks).toHaveBeenCalledWith(
|
||||
team1Id.toString(),
|
||||
);
|
||||
expect(mockAlertProvider.getWebhooks).toHaveBeenCalledWith(
|
||||
team2Id.toString(),
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -14,12 +14,8 @@ import Dashboard from '@/models/dashboard';
|
|||
import { SavedSearch } from '@/models/savedSearch';
|
||||
import { Source } from '@/models/source';
|
||||
import Webhook from '@/models/webhook';
|
||||
import { getPreviousAlertHistories, processAlert } from '@/tasks/checkAlerts';
|
||||
import { AlertTaskType, loadProvider } from '@/tasks/providers';
|
||||
import {
|
||||
AlertMessageTemplateDefaultView,
|
||||
buildAlertMessageTemplateTitle,
|
||||
} from '@/tasks/template';
|
||||
import { processAlert } from '@/tasks/checkAlerts';
|
||||
import { AlertDetails, AlertTaskType, loadProvider } from '@/tasks/providers';
|
||||
import * as slack from '@/utils/slack';
|
||||
|
||||
describe('Single Invocation Alert Test', () => {
|
||||
|
|
@ -193,14 +189,13 @@ describe('Single Invocation Alert Test', () => {
|
|||
username: connection.username,
|
||||
password: connection.password,
|
||||
});
|
||||
const previousAlerts = await getPreviousAlertHistories([alert.id], now);
|
||||
await processAlert(
|
||||
now,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlerts.get(alert.id),
|
||||
new Map([[webhook.id.toString(), webhook]]),
|
||||
);
|
||||
|
||||
// Verify alert state changed to ALERT (from DB)
|
||||
|
|
@ -397,14 +392,14 @@ describe('Single Invocation Alert Test', () => {
|
|||
if (!tile) throw new Error('Second tile not found for multi-tile test');
|
||||
|
||||
// Set up alert processing details (like existing tile tests)
|
||||
const details: any = {
|
||||
const details = {
|
||||
alert: enhancedAlert,
|
||||
source,
|
||||
conn: connection,
|
||||
taskType: AlertTaskType.TILE,
|
||||
tile,
|
||||
dashboard,
|
||||
};
|
||||
previous: undefined,
|
||||
} satisfies AlertDetails;
|
||||
|
||||
const clickhouseClient = new ClickhouseClient({
|
||||
host: connection.host,
|
||||
|
|
@ -431,14 +426,13 @@ describe('Single Invocation Alert Test', () => {
|
|||
}));
|
||||
|
||||
// Process alert - this triggers the webhook with the title
|
||||
const previousAlerts = await getPreviousAlertHistories([alert.id], now);
|
||||
await processAlert(
|
||||
now,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
previousAlerts.get(alert.id),
|
||||
new Map([[webhook.id.toString(), webhook]]),
|
||||
);
|
||||
|
||||
// Get the webhook call to inspect the title
|
||||
|
|
|
|||
|
|
@ -16,11 +16,12 @@ import mongoose from 'mongoose';
|
|||
import ms from 'ms';
|
||||
import { serializeError } from 'serialize-error';
|
||||
|
||||
import Alert, { AlertState, AlertThresholdType, IAlert } from '@/models/alert';
|
||||
import { AlertState, AlertThresholdType, IAlert } from '@/models/alert';
|
||||
import AlertHistory, { IAlertHistory } from '@/models/alertHistory';
|
||||
import { IDashboard } from '@/models/dashboard';
|
||||
import { ISavedSearch } from '@/models/savedSearch';
|
||||
import { ISource } from '@/models/source';
|
||||
import { IWebhook } from '@/models/webhook';
|
||||
import {
|
||||
AlertDetails,
|
||||
AlertProvider,
|
||||
|
|
@ -66,6 +67,7 @@ const fireChannelEvent = async ({
|
|||
startTime,
|
||||
totalCount,
|
||||
windowSizeInMins,
|
||||
teamWebhooksById,
|
||||
}: {
|
||||
alert: IAlert;
|
||||
alertProvider: AlertProvider;
|
||||
|
|
@ -80,6 +82,7 @@ const fireChannelEvent = async ({
|
|||
startTime: Date;
|
||||
totalCount: number;
|
||||
windowSizeInMins: number;
|
||||
teamWebhooksById: Map<string, IWebhook>;
|
||||
}) => {
|
||||
const team = alert.team;
|
||||
if (team == null) {
|
||||
|
|
@ -132,9 +135,7 @@ const fireChannelEvent = async ({
|
|||
}),
|
||||
template: alert.message,
|
||||
view: templateView,
|
||||
team: {
|
||||
id: team._id.toString(),
|
||||
},
|
||||
teamWebhooksById,
|
||||
});
|
||||
};
|
||||
|
||||
|
|
@ -144,9 +145,9 @@ export const processAlert = async (
|
|||
clickhouseClient: ClickhouseClient,
|
||||
connectionId: string,
|
||||
alertProvider: AlertProvider,
|
||||
previous: AggregatedAlertHistory | undefined,
|
||||
teamWebhooksById: Map<string, IWebhook>,
|
||||
) => {
|
||||
const { alert, source } = details;
|
||||
const { alert, source, previous } = details;
|
||||
try {
|
||||
const windowSizeInMins = ms(alert.interval) / 60000;
|
||||
const nowInMinsRoundDown = roundDownToXMinutes(windowSizeInMins)(now);
|
||||
|
|
@ -247,12 +248,13 @@ export const processAlert = async (
|
|||
});
|
||||
|
||||
// TODO: support INSUFFICIENT_DATA state
|
||||
let alertState = AlertState.OK;
|
||||
const history = await new AlertHistory({
|
||||
alert: alert.id,
|
||||
const history: IAlertHistory = {
|
||||
alert: new mongoose.Types.ObjectId(alert.id),
|
||||
createdAt: nowInMinsRoundDown,
|
||||
state: alertState,
|
||||
}).save();
|
||||
state: AlertState.OK,
|
||||
counts: 0,
|
||||
lastValues: [],
|
||||
};
|
||||
|
||||
if (checksData?.data && checksData?.data.length > 0) {
|
||||
// attach JS type
|
||||
|
|
@ -307,7 +309,7 @@ export const processAlert = async (
|
|||
}
|
||||
const bucketStart = new Date(checkData[timestampColumnName]);
|
||||
if (doesExceedThreshold(alert.thresholdType, alert.threshold, _value)) {
|
||||
alertState = AlertState.ALERT;
|
||||
history.state = AlertState.ALERT;
|
||||
logger.info({
|
||||
message: `Triggering ${alert.channel.type} alarm!`,
|
||||
alertId: alert.id,
|
||||
|
|
@ -334,6 +336,7 @@ export const processAlert = async (
|
|||
startTime: bucketStart,
|
||||
totalCount: _value,
|
||||
windowSizeInMins,
|
||||
teamWebhooksById,
|
||||
});
|
||||
} catch (e) {
|
||||
logger.error({
|
||||
|
|
@ -347,12 +350,9 @@ export const processAlert = async (
|
|||
}
|
||||
history.lastValues.push({ count: _value, startTime: bucketStart });
|
||||
}
|
||||
|
||||
history.state = alertState;
|
||||
await history.save();
|
||||
}
|
||||
|
||||
await Alert.updateOne({ _id: alert.id }, { $set: { state: alertState } });
|
||||
await alertProvider.updateAlertState(history);
|
||||
} catch (e) {
|
||||
// Uncomment this for better error messages locally
|
||||
// console.error(e);
|
||||
|
|
@ -367,7 +367,7 @@ export const processAlert = async (
|
|||
// Re-export handleSendGenericWebhook for testing
|
||||
export { handleSendGenericWebhook };
|
||||
|
||||
interface AggregatedAlertHistory {
|
||||
export interface AggregatedAlertHistory {
|
||||
_id: ObjectId;
|
||||
createdAt: Date;
|
||||
}
|
||||
|
|
@ -429,9 +429,8 @@ export default class CheckAlertTask implements HdxTask<CheckAlertsTaskArgs> {
|
|||
}
|
||||
|
||||
async processAlertTask(
|
||||
now: Date,
|
||||
alertTask: AlertTask,
|
||||
previousAlerts: Map<string, AggregatedAlertHistory>,
|
||||
teamWebhooksById: Map<string, IWebhook>,
|
||||
) {
|
||||
const { alerts, conn } = alertTask;
|
||||
logger.info({
|
||||
|
|
@ -455,15 +454,14 @@ export default class CheckAlertTask implements HdxTask<CheckAlertsTaskArgs> {
|
|||
});
|
||||
|
||||
for (const alert of alerts) {
|
||||
const previous = previousAlerts.get(alert.alert.id);
|
||||
await this.task_queue.add(() =>
|
||||
processAlert(
|
||||
now,
|
||||
alertTask.now,
|
||||
alert,
|
||||
clickhouseClient,
|
||||
conn.id,
|
||||
this.provider,
|
||||
previous,
|
||||
teamWebhooksById,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
|
@ -479,21 +477,24 @@ export default class CheckAlertTask implements HdxTask<CheckAlertsTaskArgs> {
|
|||
this.provider = await loadProvider(this.args.provider);
|
||||
await this.provider.init();
|
||||
|
||||
const now = new Date();
|
||||
const alertTasks = await this.provider.getAlertTasks();
|
||||
logger.info({
|
||||
message: 'Fetched alert tasks to process',
|
||||
taskCount: alertTasks.length,
|
||||
});
|
||||
|
||||
const alertIds = alertTasks.flatMap(({ alerts }) =>
|
||||
alerts.map(({ alert }) => alert.id),
|
||||
);
|
||||
const previousAlerts = await getPreviousAlertHistories(alertIds, now);
|
||||
const teams = new Set(alertTasks.map(t => t.conn.team.toString()));
|
||||
const teamToWebhooks = new Map<string, Map<string, IWebhook>>();
|
||||
for (const teamId of teams) {
|
||||
const teamWebhooksById = await this.provider.getWebhooks(teamId);
|
||||
teamToWebhooks.set(teamId, teamWebhooksById);
|
||||
}
|
||||
|
||||
for (const task of alertTasks) {
|
||||
const teamWebhooksById =
|
||||
teamToWebhooks.get(task.conn.team.toString()) ?? new Map();
|
||||
await this.task_queue.add(() =>
|
||||
this.processAlertTask(now, task, previousAlerts),
|
||||
this.processAlertTask(task, teamWebhooksById),
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,8 @@ describe('isValidProvider', () => {
|
|||
getAlertTasks: async () => [],
|
||||
buildLogSearchLink: () => 'http://example.com/search',
|
||||
buildChartLink: () => 'http://example.com/chart',
|
||||
updateAlertState: () => Promise.resolve(),
|
||||
getWebhooks: () => Promise.resolve(new Map()),
|
||||
};
|
||||
|
||||
expect(isValidProvider(validProvider)).toBe(true);
|
||||
|
|
@ -32,6 +34,8 @@ describe('isValidProvider', () => {
|
|||
getAlertTasks: async () => [],
|
||||
buildLogSearchLink: () => 'http://example.com/search',
|
||||
buildChartLink: () => 'http://example.com/chart',
|
||||
updateAlertState: () => Promise.resolve(),
|
||||
getWebhooks: () => Promise.resolve(new Map()),
|
||||
};
|
||||
|
||||
expect(isValidProvider(invalidProvider)).toBe(false);
|
||||
|
|
@ -43,6 +47,8 @@ describe('isValidProvider', () => {
|
|||
getAlertTasks: async () => [],
|
||||
buildLogSearchLink: () => 'http://example.com/search',
|
||||
buildChartLink: () => 'http://example.com/chart',
|
||||
updateAlertState: () => Promise.resolve(),
|
||||
getWebhooks: () => Promise.resolve(new Map()),
|
||||
};
|
||||
|
||||
expect(isValidProvider(invalidProvider)).toBe(false);
|
||||
|
|
@ -54,6 +60,8 @@ describe('isValidProvider', () => {
|
|||
asyncDispose: async () => {},
|
||||
buildLogSearchLink: () => 'http://example.com/search',
|
||||
buildChartLink: () => 'http://example.com/chart',
|
||||
updateAlertState: () => Promise.resolve(),
|
||||
getWebhooks: () => Promise.resolve(new Map()),
|
||||
};
|
||||
|
||||
expect(isValidProvider(invalidProvider)).toBe(false);
|
||||
|
|
@ -65,6 +73,8 @@ describe('isValidProvider', () => {
|
|||
asyncDispose: async () => {},
|
||||
getAlertTasks: async () => [],
|
||||
buildChartLink: () => 'http://example.com/chart',
|
||||
updateAlertState: () => Promise.resolve(),
|
||||
getWebhooks: () => Promise.resolve(new Map()),
|
||||
};
|
||||
|
||||
expect(isValidProvider(invalidProvider)).toBe(false);
|
||||
|
|
@ -76,6 +86,8 @@ describe('isValidProvider', () => {
|
|||
asyncDispose: async () => {},
|
||||
getAlertTasks: async () => [],
|
||||
buildLogSearchLink: () => 'http://example.com/search',
|
||||
updateAlertState: () => Promise.resolve(),
|
||||
getWebhooks: () => Promise.resolve(new Map()),
|
||||
};
|
||||
|
||||
expect(isValidProvider(invalidProvider)).toBe(false);
|
||||
|
|
@ -88,6 +100,8 @@ describe('isValidProvider', () => {
|
|||
getAlertTasks: async () => [],
|
||||
buildLogSearchLink: () => 'http://example.com/search',
|
||||
buildChartLink: () => 'http://example.com/chart',
|
||||
updateAlertState: () => Promise.resolve(),
|
||||
getWebhooks: () => Promise.resolve(new Map()),
|
||||
};
|
||||
|
||||
expect(isValidProvider(invalidProvider)).toBe(false);
|
||||
|
|
@ -100,6 +114,8 @@ describe('isValidProvider', () => {
|
|||
getAlertTasks: null,
|
||||
buildLogSearchLink: () => 'http://example.com/search',
|
||||
buildChartLink: () => 'http://example.com/chart',
|
||||
updateAlertState: () => Promise.resolve(),
|
||||
getWebhooks: () => Promise.resolve(new Map()),
|
||||
};
|
||||
|
||||
expect(isValidProvider(invalidProvider)).toBe(false);
|
||||
|
|
@ -112,6 +128,8 @@ describe('isValidProvider', () => {
|
|||
getAlertTasks: async () => [],
|
||||
buildLogSearchLink: () => 'http://example.com/search',
|
||||
buildChartLink: () => 'http://example.com/chart',
|
||||
updateAlertState: () => Promise.resolve(),
|
||||
getWebhooks: () => Promise.resolve(new Map()),
|
||||
extraProperty: 'should not affect validation',
|
||||
anotherMethod: () => {},
|
||||
};
|
||||
|
|
|
|||
|
|
@ -5,12 +5,14 @@ import { URLSearchParams } from 'url';
|
|||
|
||||
import * as config from '@/config';
|
||||
import { LOCAL_APP_TEAM } from '@/controllers/team';
|
||||
import { connectDB, mongooseConnection } from '@/models';
|
||||
import { connectDB, mongooseConnection, ObjectId } from '@/models';
|
||||
import Alert, { AlertSource, type IAlert } from '@/models/alert';
|
||||
import AlertHistory, { IAlertHistory } from '@/models/alertHistory';
|
||||
import Connection, { IConnection } from '@/models/connection';
|
||||
import Dashboard from '@/models/dashboard';
|
||||
import { type ISavedSearch, SavedSearch } from '@/models/savedSearch';
|
||||
import { type ISource, Source } from '@/models/source';
|
||||
import Webhook, { IWebhook } from '@/models/webhook';
|
||||
import {
|
||||
type AlertDetails,
|
||||
type AlertProvider,
|
||||
|
|
@ -20,9 +22,17 @@ import {
|
|||
import { convertMsToGranularityString } from '@/utils/common';
|
||||
import logger from '@/utils/logger';
|
||||
|
||||
import {
|
||||
AggregatedAlertHistory,
|
||||
getPreviousAlertHistories,
|
||||
} from '../checkAlerts';
|
||||
import { MappedOmit } from '../types';
|
||||
|
||||
type PartialAlertDetails = MappedOmit<AlertDetails, 'previous'>;
|
||||
|
||||
async function getSavedSearchDetails(
|
||||
alert: IAlert,
|
||||
): Promise<[IConnection, AlertDetails] | []> {
|
||||
): Promise<[IConnection, PartialAlertDetails] | []> {
|
||||
const savedSearchId = alert.savedSearch;
|
||||
const savedSearch = await SavedSearch.findOne({
|
||||
_id: savedSearchId,
|
||||
|
|
@ -70,7 +80,7 @@ async function getSavedSearchDetails(
|
|||
|
||||
async function getTileDetails(
|
||||
alert: IAlert,
|
||||
): Promise<[IConnection, AlertDetails] | []> {
|
||||
): Promise<[IConnection, PartialAlertDetails] | []> {
|
||||
const dashboardId = alert.dashboard;
|
||||
const tileId = alert.tileId;
|
||||
|
||||
|
|
@ -142,7 +152,12 @@ async function getTileDetails(
|
|||
];
|
||||
}
|
||||
|
||||
async function loadAlert(alert: IAlert, groupedTasks: Map<string, AlertTask>) {
|
||||
async function loadAlert(
|
||||
alert: IAlert,
|
||||
groupedTasks: Map<string, AlertTask>,
|
||||
previousAlerts: Map<string, AggregatedAlertHistory>,
|
||||
now: Date,
|
||||
) {
|
||||
if (!alert.source) {
|
||||
throw new Error('alert does not have a source');
|
||||
}
|
||||
|
|
@ -154,7 +169,7 @@ async function loadAlert(alert: IAlert, groupedTasks: Map<string, AlertTask>) {
|
|||
}
|
||||
|
||||
let conn: IConnection | undefined;
|
||||
let details: AlertDetails | undefined;
|
||||
let details: PartialAlertDetails | undefined;
|
||||
switch (alert.source) {
|
||||
case AlertSource.SAVED_SEARCH:
|
||||
[conn, details] = await getSavedSearchDetails(alert);
|
||||
|
|
@ -176,14 +191,16 @@ async function loadAlert(alert: IAlert, groupedTasks: Map<string, AlertTask>) {
|
|||
throw new Error('failed to fetch alert connection');
|
||||
}
|
||||
|
||||
const previous = previousAlerts.get(alert.id);
|
||||
|
||||
if (!groupedTasks.has(conn.id)) {
|
||||
groupedTasks.set(conn.id, { alerts: [], conn });
|
||||
groupedTasks.set(conn.id, { alerts: [], conn, now });
|
||||
}
|
||||
const v = groupedTasks.get(conn.id);
|
||||
if (!v) {
|
||||
throw new Error(`provider did not set key ${conn.id} before appending`);
|
||||
}
|
||||
v.alerts.push(details);
|
||||
v.alerts.push({ ...details, previous });
|
||||
}
|
||||
|
||||
export default class DefaultAlertProvider implements AlertProvider {
|
||||
|
|
@ -198,9 +215,14 @@ export default class DefaultAlertProvider implements AlertProvider {
|
|||
async getAlertTasks(): Promise<AlertTask[]> {
|
||||
const groupedTasks = new Map<string, AlertTask>();
|
||||
const alerts = await Alert.find({});
|
||||
|
||||
const now = new Date();
|
||||
const alertIds = alerts.map(({ id }) => id);
|
||||
const previousAlerts = await getPreviousAlertHistories(alertIds, now);
|
||||
|
||||
for (const alert of alerts) {
|
||||
try {
|
||||
await loadAlert(alert, groupedTasks);
|
||||
await loadAlert(alert, groupedTasks, previousAlerts, now);
|
||||
} catch (e) {
|
||||
logger.error({
|
||||
message: `failed to load alert: ${e}`,
|
||||
|
|
@ -257,4 +279,19 @@ export default class DefaultAlertProvider implements AlertProvider {
|
|||
url.search = queryParams.toString();
|
||||
return url.toString();
|
||||
}
|
||||
|
||||
async updateAlertState(alertHistory: IAlertHistory) {
|
||||
const { alert: alertId, state } = alertHistory;
|
||||
await Promise.all([
|
||||
Alert.updateOne({ _id: alertId }, { $set: { state } }),
|
||||
AlertHistory.create(alertHistory),
|
||||
]);
|
||||
}
|
||||
|
||||
async getWebhooks(teamId: string | ObjectId) {
|
||||
const webhooks = await Webhook.find({
|
||||
team: new mongoose.Types.ObjectId(teamId),
|
||||
});
|
||||
return new Map<string, IWebhook>(webhooks.map(w => [w.id, w]));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,17 +1,25 @@
|
|||
import { Tile } from '@hyperdx/common-utils/dist/types';
|
||||
|
||||
import { ObjectId } from '@/models';
|
||||
import { IAlert } from '@/models/alert';
|
||||
import { IAlertHistory } from '@/models/alertHistory';
|
||||
import { IConnection } from '@/models/connection';
|
||||
import { IDashboard } from '@/models/dashboard';
|
||||
import { ISavedSearch } from '@/models/savedSearch';
|
||||
import { ISource } from '@/models/source';
|
||||
import { IWebhook } from '@/models/webhook';
|
||||
import DefaultAlertProvider from '@/tasks/providers/default';
|
||||
|
||||
import { AggregatedAlertHistory } from '../checkAlerts';
|
||||
|
||||
export enum AlertTaskType {
|
||||
SAVED_SEARCH,
|
||||
TILE,
|
||||
}
|
||||
|
||||
// Discriminated union of possible alert channel types with populated channel data
|
||||
export type PopulatedAlertChannel = { type: 'webhook' } & { channel: IWebhook };
|
||||
|
||||
// Details about the alert and the source for the alert. Depending on
|
||||
// the taskType either:
|
||||
// 1. the savedSearch field is required or
|
||||
|
|
@ -22,6 +30,7 @@ export enum AlertTaskType {
|
|||
export type AlertDetails = {
|
||||
alert: IAlert;
|
||||
source: ISource;
|
||||
previous: AggregatedAlertHistory | undefined;
|
||||
} & (
|
||||
| {
|
||||
taskType: AlertTaskType.SAVED_SEARCH;
|
||||
|
|
@ -39,6 +48,7 @@ export type AlertDetails = {
|
|||
export type AlertTask<T = never> = {
|
||||
alerts: AlertDetails[];
|
||||
conn: IConnection;
|
||||
now: Date;
|
||||
} & ([T] extends [never] ? unknown : { metadata: T });
|
||||
|
||||
export interface AlertProvider {
|
||||
|
|
@ -60,6 +70,12 @@ export interface AlertProvider {
|
|||
granularity: string;
|
||||
startTime: Date;
|
||||
}): string;
|
||||
|
||||
/** Save the given AlertHistory and update the associated alert's state */
|
||||
updateAlertState(alertHistory: IAlertHistory): Promise<void>;
|
||||
|
||||
/** Fetch all webhooks for the given team, returning a map of webhook ID to webhook */
|
||||
getWebhooks(teamId: string | ObjectId): Promise<Map<string, IWebhook>>;
|
||||
}
|
||||
|
||||
export function isValidProvider(obj: any): obj is AlertProvider {
|
||||
|
|
@ -69,7 +85,9 @@ export function isValidProvider(obj: any): obj is AlertProvider {
|
|||
typeof obj.asyncDispose === 'function' &&
|
||||
typeof obj.getAlertTasks === 'function' &&
|
||||
typeof obj.buildLogSearchLink === 'function' &&
|
||||
typeof obj.buildChartLink === 'function'
|
||||
typeof obj.buildChartLink === 'function' &&
|
||||
typeof obj.updateAlertState === 'function' &&
|
||||
typeof obj.getWebhooks === 'function'
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,20 +1,20 @@
|
|||
import * as clickhouse from '@hyperdx/common-utils/dist/clickhouse';
|
||||
import { ClickhouseClient } from '@hyperdx/common-utils/dist/clickhouse/node';
|
||||
import { Metadata } from '@hyperdx/common-utils/dist/metadata';
|
||||
import { renderChartConfig } from '@hyperdx/common-utils/dist/renderChartConfig';
|
||||
import {
|
||||
AlertChannelType,
|
||||
ChartConfigWithOptDateRange,
|
||||
DisplayType,
|
||||
WebhookService,
|
||||
zAlertChannelType,
|
||||
} from '@hyperdx/common-utils/dist/types';
|
||||
import { _useTry, formatDate } from '@hyperdx/common-utils/dist/utils';
|
||||
import { isValidSlackUrl } from '@hyperdx/common-utils/dist/validation';
|
||||
import Handlebars, { HelperOptions } from 'handlebars';
|
||||
import _ from 'lodash';
|
||||
import { escapeRegExp } from 'lodash';
|
||||
import mongoose from 'mongoose';
|
||||
import PromisedHandlebars from 'promised-handlebars';
|
||||
import { serializeError } from 'serialize-error';
|
||||
import { z } from 'zod';
|
||||
|
||||
import * as config from '@/config';
|
||||
import { AlertInput } from '@/controllers/alerts';
|
||||
|
|
@ -23,9 +23,8 @@ import { IDashboard } from '@/models/dashboard';
|
|||
import { ISavedSearch } from '@/models/savedSearch';
|
||||
import { ISource } from '@/models/source';
|
||||
import { IWebhook } from '@/models/webhook';
|
||||
import Webhook from '@/models/webhook';
|
||||
import { doesExceedThreshold } from '@/tasks/checkAlerts';
|
||||
import { AlertProvider } from '@/tasks/providers';
|
||||
import { AlertProvider, PopulatedAlertChannel } from '@/tasks/providers';
|
||||
import { escapeJsonString, unflattenObject } from '@/tasks/util';
|
||||
import { truncateString } from '@/utils/common';
|
||||
import logger from '@/utils/logger';
|
||||
|
|
@ -35,6 +34,13 @@ const MAX_MESSAGE_LENGTH = 500;
|
|||
const NOTIFY_FN_NAME = '__hdx_notify_channel__';
|
||||
const IS_MATCH_FN_NAME = 'is_match';
|
||||
|
||||
const zNotifyFnParams = z.object({
|
||||
hash: z.object({
|
||||
channel: zAlertChannelType,
|
||||
id: z.string(),
|
||||
}),
|
||||
});
|
||||
|
||||
// should match the external alert schema
|
||||
export type AlertMessageTemplateDefaultView = {
|
||||
alert: AlertInput;
|
||||
|
|
@ -49,45 +55,31 @@ export type AlertMessageTemplateDefaultView = {
|
|||
value: number;
|
||||
};
|
||||
|
||||
interface Message {
|
||||
hdxLink: string;
|
||||
title: string;
|
||||
body: string;
|
||||
}
|
||||
|
||||
export const notifyChannel = async ({
|
||||
channel,
|
||||
id,
|
||||
message,
|
||||
team,
|
||||
}: {
|
||||
channel: AlertMessageTemplateDefaultView['alert']['channel']['type'];
|
||||
id: string;
|
||||
message: {
|
||||
hdxLink: string;
|
||||
title: string;
|
||||
body: string;
|
||||
};
|
||||
team: {
|
||||
id: string;
|
||||
};
|
||||
channel: PopulatedAlertChannel;
|
||||
message: Message;
|
||||
}) => {
|
||||
switch (channel) {
|
||||
switch (channel.type) {
|
||||
case 'webhook': {
|
||||
const webhook = await Webhook.findOne({
|
||||
team: team.id,
|
||||
...(mongoose.isValidObjectId(id)
|
||||
? { _id: id }
|
||||
: {
|
||||
name: {
|
||||
$regex: new RegExp(`^${escapeRegExp(id)}`), // FIXME: a hacky way to match the prefix
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
if (webhook?.service === WebhookService.Slack) {
|
||||
const webhook = channel.channel;
|
||||
if (webhook.service === WebhookService.Slack) {
|
||||
await handleSendSlackWebhook(webhook, message);
|
||||
} else if (webhook?.service === 'generic') {
|
||||
} else if (webhook.service === 'generic') {
|
||||
await handleSendGenericWebhook(webhook, message);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
throw new Error(`Unsupported channel type: ${channel}`);
|
||||
throw new Error(`Unsupported channel type: ${channel.type}`);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -148,11 +140,7 @@ function validateWebhookUrl(
|
|||
|
||||
export const handleSendSlackWebhook = async (
|
||||
webhook: IWebhook,
|
||||
message: {
|
||||
hdxLink: string;
|
||||
title: string;
|
||||
body: string;
|
||||
},
|
||||
message: Message,
|
||||
) => {
|
||||
validateWebhookUrl(webhook);
|
||||
|
||||
|
|
@ -172,11 +160,7 @@ export const handleSendSlackWebhook = async (
|
|||
|
||||
export const handleSendGenericWebhook = async (
|
||||
webhook: IWebhook,
|
||||
message: {
|
||||
hdxLink: string;
|
||||
title: string;
|
||||
body: string;
|
||||
},
|
||||
message: Message,
|
||||
) => {
|
||||
validateWebhookUrl(webhook);
|
||||
|
||||
|
|
@ -342,6 +326,41 @@ export const translateExternalActionsToInternal = (template: string) => {
|
|||
});
|
||||
};
|
||||
|
||||
const findWebhookByName = (
|
||||
channelIdOrNamePrefix: string,
|
||||
teamWebhooksById: Map<string, IWebhook>,
|
||||
) => {
|
||||
return [...teamWebhooksById.values()].find(w =>
|
||||
w.name.startsWith(channelIdOrNamePrefix),
|
||||
);
|
||||
};
|
||||
|
||||
const getPopulatedChannel = (
|
||||
channelType: AlertChannelType,
|
||||
channelIdOrNamePrefix: string,
|
||||
teamWebhooksById: Map<string, IWebhook>,
|
||||
): PopulatedAlertChannel | undefined => {
|
||||
switch (channelType) {
|
||||
case 'webhook': {
|
||||
const webhook =
|
||||
teamWebhooksById.get(channelIdOrNamePrefix) ??
|
||||
findWebhookByName(channelIdOrNamePrefix, teamWebhooksById);
|
||||
|
||||
if (!webhook) {
|
||||
logger.error('webhook not found', {
|
||||
webhookId: channelIdOrNamePrefix,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
return { type: 'webhook', channel: webhook };
|
||||
}
|
||||
default: {
|
||||
logger.error(`unsupported alert channel type: ${channelType}`);
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// this method will build the body of the alert message and will be used to send the alert to the channel
|
||||
export const renderAlertTemplate = async ({
|
||||
alertProvider,
|
||||
|
|
@ -350,7 +369,7 @@ export const renderAlertTemplate = async ({
|
|||
template,
|
||||
title,
|
||||
view,
|
||||
team,
|
||||
teamWebhooksById,
|
||||
}: {
|
||||
alertProvider: AlertProvider;
|
||||
clickhouseClient: ClickhouseClient;
|
||||
|
|
@ -358,9 +377,7 @@ export const renderAlertTemplate = async ({
|
|||
template?: string | null;
|
||||
title: string;
|
||||
view: AlertMessageTemplateDefaultView;
|
||||
team: {
|
||||
id: string;
|
||||
};
|
||||
teamWebhooksById: Map<string, IWebhook>;
|
||||
}) => {
|
||||
const {
|
||||
alert,
|
||||
|
|
@ -403,30 +420,36 @@ export const renderAlertTemplate = async ({
|
|||
const registerHelpers = (rawTemplateBody: string) => {
|
||||
hb.registerHelper(IS_MATCH_FN_NAME, isMatchFn(false));
|
||||
|
||||
hb.registerHelper(
|
||||
NOTIFY_FN_NAME,
|
||||
async (options: { hash: Record<string, string> }) => {
|
||||
const { channel, id } = options.hash;
|
||||
if (channel !== 'webhook') {
|
||||
throw new Error(`Unsupported channel type: ${channel}`);
|
||||
}
|
||||
// render id template
|
||||
const renderedId = _hb.compile(id)(view);
|
||||
// render body template
|
||||
const renderedBody = _hb.compile(rawTemplateBody)(view);
|
||||
// Register a custom helper which sends notifications to the specified channel
|
||||
// Usage: {{NOTIFY_FN_NAME channel="webhook" id="1234_5678"}}
|
||||
hb.registerHelper(NOTIFY_FN_NAME, async (options: unknown) => {
|
||||
const { hash } = zNotifyFnParams.parse(options);
|
||||
const { channel: channelType, id: idTemplate } = hash;
|
||||
|
||||
// The id field can also be a template itself, e.g. id="{{attributes.webhookId}}", so it must be compiled and rendered
|
||||
// The id might also be the prefix of the webhook name.
|
||||
const renderedIdOrNamePrefix = _hb.compile(idTemplate)(view);
|
||||
|
||||
// render body template
|
||||
const renderedBody = _hb.compile(rawTemplateBody)(view);
|
||||
|
||||
const channel = getPopulatedChannel(
|
||||
channelType,
|
||||
renderedIdOrNamePrefix,
|
||||
teamWebhooksById,
|
||||
);
|
||||
|
||||
if (channel) {
|
||||
await notifyChannel({
|
||||
channel,
|
||||
id: renderedId,
|
||||
message: {
|
||||
hdxLink: buildAlertMessageTemplateHdxLink(alertProvider, view),
|
||||
title,
|
||||
body: renderedBody,
|
||||
},
|
||||
team,
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
const timeRangeMessage = `Time Range (UTC): [${formatDate(view.startTime, {
|
||||
|
|
|
|||
|
|
@ -105,3 +105,8 @@ export interface HdxTask<T extends TaskArgs> {
|
|||
|
||||
name(): string;
|
||||
}
|
||||
|
||||
// Utility type to omit keys K from type T, similar to Omit<T, K> but works on discriminated unions
|
||||
export type MappedOmit<T, K extends keyof T> = {
|
||||
[P in keyof T as P extends K ? never : P]: T[P];
|
||||
};
|
||||
|
|
|
|||
Loading…
Reference in a new issue