From d0e0b65fc693a30d733ea3e0a89a22e74e64282d Mon Sep 17 00:00:00 2001 From: Laurin Quast Date: Mon, 14 Apr 2025 13:48:42 +0200 Subject: [PATCH] feat(schema): run composition in worker threads (#6725) --- configs/tsup/utils.ts | 2 +- deployment/services/schema.ts | 2 +- .../providers/persisted-document-scheduler.ts | 26 +- .../schema/providers/orchestrators/single.ts | 47 +- .../providers/orchestrators/stitching.ts | 49 +- .../src/modules/shared/providers/logger.ts | 58 ++ packages/services/schema/README.md | 52 +- .../services/schema/__tests__/cache.spec.ts | 2 +- packages/services/schema/package.json | 6 +- packages/services/schema/src/api.ts | 133 +++- packages/services/schema/src/cache.ts | 20 +- .../schema/src/composition-scheduler.ts | 184 ++++++ .../schema/src/composition-worker-main.ts | 17 + .../services/schema/src/composition-worker.ts | 186 ++++++ .../schema/src/composition/federation.ts | 353 ++++++++++ .../services/schema/src/composition/shared.ts | 64 ++ .../services/schema/src/composition/single.ts | 68 ++ .../schema/src/composition/stitching.ts | 92 +++ packages/services/schema/src/environment.ts | 6 + packages/services/schema/src/index.ts | 31 +- packages/services/schema/src/lib/compose.ts | 7 +- .../schema/src/lib/trim-descriptions.ts | 31 + packages/services/schema/src/orchestrators.ts | 604 ------------------ .../server/src/persisted-documents-worker.ts | 31 +- pnpm-lock.yaml | 18 +- 25 files changed, 1360 insertions(+), 729 deletions(-) create mode 100644 packages/services/schema/src/composition-scheduler.ts create mode 100644 packages/services/schema/src/composition-worker-main.ts create mode 100644 packages/services/schema/src/composition-worker.ts create mode 100644 packages/services/schema/src/composition/federation.ts create mode 100644 packages/services/schema/src/composition/shared.ts create mode 100644 packages/services/schema/src/composition/single.ts create mode 100644 packages/services/schema/src/composition/stitching.ts create mode 100644 packages/services/schema/src/lib/trim-descriptions.ts delete mode 100644 packages/services/schema/src/orchestrators.ts diff --git a/configs/tsup/utils.ts b/configs/tsup/utils.ts index 702eda465..885765e3b 100644 --- a/configs/tsup/utils.ts +++ b/configs/tsup/utils.ts @@ -58,7 +58,7 @@ export const watchEntryPlugin = () => { name: 'node-watch-entry', esbuildOptions(options) { const entries = (options.entryPoints as string[]) || []; - const entry = entries[0]; + const entry = entries.find(entry => entry === 'src/dev.ts' || entry === 'test/root.ts'); if (!entry) { throw new Error('No entry point found'); diff --git a/deployment/services/schema.ts b/deployment/services/schema.ts index 13c46628f..4d7d5b800 100644 --- a/deployment/services/schema.ts +++ b/deployment/services/schema.ts @@ -50,7 +50,7 @@ export function deploySchema({ startupProbe: '/_health', exposesMetrics: true, replicas: environment.isProduction ? 3 : 1, - memoryLimit: '1Gi', + memoryLimit: '2Gi', pdb: true, }, [redis.deployment, redis.service], diff --git a/packages/services/api/src/modules/app-deployments/providers/persisted-document-scheduler.ts b/packages/services/api/src/modules/app-deployments/providers/persisted-document-scheduler.ts index 436763f83..40e042de0 100644 --- a/packages/services/api/src/modules/app-deployments/providers/persisted-document-scheduler.ts +++ b/packages/services/api/src/modules/app-deployments/providers/persisted-document-scheduler.ts @@ -2,8 +2,7 @@ import path from 'node:path'; import { Worker } from 'node:worker_threads'; import { fileURLToPath } from 'url'; import { Injectable, Scope } from 'graphql-modules'; -import { LogLevel } from 'graphql-yoga'; -import { Logger } from '../../shared/providers/logger'; +import { Logger, registerWorkerLogging } from '../../shared/providers/logger'; import { BatchProcessedEvent, BatchProcessEvent } from './persisted-document-ingester'; const __dirname = path.dirname(fileURLToPath(import.meta.url)); @@ -48,32 +47,19 @@ export class PersistedDocumentScheduler { } this.logger.debug('Re-Creating worker %s', index); + this.workers[index] = this.createWorker(index); + this.logger.debug('Cancel pending tasks %s', index); for (const [, task] of tasks) { task.reject(new Error('Worker stopped.')); } - - this.workers[index] = this.createWorker(index); }); + registerWorkerLogging(this.logger, worker, name); + worker.on( 'message', - ( - data: - | BatchProcessedEvent - | { event: 'error'; id: string; err: Error } - | { - event: 'log'; - bindings: Record; - level: LogLevel; - args: [string, ...unknown[]]; - }, - ) => { - if (data.event === 'log') { - this.logger.child(data.bindings)[data.level](...data.args); - return; - } - + (data: BatchProcessedEvent | { event: 'error'; id: string; err: Error }) => { if (data.event === 'error') { tasks.get(data.id)?.reject(data.err); } diff --git a/packages/services/api/src/modules/schema/providers/orchestrators/single.ts b/packages/services/api/src/modules/schema/providers/orchestrators/single.ts index b602a6dd4..1d0a2da27 100644 --- a/packages/services/api/src/modules/schema/providers/orchestrators/single.ts +++ b/packages/services/api/src/modules/schema/providers/orchestrators/single.ts @@ -1,4 +1,5 @@ import { CONTEXT, Inject, Injectable, Scope } from 'graphql-modules'; +import { abortSignalAny } from '@graphql-hive/signal'; import type { SchemaBuilderApi } from '@hive/schema'; import { traceFn } from '@hive/service-common'; import { createTRPCProxyClient, httpLink } from '@trpc/client'; @@ -14,6 +15,7 @@ export class SingleOrchestrator implements Orchestrator { type = ProjectType.SINGLE; private logger: Logger; private schemaService; + private incomingRequestAbortSignal: AbortSignal; constructor( logger: Logger, @@ -32,6 +34,7 @@ export class SingleOrchestrator implements Orchestrator { }), ], }); + this.incomingRequestAbortSignal = context.request.signal; } @traceFn('SingleOrchestrator.composeAndValidate', { @@ -54,14 +57,42 @@ export class SingleOrchestrator implements Orchestrator { throw new Error('too many schemas'); } - const result = await this.schemaService.composeAndValidate.mutate({ - type: 'single', - schemas: schemas.map(s => ({ - raw: s.raw, - source: s.source, - })), - }); + const timeoutAbortSignal = AbortSignal.timeout(30_000); - return result; + const onTimeout = () => { + this.logger.debug('Composition HTTP request aborted due to timeout of 30 seconds.'); + }; + timeoutAbortSignal.addEventListener('abort', onTimeout); + + const onIncomingRequestAbort = () => { + this.logger.debug('Composition HTTP request aborted due to incoming request being canceled.'); + }; + this.incomingRequestAbortSignal.addEventListener('abort', onIncomingRequestAbort); + + try { + const result = await this.schemaService.composeAndValidate.mutate( + { + type: 'single', + schemas: schemas.map(s => ({ + raw: s.raw, + source: s.source, + })), + }, + { + // We want to abort composition if the request that does the composition is aborted + // We also limit the maximum time allowed for composition requests to 30 seconds to avoid + // + // The reason for these is a potential dead-lock. + // + // Note: We are using `abortSignalAny` over `AbortSignal.any` because of leak issues. + // @source https://github.com/nodejs/node/issues/57584 + signal: abortSignalAny([this.incomingRequestAbortSignal, timeoutAbortSignal]), + }, + ); + return result; + } finally { + timeoutAbortSignal.removeEventListener('abort', onTimeout); + this.incomingRequestAbortSignal.removeEventListener('abort', onIncomingRequestAbort); + } } } diff --git a/packages/services/api/src/modules/schema/providers/orchestrators/stitching.ts b/packages/services/api/src/modules/schema/providers/orchestrators/stitching.ts index 7f07693d9..f500acea3 100644 --- a/packages/services/api/src/modules/schema/providers/orchestrators/stitching.ts +++ b/packages/services/api/src/modules/schema/providers/orchestrators/stitching.ts @@ -1,4 +1,5 @@ import { CONTEXT, Inject, Injectable, Scope } from 'graphql-modules'; +import { abortSignalAny } from '@graphql-hive/signal'; import type { SchemaBuilderApi } from '@hive/schema'; import { traceFn } from '@hive/service-common'; import { createTRPCProxyClient, httpLink } from '@trpc/client'; @@ -14,6 +15,7 @@ export class StitchingOrchestrator implements Orchestrator { type = ProjectType.STITCHING; private logger: Logger; private schemaService; + private incomingRequestAbortSignal: AbortSignal; constructor( logger: Logger, @@ -32,6 +34,7 @@ export class StitchingOrchestrator implements Orchestrator { }), ], }); + this.incomingRequestAbortSignal = context.request.signal; } @traceFn('StitchingOrchestrator.composeAndValidate', { @@ -47,15 +50,43 @@ export class StitchingOrchestrator implements Orchestrator { async composeAndValidate(schemas: SchemaObject[]) { this.logger.debug('Composing and Validating Stitched Schemas'); - const result = await this.schemaService.composeAndValidate.mutate({ - type: 'stitching', - schemas: schemas.map(s => ({ - raw: s.raw, - source: s.source, - url: s.url ?? null, - })), - }); + const timeoutAbortSignal = AbortSignal.timeout(30_000); - return result; + const onTimeout = () => { + this.logger.debug('Composition HTTP request aborted due to timeout of 30 seconds.'); + }; + timeoutAbortSignal.addEventListener('abort', onTimeout); + + const onIncomingRequestAbort = () => { + this.logger.debug('Composition HTTP request aborted due to incoming request being canceled.'); + }; + this.incomingRequestAbortSignal.addEventListener('abort', onIncomingRequestAbort); + + try { + const result = await this.schemaService.composeAndValidate.mutate( + { + type: 'stitching', + schemas: schemas.map(s => ({ + raw: s.raw, + source: s.source, + url: s.url ?? null, + })), + }, + { + // We want to abort composition if the request that does the composition is aborted + // We also limit the maximum time allowed for composition requests to 30 seconds to avoid + // + // The reason for these is a potential dead-lock. + // + // Note: We are using `abortSignalAny` over `AbortSignal.any` because of leak issues. + // @source https://github.com/nodejs/node/issues/57584 + signal: abortSignalAny([this.incomingRequestAbortSignal, timeoutAbortSignal]), + }, + ); + return result; + } finally { + timeoutAbortSignal.removeEventListener('abort', onTimeout); + this.incomingRequestAbortSignal.removeEventListener('abort', onIncomingRequestAbort); + } } } diff --git a/packages/services/api/src/modules/shared/providers/logger.ts b/packages/services/api/src/modules/shared/providers/logger.ts index 65bb8254f..ec147f0ed 100644 --- a/packages/services/api/src/modules/shared/providers/logger.ts +++ b/packages/services/api/src/modules/shared/providers/logger.ts @@ -1,3 +1,5 @@ +import type { MessagePort, Worker } from 'node:worker_threads'; +import type { LogLevel } from 'fastify'; import { Injectable } from 'graphql-modules'; export type LogFn = (msg: string, ...args: unknown[]) => void; @@ -30,3 +32,59 @@ export class NoopLogger extends Logger { debug = noop; child = () => this; } + +export type MessagePortLog = { + event: 'log'; + bindings: Record; + level: Exclude; + args: [string, ...unknown[]]; +}; + +/** + * Create a logger that posts the logs to the message port. + * The main use-case of this is to forward logs from a worker thread to the main thread. + */ +export function createMessagePortLogger( + port: MessagePort, + bindings: Record = {}, +): Logger { + return { + child(newBindings) { + return createMessagePortLogger(port, { ...bindings, ...newBindings }); + }, + debug(...args) { + port.postMessage({ event: 'log', level: 'debug', args, bindings }); + }, + error(...args) { + port.postMessage({ event: 'log', level: 'error', args, bindings }); + }, + fatal(...args) { + port.postMessage({ event: 'log', level: 'fatal', args, bindings }); + }, + info(...args) { + port.postMessage({ event: 'log', level: 'info', args, bindings }); + }, + trace(...args) { + port.postMessage({ event: 'log', level: 'trace', args, bindings }); + }, + warn(...args) { + port.postMessage({ event: 'log', level: 'warn', args, bindings }); + }, + }; +} + +/** + * Register a logger from the message port to log to this threads logger. + */ +export function registerWorkerLogging(logger: Logger, worker: Worker, workerId: string) { + worker.on('message', (data: MessagePortLog) => { + if ('event' in data && data.event === 'log') { + logger + .child({ + ...data.bindings, + workerId, + }) + [data.level](...data.args); + } + }); +} diff --git a/packages/services/schema/README.md b/packages/services/schema/README.md index 846c2b42b..7bce8e3fa 100644 --- a/packages/services/schema/README.md +++ b/packages/services/schema/README.md @@ -1,7 +1,7 @@ # `@hive/schema` Service for validating schemas or verifying whether a composite GraphQL schema can be composed out -of subschemas. +of subschemas. Supports Federation, Schema Stitching and Monolithic Schemas. ## Configuration @@ -26,3 +26,53 @@ of subschemas. | `REQUEST_LOGGING` | No | Log http requests | `1` (enabled) or `0` (disabled) | | `LOG_LEVEL` | No | The verbosity of the service logs. One of `trace`, `debug`, `info`, `warn` ,`error`, `fatal` or `silent` | `info` (default) | | `OPENTELEMETRY_COLLECTOR_ENDPOINT` | No | OpenTelemetry Collector endpoint. The expected traces transport is HTTP (port `4318`). | `http://localhost:4318/v1/traces` | + +## Documentation + +### Composition Request Handling + +The following diagram outlines how the service handles incoming composition requests via HTTP +(tRPC). It details the decision-making process around caching with Redis, reuse of in-progress +tasks, and task execution using a limited pool of worker threads. + +Each composition task runs in an isolated worker thread with memory limits to prevent a single +malfunctioning task from affecting the stability of the entire service. This setup ensures robust +and efficient processing by avoiding redundant computation, serving cached results when possible, +and queuing tasks when resources are saturated. + +```mermaid +sequenceDiagram + participant Client + participant Service + participant Redis + participant TaskManager + participant WorkerPool + + Client->>Service: Composition HTTP request (tRPC) + Service->>Redis: Check for cached result + alt Cached result found + Redis-->>Service: Return result + Service-->>Client: Send cached result + else Not cached + Service->>TaskManager: Check if task in progress + alt Task in progress + TaskManager-->>Service: Return existing task + Service->>TaskManager: Wait for task completion + TaskManager-->>Service: Return result + Service-->>Client: Send result + else No task in progress + TaskManager->>WorkerPool: Check for available worker + alt Worker available + WorkerPool-->>TaskManager: Assign task + else No workers available + TaskManager->>TaskManager: Enqueue task in memory + TaskManager->>WorkerPool: Wait for available worker + WorkerPool-->>TaskManager: Assign task when ready + end + WorkerPool->>TaskManager: Task completed + TaskManager->>Redis: Cache result + TaskManager-->>Service: Return result to pending requests + Service-->>Client: Send result + end + end +``` diff --git a/packages/services/schema/__tests__/cache.spec.ts b/packages/services/schema/__tests__/cache.spec.ts index 7470563bc..3f91b2f25 100644 --- a/packages/services/schema/__tests__/cache.spec.ts +++ b/packages/services/schema/__tests__/cache.spec.ts @@ -258,7 +258,7 @@ test('run action again when the action expires', async ({ expect }) => { const actionId = randomString(); async function actionFn() { - await waitFor(timeoutMs); + await waitFor(timeoutMs - 1); return 'foo'; } diff --git a/packages/services/schema/package.json b/packages/services/schema/package.json index fcbbdf36e..c14440f39 100644 --- a/packages/services/schema/package.json +++ b/packages/services/schema/package.json @@ -4,8 +4,8 @@ "license": "MIT", "private": true, "scripts": { - "build": "tsx ../../../scripts/runify.ts", - "dev": "tsup-node --config ../../../configs/tsup/dev.config.node.ts src/dev.ts", + "build": "tsx ../../../scripts/runify.ts src/index.ts src/composition-worker-main.ts", + "dev": "tsup-node --config ../../../configs/tsup/dev.config.node.ts src/dev.ts src/composition-worker-main.ts ", "typecheck": "tsc --noEmit" }, "devDependencies": { @@ -22,12 +22,14 @@ "dotenv": "16.4.7", "fast-json-stable-stringify": "2.1.0", "fastify": "4.29.0", + "fastq": "1.19.1", "got": "14.4.7", "graphql": "16.9.0", "ioredis": "5.4.2", "ioredis-mock": "8.9.0", "p-timeout": "6.1.4", "pino-pretty": "11.3.0", + "reflect-metadata": "0.2.2", "zod": "3.24.1" } } diff --git a/packages/services/schema/src/api.ts b/packages/services/schema/src/api.ts index ddf65d583..c25865eb3 100644 --- a/packages/services/schema/src/api.ts +++ b/packages/services/schema/src/api.ts @@ -4,19 +4,24 @@ import { handleTRPCError } from '@hive/service-common'; import type { inferRouterInputs } from '@trpc/server'; import { initTRPC } from '@trpc/server'; import type { Cache } from './cache'; +import type { CompositionScheduler } from './composition-scheduler'; +import { type ComposeFederationArgs } from './composition/federation'; +import type { CompositionErrorType } from './composition/shared'; +import { ComposeSingleArgs } from './composition/single'; +import { ComposeStitchingArgs } from './composition/stitching'; import { composeAndValidateCounter } from './metrics'; -import { pickOrchestrator } from './orchestrators'; +import type { Metadata } from './types'; export type { CompositionFailureError, CompositionErrorSource } from './lib/errors'; export interface Context { req: FastifyRequest; cache: Cache; - decrypt(value: string): string; broker: { endpoint: string; signature: string; } | null; + compositionScheduler: CompositionScheduler; } const t = initTRPC.context().create(); @@ -83,21 +88,121 @@ export const schemaBuilderApiRouter = t.router({ }), ]), ) - .mutation(async ({ ctx, input }) => { + .mutation(async ({ ctx, input }): Promise => { composeAndValidateCounter.inc({ type: input.type }); - return await pickOrchestrator(input.type, ctx.cache, ctx.req, ctx.decrypt).composeAndValidate( - input.schemas, - 'external' in input && input.external - ? { - ...input.external, - broker: ctx.broker, - } - : null, - 'native' in input && input.native ? true : false, - 'contracts' in input && input.contracts ? input.contracts : undefined, - ); + + try { + if (input.type === 'federation') { + return await ctx.cache.reuse( + 'federation', + async (args: ComposeFederationArgs, abortSignal) => { + const result = await ctx.compositionScheduler.process({ + data: { + type: 'federation', + args, + requestTimeoutMs: ctx.cache.timeoutMs, + }, + abortSignal, + requestId: ctx.req.id, + }); + return result.result; + }, + result => + result.includesNetworkError === true || result.includesException === true + ? 'short' + : 'long', + )({ + schemas: input.schemas, + external: + 'external' in input && input.external + ? { + ...input.external, + broker: ctx.broker, + } + : null, + native: 'native' in input && input.native ? true : false, + contracts: 'contracts' in input && input.contracts ? input.contracts : undefined, + requestId: ctx.req.id, + }); + } + + if (input.type === 'stitching') { + return await ctx.cache.reuse( + 'stitching', + async (args: ComposeStitchingArgs, abortSignal) => { + const result = await ctx.compositionScheduler.process({ + data: { + type: 'stitching', + args, + }, + abortSignal, + requestId: ctx.req.id, + }); + return result.result; + }, + )({ schemas: input.schemas }); + } + + if (input.type === 'single') { + return await ctx.cache.reuse('single', async (args: ComposeSingleArgs, abortSignal) => { + const result = await ctx.compositionScheduler.process({ + data: { + type: 'single', + args, + }, + abortSignal, + requestId: ctx.req.id, + }); + return result.result; + })({ schemas: input.schemas }); + } + + assertAllCasesExhausted(input); + } catch (error) { + // Treat timeouts caused by external composition as "expected errors" + if (ctx.cache.isTimeoutError(error) && input.type === 'federation' && input.external) { + return { + errors: [ + { + message: error.message, + source: 'graphql', + }, + ], + sdl: null, + supergraph: null, + includesNetworkError: true, + contracts: null, + tags: null, + schemaMetadata: null, + metadataAttributes: null, + } satisfies CompositionResponse; + } + throw error; + } + + throw new Error('tRCP and TypeScript for the win.'); }), }); export type SchemaBuilderApi = typeof schemaBuilderApiRouter; export type SchemaBuilderApiInput = inferRouterInputs; + +function assertAllCasesExhausted(value: never) { + throw new Error(`Not all cases are exhaused. Value '${value}'.`); +} + +export type CompositionResponse = { + errors: Array; + sdl: null | string; + supergraph: null | string; + contracts: Array<{ + id: string; + errors: Array; + sdl: string | null; + supergraph: string | null; + }> | null; + schemaMetadata: Record | null; + metadataAttributes: Record | null; + tags: Array | null; + includesNetworkError?: boolean; +}; diff --git a/packages/services/schema/src/cache.ts b/packages/services/schema/src/cache.ts index 8031b0348..7eac96455 100644 --- a/packages/services/schema/src/cache.ts +++ b/packages/services/schema/src/cache.ts @@ -1,7 +1,7 @@ import { createHash } from 'node:crypto'; import stringify from 'fast-json-stable-stringify'; import type { Redis } from 'ioredis'; -import pTimeout, { TimeoutError } from 'p-timeout'; +import { TimeoutError } from 'p-timeout'; import type { ServiceLogger } from '@hive/service-common'; import { externalCompositionCounter } from './metrics'; @@ -125,7 +125,7 @@ export function createCache(options: { async function runAction( groupKey: string, - factory: (input: I) => Promise, + factory: (input: I, signal: AbortSignal) => Promise, pickCacheType: (output: O) => CacheTTLType, input: I, attempt: number, @@ -179,10 +179,16 @@ export function createCache(options: { try { logger.debug('Executing action (id=%s)', id); - const result = await pTimeout(factory(input), { - milliseconds: timeoutMs, - message: `Timeout: took longer than ${timeoutMs}ms to complete`, - }); + const timeoutSignal = AbortSignal.timeout(timeoutMs); + const result = await Promise.race([ + factory(input, timeoutSignal), + new Promise((_, reject) => { + timeoutSignal.addEventListener('abort', () => { + reject(new TimeoutError(`Timeout: took longer than ${timeoutMs}ms to complete`)); + }); + }), + ]); + await completeAction(id, result, pickCacheType); externalCompositionCounter.inc({ cache: 'miss', @@ -206,7 +212,7 @@ export function createCache(options: { }, reuse( groupKey: string, - factory: (input: I) => Promise, + factory: (input: I, signal: AbortSignal) => Promise, pickCacheType: (output: O) => CacheTTLType = () => 'long', ): (input: I) => Promise { return async input => runAction(groupKey, factory, pickCacheType, input, 1); diff --git a/packages/services/schema/src/composition-scheduler.ts b/packages/services/schema/src/composition-scheduler.ts new file mode 100644 index 000000000..f4af284d5 --- /dev/null +++ b/packages/services/schema/src/composition-scheduler.ts @@ -0,0 +1,184 @@ +import * as path from 'node:path'; +import { Worker } from 'node:worker_threads'; +import fastq from 'fastq'; +import * as Sentry from '@sentry/node'; +import { registerWorkerLogging, type Logger } from '../../api/src/modules/shared/providers/logger'; +import type { CompositionEvent, CompositionResultEvent } from './composition-worker'; + +type WorkerRunArgs = { + data: CompositionEvent['data']; + requestId: string; + abortSignal: AbortSignal; +}; + +type Task = Omit, 'promise'>; + +type WorkerInterface = { + readonly isIdle: boolean; + name: string; + /** Run a task on the worker. */ + run: (args: WorkerRunArgs) => Promise; +}; + +export class CompositionScheduler { + private logger: Logger; + /** The amount of parallel workers */ + private workerCount: number; + private maxOldGenerationSizeMb: number; + /** List of all workers */ + private workers: Array; + + private queue: fastq.queueAsPromised; + + constructor(logger: Logger, workerCount: number, maxOldGenerationSizeMb: number) { + this.workerCount = workerCount; + this.maxOldGenerationSizeMb = maxOldGenerationSizeMb; + this.logger = logger.child({ source: 'CompositionScheduler' }); + const workers = Array.from({ length: this.workerCount }, (_, i) => this.createWorker(i)); + this.workers = workers; + + this.queue = fastq.promise( + function queue(data) { + // Let's not process aborted requests + if (data.abortSignal.aborted) { + throw data.abortSignal.reason; + } + const worker = workers.find(worker => worker.isIdle); + + if (!worker) { + throw new Error('No idle worker found.'); + } + return worker.run(data); + }, + // The size needs to be the same as the length of `this.workers`. + // Otherwise a worker would process more than a single task at a time. + this.workerCount, + ); + } + + private createWorker(index: number): WorkerInterface { + this.logger.debug('Creating worker %s', index); + const name = `composition-worker-${index}`; + const worker = new Worker(path.join(__dirname, 'composition-worker-main.js'), { + name, + resourceLimits: { + maxOldGenerationSizeMb: this.maxOldGenerationSizeMb, + }, + }); + + let workerState: { + task: Task; + args: WorkerRunArgs; + } | null = null; + + const recreate = (reason: Error) => { + void worker.terminate().finally(() => { + this.logger.debug('Re-Creating worker %s', index); + this.workers[index] = this.createWorker(index); + + if (workerState) { + this.logger.debug('Cancel pending task %s', index); + workerState.task.reject(reason); + } + }); + }; + + worker.on('error', error => { + console.error(error); + this.logger.error('Worker error %s', error); + Sentry.captureException(error, { + extra: { + requestId: workerState?.args.requestId ?? '', + compositionType: workerState?.args.data.type, + }, + }); + recreate(error); + }); + + worker.on('exit', code => { + this.logger.error('Worker stopped with exit code %s', String(code)); + }); + + registerWorkerLogging(this.logger, worker, name); + + worker.on( + 'message', + (data: CompositionResultEvent | { event: 'error'; id: string; err: Error }) => { + if (data.event === 'error') { + workerState?.task.reject(data.err); + } + + if (data.event === 'compositionResult') { + workerState?.task.resolve(data); + } + }, + ); + + const { logger: baseLogger } = this; + + function run(args: WorkerRunArgs) { + if (workerState) { + throw new Error('Can not run task in worker that is not idle.'); + } + const taskId = crypto.randomUUID(); + const logger = baseLogger.child({ taskId, reqId: args.requestId }); + const d = Promise.withResolvers(); + + let task: Task = { + resolve: data => { + args.abortSignal.removeEventListener('abort', onAbort); + workerState = null; + d.resolve(data); + }, + reject: err => { + args.abortSignal.removeEventListener('abort', onAbort); + void worker.terminate().finally(() => { + workerState = null; + d.reject(err); + }); + }, + }; + workerState = { + task, + args, + }; + + function onAbort() { + logger.error('Task aborted.'); + recreate(new Error('Task aborted')); + } + + args.abortSignal.addEventListener('abort', onAbort); + + const time = process.hrtime(); + + worker.postMessage({ + event: 'composition', + id: taskId, + data: args.data, + taskId, + requestId: args.requestId, + } satisfies CompositionEvent); + + return d.promise + .finally(() => { + const endTime = process.hrtime(time); + logger.debug('Time taken: %ds:%dms', endTime[0], endTime[1] / 1000000); + }) + .then(result => result.data); + } + + return { + get isIdle() { + return workerState === null; + }, + name, + run, + }; + } + + /** Process a composition task in a worker (once the next worker is free). */ + process(args: WorkerRunArgs): Promise { + return this.queue.push(args); + } +} diff --git a/packages/services/schema/src/composition-worker-main.ts b/packages/services/schema/src/composition-worker-main.ts new file mode 100644 index 000000000..7d7fcf636 --- /dev/null +++ b/packages/services/schema/src/composition-worker-main.ts @@ -0,0 +1,17 @@ +import 'reflect-metadata'; +import { parentPort } from 'node:worker_threads'; +import { createMessagePortLogger } from '../../api/src/modules/shared/providers/logger'; +import { createCompositionWorker } from './composition-worker'; +import { env } from './environment'; + +if (!parentPort) { + throw new Error('This script must be run as a worker.'); +} + +const baseLogger = createMessagePortLogger(parentPort); + +createCompositionWorker({ + baseLogger, + port: parentPort, + env, +}); diff --git a/packages/services/schema/src/composition-worker.ts b/packages/services/schema/src/composition-worker.ts new file mode 100644 index 000000000..f7a16bdf7 --- /dev/null +++ b/packages/services/schema/src/composition-worker.ts @@ -0,0 +1,186 @@ +import * as crypto from 'node:crypto'; +import type { MessagePort } from 'node:worker_threads'; +import type { Logger } from '@hive/api'; +import { CompositionResponse } from './api'; +import { createComposeFederation, type ComposeFederationArgs } from './composition/federation'; +import { composeSingle, type ComposeSingleArgs } from './composition/single'; +import { composeStitching, type ComposeStitchingArgs } from './composition/stitching'; +import type { env } from './environment'; + +export function createCompositionWorker(args: { + baseLogger: Logger; + port: MessagePort; + env: typeof env; +}) { + const baseLogger = args.baseLogger.child({ + source: 'CompositionWorker', + }); + + const decrypt = decryptFactory(args.env.encryptionSecret); + + process.on('unhandledRejection', function (err) { + console.error('unhandledRejection', err); + console.error(err); + args.port.postMessage({ + code: 'ERROR', + err, + }); + process.exit(1); + }); + + process.on('uncaughtException', function (err) { + console.error('uncaughtException', err); + args.port.postMessage({ + code: 'ERROR', + err, + }); + process.exit(1); + }); + + args.port.on('message', async (message: CompositionEvent) => { + const logger = baseLogger.child({ + taskId: message.taskId, + reqId: message.requestId, + messageId: message.id, + event: message.event, + }); + logger.debug('processing message'); + + try { + if (message.event === 'composition') { + if (message.data.type === 'federation') { + const composeFederation = createComposeFederation({ + decrypt, + logger: baseLogger.child({ reqId: message.data.args.requestId }) as any, + requestTimeoutMs: message.data.requestTimeoutMs, + }); + const composed = await composeFederation(message.data.args); + + args.port.postMessage({ + event: 'compositionResult', + id: message.id, + data: { + type: message.data.type, + result: { + errors: composed.result.errors ?? [], + sdl: composed.result.sdl ?? null, + supergraph: composed.result.supergraph ?? null, + includesNetworkError: composed.result.includesNetworkError === true, + contracts: + composed.result.contracts?.map(contract => ({ + id: contract.id, + errors: 'errors' in contract.result.result ? contract.result.result.errors : [], + sdl: contract.result.result.sdl ?? null, + supergraph: contract.result.result.supergraph ?? null, + })) ?? null, + tags: composed.result.tags ?? null, + schemaMetadata: composed.result.schemaMetadata ?? null, + metadataAttributes: composed.result.metadataAttributes ?? null, + includesException: composed.result.includesException === true, + }, + }, + } satisfies CompositionResultEvent); + return; + } + + if (message.data.type === 'single') { + const result = await composeSingle(message.data.args); + args.port.postMessage({ + event: 'compositionResult', + id: message.id, + data: { + type: message.data.type, + result, + }, + } satisfies CompositionResultEvent); + return; + } + + if (message.data.type === 'stitching') { + const result = await composeStitching(message.data.args); + args.port.postMessage({ + event: 'compositionResult', + id: message.id, + data: { + type: message.data.type, + result, + }, + } satisfies CompositionResultEvent); + return; + } + assertAllCasesExhausted(message.data); + return; + } + assertAllCasesExhausted(message.event); + } catch (err: unknown) { + baseLogger.error( + 'unexpected error while processing message in worker (messageId=%s)', + message.id, + ); + baseLogger.error(String(err)); + args.port.postMessage({ + event: 'error', + id: message.id, + err, + }); + } + }); + + process.on('exit', function (code) { + console.log('exit', code); + }); +} + +export type CompositionEvent = { + id: string; + event: 'composition'; + requestId: string; + taskId: string; + data: + | { + type: 'federation'; + args: ComposeFederationArgs; + requestTimeoutMs: number; + } + | { + type: 'single'; + args: ComposeSingleArgs; + } + | { + type: 'stitching'; + args: ComposeStitchingArgs; + }; +}; + +export type CompositionResultEvent = { + id: string; + event: 'compositionResult'; + data: { + type: 'federation' | 'single' | 'stitching'; + result: CompositionResponse & { + includesException?: boolean; + }; + }; +}; + +function decryptFactory(encryptionSecret: string) { + const ENCRYPTION_SECRET = crypto.createHash('md5').update(encryptionSecret).digest('hex'); + + const ALG = 'aes256'; + const IN_ENC = 'utf8'; + const OUT_ENC = 'hex'; + + const secretBuffer = Buffer.from(ENCRYPTION_SECRET, 'latin1'); + + return function decrypt(text: string) { + const components = text.split(':'); + const iv = Buffer.from(components.shift() || '', OUT_ENC); + const decipher = crypto.createDecipheriv(ALG, secretBuffer, iv); + + return decipher.update(components.join(':'), OUT_ENC, IN_ENC) + decipher.final(IN_ENC); + }; +} + +function assertAllCasesExhausted(value: never) { + throw new Error(`Not all cases are exhaused. Value '${value}'.`); +} diff --git a/packages/services/schema/src/composition/federation.ts b/packages/services/schema/src/composition/federation.ts new file mode 100644 index 000000000..747b2d0a8 --- /dev/null +++ b/packages/services/schema/src/composition/federation.ts @@ -0,0 +1,353 @@ +import { + Kind, + parse, + visit, + type ConstDirectiveNode, + type FieldDefinitionNode, + type NameNode, +} from 'graphql'; +import type { ServiceLogger } from '@hive/service-common'; +import { extractLinkImplementations } from '@theguild/federation-composition'; +import type { ContractsInputType } from '../api'; +import { addInaccessibleToUnreachableTypes } from '../lib/add-inaccessible-to-unreachable-types'; +import { + composeExternalFederation, + composeFederationV1, + composeFederationV2, + ComposerMethodResult, + SubgraphInput, +} from '../lib/compose'; +import { + applyTagFilterOnSubgraphs, + createTagDirectiveNameExtractionStrategy, + extractTagsFromDocument, +} from '../lib/federation-tag-extraction'; +import { extractMetadata, mergeMetadata } from '../lib/metadata-extraction'; +import { SetMap } from '../lib/setmap'; +import { trimDescriptions } from '../lib/trim-descriptions'; +import type { ComposeAndValidateInput, ExternalComposition } from '../types'; +import { + CompositionResult, + ContractResultFailure, + ContractResultSuccess, + ContractResultType, +} from './shared'; + +export type ComposeFederationDeps = { + logger: ServiceLogger; + decrypt: (value: string) => string; + requestTimeoutMs: number; +}; + +export type ComposeFederationArgs = { + schemas: ComposeAndValidateInput; + external: ExternalComposition; + native: boolean; + contracts: ContractsInputType | undefined; + requestId: string; +}; + +export const createComposeFederation = (deps: ComposeFederationDeps) => + async function composeFederation(args: ComposeFederationArgs): Promise { + const subgraphs = args.schemas + .map(schema => { + deps.logger.debug(`Parsing subgraph schema SDL (name=%s)`, schema.source); + return { + typeDefs: trimDescriptions(parse(schema.raw)), + name: schema.source, + url: 'url' in schema && typeof schema.url === 'string' ? schema.url : undefined, + }; + }) + .map(subgraph => { + deps.logger.debug(`Extracting link implementations from subgraph (name=%s)`, subgraph.name); + const { matchesImplementation, resolveImportName } = extractLinkImplementations( + subgraph.typeDefs, + ); + if (matchesImplementation('https://specs.graphql-hive.com/hive', 'v1.0')) { + deps.logger.debug( + `Found hive link in subgraph. Applying link logic. (name=%s)`, + subgraph.name, + ); + // if this subgraph implements the metadata spec + // then copy metadata from the schema to all fields. + // @note this is similar to how federation's compose copies join__ directives to fields based on the + // subgraph that the field is a part of. + const metaDirectiveName = resolveImportName( + 'https://specs.graphql-hive.com/hive', + '@meta', + ); + const applyMetaToField = ( + fieldNode: FieldDefinitionNode, + metaDirectives: ConstDirectiveNode[], + ) => { + return { + ...fieldNode, + directives: [ + ...(fieldNode.directives ?? []), + ...metaDirectives.map(d => ({ ...d, loc: undefined })), + ], + }; + }; + + const schemaNodes = subgraph.typeDefs.definitions.filter( + d => d.kind === Kind.SCHEMA_DEFINITION || d.kind === Kind.SCHEMA_EXTENSION, + ); + const schemaMetaDirectives = schemaNodes + .flatMap(node => node.directives?.filter(d => d.name.value === metaDirectiveName)) + .filter(d => d !== undefined); + const interfaceAndObjectHandler = (node: { + readonly fields?: ReadonlyArray | undefined; + readonly directives?: ReadonlyArray | undefined; + readonly name: NameNode; + }) => { + // apply type/interface metadata to fields + const objectMetaDirectives = node.directives + ?.filter(d => d.name.value === metaDirectiveName) + .filter(d => d !== undefined); + if (objectMetaDirectives?.length) { + return { + ...node, + fields: node.fields?.map(f => applyMetaToField(f, objectMetaDirectives)), + }; + } + return node; + }; + subgraph.typeDefs = visit(subgraph.typeDefs, { + FieldDefinition: field => { + return applyMetaToField(field, schemaMetaDirectives); + }, + ObjectTypeDefinition: interfaceAndObjectHandler, + InterfaceTypeDefinition: interfaceAndObjectHandler, + }); + } + return subgraph; + }); + + /** Determine the correct compose method... */ + let compose: (subgraphs: Array) => Promise< + ComposerMethodResult & { + includesException?: boolean; + } + >; + + // Federation v2 + if (args.native) { + deps.logger.debug( + 'Using built-in Federation v2 composition service (schemas=%s)', + args.schemas.length, + ); + compose = subgraphs => Promise.resolve(composeFederationV2(subgraphs, deps.logger)); + } else if (args.external) { + const { external } = args; + compose = subgraphs => + composeExternalFederation({ + decrypt: deps.decrypt, + external, + logger: deps.logger, + requestId: args.requestId, + subgraphs, + requestTimeoutMs: deps.requestTimeoutMs, + }); + } else { + deps.logger.debug( + 'Using built-in Federation v1 composition service (schemas=%s)', + args.schemas.length, + ); + compose = subgraphs => Promise.resolve(composeFederationV1(subgraphs)); + } + let result: CompositionResult; + + { + const composed = await compose(subgraphs); + + if (composed.type === 'success') { + // merge all metadata from every subgraph by coordinate + const subgraphsMetadata = subgraphs.map(({ name, typeDefs }) => + extractMetadata(typeDefs, name), + ); + const supergraphSDL = parse(composed.result.supergraph); + const { resolveImportName } = extractLinkImplementations(supergraphSDL); + const tagDirectiveName = resolveImportName('https://specs.apollo.dev/tag', '@tag'); + const tagStrategy = createTagDirectiveNameExtractionStrategy(tagDirectiveName); + const tags = extractTagsFromDocument(supergraphSDL, tagStrategy); + const schemaMetadata = mergeMetadata(...subgraphsMetadata); + const metadataAttributes = new SetMap(); + for (const [_coord, attrs] of schemaMetadata) { + for (const attr of attrs) { + metadataAttributes.add(attr.name, attr.content); + } + } + result = { + type: 'success', + result: { + supergraph: composed.result.supergraph, + sdl: composed.result.sdl, + contracts: null, + tags, + schemaMetadata: Object.fromEntries(schemaMetadata), + metadataAttributes: metadataAttributes.toObject(), + }, + }; + } else { + result = { + type: 'failure', + result: { + supergraph: composed.result.supergraph ?? null, + sdl: composed.result.sdl ?? null, + errors: composed.result.errors, + contracts: null, + includesNetworkError: composed.includesNetworkError, + includesException: composed.includesException ?? false, + tags: null, + schemaMetadata: null, + metadataAttributes: null, + }, + }; + } + } + + if (!args.contracts?.length) { + // if no contracts, then compose and return the result + return result; + } + + if (result.type == 'failure') { + return { + ...result, + result: { + ...result.result, + contracts: args.contracts.map(contract => ({ + id: contract.id, + result: { + type: 'failure', + result: { + supergraph: null, + sdl: null, + includesNetworkError: false, + includesException: false, + errors: [ + { + message: 'Skipped contract composition, as default graph composition failed.', + source: 'composition', + }, + ], + }, + }, + })), + }, + }; + } + + // if there are contracts, then create + const contractResults: Array = await Promise.all( + args.contracts.map(async contract => { + // apply contracts to replace tags with inaccessible directives + const filteredSubgraphs = applyTagFilterOnSubgraphs(subgraphs, { + include: new Set(contract.filter.include), + exclude: new Set(contract.filter.exclude), + }); + + // attempt to compose the contract filtered subgraph + const compositionResult = await compose(filteredSubgraphs); + + // Remove unreachable types from public API schema + if ( + contract.filter.removeUnreachableTypesFromPublicApiSchema === true && + compositionResult.type === 'success' + ) { + let supergraphSDL = parse(compositionResult.result.supergraph); + const { resolveImportName } = extractLinkImplementations(supergraphSDL); + const result = addInaccessibleToUnreachableTypes( + resolveImportName, + compositionResult, + supergraphSDL, + ); + + if (result.type === 'success') { + return { + id: contract.id, + result: { + type: 'success', + result: { + supergraph: result.result.supergraph, + sdl: result.result.sdl, + }, + }, + } satisfies ContractResultSuccess; + } + + return { + id: contract.id, + result: { + type: 'failure', + result: { + supergraph: null, + sdl: null, + errors: result.result.errors, + includesNetworkError: false, + includesException: false, + }, + }, + } satisfies ContractResultFailure; + } + + if (compositionResult.type === 'success') { + return { + id: contract.id, + result: { + type: 'success', + result: { + supergraph: compositionResult.result.supergraph, + sdl: compositionResult.result.sdl, + }, + }, + } satisfies ContractResultSuccess; + } + + return { + id: contract.id, + result: { + type: 'failure', + result: { + supergraph: null, + sdl: null, + errors: compositionResult.result.errors, + includesNetworkError: compositionResult.includesNetworkError, + includesException: compositionResult.includesException ?? false, + }, + }, + } satisfies ContractResultFailure; + }), + ); + + const networkErrorContract = contractResults.find( + (contract): contract is ContractResultFailure => + contract.result.type === 'failure' && contract.result.result.includesNetworkError === true, + ); + + // In case any of the contract composition fails, we will fail the whole composition. + if (networkErrorContract) { + return { + type: 'failure', + result: { + errors: networkErrorContract.result.result.errors, + supergraph: null, + sdl: null, + tags: null, + includesNetworkError: true, + includesException: false, + schemaMetadata: null, + metadataAttributes: null, + contracts: null, + }, + }; + } + + return { + ...result, + result: { + ...result.result, + contracts: contractResults, + }, + }; + }; diff --git a/packages/services/schema/src/composition/shared.ts b/packages/services/schema/src/composition/shared.ts new file mode 100644 index 000000000..87e36fe4c --- /dev/null +++ b/packages/services/schema/src/composition/shared.ts @@ -0,0 +1,64 @@ +import type { CompositionErrorSource } from '../lib/errors'; +import type { Metadata } from '../types'; + +export type CompositionErrorType = { + message: string; + source: CompositionErrorSource; +}; + +type ContractResult = { + id: string; + result: T; +}; + +export type ContractResultSuccess = ContractResult<{ + type: 'success'; + result: { + supergraph: string; + sdl: string; + }; +}>; + +export type ContractResultFailure = ContractResult<{ + type: 'failure'; + result: { + supergraph: string | null; + sdl: string | null; + errors: Array; + includesNetworkError: boolean; + includesException: boolean; + }; +}>; + +export type ContractResultType = ContractResultSuccess | ContractResultFailure; + +type SharedResult = { + contracts: Array | null; + schemaMetadata: Record | null; + metadataAttributes: Record | null; + tags: Array | null; +}; + +export type CompositionResultSuccess = { + type: 'success'; + result: { + supergraph: string; + sdl: string; + errors?: never; + includesNetworkError?: never; + includesException?: never; + } & SharedResult; +}; + +export type CompositionResultFailure = { + type: 'failure'; + result: { + supergraph: string | null; + sdl: string | null; + errors: Array; + includesNetworkError: boolean; + includesException: boolean; + } & SharedResult; +}; + +export type CompositionResult = CompositionResultSuccess | CompositionResultFailure; diff --git a/packages/services/schema/src/composition/single.ts b/packages/services/schema/src/composition/single.ts new file mode 100644 index 000000000..d58df17b0 --- /dev/null +++ b/packages/services/schema/src/composition/single.ts @@ -0,0 +1,68 @@ +import { + buildASTSchema, + DocumentNode, + GraphQLError, + isTypeSystemExtensionNode, + parse, + print, + validateSchema, +} from 'graphql'; +import { validateSDL } from 'graphql/validation/validate.js'; +import { mergeTypeDefs } from '@graphql-tools/merge'; +import { errorWithSource, type CompositionErrorSource } from '../lib/errors'; +import { trimDescriptions } from '../lib/trim-descriptions'; +import type { ComposeAndValidateInput } from '../types'; +import type { CompositionErrorType } from './shared'; + +export type ComposeSingleArgs = { + schemas: ComposeAndValidateInput; +}; + +export async function composeSingle(args: ComposeSingleArgs) { + const schema = args.schemas[0]; + let schemaAst = parse(schema.raw); + + // If the schema contains type system extension nodes, merge them into the schema. + // We don't want to show many type extension of User, we want to show single User type. + if (schemaAst.definitions.some(isTypeSystemExtensionNode)) { + schemaAst = mergeTypeDefs(schemaAst); + } + const errors: Array = validateSingleSDL(schemaAst); + + return { + errors, + sdl: print(trimDescriptions(schemaAst)), + supergraph: null, + contracts: null, + tags: null, + schemaMetadata: null, + metadataAttributes: null, + }; +} + +function validateSingleSDL(document: DocumentNode): Array<{ + message: string; + source: CompositionErrorSource; +}> { + const errors = validateSDL(document); + + if (errors.length) { + return errors.map(errorWithSource('graphql')); + } + + try { + const schema = buildASTSchema(document); + const errors = validateSchema(schema); + + if (errors.length) { + return errors.map(errorWithSource('graphql')); + } + } catch (err: unknown) { + if (err instanceof GraphQLError) { + return [errorWithSource('graphql')(err)]; + } + throw err; + } + + return []; +} diff --git a/packages/services/schema/src/composition/stitching.ts b/packages/services/schema/src/composition/stitching.ts new file mode 100644 index 000000000..1b002bdc5 --- /dev/null +++ b/packages/services/schema/src/composition/stitching.ts @@ -0,0 +1,92 @@ +import { buildASTSchema, concatAST, DocumentNode, Kind, parse, printSchema } from 'graphql'; +import { validateSDL } from 'graphql/validation/validate.js'; +import { stitchSchemas } from '@graphql-tools/stitch'; +import { stitchingDirectives } from '@graphql-tools/stitching-directives'; +import { errorWithSource, toValidationError } from '../lib/errors'; +import { trimDescriptions } from '../lib/trim-descriptions'; +import type { ComposeAndValidateInput } from '../types'; +import type { CompositionErrorType } from './shared'; + +const { allStitchingDirectivesTypeDefs, stitchingDirectivesValidator } = stitchingDirectives(); +const parsedStitchingDirectives = parse(allStitchingDirectivesTypeDefs); +const stitchingDirectivesNames = extractDirectiveNames(parsedStitchingDirectives); + +function extractDirectiveNames(doc: DocumentNode) { + const directives: string[] = []; + + for (const definition of doc.definitions) { + if (definition.kind === Kind.DIRECTIVE_DEFINITION) { + directives.push(definition.name.value); + } + } + + return directives; +} + +function definesStitchingDirective(doc: DocumentNode) { + return extractDirectiveNames(doc).some(name => stitchingDirectivesNames.includes(name)); +} + +function validateStitchedSchema(doc: DocumentNode) { + const definesItsOwnStitchingDirectives = definesStitchingDirective(doc); + const fullDoc = definesItsOwnStitchingDirectives + ? doc + : concatAST([parsedStitchingDirectives, doc]); + const errors = validateSDL(fullDoc).map(errorWithSource('graphql')); + + // If the schema defines its own stitching directives, + // it means we can't be sure that it follows the official spec. + if (definesItsOwnStitchingDirectives) { + return errors; + } + + try { + stitchingDirectivesValidator( + buildASTSchema(fullDoc, { + assumeValid: true, + assumeValidSDL: true, + }), + ); + } catch (error) { + errors.push(toValidationError(error, 'composition')); + } + + return errors; +} + +export type ComposeStitchingArgs = { + schemas: ComposeAndValidateInput; +}; + +export async function composeStitching(args: ComposeStitchingArgs) { + const parsed = args.schemas.map(s => parse(s.raw)); + const errors: Array = parsed + .map(schema => validateStitchedSchema(schema)) + .flat(); + + let sdl: string | null = null; + try { + sdl = printSchema( + stitchSchemas({ + subschemas: args.schemas.map(schema => + buildASTSchema(trimDescriptions(parse(schema.raw)), { + assumeValid: true, + assumeValidSDL: true, + }), + ), + }), + ); + } catch (error) { + errors.push(toValidationError(error, 'composition')); + } + + return { + errors, + sdl, + supergraph: null, + contracts: null, + tags: null, + schemaMetadata: null, + metadataAttributes: null, + }; +} diff --git a/packages/services/schema/src/environment.ts b/packages/services/schema/src/environment.ts index 7017624f7..87f786630 100644 --- a/packages/services/schema/src/environment.ts +++ b/packages/services/schema/src/environment.ts @@ -25,6 +25,8 @@ const EnvironmentModel = zod.object({ ENVIRONMENT: emptyString(zod.string().optional()), RELEASE: emptyString(zod.string().optional()), ENCRYPTION_SECRET: zod.string(), + COMPOSITION_WORKER_COUNT: zod.number().min(1).default(4), + COMPOSITION_WORKER_MAX_OLD_GENERATION_SIZE_MB: NumberFromString(1).optional().default(512), }); const RequestBrokerModel = zod.union([ @@ -182,4 +184,8 @@ export const env = { signature: requestBroker.REQUEST_BROKER_SIGNATURE, } : null, + compositionWorker: { + count: base.COMPOSITION_WORKER_COUNT, + maxOldGenerationSizeMb: base.COMPOSITION_WORKER_MAX_OLD_GENERATION_SIZE_MB, + }, } as const; diff --git a/packages/services/schema/src/index.ts b/packages/services/schema/src/index.ts index 4a970acf1..175b0c73b 100644 --- a/packages/services/schema/src/index.ts +++ b/packages/services/schema/src/index.ts @@ -1,5 +1,5 @@ #!/usr/bin/env node -import crypto from 'node:crypto'; +import 'reflect-metadata'; import { hostname } from 'os'; import Redis from 'ioredis'; import { @@ -15,26 +15,9 @@ import { import * as Sentry from '@sentry/node'; import { Context, schemaBuilderApiRouter } from './api'; import { createCache } from './cache'; +import { CompositionScheduler } from './composition-scheduler'; import { env } from './environment'; -const ENCRYPTION_SECRET = crypto.createHash('md5').update(env.encryptionSecret).digest('hex'); - -function decryptFactory() { - const ALG = 'aes256'; - const IN_ENC = 'utf8'; - const OUT_ENC = 'hex'; - - const secretBuffer = Buffer.from(ENCRYPTION_SECRET, 'latin1'); - - return function decrypt(text: string) { - const components = text.split(':'); - const iv = Buffer.from(components.shift() || '', OUT_ENC); - const decipher = crypto.createDecipheriv(ALG, secretBuffer, iv); - - return decipher.update(components.join(':'), OUT_ENC, IN_ENC) + decipher.final(IN_ENC); - }; -} - async function main() { let tracing: TracingInstance | undefined; @@ -70,6 +53,12 @@ async function main() { bodyLimit: env.http.bodyLimit, }); + const compositionScheduler = new CompositionScheduler( + server.log, + env.compositionWorker.count, + env.compositionWorker.maxOldGenerationSizeMb, + ); + if (tracing) { await server.register(...tracing.instrumentFastify()); } @@ -122,8 +111,6 @@ async function main() { server.log.info('Redis reconnecting in %s', timeToReconnect); }); - const decrypt = decryptFactory(); - await registerTRPC(server, { router: schemaBuilderApiRouter, createContext({ req }): Context { @@ -138,7 +125,7 @@ async function main() { failure: env.timings.cacheTTL, }, }); - return { cache, req, decrypt, broker: env.requestBroker }; + return { cache, req, broker: env.requestBroker, compositionScheduler }; }, }); diff --git a/packages/services/schema/src/lib/compose.ts b/packages/services/schema/src/lib/compose.ts index ec15bf683..6c92d90bf 100644 --- a/packages/services/schema/src/lib/compose.ts +++ b/packages/services/schema/src/lib/compose.ts @@ -12,7 +12,6 @@ import { compositionHasErrors as nativeCompositionHasErrors, transformSupergraphToPublicSchema, } from '@theguild/federation-composition'; -import type { Cache } from '../cache'; import type { ExternalComposition } from '../types'; import { toValidationError } from './errors'; @@ -146,7 +145,7 @@ export async function composeExternalFederation(args: { subgraphs: Array; decrypt: (value: string) => string; external: Exclude; - cache: Cache; + requestTimeoutMs: number; requestId: string; }): Promise { args.logger.debug( @@ -189,10 +188,10 @@ export async function composeExternalFederation(args: { ...request, }, args.logger, - args.cache.timeoutMs, + args.requestTimeoutMs, args.requestId, ) - : callExternalService(request, args.logger, args.cache.timeoutMs)); + : callExternalService(request, args.logger, args.requestTimeoutMs)); args.logger.debug('Got response from external composition service, trying to safe parse'); const parseResult = EXTERNAL_COMPOSITION_RESULT.safeParse(externalResponse); diff --git a/packages/services/schema/src/lib/trim-descriptions.ts b/packages/services/schema/src/lib/trim-descriptions.ts new file mode 100644 index 000000000..068c678b1 --- /dev/null +++ b/packages/services/schema/src/lib/trim-descriptions.ts @@ -0,0 +1,31 @@ +import { visit, type ASTNode, type DocumentNode } from 'graphql'; + +export function trimDescriptions(doc: DocumentNode): DocumentNode { + function trim(node: T): T { + if (node && 'description' in node && node.description) { + (node.description as any).value = node.description.value.trim(); + } + + return node; + } + + return visit(doc, { + SchemaDefinition: trim, + ObjectTypeDefinition: trim, + ObjectTypeExtension: trim, + InterfaceTypeExtension: trim, + UnionTypeExtension: trim, + InputObjectTypeExtension: trim, + EnumTypeExtension: trim, + SchemaExtension: trim, + ScalarTypeExtension: trim, + FieldDefinition: trim, + InputValueDefinition: trim, + InterfaceTypeDefinition: trim, + UnionTypeDefinition: trim, + EnumTypeDefinition: trim, + EnumValueDefinition: trim, + InputObjectTypeDefinition: trim, + DirectiveDefinition: trim, + }); +} diff --git a/packages/services/schema/src/orchestrators.ts b/packages/services/schema/src/orchestrators.ts deleted file mode 100644 index 177db2884..000000000 --- a/packages/services/schema/src/orchestrators.ts +++ /dev/null @@ -1,604 +0,0 @@ -import type { FastifyRequest } from 'fastify'; -import type { ConstDirectiveNode, DocumentNode, FieldDefinitionNode, NameNode } from 'graphql'; -import { - ASTNode, - buildASTSchema, - concatAST, - GraphQLError, - isTypeSystemExtensionNode, - Kind, - parse, - print, - printSchema, - validateSchema, - visit, -} from 'graphql'; -import { validateSDL } from 'graphql/validation/validate.js'; -import { mergeTypeDefs } from '@graphql-tools/merge'; -import { stitchSchemas } from '@graphql-tools/stitch'; -import { stitchingDirectives } from '@graphql-tools/stitching-directives'; -import type { ServiceLogger } from '@hive/service-common'; -import { extractLinkImplementations } from '@theguild/federation-composition'; -import type { ContractsInputType } from './api'; -import type { Cache } from './cache'; -import { addInaccessibleToUnreachableTypes } from './lib/add-inaccessible-to-unreachable-types'; -import { - composeExternalFederation, - composeFederationV1, - composeFederationV2, - ComposerMethodResult, - SubgraphInput, -} from './lib/compose'; -import { CompositionErrorSource, errorWithSource, toValidationError } from './lib/errors'; -import { - applyTagFilterOnSubgraphs, - createTagDirectiveNameExtractionStrategy, - extractTagsFromDocument, -} from './lib/federation-tag-extraction'; -import { extractMetadata, mergeMetadata } from './lib/metadata-extraction'; -import { SetMap } from './lib/setmap'; -import type { - ComposeAndValidateInput, - ComposeAndValidateOutput, - ExternalComposition, - Metadata, - SchemaType, -} from './types'; - -const { allStitchingDirectivesTypeDefs, stitchingDirectivesValidator } = stitchingDirectives(); -const parsedStitchingDirectives = parse(allStitchingDirectivesTypeDefs); -const stitchingDirectivesNames = extractDirectiveNames(parsedStitchingDirectives); - -function extractDirectiveNames(doc: DocumentNode) { - const directives: string[] = []; - - for (const definition of doc.definitions) { - if (definition.kind === Kind.DIRECTIVE_DEFINITION) { - directives.push(definition.name.value); - } - } - - return directives; -} - -function definesStitchingDirective(doc: DocumentNode) { - return extractDirectiveNames(doc).some(name => stitchingDirectivesNames.includes(name)); -} - -type CompositionErrorType = { - message: string; - source: 'composition' | 'graphql'; -}; - -type ContractResultType = { - id: string; - result: - | { - type: 'success'; - result: { - supergraph: string; - sdl: string; - }; - } - | { - type: 'failure'; - result: { - supergraph?: string; - sdl?: string; - errors: Array; - }; - }; -}; - -type CompositionResultSuccess = { - type: 'success'; - result: { - supergraph: string; - sdl: string; - contracts?: Array; - }; -}; - -type CompositionResultFailure = { - type: 'failure'; - result: { - supergraph?: string; - sdl?: string; - errors: Array; - contracts?: Array; - }; -}; - -type CompositionResult = CompositionResultSuccess | CompositionResultFailure; - -function trimDescriptions(doc: DocumentNode): DocumentNode { - function trim(node: T): T { - if (node && 'description' in node && node.description) { - (node.description as any).value = node.description.value.trim(); - } - - return node; - } - - return visit(doc, { - SchemaDefinition: trim, - ObjectTypeDefinition: trim, - ObjectTypeExtension: trim, - InterfaceTypeExtension: trim, - UnionTypeExtension: trim, - InputObjectTypeExtension: trim, - EnumTypeExtension: trim, - SchemaExtension: trim, - ScalarTypeExtension: trim, - FieldDefinition: trim, - InputValueDefinition: trim, - InterfaceTypeDefinition: trim, - UnionTypeDefinition: trim, - EnumTypeDefinition: trim, - EnumValueDefinition: trim, - InputObjectTypeDefinition: trim, - DirectiveDefinition: trim, - }); -} - -interface Orchestrator { - composeAndValidate( - input: ComposeAndValidateInput, - external: ExternalComposition, - native: boolean, - contracts?: ContractsInputType, - ): Promise; -} - -const createFederation: ( - cache: Cache, - logger: ServiceLogger, - requestId: string, - decrypt: (value: string) => string, -) => Orchestrator = (cache, logger, requestId, decrypt) => { - const compose = cache.reuse< - { - schemas: ComposeAndValidateInput; - external: ExternalComposition; - native: boolean; - contracts: ContractsInputType | undefined; - }, - CompositionResult & { - includesNetworkError: boolean; - includesException?: boolean; - tags: Array | null; - schemaMetadata: Record | null; - metadataAttributes: Record | null; - } - >( - 'federation', - async ({ schemas, external, native, contracts }) => { - const subgraphs = schemas - .map(schema => { - logger.debug(`Parsing subgraph schema SDL (name=%s)`, schema.source); - return { - typeDefs: trimDescriptions(parse(schema.raw)), - name: schema.source, - url: 'url' in schema && typeof schema.url === 'string' ? schema.url : undefined, - }; - }) - .map(subgraph => { - logger.debug(`Extracting link implementations from subgraph (name=%s)`, subgraph.name); - const { matchesImplementation, resolveImportName } = extractLinkImplementations( - subgraph.typeDefs, - ); - if (matchesImplementation('https://specs.graphql-hive.com/hive', 'v1.0')) { - logger.debug( - `Found hive link in subgraph. Applying link logic. (name=%s)`, - subgraph.name, - ); - // if this subgraph implements the metadata spec - // then copy metadata from the schema to all fields. - // @note this is similar to how federation's compose copies join__ directives to fields based on the - // subgraph that the field is a part of. - const metaDirectiveName = resolveImportName( - 'https://specs.graphql-hive.com/hive', - '@meta', - ); - const applyMetaToField = ( - fieldNode: FieldDefinitionNode, - metaDirectives: ConstDirectiveNode[], - ) => { - return { - ...fieldNode, - directives: [ - ...(fieldNode.directives ?? []), - ...metaDirectives.map(d => ({ ...d, loc: undefined })), - ], - }; - }; - - const schemaNodes = subgraph.typeDefs.definitions.filter( - d => d.kind === Kind.SCHEMA_DEFINITION || d.kind === Kind.SCHEMA_EXTENSION, - ); - const schemaMetaDirectives = schemaNodes - .flatMap(node => node.directives?.filter(d => d.name.value === metaDirectiveName)) - .filter(d => d !== undefined); - const interfaceAndObjectHandler = (node: { - readonly fields?: ReadonlyArray | undefined; - readonly directives?: ReadonlyArray | undefined; - readonly name: NameNode; - }) => { - // apply type/interface metadata to fields - const objectMetaDirectives = node.directives - ?.filter(d => d.name.value === metaDirectiveName) - .filter(d => d !== undefined); - if (objectMetaDirectives?.length) { - return { - ...node, - fields: node.fields?.map(f => applyMetaToField(f, objectMetaDirectives)), - }; - } - return node; - }; - subgraph.typeDefs = visit(subgraph.typeDefs, { - FieldDefinition: field => { - return applyMetaToField(field, schemaMetaDirectives); - }, - ObjectTypeDefinition: interfaceAndObjectHandler, - InterfaceTypeDefinition: interfaceAndObjectHandler, - }); - } - return subgraph; - }); - - /** Determine the correct compose method... */ - let compose: (subgraphs: Array) => Promise; - - // Federation v2 - if (native) { - logger.debug( - 'Using built-in Federation v2 composition service (schemas=%s)', - schemas.length, - ); - compose = subgraphs => Promise.resolve(composeFederationV2(subgraphs, logger)); - } else if (external) { - compose = subgraphs => - composeExternalFederation({ - cache, - decrypt, - external, - logger, - requestId, - subgraphs, - }); - } else { - logger.debug( - 'Using built-in Federation v1 composition service (schemas=%s)', - schemas.length, - ); - compose = subgraphs => Promise.resolve(composeFederationV1(subgraphs)); - } - let result: CompositionResult & { - includesNetworkError: boolean; - tags: Array | null; - /** Metadata stored by coordinate and then by subgraph */ - schemaMetadata: Record | null; - metadataAttributes: Record | null; - }; - - { - const composed: CompositionResult & { - includesNetworkError: boolean; - includesException?: boolean; - } = await compose(subgraphs); - - if (composed.type === 'success') { - // merge all metadata from every subgraph by coordinate - const subgraphsMetadata = subgraphs.map(({ name, typeDefs }) => - extractMetadata(typeDefs, name), - ); - const supergraphSDL = parse(composed.result.supergraph); - const { resolveImportName } = extractLinkImplementations(supergraphSDL); - const tagDirectiveName = resolveImportName('https://specs.apollo.dev/tag', '@tag'); - const tagStrategy = createTagDirectiveNameExtractionStrategy(tagDirectiveName); - const tags = extractTagsFromDocument(supergraphSDL, tagStrategy); - const schemaMetadata = mergeMetadata(...subgraphsMetadata); - const metadataAttributes = new SetMap(); - for (const [_coord, attrs] of schemaMetadata) { - for (const attr of attrs) { - metadataAttributes.add(attr.name, attr.content); - } - } - result = { - ...composed, - tags, - schemaMetadata: Object.fromEntries(schemaMetadata), - metadataAttributes: metadataAttributes.toObject(), - }; - } else { - result = { - ...composed, - tags: null, - schemaMetadata: null, - metadataAttributes: null, - }; - } - } - - if (!contracts?.length) { - // if no contracts, then compose and return the result - return result; - } - - if (result.type == 'failure') { - return { - ...result, - result: { - ...result.result, - contracts: contracts.map(contract => ({ - id: contract.id, - result: { - type: 'failure', - result: { - errors: [ - { - message: 'Skipped contract composition, as default graph composition failed.', - source: 'composition', - }, - ], - }, - }, - })), - }, - }; - } - - // if there are contracts, then create - const contractResults = await Promise.all( - contracts.map(async contract => { - // apply contracts to replace tags with inaccessible directives - const filteredSubgraphs = applyTagFilterOnSubgraphs(subgraphs, { - include: new Set(contract.filter.include), - exclude: new Set(contract.filter.exclude), - }); - - // attempt to compose the contract filtered subgraph - const compositionResult = await compose(filteredSubgraphs); - - // Remove unreachable types from public API schema - if ( - contract.filter.removeUnreachableTypesFromPublicApiSchema === true && - compositionResult.type === 'success' - ) { - let supergraphSDL = parse(compositionResult.result.supergraph); - const { resolveImportName } = extractLinkImplementations(supergraphSDL); - return { - id: contract.id, - result: addInaccessibleToUnreachableTypes( - resolveImportName, - compositionResult, - supergraphSDL, - ), - }; - } - return { - id: contract.id, - result: compositionResult, - }; - }), - ); - - const networkErrorContract = contractResults.find( - contract => contract.result.includesNetworkError === true, - ); - - // In case any of the contract composition fails, we will fail the whole composition. - if (networkErrorContract) { - return { - ...networkErrorContract.result, - schemaMetadata: null, - tags: null, - metadataAttributes: null, - }; - } - - return { - ...result, - result: { - supergraph: result.result.supergraph, - sdl: result.result.sdl, - contracts: contractResults, - }, - }; - }, - function pickCacheType(result) { - return ('includesNetworkError' in result && result.includesNetworkError === true) || - ('includesException' in result && result.includesException === true) - ? 'short' - : 'long'; - }, - ); - - return { - async composeAndValidate(schemas, external, native, contracts) { - try { - const composed = await compose({ schemas, external, native, contracts }); - return { - errors: composed.type === 'failure' ? composed.result.errors : [], - sdl: composed.result.sdl ?? null, - supergraph: composed.result.supergraph ?? null, - includesNetworkError: - composed.type === 'failure' && composed.includesNetworkError === true, - contracts: - composed.result.contracts?.map(contract => ({ - id: contract.id, - errors: 'errors' in contract.result.result ? contract.result.result.errors : [], - sdl: contract.result.result.sdl ?? null, - supergraph: contract.result.result.supergraph ?? null, - })) ?? null, - tags: composed.tags ?? null, - schemaMetadata: composed.schemaMetadata ?? null, - metadataAttributes: composed.metadataAttributes ?? null, - }; - } catch (error) { - if (cache.isTimeoutError(error)) { - return { - errors: [ - { - message: error.message, - source: 'graphql', - }, - ], - sdl: null, - supergraph: null, - includesNetworkError: true, - contracts: null, - tags: null, - schemaMetadata: null, - metadataAttributes: null, - }; - } - - throw error; - } - }, - }; -}; - -function validateSingleSDL(document: DocumentNode): Array<{ - message: string; - source: CompositionErrorSource; -}> { - const errors = validateSDL(document); - - if (errors.length) { - return errors.map(errorWithSource('graphql')); - } - - try { - const schema = buildASTSchema(document); - const errors = validateSchema(schema); - - if (errors.length) { - return errors.map(errorWithSource('graphql')); - } - } catch (err: unknown) { - if (err instanceof GraphQLError) { - return [errorWithSource('graphql')(err)]; - } - throw err; - } - - return []; -} - -function createSingle(): Orchestrator { - return { - async composeAndValidate(schemas) { - const schema = schemas[0]; - let schemaAst = parse(schema.raw); - - // If the schema contains type system extension nodes, merge them into the schema. - // We don't want to show many type extension of User, we want to show single User type. - if (schemaAst.definitions.some(isTypeSystemExtensionNode)) { - schemaAst = mergeTypeDefs(schemaAst); - } - const errors = validateSingleSDL(schemaAst); - - return { - errors, - sdl: print(trimDescriptions(schemaAst)), - supergraph: null, - contracts: null, - tags: null, - schemaMetadata: null, - metadataAttributes: null, - }; - }, - }; -} - -const createStitching: (cache: Cache) => Orchestrator = cache => { - const stitchAndPrint = cache.reuse('stitching', async (schemas: string[]) => { - return printSchema( - stitchSchemas({ - subschemas: schemas.map(schema => - buildASTSchema(trimDescriptions(parse(schema)), { - assumeValid: true, - assumeValidSDL: true, - }), - ), - }), - ); - }); - - return { - async composeAndValidate(schemas) { - const parsed = schemas.map(s => parse(s.raw)); - const errors = parsed.map(schema => validateStitchedSchema(schema)).flat(); - - let sdl: string | null = null; - try { - sdl = await stitchAndPrint(schemas.map(s => s.raw)); - } catch (error) { - errors.push(toValidationError(error, 'composition')); - } - - return { - errors, - sdl, - supergraph: null, - contracts: null, - tags: null, - schemaMetadata: null, - metadataAttributes: null, - }; - }, - }; -}; - -function validateStitchedSchema(doc: DocumentNode) { - const definesItsOwnStitchingDirectives = definesStitchingDirective(doc); - const fullDoc = definesItsOwnStitchingDirectives - ? doc - : concatAST([parsedStitchingDirectives, doc]); - const errors = validateSDL(fullDoc).map(errorWithSource('graphql')); - - // If the schema defines its own stitching directives, - // it means we can't be sure that it follows the official spec. - if (definesItsOwnStitchingDirectives) { - return errors; - } - - try { - stitchingDirectivesValidator( - buildASTSchema(fullDoc, { - assumeValid: true, - assumeValidSDL: true, - }), - ); - } catch (error) { - errors.push(toValidationError(error, 'composition')); - } - - return errors; -} - -export function pickOrchestrator( - type: SchemaType, - cache: Cache, - req: FastifyRequest, - decrypt: (value: string) => string, -) { - switch (type) { - case 'federation': - return createFederation( - cache, - req.log, - req.id ?? Math.random().toString(16).substring(2), - decrypt, - ); - case 'single': - return createSingle(); - case 'stitching': - return createStitching(cache); - default: - throw new Error(`Unknown schema type: ${type}`); - } -} diff --git a/packages/services/server/src/persisted-documents-worker.ts b/packages/services/server/src/persisted-documents-worker.ts index 9356ee75c..1241eb5fc 100644 --- a/packages/services/server/src/persisted-documents-worker.ts +++ b/packages/services/server/src/persisted-documents-worker.ts @@ -1,41 +1,14 @@ import 'reflect-metadata'; import { parentPort } from 'node:worker_threads'; -import { Logger } from '@hive/api'; import { createWorker } from '../../api/src/modules/app-deployments/worker/persisted-documents-worker'; +import { createMessagePortLogger } from '../../api/src/modules/shared/providers/logger'; import { env } from './environment'; if (!parentPort) { throw new Error('This script must be run as a worker.'); } -const pp = parentPort; -function createLogger(bindings: Record = {}): Logger { - return { - child(newBindings) { - return createLogger({ ...bindings, ...newBindings }); - }, - debug(...args) { - pp.postMessage({ event: 'log', level: 'debug', args, bindings }); - }, - error(...args) { - pp.postMessage({ event: 'log', level: 'error', args, bindings }); - }, - fatal(...args) { - pp.postMessage({ event: 'log', level: 'fatal', args, bindings }); - }, - info(...args) { - pp.postMessage({ event: 'log', level: 'info', args, bindings }); - }, - trace(...args) { - pp.postMessage({ event: 'log', level: 'trace', args, bindings }); - }, - warn(...args) { - pp.postMessage({ event: 'log', level: 'warn', args, bindings }); - }, - }; -} - -createWorker(parentPort, createLogger(), { +createWorker(parentPort, createMessagePortLogger(parentPort), { clickhouse: env.clickhouse, s3: env.s3, s3Mirror: env.s3Mirror, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0198d9c2b..1bc809a5d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1171,6 +1171,9 @@ importers: fastify: specifier: 4.29.0 version: 4.29.0 + fastq: + specifier: 1.19.1 + version: 1.19.1 got: specifier: 14.4.7 version: 14.4.7(patch_hash=f7660444905ddadee251ff98241119fb54f5fec1e673a428192da361d5636299) @@ -1189,6 +1192,9 @@ importers: pino-pretty: specifier: 11.3.0 version: 11.3.0 + reflect-metadata: + specifier: 0.2.2 + version: 0.2.2 zod: specifier: 3.24.1 version: 3.24.1 @@ -10235,8 +10241,8 @@ packages: fastify@4.29.0: resolution: {integrity: sha512-MaaUHUGcCgC8fXQDsDtioaCcag1fmPJ9j64vAKunqZF4aSub040ZGi/ag8NGE2714yREPOKZuHCfpPzuUD3UQQ==} - fastq@1.17.1: - resolution: {integrity: sha512-sRVD3lWVIXWg6By68ZN7vho9a1pQcN/WBFaAAsDDFzlJjvoGx0P8z7V1t72grFJfJhu3YPZBuu25f7Kaw2jN1w==} + fastq@1.19.1: + resolution: {integrity: sha512-GwLTyxkCXjXbxqIhTsMI2Nui8huMPtnxg7krajPJAjnEG/iiOS7i+zCtWGZR9G0NBKbXKh6X9m9UIsYX/N6vvQ==} fault@2.0.1: resolution: {integrity: sha512-WtySTkS4OKev5JtpHXnib4Gxiurzh5NCGvWrFaZ34m6JehfTUhKZvn9njTfw48t6JumVQOmrKqpmGcdwxnhqBQ==} @@ -18389,7 +18395,7 @@ snapshots: '@fastify/send': 2.1.0 content-disposition: 0.5.4 fastify-plugin: 4.5.1 - fastq: 1.17.1 + fastq: 1.19.1 glob: 10.3.12 '@fastify/vite@6.0.7(patch_hash=f5ce073a4db250ed3db1d9c19e2a253418454ee379530bdee25869570d7b500b)(@types/node@22.10.5)(less@4.2.0)(lightningcss@1.28.2)(terser@5.37.0)': @@ -20259,7 +20265,7 @@ snapshots: '@nodelib/fs.walk@1.2.8': dependencies: '@nodelib/fs.scandir': 2.1.5 - fastq: 1.17.1 + fastq: 1.19.1 '@nodesecure/estree-ast-utils@1.5.0': dependencies: @@ -25181,7 +25187,7 @@ snapshots: avvio@8.4.0: dependencies: '@fastify/error': 3.4.1 - fastq: 1.17.1 + fastq: 1.19.1 aws-sign2@0.7.0: {} @@ -27365,7 +27371,7 @@ snapshots: semver: 7.6.3 toad-cache: 3.7.0 - fastq@1.17.1: + fastq@1.19.1: dependencies: reusify: 1.0.4