feat: limit how many tasks are executing at any time (#1103)

Allows user defined concurrency limit when evaluating check alert tasks. Includes some further refinements of the task execution types to make actual task execution more type safe.
This commit is contained in:
Dan Hable 2025-08-25 14:01:03 -05:00 committed by GitHub
parent bb2221a185
commit 261d4693a3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 401 additions and 100 deletions

View file

@ -0,0 +1,5 @@
---
"@hyperdx/api": patch
---
feat: limit how many tasks are executing at any time

View file

@ -8,6 +8,7 @@
},
"dependencies": {
"@clickhouse/client": "^0.2.10",
"@esm2cjs/p-queue": "^7.3.0",
"@hyperdx/common-utils": "^0.3.2",
"@hyperdx/lucene": "^3.1.1",
"@hyperdx/node-opentelemetry": "^0.8.2",

View file

@ -14,7 +14,8 @@ describe('asTaskArgs', () => {
provider: 'default',
});
expect(result.taskName).toBe('command');
expect(result.provider).toBe('default');
// For non-check-alerts tasks, we need to use type assertion to access provider
expect((result as any).provider).toBe('default');
});
it('should return valid TaskArgs when provider is undefined', () => {
@ -29,18 +30,8 @@ describe('asTaskArgs', () => {
provider: undefined,
});
expect(result.taskName).toBe('command');
expect(result.provider).toBeUndefined();
});
it('should throw error when provider is empty string', () => {
const invalidArgs = {
_: ['command'],
provider: '',
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Provider must contain valid characters',
);
// For non-check-alerts tasks, we need to use type assertion to access provider
expect((result as any).provider).toBeUndefined();
});
it('should throw error for null input', () => {
@ -95,21 +86,6 @@ describe('asTaskArgs', () => {
);
});
it('should throw error when provider is not a string', () => {
const invalidProviders = [123, null, { name: 'default' }, ['default']];
invalidProviders.forEach(provider => {
const invalidArgs = {
_: ['command'],
provider,
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Provider must be a string if provided',
);
});
});
it('should handle empty array for _ property', () => {
const validArgs = {
_: [],
@ -169,6 +145,11 @@ describe('asTaskArgs', () => {
taskName: 'check-alerts',
provider: 'default',
});
expect(result.taskName).toBe('check-alerts');
// For check-alerts tasks, provider property is directly accessible
if (result.taskName === 'check-alerts') {
expect(result.provider).toBe('default');
}
});
it('should accept check-alerts task without provider', () => {
@ -182,16 +163,257 @@ describe('asTaskArgs', () => {
taskName: 'check-alerts',
provider: undefined,
});
expect(result.taskName).toBe('check-alerts');
// For check-alerts tasks, provider property is directly accessible
if (result.taskName === 'check-alerts') {
expect(result.provider).toBeUndefined();
}
});
it('should throw error when provider is whitespace-only', () => {
const invalidArgs = {
_: ['command'],
provider: ' ',
it('should accept ping-pong task without provider', () => {
const validArgs = {
_: ['ping-pong'],
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Provider must contain valid characters',
);
const result = asTaskArgs(validArgs);
expect(result).toEqual({
taskName: 'ping-pong',
});
expect(result.taskName).toBe('ping-pong');
// Ping-pong tasks should not have a provider property
expect('provider' in result).toBe(false);
});
describe('concurrency parameter validation', () => {
it('should accept check-alerts task with valid concurrency', () => {
const validArgs = {
_: ['check-alerts'],
concurrency: 4,
};
const result = asTaskArgs(validArgs);
expect(result).toEqual({
taskName: 'check-alerts',
provider: undefined,
concurrency: 4,
});
expect(result.taskName).toBe('check-alerts');
if (result.taskName === 'check-alerts') {
expect(result.concurrency).toBe(4);
}
});
it('should accept check-alerts task with concurrency value of 1', () => {
const validArgs = {
_: ['check-alerts'],
concurrency: 1,
};
const result = asTaskArgs(validArgs);
expect(result).toEqual({
taskName: 'check-alerts',
provider: undefined,
concurrency: 1,
});
expect(result.taskName).toBe('check-alerts');
if (result.taskName === 'check-alerts') {
expect(result.concurrency).toBe(1);
}
});
it('should accept check-alerts task with large concurrency values', () => {
const validArgs = {
_: ['check-alerts'],
concurrency: 100,
};
const result = asTaskArgs(validArgs);
expect(result).toEqual({
taskName: 'check-alerts',
provider: undefined,
concurrency: 100,
});
expect(result.taskName).toBe('check-alerts');
if (result.taskName === 'check-alerts') {
expect(result.concurrency).toBe(100);
}
});
it('should accept check-alerts task without concurrency parameter', () => {
const validArgs = {
_: ['check-alerts'],
provider: 'default',
};
const result = asTaskArgs(validArgs);
expect(result).toEqual({
taskName: 'check-alerts',
provider: 'default',
concurrency: undefined,
});
expect(result.taskName).toBe('check-alerts');
if (result.taskName === 'check-alerts') {
expect(result.concurrency).toBeUndefined();
}
});
it('should throw error when concurrency is not a number', () => {
const invalidArgs = {
_: ['check-alerts'],
concurrency: 'invalid',
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Concurrency must be a number if provided',
);
});
it('should throw error when concurrency is boolean', () => {
const invalidArgs = {
_: ['check-alerts'],
concurrency: true,
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Concurrency must be a number if provided',
);
});
it('should throw error when concurrency is null', () => {
const invalidArgs = {
_: ['check-alerts'],
concurrency: null,
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Concurrency must be a number if provided',
);
});
it('should throw error when concurrency is an object', () => {
const invalidArgs = {
_: ['check-alerts'],
concurrency: { value: 4 },
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Concurrency must be a number if provided',
);
});
it('should throw error when concurrency is an array', () => {
const invalidArgs = {
_: ['check-alerts'],
concurrency: [4],
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Concurrency must be a number if provided',
);
});
it('should throw error when concurrency is zero', () => {
const invalidArgs = {
_: ['check-alerts'],
concurrency: 0,
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Concurrency cannot be less than 1',
);
});
it('should throw error when concurrency is negative', () => {
const invalidArgs = {
_: ['check-alerts'],
concurrency: -1,
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Concurrency cannot be less than 1',
);
});
it('should throw error when concurrency is a negative decimal', () => {
const invalidArgs = {
_: ['check-alerts'],
concurrency: -0.5,
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Concurrency must be an integer if provided',
);
});
it('should throw error when concurrency is a positive decimal', () => {
const invalidArgs = {
_: ['check-alerts'],
concurrency: 2.5,
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Concurrency must be an integer if provided',
);
});
it('should throw error when concurrency is a small decimal', () => {
const invalidArgs = {
_: ['check-alerts'],
concurrency: 1.1,
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Concurrency must be an integer if provided',
);
});
it('should throw error when concurrency is a large decimal', () => {
const invalidArgs = {
_: ['check-alerts'],
concurrency: 100.999,
};
expect(() => asTaskArgs(invalidArgs)).toThrow(
'Concurrency must be an integer if provided',
);
});
it('should ignore concurrency parameter for non-check-alerts tasks', () => {
const validArgs = {
_: ['ping-pong'],
concurrency: 4,
};
const result = asTaskArgs(validArgs);
expect(result).toEqual({
taskName: 'ping-pong',
});
expect(result.taskName).toBe('ping-pong');
// Ping-pong tasks should not have a concurrency property
expect('concurrency' in result).toBe(false);
});
it('should ignore concurrency parameter for unknown task types', () => {
const validArgs = {
_: ['unknown-task'],
concurrency: 4,
};
const result = asTaskArgs(validArgs);
expect(result).toEqual({
taskName: 'unknown-task',
provider: undefined,
});
expect(result.taskName).toBe('unknown-task');
// Unknown task types should not process concurrency parameter
expect('concurrency' in result).toBe(false);
});
});
});

View file

@ -1,6 +1,7 @@
// --------------------------------------------------------
// -------------- EXECUTE EVERY MINUTE --------------------
// --------------------------------------------------------
import PQueue from '@esm2cjs/p-queue';
import * as clickhouse from '@hyperdx/common-utils/dist/clickhouse';
import { getMetadata, Metadata } from '@hyperdx/common-utils/dist/metadata';
import {
@ -30,7 +31,7 @@ import {
handleSendGenericWebhook,
renderAlertTemplate,
} from '@/tasks/template';
import { HdxTask, TaskArgs } from '@/tasks/types';
import { CheckAlertsTaskArgs, HdxTask } from '@/tasks/types';
import { roundDownToXMinutes, unflattenObject } from '@/tasks/util';
import logger from '@/utils/logger';
@ -365,38 +366,49 @@ export const processAlert = async (
}
};
export const processAlertTask = async (
now: Date,
alertTask: AlertTask,
alertProvider: AlertProvider,
) => {
const { alerts, conn } = alertTask;
logger.info({
message: 'Processing alerts in batch',
alertCount: alerts.length,
});
const clickhouseClient = new clickhouse.ClickhouseClient({
host: conn.host,
username: conn.username,
password: conn.password,
});
const p: Promise<void>[] = [];
for (const alert of alerts) {
p.push(processAlert(now, alert, clickhouseClient, conn.id, alertProvider));
}
await Promise.all(p);
};
// Re-export handleSendGenericWebhook for testing
export { handleSendGenericWebhook };
export default class CheckAlertTask implements HdxTask {
export default class CheckAlertTask implements HdxTask<CheckAlertsTaskArgs> {
private provider!: AlertProvider;
private task_queue: PQueue;
async execute(args: TaskArgs): Promise<void> {
this.provider = await loadProvider(args.provider);
constructor(private args: CheckAlertsTaskArgs) {
const concurrency = this.args.concurrency;
this.task_queue = new PQueue({
autoStart: true,
...(concurrency ? { concurrency } : null),
});
}
async processAlertTask(now: Date, alertTask: AlertTask) {
const { alerts, conn } = alertTask;
logger.info({
message: 'Processing alerts in batch',
alertCount: alerts.length,
});
const clickhouseClient = new clickhouse.ClickhouseClient({
host: conn.host,
username: conn.username,
password: conn.password,
});
for (const alert of alerts) {
await this.task_queue.add(() =>
processAlert(now, alert, clickhouseClient, conn.id, this.provider),
);
}
}
async execute(): Promise<void> {
if (this.args.taskName !== 'check-alerts') {
throw new Error(
`CheckAlertTask can only handle 'check-alerts' tasks, received: ${this.args.taskName}`,
);
}
this.provider = await loadProvider(this.args.provider);
await this.provider.init();
const now = new Date();
@ -407,8 +419,14 @@ export default class CheckAlertTask implements HdxTask {
});
for (const task of alertTasks) {
await processAlertTask(now, task, this.provider);
await this.task_queue.add(() => this.processAlertTask(now, task));
}
await this.task_queue.onIdle();
}
name(): string {
return this.args.taskName;
}
async asyncDispose(): Promise<void> {

View file

@ -9,26 +9,26 @@ import PingPongTask from '@/tasks/pingPongTask';
import { asTaskArgs, HdxTask, TaskArgs } from '@/tasks/types';
import logger from '@/utils/logger';
function createTask(taskName: string): HdxTask {
function createTask(argv: TaskArgs): HdxTask<TaskArgs> {
const taskName = argv.taskName;
switch (taskName) {
case 'check-alerts':
return new CheckAlertTask();
return new CheckAlertTask(argv);
case 'ping-pong':
return new PingPongTask();
return new PingPongTask(argv);
default:
throw new Error(`Unknown task name ${taskName}`);
}
}
const main = async (argv: TaskArgs) => {
const taskName = argv.taskName;
const task: HdxTask = createTask(taskName);
const task: HdxTask<TaskArgs> = createTask(argv);
try {
const t0 = performance.now();
logger.info(`Task [${taskName}] started at ${new Date()}`);
await task.execute(argv);
logger.info(`Task [${task.name()}] started at ${new Date()}`);
await task.execute();
logger.info(
`Task [${taskName}] finished in ${(performance.now() - t0).toFixed(2)} ms`,
`Task [${task.name()}] finished in ${(performance.now() - t0).toFixed(2)} ms`,
);
} finally {
await task.asyncDispose();

View file

@ -1,9 +1,11 @@
import { HdxTask, TaskArgs } from '@/tasks/types';
import { HdxTask, PingTaskArgs } from '@/tasks/types';
import logger from '@/utils/logger';
export default class PingPongTask implements HdxTask {
export default class PingPongTask implements HdxTask<PingTaskArgs> {
constructor(private args: PingTaskArgs) {}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async execute(args: TaskArgs): Promise<void> {
async execute(): Promise<void> {
logger.info(`
O .
_/|\\_-O
@ -22,5 +24,9 @@ export default class PingPongTask implements HdxTask {
`);
}
name(): string {
return this.args.taskName;
}
async asyncDispose(): Promise<void> {}
}

View file

@ -1,10 +1,18 @@
import { ParsedArgs } from 'minimist';
/**
* Command line arguments structure for tasks.
* Contains task name and optional provider configuration.
*/
export type TaskArgs = { taskName: string; provider?: string };
export type PingTaskArgs = { taskName: 'ping-pong' };
export type CheckAlertsTaskArgs = {
taskName: 'check-alerts';
// name of the provider module to use for fetching alert task data. If not defined,
// the default provider will be used.
provider?: string;
// Limits number of concurrent tasks processed. If omitted, there is no concurrency
// limit. Must be an integer greater than 0.
concurrency?: number;
};
export type TaskArgs = PingTaskArgs | CheckAlertsTaskArgs;
/**
* Validates and converts command line arguments to TaskArgs type.
@ -33,44 +41,67 @@ export function asTaskArgs(argv: any): TaskArgs {
throw new Error('All arguments in "_" array must be strings');
}
if (argv.provider !== undefined && typeof argv.provider !== 'string') {
throw new Error('Provider must be a string if provided');
}
const taskName = argv._[0];
if (taskName === 'check-alerts') {
const { provider, concurrency } = argv;
if (provider) {
if (typeof provider !== 'string') {
throw new Error('Provider must be a string if provided');
}
// Provider is optional for check-alerts task, but if provided must be valid
if (
argv._[0] === 'check-alerts' &&
argv.provider !== undefined &&
argv.provider.trim() === ''
) {
throw new Error('Provider must contain valid characters');
}
if (provider.trim() === '') {
throw new Error('Provider must contain valid characters');
}
}
// Provider must contain valid characters if provided (for non-check-alerts tasks)
if (argv.provider !== undefined && argv.provider.trim() === '') {
throw new Error('Provider must contain valid characters');
}
if (concurrency !== undefined) {
if (typeof concurrency !== 'number') {
throw new Error('Concurrency must be a number if provided');
}
return {
taskName: argv._[0],
provider: argv.provider,
} as TaskArgs;
if (!Number.isInteger(concurrency)) {
throw new Error('Concurrency must be an integer if provided');
}
if (concurrency < 1) {
throw new Error('Concurrency cannot be less than 1');
}
}
return {
taskName: 'check-alerts',
provider: provider,
concurrency: concurrency,
};
} else if (taskName === 'ping-pong') {
return {
taskName: 'ping-pong',
};
} else {
// For any other task names, create a generic structure without provider
return {
taskName,
provider: argv.provider,
} as TaskArgs;
}
}
/**
* Interface for HyperDX task implementations.
* All tasks must implement execute and asyncDispose methods.
*/
export interface HdxTask {
export interface HdxTask<T extends TaskArgs> {
/**
* Executes the main task logic with validated arguments.
* @param args - Validated command line arguments
*/
execute(args: TaskArgs): Promise<void>;
execute(): Promise<void>;
/**
* Performs cleanup operations when the task is finished.
* Should dispose of any resources held by the task.
*/
asyncDispose(): Promise<void>;
name(): string;
}

View file

@ -4302,6 +4302,23 @@ __metadata:
languageName: node
linkType: hard
"@esm2cjs/p-queue@npm:^7.3.0":
version: 7.3.0
resolution: "@esm2cjs/p-queue@npm:7.3.0"
dependencies:
"@esm2cjs/p-timeout": "npm:^5.0.2"
eventemitter3: "npm:^4.0.7"
checksum: 10c0/95ee99d71781d02a8b264be893aa3c6a0292123de3e52a7fc80ccdfd4f426f3e7d93862c2f386e1ef266a6c61d29c921fb5343cccabbf2ecbf88888a6278a592
languageName: node
linkType: hard
"@esm2cjs/p-timeout@npm:^5.0.2":
version: 5.1.0
resolution: "@esm2cjs/p-timeout@npm:5.1.0"
checksum: 10c0/55019425b87a2c0ac1aca16107facedef256f0fcb4e845a3e79d971aa23d019e8db38a342ada82a6c1520c63b00f178ade8c1f113bfd2592bd3b93cbe6c20961
languageName: node
linkType: hard
"@fal-works/esbuild-plugin-global-externals@npm:^2.1.2":
version: 2.1.2
resolution: "@fal-works/esbuild-plugin-global-externals@npm:2.1.2"
@ -4472,6 +4489,7 @@ __metadata:
resolution: "@hyperdx/api@workspace:packages/api"
dependencies:
"@clickhouse/client": "npm:^0.2.10"
"@esm2cjs/p-queue": "npm:^7.3.0"
"@hyperdx/common-utils": "npm:^0.3.2"
"@hyperdx/lucene": "npm:^3.1.1"
"@hyperdx/node-opentelemetry": "npm:^0.8.2"
@ -16253,7 +16271,7 @@ __metadata:
languageName: node
linkType: hard
"eventemitter3@npm:^4.0.0, eventemitter3@npm:^4.0.1":
"eventemitter3@npm:^4.0.0, eventemitter3@npm:^4.0.1, eventemitter3@npm:^4.0.7":
version: 4.0.7
resolution: "eventemitter3@npm:4.0.7"
checksum: 10c0/5f6d97cbcbac47be798e6355e3a7639a84ee1f7d9b199a07017f1d2f1e2fe236004d14fa5dfaeba661f94ea57805385e326236a6debbc7145c8877fbc0297c6b