feat(tasks): emit duration and success/failure counts for tasks (#1364)

Emits success/failure counter values as well as execution duration as a gauge for task execution. This allows monitoring the background task health using HyperDX alerts.
This commit is contained in:
Dan Hable 2025-11-13 17:26:22 -06:00 committed by GitHub
parent cfba5cb63d
commit 94a669d3ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 114 additions and 21 deletions

View file

@ -0,0 +1,5 @@
---
"@hyperdx/api": minor
---
Add metrics to task execution

View file

@ -14,7 +14,7 @@ import {
AlertTaskType,
loadProvider,
} from '@/tasks/checkAlerts/providers';
import { CheckAlertsTaskArgs } from '@/tasks/types';
import { CheckAlertsTaskArgs, TaskName } from '@/tasks/types';
jest.mock('@/tasks/checkAlerts/providers', () => {
return {
@ -65,7 +65,7 @@ describe('CheckAlertTask', () => {
});
it('should execute successfully with no alert tasks', async () => {
const args: CheckAlertsTaskArgs = { taskName: 'check-alerts' };
const args: CheckAlertsTaskArgs = { taskName: TaskName.CHECK_ALERTS };
const task = new CheckAlertTask(args);
mockAlertProvider.getAlertTasks.mockResolvedValue([]);
@ -83,7 +83,7 @@ describe('CheckAlertTask', () => {
it('should execute successfully with custom provider', async () => {
const args: CheckAlertsTaskArgs = {
taskName: 'check-alerts',
taskName: TaskName.CHECK_ALERTS,
provider: 'custom-provider',
};
const task = new CheckAlertTask(args);
@ -99,7 +99,7 @@ describe('CheckAlertTask', () => {
});
it('should process alert tasks', async () => {
const args: CheckAlertsTaskArgs = { taskName: 'check-alerts' };
const args: CheckAlertsTaskArgs = { taskName: TaskName.CHECK_ALERTS };
const task = new CheckAlertTask(args);
const mockAlert = {
@ -171,7 +171,7 @@ describe('CheckAlertTask', () => {
});
it("should ensure that the correct team's webhooks are passed to processAlert", async () => {
const args: CheckAlertsTaskArgs = { taskName: 'check-alerts' };
const args: CheckAlertsTaskArgs = { taskName: TaskName.CHECK_ALERTS };
const task = new CheckAlertTask(args);
// Create two teams

View file

@ -1,12 +1,17 @@
import { CronJob } from 'cron';
import minimist from 'minimist';
import { performance } from 'perf_hooks';
import { serializeError } from 'serialize-error';
import { RUN_SCHEDULED_TASKS_EXTERNALLY } from '@/config';
import CheckAlertTask from '@/tasks/checkAlerts';
import {
taskExecutionDurationGauge,
taskExecutionFailureCounter,
taskExecutionSuccessCounter,
timeExec,
} from '@/tasks/metrics';
import PingPongTask from '@/tasks/pingPongTask';
import { asTaskArgs, HdxTask, TaskArgs } from '@/tasks/types';
import { asTaskArgs, HdxTask, TaskArgs, TaskName } from '@/tasks/types';
import logger from '@/utils/logger';
import { tasksTracer } from './tracer';
@ -14,42 +19,49 @@ import { tasksTracer } from './tracer';
function createTask(argv: TaskArgs): HdxTask<TaskArgs> {
const taskName = argv.taskName;
switch (taskName) {
case 'check-alerts':
case TaskName.CHECK_ALERTS:
return new CheckAlertTask(argv);
case 'ping-pong':
case TaskName.PING_PONG:
return new PingPongTask(argv);
default:
throw new Error(`Unknown task name ${taskName}`);
}
}
const main = async (argv: TaskArgs) => {
async function main(argv: TaskArgs): Promise<void> {
await tasksTracer.startActiveSpan(argv.taskName || 'task', async span => {
const task: HdxTask<TaskArgs> = createTask(argv);
try {
const t0 = performance.now();
logger.info(`Task [${task.name()}] started at ${new Date()}`);
logger.info(`${task.name()} started at ${new Date()}`);
await task.execute();
logger.info(
`Task [${task.name()}] finished in ${(performance.now() - t0).toFixed(2)} ms`,
);
taskExecutionSuccessCounter.get(argv.taskName)?.add(1);
} catch (e: unknown) {
logger.error(
{
cause: e,
task,
},
`Task [${task.name()}] failed: ${serializeError(e)}`,
`${task.name()} failed: ${serializeError(e)}`,
);
taskExecutionFailureCounter.get(argv.taskName)?.add(1);
} finally {
await task.asyncDispose();
span.end();
}
});
};
}
// Entry point
const argv = asTaskArgs(minimist(process.argv.slice(2)));
const instrumentedMain = timeExec(main, duration => {
const gauge = taskExecutionDurationGauge.get(argv.taskName);
if (gauge) {
gauge.record(duration, { useCron: !RUN_SCHEDULED_TASKS_EXTERNALLY });
}
logger.info(`${argv.taskName} finished in ${duration.toFixed(2)} ms`);
});
// WARNING: the cron job will be enabled only in development mode
if (!RUN_SCHEDULED_TASKS_EXTERNALLY) {
logger.info('In-app cron job is enabled');
@ -57,7 +69,7 @@ if (!RUN_SCHEDULED_TASKS_EXTERNALLY) {
const job = CronJob.from({
cronTime: '0 * * * * *',
waitForCompletion: true,
onTick: async () => main(argv),
onTick: async () => instrumentedMain(argv),
errorHandler: async err => {
console.error(err);
},
@ -66,7 +78,7 @@ if (!RUN_SCHEDULED_TASKS_EXTERNALLY) {
});
} else {
logger.warn('In-app cron job is disabled');
main(argv)
instrumentedMain(argv)
.then(() => {
process.exit(0);
})

View file

@ -0,0 +1,71 @@
import {
Attributes,
Counter,
Gauge,
metrics,
ValueType,
} from '@opentelemetry/api';
import { performance } from 'perf_hooks';
import { TaskName } from '@/tasks/types';
const meter = metrics.getMeter('hyperdx-tasks');
export const taskExecutionSuccessCounter: Map<
TaskName,
Counter<Attributes>
> = new Map();
export const taskExecutionFailureCounter: Map<
TaskName,
Counter<Attributes>
> = new Map();
export const taskExecutionDurationGauge: Map<
TaskName,
Gauge<Attributes>
> = new Map();
for (const name of Object.values(TaskName)) {
taskExecutionSuccessCounter.set(
name,
meter.createCounter(`hyperdx.tasks.${name}.success`, {
description:
'Count of the number of times the task finished without exceptions.',
}),
);
taskExecutionFailureCounter.set(
name,
meter.createCounter(`hyperdx.tasks.${name}.failure`, {
description:
'Count of the number of times the task failed to finish because of an exception',
}),
);
taskExecutionDurationGauge.set(
name,
meter.createGauge(`hyperdx.tasks.${name}.duration`, {
description: `The wall time required for the ${name} task to complete execution.`,
unit: 'ms',
valueType: ValueType.DOUBLE,
}),
);
}
export function timeExec<T extends unknown[], R>(
fn: (...args: T) => Promise<R>,
recordFn?: (duration: number) => void,
) {
return async (...args: T) => {
const start = performance.now();
try {
return await fn(...args);
} finally {
if (recordFn) {
const end = performance.now();
recordFn(end - start);
}
}
};
}

View file

@ -1,15 +1,20 @@
import { z } from 'zod';
export enum TaskName {
PING_PONG = 'ping-pong',
CHECK_ALERTS = 'check-alerts',
}
/**
* Command line arguments structure for tasks.
* Contains task name and optional provider configuration.
*/
const pingTaskArgsSchema = z.object({
taskName: z.literal('ping-pong'),
taskName: z.literal(TaskName.PING_PONG),
});
const checkAlertsTaskArgsSchema = z.object({
taskName: z.literal('check-alerts'),
taskName: z.literal(TaskName.CHECK_ALERTS),
provider: z.string().optional(),
concurrency: z
.number()