feat(schema): run composition in worker threads (#6725)

This commit is contained in:
Laurin Quast 2025-04-14 13:48:42 +02:00 committed by GitHub
parent 114f10177c
commit d0e0b65fc6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 1360 additions and 729 deletions

View file

@ -58,7 +58,7 @@ export const watchEntryPlugin = () => {
name: 'node-watch-entry',
esbuildOptions(options) {
const entries = (options.entryPoints as string[]) || [];
const entry = entries[0];
const entry = entries.find(entry => entry === 'src/dev.ts' || entry === 'test/root.ts');
if (!entry) {
throw new Error('No entry point found');

View file

@ -50,7 +50,7 @@ export function deploySchema({
startupProbe: '/_health',
exposesMetrics: true,
replicas: environment.isProduction ? 3 : 1,
memoryLimit: '1Gi',
memoryLimit: '2Gi',
pdb: true,
},
[redis.deployment, redis.service],

View file

@ -2,8 +2,7 @@ import path from 'node:path';
import { Worker } from 'node:worker_threads';
import { fileURLToPath } from 'url';
import { Injectable, Scope } from 'graphql-modules';
import { LogLevel } from 'graphql-yoga';
import { Logger } from '../../shared/providers/logger';
import { Logger, registerWorkerLogging } from '../../shared/providers/logger';
import { BatchProcessedEvent, BatchProcessEvent } from './persisted-document-ingester';
const __dirname = path.dirname(fileURLToPath(import.meta.url));
@ -48,32 +47,19 @@ export class PersistedDocumentScheduler {
}
this.logger.debug('Re-Creating worker %s', index);
this.workers[index] = this.createWorker(index);
this.logger.debug('Cancel pending tasks %s', index);
for (const [, task] of tasks) {
task.reject(new Error('Worker stopped.'));
}
this.workers[index] = this.createWorker(index);
});
registerWorkerLogging(this.logger, worker, name);
worker.on(
'message',
(
data:
| BatchProcessedEvent
| { event: 'error'; id: string; err: Error }
| {
event: 'log';
bindings: Record<string, unknown>;
level: LogLevel;
args: [string, ...unknown[]];
},
) => {
if (data.event === 'log') {
this.logger.child(data.bindings)[data.level](...data.args);
return;
}
(data: BatchProcessedEvent | { event: 'error'; id: string; err: Error }) => {
if (data.event === 'error') {
tasks.get(data.id)?.reject(data.err);
}

View file

@ -1,4 +1,5 @@
import { CONTEXT, Inject, Injectable, Scope } from 'graphql-modules';
import { abortSignalAny } from '@graphql-hive/signal';
import type { SchemaBuilderApi } from '@hive/schema';
import { traceFn } from '@hive/service-common';
import { createTRPCProxyClient, httpLink } from '@trpc/client';
@ -14,6 +15,7 @@ export class SingleOrchestrator implements Orchestrator {
type = ProjectType.SINGLE;
private logger: Logger;
private schemaService;
private incomingRequestAbortSignal: AbortSignal;
constructor(
logger: Logger,
@ -32,6 +34,7 @@ export class SingleOrchestrator implements Orchestrator {
}),
],
});
this.incomingRequestAbortSignal = context.request.signal;
}
@traceFn('SingleOrchestrator.composeAndValidate', {
@ -54,14 +57,42 @@ export class SingleOrchestrator implements Orchestrator {
throw new Error('too many schemas');
}
const result = await this.schemaService.composeAndValidate.mutate({
type: 'single',
schemas: schemas.map(s => ({
raw: s.raw,
source: s.source,
})),
});
const timeoutAbortSignal = AbortSignal.timeout(30_000);
return result;
const onTimeout = () => {
this.logger.debug('Composition HTTP request aborted due to timeout of 30 seconds.');
};
timeoutAbortSignal.addEventListener('abort', onTimeout);
const onIncomingRequestAbort = () => {
this.logger.debug('Composition HTTP request aborted due to incoming request being canceled.');
};
this.incomingRequestAbortSignal.addEventListener('abort', onIncomingRequestAbort);
try {
const result = await this.schemaService.composeAndValidate.mutate(
{
type: 'single',
schemas: schemas.map(s => ({
raw: s.raw,
source: s.source,
})),
},
{
// We want to abort composition if the request that does the composition is aborted
      // We also limit the maximum time allowed for composition requests to 30 seconds
      // to avoid requests hanging indefinitely.
// The reason for these is a potential dead-lock.
//
// Note: We are using `abortSignalAny` over `AbortSignal.any` because of leak issues.
// @source https://github.com/nodejs/node/issues/57584
signal: abortSignalAny([this.incomingRequestAbortSignal, timeoutAbortSignal]),
},
);
return result;
} finally {
timeoutAbortSignal.removeEventListener('abort', onTimeout);
this.incomingRequestAbortSignal.removeEventListener('abort', onIncomingRequestAbort);
}
}
}

View file

@ -1,4 +1,5 @@
import { CONTEXT, Inject, Injectable, Scope } from 'graphql-modules';
import { abortSignalAny } from '@graphql-hive/signal';
import type { SchemaBuilderApi } from '@hive/schema';
import { traceFn } from '@hive/service-common';
import { createTRPCProxyClient, httpLink } from '@trpc/client';
@ -14,6 +15,7 @@ export class StitchingOrchestrator implements Orchestrator {
type = ProjectType.STITCHING;
private logger: Logger;
private schemaService;
private incomingRequestAbortSignal: AbortSignal;
constructor(
logger: Logger,
@ -32,6 +34,7 @@ export class StitchingOrchestrator implements Orchestrator {
}),
],
});
this.incomingRequestAbortSignal = context.request.signal;
}
@traceFn('StitchingOrchestrator.composeAndValidate', {
@ -47,15 +50,43 @@ export class StitchingOrchestrator implements Orchestrator {
async composeAndValidate(schemas: SchemaObject[]) {
this.logger.debug('Composing and Validating Stitched Schemas');
const result = await this.schemaService.composeAndValidate.mutate({
type: 'stitching',
schemas: schemas.map(s => ({
raw: s.raw,
source: s.source,
url: s.url ?? null,
})),
});
const timeoutAbortSignal = AbortSignal.timeout(30_000);
return result;
const onTimeout = () => {
this.logger.debug('Composition HTTP request aborted due to timeout of 30 seconds.');
};
timeoutAbortSignal.addEventListener('abort', onTimeout);
const onIncomingRequestAbort = () => {
this.logger.debug('Composition HTTP request aborted due to incoming request being canceled.');
};
this.incomingRequestAbortSignal.addEventListener('abort', onIncomingRequestAbort);
try {
const result = await this.schemaService.composeAndValidate.mutate(
{
type: 'stitching',
schemas: schemas.map(s => ({
raw: s.raw,
source: s.source,
url: s.url ?? null,
})),
},
{
// We want to abort composition if the request that does the composition is aborted
      // We also limit the maximum time allowed for composition requests to 30 seconds
      // to avoid requests hanging indefinitely.
// The reason for these is a potential dead-lock.
//
// Note: We are using `abortSignalAny` over `AbortSignal.any` because of leak issues.
// @source https://github.com/nodejs/node/issues/57584
signal: abortSignalAny([this.incomingRequestAbortSignal, timeoutAbortSignal]),
},
);
return result;
} finally {
timeoutAbortSignal.removeEventListener('abort', onTimeout);
this.incomingRequestAbortSignal.removeEventListener('abort', onIncomingRequestAbort);
}
}
}

View file

@ -1,3 +1,5 @@
import type { MessagePort, Worker } from 'node:worker_threads';
import type { LogLevel } from 'fastify';
import { Injectable } from 'graphql-modules';
export type LogFn = (msg: string, ...args: unknown[]) => void;
@ -30,3 +32,59 @@ export class NoopLogger extends Logger {
debug = noop;
child = () => this;
}
/**
 * Shape of a log entry forwarded over a `MessagePort` from a worker thread
 * to the main thread (produced by `createMessagePortLogger`).
 */
export type MessagePortLog = {
  event: 'log';
  /** Structured context accumulated via `Logger.child`. */
  bindings: Record<string, unknown>;
  /** Log severity; 'silent' is excluded because it never emits a message. */
  level: Exclude<LogLevel, 'silent'>;
  /** printf-style format string plus interpolation arguments. */
  args: [string, ...unknown[]];
};
/**
 * Create a logger that posts the logs to the message port.
 * The main use-case of this is to forward logs from a worker thread to the main thread.
 */
export function createMessagePortLogger(
  port: MessagePort,
  bindings: Record<string, unknown> = {},
): Logger {
  // Every level method posts the same message shape; only `level` differs.
  const forward =
    (level: 'trace' | 'debug' | 'info' | 'warn' | 'error' | 'fatal') =>
    (...args: [string, ...unknown[]]) => {
      port.postMessage({ event: 'log', level, args, bindings });
    };

  return {
    // Child loggers keep posting to the same port with merged bindings.
    child: newBindings => createMessagePortLogger(port, { ...bindings, ...newBindings }),
    debug: forward('debug'),
    error: forward('error'),
    fatal: forward('fatal'),
    info: forward('info'),
    trace: forward('trace'),
    warn: forward('warn'),
  };
}
/**
 * Register a logger from the message port to log to this thread's logger.
 * Log messages posted by the worker (see `createMessagePortLogger`) are
 * re-emitted on `logger`, tagged with the worker's id.
 */
export function registerWorkerLogging(logger: Logger, worker: Worker, workerId: string) {
  worker.on('message', (data: MessagePortLog) => {
    // Ignore any worker message that is not a log event.
    if (!('event' in data) || data.event !== 'log') {
      return;
    }
    const child = logger.child({ ...data.bindings, workerId });
    child[data.level](...data.args);
  });
}

View file

@ -1,7 +1,7 @@
# `@hive/schema`
Service for validating schemas or verifying whether a composite GraphQL schema can be composed out
of subschemas.
of subschemas. Supports Federation, Schema Stitching and Monolithic Schemas.
## Configuration
@ -26,3 +26,53 @@ of subschemas.
| `REQUEST_LOGGING` | No | Log http requests | `1` (enabled) or `0` (disabled) |
| `LOG_LEVEL` | No | The verbosity of the service logs. One of `trace`, `debug`, `info`, `warn` ,`error`, `fatal` or `silent` | `info` (default) |
| `OPENTELEMETRY_COLLECTOR_ENDPOINT` | No | OpenTelemetry Collector endpoint. The expected traces transport is HTTP (port `4318`). | `http://localhost:4318/v1/traces` |
## Documentation
### Composition Request Handling
The following diagram outlines how the service handles incoming composition requests via HTTP
(tRPC). It details the decision-making process around caching with Redis, reuse of in-progress
tasks, and task execution using a limited pool of worker threads.
Each composition task runs in an isolated worker thread with memory limits to prevent a single
malfunctioning task from affecting the stability of the entire service. This setup ensures robust
and efficient processing by avoiding redundant computation, serving cached results when possible,
and queuing tasks when resources are saturated.
```mermaid
sequenceDiagram
participant Client
participant Service
participant Redis
participant TaskManager
participant WorkerPool
Client->>Service: Composition HTTP request (tRPC)
Service->>Redis: Check for cached result
alt Cached result found
Redis-->>Service: Return result
Service-->>Client: Send cached result
else Not cached
Service->>TaskManager: Check if task in progress
alt Task in progress
TaskManager-->>Service: Return existing task
Service->>TaskManager: Wait for task completion
TaskManager-->>Service: Return result
Service-->>Client: Send result
else No task in progress
TaskManager->>WorkerPool: Check for available worker
alt Worker available
WorkerPool-->>TaskManager: Assign task
else No workers available
TaskManager->>TaskManager: Enqueue task in memory
TaskManager->>WorkerPool: Wait for available worker
WorkerPool-->>TaskManager: Assign task when ready
end
WorkerPool->>TaskManager: Task completed
TaskManager->>Redis: Cache result
TaskManager-->>Service: Return result to pending requests
Service-->>Client: Send result
end
end
```

View file

@ -258,7 +258,7 @@ test('run action again when the action expires', async ({ expect }) => {
const actionId = randomString();
async function actionFn() {
await waitFor(timeoutMs);
await waitFor(timeoutMs - 1);
return 'foo';
}

View file

@ -4,8 +4,8 @@
"license": "MIT",
"private": true,
"scripts": {
"build": "tsx ../../../scripts/runify.ts",
"dev": "tsup-node --config ../../../configs/tsup/dev.config.node.ts src/dev.ts",
"build": "tsx ../../../scripts/runify.ts src/index.ts src/composition-worker-main.ts",
"dev": "tsup-node --config ../../../configs/tsup/dev.config.node.ts src/dev.ts src/composition-worker-main.ts ",
"typecheck": "tsc --noEmit"
},
"devDependencies": {
@ -22,12 +22,14 @@
"dotenv": "16.4.7",
"fast-json-stable-stringify": "2.1.0",
"fastify": "4.29.0",
"fastq": "1.19.1",
"got": "14.4.7",
"graphql": "16.9.0",
"ioredis": "5.4.2",
"ioredis-mock": "8.9.0",
"p-timeout": "6.1.4",
"pino-pretty": "11.3.0",
"reflect-metadata": "0.2.2",
"zod": "3.24.1"
}
}

View file

@ -4,19 +4,24 @@ import { handleTRPCError } from '@hive/service-common';
import type { inferRouterInputs } from '@trpc/server';
import { initTRPC } from '@trpc/server';
import type { Cache } from './cache';
import type { CompositionScheduler } from './composition-scheduler';
import { type ComposeFederationArgs } from './composition/federation';
import type { CompositionErrorType } from './composition/shared';
import { ComposeSingleArgs } from './composition/single';
import { ComposeStitchingArgs } from './composition/stitching';
import { composeAndValidateCounter } from './metrics';
import { pickOrchestrator } from './orchestrators';
import type { Metadata } from './types';
export type { CompositionFailureError, CompositionErrorSource } from './lib/errors';
export interface Context {
req: FastifyRequest;
cache: Cache;
decrypt(value: string): string;
broker: {
endpoint: string;
signature: string;
} | null;
compositionScheduler: CompositionScheduler;
}
const t = initTRPC.context<Context>().create();
@ -83,21 +88,121 @@ export const schemaBuilderApiRouter = t.router({
}),
]),
)
.mutation(async ({ ctx, input }) => {
.mutation(async ({ ctx, input }): Promise<CompositionResponse> => {
composeAndValidateCounter.inc({ type: input.type });
return await pickOrchestrator(input.type, ctx.cache, ctx.req, ctx.decrypt).composeAndValidate(
input.schemas,
'external' in input && input.external
? {
...input.external,
broker: ctx.broker,
}
: null,
'native' in input && input.native ? true : false,
'contracts' in input && input.contracts ? input.contracts : undefined,
);
try {
if (input.type === 'federation') {
return await ctx.cache.reuse(
'federation',
async (args: ComposeFederationArgs, abortSignal) => {
const result = await ctx.compositionScheduler.process({
data: {
type: 'federation',
args,
requestTimeoutMs: ctx.cache.timeoutMs,
},
abortSignal,
requestId: ctx.req.id,
});
return result.result;
},
result =>
result.includesNetworkError === true || result.includesException === true
? 'short'
: 'long',
)({
schemas: input.schemas,
external:
'external' in input && input.external
? {
...input.external,
broker: ctx.broker,
}
: null,
native: 'native' in input && input.native ? true : false,
contracts: 'contracts' in input && input.contracts ? input.contracts : undefined,
requestId: ctx.req.id,
});
}
if (input.type === 'stitching') {
return await ctx.cache.reuse(
'stitching',
async (args: ComposeStitchingArgs, abortSignal) => {
const result = await ctx.compositionScheduler.process({
data: {
type: 'stitching',
args,
},
abortSignal,
requestId: ctx.req.id,
});
return result.result;
},
)({ schemas: input.schemas });
}
if (input.type === 'single') {
return await ctx.cache.reuse('single', async (args: ComposeSingleArgs, abortSignal) => {
const result = await ctx.compositionScheduler.process({
data: {
type: 'single',
args,
},
abortSignal,
requestId: ctx.req.id,
});
return result.result;
})({ schemas: input.schemas });
}
assertAllCasesExhausted(input);
} catch (error) {
// Treat timeouts caused by external composition as "expected errors"
if (ctx.cache.isTimeoutError(error) && input.type === 'federation' && input.external) {
return {
errors: [
{
message: error.message,
source: 'graphql',
},
],
sdl: null,
supergraph: null,
includesNetworkError: true,
contracts: null,
tags: null,
schemaMetadata: null,
metadataAttributes: null,
} satisfies CompositionResponse;
}
throw error;
}
      throw new Error('tRPC and TypeScript for the win.');
}),
});
export type SchemaBuilderApi = typeof schemaBuilderApiRouter;
export type SchemaBuilderApiInput = inferRouterInputs<SchemaBuilderApi>;
/**
 * Compile-time exhaustiveness guard: `value` is typed `never`, so any
 * unhandled variant is a type error; at runtime this always throws.
 */
function assertAllCasesExhausted(value: never) {
  // Fix: "exhaused" -> "exhausted" in the error message.
  throw new Error(`Not all cases are exhausted. Value '${value}'.`);
}
/**
 * Result shape returned to API callers for all composition types
 * (single, stitching, federation).
 */
export type CompositionResponse = {
  /** Composition/validation errors; empty when composition succeeded. */
  errors: Array<CompositionErrorType>;
  /** Composed schema SDL, or null when composition failed. */
  sdl: null | string;
  /** Federation supergraph SDL; null for non-federation projects or on failure. */
  supergraph: null | string;
  /** Per-contract composition results; null when no contracts were requested. */
  contracts: Array<{
    id: string;
    errors: Array<CompositionErrorType>;
    sdl: string | null;
    supergraph: string | null;
  }> | null;
  // presumably keyed by schema coordinate — TODO confirm against metadata extraction
  schemaMetadata: Record<string, Metadata[]> | null;
  metadataAttributes: Record<string, string[]> | null;
  tags: Array<string> | null;
  /** True when a failure was caused by a network error (e.g. external composition timeout). */
  includesNetworkError?: boolean;
};

View file

@ -1,7 +1,7 @@
import { createHash } from 'node:crypto';
import stringify from 'fast-json-stable-stringify';
import type { Redis } from 'ioredis';
import pTimeout, { TimeoutError } from 'p-timeout';
import { TimeoutError } from 'p-timeout';
import type { ServiceLogger } from '@hive/service-common';
import { externalCompositionCounter } from './metrics';
@ -125,7 +125,7 @@ export function createCache(options: {
async function runAction<I, O>(
groupKey: string,
factory: (input: I) => Promise<O>,
factory: (input: I, signal: AbortSignal) => Promise<O>,
pickCacheType: (output: O) => CacheTTLType,
input: I,
attempt: number,
@ -179,10 +179,16 @@ export function createCache(options: {
try {
logger.debug('Executing action (id=%s)', id);
const result = await pTimeout(factory(input), {
milliseconds: timeoutMs,
message: `Timeout: took longer than ${timeoutMs}ms to complete`,
});
const timeoutSignal = AbortSignal.timeout(timeoutMs);
const result = await Promise.race([
factory(input, timeoutSignal),
new Promise<never>((_, reject) => {
timeoutSignal.addEventListener('abort', () => {
reject(new TimeoutError(`Timeout: took longer than ${timeoutMs}ms to complete`));
});
}),
]);
await completeAction(id, result, pickCacheType);
externalCompositionCounter.inc({
cache: 'miss',
@ -206,7 +212,7 @@ export function createCache(options: {
},
reuse<I, O>(
groupKey: string,
factory: (input: I) => Promise<O>,
factory: (input: I, signal: AbortSignal) => Promise<O>,
pickCacheType: (output: O) => CacheTTLType = () => 'long',
): (input: I) => Promise<O> {
return async input => runAction(groupKey, factory, pickCacheType, input, 1);

View file

@ -0,0 +1,184 @@
import * as path from 'node:path';
import { Worker } from 'node:worker_threads';
import fastq from 'fastq';
import * as Sentry from '@sentry/node';
import { registerWorkerLogging, type Logger } from '../../api/src/modules/shared/providers/logger';
import type { CompositionEvent, CompositionResultEvent } from './composition-worker';
type WorkerRunArgs = {
  /** Composition task payload forwarded to the worker thread. */
  data: CompositionEvent['data'];
  /** Incoming HTTP request id, used for log correlation. */
  requestId: string;
  /** Aborting this signal cancels the task and recycles the worker. */
  abortSignal: AbortSignal;
};

// Resolve/reject handles for the in-flight task; the promise itself is
// held by the caller in `run`.
type Task = Omit<PromiseWithResolvers<CompositionResultEvent>, 'promise'>;

type WorkerInterface = {
  /** True when the worker currently has no task assigned. */
  readonly isIdle: boolean;
  name: string;
  /** Run a task on the worker. */
  run: (args: WorkerRunArgs) => Promise<CompositionResultEvent['data']>;
};
/**
 * Schedules composition tasks onto a fixed pool of worker threads.
 *
 * Tasks are pushed onto a `fastq` queue whose concurrency equals the worker
 * count, so each worker processes at most one task at a time. A worker that
 * errors or whose task is aborted is terminated and replaced in place.
 */
export class CompositionScheduler {
  private logger: Logger;
  /** The amount of parallel workers */
  private workerCount: number;
  // Per-worker V8 old-generation heap limit (MB), passed to `resourceLimits`.
  private maxOldGenerationSizeMb: number;
  /** List of all workers */
  private workers: Array<WorkerInterface>;

  private queue: fastq.queueAsPromised<WorkerRunArgs, CompositionResultEvent['data']>;

  constructor(logger: Logger, workerCount: number, maxOldGenerationSizeMb: number) {
    this.workerCount = workerCount;
    this.maxOldGenerationSizeMb = maxOldGenerationSizeMb;
    this.logger = logger.child({ source: 'CompositionScheduler' });
    const workers = Array.from({ length: this.workerCount }, (_, i) => this.createWorker(i));
    this.workers = workers;

    this.queue = fastq.promise(
      function queue(data) {
        // Let's not process aborted requests
        if (data.abortSignal.aborted) {
          throw data.abortSignal.reason;
        }
        // With queue concurrency == workerCount there is always at least one
        // idle worker here; a missing one indicates a scheduler bug.
        const worker = workers.find(worker => worker.isIdle);
        if (!worker) {
          throw new Error('No idle worker found.');
        }

        return worker.run(data);
      },
      // The size needs to be the same as the length of `this.workers`.
      // Otherwise a worker would process more than a single task at a time.
      this.workerCount,
    );
  }

  /**
   * Spawn worker `index` and wire up its lifecycle: error reporting to Sentry,
   * log forwarding, result/error message handling, and a `run` facade that
   * tracks the single in-flight task.
   */
  private createWorker(index: number): WorkerInterface {
    this.logger.debug('Creating worker %s', index);
    const name = `composition-worker-${index}`;
    const worker = new Worker(path.join(__dirname, 'composition-worker-main.js'), {
      name,
      resourceLimits: {
        maxOldGenerationSizeMb: this.maxOldGenerationSizeMb,
      },
    });

    // The single task currently running on this worker, or null when idle.
    let workerState: {
      task: Task;
      args: WorkerRunArgs;
    } | null = null;

    // Terminate this worker, replace it in the pool, and fail its pending task.
    const recreate = (reason: Error) => {
      void worker.terminate().finally(() => {
        this.logger.debug('Re-Creating worker %s', index);
        this.workers[index] = this.createWorker(index);
        if (workerState) {
          this.logger.debug('Cancel pending task %s', index);
          workerState.task.reject(reason);
        }
      });
    };

    worker.on('error', error => {
      console.error(error);
      this.logger.error('Worker error %s', error);
      Sentry.captureException(error, {
        extra: {
          requestId: workerState?.args.requestId ?? '',
          compositionType: workerState?.args.data.type,
        },
      });
      // A worker 'error' leaves the thread in an undefined state: recycle it.
      recreate(error);
    });
    worker.on('exit', code => {
      this.logger.error('Worker stopped with exit code %s', String(code));
    });

    registerWorkerLogging(this.logger, worker, name);

    worker.on(
      'message',
      (data: CompositionResultEvent | { event: 'error'; id: string; err: Error }) => {
        // No id matching here — assumes at most one in-flight task per worker.
        if (data.event === 'error') {
          workerState?.task.reject(data.err);
        }
        if (data.event === 'compositionResult') {
          workerState?.task.resolve(data);
        }
      },
    );

    const { logger: baseLogger } = this;

    // Post one task to the worker and settle when the result message arrives.
    function run(args: WorkerRunArgs) {
      if (workerState) {
        throw new Error('Can not run task in worker that is not idle.');
      }
      const taskId = crypto.randomUUID();
      const logger = baseLogger.child({ taskId, reqId: args.requestId });
      const d = Promise.withResolvers<CompositionResultEvent>();
      let task: Task = {
        resolve: data => {
          args.abortSignal.removeEventListener('abort', onAbort);
          workerState = null;
          d.resolve(data);
        },
        // Rejection also terminates the worker; `recreate`/'exit' handling
        // takes care of spawning the replacement.
        reject: err => {
          args.abortSignal.removeEventListener('abort', onAbort);
          void worker.terminate().finally(() => {
            workerState = null;
            d.reject(err);
          });
        },
      };

      workerState = {
        task,
        args,
      };

      // An abort recycles the worker because the in-flight computation
      // cannot be interrupted any other way.
      function onAbort() {
        logger.error('Task aborted.');
        recreate(new Error('Task aborted'));
      }

      args.abortSignal.addEventListener('abort', onAbort);

      const time = process.hrtime();
      worker.postMessage({
        event: 'composition',
        id: taskId,
        data: args.data,
        taskId,
        requestId: args.requestId,
      } satisfies CompositionEvent);

      return d.promise
        .finally(() => {
          const endTime = process.hrtime(time);
          logger.debug('Time taken: %ds:%dms', endTime[0], endTime[1] / 1000000);
        })
        .then(result => result.data);
    }

    return {
      get isIdle() {
        return workerState === null;
      },
      name,
      run,
    };
  }

  /** Process a composition task in a worker (once the next worker is free). */
  process(args: WorkerRunArgs): Promise<CompositionResultEvent['data']> {
    return this.queue.push(args);
  }
}

View file

@ -0,0 +1,17 @@
import 'reflect-metadata';
import { parentPort } from 'node:worker_threads';
import { createMessagePortLogger } from '../../api/src/modules/shared/providers/logger';
import { createCompositionWorker } from './composition-worker';
import { env } from './environment';
if (!parentPort) {
throw new Error('This script must be run as a worker.');
}
const baseLogger = createMessagePortLogger(parentPort);
createCompositionWorker({
baseLogger,
port: parentPort,
env,
});

View file

@ -0,0 +1,186 @@
import * as crypto from 'node:crypto';
import type { MessagePort } from 'node:worker_threads';
import type { Logger } from '@hive/api';
import { CompositionResponse } from './api';
import { createComposeFederation, type ComposeFederationArgs } from './composition/federation';
import { composeSingle, type ComposeSingleArgs } from './composition/single';
import { composeStitching, type ComposeStitchingArgs } from './composition/stitching';
import type { env } from './environment';
/**
 * Run the composition message loop inside a worker thread.
 *
 * Listens for `CompositionEvent` messages on `port`, dispatches them to the
 * federation/single/stitching composers, and posts back either a
 * `CompositionResultEvent` or an `{ event: 'error', id, err }` message.
 * Fatal process-level errors exit the worker with code 1.
 */
export function createCompositionWorker(args: {
  baseLogger: Logger;
  port: MessagePort;
  env: typeof env;
}) {
  const baseLogger = args.baseLogger.child({
    source: 'CompositionWorker',
  });
  const decrypt = decryptFactory(args.env.encryptionSecret);

  process.on('unhandledRejection', function (err) {
    console.error('unhandledRejection', err);
    console.error(err);
    // NOTE(review): this `{ code: 'ERROR' }` shape differs from the
    // `{ event: 'error' }` messages the scheduler listens for — confirm
    // whether it is intentionally unhandled (the exit below recycles the worker).
    args.port.postMessage({
      code: 'ERROR',
      err,
    });
    process.exit(1);
  });
  process.on('uncaughtException', function (err) {
    console.error('uncaughtException', err);
    args.port.postMessage({
      code: 'ERROR',
      err,
    });
    process.exit(1);
  });

  args.port.on('message', async (message: CompositionEvent) => {
    // Attach task/request identifiers so worker logs correlate with the request.
    const logger = baseLogger.child({
      taskId: message.taskId,
      reqId: message.requestId,
      messageId: message.id,
      event: message.event,
    });
    logger.debug('processing message');
    try {
      if (message.event === 'composition') {
        if (message.data.type === 'federation') {
          const composeFederation = createComposeFederation({
            decrypt,
            logger: baseLogger.child({ reqId: message.data.args.requestId }) as any,
            requestTimeoutMs: message.data.requestTimeoutMs,
          });
          const composed = await composeFederation(message.data.args);
          // Normalize the federation result into the flat CompositionResponse shape.
          args.port.postMessage({
            event: 'compositionResult',
            id: message.id,
            data: {
              type: message.data.type,
              result: {
                errors: composed.result.errors ?? [],
                sdl: composed.result.sdl ?? null,
                supergraph: composed.result.supergraph ?? null,
                includesNetworkError: composed.result.includesNetworkError === true,
                contracts:
                  composed.result.contracts?.map(contract => ({
                    id: contract.id,
                    errors: 'errors' in contract.result.result ? contract.result.result.errors : [],
                    sdl: contract.result.result.sdl ?? null,
                    supergraph: contract.result.result.supergraph ?? null,
                  })) ?? null,
                tags: composed.result.tags ?? null,
                schemaMetadata: composed.result.schemaMetadata ?? null,
                metadataAttributes: composed.result.metadataAttributes ?? null,
                includesException: composed.result.includesException === true,
              },
            },
          } satisfies CompositionResultEvent);
          return;
        }
        if (message.data.type === 'single') {
          const result = await composeSingle(message.data.args);
          args.port.postMessage({
            event: 'compositionResult',
            id: message.id,
            data: {
              type: message.data.type,
              result,
            },
          } satisfies CompositionResultEvent);
          return;
        }
        if (message.data.type === 'stitching') {
          const result = await composeStitching(message.data.args);
          args.port.postMessage({
            event: 'compositionResult',
            id: message.id,
            data: {
              type: message.data.type,
              result,
            },
          } satisfies CompositionResultEvent);
          return;
        }
        assertAllCasesExhausted(message.data);
        return;
      }
      assertAllCasesExhausted(message.event);
    } catch (err: unknown) {
      // Forward unexpected failures to the scheduler, which rejects the task.
      baseLogger.error(
        'unexpected error while processing message in worker (messageId=%s)',
        message.id,
      );
      baseLogger.error(String(err));
      args.port.postMessage({
        event: 'error',
        id: message.id,
        err,
      });
    }
  });
  process.on('exit', function (code) {
    console.log('exit', code);
  });
}
/** Message posted from the scheduler (main thread) to a composition worker. */
export type CompositionEvent = {
  /** Unique message id; echoed back in the result/error message. */
  id: string;
  event: 'composition';
  /** Incoming HTTP request id, for log correlation. */
  requestId: string;
  taskId: string;
  /** Composition payload, discriminated by project type. */
  data:
    | {
        type: 'federation';
        args: ComposeFederationArgs;
        requestTimeoutMs: number;
      }
    | {
        type: 'single';
        args: ComposeSingleArgs;
      }
    | {
        type: 'stitching';
        args: ComposeStitchingArgs;
      };
};

/** Message posted from a composition worker back to the scheduler. */
export type CompositionResultEvent = {
  /** Id of the `CompositionEvent` this result answers. */
  id: string;
  event: 'compositionResult';
  data: {
    type: 'federation' | 'single' | 'stitching';
    result: CompositionResponse & {
      // set by the federation path when composition threw — TODO confirm other paths
      includesException?: boolean;
    };
  };
};
/**
 * Build a decrypt function for values encoded as `<iv-hex>:<ciphertext-hex>`,
 * using aes256 keyed by the hex md5 digest of `encryptionSecret`.
 */
function decryptFactory(encryptionSecret: string) {
  // NOTE(review): md5-based key derivation is weak, but must be kept for
  // compatibility with existing ciphertexts.
  const keyHex = crypto.createHash('md5').update(encryptionSecret).digest('hex');
  const key = Buffer.from(keyHex, 'latin1');

  return function decrypt(text: string): string {
    // First segment is the IV; everything after the first ':' is ciphertext
    // (re-joined, since hex never contains ':').
    const [ivHex, ...cipherParts] = text.split(':');
    const iv = Buffer.from(ivHex || '', 'hex');
    const decipher = crypto.createDecipheriv('aes256', key, iv);
    return decipher.update(cipherParts.join(':'), 'hex', 'utf8') + decipher.final('utf8');
  };
}
/**
 * Compile-time exhaustiveness guard: `value` is typed `never`, so any
 * unhandled variant is a type error; at runtime this always throws.
 */
function assertAllCasesExhausted(value: never) {
  // Fix: "exhaused" -> "exhausted" in the error message.
  throw new Error(`Not all cases are exhausted. Value '${value}'.`);
}

View file

@ -0,0 +1,353 @@
import {
Kind,
parse,
visit,
type ConstDirectiveNode,
type FieldDefinitionNode,
type NameNode,
} from 'graphql';
import type { ServiceLogger } from '@hive/service-common';
import { extractLinkImplementations } from '@theguild/federation-composition';
import type { ContractsInputType } from '../api';
import { addInaccessibleToUnreachableTypes } from '../lib/add-inaccessible-to-unreachable-types';
import {
composeExternalFederation,
composeFederationV1,
composeFederationV2,
ComposerMethodResult,
SubgraphInput,
} from '../lib/compose';
import {
applyTagFilterOnSubgraphs,
createTagDirectiveNameExtractionStrategy,
extractTagsFromDocument,
} from '../lib/federation-tag-extraction';
import { extractMetadata, mergeMetadata } from '../lib/metadata-extraction';
import { SetMap } from '../lib/setmap';
import { trimDescriptions } from '../lib/trim-descriptions';
import type { ComposeAndValidateInput, ExternalComposition } from '../types';
import {
CompositionResult,
ContractResultFailure,
ContractResultSuccess,
ContractResultType,
} from './shared';
export type ComposeFederationDeps = {
logger: ServiceLogger;
decrypt: (value: string) => string;
requestTimeoutMs: number;
};
export type ComposeFederationArgs = {
schemas: ComposeAndValidateInput;
external: ExternalComposition;
native: boolean;
contracts: ContractsInputType | undefined;
requestId: string;
};
export const createComposeFederation = (deps: ComposeFederationDeps) =>
async function composeFederation(args: ComposeFederationArgs): Promise<CompositionResult> {
const subgraphs = args.schemas
.map(schema => {
deps.logger.debug(`Parsing subgraph schema SDL (name=%s)`, schema.source);
return {
typeDefs: trimDescriptions(parse(schema.raw)),
name: schema.source,
url: 'url' in schema && typeof schema.url === 'string' ? schema.url : undefined,
};
})
.map(subgraph => {
deps.logger.debug(`Extracting link implementations from subgraph (name=%s)`, subgraph.name);
const { matchesImplementation, resolveImportName } = extractLinkImplementations(
subgraph.typeDefs,
);
if (matchesImplementation('https://specs.graphql-hive.com/hive', 'v1.0')) {
deps.logger.debug(
`Found hive link in subgraph. Applying link logic. (name=%s)`,
subgraph.name,
);
// if this subgraph implements the metadata spec
// then copy metadata from the schema to all fields.
// @note this is similar to how federation's compose copies join__ directives to fields based on the
// subgraph that the field is a part of.
const metaDirectiveName = resolveImportName(
'https://specs.graphql-hive.com/hive',
'@meta',
);
const applyMetaToField = (
fieldNode: FieldDefinitionNode,
metaDirectives: ConstDirectiveNode[],
) => {
return {
...fieldNode,
directives: [
...(fieldNode.directives ?? []),
...metaDirectives.map(d => ({ ...d, loc: undefined })),
],
};
};
const schemaNodes = subgraph.typeDefs.definitions.filter(
d => d.kind === Kind.SCHEMA_DEFINITION || d.kind === Kind.SCHEMA_EXTENSION,
);
const schemaMetaDirectives = schemaNodes
.flatMap(node => node.directives?.filter(d => d.name.value === metaDirectiveName))
.filter(d => d !== undefined);
const interfaceAndObjectHandler = (node: {
readonly fields?: ReadonlyArray<FieldDefinitionNode> | undefined;
readonly directives?: ReadonlyArray<ConstDirectiveNode> | undefined;
readonly name: NameNode;
}) => {
// apply type/interface metadata to fields
const objectMetaDirectives = node.directives
?.filter(d => d.name.value === metaDirectiveName)
.filter(d => d !== undefined);
if (objectMetaDirectives?.length) {
return {
...node,
fields: node.fields?.map(f => applyMetaToField(f, objectMetaDirectives)),
};
}
return node;
};
subgraph.typeDefs = visit(subgraph.typeDefs, {
FieldDefinition: field => {
return applyMetaToField(field, schemaMetaDirectives);
},
ObjectTypeDefinition: interfaceAndObjectHandler,
InterfaceTypeDefinition: interfaceAndObjectHandler,
});
}
return subgraph;
});
/** Determine the correct compose method... */
let compose: (subgraphs: Array<SubgraphInput>) => Promise<
ComposerMethodResult & {
includesException?: boolean;
}
>;
// Federation v2
if (args.native) {
deps.logger.debug(
'Using built-in Federation v2 composition service (schemas=%s)',
args.schemas.length,
);
compose = subgraphs => Promise.resolve(composeFederationV2(subgraphs, deps.logger));
} else if (args.external) {
const { external } = args;
compose = subgraphs =>
composeExternalFederation({
decrypt: deps.decrypt,
external,
logger: deps.logger,
requestId: args.requestId,
subgraphs,
requestTimeoutMs: deps.requestTimeoutMs,
});
} else {
deps.logger.debug(
'Using built-in Federation v1 composition service (schemas=%s)',
args.schemas.length,
);
compose = subgraphs => Promise.resolve(composeFederationV1(subgraphs));
}
let result: CompositionResult;
{
const composed = await compose(subgraphs);
if (composed.type === 'success') {
// merge all metadata from every subgraph by coordinate
const subgraphsMetadata = subgraphs.map(({ name, typeDefs }) =>
extractMetadata(typeDefs, name),
);
const supergraphSDL = parse(composed.result.supergraph);
const { resolveImportName } = extractLinkImplementations(supergraphSDL);
const tagDirectiveName = resolveImportName('https://specs.apollo.dev/tag', '@tag');
const tagStrategy = createTagDirectiveNameExtractionStrategy(tagDirectiveName);
const tags = extractTagsFromDocument(supergraphSDL, tagStrategy);
const schemaMetadata = mergeMetadata(...subgraphsMetadata);
const metadataAttributes = new SetMap<string, string>();
for (const [_coord, attrs] of schemaMetadata) {
for (const attr of attrs) {
metadataAttributes.add(attr.name, attr.content);
}
}
result = {
type: 'success',
result: {
supergraph: composed.result.supergraph,
sdl: composed.result.sdl,
contracts: null,
tags,
schemaMetadata: Object.fromEntries(schemaMetadata),
metadataAttributes: metadataAttributes.toObject(),
},
};
} else {
result = {
type: 'failure',
result: {
supergraph: composed.result.supergraph ?? null,
sdl: composed.result.sdl ?? null,
errors: composed.result.errors,
contracts: null,
includesNetworkError: composed.includesNetworkError,
includesException: composed.includesException ?? false,
tags: null,
schemaMetadata: null,
metadataAttributes: null,
},
};
}
}
if (!args.contracts?.length) {
// if no contracts, then compose and return the result
return result;
}
if (result.type == 'failure') {
return {
...result,
result: {
...result.result,
contracts: args.contracts.map(contract => ({
id: contract.id,
result: {
type: 'failure',
result: {
supergraph: null,
sdl: null,
includesNetworkError: false,
includesException: false,
errors: [
{
message: 'Skipped contract composition, as default graph composition failed.',
source: 'composition',
},
],
},
},
})),
},
};
}
// if there are contracts, then create
const contractResults: Array<ContractResultType> = await Promise.all(
args.contracts.map(async contract => {
// apply contracts to replace tags with inaccessible directives
const filteredSubgraphs = applyTagFilterOnSubgraphs(subgraphs, {
include: new Set(contract.filter.include),
exclude: new Set(contract.filter.exclude),
});
// attempt to compose the contract filtered subgraph
const compositionResult = await compose(filteredSubgraphs);
// Remove unreachable types from public API schema
if (
contract.filter.removeUnreachableTypesFromPublicApiSchema === true &&
compositionResult.type === 'success'
) {
let supergraphSDL = parse(compositionResult.result.supergraph);
const { resolveImportName } = extractLinkImplementations(supergraphSDL);
const result = addInaccessibleToUnreachableTypes(
resolveImportName,
compositionResult,
supergraphSDL,
);
if (result.type === 'success') {
return {
id: contract.id,
result: {
type: 'success',
result: {
supergraph: result.result.supergraph,
sdl: result.result.sdl,
},
},
} satisfies ContractResultSuccess;
}
return {
id: contract.id,
result: {
type: 'failure',
result: {
supergraph: null,
sdl: null,
errors: result.result.errors,
includesNetworkError: false,
includesException: false,
},
},
} satisfies ContractResultFailure;
}
if (compositionResult.type === 'success') {
return {
id: contract.id,
result: {
type: 'success',
result: {
supergraph: compositionResult.result.supergraph,
sdl: compositionResult.result.sdl,
},
},
} satisfies ContractResultSuccess;
}
return {
id: contract.id,
result: {
type: 'failure',
result: {
supergraph: null,
sdl: null,
errors: compositionResult.result.errors,
includesNetworkError: compositionResult.includesNetworkError,
includesException: compositionResult.includesException ?? false,
},
},
} satisfies ContractResultFailure;
}),
);
const networkErrorContract = contractResults.find(
(contract): contract is ContractResultFailure =>
contract.result.type === 'failure' && contract.result.result.includesNetworkError === true,
);
// In case any of the contract composition fails, we will fail the whole composition.
if (networkErrorContract) {
return {
type: 'failure',
result: {
errors: networkErrorContract.result.result.errors,
supergraph: null,
sdl: null,
tags: null,
includesNetworkError: true,
includesException: false,
schemaMetadata: null,
metadataAttributes: null,
contracts: null,
},
};
}
return {
...result,
result: {
...result.result,
contracts: contractResults,
},
};
};

View file

@ -0,0 +1,64 @@
import type { CompositionErrorSource } from '../lib/errors';
import type { Metadata } from '../types';
/** A single composition error with a human-readable message and its origin. */
export type CompositionErrorType = {
  message: string;
  /** Whether the error came from GraphQL validation or the composition step itself. */
  source: CompositionErrorSource;
};

/** Pairs a contract id with the outcome of composing that contract. */
type ContractResult<T> = {
  id: string;
  result: T;
};

/** Successful contract composition: both supergraph and public SDL are available. */
export type ContractResultSuccess = ContractResult<{
  type: 'success';
  result: {
    supergraph: string;
    sdl: string;
  };
}>;

/** Failed contract composition: artifacts may be partially present, errors are always set. */
export type ContractResultFailure = ContractResult<{
  type: 'failure';
  result: {
    supergraph: string | null;
    sdl: string | null;
    errors: Array<CompositionErrorType>;
    /** True when the failure was caused by a network issue (e.g. external composition service). */
    includesNetworkError: boolean;
    /** True when the failure was caused by an unexpected exception rather than a validation error. */
    includesException: boolean;
  };
}>;

export type ContractResultType = ContractResultSuccess | ContractResultFailure;

/** Fields shared by both success and failure composition results. */
type SharedResult = {
  contracts: Array<ContractResultType> | null;
  /** Metadata keyed by schema coordinate. */
  schemaMetadata: Record<string, Metadata[]> | null;
  /** Metadata attribute values grouped by attribute name. */
  metadataAttributes: Record<string, string[]> | null;
  tags: Array<string> | null;
};

/** Successful composition: error-related fields are statically excluded via `never`. */
export type CompositionResultSuccess = {
  type: 'success';
  result: {
    supergraph: string;
    sdl: string;
    errors?: never;
    includesNetworkError?: never;
    includesException?: never;
  } & SharedResult;
};

/** Failed composition: artifacts may be partially present, errors are always set. */
export type CompositionResultFailure = {
  type: 'failure';
  result: {
    supergraph: string | null;
    sdl: string | null;
    errors: Array<CompositionErrorType>;
    includesNetworkError: boolean;
    includesException: boolean;
  } & SharedResult;
};

/** Discriminated union over the `type` tag. */
export type CompositionResult = CompositionResultSuccess | CompositionResultFailure;

View file

@ -0,0 +1,68 @@
import {
buildASTSchema,
DocumentNode,
GraphQLError,
isTypeSystemExtensionNode,
parse,
print,
validateSchema,
} from 'graphql';
import { validateSDL } from 'graphql/validation/validate.js';
import { mergeTypeDefs } from '@graphql-tools/merge';
import { errorWithSource, type CompositionErrorSource } from '../lib/errors';
import { trimDescriptions } from '../lib/trim-descriptions';
import type { ComposeAndValidateInput } from '../types';
import type { CompositionErrorType } from './shared';
/** Input for single-schema (non-federated, non-stitched) composition. */
export type ComposeSingleArgs = {
  schemas: ComposeAndValidateInput;
};

/**
 * Validates a single monolithic schema and returns its printed SDL.
 * Only the first entry of `args.schemas` is considered.
 */
export async function composeSingle(args: ComposeSingleArgs) {
  const [schema] = args.schemas;
  let documentAst = parse(schema.raw);

  // Fold type system extensions (e.g. several `extend type User`) into single
  // definitions so consumers see one `User` type instead of many extensions.
  const containsExtensions = documentAst.definitions.some(isTypeSystemExtensionNode);
  if (containsExtensions) {
    documentAst = mergeTypeDefs(documentAst);
  }

  const errors: Array<CompositionErrorType> = validateSingleSDL(documentAst);

  return {
    errors,
    sdl: print(trimDescriptions(documentAst)),
    supergraph: null,
    contracts: null,
    tags: null,
    schemaMetadata: null,
    metadataAttributes: null,
  };
}
/**
 * Runs SDL-level validation first, then (only if that passes) builds the schema
 * and runs full schema validation. GraphQL build errors are reported as
 * composition errors; anything else is rethrown.
 */
function validateSingleSDL(document: DocumentNode): Array<{
  message: string;
  source: CompositionErrorSource;
}> {
  const sdlErrors = validateSDL(document);
  if (sdlErrors.length > 0) {
    return sdlErrors.map(errorWithSource('graphql'));
  }

  try {
    // buildASTSchema may throw for documents that passed SDL validation
    // but cannot be assembled into a schema.
    const schemaErrors = validateSchema(buildASTSchema(document));
    if (schemaErrors.length > 0) {
      return schemaErrors.map(errorWithSource('graphql'));
    }
    return [];
  } catch (err: unknown) {
    if (err instanceof GraphQLError) {
      return [errorWithSource('graphql')(err)];
    }
    // Unexpected (non-GraphQL) failures are not validation results.
    throw err;
  }
}

View file

@ -0,0 +1,92 @@
import { buildASTSchema, concatAST, DocumentNode, Kind, parse, printSchema } from 'graphql';
import { validateSDL } from 'graphql/validation/validate.js';
import { stitchSchemas } from '@graphql-tools/stitch';
import { stitchingDirectives } from '@graphql-tools/stitching-directives';
import { errorWithSource, toValidationError } from '../lib/errors';
import { trimDescriptions } from '../lib/trim-descriptions';
import type { ComposeAndValidateInput } from '../types';
import type { CompositionErrorType } from './shared';
const { allStitchingDirectivesTypeDefs, stitchingDirectivesValidator } = stitchingDirectives();
const parsedStitchingDirectives = parse(allStitchingDirectivesTypeDefs);
const stitchingDirectivesNames = extractDirectiveNames(parsedStitchingDirectives);
/** Collects the names of all directive definitions declared in the document. */
function extractDirectiveNames(doc: DocumentNode) {
  return doc.definitions.flatMap(definition =>
    definition.kind === Kind.DIRECTIVE_DEFINITION ? [definition.name.value] : [],
  );
}
/** True when the document declares any directive that shadows an official stitching directive. */
function definesStitchingDirective(doc: DocumentNode) {
  const declaredNames = new Set(extractDirectiveNames(doc));
  return stitchingDirectivesNames.some(name => declaredNames.has(name));
}
function validateStitchedSchema(doc: DocumentNode) {
const definesItsOwnStitchingDirectives = definesStitchingDirective(doc);
const fullDoc = definesItsOwnStitchingDirectives
? doc
: concatAST([parsedStitchingDirectives, doc]);
const errors = validateSDL(fullDoc).map(errorWithSource('graphql'));
// If the schema defines its own stitching directives,
// it means we can't be sure that it follows the official spec.
if (definesItsOwnStitchingDirectives) {
return errors;
}
try {
stitchingDirectivesValidator(
buildASTSchema(fullDoc, {
assumeValid: true,
assumeValidSDL: true,
}),
);
} catch (error) {
errors.push(toValidationError(error, 'composition'));
}
return errors;
}
export type ComposeStitchingArgs = {
schemas: ComposeAndValidateInput;
};
export async function composeStitching(args: ComposeStitchingArgs) {
const parsed = args.schemas.map(s => parse(s.raw));
const errors: Array<CompositionErrorType> = parsed
.map(schema => validateStitchedSchema(schema))
.flat();
let sdl: string | null = null;
try {
sdl = printSchema(
stitchSchemas({
subschemas: args.schemas.map(schema =>
buildASTSchema(trimDescriptions(parse(schema.raw)), {
assumeValid: true,
assumeValidSDL: true,
}),
),
}),
);
} catch (error) {
errors.push(toValidationError(error, 'composition'));
}
return {
errors,
sdl,
supergraph: null,
contracts: null,
tags: null,
schemaMetadata: null,
metadataAttributes: null,
};
}

View file

@ -25,6 +25,8 @@ const EnvironmentModel = zod.object({
ENVIRONMENT: emptyString(zod.string().optional()),
RELEASE: emptyString(zod.string().optional()),
ENCRYPTION_SECRET: zod.string(),
COMPOSITION_WORKER_COUNT: zod.number().min(1).default(4),
COMPOSITION_WORKER_MAX_OLD_GENERATION_SIZE_MB: NumberFromString(1).optional().default(512),
});
const RequestBrokerModel = zod.union([
@ -182,4 +184,8 @@ export const env = {
signature: requestBroker.REQUEST_BROKER_SIGNATURE,
}
: null,
compositionWorker: {
count: base.COMPOSITION_WORKER_COUNT,
maxOldGenerationSizeMb: base.COMPOSITION_WORKER_MAX_OLD_GENERATION_SIZE_MB,
},
} as const;

View file

@ -1,5 +1,5 @@
#!/usr/bin/env node
import crypto from 'node:crypto';
import 'reflect-metadata';
import { hostname } from 'os';
import Redis from 'ioredis';
import {
@ -15,26 +15,9 @@ import {
import * as Sentry from '@sentry/node';
import { Context, schemaBuilderApiRouter } from './api';
import { createCache } from './cache';
import { CompositionScheduler } from './composition-scheduler';
import { env } from './environment';
const ENCRYPTION_SECRET = crypto.createHash('md5').update(env.encryptionSecret).digest('hex');
function decryptFactory() {
const ALG = 'aes256';
const IN_ENC = 'utf8';
const OUT_ENC = 'hex';
const secretBuffer = Buffer.from(ENCRYPTION_SECRET, 'latin1');
return function decrypt(text: string) {
const components = text.split(':');
const iv = Buffer.from(components.shift() || '', OUT_ENC);
const decipher = crypto.createDecipheriv(ALG, secretBuffer, iv);
return decipher.update(components.join(':'), OUT_ENC, IN_ENC) + decipher.final(IN_ENC);
};
}
async function main() {
let tracing: TracingInstance | undefined;
@ -70,6 +53,12 @@ async function main() {
bodyLimit: env.http.bodyLimit,
});
const compositionScheduler = new CompositionScheduler(
server.log,
env.compositionWorker.count,
env.compositionWorker.maxOldGenerationSizeMb,
);
if (tracing) {
await server.register(...tracing.instrumentFastify());
}
@ -122,8 +111,6 @@ async function main() {
server.log.info('Redis reconnecting in %s', timeToReconnect);
});
const decrypt = decryptFactory();
await registerTRPC(server, {
router: schemaBuilderApiRouter,
createContext({ req }): Context {
@ -138,7 +125,7 @@ async function main() {
failure: env.timings.cacheTTL,
},
});
return { cache, req, decrypt, broker: env.requestBroker };
return { cache, req, broker: env.requestBroker, compositionScheduler };
},
});

View file

@ -12,7 +12,6 @@ import {
compositionHasErrors as nativeCompositionHasErrors,
transformSupergraphToPublicSchema,
} from '@theguild/federation-composition';
import type { Cache } from '../cache';
import type { ExternalComposition } from '../types';
import { toValidationError } from './errors';
@ -146,7 +145,7 @@ export async function composeExternalFederation(args: {
subgraphs: Array<SubgraphInput>;
decrypt: (value: string) => string;
external: Exclude<ExternalComposition, null>;
cache: Cache;
requestTimeoutMs: number;
requestId: string;
}): Promise<ComposerMethodResult> {
args.logger.debug(
@ -189,10 +188,10 @@ export async function composeExternalFederation(args: {
...request,
},
args.logger,
args.cache.timeoutMs,
args.requestTimeoutMs,
args.requestId,
)
: callExternalService(request, args.logger, args.cache.timeoutMs));
: callExternalService(request, args.logger, args.requestTimeoutMs));
args.logger.debug('Got response from external composition service, trying to safe parse');
const parseResult = EXTERNAL_COMPOSITION_RESULT.safeParse(externalResponse);

View file

@ -0,0 +1,31 @@
import { visit, type ASTNode, type DocumentNode, type StringValueNode } from 'graphql';
export function trimDescriptions(doc: DocumentNode): DocumentNode {
function trim<T extends ASTNode>(node: T): T {
if (node && 'description' in node && node.description) {
(node.description as any).value = node.description.value.trim();
}
return node;
}
return visit(doc, {
SchemaDefinition: trim,
ObjectTypeDefinition: trim,
ObjectTypeExtension: trim,
InterfaceTypeExtension: trim,
UnionTypeExtension: trim,
InputObjectTypeExtension: trim,
EnumTypeExtension: trim,
SchemaExtension: trim,
ScalarTypeExtension: trim,
FieldDefinition: trim,
InputValueDefinition: trim,
InterfaceTypeDefinition: trim,
UnionTypeDefinition: trim,
EnumTypeDefinition: trim,
EnumValueDefinition: trim,
InputObjectTypeDefinition: trim,
DirectiveDefinition: trim,
});
}

View file

@ -1,604 +0,0 @@
import type { FastifyRequest } from 'fastify';
import type { ConstDirectiveNode, DocumentNode, FieldDefinitionNode, NameNode } from 'graphql';
import {
ASTNode,
buildASTSchema,
concatAST,
GraphQLError,
isTypeSystemExtensionNode,
Kind,
parse,
print,
printSchema,
validateSchema,
visit,
} from 'graphql';
import { validateSDL } from 'graphql/validation/validate.js';
import { mergeTypeDefs } from '@graphql-tools/merge';
import { stitchSchemas } from '@graphql-tools/stitch';
import { stitchingDirectives } from '@graphql-tools/stitching-directives';
import type { ServiceLogger } from '@hive/service-common';
import { extractLinkImplementations } from '@theguild/federation-composition';
import type { ContractsInputType } from './api';
import type { Cache } from './cache';
import { addInaccessibleToUnreachableTypes } from './lib/add-inaccessible-to-unreachable-types';
import {
composeExternalFederation,
composeFederationV1,
composeFederationV2,
ComposerMethodResult,
SubgraphInput,
} from './lib/compose';
import { CompositionErrorSource, errorWithSource, toValidationError } from './lib/errors';
import {
applyTagFilterOnSubgraphs,
createTagDirectiveNameExtractionStrategy,
extractTagsFromDocument,
} from './lib/federation-tag-extraction';
import { extractMetadata, mergeMetadata } from './lib/metadata-extraction';
import { SetMap } from './lib/setmap';
import type {
ComposeAndValidateInput,
ComposeAndValidateOutput,
ExternalComposition,
Metadata,
SchemaType,
} from './types';
const { allStitchingDirectivesTypeDefs, stitchingDirectivesValidator } = stitchingDirectives();
const parsedStitchingDirectives = parse(allStitchingDirectivesTypeDefs);
const stitchingDirectivesNames = extractDirectiveNames(parsedStitchingDirectives);
function extractDirectiveNames(doc: DocumentNode) {
const directives: string[] = [];
for (const definition of doc.definitions) {
if (definition.kind === Kind.DIRECTIVE_DEFINITION) {
directives.push(definition.name.value);
}
}
return directives;
}
function definesStitchingDirective(doc: DocumentNode) {
return extractDirectiveNames(doc).some(name => stitchingDirectivesNames.includes(name));
}
type CompositionErrorType = {
message: string;
source: 'composition' | 'graphql';
};
type ContractResultType = {
id: string;
result:
| {
type: 'success';
result: {
supergraph: string;
sdl: string;
};
}
| {
type: 'failure';
result: {
supergraph?: string;
sdl?: string;
errors: Array<CompositionErrorType>;
};
};
};
type CompositionResultSuccess = {
type: 'success';
result: {
supergraph: string;
sdl: string;
contracts?: Array<ContractResultType>;
};
};
type CompositionResultFailure = {
type: 'failure';
result: {
supergraph?: string;
sdl?: string;
errors: Array<CompositionErrorType>;
contracts?: Array<ContractResultType>;
};
};
type CompositionResult = CompositionResultSuccess | CompositionResultFailure;
function trimDescriptions(doc: DocumentNode): DocumentNode {
function trim<T extends ASTNode>(node: T): T {
if (node && 'description' in node && node.description) {
(node.description as any).value = node.description.value.trim();
}
return node;
}
return visit(doc, {
SchemaDefinition: trim,
ObjectTypeDefinition: trim,
ObjectTypeExtension: trim,
InterfaceTypeExtension: trim,
UnionTypeExtension: trim,
InputObjectTypeExtension: trim,
EnumTypeExtension: trim,
SchemaExtension: trim,
ScalarTypeExtension: trim,
FieldDefinition: trim,
InputValueDefinition: trim,
InterfaceTypeDefinition: trim,
UnionTypeDefinition: trim,
EnumTypeDefinition: trim,
EnumValueDefinition: trim,
InputObjectTypeDefinition: trim,
DirectiveDefinition: trim,
});
}
interface Orchestrator {
composeAndValidate(
input: ComposeAndValidateInput,
external: ExternalComposition,
native: boolean,
contracts?: ContractsInputType,
): Promise<ComposeAndValidateOutput>;
}
const createFederation: (
cache: Cache,
logger: ServiceLogger,
requestId: string,
decrypt: (value: string) => string,
) => Orchestrator = (cache, logger, requestId, decrypt) => {
const compose = cache.reuse<
{
schemas: ComposeAndValidateInput;
external: ExternalComposition;
native: boolean;
contracts: ContractsInputType | undefined;
},
CompositionResult & {
includesNetworkError: boolean;
includesException?: boolean;
tags: Array<string> | null;
schemaMetadata: Record<string, Metadata[]> | null;
metadataAttributes: Record<string, string[]> | null;
}
>(
'federation',
async ({ schemas, external, native, contracts }) => {
const subgraphs = schemas
.map(schema => {
logger.debug(`Parsing subgraph schema SDL (name=%s)`, schema.source);
return {
typeDefs: trimDescriptions(parse(schema.raw)),
name: schema.source,
url: 'url' in schema && typeof schema.url === 'string' ? schema.url : undefined,
};
})
.map(subgraph => {
logger.debug(`Extracting link implementations from subgraph (name=%s)`, subgraph.name);
const { matchesImplementation, resolveImportName } = extractLinkImplementations(
subgraph.typeDefs,
);
if (matchesImplementation('https://specs.graphql-hive.com/hive', 'v1.0')) {
logger.debug(
`Found hive link in subgraph. Applying link logic. (name=%s)`,
subgraph.name,
);
// if this subgraph implements the metadata spec
// then copy metadata from the schema to all fields.
// @note this is similar to how federation's compose copies join__ directives to fields based on the
// subgraph that the field is a part of.
const metaDirectiveName = resolveImportName(
'https://specs.graphql-hive.com/hive',
'@meta',
);
const applyMetaToField = (
fieldNode: FieldDefinitionNode,
metaDirectives: ConstDirectiveNode[],
) => {
return {
...fieldNode,
directives: [
...(fieldNode.directives ?? []),
...metaDirectives.map(d => ({ ...d, loc: undefined })),
],
};
};
const schemaNodes = subgraph.typeDefs.definitions.filter(
d => d.kind === Kind.SCHEMA_DEFINITION || d.kind === Kind.SCHEMA_EXTENSION,
);
const schemaMetaDirectives = schemaNodes
.flatMap(node => node.directives?.filter(d => d.name.value === metaDirectiveName))
.filter(d => d !== undefined);
const interfaceAndObjectHandler = (node: {
readonly fields?: ReadonlyArray<FieldDefinitionNode> | undefined;
readonly directives?: ReadonlyArray<ConstDirectiveNode> | undefined;
readonly name: NameNode;
}) => {
// apply type/interface metadata to fields
const objectMetaDirectives = node.directives
?.filter(d => d.name.value === metaDirectiveName)
.filter(d => d !== undefined);
if (objectMetaDirectives?.length) {
return {
...node,
fields: node.fields?.map(f => applyMetaToField(f, objectMetaDirectives)),
};
}
return node;
};
subgraph.typeDefs = visit(subgraph.typeDefs, {
FieldDefinition: field => {
return applyMetaToField(field, schemaMetaDirectives);
},
ObjectTypeDefinition: interfaceAndObjectHandler,
InterfaceTypeDefinition: interfaceAndObjectHandler,
});
}
return subgraph;
});
/** Determine the correct compose method... */
let compose: (subgraphs: Array<SubgraphInput>) => Promise<ComposerMethodResult>;
// Federation v2
if (native) {
logger.debug(
'Using built-in Federation v2 composition service (schemas=%s)',
schemas.length,
);
compose = subgraphs => Promise.resolve(composeFederationV2(subgraphs, logger));
} else if (external) {
compose = subgraphs =>
composeExternalFederation({
cache,
decrypt,
external,
logger,
requestId,
subgraphs,
});
} else {
logger.debug(
'Using built-in Federation v1 composition service (schemas=%s)',
schemas.length,
);
compose = subgraphs => Promise.resolve(composeFederationV1(subgraphs));
}
let result: CompositionResult & {
includesNetworkError: boolean;
tags: Array<string> | null;
/** Metadata stored by coordinate and then by subgraph */
schemaMetadata: Record<string, Metadata[]> | null;
metadataAttributes: Record<string, string[]> | null;
};
{
const composed: CompositionResult & {
includesNetworkError: boolean;
includesException?: boolean;
} = await compose(subgraphs);
if (composed.type === 'success') {
// merge all metadata from every subgraph by coordinate
const subgraphsMetadata = subgraphs.map(({ name, typeDefs }) =>
extractMetadata(typeDefs, name),
);
const supergraphSDL = parse(composed.result.supergraph);
const { resolveImportName } = extractLinkImplementations(supergraphSDL);
const tagDirectiveName = resolveImportName('https://specs.apollo.dev/tag', '@tag');
const tagStrategy = createTagDirectiveNameExtractionStrategy(tagDirectiveName);
const tags = extractTagsFromDocument(supergraphSDL, tagStrategy);
const schemaMetadata = mergeMetadata(...subgraphsMetadata);
const metadataAttributes = new SetMap<string, string>();
for (const [_coord, attrs] of schemaMetadata) {
for (const attr of attrs) {
metadataAttributes.add(attr.name, attr.content);
}
}
result = {
...composed,
tags,
schemaMetadata: Object.fromEntries(schemaMetadata),
metadataAttributes: metadataAttributes.toObject(),
};
} else {
result = {
...composed,
tags: null,
schemaMetadata: null,
metadataAttributes: null,
};
}
}
if (!contracts?.length) {
// if no contracts, then compose and return the result
return result;
}
if (result.type == 'failure') {
return {
...result,
result: {
...result.result,
contracts: contracts.map(contract => ({
id: contract.id,
result: {
type: 'failure',
result: {
errors: [
{
message: 'Skipped contract composition, as default graph composition failed.',
source: 'composition',
},
],
},
},
})),
},
};
}
// if there are contracts, then create
const contractResults = await Promise.all(
contracts.map(async contract => {
// apply contracts to replace tags with inaccessible directives
const filteredSubgraphs = applyTagFilterOnSubgraphs(subgraphs, {
include: new Set(contract.filter.include),
exclude: new Set(contract.filter.exclude),
});
// attempt to compose the contract filtered subgraph
const compositionResult = await compose(filteredSubgraphs);
// Remove unreachable types from public API schema
if (
contract.filter.removeUnreachableTypesFromPublicApiSchema === true &&
compositionResult.type === 'success'
) {
let supergraphSDL = parse(compositionResult.result.supergraph);
const { resolveImportName } = extractLinkImplementations(supergraphSDL);
return {
id: contract.id,
result: addInaccessibleToUnreachableTypes(
resolveImportName,
compositionResult,
supergraphSDL,
),
};
}
return {
id: contract.id,
result: compositionResult,
};
}),
);
const networkErrorContract = contractResults.find(
contract => contract.result.includesNetworkError === true,
);
// In case any of the contract composition fails, we will fail the whole composition.
if (networkErrorContract) {
return {
...networkErrorContract.result,
schemaMetadata: null,
tags: null,
metadataAttributes: null,
};
}
return {
...result,
result: {
supergraph: result.result.supergraph,
sdl: result.result.sdl,
contracts: contractResults,
},
};
},
function pickCacheType(result) {
return ('includesNetworkError' in result && result.includesNetworkError === true) ||
('includesException' in result && result.includesException === true)
? 'short'
: 'long';
},
);
return {
async composeAndValidate(schemas, external, native, contracts) {
try {
const composed = await compose({ schemas, external, native, contracts });
return {
errors: composed.type === 'failure' ? composed.result.errors : [],
sdl: composed.result.sdl ?? null,
supergraph: composed.result.supergraph ?? null,
includesNetworkError:
composed.type === 'failure' && composed.includesNetworkError === true,
contracts:
composed.result.contracts?.map(contract => ({
id: contract.id,
errors: 'errors' in contract.result.result ? contract.result.result.errors : [],
sdl: contract.result.result.sdl ?? null,
supergraph: contract.result.result.supergraph ?? null,
})) ?? null,
tags: composed.tags ?? null,
schemaMetadata: composed.schemaMetadata ?? null,
metadataAttributes: composed.metadataAttributes ?? null,
};
} catch (error) {
if (cache.isTimeoutError(error)) {
return {
errors: [
{
message: error.message,
source: 'graphql',
},
],
sdl: null,
supergraph: null,
includesNetworkError: true,
contracts: null,
tags: null,
schemaMetadata: null,
metadataAttributes: null,
};
}
throw error;
}
},
};
};
function validateSingleSDL(document: DocumentNode): Array<{
message: string;
source: CompositionErrorSource;
}> {
const errors = validateSDL(document);
if (errors.length) {
return errors.map(errorWithSource('graphql'));
}
try {
const schema = buildASTSchema(document);
const errors = validateSchema(schema);
if (errors.length) {
return errors.map(errorWithSource('graphql'));
}
} catch (err: unknown) {
if (err instanceof GraphQLError) {
return [errorWithSource('graphql')(err)];
}
throw err;
}
return [];
}
function createSingle(): Orchestrator {
return {
async composeAndValidate(schemas) {
const schema = schemas[0];
let schemaAst = parse(schema.raw);
// If the schema contains type system extension nodes, merge them into the schema.
// We don't want to show many type extension of User, we want to show single User type.
if (schemaAst.definitions.some(isTypeSystemExtensionNode)) {
schemaAst = mergeTypeDefs(schemaAst);
}
const errors = validateSingleSDL(schemaAst);
return {
errors,
sdl: print(trimDescriptions(schemaAst)),
supergraph: null,
contracts: null,
tags: null,
schemaMetadata: null,
metadataAttributes: null,
};
},
};
}
const createStitching: (cache: Cache) => Orchestrator = cache => {
const stitchAndPrint = cache.reuse('stitching', async (schemas: string[]) => {
return printSchema(
stitchSchemas({
subschemas: schemas.map(schema =>
buildASTSchema(trimDescriptions(parse(schema)), {
assumeValid: true,
assumeValidSDL: true,
}),
),
}),
);
});
return {
async composeAndValidate(schemas) {
const parsed = schemas.map(s => parse(s.raw));
const errors = parsed.map(schema => validateStitchedSchema(schema)).flat();
let sdl: string | null = null;
try {
sdl = await stitchAndPrint(schemas.map(s => s.raw));
} catch (error) {
errors.push(toValidationError(error, 'composition'));
}
return {
errors,
sdl,
supergraph: null,
contracts: null,
tags: null,
schemaMetadata: null,
metadataAttributes: null,
};
},
};
};
function validateStitchedSchema(doc: DocumentNode) {
const definesItsOwnStitchingDirectives = definesStitchingDirective(doc);
const fullDoc = definesItsOwnStitchingDirectives
? doc
: concatAST([parsedStitchingDirectives, doc]);
const errors = validateSDL(fullDoc).map(errorWithSource('graphql'));
// If the schema defines its own stitching directives,
// it means we can't be sure that it follows the official spec.
if (definesItsOwnStitchingDirectives) {
return errors;
}
try {
stitchingDirectivesValidator(
buildASTSchema(fullDoc, {
assumeValid: true,
assumeValidSDL: true,
}),
);
} catch (error) {
errors.push(toValidationError(error, 'composition'));
}
return errors;
}
export function pickOrchestrator(
type: SchemaType,
cache: Cache,
req: FastifyRequest,
decrypt: (value: string) => string,
) {
switch (type) {
case 'federation':
return createFederation(
cache,
req.log,
req.id ?? Math.random().toString(16).substring(2),
decrypt,
);
case 'single':
return createSingle();
case 'stitching':
return createStitching(cache);
default:
throw new Error(`Unknown schema type: ${type}`);
}
}

View file

@ -1,41 +1,14 @@
import 'reflect-metadata';
import { parentPort } from 'node:worker_threads';
import { Logger } from '@hive/api';
import { createWorker } from '../../api/src/modules/app-deployments/worker/persisted-documents-worker';
import { createMessagePortLogger } from '../../api/src/modules/shared/providers/logger';
import { env } from './environment';
if (!parentPort) {
throw new Error('This script must be run as a worker.');
}
const pp = parentPort;
function createLogger(bindings: Record<string, unknown> = {}): Logger {
return {
child(newBindings) {
return createLogger({ ...bindings, ...newBindings });
},
debug(...args) {
pp.postMessage({ event: 'log', level: 'debug', args, bindings });
},
error(...args) {
pp.postMessage({ event: 'log', level: 'error', args, bindings });
},
fatal(...args) {
pp.postMessage({ event: 'log', level: 'fatal', args, bindings });
},
info(...args) {
pp.postMessage({ event: 'log', level: 'info', args, bindings });
},
trace(...args) {
pp.postMessage({ event: 'log', level: 'trace', args, bindings });
},
warn(...args) {
pp.postMessage({ event: 'log', level: 'warn', args, bindings });
},
};
}
createWorker(parentPort, createLogger(), {
createWorker(parentPort, createMessagePortLogger(parentPort), {
clickhouse: env.clickhouse,
s3: env.s3,
s3Mirror: env.s3Mirror,

View file

@ -1171,6 +1171,9 @@ importers:
fastify:
specifier: 4.29.0
version: 4.29.0
fastq:
specifier: 1.19.1
version: 1.19.1
got:
specifier: 14.4.7
version: 14.4.7(patch_hash=f7660444905ddadee251ff98241119fb54f5fec1e673a428192da361d5636299)
@ -1189,6 +1192,9 @@ importers:
pino-pretty:
specifier: 11.3.0
version: 11.3.0
reflect-metadata:
specifier: 0.2.2
version: 0.2.2
zod:
specifier: 3.24.1
version: 3.24.1
@ -10235,8 +10241,8 @@ packages:
fastify@4.29.0:
resolution: {integrity: sha512-MaaUHUGcCgC8fXQDsDtioaCcag1fmPJ9j64vAKunqZF4aSub040ZGi/ag8NGE2714yREPOKZuHCfpPzuUD3UQQ==}
fastq@1.17.1:
resolution: {integrity: sha512-sRVD3lWVIXWg6By68ZN7vho9a1pQcN/WBFaAAsDDFzlJjvoGx0P8z7V1t72grFJfJhu3YPZBuu25f7Kaw2jN1w==}
fastq@1.19.1:
resolution: {integrity: sha512-GwLTyxkCXjXbxqIhTsMI2Nui8huMPtnxg7krajPJAjnEG/iiOS7i+zCtWGZR9G0NBKbXKh6X9m9UIsYX/N6vvQ==}
fault@2.0.1:
resolution: {integrity: sha512-WtySTkS4OKev5JtpHXnib4Gxiurzh5NCGvWrFaZ34m6JehfTUhKZvn9njTfw48t6JumVQOmrKqpmGcdwxnhqBQ==}
@ -18389,7 +18395,7 @@ snapshots:
'@fastify/send': 2.1.0
content-disposition: 0.5.4
fastify-plugin: 4.5.1
fastq: 1.17.1
fastq: 1.19.1
glob: 10.3.12
'@fastify/vite@6.0.7(patch_hash=f5ce073a4db250ed3db1d9c19e2a253418454ee379530bdee25869570d7b500b)(@types/node@22.10.5)(less@4.2.0)(lightningcss@1.28.2)(terser@5.37.0)':
@ -20259,7 +20265,7 @@ snapshots:
'@nodelib/fs.walk@1.2.8':
dependencies:
'@nodelib/fs.scandir': 2.1.5
fastq: 1.17.1
fastq: 1.19.1
'@nodesecure/estree-ast-utils@1.5.0':
dependencies:
@ -25181,7 +25187,7 @@ snapshots:
avvio@8.4.0:
dependencies:
'@fastify/error': 3.4.1
fastq: 1.17.1
fastq: 1.19.1
aws-sign2@0.7.0: {}
@ -27365,7 +27371,7 @@ snapshots:
semver: 7.6.3
toad-cache: 3.7.0
fastq@1.17.1:
fastq@1.19.1:
dependencies:
reusify: 1.0.4