Add a timeout to external schema composition (#1341)

This commit is contained in:
Kamil Kisiela 2023-02-10 17:14:40 +01:00 committed by GitHub
parent b824f5e906
commit dc9041adea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 99 additions and 100 deletions

View file

@ -17,8 +17,8 @@
"@sentry/tracing": "7.37.0",
"@trpc/server": "10.9.1",
"@whatwg-node/fetch": "0.7.1",
"async-retry": "1.3.3",
"dotenv": "16.0.3",
"got": "12.5.3",
"graphql": "16.6.0",
"ioredis": "5.3.0",
"zod": "3.20.5"

View file

@ -1,5 +1,6 @@
import { createHash, createHmac } from 'crypto';
import retry from 'async-retry';
import got from 'got';
import { RequestError } from 'got';
import type { DocumentNode } from 'graphql';
import {
ASTNode,
@ -20,7 +21,6 @@ import type { ErrorCode } from '@graphql-hive/external-composition';
import { stitchSchemas } from '@graphql-tools/stitch';
import { stitchingDirectives } from '@graphql-tools/stitching-directives';
import type { FastifyLoggerInstance } from '@hive/service-common';
import { fetch } from '@whatwg-node/fetch';
import type {
BuildInput,
BuildOutput,
@ -107,12 +107,6 @@ const EXTERNAL_COMPOSITION_RESULT = z.union([
.required(),
]);
class NetworkError extends Error {
constructor(message: string, public readonly statusCode: number) {
super(message);
}
}
function trimDescriptions(doc: DocumentNode): DocumentNode {
function trim<T extends ASTNode>(node: T): T {
if (node && 'description' in node && node.description) {
@ -201,6 +195,75 @@ function translateMessage(errorCode: string) {
}
}
async function callExternalService(
input: { url: string; headers: Record<string, string>; body: string },
logger: FastifyLoggerInstance,
) {
try {
const response = await got(input.url, {
method: 'POST',
headers: input.headers,
body: input.body,
responseType: 'text',
retry: {
limit: 2,
methods: ['POST'],
backoffLimit: 500,
},
timeout: {
request: 10_000,
},
});
return JSON.parse(response.body) as unknown;
} catch (error) {
if (error instanceof RequestError) {
if (error.response) {
const message = error.response.body ? error.response.body : error.response.statusMessage; // await res.text().catch(_ => Promise.resolve(res.statusText));
// If the response is a string starting with ERR_ it's a special error returned by the composition service.
// We don't want to throw an error in this case, but instead return a failure result.
if (typeof message === 'string') {
const translatedMessage = translateMessage(message);
if (translatedMessage) {
return {
type: 'failure',
result: {
errors: [
{
message: `External composition failure: ${translatedMessage}`,
source: 'graphql',
},
],
},
} satisfies CompositionFailure;
}
}
logger.info(
'Network error so return failure (status=%s, message=%s)',
error.response.statusCode,
error.message,
);
return {
type: 'failure',
result: {
errors: [
{
message: `External composition network failure: ${error.message}`,
source: 'graphql',
},
],
},
} satisfies CompositionFailure;
}
}
throw error;
}
}
const createFederation: (
redis: RedisInstance,
logger: FastifyLoggerInstance,
@ -230,104 +293,40 @@ const createFederation: (
}),
);
const signature = hash(decrypt(external.encryptedSecret), 'sha256', body);
logger.debug('Calling external composition service (url=%s)', external.endpoint);
logger.debug(
'Calling external composition service (url=%s, broker=%s)',
external.endpoint,
external.broker ? 'yes' : 'no',
);
const headers = {
Accept: 'application/json',
'Content-Type': 'application/json',
'x-hive-signature-256': signature,
};
const init = {
method: 'POST',
headers: {
Accept: 'application/json',
'Content-Type': 'application/json',
'x-hive-signature-256': signature,
},
url: external.endpoint,
headers,
body,
};
const response: unknown = await retry(
async (_, attempt) => {
logger.debug(
'Calling external composition service (broker=%s, attempt=%s, url=%s)',
external.broker ? 'yes' : 'no',
attempt,
external.endpoint,
);
const res = await (external.broker
? fetch(external.broker.endpoint, {
method: 'POST',
const parseResult = EXTERNAL_COMPOSITION_RESULT.safeParse(
await callExternalService(
external.broker
? {
url: external.broker.endpoint,
headers: {
Accept: 'application/json',
'Content-Type': 'application/json',
'x-hive-signature': external.broker.signature,
},
body: JSON.stringify({
url: external.endpoint,
...init,
}),
})
: fetch(external.endpoint, init)
).catch(async error => {
logger.error(error);
throw error;
});
if (!res.ok) {
const message = await res.text().catch(_ => Promise.resolve(res.statusText));
// If the response is a string starting with ERR_ it's a special error returned by the composition service.
// We don't want to throw an error in this case, but instead return a failure result.
// This is useful for cases where the composition service is not able to compose the schemas for technical reasons,
// and we do want to pass the error to the user and not do a retry.
if (typeof message === 'string') {
const translatedMessage = translateMessage(message);
if (translatedMessage) {
return {
type: 'failure',
result: {
errors: [
{
message: `External composition failure: ${translatedMessage}`,
source: 'graphql',
},
],
},
} satisfies CompositionFailure;
body: JSON.stringify(init),
}
}
// If it does not start with ERR_ we throw an error, which will be caught by the retry logic.
throw new NetworkError(message, res.status);
}
return res.json();
},
{
retries: 3,
},
).catch(async error => {
// The expected error
if (error instanceof NetworkError) {
logger.info(
'Network error so return failure (status=%s, message=%s)',
error.statusCode,
error.message,
);
return {
type: 'failure',
result: {
errors: [
{
message: `External composition network failure: [${error.statusCode}] ${error.message}`,
source: 'graphql',
},
],
},
} satisfies CompositionFailure;
}
throw error;
});
const parseResult = EXTERNAL_COMPOSITION_RESULT.safeParse(await response);
: init,
logger,
),
);
if (!parseResult.success) {
throw new Error(`External composition failure: invalid shape of data`);
@ -680,9 +679,9 @@ function reuse<I, O>(
await new Promise(resolve => setTimeout(resolve, 500));
cached = await readAction<O>(checksum, redis);
if (Date.now() - startedAt > 30_000) {
if (Date.now() - startedAt > 25_000) {
await removeAction(checksum, redis, logger);
throw new Error('Timeout after 30s');
throw new Error('Timeout after 25s');
}
}

View file

@ -613,9 +613,9 @@ importers:
'@trpc/server': 10.9.1
'@types/async-retry': 1.4.5
'@whatwg-node/fetch': 0.7.1
async-retry: 1.3.3
dotenv: 16.0.3
fastify: 3.29.5
got: 12.5.3
graphql: 16.6.0
ioredis: 5.3.0
pino-pretty: 9.1.1
@ -628,8 +628,8 @@ importers:
'@sentry/tracing': 7.37.0
'@trpc/server': 10.9.1
'@whatwg-node/fetch': 0.7.1
async-retry: 1.3.3
dotenv: 16.0.3
got: 12.5.3
graphql: 16.6.0
ioredis: 5.3.0
zod: 3.20.5