mirror of
https://github.com/hyperdxio/hyperdx
synced 2026-04-21 13:37:15 +00:00
feat: group tasks by connection (#1071)
Group the alert tasks fetched from mongo into groups based on what needs to run from the same connection. This sets up for further optimizations and connection reuse.
This commit is contained in:
parent
adb05ac723
commit
c2160536ac
11 changed files with 1177 additions and 191 deletions
5
.changeset/ninety-books-hear.md
Normal file
5
.changeset/ninety-books-hear.md
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
"@hyperdx/api": patch
|
||||
---
|
||||
|
||||
Changes the order of alert evaluation to group queries by the connection on the alert.
|
||||
|
|
@ -18,7 +18,7 @@ export function getConnectionById(
|
|||
|
||||
export function createConnection(
|
||||
team: string,
|
||||
connection: Omit<IConnection, '_id'>,
|
||||
connection: Omit<IConnection, 'id' | '_id'>,
|
||||
) {
|
||||
return Connection.create({ ...connection, team });
|
||||
}
|
||||
|
|
@ -26,7 +26,7 @@ export function createConnection(
|
|||
export function updateConnection(
|
||||
team: string,
|
||||
connectionId: string,
|
||||
connection: Omit<IConnection, '_id'>,
|
||||
connection: Omit<IConnection, 'id' | '_id'>,
|
||||
) {
|
||||
return Connection.findOneAndUpdate({ _id: connectionId, team }, connection, {
|
||||
new: true,
|
||||
|
|
|
|||
|
|
@ -21,8 +21,7 @@ export async function getSavedSearches(teamId: string) {
|
|||
return savedSearches.map(savedSearch => ({
|
||||
...savedSearch.toJSON(),
|
||||
alerts: alertsBySavedSearchId[savedSearch._id.toString()]?.map(alert => {
|
||||
const { _id, ...restAlert } = alert.toJSON();
|
||||
return { id: _id, ...restAlert };
|
||||
return alert.toJSON();
|
||||
}),
|
||||
}));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ export enum AlertSource {
|
|||
}
|
||||
|
||||
export interface IAlert {
|
||||
_id: ObjectId;
|
||||
id: string;
|
||||
channel: AlertChannel;
|
||||
interval: AlertInterval;
|
||||
source?: AlertSource;
|
||||
|
|
@ -162,6 +162,7 @@ const AlertSchema = new Schema<IAlert>(
|
|||
},
|
||||
{
|
||||
timestamps: true,
|
||||
toJSON: { virtuals: true },
|
||||
},
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ type ObjectId = mongoose.Types.ObjectId;
|
|||
|
||||
export interface IConnection {
|
||||
_id: ObjectId;
|
||||
id: string;
|
||||
host: string;
|
||||
name: string;
|
||||
password: string;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import * as clickhouse from '@hyperdx/common-utils/dist/clickhouse';
|
||||
import mongoose from 'mongoose';
|
||||
import ms from 'ms';
|
||||
|
||||
|
|
@ -21,7 +22,7 @@ import { Source } from '@/models/source';
|
|||
import Webhook from '@/models/webhook';
|
||||
import * as checkAlert from '@/tasks/checkAlerts';
|
||||
import { doesExceedThreshold, processAlert } from '@/tasks/checkAlerts';
|
||||
import { loadProvider } from '@/tasks/providers';
|
||||
import { AlertDetails, AlertTaskType, loadProvider } from '@/tasks/providers';
|
||||
import {
|
||||
AlertMessageTemplateDefaultView,
|
||||
buildAlertMessageTemplateHdxLink,
|
||||
|
|
@ -709,34 +710,90 @@ describe('checkAlerts', () => {
|
|||
mockUserId,
|
||||
);
|
||||
|
||||
const enhancedAlert: any = await Alert.findById(alert._id).populate([
|
||||
const enhancedAlert: any = await Alert.findById(alert.id).populate([
|
||||
'team',
|
||||
'savedSearch',
|
||||
]);
|
||||
|
||||
const details: any = {
|
||||
alert: enhancedAlert,
|
||||
source,
|
||||
conn: connection,
|
||||
taskType: AlertTaskType.SAVED_SEARCH,
|
||||
savedSearch,
|
||||
};
|
||||
|
||||
const clickhouseClient = new clickhouse.ClickhouseClient({
|
||||
host: connection.host,
|
||||
username: connection.username,
|
||||
password: connection.password,
|
||||
});
|
||||
|
||||
const mockMetadata = {
|
||||
getColumn: jest.fn().mockImplementation(({ column }) => {
|
||||
const columnMap = {
|
||||
Body: { name: 'Body', type: 'String' },
|
||||
Timestamp: { name: 'Timestamp', type: 'DateTime' },
|
||||
SeverityText: { name: 'SeverityText', type: 'String' },
|
||||
ServiceName: { name: 'ServiceName', type: 'String' },
|
||||
};
|
||||
return Promise.resolve(columnMap[column]);
|
||||
}),
|
||||
};
|
||||
|
||||
// Mock the getMetadata function
|
||||
jest.mock('@hyperdx/common-utils/dist/metadata', () => ({
|
||||
...jest.requireActual('@hyperdx/common-utils/dist/metadata'),
|
||||
getMetadata: jest.fn().mockReturnValue(mockMetadata),
|
||||
}));
|
||||
|
||||
// should fetch 5m of logs
|
||||
await processAlert(now, enhancedAlert, alertProvider);
|
||||
expect(enhancedAlert.state).toBe('ALERT');
|
||||
await processAlert(
|
||||
now,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
// skip since time diff is less than 1 window size
|
||||
const later = new Date('2023-11-16T22:14:00.000Z');
|
||||
await processAlert(later, enhancedAlert, alertProvider);
|
||||
await processAlert(
|
||||
later,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
// alert should still be in alert state
|
||||
expect(enhancedAlert.state).toBe('ALERT');
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
|
||||
await processAlert(nextWindow, enhancedAlert, alertProvider);
|
||||
await processAlert(
|
||||
nextWindow,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect(enhancedAlert.state).toBe('ALERT');
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
const nextNextWindow = new Date('2023-11-16T22:20:00.000Z');
|
||||
await processAlert(nextNextWindow, enhancedAlert, alertProvider);
|
||||
await processAlert(
|
||||
nextNextWindow,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect(enhancedAlert.state).toBe('OK');
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
|
||||
|
||||
// check alert history
|
||||
const alertHistories = await AlertHistory.find({
|
||||
alert: alert._id,
|
||||
alert: alert.id,
|
||||
}).sort({
|
||||
createdAt: 1,
|
||||
});
|
||||
|
|
@ -890,29 +947,82 @@ describe('checkAlerts', () => {
|
|||
mockUserId,
|
||||
);
|
||||
|
||||
const enhancedAlert: any = await Alert.findById(alert._id).populate([
|
||||
const enhancedAlert: any = await Alert.findById(alert.id).populate([
|
||||
'team',
|
||||
'dashboard',
|
||||
]);
|
||||
|
||||
const tile = dashboard.tiles?.find((t: any) => t.id === '17quud');
|
||||
if (!tile) throw new Error('tile not found for dashboard test case');
|
||||
const details: any = {
|
||||
alert: enhancedAlert,
|
||||
source,
|
||||
conn: connection,
|
||||
taskType: AlertTaskType.TILE,
|
||||
tile,
|
||||
dashboard,
|
||||
};
|
||||
|
||||
const clickhouseClient = new clickhouse.ClickhouseClient({
|
||||
host: connection.host,
|
||||
username: connection.username,
|
||||
password: connection.password,
|
||||
});
|
||||
|
||||
const mockMetadata = {
|
||||
getColumn: jest.fn().mockImplementation(({ column }) => {
|
||||
const columnMap = {
|
||||
ServiceName: { name: 'ServiceName', type: 'String' },
|
||||
Timestamp: { name: 'Timestamp', type: 'DateTime' },
|
||||
SeverityText: { name: 'SeverityText', type: 'String' },
|
||||
Body: { name: 'Body', type: 'String' },
|
||||
};
|
||||
return Promise.resolve(columnMap[column]);
|
||||
}),
|
||||
};
|
||||
|
||||
// Mock the getMetadata function
|
||||
jest.mock('@hyperdx/common-utils/dist/metadata', () => ({
|
||||
...jest.requireActual('@hyperdx/common-utils/dist/metadata'),
|
||||
getMetadata: jest.fn().mockReturnValue(mockMetadata),
|
||||
}));
|
||||
|
||||
// should fetch 5m of logs
|
||||
await processAlert(now, enhancedAlert, alertProvider);
|
||||
expect(enhancedAlert.state).toBe('ALERT');
|
||||
await processAlert(
|
||||
now,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
// skip since time diff is less than 1 window size
|
||||
const later = new Date('2023-11-16T22:14:00.000Z');
|
||||
await processAlert(later, enhancedAlert, alertProvider);
|
||||
await processAlert(
|
||||
later,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
// alert should still be in alert state
|
||||
expect(enhancedAlert.state).toBe('ALERT');
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
|
||||
await processAlert(nextWindow, enhancedAlert, alertProvider);
|
||||
await processAlert(
|
||||
nextWindow,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect(enhancedAlert.state).toBe('OK');
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
|
||||
|
||||
// check alert history
|
||||
const alertHistories = await AlertHistory.find({
|
||||
alert: alert._id,
|
||||
alert: alert.id,
|
||||
}).sort({
|
||||
createdAt: 1,
|
||||
});
|
||||
|
|
@ -957,7 +1067,10 @@ describe('checkAlerts', () => {
|
|||
it('TILE alert (events) - generic webhook', async () => {
|
||||
jest.spyOn(checkAlert, 'handleSendGenericWebhook');
|
||||
|
||||
const fetchMock = jest.fn().mockResolvedValue({});
|
||||
const fetchMock = jest.fn().mockResolvedValue({
|
||||
ok: true,
|
||||
text: jest.fn().mockResolvedValue(''),
|
||||
});
|
||||
global.fetch = fetchMock;
|
||||
|
||||
const team = await createTeam({ name: 'My Team' });
|
||||
|
|
@ -1063,29 +1176,83 @@ describe('checkAlerts', () => {
|
|||
mockUserId,
|
||||
);
|
||||
|
||||
const enhancedAlert: any = await Alert.findById(alert._id).populate([
|
||||
const enhancedAlert: any = await Alert.findById(alert.id).populate([
|
||||
'team',
|
||||
'dashboard',
|
||||
]);
|
||||
|
||||
const tile = dashboard.tiles?.find((t: any) => t.id === '17quud');
|
||||
if (!tile)
|
||||
throw new Error('tile not found for dashboard generic webhook');
|
||||
const details: any = {
|
||||
alert: enhancedAlert,
|
||||
source,
|
||||
conn: connection,
|
||||
taskType: AlertTaskType.TILE,
|
||||
tile,
|
||||
dashboard,
|
||||
};
|
||||
|
||||
const clickhouseClient = new clickhouse.ClickhouseClient({
|
||||
host: connection.host,
|
||||
username: connection.username,
|
||||
password: connection.password,
|
||||
});
|
||||
|
||||
const mockMetadata = {
|
||||
getColumn: jest.fn().mockImplementation(({ column }) => {
|
||||
const columnMap = {
|
||||
ServiceName: { name: 'ServiceName', type: 'String' },
|
||||
Timestamp: { name: 'Timestamp', type: 'DateTime' },
|
||||
SeverityText: { name: 'SeverityText', type: 'String' },
|
||||
Body: { name: 'Body', type: 'String' },
|
||||
};
|
||||
return Promise.resolve(columnMap[column]);
|
||||
}),
|
||||
};
|
||||
|
||||
// Mock the getMetadata function
|
||||
jest.mock('@hyperdx/common-utils/dist/metadata', () => ({
|
||||
...jest.requireActual('@hyperdx/common-utils/dist/metadata'),
|
||||
getMetadata: jest.fn().mockReturnValue(mockMetadata),
|
||||
}));
|
||||
|
||||
// should fetch 5m of logs
|
||||
await processAlert(now, enhancedAlert, alertProvider);
|
||||
expect(enhancedAlert.state).toBe('ALERT');
|
||||
await processAlert(
|
||||
now,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
// skip since time diff is less than 1 window size
|
||||
const later = new Date('2023-11-16T22:14:00.000Z');
|
||||
await processAlert(later, enhancedAlert, alertProvider);
|
||||
await processAlert(
|
||||
later,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
// alert should still be in alert state
|
||||
expect(enhancedAlert.state).toBe('ALERT');
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
|
||||
await processAlert(nextWindow, enhancedAlert, alertProvider);
|
||||
await processAlert(
|
||||
nextWindow,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect(enhancedAlert.state).toBe('OK');
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
|
||||
|
||||
// check alert history
|
||||
const alertHistories = await AlertHistory.find({
|
||||
alert: alert._id,
|
||||
alert: alert.id,
|
||||
}).sort({
|
||||
createdAt: 1,
|
||||
});
|
||||
|
|
@ -1218,29 +1385,84 @@ describe('checkAlerts', () => {
|
|||
mockUserId,
|
||||
);
|
||||
|
||||
const enhancedAlert: any = await Alert.findById(alert._id).populate([
|
||||
const enhancedAlert: any = await Alert.findById(alert.id).populate([
|
||||
'team',
|
||||
'dashboard',
|
||||
]);
|
||||
|
||||
const tile = dashboard.tiles?.find((t: any) => t.id === '17quud');
|
||||
if (!tile)
|
||||
throw new Error('tile not found for dashboard metrics webhook');
|
||||
const details: any = {
|
||||
alert: enhancedAlert,
|
||||
source,
|
||||
conn: connection,
|
||||
taskType: AlertTaskType.TILE,
|
||||
tile,
|
||||
dashboard,
|
||||
};
|
||||
|
||||
const clickhouseClient = new clickhouse.ClickhouseClient({
|
||||
host: connection.host,
|
||||
username: connection.username,
|
||||
password: connection.password,
|
||||
});
|
||||
|
||||
const mockMetadata = {
|
||||
getColumn: jest.fn().mockImplementation(({ column }) => {
|
||||
const columnMap = {
|
||||
ServiceName: { name: 'ServiceName', type: 'String' },
|
||||
Timestamp: { name: 'Timestamp', type: 'DateTime' },
|
||||
Value: { name: 'Value', type: 'Double' },
|
||||
MetricName: { name: 'MetricName', type: 'String' },
|
||||
TimeUnix: { name: 'TimeUnix', type: 'DateTime' },
|
||||
};
|
||||
return Promise.resolve(columnMap[column]);
|
||||
}),
|
||||
};
|
||||
|
||||
// Mock the getMetadata function
|
||||
jest.mock('@hyperdx/common-utils/dist/metadata', () => ({
|
||||
...jest.requireActual('@hyperdx/common-utils/dist/metadata'),
|
||||
getMetadata: jest.fn().mockReturnValue(mockMetadata),
|
||||
}));
|
||||
|
||||
// should fetch 5m of logs
|
||||
await processAlert(now, enhancedAlert, alertProvider);
|
||||
expect(enhancedAlert.state).toBe('ALERT');
|
||||
await processAlert(
|
||||
now,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
// skip since time diff is less than 1 window size
|
||||
const later = new Date('2023-11-16T22:14:00.000Z');
|
||||
await processAlert(later, enhancedAlert, alertProvider);
|
||||
await processAlert(
|
||||
later,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
// alert should still be in alert state
|
||||
expect(enhancedAlert.state).toBe('ALERT');
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
|
||||
await processAlert(nextWindow, enhancedAlert, alertProvider);
|
||||
await processAlert(
|
||||
nextWindow,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
// alert should be in ok state
|
||||
expect(enhancedAlert.state).toBe('OK');
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
|
||||
|
||||
// check alert history
|
||||
const alertHistories = await AlertHistory.find({
|
||||
alert: alert._id,
|
||||
alert: alert.id,
|
||||
}).sort({
|
||||
createdAt: 1,
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import * as clickhouse from '@hyperdx/common-utils/dist/clickhouse';
|
||||
import { createServer } from 'http';
|
||||
import mongoose from 'mongoose';
|
||||
import ms from 'ms';
|
||||
|
|
@ -13,7 +14,7 @@ import { SavedSearch } from '@/models/savedSearch';
|
|||
import { Source } from '@/models/source';
|
||||
import Webhook from '@/models/webhook';
|
||||
import { processAlert } from '@/tasks/checkAlerts';
|
||||
import { loadProvider } from '@/tasks/providers';
|
||||
import { AlertTaskType, loadProvider } from '@/tasks/providers';
|
||||
import * as slack from '@/utils/slack';
|
||||
|
||||
describe('Single Invocation Alert Test', () => {
|
||||
|
|
@ -169,20 +170,38 @@ describe('Single Invocation Alert Test', () => {
|
|||
]);
|
||||
|
||||
// Get the alert with populated references
|
||||
const enhancedAlert: any = await Alert.findById(alert._id).populate([
|
||||
const enhancedAlert: any = await Alert.findById(alert.id).populate([
|
||||
'team',
|
||||
'savedSearch',
|
||||
]);
|
||||
|
||||
// Process the alert - this should trigger the webhook
|
||||
await processAlert(now, enhancedAlert, alertProvider);
|
||||
const details: any = {
|
||||
alert: enhancedAlert,
|
||||
source,
|
||||
conn: connection,
|
||||
taskType: AlertTaskType.SAVED_SEARCH,
|
||||
savedSearch,
|
||||
};
|
||||
const clickhouseClient = new clickhouse.ClickhouseClient({
|
||||
host: connection.host,
|
||||
username: connection.username,
|
||||
password: connection.password,
|
||||
});
|
||||
await processAlert(
|
||||
now,
|
||||
details,
|
||||
clickhouseClient,
|
||||
connection.id,
|
||||
alertProvider,
|
||||
);
|
||||
|
||||
// Verify alert state changed to ALERT
|
||||
expect(enhancedAlert.state).toBe('ALERT');
|
||||
// Verify alert state changed to ALERT (from DB)
|
||||
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
|
||||
|
||||
// Verify alert history was created
|
||||
const alertHistories = await AlertHistory.find({
|
||||
alert: alert._id,
|
||||
alert: alert.id,
|
||||
}).sort({ createdAt: 1 });
|
||||
|
||||
expect(alertHistories.length).toBe(1);
|
||||
|
|
|
|||
|
|
@ -8,23 +8,22 @@ import {
|
|||
DisplayType,
|
||||
} from '@hyperdx/common-utils/dist/types';
|
||||
import * as fns from 'date-fns';
|
||||
import _ from 'lodash';
|
||||
import { isString } from 'lodash';
|
||||
import ms from 'ms';
|
||||
import { serializeError } from 'serialize-error';
|
||||
|
||||
import { getConnectionById } from '@/controllers/connection';
|
||||
import {
|
||||
AlertDocument,
|
||||
AlertSource,
|
||||
AlertState,
|
||||
AlertThresholdType,
|
||||
} from '@/models/alert';
|
||||
import Alert, { AlertState, AlertThresholdType, IAlert } from '@/models/alert';
|
||||
import AlertHistory, { IAlertHistory } from '@/models/alertHistory';
|
||||
import Dashboard, { IDashboard } from '@/models/dashboard';
|
||||
import { ISavedSearch, SavedSearch } from '@/models/savedSearch';
|
||||
import { ISource, Source } from '@/models/source';
|
||||
import { AlertProvider, loadProvider } from '@/tasks/providers';
|
||||
import { IDashboard } from '@/models/dashboard';
|
||||
import { ISavedSearch } from '@/models/savedSearch';
|
||||
import { ISource } from '@/models/source';
|
||||
import {
|
||||
AlertDetails,
|
||||
AlertProvider,
|
||||
AlertTask,
|
||||
AlertTaskType,
|
||||
loadProvider,
|
||||
} from '@/tasks/providers';
|
||||
import {
|
||||
AlertMessageTemplateDefaultView,
|
||||
buildAlertMessageTemplateTitle,
|
||||
|
|
@ -64,7 +63,7 @@ const fireChannelEvent = async ({
|
|||
totalCount,
|
||||
windowSizeInMins,
|
||||
}: {
|
||||
alert: AlertDocument;
|
||||
alert: IAlert;
|
||||
alertProvider: AlertProvider;
|
||||
attributes: Record<string, string>; // TODO: support other types than string
|
||||
clickhouseClient: clickhouse.ClickhouseClient;
|
||||
|
|
@ -137,12 +136,15 @@ const fireChannelEvent = async ({
|
|||
|
||||
export const processAlert = async (
|
||||
now: Date,
|
||||
alert: AlertDocument,
|
||||
details: AlertDetails,
|
||||
clickhouseClient: clickhouse.ClickhouseClient,
|
||||
connectionId: string,
|
||||
alertProvider: AlertProvider,
|
||||
) => {
|
||||
const { alert, source } = details;
|
||||
try {
|
||||
const previous: IAlertHistory | undefined = (
|
||||
await AlertHistory.find({ alert: alert._id })
|
||||
await AlertHistory.find({ alert: alert.id })
|
||||
.sort({ createdAt: -1 })
|
||||
.limit(1)
|
||||
)[0];
|
||||
|
|
@ -169,30 +171,8 @@ export const processAlert = async (
|
|||
const checkEndTime = nowInMinsRoundDown;
|
||||
|
||||
let chartConfig: ChartConfigWithOptDateRange | undefined;
|
||||
let connectionId: string | undefined;
|
||||
let savedSearch: ISavedSearch | undefined | null;
|
||||
let dashboard: IDashboard | undefined | null;
|
||||
let source: ISource | undefined | null;
|
||||
// SAVED_SEARCH Source
|
||||
if (alert.source === AlertSource.SAVED_SEARCH && alert.savedSearch) {
|
||||
savedSearch = await SavedSearch.findById(alert.savedSearch);
|
||||
if (savedSearch == null) {
|
||||
logger.error({
|
||||
message: 'SavedSearch not found',
|
||||
alertId: alert.id,
|
||||
});
|
||||
return;
|
||||
}
|
||||
source = await Source.findById(savedSearch.source);
|
||||
if (source == null) {
|
||||
logger.error({
|
||||
message: 'Source not found',
|
||||
alertId: alert.id,
|
||||
savedSearch: alert.savedSearch,
|
||||
});
|
||||
return;
|
||||
}
|
||||
connectionId = source.connection.toString();
|
||||
if (details.taskType === AlertTaskType.SAVED_SEARCH) {
|
||||
const savedSearch = details.savedSearch;
|
||||
chartConfig = {
|
||||
connection: connectionId,
|
||||
displayType: DisplayType.Line,
|
||||
|
|
@ -214,61 +194,26 @@ export const processAlert = async (
|
|||
implicitColumnExpression: source.implicitColumnExpression,
|
||||
timestampValueExpression: source.timestampValueExpression,
|
||||
};
|
||||
}
|
||||
// TILE Source
|
||||
else if (
|
||||
alert.source === AlertSource.TILE &&
|
||||
alert.dashboard &&
|
||||
alert.tileId
|
||||
) {
|
||||
dashboard = await Dashboard.findById(alert.dashboard);
|
||||
if (dashboard == null) {
|
||||
logger.error({
|
||||
message: 'Dashboard not found',
|
||||
alertId: alert.id,
|
||||
dashboardId: alert.dashboard,
|
||||
});
|
||||
return;
|
||||
}
|
||||
// filter tiles
|
||||
dashboard.tiles = dashboard.tiles.filter(
|
||||
tile => tile.id === alert.tileId,
|
||||
);
|
||||
|
||||
if (dashboard.tiles.length === 1) {
|
||||
// Doesn't work for metric alerts yet
|
||||
const MAX_NUM_GROUPS = 20;
|
||||
// TODO: assuming that the chart has only 1 series for now
|
||||
const firstTile = dashboard.tiles[0];
|
||||
if (firstTile.config.displayType === DisplayType.Line) {
|
||||
// fetch source data
|
||||
source = await Source.findById(firstTile.config.source);
|
||||
if (!source) {
|
||||
logger.error({
|
||||
message: 'Source not found',
|
||||
dashboardId: alert.dashboard,
|
||||
tile: firstTile,
|
||||
});
|
||||
return;
|
||||
}
|
||||
connectionId = source.connection.toString();
|
||||
chartConfig = {
|
||||
connection: connectionId,
|
||||
dateRange: [checkStartTime, checkEndTime],
|
||||
dateRangeStartInclusive: true,
|
||||
dateRangeEndInclusive: false,
|
||||
displayType: firstTile.config.displayType,
|
||||
from: source.from,
|
||||
granularity: `${windowSizeInMins} minute`,
|
||||
groupBy: firstTile.config.groupBy,
|
||||
implicitColumnExpression: source.implicitColumnExpression,
|
||||
metricTables: source.metricTables,
|
||||
select: firstTile.config.select,
|
||||
timestampValueExpression: source.timestampValueExpression,
|
||||
where: firstTile.config.where,
|
||||
seriesReturnType: firstTile.config.seriesReturnType,
|
||||
};
|
||||
}
|
||||
} else if (details.taskType === AlertTaskType.TILE) {
|
||||
const tile = details.tile;
|
||||
// Doesn't work for metric alerts yet
|
||||
if (tile.config.displayType === DisplayType.Line) {
|
||||
chartConfig = {
|
||||
connection: connectionId,
|
||||
dateRange: [checkStartTime, checkEndTime],
|
||||
dateRangeStartInclusive: true,
|
||||
dateRangeEndInclusive: false,
|
||||
displayType: tile.config.displayType,
|
||||
from: source.from,
|
||||
granularity: `${windowSizeInMins} minute`,
|
||||
groupBy: tile.config.groupBy,
|
||||
implicitColumnExpression: source.implicitColumnExpression,
|
||||
metricTables: source.metricTables,
|
||||
select: tile.config.select,
|
||||
timestampValueExpression: source.timestampValueExpression,
|
||||
where: tile.config.where,
|
||||
seriesReturnType: tile.config.seriesReturnType,
|
||||
};
|
||||
}
|
||||
} else {
|
||||
logger.error({
|
||||
|
|
@ -279,34 +224,15 @@ export const processAlert = async (
|
|||
}
|
||||
|
||||
// Fetch data
|
||||
if (chartConfig == null || connectionId == null) {
|
||||
if (chartConfig == null) {
|
||||
logger.error({
|
||||
message: 'Failed to build chart config',
|
||||
chartConfig,
|
||||
connectionId,
|
||||
alertId: alert.id,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const connection = await getConnectionById(
|
||||
alert.team._id.toString(),
|
||||
connectionId,
|
||||
true,
|
||||
);
|
||||
|
||||
if (connection == null) {
|
||||
logger.error({
|
||||
message: 'Connection not found',
|
||||
alertId: alert.id,
|
||||
});
|
||||
return;
|
||||
}
|
||||
const clickhouseClient = new clickhouse.ClickhouseClient({
|
||||
host: connection.host,
|
||||
username: connection.username,
|
||||
password: connection.password,
|
||||
});
|
||||
const metadata = getMetadata(clickhouseClient);
|
||||
const checksData = await clickhouseClient.queryChartConfig({
|
||||
config: chartConfig,
|
||||
|
|
@ -324,7 +250,7 @@ export const processAlert = async (
|
|||
// TODO: support INSUFFICIENT_DATA state
|
||||
let alertState = AlertState.OK;
|
||||
const history = await new AlertHistory({
|
||||
alert: alert._id,
|
||||
alert: alert.id,
|
||||
createdAt: nowInMinsRoundDown,
|
||||
state: alertState,
|
||||
}).save();
|
||||
|
|
@ -391,16 +317,20 @@ export const processAlert = async (
|
|||
});
|
||||
|
||||
try {
|
||||
// Casts to any here because this is where I stopped unraveling the
|
||||
// alert logic requiring large, nested objects. We should look at
|
||||
// cleaning this up next. fireChannelEvent guards against null values
|
||||
// for these properties.
|
||||
await fireChannelEvent({
|
||||
alert,
|
||||
alertProvider,
|
||||
attributes: {}, // FIXME: support attributes (logs + resources ?)
|
||||
clickhouseClient,
|
||||
dashboard,
|
||||
dashboard: (details as any).dashboard,
|
||||
endTime: fns.addMinutes(bucketStart, windowSizeInMins),
|
||||
group: extraFields.join(', '),
|
||||
metadata,
|
||||
savedSearch,
|
||||
savedSearch: (details as any).savedSearch,
|
||||
source,
|
||||
startTime: bucketStart,
|
||||
totalCount: _value,
|
||||
|
|
@ -423,8 +353,7 @@ export const processAlert = async (
|
|||
await history.save();
|
||||
}
|
||||
|
||||
alert.state = alertState;
|
||||
await alert.save();
|
||||
await Alert.updateOne({ _id: alert.id }, { $set: { state: alertState } });
|
||||
} catch (e) {
|
||||
// Uncomment this for better error messages locally
|
||||
// console.error(e);
|
||||
|
|
@ -436,6 +365,30 @@ export const processAlert = async (
|
|||
}
|
||||
};
|
||||
|
||||
export const processAlertTask = async (
|
||||
now: Date,
|
||||
alertTask: AlertTask,
|
||||
alertProvider: AlertProvider,
|
||||
) => {
|
||||
const { alerts, conn } = alertTask;
|
||||
logger.info({
|
||||
message: 'Processing alerts in batch',
|
||||
alertCount: alerts.length,
|
||||
});
|
||||
|
||||
const clickhouseClient = new clickhouse.ClickhouseClient({
|
||||
host: conn.host,
|
||||
username: conn.username,
|
||||
password: conn.password,
|
||||
});
|
||||
|
||||
const p: Promise<void>[] = [];
|
||||
for (const alert of alerts) {
|
||||
p.push(processAlert(now, alert, clickhouseClient, conn.id, alertProvider));
|
||||
}
|
||||
await Promise.all(p);
|
||||
};
|
||||
|
||||
// Re-export handleSendGenericWebhook for testing
|
||||
export { handleSendGenericWebhook };
|
||||
|
||||
|
|
@ -448,11 +401,14 @@ export default class CheckAlertTask implements HdxTask {
|
|||
|
||||
const now = new Date();
|
||||
const alertTasks = await this.provider.getAlertTasks();
|
||||
const alerts = alertTasks[0].alerts;
|
||||
logger.info(`Going to process ${alerts.length} alerts`);
|
||||
await Promise.all(
|
||||
alerts.map(alert => processAlert(now, alert, this.provider)),
|
||||
);
|
||||
logger.info({
|
||||
message: 'Fetched alert tasks to process',
|
||||
taskCount: alertTasks.length,
|
||||
});
|
||||
|
||||
for (const task of alertTasks) {
|
||||
await processAlertTask(now, task, this.provider);
|
||||
}
|
||||
}
|
||||
|
||||
async asyncDispose(): Promise<void> {
|
||||
|
|
|
|||
|
|
@ -1,7 +1,18 @@
|
|||
import mongoose from 'mongoose';
|
||||
|
||||
import * as config from '@/config';
|
||||
import { AlertProvider, loadProvider } from '@/tasks/providers/index';
|
||||
import { createAlert } from '@/controllers/alerts';
|
||||
import { createTeam } from '@/controllers/team';
|
||||
import { getServer, makeTile } from '@/fixtures';
|
||||
import Alert, { AlertSource, AlertThresholdType } from '@/models/alert';
|
||||
import Connection from '@/models/connection';
|
||||
import Dashboard from '@/models/dashboard';
|
||||
import { SavedSearch } from '@/models/savedSearch';
|
||||
import { Source } from '@/models/source';
|
||||
import {
|
||||
AlertProvider,
|
||||
AlertTaskType,
|
||||
loadProvider,
|
||||
} from '@/tasks/providers/index';
|
||||
|
||||
const MOCK_SAVED_SEARCH: any = {
|
||||
id: 'fake-saved-search-id',
|
||||
|
|
@ -9,9 +20,571 @@ const MOCK_SAVED_SEARCH: any = {
|
|||
|
||||
describe('DefaultAlertProvider', () => {
|
||||
let provider: AlertProvider;
|
||||
const server = getServer();
|
||||
|
||||
beforeEach(async () => {
|
||||
beforeAll(async () => {
|
||||
provider = await loadProvider('default');
|
||||
await server.start();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await server.clearDBs();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
if (provider) {
|
||||
await provider.asyncDispose();
|
||||
}
|
||||
await server.stop();
|
||||
});
|
||||
|
||||
describe('getAlertTasks', () => {
|
||||
it('should return empty array when no alerts exist', async () => {
|
||||
const result = await provider.getAlertTasks();
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it('should process a single saved search alert', async () => {
|
||||
const team = await createTeam({ name: 'Test Team' });
|
||||
|
||||
// Create connection
|
||||
const connection = await Connection.create({
|
||||
team: team._id,
|
||||
name: 'Test Connection',
|
||||
host: 'http://localhost:8123',
|
||||
username: 'test',
|
||||
password: 'test',
|
||||
});
|
||||
|
||||
// Create source
|
||||
const source = await Source.create({
|
||||
team: team._id,
|
||||
name: 'Test Source',
|
||||
kind: 'log',
|
||||
from: {
|
||||
databaseName: 'default',
|
||||
tableName: 'logs',
|
||||
},
|
||||
timestampValueExpression: 'timestamp',
|
||||
connection: connection._id,
|
||||
});
|
||||
|
||||
// Create saved search
|
||||
const savedSearch = await SavedSearch.create({
|
||||
team: team._id,
|
||||
name: 'Test Search',
|
||||
select: 'message',
|
||||
where: 'level: error',
|
||||
whereLanguage: 'lucene',
|
||||
orderBy: 'timestamp',
|
||||
source: source._id,
|
||||
tags: [],
|
||||
});
|
||||
|
||||
// Create alert
|
||||
const alert = await createAlert(
|
||||
team._id,
|
||||
{
|
||||
source: AlertSource.SAVED_SEARCH,
|
||||
savedSearchId: savedSearch._id.toString(),
|
||||
threshold: 10,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '5m',
|
||||
channel: {
|
||||
type: 'webhook',
|
||||
webhookId: new mongoose.Types.ObjectId().toString(),
|
||||
},
|
||||
},
|
||||
new mongoose.Types.ObjectId(),
|
||||
);
|
||||
|
||||
const result = await provider.getAlertTasks();
|
||||
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0].conn.id).toBe(connection.id);
|
||||
expect(result[0].alerts).toHaveLength(1);
|
||||
expect(result[0].alerts[0].taskType).toBe(AlertTaskType.SAVED_SEARCH);
|
||||
expect(result[0].alerts[0].alert.id).toBe(alert.id);
|
||||
|
||||
// Type narrowing for SAVED_SEARCH alert
|
||||
if (result[0].alerts[0].taskType === AlertTaskType.SAVED_SEARCH) {
|
||||
expect(result[0].alerts[0].savedSearch.name).toBe('Test Search');
|
||||
}
|
||||
});
|
||||
|
||||
it('should process a single tile alert', async () => {
|
||||
const team = await createTeam({ name: 'Test Team' });
|
||||
|
||||
// Create connection
|
||||
const connection = await Connection.create({
|
||||
team: team._id,
|
||||
name: 'Test Connection',
|
||||
host: 'http://localhost:8123',
|
||||
username: 'test',
|
||||
password: 'test',
|
||||
});
|
||||
|
||||
// Create source
|
||||
const source = await Source.create({
|
||||
team: team._id,
|
||||
name: 'Test Source',
|
||||
kind: 'log',
|
||||
from: {
|
||||
databaseName: 'default',
|
||||
tableName: 'logs',
|
||||
},
|
||||
timestampValueExpression: 'timestamp',
|
||||
connection: connection._id,
|
||||
});
|
||||
|
||||
// Create tile with source
|
||||
const tile = makeTile({ id: 'test-tile-123' });
|
||||
tile.config.source = source._id.toString();
|
||||
|
||||
// Create dashboard
|
||||
const dashboard = await Dashboard.create({
|
||||
team: team._id,
|
||||
name: 'Test Dashboard',
|
||||
tiles: [tile],
|
||||
});
|
||||
|
||||
// Create alert
|
||||
const alert = await createAlert(
|
||||
team._id,
|
||||
{
|
||||
source: AlertSource.TILE,
|
||||
dashboardId: dashboard._id.toString(),
|
||||
tileId: tile.id,
|
||||
threshold: 10,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '5m',
|
||||
channel: {
|
||||
type: 'webhook',
|
||||
webhookId: new mongoose.Types.ObjectId().toString(),
|
||||
},
|
||||
},
|
||||
new mongoose.Types.ObjectId(),
|
||||
);
|
||||
|
||||
const result = await provider.getAlertTasks();
|
||||
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0].conn.id).toBe(connection.id);
|
||||
expect(result[0].alerts).toHaveLength(1);
|
||||
expect(result[0].alerts[0].taskType).toBe(AlertTaskType.TILE);
|
||||
expect(result[0].alerts[0].alert.id).toBe(alert.id);
|
||||
|
||||
// Type narrowing for TILE alert
|
||||
if (result[0].alerts[0].taskType === AlertTaskType.TILE) {
|
||||
expect(result[0].alerts[0].tile.id).toBe(tile.id);
|
||||
expect(result[0].alerts[0].dashboard.name).toBe('Test Dashboard');
|
||||
|
||||
// Validate source is proper ISource object
|
||||
const alertSource = result[0].alerts[0].source;
|
||||
expect(alertSource.connection).toBe(connection.id); // Should be ObjectId, not populated IConnection
|
||||
expect(alertSource.name).toBe('Test Source');
|
||||
expect(alertSource.kind).toBe('log');
|
||||
expect(alertSource.team).toBeDefined();
|
||||
expect(alertSource.from?.databaseName).toBe('default');
|
||||
expect(alertSource.from?.tableName).toBe('logs');
|
||||
expect(alertSource.timestampValueExpression).toBe('timestamp');
|
||||
|
||||
// Ensure it's a plain object, not a mongoose document
|
||||
expect((alertSource as any).toObject).toBeUndefined(); // mongoose documents have toObject method
|
||||
expect((alertSource as any).save).toBeUndefined(); // mongoose documents have save method
|
||||
}
|
||||
});
|
||||
|
||||
it('should skip alerts with missing saved search', async () => {
|
||||
const team = await createTeam({ name: 'Test Team' });
|
||||
|
||||
// Create alert directly in database with non-existent saved search
|
||||
// This simulates an alert that exists but references a deleted saved search
|
||||
await Alert.create({
|
||||
team: team._id,
|
||||
source: AlertSource.SAVED_SEARCH,
|
||||
savedSearch: new mongoose.Types.ObjectId(), // Non-existent ID
|
||||
threshold: 10,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '5m',
|
||||
channel: {
|
||||
type: 'webhook',
|
||||
webhookId: new mongoose.Types.ObjectId().toString(),
|
||||
},
|
||||
});
|
||||
|
||||
const result = await provider.getAlertTasks();
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it('should skip alerts with no source field', async () => {
|
||||
const team = await createTeam({ name: 'Test Team' });
|
||||
|
||||
// Create alert directly without source
|
||||
await Alert.create({
|
||||
team: team._id,
|
||||
threshold: 10,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '5m',
|
||||
channel: {
|
||||
type: 'webhook',
|
||||
webhookId: new mongoose.Types.ObjectId().toString(),
|
||||
},
|
||||
// Missing source field
|
||||
});
|
||||
|
||||
const result = await provider.getAlertTasks();
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it('should group multiple alerts with the same connection', async () => {
|
||||
const team = await createTeam({ name: 'Test Team' });
|
||||
|
||||
// Create single connection
|
||||
const connection = await Connection.create({
|
||||
team: team._id,
|
||||
name: 'Shared Connection',
|
||||
host: 'http://localhost:8123',
|
||||
username: 'test',
|
||||
password: 'test',
|
||||
});
|
||||
|
||||
// Create source
|
||||
const source = await Source.create({
|
||||
team: team._id,
|
||||
name: 'Test Source',
|
||||
kind: 'log',
|
||||
from: {
|
||||
databaseName: 'default',
|
||||
tableName: 'logs',
|
||||
},
|
||||
timestampValueExpression: 'timestamp',
|
||||
connection: connection._id,
|
||||
});
|
||||
|
||||
// Create saved search and alert
|
||||
const savedSearch = await SavedSearch.create({
|
||||
team: team._id,
|
||||
name: 'Test Search',
|
||||
select: 'message',
|
||||
where: 'level: error',
|
||||
whereLanguage: 'lucene',
|
||||
orderBy: 'timestamp',
|
||||
source: source._id,
|
||||
tags: [],
|
||||
});
|
||||
|
||||
const savedSearchAlert = await createAlert(
|
||||
team._id,
|
||||
{
|
||||
source: AlertSource.SAVED_SEARCH,
|
||||
savedSearchId: savedSearch._id.toString(),
|
||||
threshold: 10,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '5m',
|
||||
channel: {
|
||||
type: 'webhook',
|
||||
webhookId: new mongoose.Types.ObjectId().toString(),
|
||||
},
|
||||
},
|
||||
new mongoose.Types.ObjectId(),
|
||||
);
|
||||
|
||||
// Create tile and alert
|
||||
const tile = makeTile({ id: 'test-tile-123' });
|
||||
tile.config.source = source._id.toString();
|
||||
|
||||
const dashboard = await Dashboard.create({
|
||||
team: team._id,
|
||||
name: 'Test Dashboard',
|
||||
tiles: [tile],
|
||||
});
|
||||
|
||||
const tileAlert = await createAlert(
|
||||
team._id,
|
||||
{
|
||||
source: AlertSource.TILE,
|
||||
dashboardId: dashboard._id.toString(),
|
||||
tileId: tile.id,
|
||||
threshold: 15,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '15m',
|
||||
channel: {
|
||||
type: 'webhook',
|
||||
webhookId: new mongoose.Types.ObjectId().toString(),
|
||||
},
|
||||
},
|
||||
new mongoose.Types.ObjectId(),
|
||||
);
|
||||
|
||||
const result = await provider.getAlertTasks();
|
||||
|
||||
expect(result).toHaveLength(1); // Should group into one task
|
||||
expect(result[0].conn.id).toBe(connection.id);
|
||||
expect(result[0].alerts).toHaveLength(2); // Both alerts should be in the same task
|
||||
|
||||
const alertIds = result[0].alerts.map(a => a.alert.id).sort();
|
||||
expect(alertIds).toEqual([savedSearchAlert.id, tileAlert.id].sort());
|
||||
});
|
||||
|
||||
it('should create separate tasks for different connections', async () => {
|
||||
const team = await createTeam({ name: 'Test Team' });
|
||||
|
||||
// Create two different connections
|
||||
const connection1 = await Connection.create({
|
||||
team: team._id,
|
||||
name: 'Connection 1',
|
||||
host: 'http://localhost:8123',
|
||||
username: 'test1',
|
||||
password: 'test1',
|
||||
});
|
||||
|
||||
const connection2 = await Connection.create({
|
||||
team: team._id,
|
||||
name: 'Connection 2',
|
||||
host: 'http://localhost:8124',
|
||||
username: 'test2',
|
||||
password: 'test2',
|
||||
});
|
||||
|
||||
// Create sources for each connection
|
||||
const source1 = await Source.create({
|
||||
team: team._id,
|
||||
name: 'Source 1',
|
||||
kind: 'log',
|
||||
from: {
|
||||
databaseName: 'default',
|
||||
tableName: 'logs',
|
||||
},
|
||||
timestampValueExpression: 'timestamp',
|
||||
connection: connection1._id,
|
||||
});
|
||||
|
||||
const source2 = await Source.create({
|
||||
team: team._id,
|
||||
name: 'Source 2',
|
||||
kind: 'log',
|
||||
from: {
|
||||
databaseName: 'default',
|
||||
tableName: 'logs',
|
||||
},
|
||||
timestampValueExpression: 'timestamp',
|
||||
connection: connection2._id,
|
||||
});
|
||||
|
||||
// Create saved searches and alerts
|
||||
const savedSearch1 = await SavedSearch.create({
|
||||
team: team._id,
|
||||
name: 'Search 1',
|
||||
select: 'message',
|
||||
where: 'level: error',
|
||||
whereLanguage: 'lucene',
|
||||
orderBy: 'timestamp',
|
||||
source: source1._id,
|
||||
tags: [],
|
||||
});
|
||||
|
||||
const savedSearch2 = await SavedSearch.create({
|
||||
team: team._id,
|
||||
name: 'Search 2',
|
||||
select: 'message',
|
||||
where: 'level: warn',
|
||||
whereLanguage: 'lucene',
|
||||
orderBy: 'timestamp',
|
||||
source: source2._id,
|
||||
tags: [],
|
||||
});
|
||||
|
||||
await createAlert(
|
||||
team._id,
|
||||
{
|
||||
source: AlertSource.SAVED_SEARCH,
|
||||
savedSearchId: savedSearch1._id.toString(),
|
||||
threshold: 10,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '5m',
|
||||
channel: {
|
||||
type: 'webhook',
|
||||
webhookId: new mongoose.Types.ObjectId().toString(),
|
||||
},
|
||||
},
|
||||
new mongoose.Types.ObjectId(),
|
||||
);
|
||||
|
||||
await createAlert(
|
||||
team._id,
|
||||
{
|
||||
source: AlertSource.SAVED_SEARCH,
|
||||
savedSearchId: savedSearch2._id.toString(),
|
||||
threshold: 15,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '15m',
|
||||
channel: {
|
||||
type: 'webhook',
|
||||
webhookId: new mongoose.Types.ObjectId().toString(),
|
||||
},
|
||||
},
|
||||
new mongoose.Types.ObjectId(),
|
||||
);
|
||||
|
||||
const result = await provider.getAlertTasks();
|
||||
|
||||
expect(result).toHaveLength(2); // Should create separate tasks
|
||||
|
||||
const connectionIds = result.map(task => task.conn.id).sort();
|
||||
expect(connectionIds).toEqual([connection1.id, connection2.id].sort());
|
||||
|
||||
// Each task should have one alert
|
||||
expect(result[0].alerts).toHaveLength(1);
|
||||
expect(result[1].alerts).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('should skip alerts with missing dashboard', async () => {
|
||||
const team = await createTeam({ name: 'Test Team' });
|
||||
|
||||
// Create alert directly in database with non-existent dashboard
|
||||
// This simulates an alert that exists but references a deleted dashboard
|
||||
await Alert.create({
|
||||
team: team._id,
|
||||
source: AlertSource.TILE,
|
||||
dashboard: new mongoose.Types.ObjectId(), // Non-existent ID
|
||||
tileId: 'some-tile-id',
|
||||
threshold: 10,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '5m',
|
||||
channel: {
|
||||
type: 'webhook',
|
||||
webhookId: new mongoose.Types.ObjectId().toString(),
|
||||
},
|
||||
});
|
||||
|
||||
const result = await provider.getAlertTasks();
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it('should skip alerts with missing tile in dashboard', async () => {
|
||||
const team = await createTeam({ name: 'Test Team' });
|
||||
|
||||
const dashboard = await Dashboard.create({
|
||||
team: team._id,
|
||||
name: 'Test Dashboard',
|
||||
tiles: [makeTile({ id: 'existing-tile' })],
|
||||
});
|
||||
|
||||
// Create alert directly in database with non-existent tile ID
|
||||
await Alert.create({
|
||||
team: team._id,
|
||||
source: AlertSource.TILE,
|
||||
dashboard: dashboard._id,
|
||||
tileId: 'non-existent-tile', // Non-existent tile ID
|
||||
threshold: 10,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '5m',
|
||||
channel: {
|
||||
type: 'webhook',
|
||||
webhookId: new mongoose.Types.ObjectId().toString(),
|
||||
},
|
||||
});
|
||||
|
||||
const result = await provider.getAlertTasks();
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it('should skip alerts with missing source', async () => {
|
||||
const team = await createTeam({ name: 'Test Team' });
|
||||
|
||||
const tile = makeTile({ id: 'test-tile' });
|
||||
tile.config.source = new mongoose.Types.ObjectId().toString(); // Non-existent source
|
||||
|
||||
const dashboard = await Dashboard.create({
|
||||
team: team._id,
|
||||
name: 'Test Dashboard',
|
||||
tiles: [tile],
|
||||
});
|
||||
|
||||
// Create alert directly in database
|
||||
await Alert.create({
|
||||
team: team._id,
|
||||
source: AlertSource.TILE,
|
||||
dashboard: dashboard._id,
|
||||
tileId: tile.id,
|
||||
threshold: 10,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '5m',
|
||||
channel: {
|
||||
type: 'webhook',
|
||||
webhookId: new mongoose.Types.ObjectId().toString(),
|
||||
},
|
||||
});
|
||||
|
||||
const result = await provider.getAlertTasks();
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it('should skip alerts with missing connection', async () => {
|
||||
const team = await createTeam({ name: 'Test Team' });
|
||||
|
||||
// Create source with non-existent connection
|
||||
const source = await Source.create({
|
||||
team: team._id,
|
||||
name: 'Test Source',
|
||||
kind: 'log',
|
||||
from: {
|
||||
databaseName: 'default',
|
||||
tableName: 'logs',
|
||||
},
|
||||
timestampValueExpression: 'timestamp',
|
||||
connection: new mongoose.Types.ObjectId(), // Non-existent connection
|
||||
});
|
||||
|
||||
const savedSearch = await SavedSearch.create({
|
||||
team: team._id,
|
||||
name: 'Test Search',
|
||||
select: 'message',
|
||||
where: 'level: error',
|
||||
whereLanguage: 'lucene',
|
||||
orderBy: 'timestamp',
|
||||
source: source._id,
|
||||
tags: [],
|
||||
});
|
||||
|
||||
// Create alert directly in database
|
||||
await Alert.create({
|
||||
team: team._id,
|
||||
source: AlertSource.SAVED_SEARCH,
|
||||
savedSearch: savedSearch._id,
|
||||
threshold: 10,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '5m',
|
||||
channel: {
|
||||
type: 'webhook',
|
||||
webhookId: new mongoose.Types.ObjectId().toString(),
|
||||
},
|
||||
});
|
||||
|
||||
const result = await provider.getAlertTasks();
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it('should skip alerts with unsupported source type', async () => {
|
||||
const team = await createTeam({ name: 'Test Team' });
|
||||
|
||||
// Create alert with invalid source
|
||||
await Alert.create({
|
||||
team: team._id,
|
||||
source: 'UNSUPPORTED_SOURCE' as any,
|
||||
threshold: 10,
|
||||
thresholdType: AlertThresholdType.ABOVE,
|
||||
interval: '5m',
|
||||
channel: {
|
||||
type: 'webhook',
|
||||
webhookId: new mongoose.Types.ObjectId().toString(),
|
||||
},
|
||||
});
|
||||
|
||||
const result = await provider.getAlertTasks();
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('buildLogSearchLink', () => {
|
||||
|
|
@ -30,7 +603,6 @@ describe('DefaultAlertProvider', () => {
|
|||
it('should handle different saved search IDs', () => {
|
||||
const customSavedSearch: any = {
|
||||
id: 'custom-search-123',
|
||||
_id: new mongoose.Types.ObjectId(),
|
||||
team: new mongoose.Types.ObjectId(),
|
||||
source: new mongoose.Types.ObjectId(),
|
||||
select: 'Body',
|
||||
|
|
@ -96,7 +668,6 @@ describe('DefaultAlertProvider', () => {
|
|||
it('should handle saved search ID with special characters', () => {
|
||||
const specialSavedSearch: any = {
|
||||
id: 'search-with-special-chars-123_456',
|
||||
_id: new mongoose.Types.ObjectId(),
|
||||
team: new mongoose.Types.ObjectId(),
|
||||
source: new mongoose.Types.ObjectId(),
|
||||
select: 'Body',
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import { Tile } from '@hyperdx/common-utils/dist/types';
|
||||
import mongoose from 'mongoose';
|
||||
import ms from 'ms';
|
||||
import { URLSearchParams } from 'url';
|
||||
|
|
@ -5,11 +6,181 @@ import { URLSearchParams } from 'url';
|
|||
import * as config from '@/config';
|
||||
import { LOCAL_APP_TEAM } from '@/controllers/team';
|
||||
import { connectDB, mongooseConnection } from '@/models';
|
||||
import Alert from '@/models/alert';
|
||||
import { ISavedSearch } from '@/models/savedSearch';
|
||||
import Alert, { AlertSource, type IAlert } from '@/models/alert';
|
||||
import Connection, { IConnection } from '@/models/connection';
|
||||
import Dashboard from '@/models/dashboard';
|
||||
import { type ISavedSearch, SavedSearch } from '@/models/savedSearch';
|
||||
import { type ISource, Source } from '@/models/source';
|
||||
import {
|
||||
type AlertDetails,
|
||||
type AlertProvider,
|
||||
type AlertTask,
|
||||
AlertTaskType,
|
||||
} from '@/tasks/providers';
|
||||
import { convertMsToGranularityString } from '@/utils/common';
|
||||
import logger from '@/utils/logger';
|
||||
|
||||
import { AlertProvider, AlertTask } from './index';
|
||||
async function getSavedSearchDetails(
|
||||
alert: IAlert,
|
||||
): Promise<[IConnection, AlertDetails] | []> {
|
||||
const savedSearchId = alert.savedSearch;
|
||||
const savedSearch = await SavedSearch.findOne({
|
||||
_id: savedSearchId,
|
||||
team: alert.team,
|
||||
}).populate<Omit<ISavedSearch, 'source'> & { source: ISource }>({
|
||||
path: 'source',
|
||||
match: { team: alert.team },
|
||||
});
|
||||
|
||||
if (!savedSearch) {
|
||||
logger.error({
|
||||
message: 'savedSearch not found',
|
||||
savedSearchId,
|
||||
alertId: alert.id,
|
||||
});
|
||||
return [];
|
||||
}
|
||||
|
||||
const { source } = savedSearch;
|
||||
const connId = source.connection;
|
||||
const conn = await Connection.findOne({ _id: connId, team: alert.team });
|
||||
if (!conn) {
|
||||
logger.error({
|
||||
message: 'connection not found',
|
||||
alertId: alert.id,
|
||||
connId,
|
||||
savedSearchId,
|
||||
});
|
||||
return [];
|
||||
}
|
||||
|
||||
return [
|
||||
conn,
|
||||
{
|
||||
alert,
|
||||
source,
|
||||
taskType: AlertTaskType.SAVED_SEARCH,
|
||||
savedSearch,
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
async function getTileDetails(
|
||||
alert: IAlert,
|
||||
): Promise<[IConnection, AlertDetails] | []> {
|
||||
const dashboardId = alert.dashboard;
|
||||
const tileId = alert.tileId;
|
||||
|
||||
const dashboard = await Dashboard.findOne({
|
||||
_id: dashboardId,
|
||||
team: alert.team,
|
||||
});
|
||||
if (!dashboard) {
|
||||
logger.error({
|
||||
message: 'dashboard not found',
|
||||
dashboardId,
|
||||
alertId: alert.id,
|
||||
});
|
||||
return [];
|
||||
}
|
||||
|
||||
const tile = dashboard.tiles?.find((t: Tile) => t.id === tileId);
|
||||
if (!tile) {
|
||||
logger.error({
|
||||
message: 'tile matching alert not found',
|
||||
tileId,
|
||||
dashboardId: dashboard._id,
|
||||
alertId: alert.id,
|
||||
});
|
||||
return [];
|
||||
}
|
||||
|
||||
const source = await Source.findOne({
|
||||
_id: tile.config.source,
|
||||
team: alert.team,
|
||||
}).populate<Omit<ISource, 'connection'> & { connection: IConnection }>({
|
||||
path: 'connection',
|
||||
match: { team: alert.team },
|
||||
});
|
||||
if (!source) {
|
||||
logger.error({
|
||||
message: 'source not found',
|
||||
sourceId: tile.config.source,
|
||||
tileId,
|
||||
dashboardId: dashboard._id,
|
||||
alertId: alert.id,
|
||||
});
|
||||
return [];
|
||||
}
|
||||
|
||||
if (!source.connection) {
|
||||
logger.error({
|
||||
message: 'connection not found',
|
||||
alertId: alert.id,
|
||||
tileId,
|
||||
dashboardId: dashboard._id,
|
||||
sourceId: source.id,
|
||||
});
|
||||
return [];
|
||||
}
|
||||
|
||||
const connection = source.connection;
|
||||
const sourceProps = source.toObject();
|
||||
return [
|
||||
connection,
|
||||
{
|
||||
alert,
|
||||
source: { ...sourceProps, connection: connection.id },
|
||||
taskType: AlertTaskType.TILE,
|
||||
tile,
|
||||
dashboard,
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
async function loadAlert(alert: IAlert, groupedTasks: Map<string, AlertTask>) {
|
||||
if (!alert.source) {
|
||||
throw new Error('alert does not have a source');
|
||||
}
|
||||
|
||||
if (config.IS_LOCAL_APP_MODE) {
|
||||
// The id is the 12 character string `_local_team_', which will become an ObjectId
|
||||
// as the ASCII hex values, so 5f6c6f63616c5f7465616d5f.
|
||||
alert.team = new mongoose.Types.ObjectId(LOCAL_APP_TEAM.id);
|
||||
}
|
||||
|
||||
let conn: IConnection | undefined;
|
||||
let details: AlertDetails | undefined;
|
||||
switch (alert.source) {
|
||||
case AlertSource.SAVED_SEARCH:
|
||||
[conn, details] = await getSavedSearchDetails(alert);
|
||||
break;
|
||||
|
||||
case AlertSource.TILE:
|
||||
[conn, details] = await getTileDetails(alert);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new Error(`unsupported source: ${alert.source}`);
|
||||
}
|
||||
|
||||
if (!details) {
|
||||
throw new Error('failed to fetch alert details');
|
||||
}
|
||||
|
||||
if (!conn) {
|
||||
throw new Error('failed to fetch alert connection');
|
||||
}
|
||||
|
||||
if (!groupedTasks.has(conn.id)) {
|
||||
groupedTasks.set(conn.id, { alerts: [], conn });
|
||||
}
|
||||
const v = groupedTasks.get(conn.id);
|
||||
if (!v) {
|
||||
throw new Error(`provider did not set key ${conn.id} before appending`);
|
||||
}
|
||||
v.alerts.push(details);
|
||||
}
|
||||
|
||||
export default class DefaultAlertProvider implements AlertProvider {
|
||||
async init() {
|
||||
|
|
@ -21,16 +192,23 @@ export default class DefaultAlertProvider implements AlertProvider {
|
|||
}
|
||||
|
||||
async getAlertTasks(): Promise<AlertTask[]> {
|
||||
const groupedTasks = new Map<string, AlertTask>();
|
||||
const alerts = await Alert.find({});
|
||||
if (config.IS_LOCAL_APP_MODE) {
|
||||
alerts.forEach(_alert => {
|
||||
// The id is the 12 character string `_local_team_', which will become an ObjectId
|
||||
// as the ASCII hex values, so 5f6c6f63616c5f7465616d5f.
|
||||
_alert.team = new mongoose.Types.ObjectId(LOCAL_APP_TEAM.id);
|
||||
});
|
||||
for (const alert of alerts) {
|
||||
try {
|
||||
await loadAlert(alert, groupedTasks);
|
||||
} catch (e) {
|
||||
logger.error({
|
||||
message: `failed to load alert: ${e}`,
|
||||
alertId: alert.id,
|
||||
team: alert.team,
|
||||
channel: alert.channel,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return [{ alerts }];
|
||||
// Flatten out our groupings for execution
|
||||
return Array.from(groupedTasks.values());
|
||||
}
|
||||
|
||||
buildLogSearchLink({
|
||||
|
|
|
|||
|
|
@ -1,11 +1,45 @@
|
|||
import { AlertDocument } from '@/models/alert';
|
||||
import { Tile } from '@hyperdx/common-utils/dist/types';
|
||||
|
||||
import { IAlert } from '@/models/alert';
|
||||
import { IConnection } from '@/models/connection';
|
||||
import { IDashboard } from '@/models/dashboard';
|
||||
import { ISavedSearch } from '@/models/savedSearch';
|
||||
import { ISource } from '@/models/source';
|
||||
import DefaultAlertProvider from '@/tasks/providers/default';
|
||||
|
||||
import DefaultAlertProvider from './default';
|
||||
export enum AlertTaskType {
|
||||
SAVED_SEARCH,
|
||||
TILE,
|
||||
}
|
||||
|
||||
export type AlertTask = {
|
||||
alerts: AlertDocument[];
|
||||
};
|
||||
// Details about the alert and the source for the alert. Depending on
|
||||
// the taskType either:
|
||||
// 1. the savedSearch field is required or
|
||||
// 2. the tile and dashboard field are required
|
||||
//
|
||||
// The dependent typing means less null checks when using these values as
|
||||
// the are required when the type is set accordingly.
|
||||
export type AlertDetails = {
|
||||
alert: IAlert;
|
||||
source: ISource;
|
||||
} & (
|
||||
| {
|
||||
taskType: AlertTaskType.SAVED_SEARCH;
|
||||
savedSearch: Omit<ISavedSearch, 'source'>;
|
||||
}
|
||||
| {
|
||||
taskType: AlertTaskType.TILE;
|
||||
tile: Tile;
|
||||
dashboard: IDashboard;
|
||||
}
|
||||
);
|
||||
|
||||
// AlertTask instances can carry metadata, of type T, for the provider that created
|
||||
// them. The `metadata` field is only valid when T is defined to be a legal type.
|
||||
export type AlertTask<T = never> = {
|
||||
alerts: AlertDetails[];
|
||||
conn: IConnection;
|
||||
} & ([T] extends [never] ? unknown : { metadata: T });
|
||||
|
||||
export interface AlertProvider {
|
||||
init(): Promise<void>;
|
||||
|
|
|
|||
Loading…
Reference in a new issue