feat: Use materialized views in alert execution (#1565)

Closes HDX-2899

# Summary

This PR updates the check alerts task so that it uses materialized views associated with a source, when possible.

The existing `tryOptimizeConfigWithMaterializedView` function is used to identify a materialized view that can serve an alert query: it first checks that the query has a supported granularity and date range, then transforms the query and validates the transformed query using `EXPLAIN`. If a valid MV query is found, it is used to process the alert; otherwise, the original config is used.

Alert date ranges are already aligned to the granularity specified in the chart config, which is a multiple of the MV granularity (when an MV is used), so we don't need additional logic to align the date range to the MV granularity.

## Demo

An alert running locally hits a materialized view: 

<img width="525" height="818" alt="Screenshot 2026-01-07 at 8 20 55 AM" src="https://github.com/user-attachments/assets/5b0be283-c3bf-4e34-b16c-604084be57bf" />
This commit is contained in:
Drew Davis 2026-01-07 10:27:54 -05:00 committed by GitHub
parent 5dded38f87
commit ebaebc14a0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 451 additions and 16 deletions

View file

@ -0,0 +1,6 @@
---
"@hyperdx/common-utils": patch
"@hyperdx/api": patch
---
feat: Use materialized views in alert execution

View file

@ -36,7 +36,7 @@ export const DEFAULT_METRICS_TABLE = {
let clickhouseClient: any;
const getClickhouseClient = async () => {
export const getTestFixtureClickHouseClient = async () => {
if (!clickhouseClient) {
clickhouseClient = createNativeClient({
url: config.CLICKHOUSE_HOST,
@ -60,7 +60,7 @@ const getClickhouseClient = async () => {
};
const healthCheck = async () => {
const client = await getClickhouseClient();
const client = await getTestFixtureClickHouseClient();
const result = await client.ping();
if (!result.success) {
logger.error({ error: result.error }, 'ClickHouse health check failed');
@ -72,7 +72,7 @@ const connectClickhouse = async () => {
// health check
await healthCheck();
const client = await getClickhouseClient();
const client = await getTestFixtureClickHouseClient();
await client.command({
query: `
CREATE TABLE IF NOT EXISTS ${DEFAULT_DATABASE}.${DEFAULT_LOGS_TABLE}
@ -371,7 +371,7 @@ export const getLoggedInAgent = async (server: MockServer) => {
// ------------------ Clickhouse ------------------
// ------------------------------------------------
export const executeSqlCommand = async (sql: string) => {
const client = await getClickhouseClient();
const client = await getTestFixtureClickHouseClient();
return await client.command({
query: sql,
clickhouse_settings: {
@ -393,7 +393,7 @@ export const clearClickhouseTables = async () => {
];
const promises: any = [];
const client = await getClickhouseClient();
const client = await getTestFixtureClickHouseClient();
for (const table of tables) {
promises.push(
client.command({
@ -419,11 +419,14 @@ export const selectAllLogs = async () => {
.then(res => res.json());
};
const _bulkInsertData = async (table: string, data: Record<string, any>[]) => {
export const bulkInsertData = async (
table: string,
data: Record<string, any>[],
) => {
if (!config.IS_CI) {
throw new Error('ONLY execute this in CI env 😈 !!!');
}
const client = await getClickhouseClient();
const client = await getTestFixtureClickHouseClient();
await client.insert({
table,
values: data,
@ -447,7 +450,7 @@ export const bulkInsertLogs = async (
if (!config.IS_CI) {
throw new Error('ONLY execute this in CI env 😈 !!!');
}
await _bulkInsertData(`${DEFAULT_DATABASE}.${DEFAULT_LOGS_TABLE}`, events);
await bulkInsertData(`${DEFAULT_DATABASE}.${DEFAULT_LOGS_TABLE}`, events);
};
export const bulkInsertMetricsGauge = async (
@ -462,7 +465,7 @@ export const bulkInsertMetricsGauge = async (
if (!config.IS_CI) {
throw new Error('ONLY execute this in CI env 😈 !!!');
}
await _bulkInsertData(
await bulkInsertData(
`${DEFAULT_DATABASE}.${DEFAULT_METRICS_TABLE.GAUGE}`,
metrics,
);
@ -482,7 +485,7 @@ export const bulkInsertMetricsSum = async (
if (!config.IS_CI) {
throw new Error('ONLY execute this in CI env 😈 !!!');
}
await _bulkInsertData(
await bulkInsertData(
`${DEFAULT_DATABASE}.${DEFAULT_METRICS_TABLE.SUM}`,
metrics,
);
@ -501,7 +504,7 @@ export const bulkInsertMetricsHistogram = async (
if (!config.IS_CI) {
throw new Error('ONLY execute this in CI env 😈 !!!');
}
await _bulkInsertData(
await bulkInsertData(
`${DEFAULT_DATABASE}.${DEFAULT_METRICS_TABLE.HISTOGRAM}`,
metrics,
);

View file

@ -11,11 +11,13 @@ import * as config from '@/config';
import { createAlert } from '@/controllers/alerts';
import { createTeam } from '@/controllers/team';
import {
bulkInsertData,
bulkInsertLogs,
bulkInsertMetricsGauge,
DEFAULT_DATABASE,
DEFAULT_METRICS_TABLE,
getServer,
getTestFixtureClickHouseClient,
makeTile,
} from '@/fixtures';
import Alert, { AlertSource, AlertThresholdType } from '@/models/alert';
@ -3849,6 +3851,416 @@ describe('checkAlerts', () => {
});
});
// Integration tests for processAlert's materialized-view (MV) optimization:
// when an alert's source has a compatible MV configured, the alert query is
// rewritten to read from the MV table; otherwise it falls back to the base
// table. Each test proves which table was actually read by controlling where
// the data is inserted (MV-only vs. base-table-only).
describe('processAlert with materialized views', () => {
  const MV_TABLE_NAME = 'otel_logs_rollup_5m';
  const server = getServer();

  // Create the rollup target table that plays the role of the MV. Tests insert
  // pre-aggregated rows into it directly, so only the target table is needed
  // (no actual MATERIALIZED VIEW object is required).
  const createMV = async () => {
    const client = await getTestFixtureClickHouseClient();
    await client.command({
      query: `
        CREATE TABLE IF NOT EXISTS ${DEFAULT_DATABASE}.${MV_TABLE_NAME}
        (
          Timestamp DateTime,
          ServiceName LowCardinality(String),
          SeverityText LowCardinality(String),
          count SimpleAggregateFunction(sum, UInt64)
        )
        ENGINE = AggregatingMergeTree
        PARTITION BY toDate(Timestamp)
        ORDER BY (ServiceName, SeverityText, Timestamp)
        SETTINGS index_granularity = 8192
      `,
      clickhouse_settings: {
        wait_end_of_query: 1,
      },
    });
  };

  // Remove all rows from the MV table so tests don't leak data into each other.
  const clearMV = async () => {
    const client = await getTestFixtureClickHouseClient();
    await client.command({
      query: `TRUNCATE ${DEFAULT_DATABASE}.${MV_TABLE_NAME}`,
      clickhouse_settings: {
        wait_end_of_query: 1,
      },
    });
  };

  // Build the full object graph an alert run needs — team, webhook, connection,
  // a log source with the MV configured, and a saved search with the given
  // `where` clause — and return the pieces the tests use.
  const createSavedSearchWithMVSource = async (savedSearchWhere: string) => {
    const team = await createTeam({ name: 'My Team' });
    const webhook = await new Webhook({
      team: team._id,
      service: 'slack',
      url: 'https://hooks.slack.com/services/123',
      name: 'My Webhook',
    }).save();
    const teamWebhooksById = new Map<string, typeof webhook>([
      [webhook._id.toString(), webhook],
    ]);
    const connection = await Connection.create({
      team: team._id,
      name: 'Default',
      host: config.CLICKHOUSE_HOST,
      username: config.CLICKHOUSE_USER,
      password: config.CLICKHOUSE_PASSWORD,
    });
    const source = await Source.create({
      kind: 'log',
      team: team._id,
      from: {
        databaseName: 'default',
        tableName: 'otel_logs',
      },
      timestampValueExpression: 'Timestamp',
      connection: connection.id,
      name: 'Logs',
      // The MV only carries ServiceName/SeverityText dimensions and a summed
      // `count`, at 5-minute granularity — queries touching other columns
      // (e.g. Body) cannot be served by it.
      materializedViews: [
        {
          databaseName: DEFAULT_DATABASE,
          tableName: MV_TABLE_NAME,
          dimensionColumns: 'ServiceName, SeverityText',
          minGranularity: '5 minute',
          timestampColumn: 'Timestamp',
          aggregatedColumns: [
            {
              sourceColumn: '',
              aggFn: 'count',
              mvColumn: 'count',
            },
          ],
        },
      ],
    });
    const savedSearch = await new SavedSearch({
      team: team._id,
      name: 'My Search',
      select: 'Body',
      where: savedSearchWhere,
      whereLanguage: 'lucene',
      orderBy: 'Timestamp',
      source: source.id,
      tags: ['test'],
    }).save();
    const clickhouseClient = new ClickhouseClient({
      host: connection.host,
      username: connection.username,
      password: connection.password,
    });
    return {
      team,
      clickhouseClient,
      webhook,
      savedSearch,
      source,
      connection,
      teamWebhooksById,
    };
  };

  beforeAll(async () => {
    await server.start();
    await createMV();
  });

  beforeEach(async () => {
    jest
      .spyOn(slack, 'postMessageToWebhook')
      .mockResolvedValueOnce({ text: 'ok' });
    jest.spyOn(checkAlert, 'handleSendGenericWebhook');
  });

  afterEach(async () => {
    await server.clearDBs();
    await clearMV();
    jest.clearAllMocks();
  });

  afterAll(async () => {
    // Drop the MV table so repeated runs against a shared ClickHouse start clean.
    const client = await getTestFixtureClickHouseClient();
    await client.command({
      query: `DROP TABLE IF EXISTS ${DEFAULT_DATABASE}.${MV_TABLE_NAME}`,
      clickhouse_settings: { wait_end_of_query: 1 },
    });
    await server.stop();
  });

  it('should process alerts using materialized views when a compatible materialized view is available', async () => {
    // Arrange
    const {
      team,
      clickhouseClient,
      webhook,
      savedSearch,
      source,
      connection,
      teamWebhooksById,
    } = await createSavedSearchWithMVSource('SeverityText:"error"');
    const alert = await createAlert(
      team._id,
      {
        source: AlertSource.SAVED_SEARCH,
        channel: {
          type: 'webhook',
          webhookId: webhook._id.toString(),
        },
        interval: '5m',
        thresholdType: AlertThresholdType.ABOVE,
        threshold: 1,
        savedSearchId: savedSearch.id,
      },
      new mongoose.Types.ObjectId(),
    );
    const enhancedAlert: any = await Alert.findById(alert.id).populate([
      'team',
      'savedSearch',
    ]);
    const details = {
      alert: enhancedAlert,
      source,
      previousMap: new Map(),
      taskType: AlertTaskType.SAVED_SEARCH,
      savedSearch,
    } satisfies AlertDetails;

    const now = new Date('2023-11-16T22:12:00.000Z');
    const eventMs = new Date('2023-11-16T22:05:00.000Z');
    const eventNextMs = new Date('2023-11-16T22:10:00.000Z');

    // Insert directly into the MV (the base table stays empty) so the
    // assertions can only pass if the MV was actually read.
    await bulkInsertData(`${DEFAULT_DATABASE}.${MV_TABLE_NAME}`, [
      // logs from 22:05 - 22:10
      {
        ServiceName: 'api',
        Timestamp: eventMs,
        SeverityText: 'error',
        count: 3,
      },
      // logs from 22:10 - 22:15
      {
        ServiceName: 'api',
        Timestamp: eventNextMs,
        SeverityText: 'error',
        count: 1,
      },
      {
        ServiceName: 'api',
        Timestamp: eventNextMs,
        SeverityText: 'info',
        count: 2,
      },
    ]);

    // Act - Run alerts twice to cover two periods
    let previousMap = await getPreviousAlertHistories(
      [details.alert.id],
      now,
    );
    await processAlert(
      now,
      {
        ...details,
        previousMap,
      },
      clickhouseClient,
      connection.id,
      alertProvider,
      teamWebhooksById,
    );
    const nextWindow = new Date('2023-11-16T22:15:00.000Z');
    previousMap = await getPreviousAlertHistories(
      [details.alert.id],
      nextWindow,
    );
    await processAlert(
      nextWindow,
      {
        ...details,
        // Pass the refreshed history map; passing bare `details` here would
        // reuse the stale empty map and leave the recomputed value unused.
        previousMap,
      },
      clickhouseClient,
      connection.id,
      alertProvider,
      teamWebhooksById,
    );

    // Assert - Alert ran and has a state consistent with the data in the MV
    expect((await Alert.findById(details.alert.id))!.state).toBe('ALERT');
    const alertHistories = await AlertHistory.find({
      alert: details.alert.id,
    }).sort({
      createdAt: 1,
    });
    expect(alertHistories.length).toBe(2);
    expect(alertHistories[0].state).toBe('ALERT');
    expect(alertHistories[0].counts).toBe(1);
    expect(alertHistories[0].lastValues[0].count).toBe(3);
    expect(alertHistories[0].createdAt).toEqual(
      new Date('2023-11-16T22:10:00.000Z'),
    );
    expect(alertHistories[1].state).toBe('ALERT');
    expect(alertHistories[1].counts).toBe(1);
    expect(alertHistories[1].lastValues[0].count).toBe(1);
    expect(alertHistories[1].createdAt).toEqual(
      new Date('2023-11-16T22:15:00.000Z'),
    );
  });

  it('should not use a materialized view when the query is incompatible with the available materialized view', async () => {
    // Arrange
    const {
      team,
      clickhouseClient,
      webhook,
      savedSearch,
      source,
      connection,
      teamWebhooksById,
    } = await createSavedSearchWithMVSource('Body:no'); // Body is not in the MV, so the MV should not be used
    const mockUserId = new mongoose.Types.ObjectId();
    const alert = await createAlert(
      team._id,
      {
        source: AlertSource.SAVED_SEARCH,
        channel: {
          type: 'webhook',
          webhookId: webhook._id.toString(),
        },
        interval: '5m',
        thresholdType: AlertThresholdType.ABOVE,
        threshold: 1,
        savedSearchId: savedSearch.id,
      },
      mockUserId,
    );
    const enhancedAlert: any = await Alert.findById(alert.id).populate([
      'team',
      'savedSearch',
    ]);
    const details = {
      alert: enhancedAlert,
      source,
      previousMap: new Map(),
      taskType: AlertTaskType.SAVED_SEARCH,
      savedSearch,
    } satisfies AlertDetails;

    const now = new Date('2023-11-16T22:12:00.000Z');
    const eventMs = new Date('2023-11-16T22:05:00.000Z');
    const eventNextMs = new Date('2023-11-16T22:10:00.000Z');

    // Insert into the base logs table (NOT the MV): this query filters on
    // Body, which the MV cannot serve, so the alert must fall back to the
    // base table — the MV stays empty to prove that.
    await bulkInsertLogs([
      // logs from 22:05 - 22:10
      {
        ServiceName: 'api',
        Timestamp: eventMs,
        SeverityText: 'error',
        Body: 'Oh no! Something went wrong!',
      },
      {
        ServiceName: 'api',
        Timestamp: eventMs,
        SeverityText: 'error',
        Body: 'Oh no! Something went wrong!',
      },
      {
        ServiceName: 'api',
        Timestamp: eventMs,
        SeverityText: 'error',
        Body: 'Oh no! Something went wrong!',
      },
      // logs from 22:10 - 22:15
      {
        ServiceName: 'api',
        Timestamp: eventNextMs,
        SeverityText: 'error',
        Body: 'Oh no! Something went wrong!',
      },
      {
        ServiceName: 'api',
        Timestamp: eventNextMs,
        SeverityText: 'error',
        Body: 'Oh no! Something went wrong!',
      },
      {
        ServiceName: 'api',
        Timestamp: eventNextMs,
        SeverityText: 'info',
        Body: 'Something went right for a change!',
      },
    ]);

    // Act - Run alerts twice to cover two periods
    let previousMap = await getPreviousAlertHistories(
      [details.alert.id],
      now,
    );
    await processAlert(
      now,
      {
        ...details,
        previousMap,
      },
      clickhouseClient,
      connection.id,
      alertProvider,
      teamWebhooksById,
    );
    const nextWindow = new Date('2023-11-16T22:15:00.000Z');
    previousMap = await getPreviousAlertHistories(
      [details.alert.id],
      nextWindow,
    );
    await processAlert(
      nextWindow,
      {
        ...details,
        // Pass the refreshed history map; passing bare `details` here would
        // reuse the stale empty map and leave the recomputed value unused.
        previousMap,
      },
      clickhouseClient,
      connection.id,
      alertProvider,
      teamWebhooksById,
    );

    // Assert - Alert ran and has a state consistent with the data in the base table
    expect((await Alert.findById(details.alert.id))!.state).toBe('ALERT');
    const alertHistories = await AlertHistory.find({
      alert: details.alert.id,
    }).sort({
      createdAt: 1,
    });
    expect(alertHistories.length).toBe(2);
    expect(alertHistories[0].state).toBe('ALERT');
    expect(alertHistories[0].counts).toBe(1);
    expect(alertHistories[0].lastValues[0].count).toBe(3);
    expect(alertHistories[0].createdAt).toEqual(
      new Date('2023-11-16T22:10:00.000Z'),
    );
    expect(alertHistories[1].state).toBe('ALERT');
    expect(alertHistories[1].counts).toBe(1);
    // Only 2 of the 3 second-window logs are severity 'error'
    expect(alertHistories[1].lastValues[0].count).toBe(2);
    expect(alertHistories[1].createdAt).toEqual(
      new Date('2023-11-16T22:15:00.000Z'),
    );
  });
});
describe('getPreviousAlertHistories', () => {
const server = getServer();

View file

@ -5,6 +5,7 @@ import PQueue from '@esm2cjs/p-queue';
import * as clickhouse from '@hyperdx/common-utils/dist/clickhouse';
import { ResponseJSON } from '@hyperdx/common-utils/dist/clickhouse';
import { ClickhouseClient } from '@hyperdx/common-utils/dist/clickhouse/node';
import { tryOptimizeConfigWithMaterializedView } from '@hyperdx/common-utils/dist/core/materializedViews';
import {
getMetadata,
Metadata,
@ -430,10 +431,22 @@ export const processAlert = async (
return;
}
// Query for alert data
const metadata = getMetadata(clickhouseClient);
// Optimize chart config with materialized views, if available
const optimizedChartConfig = source?.materializedViews?.length
? await tryOptimizeConfigWithMaterializedView(
chartConfig,
metadata,
clickhouseClient,
undefined,
source,
)
: chartConfig;
// Query for alert data
const checksData = await clickhouseClient.queryChartConfig({
config: chartConfig,
config: optimizedChartConfig,
metadata,
});
@ -441,6 +454,7 @@ export const processAlert = async (
{
alertId: alert.id,
chartConfig,
optimizedChartConfig,
checksData,
checkStartTime: dateRange[0],
checkEndTime: dateRange[1],

View file

@ -353,7 +353,7 @@ async function tryOptimizeConfig<C extends ChartConfigWithOptDateRange>(
config: C,
metadata: Metadata,
clickhouseClient: BaseClickhouseClient,
signal: AbortSignal,
signal: AbortSignal | undefined,
mvConfig: MaterializedViewConfiguration,
sourceFrom: TSource['from'],
) {
@ -457,7 +457,7 @@ export async function tryOptimizeConfigWithMaterializedViewWithExplanations<
config: C,
metadata: Metadata,
clickhouseClient: BaseClickhouseClient,
signal: AbortSignal,
signal: AbortSignal | undefined,
source: Pick<TSource, 'from'> & Partial<Pick<TSource, 'materializedViews'>>,
): Promise<{
optimizedConfig?: C;
@ -511,7 +511,7 @@ export async function tryOptimizeConfigWithMaterializedView<
config: C,
metadata: Metadata,
clickhouseClient: BaseClickhouseClient,
signal: AbortSignal,
signal: AbortSignal | undefined,
source: Pick<TSource, 'from'> & Partial<Pick<TSource, 'materializedViews'>>,
) {
const { optimizedConfig } =