diff --git a/.changeset/six-squids-stare.md b/.changeset/six-squids-stare.md new file mode 100644 index 00000000..42c46da0 --- /dev/null +++ b/.changeset/six-squids-stare.md @@ -0,0 +1,5 @@ +--- +"@hyperdx/api": patch +--- + +feat: limit how many tasks are executing at any time diff --git a/packages/api/package.json b/packages/api/package.json index a07e5c6b..6ec4e809 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -8,6 +8,7 @@ }, "dependencies": { "@clickhouse/client": "^0.2.10", + "@esm2cjs/p-queue": "^7.3.0", "@hyperdx/common-utils": "^0.3.2", "@hyperdx/lucene": "^3.1.1", "@hyperdx/node-opentelemetry": "^0.8.2", diff --git a/packages/api/src/tasks/__tests__/types.test.ts b/packages/api/src/tasks/__tests__/types.test.ts index 0b8469c2..d0ebb764 100644 --- a/packages/api/src/tasks/__tests__/types.test.ts +++ b/packages/api/src/tasks/__tests__/types.test.ts @@ -14,7 +14,8 @@ describe('asTaskArgs', () => { provider: 'default', }); expect(result.taskName).toBe('command'); - expect(result.provider).toBe('default'); + // For non-check-alerts tasks, we need to use type assertion to access provider + expect((result as any).provider).toBe('default'); }); it('should return valid TaskArgs when provider is undefined', () => { @@ -29,18 +30,8 @@ describe('asTaskArgs', () => { provider: undefined, }); expect(result.taskName).toBe('command'); - expect(result.provider).toBeUndefined(); - }); - - it('should throw error when provider is empty string', () => { - const invalidArgs = { - _: ['command'], - provider: '', - }; - - expect(() => asTaskArgs(invalidArgs)).toThrow( - 'Provider must contain valid characters', - ); + // For non-check-alerts tasks, we need to use type assertion to access provider + expect((result as any).provider).toBeUndefined(); }); it('should throw error for null input', () => { @@ -95,21 +86,6 @@ describe('asTaskArgs', () => { ); }); - it('should throw error when provider is not a string', () => { - const invalidProviders = [123, null, { name: 'default' }, ['default']]; - - invalidProviders.forEach(provider => { - const invalidArgs = { - _: ['command'], - provider, - }; - - expect(() => asTaskArgs(invalidArgs)).toThrow( - 'Provider must be a string if provided', - ); - }); - }); - it('should handle empty array for _ property', () => { const validArgs = { _: [], @@ -169,6 +145,11 @@ describe('asTaskArgs', () => { taskName: 'check-alerts', provider: 'default', }); + expect(result.taskName).toBe('check-alerts'); + // For check-alerts tasks, provider property is directly accessible + if (result.taskName === 'check-alerts') { + expect(result.provider).toBe('default'); + } }); it('should accept check-alerts task without provider', () => { @@ -182,16 +163,257 @@ describe('asTaskArgs', () => { taskName: 'check-alerts', provider: undefined, }); + expect(result.taskName).toBe('check-alerts'); + // For check-alerts tasks, provider property is directly accessible + if (result.taskName === 'check-alerts') { + expect(result.provider).toBeUndefined(); + } }); - it('should throw error when provider is whitespace-only', () => { - const invalidArgs = { - _: ['command'], - provider: ' ', + it('should accept ping-pong task without provider', () => { + const validArgs = { + _: ['ping-pong'], }; - expect(() => asTaskArgs(invalidArgs)).toThrow( - 'Provider must contain valid characters', - ); + const result = asTaskArgs(validArgs); + + expect(result).toEqual({ + taskName: 'ping-pong', + }); + expect(result.taskName).toBe('ping-pong'); + // Ping-pong tasks should not have a provider property + expect('provider' in result).toBe(false); + }); + + describe('concurrency parameter validation', () => { + it('should accept check-alerts task with valid concurrency', () => { + const validArgs = { + _: ['check-alerts'], + concurrency: 4, + }; + + const result = asTaskArgs(validArgs); + + expect(result).toEqual({ + taskName: 'check-alerts', + provider: undefined, + concurrency: 4, + }); + expect(result.taskName).toBe('check-alerts'); + if (result.taskName === 'check-alerts') { + expect(result.concurrency).toBe(4); + } + }); + + it('should accept check-alerts task with concurrency value of 1', () => { + const validArgs = { + _: ['check-alerts'], + concurrency: 1, + }; + + const result = asTaskArgs(validArgs); + + expect(result).toEqual({ + taskName: 'check-alerts', + provider: undefined, + concurrency: 1, + }); + expect(result.taskName).toBe('check-alerts'); + if (result.taskName === 'check-alerts') { + expect(result.concurrency).toBe(1); + } + }); + + it('should accept check-alerts task with large concurrency values', () => { + const validArgs = { + _: ['check-alerts'], + concurrency: 100, + }; + + const result = asTaskArgs(validArgs); + + expect(result).toEqual({ + taskName: 'check-alerts', + provider: undefined, + concurrency: 100, + }); + expect(result.taskName).toBe('check-alerts'); + if (result.taskName === 'check-alerts') { + expect(result.concurrency).toBe(100); + } + }); + + it('should accept check-alerts task without concurrency parameter', () => { + const validArgs = { + _: ['check-alerts'], + provider: 'default', + }; + + const result = asTaskArgs(validArgs); + + expect(result).toEqual({ + taskName: 'check-alerts', + provider: 'default', + concurrency: undefined, + }); + expect(result.taskName).toBe('check-alerts'); + if (result.taskName === 'check-alerts') { + expect(result.concurrency).toBeUndefined(); + } + }); + + it('should throw error when concurrency is not a number', () => { + const invalidArgs = { + _: ['check-alerts'], + concurrency: 'invalid', + }; + + expect(() => asTaskArgs(invalidArgs)).toThrow( + 'Concurrency must be a number if provided', + ); + }); + + it('should throw error when concurrency is boolean', () => { + const invalidArgs = { + _: ['check-alerts'], + concurrency: true, + }; + + expect(() => asTaskArgs(invalidArgs)).toThrow( + 'Concurrency must be a number if provided', + ); + }); + + it('should throw error when concurrency is null', () => { + const invalidArgs = { + _: ['check-alerts'], + concurrency: null, + }; + + expect(() => asTaskArgs(invalidArgs)).toThrow( + 'Concurrency must be a number if provided', + ); + }); + + it('should throw error when concurrency is an object', () => { + const invalidArgs = { + _: ['check-alerts'], + concurrency: { value: 4 }, + }; + + expect(() => asTaskArgs(invalidArgs)).toThrow( + 'Concurrency must be a number if provided', + ); + }); + + it('should throw error when concurrency is an array', () => { + const invalidArgs = { + _: ['check-alerts'], + concurrency: [4], + }; + + expect(() => asTaskArgs(invalidArgs)).toThrow( + 'Concurrency must be a number if provided', + ); + }); + + it('should throw error when concurrency is zero', () => { + const invalidArgs = { + _: ['check-alerts'], + concurrency: 0, + }; + + expect(() => asTaskArgs(invalidArgs)).toThrow( + 'Concurrency cannot be less than 1', + ); + }); + + it('should throw error when concurrency is negative', () => { + const invalidArgs = { + _: ['check-alerts'], + concurrency: -1, + }; + + expect(() => asTaskArgs(invalidArgs)).toThrow( + 'Concurrency cannot be less than 1', + ); + }); + + it('should throw error when concurrency is a negative decimal', () => { + const invalidArgs = { + _: ['check-alerts'], + concurrency: -0.5, + }; + + expect(() => asTaskArgs(invalidArgs)).toThrow( + 'Concurrency must be an integer if provided', + ); + }); + + it('should throw error when concurrency is a positive decimal', () => { + const invalidArgs = { + _: ['check-alerts'], + concurrency: 2.5, + }; + + expect(() => asTaskArgs(invalidArgs)).toThrow( + 'Concurrency must be an integer if provided', + ); + }); + + it('should throw error when concurrency is a small decimal', () => { + const invalidArgs = { + _: ['check-alerts'], + concurrency: 1.1, + }; + + expect(() => asTaskArgs(invalidArgs)).toThrow( + 'Concurrency must be an integer if provided', + ); + }); + + it('should throw error when concurrency is a large decimal', () => { + const invalidArgs = { + _: ['check-alerts'], + concurrency: 100.999, + }; + + expect(() => asTaskArgs(invalidArgs)).toThrow( + 'Concurrency must be an integer if provided', + ); + }); + + it('should ignore concurrency parameter for non-check-alerts tasks', () => { + const validArgs = { + _: ['ping-pong'], + concurrency: 4, + }; + + const result = asTaskArgs(validArgs); + + expect(result).toEqual({ + taskName: 'ping-pong', + }); + expect(result.taskName).toBe('ping-pong'); + // Ping-pong tasks should not have a concurrency property + expect('concurrency' in result).toBe(false); + }); + + it('should ignore concurrency parameter for unknown task types', () => { + const validArgs = { + _: ['unknown-task'], + concurrency: 4, + }; + + const result = asTaskArgs(validArgs); + + expect(result).toEqual({ + taskName: 'unknown-task', + provider: undefined, + }); + expect(result.taskName).toBe('unknown-task'); + // Unknown task types should not process concurrency parameter + expect('concurrency' in result).toBe(false); + }); }); }); diff --git a/packages/api/src/tasks/checkAlerts.ts b/packages/api/src/tasks/checkAlerts.ts index c34f659e..2b139c39 100644 --- a/packages/api/src/tasks/checkAlerts.ts +++ b/packages/api/src/tasks/checkAlerts.ts @@ -1,6 +1,7 @@ // -------------------------------------------------------- // -------------- EXECUTE EVERY MINUTE -------------------- // -------------------------------------------------------- +import PQueue from '@esm2cjs/p-queue'; import * as clickhouse from '@hyperdx/common-utils/dist/clickhouse'; import { getMetadata, Metadata } from '@hyperdx/common-utils/dist/metadata'; import { @@ -30,7 +31,7 @@ import { handleSendGenericWebhook, renderAlertTemplate, } from '@/tasks/template'; -import { HdxTask, TaskArgs } from '@/tasks/types'; +import { CheckAlertsTaskArgs, HdxTask } from '@/tasks/types'; import { roundDownToXMinutes, unflattenObject } from '@/tasks/util'; import logger from '@/utils/logger'; @@ -365,38 +366,49 @@ export const processAlert = async ( } }; -export const processAlertTask = async ( - now: Date, - alertTask: AlertTask, - alertProvider: AlertProvider, -) => { - const { alerts, conn } = alertTask; - logger.info({ - message: 'Processing alerts in batch', - alertCount: alerts.length, - }); - - const clickhouseClient = new clickhouse.ClickhouseClient({ - host: conn.host, - username: conn.username, - password: conn.password, - }); - - const p: Promise[] = []; - for (const alert of alerts) { - p.push(processAlert(now, alert, clickhouseClient, conn.id, alertProvider)); - } - await Promise.all(p); -}; - // Re-export handleSendGenericWebhook for testing export { handleSendGenericWebhook }; -export default class CheckAlertTask implements HdxTask { +export default class CheckAlertTask implements HdxTask { private provider!: AlertProvider; + private task_queue: PQueue; - async execute(args: TaskArgs): Promise { - this.provider = await loadProvider(args.provider); + constructor(private args: CheckAlertsTaskArgs) { + const concurrency = this.args.concurrency; + this.task_queue = new PQueue({ + autoStart: true, + ...(concurrency ? { concurrency } : null), + }); + } + + async processAlertTask(now: Date, alertTask: AlertTask) { + const { alerts, conn } = alertTask; + logger.info({ + message: 'Processing alerts in batch', + alertCount: alerts.length, + }); + + const clickhouseClient = new clickhouse.ClickhouseClient({ + host: conn.host, + username: conn.username, + password: conn.password, + }); + + for (const alert of alerts) { + await this.task_queue.add(() => + processAlert(now, alert, clickhouseClient, conn.id, this.provider), + ); + } + } + + async execute(): Promise { + if (this.args.taskName !== 'check-alerts') { + throw new Error( + `CheckAlertTask can only handle 'check-alerts' tasks, received: ${this.args.taskName}`, + ); + } + + this.provider = await loadProvider(this.args.provider); await this.provider.init(); const now = new Date(); @@ -407,8 +419,14 @@ export default class CheckAlertTask implements HdxTask { }); for (const task of alertTasks) { - await processAlertTask(now, task, this.provider); + await this.task_queue.add(() => this.processAlertTask(now, task)); } + + await this.task_queue.onIdle(); + } + + name(): string { + return this.args.taskName; } async asyncDispose(): Promise { diff --git a/packages/api/src/tasks/index.ts b/packages/api/src/tasks/index.ts index 3726b528..e4b24420 100644 --- a/packages/api/src/tasks/index.ts +++ b/packages/api/src/tasks/index.ts @@ -9,26 +9,26 @@ import PingPongTask from '@/tasks/pingPongTask'; import { asTaskArgs, HdxTask, TaskArgs } from '@/tasks/types'; import logger from '@/utils/logger'; -function createTask(taskName: string): HdxTask { +function createTask(argv: TaskArgs): HdxTask { + const taskName = argv.taskName; switch (taskName) { case 'check-alerts': - return new CheckAlertTask(); + return new CheckAlertTask(argv); case 'ping-pong': - return new PingPongTask(); + return new PingPongTask(argv); default: throw new Error(`Unknown task name ${taskName}`); } } const main = async (argv: TaskArgs) => { - const taskName = argv.taskName; - const task: HdxTask = createTask(taskName); + const task: HdxTask = createTask(argv); try { const t0 = performance.now(); - logger.info(`Task [${taskName}] started at ${new Date()}`); - await task.execute(argv); + logger.info(`Task [${task.name()}] started at ${new Date()}`); + await task.execute(); logger.info( - `Task [${taskName}] finished in ${(performance.now() - t0).toFixed(2)} ms`, + `Task [${task.name()}] finished in ${(performance.now() - t0).toFixed(2)} ms`, ); } finally { await task.asyncDispose(); diff --git a/packages/api/src/tasks/pingPongTask.ts b/packages/api/src/tasks/pingPongTask.ts index 533a2389..841468c8 100644 --- a/packages/api/src/tasks/pingPongTask.ts +++ b/packages/api/src/tasks/pingPongTask.ts @@ -1,9 +1,11 @@ -import { HdxTask, TaskArgs } from '@/tasks/types'; +import { HdxTask, PingTaskArgs } from '@/tasks/types'; import logger from '@/utils/logger'; -export default class PingPongTask implements HdxTask { +export default class PingPongTask implements HdxTask { + constructor(private args: PingTaskArgs) {} + // eslint-disable-next-line @typescript-eslint/no-unused-vars - async execute(args: TaskArgs): Promise { + async execute(): Promise { logger.info(` O . _/|\\_-O @@ -22,5 +24,9 @@ export default class PingPongTask implements HdxTask { `); } + name(): string { + return this.args.taskName; + } + async asyncDispose(): Promise {} } diff --git a/packages/api/src/tasks/types.ts b/packages/api/src/tasks/types.ts index ae94b7fa..c31188e3 100644 --- a/packages/api/src/tasks/types.ts +++ b/packages/api/src/tasks/types.ts @@ -1,10 +1,18 @@ -import { ParsedArgs } from 'minimist'; - /** * Command line arguments structure for tasks. * Contains task name and optional provider configuration. */ -export type TaskArgs = { taskName: string; provider?: string }; +export type PingTaskArgs = { taskName: 'ping-pong' }; +export type CheckAlertsTaskArgs = { + taskName: 'check-alerts'; + // name of the provider module to use for fetching alert task data. If not defined, + // the default provider will be used. + provider?: string; + // Limits number of concurrent tasks processed. If omitted, there is no concurrency + // limit. Must be an integer greater than 0. + concurrency?: number; +}; +export type TaskArgs = PingTaskArgs | CheckAlertsTaskArgs; /** * Validates and converts command line arguments to TaskArgs type. @@ -33,44 +41,67 @@ export function asTaskArgs(argv: any): TaskArgs { throw new Error('All arguments in "_" array must be strings'); } - if (argv.provider !== undefined && typeof argv.provider !== 'string') { - throw new Error('Provider must be a string if provided'); - } + const taskName = argv._[0]; + if (taskName === 'check-alerts') { + const { provider, concurrency } = argv; + if (provider) { + if (typeof provider !== 'string') { + throw new Error('Provider must be a string if provided'); + } - // Provider is optional for check-alerts task, but if provided must be valid - if ( - argv._[0] === 'check-alerts' && - argv.provider !== undefined && - argv.provider.trim() === '' - ) { - throw new Error('Provider must contain valid characters'); - } + if (provider.trim() === '') { + throw new Error('Provider must contain valid characters'); + } + } - // Provider must contain valid characters if provided (for non-check-alerts tasks) - if (argv.provider !== undefined && argv.provider.trim() === '') { - throw new Error('Provider must contain valid characters'); - } + if (concurrency !== undefined) { + if (typeof concurrency !== 'number') { + throw new Error('Concurrency must be a number if provided'); + } - return { - taskName: argv._[0], - provider: argv.provider, - } as TaskArgs; + if (!Number.isInteger(concurrency)) { + throw new Error('Concurrency must be an integer if provided'); + } + + if (concurrency < 1) { + throw new Error('Concurrency cannot be less than 1'); + } + } + + return { + taskName: 'check-alerts', + provider: provider, + concurrency: concurrency, + }; + } else if (taskName === 'ping-pong') { + return { + taskName: 'ping-pong', + }; + } else { + // For any other task names, create a generic structure without provider + return { + taskName, + provider: argv.provider, + } as TaskArgs; + } } /** * Interface for HyperDX task implementations. * All tasks must implement execute and asyncDispose methods. */ -export interface HdxTask { +export interface HdxTask { /** * Executes the main task logic with validated arguments. * @param args - Validated command line arguments */ - execute(args: TaskArgs): Promise; + execute(): Promise; /** * Performs cleanup operations when the task is finished. * Should dispose of any resources held by the task. */ asyncDispose(): Promise; + + name(): string; } diff --git a/yarn.lock b/yarn.lock index cf4e1646..b6d7753c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4302,6 +4302,23 @@ __metadata: languageName: node linkType: hard +"@esm2cjs/p-queue@npm:^7.3.0": + version: 7.3.0 + resolution: "@esm2cjs/p-queue@npm:7.3.0" + dependencies: + "@esm2cjs/p-timeout": "npm:^5.0.2" + eventemitter3: "npm:^4.0.7" + checksum: 10c0/95ee99d71781d02a8b264be893aa3c6a0292123de3e52a7fc80ccdfd4f426f3e7d93862c2f386e1ef266a6c61d29c921fb5343cccabbf2ecbf88888a6278a592 + languageName: node + linkType: hard + +"@esm2cjs/p-timeout@npm:^5.0.2": + version: 5.1.0 + resolution: "@esm2cjs/p-timeout@npm:5.1.0" + checksum: 10c0/55019425b87a2c0ac1aca16107facedef256f0fcb4e845a3e79d971aa23d019e8db38a342ada82a6c1520c63b00f178ade8c1f113bfd2592bd3b93cbe6c20961 + languageName: node + linkType: hard + "@fal-works/esbuild-plugin-global-externals@npm:^2.1.2": version: 2.1.2 resolution: "@fal-works/esbuild-plugin-global-externals@npm:2.1.2" @@ -4472,6 +4489,7 @@ __metadata: resolution: "@hyperdx/api@workspace:packages/api" dependencies: "@clickhouse/client": "npm:^0.2.10" + "@esm2cjs/p-queue": "npm:^7.3.0" "@hyperdx/common-utils": "npm:^0.3.2" "@hyperdx/lucene": "npm:^3.1.1" "@hyperdx/node-opentelemetry": "npm:^0.8.2" @@ -16253,7 +16271,7 @@ __metadata: languageName: node linkType: hard -"eventemitter3@npm:^4.0.0, eventemitter3@npm:^4.0.1": +"eventemitter3@npm:^4.0.0, eventemitter3@npm:^4.0.1, eventemitter3@npm:^4.0.7": version: 4.0.7 resolution: "eventemitter3@npm:4.0.7" checksum: 10c0/5f6d97cbcbac47be798e6355e3a7639a84ee1f7d9b199a07017f1d2f1e2fe236004d14fa5dfaeba661f94ea57805385e326236a6debbc7145c8877fbc0297c6b