Request Proxy in CF Worker (#517)

This commit is contained in:
Kamil Kisiela 2022-11-07 14:27:19 +01:00 committed by GitHub
parent bf11f56469
commit 51cb4bb412
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
42 changed files with 1009 additions and 49 deletions

View file

@ -59,7 +59,7 @@ module.exports = {
},
overrides: [
{
files: ['packages/web/**', 'packages/services/cdn-worker/**', 'packages/services/police-worker/**'],
files: ['packages/web/**', 'packages/services/*-worker/**'],
rules: {
// because this folder is excluded in tsconfig.json
'@typescript-eslint/no-floating-promises': 'off',

View file

@ -461,6 +461,7 @@ jobs:
redis \
supertokens \
local_cdn \
local_broker \
external_composition
- name: Set up Docker Buildx

View file

@ -14,7 +14,8 @@ import { deployDocs } from './services/docs';
import { deployRedis } from './services/redis';
import { deployKafka } from './services/kafka';
import { deployMetrics } from './services/observability';
import { deployCloudflare } from './services/cloudflare';
import { deployCFCDN } from './services/cf-cdn';
import { deployCFBroker } from './services/cf-broker';
import { deployCloudflarePolice } from './services/police';
import { deployBotKube } from './services/bot-kube';
import { deployProxy } from './services/proxy';
@ -70,7 +71,13 @@ const deploymentEnv: DeploymentEnvironment = {
deployBotKube({ envName });
deployMetrics({ envName });
const cloudflare = deployCloudflare({
const cdn = deployCFCDN({
envName,
rootDns,
packageHelper,
});
const cfBroker = deployCFBroker({
envName,
rootDns,
packageHelper,
@ -106,6 +113,7 @@ const webhooksApi = deployWebhooks({
deploymentEnv,
redis: redisApi,
heartbeat: heartbeatsConfig.get('webhooks'),
broker: cfBroker,
});
const emailsApi = deployEmails({
@ -171,6 +179,7 @@ const schemaApi = deploySchema({
storageContainer,
deploymentEnv,
redis: redisApi,
broker: cfBroker,
});
const supertokensApiKey = new random.RandomPassword('supertokens-api-key', { length: 31, special: false });
@ -201,7 +210,7 @@ const graphqlApi = deployGraphQL({
dbMigrations,
redis: redisApi,
usage: usageApi,
cloudflare,
cdn,
usageEstimator: usageEstimationApi,
rateLimit: rateLimitApi,
billing: billingApi,

View file

@ -0,0 +1,34 @@
import * as pulumi from '@pulumi/pulumi';
import { CloudflareBroker } from '../utils/cloudflare';
import { PackageHelper } from '../utils/pack';
const commonConfig = new pulumi.Config('common');
const cfConfig = new pulumi.Config('cloudflareCustom');
const commonEnv = commonConfig.requireObject<Record<string, string>>('env');
export type Broker = ReturnType<typeof deployCFBroker>;
export function deployCFBroker({
rootDns,
envName,
packageHelper,
}: {
rootDns: string;
envName: string;
packageHelper: PackageHelper;
}) {
const cfBrokerSignature = commonConfig.requireSecret('cfBrokerSignature');
const broker = new CloudflareBroker({
envName,
zoneId: cfConfig.require('zoneId'),
// We can't cdn for staging env, since CF certificate only covers
// one level of subdomains. See: https://community.cloudflare.com/t/ssl-handshake-error-cloudflare-proxy/175088
// So for staging env, we are going to use `broker-staging` instead of `broker.staging`.
cdnDnsRecord: envName === 'staging' ? `broker-${rootDns}` : `broker.${rootDns}`,
secretSignature: cfBrokerSignature,
sentryDsn: commonEnv.SENTRY_DSN,
release: packageHelper.currentReleaseId(),
});
return broker.deploy();
}

View file

@ -1,5 +1,5 @@
import * as pulumi from '@pulumi/pulumi';
import { CloudflareCDN } from '../utils/cdn';
import { CloudflareCDN } from '../utils/cloudflare';
import { PackageHelper } from '../utils/pack';
const commonConfig = new pulumi.Config('common');
@ -7,9 +7,9 @@ const cfConfig = new pulumi.Config('cloudflareCustom');
const commonEnv = commonConfig.requireObject<Record<string, string>>('env');
export type Cloudflare = ReturnType<typeof deployCloudflare>;
export type CDN = ReturnType<typeof deployCFCDN>;
export function deployCloudflare({
export function deployCFCDN({
rootDns,
envName,
packageHelper,

View file

@ -1,6 +1,6 @@
import * as pulumi from '@pulumi/pulumi';
import * as azure from '@pulumi/azure';
import { Cloudflare } from './cloudflare';
import { CDN } from './cf-cdn';
import { parse } from 'pg-connection-string';
import { Tokens } from './tokens';
import { Webhooks } from './webhooks';
@ -37,7 +37,7 @@ export function deployGraphQL({
tokens,
webhooks,
schema,
cloudflare,
cdn,
redis,
usage,
usageEstimator,
@ -56,7 +56,7 @@ export function deployGraphQL({
webhooks: Webhooks;
schema: Schema;
redis: Redis;
cloudflare: Cloudflare;
cdn: CDN;
usage: Usage;
usageEstimator: UsageEstimator;
dbMigrations: DbMigrations;
@ -111,9 +111,9 @@ export function deployGraphQL({
CDN_CF_BASE_PATH: 'https://api.cloudflare.com/client/v4/accounts',
CDN_CF_ACCOUNT_ID: cloudflareConfig.require('accountId'),
CDN_CF_AUTH_TOKEN: cloudflareConfig.requireSecret('apiToken'),
CDN_CF_NAMESPACE_ID: cloudflare.cfStorageNamespaceId,
CDN_BASE_URL: cloudflare.workerBaseUrl,
CDN_AUTH_PRIVATE_KEY: cloudflare.authPrivateKey,
CDN_CF_NAMESPACE_ID: cdn.cfStorageNamespaceId,
CDN_BASE_URL: cdn.workerBaseUrl,
CDN_AUTH_PRIVATE_KEY: cdn.authPrivateKey,
// Hive
HIVE: '1',
HIVE_REPORTING: '1',

View file

@ -5,6 +5,7 @@ import { isProduction } from '../utils/helpers';
import { DeploymentEnvironment } from '../types';
import { Redis } from './redis';
import { PackageHelper } from '../utils/pack';
import type { Broker } from './cf-broker';
const commonConfig = new pulumi.Config('common');
const commonEnv = commonConfig.requireObject<Record<string, string>>('env');
@ -16,11 +17,13 @@ export function deploySchema({
redis,
packageHelper,
storageContainer,
broker,
}: {
storageContainer: azure.storage.Container;
packageHelper: PackageHelper;
deploymentEnv: DeploymentEnvironment;
redis: Redis;
broker: Broker;
}) {
return new RemoteArtifactAsServiceDeployment(
'schema-service',
@ -35,6 +38,9 @@ export function deploySchema({
REDIS_PORT: String(redis.config.port),
REDIS_PASSWORD: redis.config.password,
ENCRYPTION_SECRET: commonConfig.requireSecret('encryptionSecret'),
REQUEST_BROKER: '1',
REQUEST_BROKER_ENDPOINT: broker.workerBaseUrl,
REQUEST_BROKER_SIGNATURE: broker.secretSignature,
},
readinessProbe: '/_readiness',
livenessProbe: '/_health',

View file

@ -4,6 +4,7 @@ import { RemoteArtifactAsServiceDeployment } from '../utils/remote-artifact-as-s
import { DeploymentEnvironment } from '../types';
import { Redis } from './redis';
import { PackageHelper } from '../utils/pack';
import type { Broker } from './cf-broker';
const commonConfig = new pulumi.Config('common');
const commonEnv = commonConfig.requireObject<Record<string, string>>('env');
@ -16,11 +17,13 @@ export function deployWebhooks({
deploymentEnv,
redis,
heartbeat,
broker,
}: {
storageContainer: azure.storage.Container;
packageHelper: PackageHelper;
deploymentEnv: DeploymentEnvironment;
redis: Redis;
broker: Broker;
heartbeat?: string;
}) {
return new RemoteArtifactAsServiceDeployment(
@ -37,6 +40,9 @@ export function deployWebhooks({
REDIS_PORT: String(redis.config.port),
REDIS_PASSWORD: redis.config.password,
BULLMQ_COMMANDS_FROM_ROOT: 'true',
REQUEST_BROKER: '1',
REQUEST_BROKER_ENDPOINT: broker.workerBaseUrl,
REQUEST_BROKER_SIGNATURE: broker.secretSignature,
},
readinessProbe: '/_readiness',
livenessProbe: '/_health',

View file

@ -68,3 +68,55 @@ export class CloudflareCDN {
};
}
}
export class CloudflareBroker {
constructor(
private config: {
envName: string;
zoneId: string;
cdnDnsRecord: string;
secretSignature: pulumi.Output<string>;
sentryDsn: string;
release: string;
}
) {}
deploy() {
const script = new cf.WorkerScript('hive-broker-worker', {
content: readFileSync(resolve(__dirname, '../../packages/services/broker-worker/dist/worker.js'), 'utf-8'),
name: `hive-broker-${this.config.envName}`,
secretTextBindings: [
{
name: 'SIGNATURE',
text: this.config.secretSignature,
},
{
name: 'SENTRY_DSN',
text: this.config.sentryDsn,
},
{
name: 'SENTRY_ENVIRONMENT',
text: this.config.envName,
},
{
name: 'SENTRY_RELEASE',
text: this.config.release,
},
],
});
const workerBase = this.config.cdnDnsRecord;
const workerUrl = `https://${workerBase}`;
new cf.WorkerRoute('cf-hive-broker-worker', {
scriptName: script.name,
pattern: `${workerBase}/*`,
zoneId: this.config.zoneId,
});
return {
secretSignature: this.config.secretSignature,
workerBaseUrl: workerUrl,
};
}
}

View file

@ -143,6 +143,29 @@ services:
PORT: 3004
CDN_AUTH_PRIVATE_KEY: 1e1064ef9cda8bf38936b77317e90dc3
local_broker:
image: node:16.13.2-alpine3.14
entrypoint:
- '/bin/sh'
- '/run-local-broker.sh'
networks:
- 'stack'
healthcheck:
test: ['CMD', 'wget', '--spider', '-q', 'localhost:3013/_readiness']
interval: 5s
timeout: 5s
retries: 6
start_period: 5s
ports:
- 3013:3013
volumes:
- '../packages/services/broker-worker/dist/dev.js:/broker.js'
- './run-local-broker.sh:/run-local-broker.sh'
environment:
NODE_ENV: production
PORT: 3013
CF_BROKER_SIGNATURE: secretSignature
external_composition:
image: node:16.13.2-alpine3.14
entrypoint:
@ -219,6 +242,8 @@ services:
condition: service_healthy
local_cdn:
condition: service_healthy
local_broker:
condition: service_healthy
ports:
- 3001:3001
environment:
@ -244,6 +269,9 @@ services:
RATE_LIMIT_ENDPOINT: http://rate-limit:3009
BILLING_ENDPOINT: http://stripe-billing:3010
EMAILS_ENDPOINT: http://emails:3011
REQUEST_BROKER: '1'
REQUEST_BROKER_SIGNATURE: 'secretSignature'
REQUEST_BROKER_ENDPOINT: 'http://local_broker:3013'
CDN: '1'
CDN_CF_BASE_PATH: http://local_cdn:3004
CDN_CF_ACCOUNT_ID: 103df45224310d669213971ce28b5b70
@ -306,6 +334,8 @@ services:
depends_on:
redis:
condition: service_healthy
local_broker:
condition: service_healthy
ports:
- 3005:3005
environment:
@ -315,6 +345,9 @@ services:
REDIS_HOST: redis
REDIS_PORT: 6379
REDIS_PASSWORD: test
REQUEST_BROKER: '1'
REQUEST_BROKER_SIGNATURE: 'secretSignature'
REQUEST_BROKER_ENDPOINT: 'http://local_broker:3013'
emails:
image: '${DOCKER_REGISTRY}emails${DOCKER_TAG}'

View file

@ -0,0 +1,5 @@
#!/bin/sh
set -e
node broker.js

View file

@ -0,0 +1,10 @@
*-debug.log
*-error.log
/.nyc_output
/dist
/lib
/package-lock.json
/tmp
node_modules
src/sdk.ts
schema.graphql

View file

@ -0,0 +1,2 @@
src
examples

View file

@ -0,0 +1,6 @@
## Hive Broker Worker
Idea here is to have a broker worker that can be used to make requests outside GraphQL Hive and without any access to the internal network.
This is very useful for example for making requests to external APIs.
Look at [models.ts](./src/models.ts) to see the structure accepted payload.

View file

@ -0,0 +1,21 @@
import { build } from 'esbuild';
import { fileURLToPath } from 'url';
import { dirname } from 'path';
(async function main() {
const __dirname = dirname(fileURLToPath(import.meta.url));
const localBuild = !!process.env.BUILD_FOR_LOCAL;
const outfile = localBuild ? '/dist/dev.js' : '/dist/worker.js';
await build({
entryPoints: [__dirname + (localBuild ? '/src/dev.ts' : '/src/index.ts')],
bundle: true,
platform: localBuild ? 'node' : 'browser',
target: localBuild ? undefined : 'chrome95',
minify: false,
outfile: __dirname + '/' + outfile,
treeShaking: true,
});
console.info(`Done, file: ${outfile}`);
})();

View file

@ -0,0 +1,26 @@
{
"name": "@hive/broker-script",
"private": true,
"version": "0.0.0",
"license": "MIT",
"scripts": {
"dev": "tsup-node src/dev.ts --sourcemap --watch --onSuccess \"node --enable-source-maps dist/dev.js\"",
"build-local": "BUILD_FOR_LOCAL=1 node build.mjs",
"build": "node build.mjs",
"typecheck": "tsc --noEmit"
},
"dependencies": {
"toucan-js": "2.7.0",
"zod": "3.19.1"
},
"devDependencies": {
"esbuild": "0.14.39",
"@whatwg-node/fetch": "0.4.7",
"@whatwg-node/server": "0.4.11",
"itty-router": "2.4.0",
"itty-router-extras": "0.4.2",
"nock": "13.2.4",
"@types/service-worker-mock": "2.0.1",
"@cloudflare/workers-types": "3.4.0"
}
}

View file

@ -0,0 +1,3 @@
export function isSignatureValid(signature: string) {
return SIGNATURE === signature;
}

View file

@ -0,0 +1,24 @@
import { createFetch, Response, Request, Headers, ReadableStream } from '@whatwg-node/fetch';
const nodeFetch = createFetch({
useNodeFetch: true,
});
if (!globalThis.Response) {
globalThis.Response = Response;
}
if (!globalThis.Request) {
globalThis.Request = Request;
}
if (!globalThis.Headers) {
globalThis.Headers = Headers;
}
if (!globalThis.ReadableStream) {
globalThis.ReadableStream = ReadableStream;
}
if (!globalThis.fetch) {
globalThis.fetch = nodeFetch.fetch;
}
// eslint-disable-next-line no-process-env
(globalThis as any).SIGNATURE = process.env.CF_BROKER_SIGNATURE || '';

View file

@ -0,0 +1,31 @@
import './dev-polyfill';
import { createServer } from 'http';
import { handleRequest } from './handler';
import { isSignatureValid } from './auth';
import { createServerAdapter } from '@whatwg-node/server';
import { Router } from 'itty-router';
// eslint-disable-next-line no-process-env
const PORT = process.env.PORT ? parseInt(process.env.PORT, 10) : 4010;
function main() {
const app = createServerAdapter(Router());
app.get(
'/_readiness',
() =>
new Response(null, {
status: 200,
})
);
app.all('*', (request: Request) => handleRequest(request, isSignatureValid));
const server = createServer(app);
return new Promise<void>(resolve => {
server.listen(PORT, '0.0.0.0', resolve);
});
}
main().catch(e => console.error(e));

View file

@ -0,0 +1,55 @@
export class InvalidRequestFormat extends Response {
constructor(message: string) {
super(
JSON.stringify({
code: 'INVALID_REQUEST_FORMAT',
error: `Invalid artifact type: "${message}"`,
}),
{
status: 400,
}
);
}
}
export class MissingSignature extends Response {
constructor() {
super(
JSON.stringify({
code: 'MISSING_SIGNATURE',
error: `Broker needs a signature to verify the origin of the request`,
}),
{
status: 401,
}
);
}
}
export class InvalidSignature extends Response {
constructor() {
super(
JSON.stringify({
code: 'INVALID_SIGNATURE',
error: `Failed to verify the origin of the request`,
}),
{
status: 403,
}
);
}
}
export class UnexpectedError extends Response {
constructor(errorId: string) {
super(
JSON.stringify({
code: 'UNEXPECTED_ERROR',
error: `Please try again later, or contact Hive support if the problem persists (error_id=${errorId})`,
}),
{
status: 500,
}
);
}
}

View file

@ -0,0 +1,17 @@
export {};
declare global {
/**
* Signature used to verify the origin of the request
*/
let SIGNATURE: string;
let SENTRY_DSN: string;
/**
* Name of the environment, e.g. staging, production
*/
let SENTRY_ENVIRONMENT: string;
/**
* Id of the release
*/
let SENTRY_RELEASE: string;
}

View file

@ -0,0 +1,46 @@
import { isSignatureValid } from './auth';
import { parseIncomingRequest } from './models';
/**
* gatherResponse awaits and returns a response body as a string.
*/
async function gatherResponse(response: Response) {
const contentType = response.headers.get('content-type') || '';
if (contentType.includes('json')) {
return JSON.stringify(await response.json());
} else if (contentType.includes('application/text')) {
return response.text();
} else if (contentType.startsWith('text/')) {
return response.text();
} else {
return response.text();
}
}
export async function handleRequest(request: Request, keyValidator: typeof isSignatureValid) {
const parsedRequest = await parseIncomingRequest(request, keyValidator);
if ('error' in parsedRequest) {
return parsedRequest.error;
}
const init =
parsedRequest.method === 'GET'
? {
method: 'GET',
headers: parsedRequest.headers,
}
: {
method: 'POST',
body: parsedRequest.body,
headers: parsedRequest.headers,
};
const response = await fetch(parsedRequest.url, init);
const text = await gatherResponse(response);
return new Response(text, {
...init,
status: response.status,
statusText: response.statusText,
});
}

View file

@ -0,0 +1,21 @@
import Toucan from 'toucan-js';
import { isSignatureValid } from './auth';
import { UnexpectedError } from './errors';
import { handleRequest } from './handler';
self.addEventListener('fetch', event => {
try {
event.respondWith(handleRequest(event.request, isSignatureValid));
} catch (error) {
const sentry = new Toucan({
dsn: SENTRY_DSN,
environment: SENTRY_ENVIRONMENT,
release: SENTRY_RELEASE,
context: event,
allowedHeaders: ['user-agent', 'cf-ipcountry', 'accept-encoding', 'accept', 'x-real-ip', 'cf-connecting-ip'],
allowedSearchParams: /(.*)/,
});
const eventId = sentry.captureException(error);
event.respondWith(new UnexpectedError(eventId ?? 'unknown'));
}
});

View file

@ -0,0 +1,55 @@
import { z } from 'zod';
import { isSignatureValid } from './auth';
import { MissingSignature, InvalidSignature, InvalidRequestFormat } from './errors';
const SIGNATURE_HEADER_NAME = 'x-hive-signature';
const RequestModelSchema = z.union([
z.object({
method: z.literal('GET'),
url: z.string().min(1),
headers: z.record(z.string()).optional(),
}),
z.object({
method: z.literal('POST'),
url: z.string().min(1),
headers: z.record(z.string()).optional(),
body: z.string().optional(),
}),
]);
export async function parseIncomingRequest(
request: Request,
keyValidator: typeof isSignatureValid
): Promise<{ error: Response } | z.infer<typeof RequestModelSchema>> {
if (request.method !== 'POST') {
return {
error: new Response('Only POST requests are allowed', {
status: 405,
statusText: 'Method Not Allowed',
}),
};
}
const signature = request.headers.get(SIGNATURE_HEADER_NAME);
if (!signature) {
return {
error: new MissingSignature(),
};
}
if (!keyValidator(signature)) {
return {
error: new InvalidSignature(),
};
}
const parseResult = RequestModelSchema.safeParse(await request.json<unknown>());
if (!parseResult.success) {
return { error: new InvalidRequestFormat(parseResult.error.message) };
}
return parseResult.data;
}

View file

@ -0,0 +1,339 @@
import '../src/dev-polyfill';
import { handleRequest } from '../src/handler';
import { InvalidSignature, MissingSignature, InvalidRequestFormat } from '../src/errors';
import { isSignatureValid } from '../src/auth';
import nock from 'nock';
const SignatureValidators = {
AlwaysTrue: () => true,
AlwaysFalse: () => false,
Real: isSignatureValid,
};
function mockWorkerEnv(input: { SIGNATURE: string }) {
Object.defineProperties(globalThis, {
SIGNATURE: {
value: input.SIGNATURE,
},
});
}
function clearWorkerEnv() {
Object.defineProperties(globalThis, {
SIGNATURE: {
value: undefined,
},
});
}
afterEach(clearWorkerEnv);
afterEach(nock.cleanAll);
test('401 on missing signature', async () => {
const SIGNATURE = '123456';
mockWorkerEnv({
SIGNATURE,
});
const request = new Request('https://fake-worker.com/', {
method: 'POST',
});
const response = await handleRequest(request, SignatureValidators.Real);
expect(response instanceof MissingSignature).toBeTruthy();
expect(response.status).toBe(401);
});
test('403 on non-matching signature', async () => {
const SIGNATURE = '123456';
mockWorkerEnv({
SIGNATURE,
});
const request = new Request('https://fake-worker.com/', {
method: 'POST',
headers: {
'x-hive-signature': '654321',
},
});
const response = await handleRequest(request, SignatureValidators.Real);
expect(response instanceof InvalidSignature).toBeTruthy();
expect(response.status).toBe(403);
});
test('405 allow only POST method', async () => {
const SIGNATURE = '123456';
mockWorkerEnv({
SIGNATURE,
});
let request = new Request('https://fake-worker.com/', {
method: 'GET',
headers: {
'x-hive-signature': SIGNATURE,
},
});
let response = await handleRequest(request, SignatureValidators.Real);
expect(response.status).toBe(405);
request = new Request('https://fake-worker.com/', {
method: 'PUT',
headers: {
'x-hive-signature': SIGNATURE,
},
body: JSON.stringify({}),
});
response = await handleRequest(request, SignatureValidators.Real);
expect(response.status).toBe(405);
});
test('400 on invalid request format', async () => {
const SIGNATURE = '123456';
mockWorkerEnv({
SIGNATURE,
});
const request = new Request('https://fake-worker.com/', {
method: 'POST',
headers: {
'x-hive-signature': SIGNATURE,
},
body: JSON.stringify({}),
});
const response = await handleRequest(request, SignatureValidators.Real);
expect(response instanceof InvalidRequestFormat).toBeTruthy();
expect(response.status).toBe(400);
});
test('GET text/plain', async () => {
nock('http://localhost')
.get('/webhook')
.once()
.matchHeader('X-Key', 'key')
.matchHeader('accept', 'text/plain')
.reply(200, 'OK');
const SIGNATURE = '123456';
mockWorkerEnv({
SIGNATURE,
});
const request = new Request('https://fake-worker.com/', {
method: 'POST',
headers: {
'x-hive-signature': SIGNATURE,
},
body: JSON.stringify({
url: 'http://localhost/webhook',
method: 'GET',
headers: {
'X-Key': 'key',
accept: 'text/plain',
},
}),
});
const response = await handleRequest(request, SignatureValidators.Real);
expect(response.status).toBe(200);
expect(await response.text()).toBe('OK');
});
test('GET application/json', async () => {
nock('http://localhost')
.get('/webhook')
.once()
.matchHeader('X-Key', 'key')
.matchHeader('accept', 'application/json')
.reply(200, {
message: 'OK',
});
const SIGNATURE = '123456';
mockWorkerEnv({
SIGNATURE,
});
const request = new Request('https://fake-worker.com/', {
method: 'POST',
headers: {
'x-hive-signature': SIGNATURE,
},
body: JSON.stringify({
url: 'http://localhost/webhook',
method: 'GET',
headers: {
'X-Key': 'key',
accept: 'application/json',
},
}),
});
const response = await handleRequest(request, SignatureValidators.Real);
expect(response.status).toBe(200);
expect(await response.text()).toBe(
JSON.stringify({
message: 'OK',
})
);
});
test('POST text/plain', async () => {
nock('http://localhost')
.post('/webhook')
.once()
.matchHeader('X-Key', 'key')
.matchHeader('accept', 'text/plain')
.reply(200, 'OK');
const SIGNATURE = '123456';
mockWorkerEnv({
SIGNATURE,
});
const request = new Request('https://fake-worker.com/', {
method: 'POST',
headers: {
'x-hive-signature': SIGNATURE,
},
body: JSON.stringify({
url: 'http://localhost/webhook',
method: 'POST',
headers: {
'X-Key': 'key',
accept: 'text/plain',
},
}),
});
const response = await handleRequest(request, SignatureValidators.Real);
expect(response.status).toBe(200);
expect(await response.text()).toBe('OK');
});
test('POST application/json', async () => {
nock('http://localhost')
.post('/webhook')
.once()
.matchHeader('X-Key', 'key')
.matchHeader('accept', 'application/json')
.reply(200, {
message: 'OK',
});
const SIGNATURE = '123456';
mockWorkerEnv({
SIGNATURE,
});
const request = new Request('https://fake-worker.com/', {
method: 'POST',
headers: {
'x-hive-signature': SIGNATURE,
},
body: JSON.stringify({
url: 'http://localhost/webhook',
method: 'POST',
headers: {
'X-Key': 'key',
accept: 'application/json',
},
}),
});
const response = await handleRequest(request, SignatureValidators.Real);
expect(response.status).toBe(200);
expect(await response.text()).toBe(
JSON.stringify({
message: 'OK',
})
);
});
test('POST application/json + body', async () => {
nock('http://localhost')
.post('/webhook')
.once()
.matchHeader('X-Key', 'key')
.matchHeader('accept', 'application/json')
.reply((_, requestBody) => [
200,
{
receivedBody: requestBody,
},
]);
const SIGNATURE = '123456';
mockWorkerEnv({
SIGNATURE,
});
const request = new Request('https://fake-worker.com/', {
method: 'POST',
headers: {
'x-hive-signature': SIGNATURE,
},
body: JSON.stringify({
url: 'http://localhost/webhook',
method: 'POST',
headers: {
'X-Key': 'key',
accept: 'application/json',
},
body: JSON.stringify({
message: 'OK',
}),
}),
});
const response = await handleRequest(request, SignatureValidators.Real);
expect(response.status).toBe(200);
expect(await response.text()).toBe(
JSON.stringify({
receivedBody: JSON.stringify({
message: 'OK',
}),
})
);
});
test('passthrough errors', async () => {
nock('http://localhost').get('/error').once().reply(500, 'Internal Server Error');
const SIGNATURE = '123456';
mockWorkerEnv({
SIGNATURE,
});
const request = new Request('https://fake-worker.com/', {
method: 'POST',
headers: {
'x-hive-signature': SIGNATURE,
},
body: JSON.stringify({
url: 'http://localhost/error',
method: 'GET',
headers: {
'X-Key': 'key',
accept: 'text/plain',
},
}),
});
const response = await handleRequest(request, SignatureValidators.Real);
expect(response.status).toBe(500);
expect(await response.text()).toBe('Internal Server Error');
});

View file

@ -0,0 +1,19 @@
{
"extends": "../../../tsconfig.json",
"compilerOptions": {
"outDir": "./dist",
"module": "commonjs",
"target": "esnext",
"lib": ["esnext"],
"skipLibCheck": true,
"alwaysStrict": true,
"strict": true,
"preserveConstEnums": true,
"moduleResolution": "node",
"sourceMap": true,
"esModuleInterop": true,
"types": ["@cloudflare/workers-types", "@types/service-worker-mock"]
},
"include": ["src"],
"exclude": ["node_modules", "dist", "test", "src/dev.ts", "src/dev-polyfill.ts"]
}

View file

@ -4,7 +4,7 @@
"version": "0.0.0",
"license": "MIT",
"scripts": {
"dev": "tsup-node src/dev.ts --watch --onSuccess \"node dist/dev.js\"",
"dev": "tsup-node src/dev.ts --sourcemap --watch --onSuccess \"node --enable-source-maps dist/dev.js\"",
"build-local": "BUILD_FOR_LOCAL=1 node build.mjs",
"build": "node build.mjs",
"typecheck": "tsc --noEmit"

View file

@ -62,7 +62,7 @@ function postmark(config: PostmarkEmailProviderConfig, emailFrom: string) {
});
if (!response.ok) {
const details = await response.json();
const details: any = await response.json();
throw new Error(details.Message ?? response.statusText);
}
},

View file

@ -16,3 +16,6 @@ Service for validating schemas or verifying whether a composite GraphQL schema c
| `SENTRY_DSN` | No | The DSN for reporting errors to Sentry. | `https://dooobars@o557896.ingest.sentry.io/12121212` |
| `PROMETHEUS_METRICS` | No | Whether Prometheus metrics should be enabled | `1` (enabled) or `0` (disabled) |
| `PROMETHEUS_METRICS_LABEL_INSTANCE` | No | The instance label added for the prometheus metrics. | `webhooks-service` |
| `REQUEST_BROKER` | No | Whether Request Broker should be enabled. | `1` (enabled) or `0` (disabled) |
| `REQUEST_BROKER_ENDPOINT` | No | The address | `https://broker.worker.dev` |
| `REQUEST_BROKER_SIGNATURE` | No | A secret signature needed to verify the request origin | `hbsahdbzxch123` |

View file

@ -24,6 +24,10 @@ export const schemaBuilderApiRouter = trpc
logger: FastifyLoggerInstance;
redis: Redis;
decrypt(value: string): string;
broker: {
endpoint: string;
signature: string;
} | null;
}>()
.mutation('supergraph', {
input: z
@ -49,6 +53,11 @@ export const schemaBuilderApiRouter = trpc
return await pickOrchestrator(input.type, ctx.redis, ctx.logger, ctx.decrypt).supergraph(
input.schemas,
input.external
? {
...input.external,
broker: ctx.broker,
}
: null
);
},
})
@ -69,6 +78,11 @@ export const schemaBuilderApiRouter = trpc
return await pickOrchestrator(input.type, ctx.redis, ctx.logger, ctx.decrypt).validate(
input.schemas,
input.external
? {
...input.external,
broker: ctx.broker,
}
: null
);
},
})
@ -89,6 +103,11 @@ export const schemaBuilderApiRouter = trpc
return await pickOrchestrator(input.type, ctx.redis, ctx.logger, ctx.decrypt).build(
input.schemas,
input.external
? {
...input.external,
broker: ctx.broker,
}
: null
);
},
});

View file

@ -24,6 +24,17 @@ const EnvironmentModel = zod.object({
ENCRYPTION_SECRET: zod.string(),
});
const RequestBrokerModel = zod.union([
zod.object({
REQUEST_BROKER: emptyString(zod.literal('0').optional()),
}),
zod.object({
REQUEST_BROKER: zod.literal('1'),
REQUEST_BROKER_ENDPOINT: zod.string().min(1),
REQUEST_BROKER_SIGNATURE: zod.string().min(1),
}),
]);
const SentryModel = zod.union([
zod.object({
SENTRY: emptyString(zod.literal('0').optional()),
@ -54,6 +65,8 @@ const configs = {
redis: RedisModel.safeParse(process.env),
// eslint-disable-next-line no-process-env
prometheus: PrometheusModel.safeParse(process.env),
// eslint-disable-next-line no-process-env
requestBroker: RequestBrokerModel.safeParse(process.env),
};
const environmentErrors: Array<string> = [];
@ -81,6 +94,7 @@ const base = extractConfig(configs.base);
const sentry = extractConfig(configs.sentry);
const redis = extractConfig(configs.redis);
const prometheus = extractConfig(configs.prometheus);
const requestBroker = extractConfig(configs.requestBroker);
export const env = {
environment: base.ENVIRONMENT,
@ -103,4 +117,11 @@ export const env = {
},
}
: null,
requestBroker:
requestBroker.REQUEST_BROKER === '1'
? {
endpoint: requestBroker.REQUEST_BROKER_ENDPOINT,
signature: requestBroker.REQUEST_BROKER_SIGNATURE,
}
: null,
} as const;

View file

@ -102,7 +102,7 @@ async function main() {
trpcOptions: {
router: schemaBuilderApiRouter,
createContext({ req }: CreateFastifyContextOptions) {
return { redis, logger: req.log, decrypt };
return { redis, logger: req.log, decrypt, broker: env.requestBroker };
},
},
});

View file

@ -141,17 +141,34 @@ const createFederation: (
);
const signature = hash(decrypt(external.encryptedSecret), 'sha256', body);
logger.debug('Calling external composition service (url=%s)', external.endpoint);
const init = {
method: 'POST',
headers: {
Accept: 'application/json',
'Content-Type': 'application/json',
'x-hive-signature-256': signature,
},
body,
};
const response = await retry(
async () => {
const response = await fetch(external.endpoint, {
body,
method: 'POST',
headers: {
Accept: 'application/json',
'Content-Type': 'application/json',
'x-hive-signature-256': signature,
},
}).catch(error => {
const response = await (external.broker
? fetch(external.broker.endpoint, {
method: 'POST',
headers: {
Accept: 'application/json',
'Content-Type': 'application/json',
'x-hive-signature': external.broker.signature,
},
body: JSON.stringify({
url: external.endpoint,
...init,
}),
})
: fetch(external.endpoint, init)
).catch(error => {
logger.error(error);
return Promise.reject(error);
@ -162,15 +179,14 @@ const createFederation: (
throw new Error(`External composition failure: ${response.status} ${message}`);
}
return response;
return response.json();
},
{
retries: 3,
}
);
const result = await response.json();
const parseResult = EXTERNAL_COMPOSITION_RESULT.safeParse(result);
const parseResult = EXTERNAL_COMPOSITION_RESULT.safeParse(await response);
if (!parseResult.success) {
throw new Error(`External composition failure: invalid shape of data`);

View file

@ -34,4 +34,8 @@ export type SupergraphOutput = {
export type ExternalComposition = {
endpoint: string;
encryptedSecret: string;
broker: {
endpoint: string;
signature: string;
} | null;
} | null;

View file

@ -16,3 +16,6 @@ This service takes care of delivering WebHooks.
| `SENTRY_DSN` | No | The DSN for reporting errors to Sentry. | `https://dooobars@o557896.ingest.sentry.io/12121212` |
| `PROMETHEUS_METRICS` | No | Whether Prometheus metrics should be enabled | `1` (enabled) or `0` (disabled) |
| `PROMETHEUS_METRICS_LABEL_INSTANCE` | No | The instance label added for the prometheus metrics. | `webhooks-service` |
| `REQUEST_BROKER` | No | Whether Request Broker should be enabled. | `1` (enabled) or `0` (disabled) |
| `REQUEST_BROKER_ENDPOINT` | No | The address | `https://broker.worker.dev` |
| `REQUEST_BROKER_SIGNATURE` | No | A secret signature needed to verify the request origin | `hbsahdbzxch123` |

View file

@ -30,6 +30,17 @@ const RedisModel = zod.object({
REDIS_PASSWORD: emptyString(zod.string().optional()),
});
const RequestBrokerModel = zod.union([
zod.object({
REQUEST_BROKER: emptyString(zod.literal('0').optional()),
}),
zod.object({
REQUEST_BROKER: zod.literal('1'),
REQUEST_BROKER_ENDPOINT: zod.string().min(1),
REQUEST_BROKER_SIGNATURE: zod.string().min(1),
}),
]);
const SentryModel = zod.union([
zod.object({
SENTRY: emptyString(zod.literal('0').optional()),
@ -54,6 +65,8 @@ const configs = {
sentry: SentryModel.safeParse(process.env),
// eslint-disable-next-line no-process-env
prometheus: PrometheusModel.safeParse(process.env),
// eslint-disable-next-line no-process-env
requestBroker: RequestBrokerModel.safeParse(process.env),
};
const environmentErrors: Array<string> = [];
@ -81,6 +94,7 @@ const base = extractConfig(configs.base);
const redis = extractConfig(configs.redis);
const sentry = extractConfig(configs.sentry);
const prometheus = extractConfig(configs.prometheus);
const requestBroker = extractConfig(configs.requestBroker);
export const env = {
environment: base.ENVIRONMENT,
@ -103,4 +117,11 @@ export const env = {
},
}
: null,
requestBroker:
requestBroker.REQUEST_BROKER === '1'
? {
endpoint: requestBroker.REQUEST_BROKER_ENDPOINT,
signature: requestBroker.REQUEST_BROKER_SIGNATURE,
}
: null,
} as const;

View file

@ -44,6 +44,7 @@ async function main() {
webhookQueueName: 'webhook',
maxAttempts: 10,
backoffDelay: 2000,
requestBroker: env.requestBroker,
});
const stopHeartbeats = env.heartbeat

View file

@ -38,17 +38,40 @@ export function createWebhookJob({ config }: { config: Config }) {
job.attemptsMade + 1,
config.maxAttempts
);
await got.post(job.data.endpoint, {
headers: {
Accept: 'application/json',
'Accept-Encoding': 'gzip, deflate, br',
'Content-Type': 'application/json',
},
timeout: {
request: 10_000,
},
json: job.data.event,
});
if (config.requestBroker) {
await got.post(config.requestBroker.endpoint, {
headers: {
Accept: 'text/plain',
'x-hive-signature': config.requestBroker.signature,
},
timeout: {
request: 10_000,
},
json: {
url: job.data.endpoint,
method: 'POST',
headers: {
Accept: 'application/json',
'Accept-Encoding': 'gzip, deflate, br',
'Content-Type': 'application/json',
},
body: JSON.stringify(job.data.event),
},
});
} else {
await got.post(job.data.endpoint, {
headers: {
Accept: 'application/json',
'Accept-Encoding': 'gzip, deflate, br',
'Content-Type': 'application/json',
},
timeout: {
request: 10_000,
},
json: job.data.event,
});
}
} else {
config.logger.warn('Giving up on webhook (job=%s)', job.name);
}

View file

@ -12,6 +12,10 @@ export interface Config {
webhookQueueName: string;
maxAttempts: number;
backoffDelay: number;
requestBroker: null | {
endpoint: string;
signature: string;
};
}
export interface Context {

View file

@ -410,6 +410,31 @@ importers:
ioredis-mock: 7.4.0_ioredis@4.28.5
tslib: 2.4.0
packages/services/broker-worker:
specifiers:
'@cloudflare/workers-types': 3.4.0
'@types/service-worker-mock': 2.0.1
'@whatwg-node/fetch': 0.4.7
'@whatwg-node/server': 0.4.11
esbuild: 0.14.39
itty-router: 2.4.0
itty-router-extras: 0.4.2
nock: 13.2.4
toucan-js: 2.7.0
zod: 3.19.1
dependencies:
toucan-js: 2.7.0
zod: 3.19.1
devDependencies:
'@cloudflare/workers-types': 3.4.0
'@types/service-worker-mock': 2.0.1
'@whatwg-node/fetch': 0.4.7
'@whatwg-node/server': 0.4.11
esbuild: 0.14.39
itty-router: 2.4.0
itty-router-extras: 0.4.2
nock: 13.2.4
packages/services/cdn-worker:
specifiers:
'@cloudflare/workers-types': 3.4.0
@ -26572,7 +26597,6 @@ packages:
/zod/3.19.1:
resolution: {integrity: sha512-LYjZsEDhCdYET9ikFu6dVPGp2YH9DegXjdJToSzD9rO6fy4qiRYFoyEYwps88OseJlPyl2NOe2iJuhEhL7IpEA==}
dev: true
/zrender/5.3.1:
resolution: {integrity: sha512-7olqIjy0gWfznKr6vgfnGBk7y4UtdMvdwFmK92vVQsQeDPyzkHW1OlrLEKg6GHz1W5ePf0FeN1q2vkl/HFqhXw==}

View file

@ -45,13 +45,5 @@
}
},
"include": ["packages"],
"exclude": [
"**/node_modules/**",
"**/dist",
"**/temp",
"**/tmp",
"packages/web",
"packages/services/cdn-worker",
"packages/services/police-worker"
]
"exclude": ["**/node_modules/**", "**/dist", "**/temp", "**/tmp", "packages/web", "packages/services/*-worker"]
}

View file

@ -26,6 +26,14 @@
"dependsOn": ["^build"],
"outputs": ["dist/dev.js"]
},
"@hive/broker-script#build": {
"dependsOn": ["^build"],
"outputs": ["dist/worker.js"]
},
"@hive/broker-script#build-local": {
"dependsOn": ["^build"],
"outputs": ["dist/dev.js"]
},
"@hive/app#build": {
"dependsOn": ["^build"],
"outputs": ["dist/**"]