feat: group tasks by connection (#1071)

Group the alert tasks fetched from MongoDB by the connection they need to run against, so that all alerts sharing a connection are processed together. This sets up further optimizations such as connection reuse.
This commit is contained in:
Dan Hable 2025-08-19 10:26:01 -05:00 committed by GitHub
parent adb05ac723
commit c2160536ac
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 1177 additions and 191 deletions

View file

@ -0,0 +1,5 @@
---
"@hyperdx/api": patch
---
Changes the order of alert evaluation to group queries by the connection on the alert.

View file

@ -18,7 +18,7 @@ export function getConnectionById(
export function createConnection(
team: string,
connection: Omit<IConnection, '_id'>,
connection: Omit<IConnection, 'id' | '_id'>,
) {
return Connection.create({ ...connection, team });
}
@ -26,7 +26,7 @@ export function createConnection(
export function updateConnection(
team: string,
connectionId: string,
connection: Omit<IConnection, '_id'>,
connection: Omit<IConnection, 'id' | '_id'>,
) {
return Connection.findOneAndUpdate({ _id: connectionId, team }, connection, {
new: true,

View file

@ -21,8 +21,7 @@ export async function getSavedSearches(teamId: string) {
return savedSearches.map(savedSearch => ({
...savedSearch.toJSON(),
alerts: alertsBySavedSearchId[savedSearch._id.toString()]?.map(alert => {
const { _id, ...restAlert } = alert.toJSON();
return { id: _id, ...restAlert };
return alert.toJSON();
}),
}));
}

View file

@ -41,7 +41,7 @@ export enum AlertSource {
}
export interface IAlert {
_id: ObjectId;
id: string;
channel: AlertChannel;
interval: AlertInterval;
source?: AlertSource;
@ -162,6 +162,7 @@ const AlertSchema = new Schema<IAlert>(
},
{
timestamps: true,
toJSON: { virtuals: true },
},
);

View file

@ -5,6 +5,7 @@ type ObjectId = mongoose.Types.ObjectId;
export interface IConnection {
_id: ObjectId;
id: string;
host: string;
name: string;
password: string;

View file

@ -1,3 +1,4 @@
import * as clickhouse from '@hyperdx/common-utils/dist/clickhouse';
import mongoose from 'mongoose';
import ms from 'ms';
@ -21,7 +22,7 @@ import { Source } from '@/models/source';
import Webhook from '@/models/webhook';
import * as checkAlert from '@/tasks/checkAlerts';
import { doesExceedThreshold, processAlert } from '@/tasks/checkAlerts';
import { loadProvider } from '@/tasks/providers';
import { AlertDetails, AlertTaskType, loadProvider } from '@/tasks/providers';
import {
AlertMessageTemplateDefaultView,
buildAlertMessageTemplateHdxLink,
@ -709,34 +710,90 @@ describe('checkAlerts', () => {
mockUserId,
);
const enhancedAlert: any = await Alert.findById(alert._id).populate([
const enhancedAlert: any = await Alert.findById(alert.id).populate([
'team',
'savedSearch',
]);
const details: any = {
alert: enhancedAlert,
source,
conn: connection,
taskType: AlertTaskType.SAVED_SEARCH,
savedSearch,
};
const clickhouseClient = new clickhouse.ClickhouseClient({
host: connection.host,
username: connection.username,
password: connection.password,
});
const mockMetadata = {
getColumn: jest.fn().mockImplementation(({ column }) => {
const columnMap = {
Body: { name: 'Body', type: 'String' },
Timestamp: { name: 'Timestamp', type: 'DateTime' },
SeverityText: { name: 'SeverityText', type: 'String' },
ServiceName: { name: 'ServiceName', type: 'String' },
};
return Promise.resolve(columnMap[column]);
}),
};
// Mock the getMetadata function
jest.mock('@hyperdx/common-utils/dist/metadata', () => ({
...jest.requireActual('@hyperdx/common-utils/dist/metadata'),
getMetadata: jest.fn().mockReturnValue(mockMetadata),
}));
// should fetch 5m of logs
await processAlert(now, enhancedAlert, alertProvider);
expect(enhancedAlert.state).toBe('ALERT');
await processAlert(
now,
details,
clickhouseClient,
connection.id,
alertProvider,
);
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
// skip since time diff is less than 1 window size
const later = new Date('2023-11-16T22:14:00.000Z');
await processAlert(later, enhancedAlert, alertProvider);
await processAlert(
later,
details,
clickhouseClient,
connection.id,
alertProvider,
);
// alert should still be in alert state
expect(enhancedAlert.state).toBe('ALERT');
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
await processAlert(nextWindow, enhancedAlert, alertProvider);
await processAlert(
nextWindow,
details,
clickhouseClient,
connection.id,
alertProvider,
);
// alert should still be in alert state
expect(enhancedAlert.state).toBe('ALERT');
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
const nextNextWindow = new Date('2023-11-16T22:20:00.000Z');
await processAlert(nextNextWindow, enhancedAlert, alertProvider);
await processAlert(
nextNextWindow,
details,
clickhouseClient,
connection.id,
alertProvider,
);
// alert should be in ok state
expect(enhancedAlert.state).toBe('OK');
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
// check alert history
const alertHistories = await AlertHistory.find({
alert: alert._id,
alert: alert.id,
}).sort({
createdAt: 1,
});
@ -890,29 +947,82 @@ describe('checkAlerts', () => {
mockUserId,
);
const enhancedAlert: any = await Alert.findById(alert._id).populate([
const enhancedAlert: any = await Alert.findById(alert.id).populate([
'team',
'dashboard',
]);
const tile = dashboard.tiles?.find((t: any) => t.id === '17quud');
if (!tile) throw new Error('tile not found for dashboard test case');
const details: any = {
alert: enhancedAlert,
source,
conn: connection,
taskType: AlertTaskType.TILE,
tile,
dashboard,
};
const clickhouseClient = new clickhouse.ClickhouseClient({
host: connection.host,
username: connection.username,
password: connection.password,
});
const mockMetadata = {
getColumn: jest.fn().mockImplementation(({ column }) => {
const columnMap = {
ServiceName: { name: 'ServiceName', type: 'String' },
Timestamp: { name: 'Timestamp', type: 'DateTime' },
SeverityText: { name: 'SeverityText', type: 'String' },
Body: { name: 'Body', type: 'String' },
};
return Promise.resolve(columnMap[column]);
}),
};
// Mock the getMetadata function
jest.mock('@hyperdx/common-utils/dist/metadata', () => ({
...jest.requireActual('@hyperdx/common-utils/dist/metadata'),
getMetadata: jest.fn().mockReturnValue(mockMetadata),
}));
// should fetch 5m of logs
await processAlert(now, enhancedAlert, alertProvider);
expect(enhancedAlert.state).toBe('ALERT');
await processAlert(
now,
details,
clickhouseClient,
connection.id,
alertProvider,
);
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
// skip since time diff is less than 1 window size
const later = new Date('2023-11-16T22:14:00.000Z');
await processAlert(later, enhancedAlert, alertProvider);
await processAlert(
later,
details,
clickhouseClient,
connection.id,
alertProvider,
);
// alert should still be in alert state
expect(enhancedAlert.state).toBe('ALERT');
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
await processAlert(nextWindow, enhancedAlert, alertProvider);
await processAlert(
nextWindow,
details,
clickhouseClient,
connection.id,
alertProvider,
);
// alert should be in ok state
expect(enhancedAlert.state).toBe('OK');
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
// check alert history
const alertHistories = await AlertHistory.find({
alert: alert._id,
alert: alert.id,
}).sort({
createdAt: 1,
});
@ -957,7 +1067,10 @@ describe('checkAlerts', () => {
it('TILE alert (events) - generic webhook', async () => {
jest.spyOn(checkAlert, 'handleSendGenericWebhook');
const fetchMock = jest.fn().mockResolvedValue({});
const fetchMock = jest.fn().mockResolvedValue({
ok: true,
text: jest.fn().mockResolvedValue(''),
});
global.fetch = fetchMock;
const team = await createTeam({ name: 'My Team' });
@ -1063,29 +1176,83 @@ describe('checkAlerts', () => {
mockUserId,
);
const enhancedAlert: any = await Alert.findById(alert._id).populate([
const enhancedAlert: any = await Alert.findById(alert.id).populate([
'team',
'dashboard',
]);
const tile = dashboard.tiles?.find((t: any) => t.id === '17quud');
if (!tile)
throw new Error('tile not found for dashboard generic webhook');
const details: any = {
alert: enhancedAlert,
source,
conn: connection,
taskType: AlertTaskType.TILE,
tile,
dashboard,
};
const clickhouseClient = new clickhouse.ClickhouseClient({
host: connection.host,
username: connection.username,
password: connection.password,
});
const mockMetadata = {
getColumn: jest.fn().mockImplementation(({ column }) => {
const columnMap = {
ServiceName: { name: 'ServiceName', type: 'String' },
Timestamp: { name: 'Timestamp', type: 'DateTime' },
SeverityText: { name: 'SeverityText', type: 'String' },
Body: { name: 'Body', type: 'String' },
};
return Promise.resolve(columnMap[column]);
}),
};
// Mock the getMetadata function
jest.mock('@hyperdx/common-utils/dist/metadata', () => ({
...jest.requireActual('@hyperdx/common-utils/dist/metadata'),
getMetadata: jest.fn().mockReturnValue(mockMetadata),
}));
// should fetch 5m of logs
await processAlert(now, enhancedAlert, alertProvider);
expect(enhancedAlert.state).toBe('ALERT');
await processAlert(
now,
details,
clickhouseClient,
connection.id,
alertProvider,
);
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
// skip since time diff is less than 1 window size
const later = new Date('2023-11-16T22:14:00.000Z');
await processAlert(later, enhancedAlert, alertProvider);
await processAlert(
later,
details,
clickhouseClient,
connection.id,
alertProvider,
);
// alert should still be in alert state
expect(enhancedAlert.state).toBe('ALERT');
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
await processAlert(nextWindow, enhancedAlert, alertProvider);
await processAlert(
nextWindow,
details,
clickhouseClient,
connection.id,
alertProvider,
);
// alert should be in ok state
expect(enhancedAlert.state).toBe('OK');
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
// check alert history
const alertHistories = await AlertHistory.find({
alert: alert._id,
alert: alert.id,
}).sort({
createdAt: 1,
});
@ -1218,29 +1385,84 @@ describe('checkAlerts', () => {
mockUserId,
);
const enhancedAlert: any = await Alert.findById(alert._id).populate([
const enhancedAlert: any = await Alert.findById(alert.id).populate([
'team',
'dashboard',
]);
const tile = dashboard.tiles?.find((t: any) => t.id === '17quud');
if (!tile)
throw new Error('tile not found for dashboard metrics webhook');
const details: any = {
alert: enhancedAlert,
source,
conn: connection,
taskType: AlertTaskType.TILE,
tile,
dashboard,
};
const clickhouseClient = new clickhouse.ClickhouseClient({
host: connection.host,
username: connection.username,
password: connection.password,
});
const mockMetadata = {
getColumn: jest.fn().mockImplementation(({ column }) => {
const columnMap = {
ServiceName: { name: 'ServiceName', type: 'String' },
Timestamp: { name: 'Timestamp', type: 'DateTime' },
Value: { name: 'Value', type: 'Double' },
MetricName: { name: 'MetricName', type: 'String' },
TimeUnix: { name: 'TimeUnix', type: 'DateTime' },
};
return Promise.resolve(columnMap[column]);
}),
};
// Mock the getMetadata function
jest.mock('@hyperdx/common-utils/dist/metadata', () => ({
...jest.requireActual('@hyperdx/common-utils/dist/metadata'),
getMetadata: jest.fn().mockReturnValue(mockMetadata),
}));
// should fetch 5m of logs
await processAlert(now, enhancedAlert, alertProvider);
expect(enhancedAlert.state).toBe('ALERT');
await processAlert(
now,
details,
clickhouseClient,
connection.id,
alertProvider,
);
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
// skip since time diff is less than 1 window size
const later = new Date('2023-11-16T22:14:00.000Z');
await processAlert(later, enhancedAlert, alertProvider);
await processAlert(
later,
details,
clickhouseClient,
connection.id,
alertProvider,
);
// alert should still be in alert state
expect(enhancedAlert.state).toBe('ALERT');
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
const nextWindow = new Date('2023-11-16T22:16:00.000Z');
await processAlert(nextWindow, enhancedAlert, alertProvider);
await processAlert(
nextWindow,
details,
clickhouseClient,
connection.id,
alertProvider,
);
// alert should be in ok state
expect(enhancedAlert.state).toBe('OK');
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('OK');
// check alert history
const alertHistories = await AlertHistory.find({
alert: alert._id,
alert: alert.id,
}).sort({
createdAt: 1,
});

View file

@ -1,3 +1,4 @@
import * as clickhouse from '@hyperdx/common-utils/dist/clickhouse';
import { createServer } from 'http';
import mongoose from 'mongoose';
import ms from 'ms';
@ -13,7 +14,7 @@ import { SavedSearch } from '@/models/savedSearch';
import { Source } from '@/models/source';
import Webhook from '@/models/webhook';
import { processAlert } from '@/tasks/checkAlerts';
import { loadProvider } from '@/tasks/providers';
import { AlertTaskType, loadProvider } from '@/tasks/providers';
import * as slack from '@/utils/slack';
describe('Single Invocation Alert Test', () => {
@ -169,20 +170,38 @@ describe('Single Invocation Alert Test', () => {
]);
// Get the alert with populated references
const enhancedAlert: any = await Alert.findById(alert._id).populate([
const enhancedAlert: any = await Alert.findById(alert.id).populate([
'team',
'savedSearch',
]);
// Process the alert - this should trigger the webhook
await processAlert(now, enhancedAlert, alertProvider);
const details: any = {
alert: enhancedAlert,
source,
conn: connection,
taskType: AlertTaskType.SAVED_SEARCH,
savedSearch,
};
const clickhouseClient = new clickhouse.ClickhouseClient({
host: connection.host,
username: connection.username,
password: connection.password,
});
await processAlert(
now,
details,
clickhouseClient,
connection.id,
alertProvider,
);
// Verify alert state changed to ALERT
expect(enhancedAlert.state).toBe('ALERT');
// Verify alert state changed to ALERT (from DB)
expect((await Alert.findById(enhancedAlert.id))!.state).toBe('ALERT');
// Verify alert history was created
const alertHistories = await AlertHistory.find({
alert: alert._id,
alert: alert.id,
}).sort({ createdAt: 1 });
expect(alertHistories.length).toBe(1);

View file

@ -8,23 +8,22 @@ import {
DisplayType,
} from '@hyperdx/common-utils/dist/types';
import * as fns from 'date-fns';
import _ from 'lodash';
import { isString } from 'lodash';
import ms from 'ms';
import { serializeError } from 'serialize-error';
import { getConnectionById } from '@/controllers/connection';
import {
AlertDocument,
AlertSource,
AlertState,
AlertThresholdType,
} from '@/models/alert';
import Alert, { AlertState, AlertThresholdType, IAlert } from '@/models/alert';
import AlertHistory, { IAlertHistory } from '@/models/alertHistory';
import Dashboard, { IDashboard } from '@/models/dashboard';
import { ISavedSearch, SavedSearch } from '@/models/savedSearch';
import { ISource, Source } from '@/models/source';
import { AlertProvider, loadProvider } from '@/tasks/providers';
import { IDashboard } from '@/models/dashboard';
import { ISavedSearch } from '@/models/savedSearch';
import { ISource } from '@/models/source';
import {
AlertDetails,
AlertProvider,
AlertTask,
AlertTaskType,
loadProvider,
} from '@/tasks/providers';
import {
AlertMessageTemplateDefaultView,
buildAlertMessageTemplateTitle,
@ -64,7 +63,7 @@ const fireChannelEvent = async ({
totalCount,
windowSizeInMins,
}: {
alert: AlertDocument;
alert: IAlert;
alertProvider: AlertProvider;
attributes: Record<string, string>; // TODO: support other types than string
clickhouseClient: clickhouse.ClickhouseClient;
@ -137,12 +136,15 @@ const fireChannelEvent = async ({
export const processAlert = async (
now: Date,
alert: AlertDocument,
details: AlertDetails,
clickhouseClient: clickhouse.ClickhouseClient,
connectionId: string,
alertProvider: AlertProvider,
) => {
const { alert, source } = details;
try {
const previous: IAlertHistory | undefined = (
await AlertHistory.find({ alert: alert._id })
await AlertHistory.find({ alert: alert.id })
.sort({ createdAt: -1 })
.limit(1)
)[0];
@ -169,30 +171,8 @@ export const processAlert = async (
const checkEndTime = nowInMinsRoundDown;
let chartConfig: ChartConfigWithOptDateRange | undefined;
let connectionId: string | undefined;
let savedSearch: ISavedSearch | undefined | null;
let dashboard: IDashboard | undefined | null;
let source: ISource | undefined | null;
// SAVED_SEARCH Source
if (alert.source === AlertSource.SAVED_SEARCH && alert.savedSearch) {
savedSearch = await SavedSearch.findById(alert.savedSearch);
if (savedSearch == null) {
logger.error({
message: 'SavedSearch not found',
alertId: alert.id,
});
return;
}
source = await Source.findById(savedSearch.source);
if (source == null) {
logger.error({
message: 'Source not found',
alertId: alert.id,
savedSearch: alert.savedSearch,
});
return;
}
connectionId = source.connection.toString();
if (details.taskType === AlertTaskType.SAVED_SEARCH) {
const savedSearch = details.savedSearch;
chartConfig = {
connection: connectionId,
displayType: DisplayType.Line,
@ -214,61 +194,26 @@ export const processAlert = async (
implicitColumnExpression: source.implicitColumnExpression,
timestampValueExpression: source.timestampValueExpression,
};
}
// TILE Source
else if (
alert.source === AlertSource.TILE &&
alert.dashboard &&
alert.tileId
) {
dashboard = await Dashboard.findById(alert.dashboard);
if (dashboard == null) {
logger.error({
message: 'Dashboard not found',
alertId: alert.id,
dashboardId: alert.dashboard,
});
return;
}
// filter tiles
dashboard.tiles = dashboard.tiles.filter(
tile => tile.id === alert.tileId,
);
if (dashboard.tiles.length === 1) {
// Doesn't work for metric alerts yet
const MAX_NUM_GROUPS = 20;
// TODO: assuming that the chart has only 1 series for now
const firstTile = dashboard.tiles[0];
if (firstTile.config.displayType === DisplayType.Line) {
// fetch source data
source = await Source.findById(firstTile.config.source);
if (!source) {
logger.error({
message: 'Source not found',
dashboardId: alert.dashboard,
tile: firstTile,
});
return;
}
connectionId = source.connection.toString();
chartConfig = {
connection: connectionId,
dateRange: [checkStartTime, checkEndTime],
dateRangeStartInclusive: true,
dateRangeEndInclusive: false,
displayType: firstTile.config.displayType,
from: source.from,
granularity: `${windowSizeInMins} minute`,
groupBy: firstTile.config.groupBy,
implicitColumnExpression: source.implicitColumnExpression,
metricTables: source.metricTables,
select: firstTile.config.select,
timestampValueExpression: source.timestampValueExpression,
where: firstTile.config.where,
seriesReturnType: firstTile.config.seriesReturnType,
};
}
} else if (details.taskType === AlertTaskType.TILE) {
const tile = details.tile;
// Doesn't work for metric alerts yet
if (tile.config.displayType === DisplayType.Line) {
chartConfig = {
connection: connectionId,
dateRange: [checkStartTime, checkEndTime],
dateRangeStartInclusive: true,
dateRangeEndInclusive: false,
displayType: tile.config.displayType,
from: source.from,
granularity: `${windowSizeInMins} minute`,
groupBy: tile.config.groupBy,
implicitColumnExpression: source.implicitColumnExpression,
metricTables: source.metricTables,
select: tile.config.select,
timestampValueExpression: source.timestampValueExpression,
where: tile.config.where,
seriesReturnType: tile.config.seriesReturnType,
};
}
} else {
logger.error({
@ -279,34 +224,15 @@ export const processAlert = async (
}
// Fetch data
if (chartConfig == null || connectionId == null) {
if (chartConfig == null) {
logger.error({
message: 'Failed to build chart config',
chartConfig,
connectionId,
alertId: alert.id,
});
return;
}
const connection = await getConnectionById(
alert.team._id.toString(),
connectionId,
true,
);
if (connection == null) {
logger.error({
message: 'Connection not found',
alertId: alert.id,
});
return;
}
const clickhouseClient = new clickhouse.ClickhouseClient({
host: connection.host,
username: connection.username,
password: connection.password,
});
const metadata = getMetadata(clickhouseClient);
const checksData = await clickhouseClient.queryChartConfig({
config: chartConfig,
@ -324,7 +250,7 @@ export const processAlert = async (
// TODO: support INSUFFICIENT_DATA state
let alertState = AlertState.OK;
const history = await new AlertHistory({
alert: alert._id,
alert: alert.id,
createdAt: nowInMinsRoundDown,
state: alertState,
}).save();
@ -391,16 +317,20 @@ export const processAlert = async (
});
try {
// Casts to any here because this is where I stopped unraveling the
// alert logic requiring large, nested objects. We should look at
// cleaning this up next. fireChannelEvent guards against null values
// for these properties.
await fireChannelEvent({
alert,
alertProvider,
attributes: {}, // FIXME: support attributes (logs + resources ?)
clickhouseClient,
dashboard,
dashboard: (details as any).dashboard,
endTime: fns.addMinutes(bucketStart, windowSizeInMins),
group: extraFields.join(', '),
metadata,
savedSearch,
savedSearch: (details as any).savedSearch,
source,
startTime: bucketStart,
totalCount: _value,
@ -423,8 +353,7 @@ export const processAlert = async (
await history.save();
}
alert.state = alertState;
await alert.save();
await Alert.updateOne({ _id: alert.id }, { $set: { state: alertState } });
} catch (e) {
// Uncomment this for better error messages locally
// console.error(e);
@ -436,6 +365,30 @@ export const processAlert = async (
}
};
/**
 * Evaluates every alert of a single task using one ClickHouse client.
 *
 * A client is constructed from the task's connection credentials and the same
 * instance is handed to each `processAlert` call. All calls are started first
 * and then awaited together.
 *
 * @param now - Evaluation time, forwarded unchanged to `processAlert`.
 * @param alertTask - Supplies the `alerts` to evaluate and the `conn` they share.
 * @param alertProvider - Forwarded unchanged to `processAlert`.
 */
export const processAlertTask = async (
  now: Date,
  alertTask: AlertTask,
  alertProvider: AlertProvider,
) => {
  const batch = alertTask.alerts;
  const connection = alertTask.conn;

  logger.info({
    message: 'Processing alerts in batch',
    alertCount: batch.length,
  });

  // One client per task; every alert in the batch is queried through it.
  const sharedClient = new clickhouse.ClickhouseClient({
    host: connection.host,
    username: connection.username,
    password: connection.password,
  });

  await Promise.all(
    batch.map(details =>
      processAlert(now, details, sharedClient, connection.id, alertProvider),
    ),
  );
};
// Re-export handleSendGenericWebhook for testing
export { handleSendGenericWebhook };
@ -448,11 +401,14 @@ export default class CheckAlertTask implements HdxTask {
const now = new Date();
const alertTasks = await this.provider.getAlertTasks();
const alerts = alertTasks[0].alerts;
logger.info(`Going to process ${alerts.length} alerts`);
await Promise.all(
alerts.map(alert => processAlert(now, alert, this.provider)),
);
logger.info({
message: 'Fetched alert tasks to process',
taskCount: alertTasks.length,
});
for (const task of alertTasks) {
await processAlertTask(now, task, this.provider);
}
}
async asyncDispose(): Promise<void> {

View file

@ -1,7 +1,18 @@
import mongoose from 'mongoose';
import * as config from '@/config';
import { AlertProvider, loadProvider } from '@/tasks/providers/index';
import { createAlert } from '@/controllers/alerts';
import { createTeam } from '@/controllers/team';
import { getServer, makeTile } from '@/fixtures';
import Alert, { AlertSource, AlertThresholdType } from '@/models/alert';
import Connection from '@/models/connection';
import Dashboard from '@/models/dashboard';
import { SavedSearch } from '@/models/savedSearch';
import { Source } from '@/models/source';
import {
AlertProvider,
AlertTaskType,
loadProvider,
} from '@/tasks/providers/index';
const MOCK_SAVED_SEARCH: any = {
id: 'fake-saved-search-id',
@ -9,9 +20,571 @@ const MOCK_SAVED_SEARCH: any = {
describe('DefaultAlertProvider', () => {
let provider: AlertProvider;
const server = getServer();
beforeEach(async () => {
beforeAll(async () => {
provider = await loadProvider('default');
await server.start();
});
afterEach(async () => {
await server.clearDBs();
});
afterAll(async () => {
if (provider) {
await provider.asyncDispose();
}
await server.stop();
});
describe('getAlertTasks', () => {
it('should return empty array when no alerts exist', async () => {
  // This case inserts no alerts, so the provider is expected to yield no tasks.
  expect(await provider.getAlertTasks()).toEqual([]);
});
it('should process a single saved search alert', async () => {
const team = await createTeam({ name: 'Test Team' });
// Create connection
const connection = await Connection.create({
team: team._id,
name: 'Test Connection',
host: 'http://localhost:8123',
username: 'test',
password: 'test',
});
// Create source
const source = await Source.create({
team: team._id,
name: 'Test Source',
kind: 'log',
from: {
databaseName: 'default',
tableName: 'logs',
},
timestampValueExpression: 'timestamp',
connection: connection._id,
});
// Create saved search
const savedSearch = await SavedSearch.create({
team: team._id,
name: 'Test Search',
select: 'message',
where: 'level: error',
whereLanguage: 'lucene',
orderBy: 'timestamp',
source: source._id,
tags: [],
});
// Create alert
const alert = await createAlert(
team._id,
{
source: AlertSource.SAVED_SEARCH,
savedSearchId: savedSearch._id.toString(),
threshold: 10,
thresholdType: AlertThresholdType.ABOVE,
interval: '5m',
channel: {
type: 'webhook',
webhookId: new mongoose.Types.ObjectId().toString(),
},
},
new mongoose.Types.ObjectId(),
);
const result = await provider.getAlertTasks();
expect(result).toHaveLength(1);
expect(result[0].conn.id).toBe(connection.id);
expect(result[0].alerts).toHaveLength(1);
expect(result[0].alerts[0].taskType).toBe(AlertTaskType.SAVED_SEARCH);
expect(result[0].alerts[0].alert.id).toBe(alert.id);
// Type narrowing for SAVED_SEARCH alert
if (result[0].alerts[0].taskType === AlertTaskType.SAVED_SEARCH) {
expect(result[0].alerts[0].savedSearch.name).toBe('Test Search');
}
});
it('should process a single tile alert', async () => {
const team = await createTeam({ name: 'Test Team' });
// Create connection
const connection = await Connection.create({
team: team._id,
name: 'Test Connection',
host: 'http://localhost:8123',
username: 'test',
password: 'test',
});
// Create source
const source = await Source.create({
team: team._id,
name: 'Test Source',
kind: 'log',
from: {
databaseName: 'default',
tableName: 'logs',
},
timestampValueExpression: 'timestamp',
connection: connection._id,
});
// Create tile with source
const tile = makeTile({ id: 'test-tile-123' });
tile.config.source = source._id.toString();
// Create dashboard
const dashboard = await Dashboard.create({
team: team._id,
name: 'Test Dashboard',
tiles: [tile],
});
// Create alert
const alert = await createAlert(
team._id,
{
source: AlertSource.TILE,
dashboardId: dashboard._id.toString(),
tileId: tile.id,
threshold: 10,
thresholdType: AlertThresholdType.ABOVE,
interval: '5m',
channel: {
type: 'webhook',
webhookId: new mongoose.Types.ObjectId().toString(),
},
},
new mongoose.Types.ObjectId(),
);
const result = await provider.getAlertTasks();
expect(result).toHaveLength(1);
expect(result[0].conn.id).toBe(connection.id);
expect(result[0].alerts).toHaveLength(1);
expect(result[0].alerts[0].taskType).toBe(AlertTaskType.TILE);
expect(result[0].alerts[0].alert.id).toBe(alert.id);
// Type narrowing for TILE alert
if (result[0].alerts[0].taskType === AlertTaskType.TILE) {
expect(result[0].alerts[0].tile.id).toBe(tile.id);
expect(result[0].alerts[0].dashboard.name).toBe('Test Dashboard');
// Validate source is proper ISource object
const alertSource = result[0].alerts[0].source;
expect(alertSource.connection).toBe(connection.id); // Should be ObjectId, not populated IConnection
expect(alertSource.name).toBe('Test Source');
expect(alertSource.kind).toBe('log');
expect(alertSource.team).toBeDefined();
expect(alertSource.from?.databaseName).toBe('default');
expect(alertSource.from?.tableName).toBe('logs');
expect(alertSource.timestampValueExpression).toBe('timestamp');
// Ensure it's a plain object, not a mongoose document
expect((alertSource as any).toObject).toBeUndefined(); // mongoose documents have toObject method
expect((alertSource as any).save).toBeUndefined(); // mongoose documents have save method
}
});
it('should skip alerts with missing saved search', async () => {
  const team = await createTeam({ name: 'Test Team' });

  // A freshly generated id that no SavedSearch document carries. Writing the
  // alert through the model directly reproduces an alert left behind after
  // its saved search was deleted.
  const danglingSavedSearchId = new mongoose.Types.ObjectId();
  const webhookId = new mongoose.Types.ObjectId().toString();

  await Alert.create({
    team: team._id,
    source: AlertSource.SAVED_SEARCH,
    savedSearch: danglingSavedSearchId,
    threshold: 10,
    thresholdType: AlertThresholdType.ABOVE,
    interval: '5m',
    channel: {
      type: 'webhook',
      webhookId,
    },
  });

  // The dangling alert must not surface as a task.
  expect(await provider.getAlertTasks()).toEqual([]);
});
it('should skip alerts with no source field', async () => {
  const team = await createTeam({ name: 'Test Team' });
  const webhookId = new mongoose.Types.ObjectId().toString();

  // Written through the model directly so that `source` can be left unset.
  await Alert.create({
    team: team._id,
    threshold: 10,
    thresholdType: AlertThresholdType.ABOVE,
    interval: '5m',
    channel: {
      type: 'webhook',
      webhookId,
    },
  });

  // An alert without a source must not surface as a task.
  expect(await provider.getAlertTasks()).toEqual([]);
});
it('should group multiple alerts with the same connection', async () => {
const team = await createTeam({ name: 'Test Team' });
// Create single connection
const connection = await Connection.create({
team: team._id,
name: 'Shared Connection',
host: 'http://localhost:8123',
username: 'test',
password: 'test',
});
// Create source
const source = await Source.create({
team: team._id,
name: 'Test Source',
kind: 'log',
from: {
databaseName: 'default',
tableName: 'logs',
},
timestampValueExpression: 'timestamp',
connection: connection._id,
});
// Create saved search and alert
const savedSearch = await SavedSearch.create({
team: team._id,
name: 'Test Search',
select: 'message',
where: 'level: error',
whereLanguage: 'lucene',
orderBy: 'timestamp',
source: source._id,
tags: [],
});
const savedSearchAlert = await createAlert(
team._id,
{
source: AlertSource.SAVED_SEARCH,
savedSearchId: savedSearch._id.toString(),
threshold: 10,
thresholdType: AlertThresholdType.ABOVE,
interval: '5m',
channel: {
type: 'webhook',
webhookId: new mongoose.Types.ObjectId().toString(),
},
},
new mongoose.Types.ObjectId(),
);
// Create tile and alert
const tile = makeTile({ id: 'test-tile-123' });
tile.config.source = source._id.toString();
const dashboard = await Dashboard.create({
team: team._id,
name: 'Test Dashboard',
tiles: [tile],
});
const tileAlert = await createAlert(
team._id,
{
source: AlertSource.TILE,
dashboardId: dashboard._id.toString(),
tileId: tile.id,
threshold: 15,
thresholdType: AlertThresholdType.ABOVE,
interval: '15m',
channel: {
type: 'webhook',
webhookId: new mongoose.Types.ObjectId().toString(),
},
},
new mongoose.Types.ObjectId(),
);
const result = await provider.getAlertTasks();
expect(result).toHaveLength(1); // Should group into one task
expect(result[0].conn.id).toBe(connection.id);
expect(result[0].alerts).toHaveLength(2); // Both alerts should be in the same task
const alertIds = result[0].alerts.map(a => a.alert.id).sort();
expect(alertIds).toEqual([savedSearchAlert.id, tileAlert.id].sort());
});
// Verifies grouping by connection: two saved-search alerts whose sources point at
// two different Connection documents must yield two tasks, one per connection,
// each carrying exactly one alert.
it('should create separate tasks for different connections', async () => {
const team = await createTeam({ name: 'Test Team' });
// Create two different connections
const connection1 = await Connection.create({
team: team._id,
name: 'Connection 1',
host: 'http://localhost:8123',
username: 'test1',
password: 'test1',
});
const connection2 = await Connection.create({
team: team._id,
name: 'Connection 2',
host: 'http://localhost:8124',
username: 'test2',
password: 'test2',
});
// Create sources for each connection
const source1 = await Source.create({
team: team._id,
name: 'Source 1',
kind: 'log',
from: {
databaseName: 'default',
tableName: 'logs',
},
timestampValueExpression: 'timestamp',
connection: connection1._id,
});
const source2 = await Source.create({
team: team._id,
name: 'Source 2',
kind: 'log',
from: {
databaseName: 'default',
tableName: 'logs',
},
timestampValueExpression: 'timestamp',
connection: connection2._id,
});
// Create saved searches and alerts
const savedSearch1 = await SavedSearch.create({
team: team._id,
name: 'Search 1',
select: 'message',
where: 'level: error',
whereLanguage: 'lucene',
orderBy: 'timestamp',
source: source1._id,
tags: [],
});
const savedSearch2 = await SavedSearch.create({
team: team._id,
name: 'Search 2',
select: 'message',
where: 'level: warn',
whereLanguage: 'lucene',
orderBy: 'timestamp',
source: source2._id,
tags: [],
});
await createAlert(
team._id,
{
source: AlertSource.SAVED_SEARCH,
savedSearchId: savedSearch1._id.toString(),
threshold: 10,
thresholdType: AlertThresholdType.ABOVE,
interval: '5m',
channel: {
type: 'webhook',
webhookId: new mongoose.Types.ObjectId().toString(),
},
},
new mongoose.Types.ObjectId(),
);
await createAlert(
team._id,
{
source: AlertSource.SAVED_SEARCH,
savedSearchId: savedSearch2._id.toString(),
threshold: 15,
thresholdType: AlertThresholdType.ABOVE,
interval: '15m',
channel: {
type: 'webhook',
webhookId: new mongoose.Types.ObjectId().toString(),
},
},
new mongoose.Types.ObjectId(),
);
const result = await provider.getAlertTasks();
expect(result).toHaveLength(2); // Should create separate tasks
// Task order is not asserted, so compare the sorted connection ids.
const connectionIds = result.map(task => task.conn.id).sort();
expect(connectionIds).toEqual([connection1.id, connection2.id].sort());
// Each task should have one alert
expect(result[0].alerts).toHaveLength(1);
expect(result[1].alerts).toHaveLength(1);
});
// A tile alert whose `dashboard` id matches no Dashboard document must be left
// out of the result; getAlertTasks is expected to resolve to an empty list
// rather than reject.
it('should skip alerts with missing dashboard', async () => {
const team = await createTeam({ name: 'Test Team' });
// Create alert directly in database with non-existent dashboard
// This simulates an alert that exists but references a deleted dashboard
await Alert.create({
team: team._id,
source: AlertSource.TILE,
dashboard: new mongoose.Types.ObjectId(), // Non-existent ID
tileId: 'some-tile-id',
threshold: 10,
thresholdType: AlertThresholdType.ABOVE,
interval: '5m',
channel: {
type: 'webhook',
webhookId: new mongoose.Types.ObjectId().toString(),
},
});
const result = await provider.getAlertTasks();
expect(result).toEqual([]);
});
// The dashboard exists but contains no tile with the alert's `tileId`; the alert
// must be left out and getAlertTasks must resolve to an empty list.
it('should skip alerts with missing tile in dashboard', async () => {
const team = await createTeam({ name: 'Test Team' });
const dashboard = await Dashboard.create({
team: team._id,
name: 'Test Dashboard',
tiles: [makeTile({ id: 'existing-tile' })],
});
// Create alert directly in database with non-existent tile ID
await Alert.create({
team: team._id,
source: AlertSource.TILE,
dashboard: dashboard._id,
tileId: 'non-existent-tile', // Non-existent tile ID
threshold: 10,
thresholdType: AlertThresholdType.ABOVE,
interval: '5m',
channel: {
type: 'webhook',
webhookId: new mongoose.Types.ObjectId().toString(),
},
});
const result = await provider.getAlertTasks();
expect(result).toEqual([]);
});
// Dashboard and tile both exist, but the tile's `config.source` is a freshly
// generated id with no Source document behind it; the alert must be left out
// and getAlertTasks must resolve to an empty list.
it('should skip alerts with missing source', async () => {
const team = await createTeam({ name: 'Test Team' });
const tile = makeTile({ id: 'test-tile' });
tile.config.source = new mongoose.Types.ObjectId().toString(); // Non-existent source
const dashboard = await Dashboard.create({
team: team._id,
name: 'Test Dashboard',
tiles: [tile],
});
// Create alert directly in database
await Alert.create({
team: team._id,
source: AlertSource.TILE,
dashboard: dashboard._id,
tileId: tile.id,
threshold: 10,
thresholdType: AlertThresholdType.ABOVE,
interval: '5m',
channel: {
type: 'webhook',
webhookId: new mongoose.Types.ObjectId().toString(),
},
});
const result = await provider.getAlertTasks();
expect(result).toEqual([]);
});
// Exercises the saved-search path: alert -> saved search -> source all exist,
// but the source's `connection` id has no Connection document. The alert must
// be left out and getAlertTasks must resolve to an empty list.
it('should skip alerts with missing connection', async () => {
const team = await createTeam({ name: 'Test Team' });
// Create source with non-existent connection
const source = await Source.create({
team: team._id,
name: 'Test Source',
kind: 'log',
from: {
databaseName: 'default',
tableName: 'logs',
},
timestampValueExpression: 'timestamp',
connection: new mongoose.Types.ObjectId(), // Non-existent connection
});
const savedSearch = await SavedSearch.create({
team: team._id,
name: 'Test Search',
select: 'message',
where: 'level: error',
whereLanguage: 'lucene',
orderBy: 'timestamp',
source: source._id,
tags: [],
});
// Create alert directly in database
await Alert.create({
team: team._id,
source: AlertSource.SAVED_SEARCH,
savedSearch: savedSearch._id,
threshold: 10,
thresholdType: AlertThresholdType.ABOVE,
interval: '5m',
channel: {
type: 'webhook',
webhookId: new mongoose.Types.ObjectId().toString(),
},
});
const result = await provider.getAlertTasks();
expect(result).toEqual([]);
});
// An alert whose `source` is neither SAVED_SEARCH nor TILE must be left out and
// getAlertTasks must resolve to an empty list.
it('should skip alerts with unsupported source type', async () => {
const team = await createTeam({ name: 'Test Team' });
// Create alert with invalid source
await Alert.create({
team: team._id,
// `as any` sidesteps the compile-time AlertSource typing so an out-of-enum
// value can be written. NOTE(review): presumably the schema does not
// enum-validate `source`, otherwise this create would reject — confirm.
source: 'UNSUPPORTED_SOURCE' as any,
threshold: 10,
thresholdType: AlertThresholdType.ABOVE,
interval: '5m',
channel: {
type: 'webhook',
webhookId: new mongoose.Types.ObjectId().toString(),
},
});
const result = await provider.getAlertTasks();
expect(result).toEqual([]);
});
});
describe('buildLogSearchLink', () => {
@ -30,7 +603,6 @@ describe('DefaultAlertProvider', () => {
it('should handle different saved search IDs', () => {
const customSavedSearch: any = {
id: 'custom-search-123',
_id: new mongoose.Types.ObjectId(),
team: new mongoose.Types.ObjectId(),
source: new mongoose.Types.ObjectId(),
select: 'Body',
@ -96,7 +668,6 @@ describe('DefaultAlertProvider', () => {
it('should handle saved search ID with special characters', () => {
const specialSavedSearch: any = {
id: 'search-with-special-chars-123_456',
_id: new mongoose.Types.ObjectId(),
team: new mongoose.Types.ObjectId(),
source: new mongoose.Types.ObjectId(),
select: 'Body',

View file

@ -1,3 +1,4 @@
import { Tile } from '@hyperdx/common-utils/dist/types';
import mongoose from 'mongoose';
import ms from 'ms';
import { URLSearchParams } from 'url';
@ -5,11 +6,181 @@ import { URLSearchParams } from 'url';
import * as config from '@/config';
import { LOCAL_APP_TEAM } from '@/controllers/team';
import { connectDB, mongooseConnection } from '@/models';
import Alert from '@/models/alert';
import { ISavedSearch } from '@/models/savedSearch';
import Alert, { AlertSource, type IAlert } from '@/models/alert';
import Connection, { IConnection } from '@/models/connection';
import Dashboard from '@/models/dashboard';
import { type ISavedSearch, SavedSearch } from '@/models/savedSearch';
import { type ISource, Source } from '@/models/source';
import {
type AlertDetails,
type AlertProvider,
type AlertTask,
AlertTaskType,
} from '@/tasks/providers';
import { convertMsToGranularityString } from '@/utils/common';
import logger from '@/utils/logger';
import { AlertProvider, AlertTask } from './index';
/**
 * Resolves the connection and alert details for a saved-search alert.
 *
 * Looks up the saved search referenced by the alert (scoped to the alert's
 * team), populates its source (also team-scoped), and then loads the
 * connection the source points at.
 *
 * @param alert - alert whose `savedSearch` and `team` drive the lookups
 * @returns `[connection, details]` on success, or `[]` after logging the
 *   reason when the saved search, its source, or the connection is missing
 */
async function getSavedSearchDetails(
  alert: IAlert,
): Promise<[IConnection, AlertDetails] | []> {
  const savedSearchId = alert.savedSearch;
  const savedSearch = await SavedSearch.findOne({
    _id: savedSearchId,
    team: alert.team,
  }).populate<Omit<ISavedSearch, 'source'> & { source: ISource }>({
    path: 'source',
    match: { team: alert.team },
  });

  if (!savedSearch) {
    logger.error({
      message: 'savedSearch not found',
      savedSearchId,
      alertId: alert.id,
    });
    return [];
  }

  const { source } = savedSearch;
  // populate() leaves the path null when the referenced source no longer
  // exists or fails the team `match`; bail out here instead of throwing a
  // TypeError on `source.connection` below.
  if (!source) {
    logger.error({
      message: 'source not found',
      savedSearchId,
      alertId: alert.id,
    });
    return [];
  }

  const connId = source.connection;
  const conn = await Connection.findOne({ _id: connId, team: alert.team });
  if (!conn) {
    logger.error({
      message: 'connection not found',
      alertId: alert.id,
      connId,
      savedSearchId,
    });
    return [];
  }

  return [
    conn,
    {
      alert,
      source,
      taskType: AlertTaskType.SAVED_SEARCH,
      savedSearch,
    },
  ];
}
/**
 * Resolves the connection and alert details for a dashboard-tile alert.
 *
 * Walks alert -> dashboard -> tile -> source -> connection, with the dashboard,
 * source and populated connection lookups scoped to the alert's team.
 *
 * @param alert - alert whose `dashboard`, `tileId` and `team` drive the lookups
 * @returns `[connection, details]` on success, or `[]` after logging which
 *   link in the chain could not be found
 */
async function getTileDetails(
  alert: IAlert,
): Promise<[IConnection, AlertDetails] | []> {
  const { dashboard: dashboardId, tileId } = alert;

  const dashboard = await Dashboard.findOne({
    _id: dashboardId,
    team: alert.team,
  });
  if (!dashboard) {
    logger.error({
      message: 'dashboard not found',
      dashboardId,
      alertId: alert.id,
    });
    return [];
  }

  const isAlertTile = (candidate: Tile) => candidate.id === tileId;
  const tile = dashboard.tiles?.find(isAlertTile);
  if (!tile) {
    logger.error({
      message: 'tile matching alert not found',
      tileId,
      dashboardId: dashboard._id,
      alertId: alert.id,
    });
    return [];
  }

  const tileSourceId = tile.config.source;
  const populatedSource = await Source.findOne({
    _id: tileSourceId,
    team: alert.team,
  }).populate<Omit<ISource, 'connection'> & { connection: IConnection }>({
    path: 'connection',
    match: { team: alert.team },
  });
  if (!populatedSource) {
    logger.error({
      message: 'source not found',
      sourceId: tileSourceId,
      tileId,
      dashboardId: dashboard._id,
      alertId: alert.id,
    });
    return [];
  }

  const { connection } = populatedSource;
  if (!connection) {
    logger.error({
      message: 'connection not found',
      alertId: alert.id,
      tileId,
      dashboardId: dashboard._id,
      sourceId: populatedSource.id,
    });
    return [];
  }

  // The details carry a plain source object that references the connection by
  // id; the populated connection document itself is returned separately.
  const source = { ...populatedSource.toObject(), connection: connection.id };
  return [
    connection,
    {
      alert,
      source,
      taskType: AlertTaskType.TILE,
      tile,
      dashboard,
    },
  ];
}
/**
 * Resolves one alert to its connection and details, then appends the details
 * to the task for that connection in `groupedTasks`, creating the task the
 * first time a connection is seen.
 *
 * @param alert - alert to resolve; in local app mode its `team` is overwritten
 *   with the local team id before any lookup
 * @param groupedTasks - accumulator keyed by connection id; mutated in place
 * @throws Error when the alert has no source, the source is unsupported, or
 *   the details / connection could not be resolved
 */
async function loadAlert(alert: IAlert, groupedTasks: Map<string, AlertTask>) {
  if (!alert.source) {
    throw new Error('alert does not have a source');
  }

  if (config.IS_LOCAL_APP_MODE) {
    // LOCAL_APP_TEAM.id is the 12 character string `_local_team_`, which as an
    // ObjectId is its ASCII hex encoding: 5f6c6f63616c5f7465616d5f.
    alert.team = new mongoose.Types.ObjectId(LOCAL_APP_TEAM.id);
  }

  let conn: IConnection | undefined;
  let details: AlertDetails | undefined;
  if (alert.source === AlertSource.SAVED_SEARCH) {
    [conn, details] = await getSavedSearchDetails(alert);
  } else if (alert.source === AlertSource.TILE) {
    [conn, details] = await getTileDetails(alert);
  } else {
    throw new Error(`unsupported source: ${alert.source}`);
  }

  // The lookups return an empty tuple on failure, leaving both values undefined.
  if (!details) {
    throw new Error('failed to fetch alert details');
  }
  if (!conn) {
    throw new Error('failed to fetch alert connection');
  }

  const connectionKey = conn.id;
  let task = groupedTasks.get(connectionKey);
  if (!task) {
    task = { alerts: [], conn };
    groupedTasks.set(connectionKey, task);
  }
  task.alerts.push(details);
}
export default class DefaultAlertProvider implements AlertProvider {
async init() {
@ -21,16 +192,23 @@ export default class DefaultAlertProvider implements AlertProvider {
}
async getAlertTasks(): Promise<AlertTask[]> {
const groupedTasks = new Map<string, AlertTask>();
const alerts = await Alert.find({});
if (config.IS_LOCAL_APP_MODE) {
alerts.forEach(_alert => {
// The id is the 12 character string `_local_team_', which will become an ObjectId
// as the ASCII hex values, so 5f6c6f63616c5f7465616d5f.
_alert.team = new mongoose.Types.ObjectId(LOCAL_APP_TEAM.id);
});
for (const alert of alerts) {
try {
await loadAlert(alert, groupedTasks);
} catch (e) {
logger.error({
message: `failed to load alert: ${e}`,
alertId: alert.id,
team: alert.team,
channel: alert.channel,
});
}
}
return [{ alerts }];
// Flatten out our groupings for execution
return Array.from(groupedTasks.values());
}
buildLogSearchLink({

View file

@ -1,11 +1,45 @@
import { AlertDocument } from '@/models/alert';
import { Tile } from '@hyperdx/common-utils/dist/types';
import { IAlert } from '@/models/alert';
import { IConnection } from '@/models/connection';
import { IDashboard } from '@/models/dashboard';
import { ISavedSearch } from '@/models/savedSearch';
import { ISource } from '@/models/source';
import DefaultAlertProvider from '@/tasks/providers/default';
import DefaultAlertProvider from './default';
/**
 * Discriminator for `AlertDetails`: tells which kind of object an alert is
 * attached to. Values are spelled out so the numeric mapping is visible.
 */
export enum AlertTaskType {
  SAVED_SEARCH = 0,
  TILE = 1,
}
export type AlertTask = {
alerts: AlertDocument[];
};
// Details about the alert and the source for the alert. Depending on
// the taskType either:
// 1. the savedSearch field is required (taskType is SAVED_SEARCH) or
// 2. the tile and dashboard fields are required (taskType is TILE)
//
// The dependent typing means fewer null checks when using these values, as
// they are required when the type is set accordingly.
export type AlertDetails = {
alert: IAlert;
source: ISource;
} & (
| {
taskType: AlertTaskType.SAVED_SEARCH;
savedSearch: Omit<ISavedSearch, 'source'>;
}
| {
taskType: AlertTaskType.TILE;
tile: Tile;
dashboard: IDashboard;
}
);
// A set of alert details together with the connection (`conn`) they belong to.
//
// AlertTask instances can carry metadata, of type T, for the provider that created
// them. The `metadata` field only exists when T is given a type other than the
// default `never`. `[T] extends [never]` wraps T in a tuple so the conditional
// is not distributed over `never`; in the `never` case the intersection with
// `unknown` adds no members.
export type AlertTask<T = never> = {
alerts: AlertDetails[];
conn: IConnection;
} & ([T] extends [never] ? unknown : { metadata: T });
export interface AlertProvider {
init(): Promise<void>;