diff --git a/deployment/grafana-dashboards/Schema-Service.json b/deployment/grafana-dashboards/Schema-Service.json index d97adb777..1f90e7888 100644 --- a/deployment/grafana-dashboards/Schema-Service.json +++ b/deployment/grafana-dashboards/Schema-Service.json @@ -269,7 +269,7 @@ "uid": "grafanacloud-prom" }, "editorMode": "code", - "expr": "sum by(cache) (rate(schema_external_composition_total{type=\"success\"}[$__rate_interval]))", + "expr": "sum by(cache) (rate(schema_composition_total{type=\"success\"}[$__rate_interval]))", "instant": false, "legendFormat": "{{cache}}", "range": true, @@ -334,7 +334,7 @@ "uid": "grafanacloud-prom" }, "editorMode": "code", - "expr": "sum by(cache) (rate(schema_external_composition_total{type=\"success\"}[$__rate_interval]))", + "expr": "sum by(cache) (rate(schema_composition_total{type=\"success\"}[$__rate_interval]))", "instant": false, "legendFormat": "{{cache}}", "range": true, @@ -426,7 +426,7 @@ "uid": "grafanacloud-prom" }, "editorMode": "code", - "expr": "sum by(cache) (rate(schema_external_composition_total{type=\"failure\"}[$__rate_interval]))", + "expr": "sum by(cache) (rate(schema_composition_total{type=\"failure\"}[$__rate_interval]))", "instant": false, "legendFormat": "{{cache}}", "range": true, @@ -491,7 +491,7 @@ "uid": "grafanacloud-prom" }, "editorMode": "code", - "expr": "sum by(cache) (rate(schema_external_composition_total{type=\"failure\"}[$__rate_interval]))", + "expr": "sum by(cache) (rate(schema_composition_total{type=\"failure\"}[$__rate_interval]))", "instant": false, "legendFormat": "{{cache}}", "range": true, @@ -587,7 +587,7 @@ "uid": "grafanacloud-prom" }, "editorMode": "code", - "expr": "sum by(cache) (rate(schema_external_composition_total{type=\"timeout\"}[$__rate_interval]))", + "expr": "sum by(cache) (rate(schema_composition_total{type=\"timeout\"}[$__rate_interval]))", "instant": false, "legendFormat": "{{cache}}", "range": true, @@ -652,7 +652,7 @@ "uid": "grafanacloud-prom" }, "editorMode": "code", - "expr": "sum by(cache) (rate(schema_external_composition_total{type=\"timeout\"}[$__rate_interval]))", + "expr": "sum by(cache) (rate(schema_composition_total{type=\"timeout\"}[$__rate_interval]))", "instant": false, "legendFormat": "{{cache}}", "range": true, diff --git a/packages/services/schema/src/cache.ts b/packages/services/schema/src/cache.ts index 7eac96455..eef7618e8 100644 --- a/packages/services/schema/src/cache.ts +++ b/packages/services/schema/src/cache.ts @@ -3,7 +3,7 @@ import stringify from 'fast-json-stable-stringify'; import type { Redis } from 'ioredis'; import { TimeoutError } from 'p-timeout'; import type { ServiceLogger } from '@hive/service-common'; -import { externalCompositionCounter } from './metrics'; +import { compositionCacheValueSizeBytes, schemaCompositionCounter } from './metrics'; function createChecksum(input: TInput): string { return createHash('sha256').update(stringify(input)).digest('hex'); @@ -101,13 +101,18 @@ export function createCache(options: { pickCacheType: (data: T) => CacheTTLType, ): Promise { logger.debug('Completing action (id=%s)', id); + const encodedData = JSON.stringify({ + status: 'completed', + result: data, + }); + + const sizeInBytes = Buffer.byteLength(encodedData, 'utf8'); + compositionCacheValueSizeBytes.observe(sizeInBytes); + await redis.psetex( id, pickCacheType(data) === 'long' ? ttlMs.success : ttlMs.failure, - JSON.stringify({ - status: 'completed', - result: data, - }), + encodedData, ); } @@ -152,13 +157,13 @@ export function createCache(options: { if (cached.status === 'failed') { logger.debug('Rejecting action from cache (id=%s)', id); if (cached.error.startsWith('TimeoutError:')) { - externalCompositionCounter.inc({ + schemaCompositionCounter.inc({ cache: 'hit', type: 'timeout', }); throw new TimeoutError(cached.error.replace('TimeoutError:', '')); } - externalCompositionCounter.inc({ + schemaCompositionCounter.inc({ cache: 'hit', type: 'failure', }); @@ -166,7 +171,7 @@ export function createCache(options: { } logger.debug('Resolving action from cache (id=%s)', id); - externalCompositionCounter.inc({ + schemaCompositionCounter.inc({ cache: 'hit', type: 'success', }); @@ -190,13 +195,13 @@ export function createCache(options: { ]); await completeAction(id, result, pickCacheType); - externalCompositionCounter.inc({ + schemaCompositionCounter.inc({ cache: 'miss', type: 'success', }); return result; } catch (error) { - externalCompositionCounter.inc({ + schemaCompositionCounter.inc({ cache: 'miss', type: error instanceof TimeoutError ? 'timeout' : 'failure', }); diff --git a/packages/services/schema/src/composition-scheduler.ts b/packages/services/schema/src/composition-scheduler.ts index f4af284d5..6fa025407 100644 --- a/packages/services/schema/src/composition-scheduler.ts +++ b/packages/services/schema/src/composition-scheduler.ts @@ -4,6 +4,11 @@ import fastq from 'fastq'; import * as Sentry from '@sentry/node'; import { registerWorkerLogging, type Logger } from '../../api/src/modules/shared/providers/logger'; import type { CompositionEvent, CompositionResultEvent } from './composition-worker'; +import { + compositionQueueDurationMS, + compositionTotalDurationMS, + compositionWorkerDurationMS, +} from './metrics'; type WorkerRunArgs = { data: CompositionEvent['data']; @@ -20,6 +25,11 @@ type WorkerInterface = { run: (args: WorkerRunArgs) => Promise; }; +type QueueData = { + args: WorkerRunArgs; + addedToQueueTime: number; +}; + export class CompositionScheduler { private logger: Logger; /** The amount of parallel workers */ @@ -28,7 +38,7 @@ export class CompositionScheduler { /** List of all workers */ private workers: Array; - private queue: fastq.queueAsPromised; + private queue: fastq.queueAsPromised; constructor(logger: Logger, workerCount: number, maxOldGenerationSizeMb: number) { this.workerCount = workerCount; @@ -38,17 +48,26 @@ export class CompositionScheduler { this.workers = workers; this.queue = fastq.promise( - function queue(data) { + async function queue(data) { // Let's not process aborted requests - if (data.abortSignal.aborted) { - throw data.abortSignal.reason; + if (data.args.abortSignal.aborted) { + throw data.args.abortSignal.reason; } + const startProcessingTime = now(); + compositionQueueDurationMS.observe(startProcessingTime - data.addedToQueueTime); const worker = workers.find(worker => worker.isIdle); - if (!worker) { throw new Error('No idle worker found.'); } - return worker.run(data); + + const result = await worker.run(data.args); + const finishedTime = now(); + compositionWorkerDurationMS.observe( + { type: data.args.data.type }, + finishedTime - startProcessingTime, + ); + compositionTotalDurationMS.observe(finishedTime - data.addedToQueueTime); + return result; }, // The size needs to be the same as the length of `this.workers`. // Otherwise a worker would process more than a single task at a time. @@ -179,6 +198,10 @@ export class CompositionScheduler { /** Process a composition task in a worker (once the next worker is free). */ process(args: WorkerRunArgs): Promise { - return this.queue.push(args); + return this.queue.push({ args, addedToQueueTime: now() }); } } + +function now() { + return new Date().getTime(); +} diff --git a/packages/services/schema/src/metrics.ts b/packages/services/schema/src/metrics.ts index 6aa35b3de..d31abf2c0 100644 --- a/packages/services/schema/src/metrics.ts +++ b/packages/services/schema/src/metrics.ts @@ -6,8 +6,29 @@ export const composeAndValidateCounter = new metrics.Counter({ labelNames: ['type'], }); -export const externalCompositionCounter = new metrics.Counter({ - name: 'schema_external_composition_total', - help: 'Number of external compositions', +export const schemaCompositionCounter = new metrics.Counter({ + name: 'schema_composition_total', + help: 'Number of schema compositions', labelNames: ['cache' /* hit or miss */, 'type' /* success, failure or timeout */], }); + +export const compositionTotalDurationMS = new metrics.Histogram({ + name: 'composition_total_duration_ms', + help: 'Total time of processing a composition (includes time in queue + actual processing time)', +}); + +export const compositionQueueDurationMS = new metrics.Histogram({ + name: 'composition_queue_duration_ms', + help: 'Time spent in queue before being processed.', +}); + +export const compositionWorkerDurationMS = new metrics.Histogram({ + name: 'composition_worker_duration_ms', + help: 'Time of running composition in worker', + labelNames: ['type' /* single, federation or stitching */], +}); + +export const compositionCacheValueSizeBytes = new metrics.Histogram({ + name: 'composition_cache_value_size_bytes', + help: 'The size of the cache entries.', +});