From 23d09836dfaca03da14eadb60b58d595f334fa2f Mon Sep 17 00:00:00 2001 From: Laurin Quast Date: Wed, 28 Aug 2024 21:48:57 +0200 Subject: [PATCH] feat: CDN request retry race from multiple S3 buckets (#5543) Co-authored-by: Kamil Kisiela --- .eslintrc.cjs | 1 - deployment/index.ts | 1 + deployment/services/cf-cdn.ts | 3 + deployment/utils/cloudflare.ts | 21 + packages/services/cdn-worker/package.json | 1 + packages/services/cdn-worker/src/analytics.ts | 6 +- .../cdn-worker/src/artifact-storage-reader.ts | 338 +++++++----- packages/services/cdn-worker/src/aws.ts | 65 ++- packages/services/cdn-worker/src/dev.ts | 2 +- packages/services/cdn-worker/src/index.ts | 22 +- .../services/cdn-worker/tests/cdn.spec.ts | 484 +++++++++++++++++- packages/services/server/src/index.ts | 14 +- pnpm-lock.yaml | 4 + 13 files changed, 825 insertions(+), 137 deletions(-) diff --git a/.eslintrc.cjs b/.eslintrc.cjs index b4ec0cff5..567256893 100644 --- a/.eslintrc.cjs +++ b/.eslintrc.cjs @@ -17,7 +17,6 @@ const rulesToExtends = Object.fromEntries( 'import/first', 'no-restricted-globals', '@typescript-eslint/no-unused-vars', - 'unicorn/no-useless-fallback-in-spread', 'unicorn/no-array-push-push', 'no-else-return', 'no-lonely-if', diff --git a/deployment/index.ts b/deployment/index.ts index 1d864397c..07041491e 100644 --- a/deployment/index.ts +++ b/deployment/index.ts @@ -84,6 +84,7 @@ const s3Mirror = deployS3Mirror(); const cdn = deployCFCDN({ s3, + s3Mirror, sentry, environment, }); diff --git a/deployment/services/cf-cdn.ts b/deployment/services/cf-cdn.ts index b8ea51dfd..d556d85dc 100644 --- a/deployment/services/cf-cdn.ts +++ b/deployment/services/cf-cdn.ts @@ -15,10 +15,12 @@ export class CDNSecret extends ServiceSecret<{ export function deployCFCDN({ environment, s3, + s3Mirror, sentry, }: { environment: Environment; s3: S3; + s3Mirror: S3; sentry: Sentry; }) { const cfConfig = new pulumi.Config('cloudflareCustom'); @@ -35,6 +37,7 @@ export function deployCFCDN({ sentryDsn: sentry.enabled && sentry.secret ? sentry.secret?.raw.dsn : '', release: environment.release, s3, + s3Mirror, }); const deployedCdn = cdn.deploy(); diff --git a/deployment/utils/cloudflare.ts b/deployment/utils/cloudflare.ts index 7b5d79853..8ae1d079a 100644 --- a/deployment/utils/cloudflare.ts +++ b/deployment/utils/cloudflare.ts @@ -13,6 +13,7 @@ export class CloudflareCDN { sentryDsn: string | pulumi.Output; release: string; s3: S3; + s3Mirror: S3; }, ) {} @@ -54,6 +55,10 @@ export class CloudflareCDN { name: 'R2_ANALYTICS', dataset: `hive_ha_cdn_r2_${this.config.envName}`, }, + { + name: 'S3_ANALYTICS', + dataset: `hive_ha_cdn_s3_${this.config.envName}`, + }, { name: 'RESPONSE_ANALYTICS', dataset: `hive_ha_cdn_response_${this.config.envName}`, @@ -88,6 +93,22 @@ export class CloudflareCDN { name: 'S3_BUCKET_NAME', text: this.config.s3.secret.raw.bucket, }, + { + name: 'S3_MIRROR_ENDPOINT', + text: this.config.s3Mirror.secret.raw.endpoint, + }, + { + name: 'S3_MIRROR_ACCESS_KEY_ID', + text: this.config.s3Mirror.secret.raw.accessKeyId, + }, + { + name: 'S3_MIRROR_SECRET_ACCESS_KEY', + text: this.config.s3Mirror.secret.raw.secretAccessKey, + }, + { + name: 'S3_MIRROR_BUCKET_NAME', + text: this.config.s3Mirror.secret.raw.bucket, + }, ], }); diff --git a/packages/services/cdn-worker/package.json b/packages/services/cdn-worker/package.json index fb813e14e..f8f4ffd67 100644 --- a/packages/services/cdn-worker/package.json +++ b/packages/services/cdn-worker/package.json @@ -29,6 +29,7 @@ "itty-router": "4.2.2", "itty-router-extras": "0.4.6", "toucan-js": "3.4.0", + "undici": "6.19.8", "zod": "3.23.8" } } diff --git a/packages/services/cdn-worker/src/analytics.ts b/packages/services/cdn-worker/src/analytics.ts index dd23d2307..4e918e075 100644 --- a/packages/services/cdn-worker/src/analytics.ts +++ b/packages/services/cdn-worker/src/analytics.ts @@ -55,7 +55,7 @@ type Event = value: [string, string] | [string]; } | { - type: 'r2'; + type: 'r2' | 's3'; action: | 'GET artifact' | 'GET cdn-legacy-keys' @@ -79,6 +79,7 @@ export function createAnalytics( error: AnalyticsEngine; keyValidation: AnalyticsEngine; r2: AnalyticsEngine; + s3: AnalyticsEngine; response: AnalyticsEngine; } | null = null, ) { @@ -99,7 +100,8 @@ export function createAnalytics( blobs: event.value, }); case 'r2': - return engines.r2.writeDataPoint({ + case 's3': + return engines[event.type].writeDataPoint({ blobs: [event.action, event.statusCodeOrErrCode.toString(), targetId], doubles: [event.duration], indexes: [targetId.substring(0, 32)], diff --git a/packages/services/cdn-worker/src/artifact-storage-reader.ts b/packages/services/cdn-worker/src/artifact-storage-reader.ts index f0005e340..8e7abdeb7 100644 --- a/packages/services/cdn-worker/src/artifact-storage-reader.ts +++ b/packages/services/cdn-worker/src/artifact-storage-reader.ts @@ -19,9 +19,6 @@ type SDLArtifactTypes = `sdl${'.graphql' | '.graphqls' | ''}`; export type ArtifactsType = SDLArtifactTypes | 'metadata' | 'services' | 'supergraph'; -/** Timeout in milliseconds for S3 read calls. */ -const READ_TIMEOUT_MS = 5_000; - const OperationS3BucketKeyModel = zod.tuple([ zod.string().uuid(), zod.string().min(1), @@ -65,14 +62,137 @@ export class ArtifactStorageReader { endpoint: string; bucketName: string; }, - // private s3Mirror: { - // client: AwsClient; - // endpoint: string; - // bucketName: string; - // }, + private s3Mirror: { + client: AwsClient; + endpoint: string; + bucketName: string; + } | null, private analytics: Analytics | null, + /** Timeout in milliseconds for S3 read calls. */ + private timeout: number = 5_000, ) {} + /** + * Perform a request to S3, with retries and optional mirror. + * If the initial request to primary fails, a race between mirror and primary is performed. + * The first successful response is returned. + */ + private request(args: { + /** S3 key in bucket */ + key: string; + method: 'GET' | 'HEAD' | 'POST'; + headers?: HeadersInit; + onAttempt: (args: { + /** whether the attempt is for the mirror */ + isMirror: boolean; + /** attempt number */ + attempt: number; + /** attempt duration in ms */ + duration: number; + /** result */ + result: + | { + // HTTP or other unexpected error + type: 'error'; + error: Error; + } + | { + // HTTP response sent by upstream server + type: 'success'; + response: Response; + }; + }) => void; + }) { + return this.s3.client + .fetch([this.s3.endpoint, this.s3.bucketName, args.key].join('/'), { + method: args.method, + headers: args.headers, + aws: { + signQuery: true, + }, + timeout: this.timeout, + retries: this.s3Mirror ? 1 : undefined, + isResponseOk: response => + response.status === 200 || response.status === 304 || response.status === 404, + onAttempt: args1 => { + args.onAttempt({ + ...args1, + isMirror: false, + }); + }, + }) + .catch(err => { + if (this.s3Mirror) { + // Use two AbortSignals to avoid a situation + // where Response.body is consumed, + // but the request was aborted after being resolved. + // When a fetch call is resolved successfully, + // but a shared AbortSignal.cancel() is called for two fetches, + // it causes an exception (can't read a response from an aborted requests) + // when Response.body is consumed. + const primaryController = new AbortController(); + const mirrorController = new AbortController(); + + function abortOtherRequest(ctrl: AbortController) { + return (res: Response) => { + // abort other pending request + const error = new Error('Another request won the race.'); + // change the name so we have some metrics for this on our analytics dashboard + error.name = 'AbortError'; + ctrl.abort(error); + + return res; + }; + } + + // Wait for the first successful response + // or reject if both requests fail + return Promise.any([ + this.s3.client + .fetch([this.s3.endpoint, this.s3.bucketName, args.key].join('/'), { + method: args.method, + headers: args.headers, + aws: { + signQuery: true, + }, + timeout: this.timeout, + signal: primaryController.signal, + isResponseOk: response => + response.status === 200 || response.status === 304 || response.status === 404, + onAttempt: args1 => { + args.onAttempt({ + ...args1, + isMirror: false, + }); + }, + }) + .then(abortOtherRequest(mirrorController)), + this.s3Mirror.client + .fetch([this.s3Mirror.endpoint, this.s3Mirror.bucketName, args.key].join('/'), { + method: args.method, + headers: args.headers, + aws: { + signQuery: true, + }, + timeout: this.timeout, + signal: mirrorController.signal, + isResponseOk: response => + response.status === 200 || response.status === 304 || response.status === 404, + onAttempt: args1 => { + args.onAttempt({ + ...args1, + isMirror: true, + }); + }, + }) + .then(abortOtherRequest(primaryController)), + ]); + } + + return Promise.reject(err); + }); + } + /** Read an artifact from S3 */ async readArtifact( targetId: string, @@ -92,31 +212,25 @@ export class ArtifactStorageReader { headers['if-none-match'] = etagValue; } - const response = await this.s3.client.fetch( - [this.s3.endpoint, this.s3.bucketName, key].join('/'), - { - method: 'GET', - headers, - aws: { - signQuery: true, - }, - timeout: READ_TIMEOUT_MS, - onAttempt: args => { - this.analytics?.track( - { - type: 'r2', - statusCodeOrErrCode: - args.result.type === 'error' - ? String(args.result.error.name ?? 'unknown') - : args.result.response.status, - action: 'GET artifact', - duration: args.duration, - }, - targetId, - ); - }, + const response = await this.request({ + key, + method: 'GET', + headers, + onAttempt: args => { + this.analytics?.track( + { + type: args.isMirror ? 's3' : 'r2', + statusCodeOrErrCode: + args.result.type === 'error' + ? String(args.result.error.name ?? 'unknown') + : args.result.response.status, + action: 'GET artifact', + duration: args.duration, + }, + targetId, + ); }, - ); + }); if (response.status === 404) { return { type: 'notFound' } as const; @@ -142,30 +256,24 @@ export class ArtifactStorageReader { async isAppDeploymentEnabled(targetId: string, appName: string, appVersion: string) { const key = buildAppDeploymentIsEnabledKey(targetId, appName, appVersion); - const response = await this.s3.client.fetch( - [this.s3.endpoint, this.s3.bucketName, key].join('/'), - { - method: 'HEAD', - aws: { - signQuery: true, - }, - timeout: READ_TIMEOUT_MS, - onAttempt: args => { - this.analytics?.track( - { - type: 'r2', - statusCodeOrErrCode: - args.result.type === 'error' - ? String(args.result.error.name ?? 'unknown') - : args.result.response.status, - action: 'HEAD appDeploymentIsEnabled', - duration: args.duration, - }, - targetId, - ); - }, + const response = await this.request({ + key, + method: 'HEAD', + onAttempt: args => { + this.analytics?.track( + { + type: args.isMirror ? 's3' : 'r2', + statusCodeOrErrCode: + args.result.type === 'error' + ? String(args.result.error.name ?? 'unknown') + : args.result.response.status, + action: 'HEAD appDeploymentIsEnabled', + duration: args.duration, + }, + targetId, + ); }, - ); + }); return response.status === 200; } @@ -184,31 +292,25 @@ export class ArtifactStorageReader { headers['if-none-match'] = etagValue; } - const response = await this.s3.client.fetch( - [this.s3.endpoint, this.s3.bucketName, key].join('/'), - { - method: 'GET', - aws: { - signQuery: true, - }, - headers, - timeout: READ_TIMEOUT_MS, - onAttempt: args => { - this.analytics?.track( - { - type: 'r2', - statusCodeOrErrCode: - args.result.type === 'error' - ? String(args.result.error.name ?? 'unknown') - : args.result.response.status, - action: 'GET persistedOperation', - duration: args.duration, - }, - targetId, - ); - }, + const response = await this.request({ + key, + method: 'GET', + headers, + onAttempt: args => { + this.analytics?.track( + { + type: args.isMirror ? 's3' : 'r2', + statusCodeOrErrCode: + args.result.type === 'error' + ? String(args.result.error.name ?? 'unknown') + : args.result.response.status, + action: 'GET persistedOperation', + duration: args.duration, + }, + targetId, + ); }, - ); + }); if (etagValue && response.status === 304) { return { type: 'notModified' } as const; @@ -231,27 +333,24 @@ export class ArtifactStorageReader { } async readLegacyAccessKey(targetId: string) { - const response = await this.s3.client.fetch( - [this.s3.endpoint, this.s3.bucketName, 'cdn-legacy-keys', targetId].join('/'), - { - method: 'GET', - timeout: READ_TIMEOUT_MS, - onAttempt: args => { - this.analytics?.track( - { - type: 'r2', - statusCodeOrErrCode: - args.result.type === 'error' - ? String(args.result.error.name ?? 'unknown') - : args.result.response.status, - action: 'GET cdn-legacy-keys', - duration: args.duration, - }, - targetId, - ); - }, + const response = await this.request({ + key: ['cdn-legacy-keys', targetId].join('/'), + method: 'GET', + onAttempt: args => { + this.analytics?.track( + { + type: args.isMirror ? 's3' : 'r2', + statusCodeOrErrCode: + args.result.type === 'error' + ? String(args.result.error.name ?? 'unknown') + : args.result.response.status, + action: 'GET cdn-legacy-keys', + duration: args.duration, + }, + targetId, + ); }, - ); + }); return response; } @@ -259,31 +358,24 @@ export class ArtifactStorageReader { async readAccessKey(targetId: string, keyId: string) { const s3KeyParts = ['cdn-keys', targetId, keyId]; - const response = await this.s3.client.fetch( - [this.s3.endpoint, this.s3.bucketName, ...s3KeyParts].join('/'), - { - method: 'GET', - aws: { - // This boolean makes Google Cloud Storage & AWS happy. - signQuery: true, - }, - timeout: READ_TIMEOUT_MS, - onAttempt: args => { - this.analytics?.track( - { - type: 'r2', - statusCodeOrErrCode: - args.result.type === 'error' - ? String(args.result.error.name ?? 'unknown') - : args.result.response.status, - action: 'GET cdn-access-token', - duration: args.duration, - }, - targetId, - ); - }, + const response = await this.request({ + key: s3KeyParts.join('/'), + method: 'GET', + onAttempt: args => { + this.analytics?.track( + { + type: args.isMirror ? 'r2' : 's3', + statusCodeOrErrCode: + args.result.type === 'error' + ? String(args.result.error.name ?? 'unknown') + : args.result.response.status, + action: 'GET cdn-access-token', + duration: args.duration, + }, + targetId, + ); }, - ); + }); return response; } diff --git a/packages/services/cdn-worker/src/aws.ts b/packages/services/cdn-worker/src/aws.ts index 9c835dfeb..2f7925a2e 100644 --- a/packages/services/cdn-worker/src/aws.ts +++ b/packages/services/cdn-worker/src/aws.ts @@ -49,6 +49,15 @@ type AwsRequestInit = RequestInit & { * Timeout in milliseconds for each fetch call. */ timeout?: number; + /** + * Abort signal for the fetch call and potential retries. + * Retries will not be attempted if the signal is already aborted. + */ + signal?: AbortSignal; + /** + * Overwrite the amount of retries + */ + retries?: number; /** Hook being invoked for each attempt for gathering analytics or similar. */ onAttempt?: (args: { /** attempt number */ @@ -68,6 +77,8 @@ type AwsRequestInit = RequestInit & { response: Response; }; }) => void; + /** Custom verifying function on whether the response is okay. */ + isResponseOk?: (response: Response) => boolean; }; export type AWSClientConfig = { @@ -126,9 +137,13 @@ export class AwsClient { input = url; } const signer = new AwsV4Signer(Object.assign({ url: input }, init, this, init && init.aws)); + const signed = Object.assign( { - signal: init?.timeout ? AbortSignal.timeout(init.timeout) : undefined, + signal: anySignal([ + init?.timeout ? AbortSignal.timeout(init.timeout) : undefined, + init?.signal, + ]), }, init, await signer.sign(), @@ -147,17 +162,27 @@ export class AwsClient { } async fetch(input: RequestInfo, init: AwsRequestInit): Promise { - for (let i = 0; i <= this.retries; i++) { + const maximumRetryCount = init.retries ?? this.retries; + + for (let retryCounter = 0; retryCounter <= maximumRetryCount; retryCounter++) { const attemptStart = performance.now(); try { const response = await this._fetch(...(await this.sign(input, init))); const duration = performance.now() - attemptStart; - init.onAttempt?.({ attempt: i, duration, result: { type: 'success', response } }); + init.onAttempt?.({ + attempt: retryCounter, + duration, + result: { type: 'success', response }, + }); if ( (response.status < 500 && response.status !== 429 && response.status !== 499) || - i === this.retries + retryCounter === maximumRetryCount ) { + if (init.isResponseOk && !init.isResponseOk(response)) { + throw new Error(`Response not okay, status: ${response.status}`); + } + return response; } } catch (error) { @@ -165,17 +190,21 @@ export class AwsClient { // Retry also when there's an exception console.error(error); init.onAttempt?.({ - attempt: i, + attempt: retryCounter, duration, result: { type: 'error', error: error as Error }, }); - if (i === this.retries) { + if ( + retryCounter === maximumRetryCount || + // If the signal was aborted, we don't want to retry + init.signal?.aborted === true + ) { throw error; } } await new Promise(resolve => - setTimeout(resolve, Math.random() * this.initRetryMs * Math.pow(2, i)), + setTimeout(resolve, Math.random() * this.initRetryMs * Math.pow(2, retryCounter)), ); } throw new Error('An unknown error occurred, ensure retries is not negative'); @@ -505,3 +534,25 @@ function guessServiceRegion(url: URL, headers: Headers) { return [HOST_SERVICES[service] || service, region]; } + +function anySignal(signals: Array) { + const controller = new AbortController(); + + function onAbort(reason: unknown) { + controller.abort(reason); + } + + for (const signal of signals) { + if (!signal) { + continue; + } + + if (signal.aborted) { + onAbort(signal.reason); + break; + } + signal.addEventListener('abort', onAbort); + } + + return controller.signal; +} diff --git a/packages/services/cdn-worker/src/dev.ts b/packages/services/cdn-worker/src/dev.ts index 3837ef2db..a969c6aa8 100644 --- a/packages/services/cdn-worker/src/dev.ts +++ b/packages/services/cdn-worker/src/dev.ts @@ -23,7 +23,7 @@ const s3 = { // eslint-disable-next-line no-process-env const PORT = process.env.PORT ? parseInt(process.env.PORT, 10) : 4010; -const artifactStorageReader = new ArtifactStorageReader(s3, null); +const artifactStorageReader = new ArtifactStorageReader(s3, null, null); const handleRequest = createRequestHandler({ isKeyValid: createIsKeyValid({ diff --git a/packages/services/cdn-worker/src/index.ts b/packages/services/cdn-worker/src/index.ts index a42e2ff5a..20ec4bd99 100644 --- a/packages/services/cdn-worker/src/index.ts +++ b/packages/services/cdn-worker/src/index.ts @@ -16,6 +16,13 @@ type Env = { S3_SECRET_ACCESS_KEY: string; S3_BUCKET_NAME: string; S3_SESSION_TOKEN?: string; + + S3_MIRROR_ENDPOINT: string; + S3_MIRROR_ACCESS_KEY_ID: string; + S3_MIRROR_SECRET_ACCESS_KEY: string; + S3_MIRROR_BUCKET_NAME: string; + S3_MIRROR_SESSION_TOKEN?: string; + SENTRY_DSN: string; /** * Name of the environment, e.g. staging, production @@ -32,6 +39,7 @@ type Env = { ERROR_ANALYTICS: AnalyticsEngine; RESPONSE_ANALYTICS: AnalyticsEngine; R2_ANALYTICS: AnalyticsEngine; + S3_ANALYTICS: AnalyticsEngine; KEY_VALIDATION_ANALYTICS: AnalyticsEngine; }; @@ -48,15 +56,27 @@ const handler: ExportedHandler = { endpoint: env.S3_ENDPOINT, }; + const s3Mirror = { + client: new AwsClient({ + accessKeyId: env.S3_MIRROR_ACCESS_KEY_ID, + secretAccessKey: env.S3_MIRROR_SECRET_ACCESS_KEY, + sessionToken: env.S3_MIRROR_SESSION_TOKEN, + service: 's3', + }), + bucketName: env.S3_MIRROR_BUCKET_NAME, + endpoint: env.S3_MIRROR_ENDPOINT, + }; + const analytics = createAnalytics({ usage: env.USAGE_ANALYTICS, error: env.ERROR_ANALYTICS, keyValidation: env.KEY_VALIDATION_ANALYTICS, response: env.RESPONSE_ANALYTICS, r2: env.R2_ANALYTICS, + s3: env.S3_ANALYTICS, }); - const artifactStorageReader = new ArtifactStorageReader(s3, analytics); + const artifactStorageReader = new ArtifactStorageReader(s3, s3Mirror, analytics); const isKeyValid = createIsKeyValid({ waitUntil: p => ctx.waitUntil(p), diff --git a/packages/services/cdn-worker/tests/cdn.spec.ts b/packages/services/cdn-worker/tests/cdn.spec.ts index 9789e822d..55cd3b4b4 100644 --- a/packages/services/cdn-worker/tests/cdn.spec.ts +++ b/packages/services/cdn-worker/tests/cdn.spec.ts @@ -1,9 +1,12 @@ import { createHmac } from 'node:crypto'; import * as bcrypt from 'bcryptjs'; import '../src/dev-polyfill'; +import { MockAgent, MockPool, fetch as undiciFetch } from 'undici'; // eslint-disable-next-line import/no-extraneous-dependencies -import { describe, expect, test } from 'vitest'; +import { afterAll, afterEach, beforeEach, describe, expect, test } from 'vitest'; import { ArtifactStorageReader } from '../src/artifact-storage-reader'; +import { AwsClient } from '../src/aws'; +import { encodeCdnToken } from '../src/cdn-token'; import { InvalidArtifactTypeResponse, InvalidAuthKeyResponse, @@ -26,6 +29,24 @@ describe('CDN Worker', () => { return createHmac('sha256', secretKeyData).update(encoder.encode(targetId)).digest('base64'); } + async function createV2Token(targetId: string) { + const SECRET = '123456'; + const tokenKeyId = 'secret-key'; + const secret = createToken(SECRET, targetId); + const key = encodeCdnToken({ + privateKey: secret, + keyId: tokenKeyId, + }); + + const hash = await bcrypt.hash(secret, await bcrypt.genSalt()); + + return { + key, + keyId: tokenKeyId, + hash, + }; + } + test('in /schema and /metadata the response should contain content-type: application/json header', async () => { const SECRET = '123456'; const targetId = 'fake-target-id'; @@ -62,6 +83,7 @@ describe('CDN Worker', () => { } as any, }, null, + null, ), }), async getArtifactAction(targetId, _, artifactType) { @@ -141,6 +163,7 @@ describe('CDN Worker', () => { } as any, }, null, + null, ), }), async getArtifactAction(targetId, _, artifactType) { @@ -236,6 +259,7 @@ describe('CDN Worker', () => { } as any, }, null, + null, ), }), async getArtifactAction(targetId, _, artifactType) { @@ -315,6 +339,7 @@ describe('CDN Worker', () => { } as any, }, null, + null, ), }), async getArtifactAction(targetId, _, artifactType) { @@ -400,6 +425,7 @@ describe('CDN Worker', () => { } as any, }, null, + null, ), }), async getArtifactAction(targetId, _, artifactType) { @@ -483,6 +509,7 @@ describe('CDN Worker', () => { } as any, }, null, + null, ), }), async getArtifactAction(targetId, _, artifactType) { @@ -651,6 +678,7 @@ describe('CDN Worker', () => { } as any, }, null, + null, ), }), async getArtifactAction(targetId, _, artifactType) { @@ -702,6 +730,7 @@ describe('CDN Worker', () => { } as any, }, null, + null, ), }), async getArtifactAction(targetId, _, artifactType) { @@ -751,6 +780,7 @@ describe('CDN Worker', () => { } as any, }, null, + null, ), }), async getArtifactAction() { @@ -774,4 +804,456 @@ describe('CDN Worker', () => { expect(response.status).toBe(403); }); }); + + describe('use S3', () => { + const mockAgent = new MockAgent(); + let r2MockPool: MockPool; + let s3MockPool: MockPool; + const r2Endpoint = 'http://localhost:3002'; + const s3Endpoint = 'http://localhost:3003'; + + const TIMEOUT = 200; + const DELAY = TIMEOUT + 100; + + const mockedFetch = (input: any, init?: any) => { + // Use undici's fetch with custom dispatcher to mock the network + return undiciFetch(input as any, { + ...(init ?? {}), + dispatcher: mockAgent, + }) as Promise; + }; + + beforeEach(() => { + r2MockPool = mockAgent.get(r2Endpoint); + s3MockPool = mockAgent.get(s3Endpoint); + }); + + afterEach(async () => { + await r2MockPool?.close(); + await s3MockPool?.close(); + mockAgent.assertNoPendingInterceptors(); + }); + afterAll(() => mockAgent.close()); + + test('when fetching access key from R2 takes longer than a timeout', async () => { + const targetId = 'fake-target-id'; + const services = [{ sdl: `type Query { dummy: String }` }]; + + const access = await createV2Token(targetId); + + // Fetching the key from R2 takes longer than the timeout + r2MockPool + .intercept({ + path(path) { + return path.startsWith(`/artifacts/cdn-keys/${targetId}/${access.keyId}`); + }, + }) + .reply(() => { + return { + statusCode: 200, + data: access.hash, + }; + }) + .delay(DELAY); + + s3MockPool + .intercept({ + path(path) { + return path.startsWith(`/artifacts/cdn-keys/${targetId}/${access.keyId}`); + }, + }) + .reply(() => { + return { + statusCode: 200, + data: access.hash, + }; + }); + + s3MockPool + .intercept({ + path(path) { + return path.startsWith(`/artifacts/artifact/${targetId}/services`); + }, + }) + .reply(() => { + return { + statusCode: 200, + data: JSON.stringify(services), + }; + }); + + const artifactStorageReader = new ArtifactStorageReader( + { + endpoint: r2Endpoint, + bucketName: 'artifacts', + client: new AwsClient({ + accessKeyId: 'r2-fake-access-key', + secretAccessKey: 'r2-fake-secret-key', + sessionToken: 'r2-fake-session-token', + service: 's3', + fetch: mockedFetch, + }), + }, + { + endpoint: s3Endpoint, + bucketName: 'artifacts', + client: new AwsClient({ + accessKeyId: 's3-fake-access-key', + secretAccessKey: 's3-fake-secret-key', + sessionToken: 's3-fake-session-token', + service: 's3', + fetch: mockedFetch, + }), + }, + null, + TIMEOUT, + ); + + const handleRequest = createRequestHandler({ + isKeyValid: createIsKeyValid({ + getCache: null, + waitUntil: null, + analytics: null, + artifactStorageReader, + }), + async getArtifactAction(targetId, contractName, artifactType, eTag) { + return artifactStorageReader.readArtifact(targetId, contractName, artifactType, eTag); + }, + async fetchText(url) { + return mockedFetch(url).then(r => { + if (r.ok) { + return r.text(); + } + + throw new Error(`Failed to fetch ${url}, status: ${r.status}`); + }); + }, + }); + + const firstRequest = new Request(`https://fake-worker.com/${targetId}/schema`, { + headers: { + 'x-hive-cdn-key': access.key, + }, + }); + + const response = await handleRequest(firstRequest); + expect(response.status).toBe(200); + await expect(response.json()).resolves.toEqual([{ sdl: `type Query { dummy: String }` }]); + }); + + test('when fetching artifact from R2 takes longer than a timeout', async () => { + const targetId = 'fake-target-id'; + const services = [{ sdl: `type Query { dummy: String }` }]; + + const access = await createV2Token(targetId); + + // Fetching the key is instant + r2MockPool + .intercept({ + path(path) { + return path.startsWith(`/artifacts/cdn-keys/${targetId}/${access.keyId}`); + }, + }) + .reply(() => { + return { + statusCode: 200, + data: access.hash, + }; + }); + + // Fetching the artifact from R2 takes longer than the timeout + r2MockPool + .intercept({ + path(path) { + return path.startsWith(`/artifacts/artifact/${targetId}/services`); + }, + }) + .reply(() => { + return { + statusCode: 200, + data: JSON.stringify(services), + }; + }) + .delay(DELAY); + + s3MockPool + .intercept({ + path(path) { + return path.startsWith(`/artifacts/artifact/${targetId}/services`); + }, + }) + .reply(() => { + return { + statusCode: 200, + data: JSON.stringify(services), + }; + }); + + const artifactStorageReader = new ArtifactStorageReader( + { + endpoint: r2Endpoint, + bucketName: 'artifacts', + client: new AwsClient({ + accessKeyId: 'r2-fake-access-key', + secretAccessKey: 'r2-fake-secret-key', + sessionToken: 'r2-fake-session-token', + service: 's3', + fetch: mockedFetch, + }), + }, + { + endpoint: s3Endpoint, + bucketName: 'artifacts', + client: new AwsClient({ + accessKeyId: 's3-fake-access-key', + secretAccessKey: 's3-fake-secret-key', + sessionToken: 's3-fake-session-token', + service: 's3', + fetch: mockedFetch, + }), + }, + null, + TIMEOUT, + ); + + const handleRequest = createRequestHandler({ + isKeyValid: createIsKeyValid({ + getCache: null, + waitUntil: null, + analytics: null, + artifactStorageReader, + }), + async getArtifactAction(targetId, contractName, artifactType, eTag) { + return artifactStorageReader.readArtifact(targetId, contractName, artifactType, eTag); + }, + async fetchText(url) { + return mockedFetch(url).then(r => { + if (r.ok) { + return r.text(); + } + + throw new Error(`Failed to fetch ${url}, status: ${r.status}`); + }); + }, + }); + + const firstRequest = new Request(`https://fake-worker.com/${targetId}/schema`, { + headers: { + 'x-hive-cdn-key': access.key, + }, + }); + + const response = await handleRequest(firstRequest); + expect(response.status).toBe(200); + await expect(response.json()).resolves.toEqual([{ sdl: `type Query { dummy: String }` }]); + }); + + test('when R2 is down and access key fails to be fetched', async () => { + const targetId = 'fake-target-id'; + const services = [{ sdl: `type Query { dummy: String }` }]; + + const access = await createV2Token(targetId); + + // R2 is down and we fail to fetch the key + r2MockPool + .intercept({ + path(path) { + return path.startsWith(`/artifacts/cdn-keys/${targetId}/${access.keyId}`); + }, + }) + .reply(() => { + return { + statusCode: 500, + data: 'Please try again later', + }; + }); + + s3MockPool + .intercept({ + path(path) { + return path.startsWith(`/artifacts/cdn-keys/${targetId}/${access.keyId}`); + }, + }) + .reply(() => { + return { + statusCode: 200, + data: access.hash, + }; + }); + + s3MockPool + .intercept({ + path(path) { + return path.startsWith(`/artifacts/artifact/${targetId}/services`); + }, + }) + .reply(() => { + return { + statusCode: 200, + data: JSON.stringify(services), + }; + }); + + const artifactStorageReader = new ArtifactStorageReader( + { + endpoint: r2Endpoint, + bucketName: 'artifacts', + client: new AwsClient({ + accessKeyId: 'r2-fake-access-key', + secretAccessKey: 'r2-fake-secret-key', + sessionToken: 'r2-fake-session-token', + service: 's3', + fetch: mockedFetch, + }), + }, + { + endpoint: s3Endpoint, + bucketName: 'artifacts', + client: new AwsClient({ + accessKeyId: 's3-fake-access-key', + secretAccessKey: 's3-fake-secret-key', + sessionToken: 's3-fake-session-token', + service: 's3', + fetch: mockedFetch, + }), + }, + null, + ); + + const handleRequest = createRequestHandler({ + isKeyValid: createIsKeyValid({ + getCache: null, + waitUntil: null, + analytics: null, + artifactStorageReader, + }), + async getArtifactAction(targetId, contractName, artifactType, eTag) { + return artifactStorageReader.readArtifact(targetId, contractName, artifactType, eTag); + }, + async fetchText(url) { + return mockedFetch(url).then(r => { + if (r.ok) { + return r.text(); + } + + throw new Error(`Failed to fetch ${url}, status: ${r.status}`); + }); + }, + }); + + const firstRequest = new Request(`https://fake-worker.com/${targetId}/schema`, { + headers: { + 'x-hive-cdn-key': access.key, + }, + }); + + const response = await handleRequest(firstRequest); + expect(response.status).toBe(200); + await expect(response.json()).resolves.toEqual([{ sdl: `type Query { dummy: String }` }]); + }); + + test('when R2 is down after we got the access key', async () => { + const targetId = 'fake-target-id'; + const services = [{ sdl: `type Query { dummy: String }` }]; + + const access = await createV2Token(targetId); + + // R2 is down and we fail to fetch the key + r2MockPool + .intercept({ + path(path) { + return path.startsWith(`/artifacts/cdn-keys/${targetId}/${access.keyId}`); + }, + }) + .reply(() => { + return { + statusCode: 200, + data: access.hash, + }; + }); + + r2MockPool + .intercept({ + path(path) { + return path.startsWith(`/artifacts/artifact/${targetId}/services`); + }, + }) + .reply(() => { + return { + statusCode: 500, + data: 'We are so down', + }; + }); + + s3MockPool + .intercept({ + path(path) { + return path.startsWith(`/artifacts/artifact/${targetId}/services`); + }, + }) + .reply(() => { + return { + statusCode: 200, + data: JSON.stringify(services), + }; + }); + + const artifactStorageReader = new ArtifactStorageReader( + { + endpoint: r2Endpoint, + bucketName: 'artifacts', + client: new AwsClient({ + accessKeyId: 'r2-fake-access-key', + secretAccessKey: 'r2-fake-secret-key', + sessionToken: 'r2-fake-session-token', + service: 's3', + fetch: mockedFetch, + }), + }, + { + endpoint: s3Endpoint, + bucketName: 'artifacts', + client: new AwsClient({ + accessKeyId: 's3-fake-access-key', + secretAccessKey: 's3-fake-secret-key', + sessionToken: 's3-fake-session-token', + service: 's3', + fetch: mockedFetch, + }), + }, + null, + ); + + const handleRequest = createRequestHandler({ + isKeyValid: createIsKeyValid({ + getCache: null, + waitUntil: null, + analytics: null, + artifactStorageReader, + }), + async getArtifactAction(targetId, contractName, artifactType, eTag) { + return artifactStorageReader.readArtifact(targetId, contractName, artifactType, eTag); + }, + async fetchText(url) { + return mockedFetch(url).then(r => { + if (r.ok) { + return r.text(); + } + + throw new Error(`Failed to fetch ${url}, status: ${r.status}`); + }); + }, + }); + + const firstRequest = new Request(`https://fake-worker.com/${targetId}/schema`, { + headers: { + 'x-hive-cdn-key': access.key, + }, + }); + + const response = await handleRequest(firstRequest); + expect(response.status).toBe(200); + await expect(response.json()).resolves.toEqual([{ sdl: `type Query { dummy: String }` }]); + }); + }); }); diff --git a/packages/services/server/src/index.ts b/packages/services/server/src/index.ts index d5ccfbd99..a9b0ed4d2 100644 --- a/packages/services/server/src/index.ts +++ b/packages/services/server/src/index.ts @@ -539,7 +539,19 @@ export async function main() { bucketName: env.s3.bucketName, }; - const artifactStorageReader = new ArtifactStorageReader(s3, null); + const s3Mirror = env.s3Mirror + ? { + client: new AwsClient({ + accessKeyId: env.s3Mirror.credentials.accessKeyId, + secretAccessKey: env.s3Mirror.credentials.secretAccessKey, + service: 's3', + }), + endpoint: env.s3Mirror.endpoint, + bucketName: env.s3Mirror.bucketName, + } + : null; + + const artifactStorageReader = new ArtifactStorageReader(s3, s3Mirror, null); const artifactHandler = createArtifactRequestHandler({ isKeyValid: createIsKeyValid({ diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f89cddc14..263a2b6bc 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -877,6 +877,9 @@ importers: toucan-js: specifier: 3.4.0 version: 3.4.0 + undici: + specifier: 6.19.8 + version: 6.19.8 zod: specifier: 3.23.8 version: 3.23.8 @@ -4161,6 +4164,7 @@ packages: '@fastify/vite@6.0.7': resolution: {integrity: sha512-+dRo9KUkvmbqdmBskG02SwigWl06Mwkw8SBDK1zTNH6vd4DyXbRvI7RmJEmBkLouSU81KTzy1+OzwHSffqSD6w==} + bundledDependencies: [] '@floating-ui/core@1.2.6': resolution: {integrity: sha512-EvYTiXet5XqweYGClEmpu3BoxmsQ4hkj3QaYA6qEnigCWffTP3vNRwBReTdrwDwo7OoJ3wM8Uoe9Uk4n+d4hfg==}