feat: CDN request retry race from multiple S3 buckets (#5543)

Co-authored-by: Kamil Kisiela <kamil.kisiela@gmail.com>
This commit is contained in:
Laurin Quast 2024-08-28 21:48:57 +02:00 committed by GitHub
parent 9eb5dcc405
commit 23d09836df
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 825 additions and 137 deletions

View file

@ -17,7 +17,6 @@ const rulesToExtends = Object.fromEntries(
'import/first',
'no-restricted-globals',
'@typescript-eslint/no-unused-vars',
'unicorn/no-useless-fallback-in-spread',
'unicorn/no-array-push-push',
'no-else-return',
'no-lonely-if',

View file

@ -84,6 +84,7 @@ const s3Mirror = deployS3Mirror();
const cdn = deployCFCDN({
s3,
s3Mirror,
sentry,
environment,
});

View file

@ -15,10 +15,12 @@ export class CDNSecret extends ServiceSecret<{
export function deployCFCDN({
environment,
s3,
s3Mirror,
sentry,
}: {
environment: Environment;
s3: S3;
s3Mirror: S3;
sentry: Sentry;
}) {
const cfConfig = new pulumi.Config('cloudflareCustom');
@ -35,6 +37,7 @@ export function deployCFCDN({
sentryDsn: sentry.enabled && sentry.secret ? sentry.secret?.raw.dsn : '',
release: environment.release,
s3,
s3Mirror,
});
const deployedCdn = cdn.deploy();

View file

@ -13,6 +13,7 @@ export class CloudflareCDN {
sentryDsn: string | pulumi.Output<string>;
release: string;
s3: S3;
s3Mirror: S3;
},
) {}
@ -54,6 +55,10 @@ export class CloudflareCDN {
name: 'R2_ANALYTICS',
dataset: `hive_ha_cdn_r2_${this.config.envName}`,
},
{
name: 'S3_ANALYTICS',
dataset: `hive_ha_cdn_s3_${this.config.envName}`,
},
{
name: 'RESPONSE_ANALYTICS',
dataset: `hive_ha_cdn_response_${this.config.envName}`,
@ -88,6 +93,22 @@ export class CloudflareCDN {
name: 'S3_BUCKET_NAME',
text: this.config.s3.secret.raw.bucket,
},
{
name: 'S3_MIRROR_ENDPOINT',
text: this.config.s3Mirror.secret.raw.endpoint,
},
{
name: 'S3_MIRROR_ACCESS_KEY_ID',
text: this.config.s3Mirror.secret.raw.accessKeyId,
},
{
name: 'S3_MIRROR_SECRET_ACCESS_KEY',
text: this.config.s3Mirror.secret.raw.secretAccessKey,
},
{
name: 'S3_MIRROR_BUCKET_NAME',
text: this.config.s3Mirror.secret.raw.bucket,
},
],
});

View file

@ -29,6 +29,7 @@
"itty-router": "4.2.2",
"itty-router-extras": "0.4.6",
"toucan-js": "3.4.0",
"undici": "6.19.8",
"zod": "3.23.8"
}
}

View file

@ -55,7 +55,7 @@ type Event =
value: [string, string] | [string];
}
| {
type: 'r2';
type: 'r2' | 's3';
action:
| 'GET artifact'
| 'GET cdn-legacy-keys'
@ -79,6 +79,7 @@ export function createAnalytics(
error: AnalyticsEngine;
keyValidation: AnalyticsEngine;
r2: AnalyticsEngine;
s3: AnalyticsEngine;
response: AnalyticsEngine;
} | null = null,
) {
@ -99,7 +100,8 @@ export function createAnalytics(
blobs: event.value,
});
case 'r2':
return engines.r2.writeDataPoint({
case 's3':
return engines[event.type].writeDataPoint({
blobs: [event.action, event.statusCodeOrErrCode.toString(), targetId],
doubles: [event.duration],
indexes: [targetId.substring(0, 32)],

View file

@ -19,9 +19,6 @@ type SDLArtifactTypes = `sdl${'.graphql' | '.graphqls' | ''}`;
export type ArtifactsType = SDLArtifactTypes | 'metadata' | 'services' | 'supergraph';
/** Timeout in milliseconds for S3 read calls. */
const READ_TIMEOUT_MS = 5_000;
const OperationS3BucketKeyModel = zod.tuple([
zod.string().uuid(),
zod.string().min(1),
@ -65,14 +62,137 @@ export class ArtifactStorageReader {
endpoint: string;
bucketName: string;
},
// private s3Mirror: {
// client: AwsClient;
// endpoint: string;
// bucketName: string;
// },
private s3Mirror: {
client: AwsClient;
endpoint: string;
bucketName: string;
} | null,
private analytics: Analytics | null,
/** Timeout in milliseconds for S3 read calls. */
private timeout: number = 5_000,
) {}
/**
* Perform a request to S3, with retries and optional mirror.
* If the initial request to primary fails, a race between mirror and primary is performed.
* The first successful response is returned.
*/
private request(args: {
/** S3 key in bucket */
key: string;
method: 'GET' | 'HEAD' | 'POST';
headers?: HeadersInit;
onAttempt: (args: {
/** whether the attempt is for the mirror */
isMirror: boolean;
/** attempt number */
attempt: number;
/** attempt duration in ms */
duration: number;
/** result */
result:
| {
// HTTP or other unexpected error
type: 'error';
error: Error;
}
| {
// HTTP response sent by upstream server
type: 'success';
response: Response;
};
}) => void;
}) {
return this.s3.client
.fetch([this.s3.endpoint, this.s3.bucketName, args.key].join('/'), {
method: args.method,
headers: args.headers,
aws: {
signQuery: true,
},
timeout: this.timeout,
retries: this.s3Mirror ? 1 : undefined,
isResponseOk: response =>
response.status === 200 || response.status === 304 || response.status === 404,
onAttempt: args1 => {
args.onAttempt({
...args1,
isMirror: false,
});
},
})
.catch(err => {
if (this.s3Mirror) {
// Use two AbortSignals to avoid a situation
// where Response.body is consumed,
// but the request was aborted after being resolved.
// When a fetch call is resolved successfully,
// but a shared AbortSignal.cancel() is called for two fetches,
// it causes an exception (can't read a response from an aborted requests)
// when Response.body is consumed.
const primaryController = new AbortController();
const mirrorController = new AbortController();
function abortOtherRequest(ctrl: AbortController) {
return (res: Response) => {
// abort other pending request
const error = new Error('Another request won the race.');
// change the name so we have some metrics for this on our analytics dashboard
error.name = 'AbortError';
ctrl.abort(error);
return res;
};
}
// Wait for the first successful response
// or reject if both requests fail
return Promise.any([
this.s3.client
.fetch([this.s3.endpoint, this.s3.bucketName, args.key].join('/'), {
method: args.method,
headers: args.headers,
aws: {
signQuery: true,
},
timeout: this.timeout,
signal: primaryController.signal,
isResponseOk: response =>
response.status === 200 || response.status === 304 || response.status === 404,
onAttempt: args1 => {
args.onAttempt({
...args1,
isMirror: false,
});
},
})
.then(abortOtherRequest(mirrorController)),
this.s3Mirror.client
.fetch([this.s3Mirror.endpoint, this.s3Mirror.bucketName, args.key].join('/'), {
method: args.method,
headers: args.headers,
aws: {
signQuery: true,
},
timeout: this.timeout,
signal: mirrorController.signal,
isResponseOk: response =>
response.status === 200 || response.status === 304 || response.status === 404,
onAttempt: args1 => {
args.onAttempt({
...args1,
isMirror: true,
});
},
})
.then(abortOtherRequest(primaryController)),
]);
}
return Promise.reject(err);
});
}
/** Read an artifact from S3 */
async readArtifact(
targetId: string,
@ -92,31 +212,25 @@ export class ArtifactStorageReader {
headers['if-none-match'] = etagValue;
}
const response = await this.s3.client.fetch(
[this.s3.endpoint, this.s3.bucketName, key].join('/'),
{
method: 'GET',
headers,
aws: {
signQuery: true,
},
timeout: READ_TIMEOUT_MS,
onAttempt: args => {
this.analytics?.track(
{
type: 'r2',
statusCodeOrErrCode:
args.result.type === 'error'
? String(args.result.error.name ?? 'unknown')
: args.result.response.status,
action: 'GET artifact',
duration: args.duration,
},
targetId,
);
},
const response = await this.request({
key,
method: 'GET',
headers,
onAttempt: args => {
this.analytics?.track(
{
type: args.isMirror ? 's3' : 'r2',
statusCodeOrErrCode:
args.result.type === 'error'
? String(args.result.error.name ?? 'unknown')
: args.result.response.status,
action: 'GET artifact',
duration: args.duration,
},
targetId,
);
},
);
});
if (response.status === 404) {
return { type: 'notFound' } as const;
@ -142,30 +256,24 @@ export class ArtifactStorageReader {
async isAppDeploymentEnabled(targetId: string, appName: string, appVersion: string) {
const key = buildAppDeploymentIsEnabledKey(targetId, appName, appVersion);
const response = await this.s3.client.fetch(
[this.s3.endpoint, this.s3.bucketName, key].join('/'),
{
method: 'HEAD',
aws: {
signQuery: true,
},
timeout: READ_TIMEOUT_MS,
onAttempt: args => {
this.analytics?.track(
{
type: 'r2',
statusCodeOrErrCode:
args.result.type === 'error'
? String(args.result.error.name ?? 'unknown')
: args.result.response.status,
action: 'HEAD appDeploymentIsEnabled',
duration: args.duration,
},
targetId,
);
},
const response = await this.request({
key,
method: 'HEAD',
onAttempt: args => {
this.analytics?.track(
{
type: args.isMirror ? 's3' : 'r2',
statusCodeOrErrCode:
args.result.type === 'error'
? String(args.result.error.name ?? 'unknown')
: args.result.response.status,
action: 'HEAD appDeploymentIsEnabled',
duration: args.duration,
},
targetId,
);
},
);
});
return response.status === 200;
}
@ -184,31 +292,25 @@ export class ArtifactStorageReader {
headers['if-none-match'] = etagValue;
}
const response = await this.s3.client.fetch(
[this.s3.endpoint, this.s3.bucketName, key].join('/'),
{
method: 'GET',
aws: {
signQuery: true,
},
headers,
timeout: READ_TIMEOUT_MS,
onAttempt: args => {
this.analytics?.track(
{
type: 'r2',
statusCodeOrErrCode:
args.result.type === 'error'
? String(args.result.error.name ?? 'unknown')
: args.result.response.status,
action: 'GET persistedOperation',
duration: args.duration,
},
targetId,
);
},
const response = await this.request({
key,
method: 'GET',
headers,
onAttempt: args => {
this.analytics?.track(
{
type: args.isMirror ? 's3' : 'r2',
statusCodeOrErrCode:
args.result.type === 'error'
? String(args.result.error.name ?? 'unknown')
: args.result.response.status,
action: 'GET persistedOperation',
duration: args.duration,
},
targetId,
);
},
);
});
if (etagValue && response.status === 304) {
return { type: 'notModified' } as const;
@ -231,27 +333,24 @@ export class ArtifactStorageReader {
}
async readLegacyAccessKey(targetId: string) {
const response = await this.s3.client.fetch(
[this.s3.endpoint, this.s3.bucketName, 'cdn-legacy-keys', targetId].join('/'),
{
method: 'GET',
timeout: READ_TIMEOUT_MS,
onAttempt: args => {
this.analytics?.track(
{
type: 'r2',
statusCodeOrErrCode:
args.result.type === 'error'
? String(args.result.error.name ?? 'unknown')
: args.result.response.status,
action: 'GET cdn-legacy-keys',
duration: args.duration,
},
targetId,
);
},
const response = await this.request({
key: ['cdn-legacy-keys', targetId].join('/'),
method: 'GET',
onAttempt: args => {
this.analytics?.track(
{
type: args.isMirror ? 's3' : 'r2',
statusCodeOrErrCode:
args.result.type === 'error'
? String(args.result.error.name ?? 'unknown')
: args.result.response.status,
action: 'GET cdn-legacy-keys',
duration: args.duration,
},
targetId,
);
},
);
});
return response;
}
@ -259,31 +358,24 @@ export class ArtifactStorageReader {
async readAccessKey(targetId: string, keyId: string) {
const s3KeyParts = ['cdn-keys', targetId, keyId];
const response = await this.s3.client.fetch(
[this.s3.endpoint, this.s3.bucketName, ...s3KeyParts].join('/'),
{
method: 'GET',
aws: {
// This boolean makes Google Cloud Storage & AWS happy.
signQuery: true,
},
timeout: READ_TIMEOUT_MS,
onAttempt: args => {
this.analytics?.track(
{
type: 'r2',
statusCodeOrErrCode:
args.result.type === 'error'
? String(args.result.error.name ?? 'unknown')
: args.result.response.status,
action: 'GET cdn-access-token',
duration: args.duration,
},
targetId,
);
},
const response = await this.request({
key: s3KeyParts.join('/'),
method: 'GET',
onAttempt: args => {
this.analytics?.track(
{
type: args.isMirror ? 'r2' : 's3',
statusCodeOrErrCode:
args.result.type === 'error'
? String(args.result.error.name ?? 'unknown')
: args.result.response.status,
action: 'GET cdn-access-token',
duration: args.duration,
},
targetId,
);
},
);
});
return response;
}

View file

@ -49,6 +49,15 @@ type AwsRequestInit = RequestInit & {
* Timeout in milliseconds for each fetch call.
*/
timeout?: number;
/**
* Abort signal for the fetch call and potential retries.
* Retries will not be attempted if the signal is already aborted.
*/
signal?: AbortSignal;
/**
* Overwrite the amount of retries
*/
retries?: number;
/** Hook being invoked for each attempt for gathering analytics or similar. */
onAttempt?: (args: {
/** attempt number */
@ -68,6 +77,8 @@ type AwsRequestInit = RequestInit & {
response: Response;
};
}) => void;
/** Custom verifying function on whether the response is okay. */
isResponseOk?: (response: Response) => boolean;
};
export type AWSClientConfig = {
@ -126,9 +137,13 @@ export class AwsClient {
input = url;
}
const signer = new AwsV4Signer(Object.assign({ url: input }, init, this, init && init.aws));
const signed = Object.assign(
{
signal: init?.timeout ? AbortSignal.timeout(init.timeout) : undefined,
signal: anySignal([
init?.timeout ? AbortSignal.timeout(init.timeout) : undefined,
init?.signal,
]),
},
init,
await signer.sign(),
@ -147,17 +162,27 @@ export class AwsClient {
}
async fetch(input: RequestInfo, init: AwsRequestInit): Promise<Response> {
for (let i = 0; i <= this.retries; i++) {
const maximumRetryCount = init.retries ?? this.retries;
for (let retryCounter = 0; retryCounter <= maximumRetryCount; retryCounter++) {
const attemptStart = performance.now();
try {
const response = await this._fetch(...(await this.sign(input, init)));
const duration = performance.now() - attemptStart;
init.onAttempt?.({ attempt: i, duration, result: { type: 'success', response } });
init.onAttempt?.({
attempt: retryCounter,
duration,
result: { type: 'success', response },
});
if (
(response.status < 500 && response.status !== 429 && response.status !== 499) ||
i === this.retries
retryCounter === maximumRetryCount
) {
if (init.isResponseOk && !init.isResponseOk(response)) {
throw new Error(`Response not okay, status: ${response.status}`);
}
return response;
}
} catch (error) {
@ -165,17 +190,21 @@ export class AwsClient {
// Retry also when there's an exception
console.error(error);
init.onAttempt?.({
attempt: i,
attempt: retryCounter,
duration,
result: { type: 'error', error: error as Error },
});
if (i === this.retries) {
if (
retryCounter === maximumRetryCount ||
// If the signal was aborted, we don't want to retry
init.signal?.aborted === true
) {
throw error;
}
}
await new Promise(resolve =>
setTimeout(resolve, Math.random() * this.initRetryMs * Math.pow(2, i)),
setTimeout(resolve, Math.random() * this.initRetryMs * Math.pow(2, retryCounter)),
);
}
throw new Error('An unknown error occurred, ensure retries is not negative');
@ -505,3 +534,25 @@ function guessServiceRegion(url: URL, headers: Headers) {
return [HOST_SERVICES[service] || service, region];
}
function anySignal(signals: Array<AbortSignal | undefined>) {
const controller = new AbortController();
function onAbort(reason: unknown) {
controller.abort(reason);
}
for (const signal of signals) {
if (!signal) {
continue;
}
if (signal.aborted) {
onAbort(signal.reason);
break;
}
signal.addEventListener('abort', onAbort);
}
return controller.signal;
}

View file

@ -23,7 +23,7 @@ const s3 = {
// eslint-disable-next-line no-process-env
const PORT = process.env.PORT ? parseInt(process.env.PORT, 10) : 4010;
const artifactStorageReader = new ArtifactStorageReader(s3, null);
const artifactStorageReader = new ArtifactStorageReader(s3, null, null);
const handleRequest = createRequestHandler({
isKeyValid: createIsKeyValid({

View file

@ -16,6 +16,13 @@ type Env = {
S3_SECRET_ACCESS_KEY: string;
S3_BUCKET_NAME: string;
S3_SESSION_TOKEN?: string;
S3_MIRROR_ENDPOINT: string;
S3_MIRROR_ACCESS_KEY_ID: string;
S3_MIRROR_SECRET_ACCESS_KEY: string;
S3_MIRROR_BUCKET_NAME: string;
S3_MIRROR_SESSION_TOKEN?: string;
SENTRY_DSN: string;
/**
* Name of the environment, e.g. staging, production
@ -32,6 +39,7 @@ type Env = {
ERROR_ANALYTICS: AnalyticsEngine;
RESPONSE_ANALYTICS: AnalyticsEngine;
R2_ANALYTICS: AnalyticsEngine;
S3_ANALYTICS: AnalyticsEngine;
KEY_VALIDATION_ANALYTICS: AnalyticsEngine;
};
@ -48,15 +56,27 @@ const handler: ExportedHandler<Env> = {
endpoint: env.S3_ENDPOINT,
};
const s3Mirror = {
client: new AwsClient({
accessKeyId: env.S3_MIRROR_ACCESS_KEY_ID,
secretAccessKey: env.S3_MIRROR_SECRET_ACCESS_KEY,
sessionToken: env.S3_MIRROR_SESSION_TOKEN,
service: 's3',
}),
bucketName: env.S3_MIRROR_BUCKET_NAME,
endpoint: env.S3_MIRROR_ENDPOINT,
};
const analytics = createAnalytics({
usage: env.USAGE_ANALYTICS,
error: env.ERROR_ANALYTICS,
keyValidation: env.KEY_VALIDATION_ANALYTICS,
response: env.RESPONSE_ANALYTICS,
r2: env.R2_ANALYTICS,
s3: env.S3_ANALYTICS,
});
const artifactStorageReader = new ArtifactStorageReader(s3, analytics);
const artifactStorageReader = new ArtifactStorageReader(s3, s3Mirror, analytics);
const isKeyValid = createIsKeyValid({
waitUntil: p => ctx.waitUntil(p),

View file

@ -1,9 +1,12 @@
import { createHmac } from 'node:crypto';
import * as bcrypt from 'bcryptjs';
import '../src/dev-polyfill';
import { MockAgent, MockPool, fetch as undiciFetch } from 'undici';
// eslint-disable-next-line import/no-extraneous-dependencies
import { describe, expect, test } from 'vitest';
import { afterAll, afterEach, beforeEach, describe, expect, test } from 'vitest';
import { ArtifactStorageReader } from '../src/artifact-storage-reader';
import { AwsClient } from '../src/aws';
import { encodeCdnToken } from '../src/cdn-token';
import {
InvalidArtifactTypeResponse,
InvalidAuthKeyResponse,
@ -26,6 +29,24 @@ describe('CDN Worker', () => {
return createHmac('sha256', secretKeyData).update(encoder.encode(targetId)).digest('base64');
}
async function createV2Token(targetId: string) {
const SECRET = '123456';
const tokenKeyId = 'secret-key';
const secret = createToken(SECRET, targetId);
const key = encodeCdnToken({
privateKey: secret,
keyId: tokenKeyId,
});
const hash = await bcrypt.hash(secret, await bcrypt.genSalt());
return {
key,
keyId: tokenKeyId,
hash,
};
}
test('in /schema and /metadata the response should contain content-type: application/json header', async () => {
const SECRET = '123456';
const targetId = 'fake-target-id';
@ -62,6 +83,7 @@ describe('CDN Worker', () => {
} as any,
},
null,
null,
),
}),
async getArtifactAction(targetId, _, artifactType) {
@ -141,6 +163,7 @@ describe('CDN Worker', () => {
} as any,
},
null,
null,
),
}),
async getArtifactAction(targetId, _, artifactType) {
@ -236,6 +259,7 @@ describe('CDN Worker', () => {
} as any,
},
null,
null,
),
}),
async getArtifactAction(targetId, _, artifactType) {
@ -315,6 +339,7 @@ describe('CDN Worker', () => {
} as any,
},
null,
null,
),
}),
async getArtifactAction(targetId, _, artifactType) {
@ -400,6 +425,7 @@ describe('CDN Worker', () => {
} as any,
},
null,
null,
),
}),
async getArtifactAction(targetId, _, artifactType) {
@ -483,6 +509,7 @@ describe('CDN Worker', () => {
} as any,
},
null,
null,
),
}),
async getArtifactAction(targetId, _, artifactType) {
@ -651,6 +678,7 @@ describe('CDN Worker', () => {
} as any,
},
null,
null,
),
}),
async getArtifactAction(targetId, _, artifactType) {
@ -702,6 +730,7 @@ describe('CDN Worker', () => {
} as any,
},
null,
null,
),
}),
async getArtifactAction(targetId, _, artifactType) {
@ -751,6 +780,7 @@ describe('CDN Worker', () => {
} as any,
},
null,
null,
),
}),
async getArtifactAction() {
@ -774,4 +804,456 @@ describe('CDN Worker', () => {
expect(response.status).toBe(403);
});
});
describe('use S3', () => {
const mockAgent = new MockAgent();
let r2MockPool: MockPool;
let s3MockPool: MockPool;
const r2Endpoint = 'http://localhost:3002';
const s3Endpoint = 'http://localhost:3003';
const TIMEOUT = 200;
const DELAY = TIMEOUT + 100;
const mockedFetch = (input: any, init?: any) => {
// Use undici's fetch with custom dispatcher to mock the network
return undiciFetch(input as any, {
...(init ?? {}),
dispatcher: mockAgent,
}) as Promise<Response>;
};
beforeEach(() => {
r2MockPool = mockAgent.get(r2Endpoint);
s3MockPool = mockAgent.get(s3Endpoint);
});
afterEach(async () => {
await r2MockPool?.close();
await s3MockPool?.close();
mockAgent.assertNoPendingInterceptors();
});
afterAll(() => mockAgent.close());
test('when fetching access key from R2 takes longer than a timeout', async () => {
const targetId = 'fake-target-id';
const services = [{ sdl: `type Query { dummy: String }` }];
const access = await createV2Token(targetId);
// Fetching the key from R2 takes longer than the timeout
r2MockPool
.intercept({
path(path) {
return path.startsWith(`/artifacts/cdn-keys/${targetId}/${access.keyId}`);
},
})
.reply(() => {
return {
statusCode: 200,
data: access.hash,
};
})
.delay(DELAY);
s3MockPool
.intercept({
path(path) {
return path.startsWith(`/artifacts/cdn-keys/${targetId}/${access.keyId}`);
},
})
.reply(() => {
return {
statusCode: 200,
data: access.hash,
};
});
s3MockPool
.intercept({
path(path) {
return path.startsWith(`/artifacts/artifact/${targetId}/services`);
},
})
.reply(() => {
return {
statusCode: 200,
data: JSON.stringify(services),
};
});
const artifactStorageReader = new ArtifactStorageReader(
{
endpoint: r2Endpoint,
bucketName: 'artifacts',
client: new AwsClient({
accessKeyId: 'r2-fake-access-key',
secretAccessKey: 'r2-fake-secret-key',
sessionToken: 'r2-fake-session-token',
service: 's3',
fetch: mockedFetch,
}),
},
{
endpoint: s3Endpoint,
bucketName: 'artifacts',
client: new AwsClient({
accessKeyId: 's3-fake-access-key',
secretAccessKey: 's3-fake-secret-key',
sessionToken: 's3-fake-session-token',
service: 's3',
fetch: mockedFetch,
}),
},
null,
TIMEOUT,
);
const handleRequest = createRequestHandler({
isKeyValid: createIsKeyValid({
getCache: null,
waitUntil: null,
analytics: null,
artifactStorageReader,
}),
async getArtifactAction(targetId, contractName, artifactType, eTag) {
return artifactStorageReader.readArtifact(targetId, contractName, artifactType, eTag);
},
async fetchText(url) {
return mockedFetch(url).then(r => {
if (r.ok) {
return r.text();
}
throw new Error(`Failed to fetch ${url}, status: ${r.status}`);
});
},
});
const firstRequest = new Request(`https://fake-worker.com/${targetId}/schema`, {
headers: {
'x-hive-cdn-key': access.key,
},
});
const response = await handleRequest(firstRequest);
expect(response.status).toBe(200);
await expect(response.json()).resolves.toEqual([{ sdl: `type Query { dummy: String }` }]);
});
test('when fetching artifact from R2 takes longer than a timeout', async () => {
const targetId = 'fake-target-id';
const services = [{ sdl: `type Query { dummy: String }` }];
const access = await createV2Token(targetId);
// Fetching the key is instant
r2MockPool
.intercept({
path(path) {
return path.startsWith(`/artifacts/cdn-keys/${targetId}/${access.keyId}`);
},
})
.reply(() => {
return {
statusCode: 200,
data: access.hash,
};
});
// Fetching the artifact from R2 takes longer than the timeout
r2MockPool
.intercept({
path(path) {
return path.startsWith(`/artifacts/artifact/${targetId}/services`);
},
})
.reply(() => {
return {
statusCode: 200,
data: JSON.stringify(services),
};
})
.delay(DELAY);
s3MockPool
.intercept({
path(path) {
return path.startsWith(`/artifacts/artifact/${targetId}/services`);
},
})
.reply(() => {
return {
statusCode: 200,
data: JSON.stringify(services),
};
});
const artifactStorageReader = new ArtifactStorageReader(
{
endpoint: r2Endpoint,
bucketName: 'artifacts',
client: new AwsClient({
accessKeyId: 'r2-fake-access-key',
secretAccessKey: 'r2-fake-secret-key',
sessionToken: 'r2-fake-session-token',
service: 's3',
fetch: mockedFetch,
}),
},
{
endpoint: s3Endpoint,
bucketName: 'artifacts',
client: new AwsClient({
accessKeyId: 's3-fake-access-key',
secretAccessKey: 's3-fake-secret-key',
sessionToken: 's3-fake-session-token',
service: 's3',
fetch: mockedFetch,
}),
},
null,
TIMEOUT,
);
const handleRequest = createRequestHandler({
isKeyValid: createIsKeyValid({
getCache: null,
waitUntil: null,
analytics: null,
artifactStorageReader,
}),
async getArtifactAction(targetId, contractName, artifactType, eTag) {
return artifactStorageReader.readArtifact(targetId, contractName, artifactType, eTag);
},
async fetchText(url) {
return mockedFetch(url).then(r => {
if (r.ok) {
return r.text();
}
throw new Error(`Failed to fetch ${url}, status: ${r.status}`);
});
},
});
const firstRequest = new Request(`https://fake-worker.com/${targetId}/schema`, {
headers: {
'x-hive-cdn-key': access.key,
},
});
const response = await handleRequest(firstRequest);
expect(response.status).toBe(200);
await expect(response.json()).resolves.toEqual([{ sdl: `type Query { dummy: String }` }]);
});
test('when R2 is down and access key fails to be fetched', async () => {
const targetId = 'fake-target-id';
const services = [{ sdl: `type Query { dummy: String }` }];
const access = await createV2Token(targetId);
// R2 is down and we fail to fetch the key
r2MockPool
.intercept({
path(path) {
return path.startsWith(`/artifacts/cdn-keys/${targetId}/${access.keyId}`);
},
})
.reply(() => {
return {
statusCode: 500,
data: 'Please try again later',
};
});
s3MockPool
.intercept({
path(path) {
return path.startsWith(`/artifacts/cdn-keys/${targetId}/${access.keyId}`);
},
})
.reply(() => {
return {
statusCode: 200,
data: access.hash,
};
});
s3MockPool
.intercept({
path(path) {
return path.startsWith(`/artifacts/artifact/${targetId}/services`);
},
})
.reply(() => {
return {
statusCode: 200,
data: JSON.stringify(services),
};
});
const artifactStorageReader = new ArtifactStorageReader(
{
endpoint: r2Endpoint,
bucketName: 'artifacts',
client: new AwsClient({
accessKeyId: 'r2-fake-access-key',
secretAccessKey: 'r2-fake-secret-key',
sessionToken: 'r2-fake-session-token',
service: 's3',
fetch: mockedFetch,
}),
},
{
endpoint: s3Endpoint,
bucketName: 'artifacts',
client: new AwsClient({
accessKeyId: 's3-fake-access-key',
secretAccessKey: 's3-fake-secret-key',
sessionToken: 's3-fake-session-token',
service: 's3',
fetch: mockedFetch,
}),
},
null,
);
const handleRequest = createRequestHandler({
isKeyValid: createIsKeyValid({
getCache: null,
waitUntil: null,
analytics: null,
artifactStorageReader,
}),
async getArtifactAction(targetId, contractName, artifactType, eTag) {
return artifactStorageReader.readArtifact(targetId, contractName, artifactType, eTag);
},
async fetchText(url) {
return mockedFetch(url).then(r => {
if (r.ok) {
return r.text();
}
throw new Error(`Failed to fetch ${url}, status: ${r.status}`);
});
},
});
const firstRequest = new Request(`https://fake-worker.com/${targetId}/schema`, {
headers: {
'x-hive-cdn-key': access.key,
},
});
const response = await handleRequest(firstRequest);
expect(response.status).toBe(200);
await expect(response.json()).resolves.toEqual([{ sdl: `type Query { dummy: String }` }]);
});
test('when R2 is down after we got the access key', async () => {
const targetId = 'fake-target-id';
const services = [{ sdl: `type Query { dummy: String }` }];
const access = await createV2Token(targetId);
// R2 is down and we fail to fetch the key
r2MockPool
.intercept({
path(path) {
return path.startsWith(`/artifacts/cdn-keys/${targetId}/${access.keyId}`);
},
})
.reply(() => {
return {
statusCode: 200,
data: access.hash,
};
});
r2MockPool
.intercept({
path(path) {
return path.startsWith(`/artifacts/artifact/${targetId}/services`);
},
})
.reply(() => {
return {
statusCode: 500,
data: 'We are so down',
};
});
s3MockPool
.intercept({
path(path) {
return path.startsWith(`/artifacts/artifact/${targetId}/services`);
},
})
.reply(() => {
return {
statusCode: 200,
data: JSON.stringify(services),
};
});
const artifactStorageReader = new ArtifactStorageReader(
{
endpoint: r2Endpoint,
bucketName: 'artifacts',
client: new AwsClient({
accessKeyId: 'r2-fake-access-key',
secretAccessKey: 'r2-fake-secret-key',
sessionToken: 'r2-fake-session-token',
service: 's3',
fetch: mockedFetch,
}),
},
{
endpoint: s3Endpoint,
bucketName: 'artifacts',
client: new AwsClient({
accessKeyId: 's3-fake-access-key',
secretAccessKey: 's3-fake-secret-key',
sessionToken: 's3-fake-session-token',
service: 's3',
fetch: mockedFetch,
}),
},
null,
);
const handleRequest = createRequestHandler({
isKeyValid: createIsKeyValid({
getCache: null,
waitUntil: null,
analytics: null,
artifactStorageReader,
}),
async getArtifactAction(targetId, contractName, artifactType, eTag) {
return artifactStorageReader.readArtifact(targetId, contractName, artifactType, eTag);
},
async fetchText(url) {
return mockedFetch(url).then(r => {
if (r.ok) {
return r.text();
}
throw new Error(`Failed to fetch ${url}, status: ${r.status}`);
});
},
});
const firstRequest = new Request(`https://fake-worker.com/${targetId}/schema`, {
headers: {
'x-hive-cdn-key': access.key,
},
});
const response = await handleRequest(firstRequest);
expect(response.status).toBe(200);
await expect(response.json()).resolves.toEqual([{ sdl: `type Query { dummy: String }` }]);
});
});
});

View file

@ -539,7 +539,19 @@ export async function main() {
bucketName: env.s3.bucketName,
};
const artifactStorageReader = new ArtifactStorageReader(s3, null);
const s3Mirror = env.s3Mirror
? {
client: new AwsClient({
accessKeyId: env.s3Mirror.credentials.accessKeyId,
secretAccessKey: env.s3Mirror.credentials.secretAccessKey,
service: 's3',
}),
endpoint: env.s3Mirror.endpoint,
bucketName: env.s3Mirror.bucketName,
}
: null;
const artifactStorageReader = new ArtifactStorageReader(s3, s3Mirror, null);
const artifactHandler = createArtifactRequestHandler({
isKeyValid: createIsKeyValid({

View file

@ -877,6 +877,9 @@ importers:
toucan-js:
specifier: 3.4.0
version: 3.4.0
undici:
specifier: 6.19.8
version: 6.19.8
zod:
specifier: 3.23.8
version: 3.23.8
@ -4161,6 +4164,7 @@ packages:
'@fastify/vite@6.0.7':
resolution: {integrity: sha512-+dRo9KUkvmbqdmBskG02SwigWl06Mwkw8SBDK1zTNH6vd4DyXbRvI7RmJEmBkLouSU81KTzy1+OzwHSffqSD6w==}
bundledDependencies: []
'@floating-ui/core@1.2.6':
resolution: {integrity: sha512-EvYTiXet5XqweYGClEmpu3BoxmsQ4hkj3QaYA6qEnigCWffTP3vNRwBReTdrwDwo7OoJ3wM8Uoe9Uk4n+d4hfg==}