remove emails and webhooks service (#7458)

This commit is contained in:
Laurin Quast 2026-01-12 15:11:33 +01:00 committed by GitHub
parent ec77725ca1
commit ed3fea1423
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
51 changed files with 46 additions and 2722 deletions

View file

@ -31,6 +31,10 @@ services:
+ SENTRY_DSN: '${SENTRY_DSN:-}'
+ PROMETHEUS_METRICS: '${PROMETHEUS_METRICS:-}'
+ LOG_JSON: '1'
- emails:
- ...
- webhooks:
- ...
```
For different setups, we recommend using this as a reference.

View file

@ -66,10 +66,10 @@
"command": "pnpm dev"
},
{
"name": "emails:dev",
"description": "Run Emails Service",
"name": "workflows:dev",
"description": "Run Workflows Service",
"open": true,
"cwd": "packages/services/emails",
"cwd": "packages/services/workflows",
"command": "pnpm dev"
},
{

View file

@ -9,7 +9,6 @@ import { deployCommerce } from './services/commerce';
import { deployDatabaseCleanupJob } from './services/database-cleanup';
import { deployDbMigrations } from './services/db-migrations';
import { configureDocker } from './services/docker';
import { deployEmails } from './services/emails';
import { prepareEnvironment } from './services/environment';
import { configureGithubApp } from './services/github';
import { deployGraphQL } from './services/graphql';
@ -29,7 +28,6 @@ import { deploySuperTokens } from './services/supertokens';
import { deployTokens } from './services/tokens';
import { deployUsage } from './services/usage';
import { deployUsageIngestor } from './services/usage-ingestor';
import { deployWebhooks } from './services/webhooks';
import { deployWorkflows, PostmarkSecret } from './services/workflows';
import { configureZendesk } from './services/zendesk';
import { optimizeAzureCluster } from './utils/azure-helpers';
@ -138,27 +136,6 @@ const tokens = deployTokens({
observability,
});
const webhooks = deployWebhooks({
image: docker.factory.getImageId('webhooks', imagesTag),
environment,
heartbeat: heartbeatsConfig.get('webhooks'),
broker,
docker,
redis,
sentry,
observability,
});
const emails = deployEmails({
image: docker.factory.getImageId('emails', imagesTag),
docker,
environment,
redis,
postmarkSecret,
sentry,
observability,
});
deployWorkflows({
image: docker.factory.getImageId('workflows', imagesTag),
docker,
@ -178,7 +155,6 @@ const commerce = deployCommerce({
dbMigrations,
sentry,
observability,
emails,
postgres,
});
@ -237,7 +213,6 @@ const graphql = deployGraphQL({
image: docker.factory.getImageId('server', imagesTag),
docker,
tokens,
webhooks,
schema,
schemaPolicy,
dbMigrations,
@ -245,7 +220,6 @@ const graphql = deployGraphQL({
usage,
cdn,
commerce,
emails,
supertokens,
s3,
s3Mirror,
@ -368,7 +342,6 @@ export const usageApiServiceId = usage.service.id;
export const usageIngestorApiServiceId = usageIngestor.service.id;
export const tokensApiServiceId = tokens.service.id;
export const schemaApiServiceId = schema.service.id;
export const webhooksApiServiceId = webhooks.service.id;
export const appId = app.deployment.id;
export const otelCollectorId = otelCollector.deployment.id;

View file

@ -1,11 +1,9 @@
import * as pulumi from '@pulumi/pulumi';
import { serviceLocalEndpoint } from '../utils/local-endpoint';
import { ServiceSecret } from '../utils/secrets';
import { ServiceDeployment } from '../utils/service-deployment';
import { Clickhouse } from './clickhouse';
import { DbMigrations } from './db-migrations';
import { Docker } from './docker';
import { Emails } from './emails';
import { Environment } from './environment';
import { Observability } from './observability';
import { Postgres } from './postgres';
@ -22,7 +20,6 @@ export function deployCommerce({
observability,
environment,
dbMigrations,
emails,
image,
docker,
postgres,
@ -34,7 +31,6 @@ export function deployCommerce({
environment: Environment;
dbMigrations: DbMigrations;
docker: Docker;
emails: Emails;
postgres: Postgres;
clickhouse: Clickhouse;
sentry: Sentry;

View file

@ -1,66 +0,0 @@
import * as pulumi from '@pulumi/pulumi';
import { serviceLocalEndpoint } from '../utils/local-endpoint';
import { ServiceSecret } from '../utils/secrets';
import { ServiceDeployment } from '../utils/service-deployment';
import { Docker } from './docker';
import { Environment } from './environment';
import { Observability } from './observability';
import { Redis } from './redis';
import { Sentry } from './sentry';
import { PostmarkSecret } from './workflows';
export type Emails = ReturnType<typeof deployEmails>;
export function deployEmails({
environment,
redis,
heartbeat,
image,
docker,
sentry,
observability,
postmarkSecret,
}: {
observability: Observability;
environment: Environment;
image: string;
redis: Redis;
docker: Docker;
postmarkSecret: PostmarkSecret;
heartbeat?: string;
sentry: Sentry;
}) {
const { deployment, service } = new ServiceDeployment(
'emails-service',
{
imagePullSecret: docker.secret,
env: {
...environment.envVars,
SENTRY: sentry.enabled ? '1' : '0',
EMAIL_PROVIDER: 'postmark',
HEARTBEAT_ENDPOINT: heartbeat ?? '',
OPENTELEMETRY_COLLECTOR_ENDPOINT:
observability.enabled && observability.tracingEndpoint
? observability.tracingEndpoint
: '',
},
readinessProbe: '/_readiness',
livenessProbe: '/_health',
startupProbe: '/_health',
exposesMetrics: true,
image,
replicas: environment.podsConfig.general.replicas,
},
[redis.deployment, redis.service],
)
.withSecret('REDIS_HOST', redis.secret, 'host')
.withSecret('REDIS_PORT', redis.secret, 'port')
.withSecret('REDIS_PASSWORD', redis.secret, 'password')
.withSecret('EMAIL_FROM', postmarkSecret, 'from')
.withSecret('EMAIL_PROVIDER_POSTMARK_TOKEN', postmarkSecret, 'token')
.withSecret('EMAIL_PROVIDER_POSTMARK_MESSAGE_STREAM', postmarkSecret, 'messageStream')
.withConditionalSecret(sentry.enabled, 'SENTRY_DSN', sentry.secret, 'dsn')
.deploy();
return { deployment, service, localEndpoint: serviceLocalEndpoint(service) };
}

View file

@ -7,7 +7,6 @@ import { Clickhouse } from './clickhouse';
import { CommerceService } from './commerce';
import { DbMigrations } from './db-migrations';
import { Docker } from './docker';
import { Emails } from './emails';
import { Environment } from './environment';
import { GitHubApp } from './github';
import { Observability } from './observability';
@ -20,7 +19,6 @@ import { Sentry } from './sentry';
import { Supertokens } from './supertokens';
import { Tokens } from './tokens';
import { Usage } from './usage';
import { Webhooks } from './webhooks';
import { Zendesk } from './zendesk';
export type GraphQL = ReturnType<typeof deployGraphQL>;
@ -35,7 +33,6 @@ export function deployGraphQL({
image,
environment,
tokens,
webhooks,
schema,
schemaPolicy,
cdn,
@ -43,7 +40,6 @@ export function deployGraphQL({
usage,
commerce,
dbMigrations,
emails,
supertokens,
s3,
s3Mirror,
@ -62,7 +58,6 @@ export function deployGraphQL({
clickhouse: Clickhouse;
environment: Environment;
tokens: Tokens;
webhooks: Webhooks;
schema: Schema;
schemaPolicy: SchemaPolicy;
redis: Redis;
@ -73,7 +68,6 @@ export function deployGraphQL({
usage: Usage;
dbMigrations: DbMigrations;
commerce: CommerceService;
emails: Emails;
supertokens: Supertokens;
zendesk: Zendesk;
docker: Docker;

View file

@ -1,60 +0,0 @@
import { ServiceDeployment } from '../utils/service-deployment';
import type { Broker } from './cf-broker';
import { Docker } from './docker';
import { Environment } from './environment';
import { Observability } from './observability';
import { Redis } from './redis';
import { Sentry } from './sentry';
export type Webhooks = ReturnType<typeof deployWebhooks>;
export function deployWebhooks({
environment,
heartbeat,
broker,
image,
docker,
redis,
sentry,
observability,
}: {
observability: Observability;
image: string;
environment: Environment;
heartbeat?: string;
docker: Docker;
broker: Broker;
redis: Redis;
sentry: Sentry;
}) {
return new ServiceDeployment(
'webhooks-service',
{
imagePullSecret: docker.secret,
env: {
...environment.envVars,
SENTRY: sentry.enabled ? '1' : '0',
HEARTBEAT_ENDPOINT: heartbeat ?? '',
REQUEST_BROKER: '1',
OPENTELEMETRY_COLLECTOR_ENDPOINT:
observability.enabled && observability.tracingEndpoint
? observability.tracingEndpoint
: '',
},
readinessProbe: '/_readiness',
livenessProbe: '/_health',
startupProbe: '/_health',
exposesMetrics: true,
replicas: environment.podsConfig.general.replicas,
image,
},
[redis.deployment, redis.service],
)
.withSecret('REDIS_HOST', redis.secret, 'host')
.withSecret('REDIS_PORT', redis.secret, 'port')
.withSecret('REDIS_PASSWORD', redis.secret, 'password')
.withSecret('REQUEST_BROKER_ENDPOINT', broker.secret, 'baseUrl')
.withSecret('REQUEST_BROKER_SIGNATURE', broker.secret, 'secretSignature')
.withConditionalSecret(sentry.enabled, 'SENTRY_DSN', sentry.secret, 'dsn')
.deploy();
}

View file

@ -192,10 +192,6 @@ services:
condition: service_completed_successfully
tokens:
condition: service_healthy
webhooks:
condition: service_healthy
emails:
condition: service_healthy
schema:
condition: service_healthy
policy:
@ -304,45 +300,6 @@ services:
SENTRY_DSN: '${SENTRY_DSN:-}'
PROMETHEUS_METRICS: '${PROMETHEUS_METRICS:-}'
webhooks:
image: '${DOCKER_REGISTRY}webhooks${DOCKER_TAG}'
networks:
- 'stack'
depends_on:
redis:
condition: service_healthy
environment:
NODE_ENV: production
PORT: 3005
REDIS_HOST: redis
REDIS_PORT: 6379
REDIS_PASSWORD: '${REDIS_PASSWORD}'
LOG_LEVEL: '${LOG_LEVEL:-debug}'
OPENTELEMETRY_COLLECTOR_ENDPOINT: '${OPENTELEMETRY_COLLECTOR_ENDPOINT:-}'
SENTRY: '${SENTRY:-0}'
SENTRY_DSN: '${SENTRY_DSN:-}'
PROMETHEUS_METRICS: '${PROMETHEUS_METRICS:-}'
emails:
image: '${DOCKER_REGISTRY}emails${DOCKER_TAG}'
networks:
- 'stack'
depends_on:
redis:
condition: service_healthy
environment:
NODE_ENV: production
PORT: 3011
REDIS_HOST: redis
REDIS_PORT: 6379
REDIS_PASSWORD: '${REDIS_PASSWORD}'
EMAIL_FROM: no-reply@graphql-hive.com
EMAIL_PROVIDER: sendmail
LOG_LEVEL: '${LOG_LEVEL:-debug}'
SENTRY: '${SENTRY:-0}'
SENTRY_DSN: '${SENTRY_DSN:-}'
PROMETHEUS_METRICS: '${PROMETHEUS_METRICS:-}'
workflows:
image: '${DOCKER_REGISTRY}workflows${DOCKER_TAG}'
networks:

View file

@ -109,27 +109,6 @@ target "target-publish" {
cache-to = ["type=gha,mode=max,ignore-error=true"]
}
target "emails" {
inherits = ["service-base", get_target()]
contexts = {
dist = "${PWD}/packages/services/emails/dist"
shared = "${PWD}/docker/shared"
}
args = {
SERVICE_DIR_NAME = "@hive/emails"
IMAGE_TITLE = "graphql-hive/emails"
IMAGE_DESCRIPTION = "The emails service of the GraphQL Hive project."
PORT = "3006"
HEALTHCHECK_CMD = "wget --spider -q http://127.0.0.1:$${PORT}/_readiness"
}
tags = [
local_image_tag("emails"),
stable_image_tag("emails"),
image_tag("emails", COMMIT_SHA),
image_tag("emails", BRANCH_NAME)
]
}
target "schema" {
inherits = ["service-base", get_target()]
contexts = {
@ -295,27 +274,6 @@ target "usage" {
]
}
target "webhooks" {
inherits = ["service-base", get_target()]
contexts = {
dist = "${PWD}/packages/services/webhooks/dist"
shared = "${PWD}/docker/shared"
}
args = {
SERVICE_DIR_NAME = "@hive/webhooks"
IMAGE_TITLE = "graphql-hive/webhooks"
IMAGE_DESCRIPTION = "The webhooks ingestor service of the GraphQL Hive project."
PORT = "3005"
HEALTHCHECK_CMD = "wget --spider -q http://127.0.0.1:$${PORT}/_readiness"
}
tags = [
local_image_tag("webhooks"),
stable_image_tag("webhooks"),
image_tag("webhooks", COMMIT_SHA),
image_tag("webhooks", BRANCH_NAME)
]
}
target "workflows" {
inherits = ["service-base", get_target()]
contexts = {
@ -434,14 +392,12 @@ target "cli" {
group "build" {
targets = [
"emails",
"schema",
"policy",
"storage",
"tokens",
"usage-ingestor",
"usage",
"webhooks",
"server",
"commerce",
"composition-federation-2",
@ -454,14 +410,12 @@ group "build" {
group "integration-tests" {
targets = [
"commerce",
"emails",
"schema",
"policy",
"storage",
"tokens",
"usage-ingestor",
"usage",
"webhooks",
"server",
"composition-federation-2",
"workflows",

View file

@ -122,8 +122,6 @@ services:
condition: service_healthy
migrations:
condition: service_completed_successfully
emails:
condition: service_healthy
environment:
NODE_ENV: production
LOG_LEVEL: debug
@ -238,13 +236,6 @@ services:
tokens:
condition: service_healthy
emails:
environment:
EMAIL_PROVIDER: '${EMAIL_PROVIDER}'
LOG_LEVEL: debug
ports:
- '3011:3011'
workflows:
environment:
EMAIL_PROVIDER: '${EMAIL_PROVIDER}'

View file

@ -1,6 +0,0 @@
REDIS_HOST="localhost"
REDIS_PORT="6379"
REDIS_PASSWORD=""
EMAIL_PROVIDER="mock"
EMAIL_FROM="mock@graphql-hive.com"
OPENTELEMETRY_COLLECTOR_ENDPOINT="<sync>"

View file

@ -1,4 +0,0 @@
*.log
.DS_Store
node_modules
dist

View file

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2022 The Guild
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -1,34 +0,0 @@
# Emails
Service for sending Hive Emails.
## Configuration
| Name | Required | Description | Example Value |
| ---------------------------------------- | ----------------------------------------------------- | -------------------------------------------------------------------------------------------------------- | ---------------------------------------------------- |
| `PORT` | No | The port this service is running on. | `6260` |
| `REDIS_HOST` | **Yes** | The host of your redis instance. | `"127.0.0.1"` |
| `REDIS_PORT` | **Yes** | The port of your redis instance. | `6379` |
| `REDIS_PASSWORD` | **Yes** | The password of your redis instance. | `"apollorocks"` |
| `REDIS_TLS_ENABLED` | **No** | Enable TLS for redis connection (rediss://). | `"0"` |
| `EMAIL_FROM` | **Yes** | The email address used for sending emails | `kamil@graphql-hive.com` |
| `EMAIL_PROVIDER` | **Yes** | The email provider that should be used for sending emails. | `smtp` or `postmark` or `mock` |
| `EMAIL_PROVIDER_SMTP_PROTOCOL` | No (**Yes** if `EMAIL_PROVIDER` is set to `smtp`) | The protocol used for the smtp server | `smtp` or `smtps` |
| `EMAIL_PROVIDER_SMTP_HOST` | No (**Yes** if `EMAIL_PROVIDER` is set to `smtp`) | The host of the smtp server | `127.0.0.1` |
| `EMAIL_PROVIDER_SMTP_PORT` | No (**Yes** if `EMAIL_PROVIDER` is set to `smtp`) | The port of the smtp server | `25` |
| `EMAIL_PROVIDER_SMTP_AUTH_USERNAME` | No (**Yes** if `EMAIL_PROVIDER` is set to `smtp`) | The username for the smtp server. | `letmein` |
| `EMAIL_PROVIDER_SMTP_AUTH_PASSWORD` | No (**Yes** if `EMAIL_PROVIDER` is set to `smtp`) | The password for the smtp server. | `letmein` |
| `EMAIL_PROVIDER_POSTMARK_TOKEN` | No (**Yes** if `EMAIL_PROVIDER` is set to `postmark`) | The postmark token. | `abcdefg123` |
| `EMAIL_PROVIDER_POSTMARK_MESSAGE_STREAM` | No (**Yes** if `EMAIL_PROVIDER` is set to `postmark`) | The postmark message stream. | `abcdefg123` |
| `ENVIRONMENT` | No | The environment of your Hive app. (**Note:** This will be used for Sentry reporting.) | `staging` |
| `HEARTBEAT_ENDPOINT` | No | The endpoint for a heartbeat. | `http://127.0.0.1:6969/heartbeat` |
| `SENTRY` | No | Whether Sentry error reporting should be enabled. | `1` (enabled) or `0` (disabled) |
| `SENTRY_DSN` | No | The DSN for reporting errors to Sentry. | `https://dooobars@o557896.ingest.sentry.io/12121212` |
| `PROMETHEUS_METRICS` | No | Whether Prometheus metrics should be enabled | `1` (enabled) or `0` (disabled) |
| `PROMETHEUS_METRICS_LABEL_INSTANCE` | No | The instance label added for the prometheus metrics. | `emails` |
| `PROMETHEUS_METRICS_PORT` | No | Port on which prometheus metrics are exposed | Defaults to `10254` |
| `REQUEST_LOGGING` | No | Log http requests | `1` (enabled) or `0` (disabled) |
| `LOG_LEVEL` | No | The verbosity of the service logs. One of `trace`, `debug`, `info`, `warn` ,`error`, `fatal` or `silent` | `info` (default) |
| `OPENTELEMETRY_COLLECTOR_ENDPOINT` | No | OpenTelemetry Collector endpoint. The expected traces transport is HTTP (port `4318`). | `http://localhost:4318/v1/traces` |

View file

@ -1,31 +0,0 @@
{
"name": "@hive/emails",
"type": "module",
"license": "MIT",
"private": true,
"scripts": {
"build": "tsx ../../../scripts/runify.ts",
"dev": "tsup-node --config ../../../configs/tsup/dev.config.node.ts src/dev.ts",
"postbuild": "copyfiles -f \"node_modules/bullmq/dist/esm/commands/*.lua\" dist && copyfiles -f \"node_modules/bullmq/dist/esm/commands/includes/*.lua\" dist/includes",
"typecheck": "tsc --noEmit"
},
"devDependencies": {
"@hive/service-common": "workspace:*",
"@sentry/node": "7.120.2",
"@trpc/server": "10.45.3",
"@types/mjml": "4.7.1",
"@types/nodemailer": "7.0.4",
"@types/sendmail": "1.4.7",
"bullmq": "5.34.8",
"copyfiles": "2.4.1",
"dotenv": "16.4.7",
"ioredis": "5.8.2",
"mjml": "4.14.0",
"nodemailer": "7.0.11",
"p-timeout": "6.1.4",
"pino-pretty": "11.3.0",
"sendmail": "1.6.1",
"tslib": "2.8.1",
"zod": "3.25.76"
}
}

View file

@ -1,257 +0,0 @@
import { createHash } from 'node:crypto';
import { z } from 'zod';
import { handleTRPCError } from '@hive/service-common';
import type { inferRouterInputs } from '@trpc/server';
import { initTRPC } from '@trpc/server';
import type { Context } from './context';
import { renderAuditLogsReportEmail } from './templates/audit-logs-report';
import { renderEmailVerificationEmail } from './templates/email-verification';
import { renderOrganizationInvitation } from './templates/organization-invitation';
import { renderOrganizationOwnershipTransferEmail } from './templates/organization-ownership-transfer';
import { renderPasswordResetEmail } from './templates/password-reset';
import { renderRateLimitExceededEmail } from './templates/rate-limit-exceeded';
import { renderRateLimitWarningEmail } from './templates/rate-limit-warning';
const t = initTRPC.context<Context>().create();
const procedure = t.procedure.use(handleTRPCError);
export const emailsApiRouter = t.router({
sendAuditLogsReportEmail: procedure
.input(
z.object({
organizationId: z.string(),
organizationName: z.string(),
formattedStartDate: z.string(),
formattedEndDate: z.string(),
url: z.string(),
email: z.string(),
}),
)
.mutation(async ({ ctx, input }) => {
try {
const job = await ctx.schedule({
email: input.email,
subject: 'Hive - Audit Log Report',
body: renderAuditLogsReportEmail({
url: input.url,
organizationName: input.organizationName,
formattedStartDate: input.formattedStartDate,
formattedEndDate: input.formattedEndDate,
}),
});
return { job: job.id ?? 'unknown' };
} catch (error) {
ctx.errorHandler('Failed to schedule an email', error as Error);
throw error;
}
}),
sendOrganizationOwnershipTransferEmail: procedure
.input(
z.object({
organizationId: z.string(),
organizationName: z.string(),
authorName: z.string(),
email: z.string(),
link: z.string(),
}),
)
.mutation(async ({ ctx, input }) => {
try {
const job = await ctx.schedule({
email: input.email,
subject: `Organization transfer from ${input.authorName} (${input.organizationName})`,
body: renderOrganizationOwnershipTransferEmail({
link: input.link,
organizationName: input.organizationName,
authorName: input.authorName,
}),
});
return { job: job.id ?? 'unknown' };
} catch (error) {
ctx.errorHandler('Failed to schedule an email', error as Error);
throw error;
}
}),
sendOrganizationInviteEmail: procedure
.input(
z.object({
organizationId: z.string(),
organizationName: z.string(),
code: z.string(),
email: z.string(),
link: z.string(),
}),
)
.mutation(async ({ ctx, input }) => {
try {
const subject = `You have been invited to join ${input.organizationName}`;
const job = await ctx.schedule({
id: JSON.stringify({
id: 'org-invitation',
organization: input.organizationId,
code: createHash('sha256').update(input.code).digest('hex'),
email: createHash('sha256').update(input.email).digest('hex'),
}),
email: input.email,
subject,
body: renderOrganizationInvitation({
link: input.link,
organizationName: input.organizationName,
}),
});
return { job: job.id ?? 'unknown' };
} catch (error) {
ctx.errorHandler('Failed to schedule an email', error as Error);
throw error;
}
}),
sendEmailVerificationEmail: procedure
.input(
z.object({
user: z.object({
email: z.string(),
id: z.string(),
}),
emailVerifyLink: z.string(),
}),
)
.mutation(async ({ ctx, input }) => {
try {
const subject = 'Verify your email';
const job = await ctx.schedule({
id: `email-verification-${input.user.id}-${new Date().getTime()}`,
email: input.user.email,
subject,
body: renderEmailVerificationEmail({
subject,
verificationLink: input.emailVerifyLink,
toEmail: input.user.email,
}),
});
return { job: job.id ?? 'unknown' };
} catch (error) {
ctx.errorHandler('Failed to schedule an email', error as Error);
throw error;
}
}),
sendPasswordResetEmail: procedure
.input(
z.object({
user: z.object({
email: z.string(),
id: z.string(),
}),
passwordResetLink: z.string(),
}),
)
.mutation(async ({ ctx, input }) => {
try {
const subject = 'Reset your password.';
const job = await ctx.schedule({
id: `password-reset-${input.user.id}-${new Date().getTime()}`,
email: input.user.email,
subject,
body: renderPasswordResetEmail({
subject,
passwordResetLink: input.passwordResetLink,
toEmail: input.user.email,
}),
});
return { job: job.id ?? 'unknown' };
} catch (error) {
ctx.errorHandler('Failed to schedule an email', error as Error);
throw error;
}
}),
sendRateLimitExceededEmail: procedure
.input(
z.object({
organizationId: z.string(),
organizationName: z.string(),
limit: z.number(),
currentUsage: z.number(),
startDate: z.number(),
endDate: z.number(),
subscriptionManagementLink: z.string(),
email: z.string(),
}),
)
.mutation(async ({ ctx, input }) => {
try {
const job = await ctx.schedule({
// If the jobId would include only the period and org id, then we would be able to notify the user once per month.
// There's a chance that an organization will increase the limit and we might need to notify them again.
id: JSON.stringify({
id: 'rate-limit-exceeded',
organization: input.organizationId,
period: {
start: input.startDate,
end: input.endDate,
},
limit: input.limit,
}),
email: input.email,
subject: `GraphQL-Hive operations quota for ${input.organizationName} exceeded`,
body: renderRateLimitExceededEmail({
organizationName: input.organizationName,
limit: input.limit,
currentUsage: input.currentUsage,
subscriptionManagementLink: input.subscriptionManagementLink,
}),
});
return { job: job.id ?? 'unknown' };
} catch (error) {
ctx.errorHandler('Failed to schedule an email', error as Error);
throw error;
}
}),
sendRateLimitWarningEmail: procedure
.input(
z.object({
organizationId: z.string(),
organizationName: z.string(),
limit: z.number(),
currentUsage: z.number(),
startDate: z.number(),
endDate: z.number(),
subscriptionManagementLink: z.string(),
email: z.string(),
}),
)
.mutation(async ({ ctx, input }) => {
try {
const job = await ctx.schedule({
// If the jobId would include only the period and org id, then we would be able to notify the user once per month.
// There's a chance that an organization will increase the limit and we might need to notify them again.
id: JSON.stringify({
id: 'rate-limit-warning',
organization: input.organizationId,
period: {
start: input.startDate,
end: input.endDate,
},
limit: input.limit,
}),
email: input.email,
subject: `${input.organizationName} is approaching its rate limit`,
body: renderRateLimitWarningEmail({
organizationName: input.organizationName,
limit: input.limit,
currentUsage: input.currentUsage,
subscriptionManagementLink: input.subscriptionManagementLink,
}),
});
return { job: job.id ?? 'unknown' };
} catch (error) {
ctx.errorHandler('Failed to schedule an email', error as Error);
throw error;
}
}),
});
export type EmailsApi = typeof emailsApiRouter;
export type EmailsApiInput = inferRouterInputs<EmailsApi>;

View file

@ -1,9 +0,0 @@
import type { Job } from 'bullmq';
import type { FastifyRequest } from '@hive/service-common';
import type { EmailInput } from './shapes';
export type Context = {
req: FastifyRequest;
errorHandler(message: string, error: Error): void;
schedule(input: EmailInput): Promise<Job<any, any, string>>;
};

View file

@ -1,8 +0,0 @@
import { config } from 'dotenv';
config({
debug: true,
encoding: 'utf8',
});
await import('./index');

View file

@ -1,217 +0,0 @@
import zod from 'zod';
import { OpenTelemetryConfigurationModel } from '@hive/service-common';
const isNumberString = (input: unknown) => zod.string().regex(/^\d+$/).safeParse(input).success;
const numberFromNumberOrNumberString = (input: unknown): number | undefined => {
if (typeof input == 'number') return input;
if (isNumberString(input)) return Number(input);
};
const NumberFromString = zod.preprocess(numberFromNumberOrNumberString, zod.number().min(1));
// treat an empty string (`''`) as undefined
const emptyString = <T extends zod.ZodType>(input: T) => {
return zod.preprocess((value: unknown) => {
if (value === '') return undefined;
return value;
}, input);
};
const EnvironmentModel = zod.object({
PORT: emptyString(NumberFromString.optional()),
ENVIRONMENT: emptyString(zod.string().optional()),
RELEASE: emptyString(zod.string().optional()),
HEARTBEAT_ENDPOINT: emptyString(zod.string().url().optional()),
EMAIL_FROM: zod.string().email(),
});
const SentryModel = zod.union([
zod.object({
SENTRY: emptyString(zod.literal('0').optional()),
}),
zod.object({
SENTRY: zod.literal('1'),
SENTRY_DSN: zod.string(),
}),
]);
const RedisModel = zod.object({
REDIS_HOST: zod.string(),
REDIS_PORT: NumberFromString,
REDIS_PASSWORD: emptyString(zod.string().optional()),
REDIS_TLS_ENABLED: emptyString(zod.union([zod.literal('1'), zod.literal('0')]).optional()),
});
const PostmarkEmailModel = zod.object({
EMAIL_PROVIDER: zod.literal('postmark'),
EMAIL_PROVIDER_POSTMARK_TOKEN: zod.string(),
EMAIL_PROVIDER_POSTMARK_MESSAGE_STREAM: zod.string(),
});
const SMTPEmailModel = zod.object({
EMAIL_PROVIDER: zod.literal('smtp'),
EMAIL_PROVIDER_SMTP_PROTOCOL: emptyString(
zod.union([zod.literal('smtp'), zod.literal('smtps')]).optional(),
),
EMAIL_PROVIDER_SMTP_HOST: zod.string(),
EMAIL_PROVIDER_SMTP_PORT: NumberFromString,
EMAIL_PROVIDER_SMTP_AUTH_USERNAME: zod.string(),
EMAIL_PROVIDER_SMTP_AUTH_PASSWORD: zod.string(),
EMAIL_PROVIDER_SMTP_REJECT_UNAUTHORIZED: emptyString(
zod.union([zod.literal('0'), zod.literal('1')]).optional(),
),
});
const SendmailEmailModel = zod.object({
EMAIL_PROVIDER: zod.literal('sendmail'),
});
const MockEmailProviderModel = zod.object({
EMAIL_PROVIDER: zod.literal('mock'),
});
const EmailProviderModel = zod.union([
PostmarkEmailModel,
MockEmailProviderModel,
SMTPEmailModel,
SendmailEmailModel,
]);
const PrometheusModel = zod.object({
PROMETHEUS_METRICS: emptyString(zod.union([zod.literal('0'), zod.literal('1')]).optional()),
PROMETHEUS_METRICS_LABEL_INSTANCE: emptyString(zod.string().optional()),
PROMETHEUS_METRICS_PORT: emptyString(NumberFromString.optional()),
});
const LogModel = zod.object({
LOG_LEVEL: emptyString(
zod
.union([
zod.literal('trace'),
zod.literal('debug'),
zod.literal('info'),
zod.literal('warn'),
zod.literal('error'),
zod.literal('fatal'),
zod.literal('silent'),
])
.optional(),
),
REQUEST_LOGGING: emptyString(zod.union([zod.literal('0'), zod.literal('1')]).optional()).default(
'1',
),
});
const configs = {
base: EnvironmentModel.safeParse(process.env),
email: EmailProviderModel.safeParse(process.env),
sentry: SentryModel.safeParse(process.env),
redis: RedisModel.safeParse(process.env),
prometheus: PrometheusModel.safeParse(process.env),
log: LogModel.safeParse(process.env),
tracing: OpenTelemetryConfigurationModel.safeParse(process.env),
};
const environmentErrors: Array<string> = [];
for (const config of Object.values(configs)) {
if (config.success === false) {
environmentErrors.push(JSON.stringify(config.error.format(), null, 4));
}
}
if (environmentErrors.length) {
const fullError = environmentErrors.join(`\n`);
console.error('❌ Invalid environment variables:', fullError);
process.exit(1);
}
function extractConfig<Input, Output>(config: zod.SafeParseReturnType<Input, Output>): Output {
if (!config.success) {
throw new Error('Something went wrong.');
}
return config.data;
}
const base = extractConfig(configs.base);
const email = extractConfig(configs.email);
const redis = extractConfig(configs.redis);
const sentry = extractConfig(configs.sentry);
const prometheus = extractConfig(configs.prometheus);
const log = extractConfig(configs.log);
const tracing = extractConfig(configs.tracing);
const emailProviderConfig =
email.EMAIL_PROVIDER === 'postmark'
? ({
provider: 'postmark' as const,
token: email.EMAIL_PROVIDER_POSTMARK_TOKEN,
messageStream: email.EMAIL_PROVIDER_POSTMARK_MESSAGE_STREAM,
} as const)
: email.EMAIL_PROVIDER === 'smtp'
? ({
provider: 'smtp' as const,
protocol: email.EMAIL_PROVIDER_SMTP_PROTOCOL ?? 'smtp',
host: email.EMAIL_PROVIDER_SMTP_HOST,
port: email.EMAIL_PROVIDER_SMTP_PORT,
auth: {
user: email.EMAIL_PROVIDER_SMTP_AUTH_USERNAME,
pass: email.EMAIL_PROVIDER_SMTP_AUTH_PASSWORD,
},
tls: {
rejectUnauthorized: email.EMAIL_PROVIDER_SMTP_REJECT_UNAUTHORIZED !== '0',
},
} as const)
: email.EMAIL_PROVIDER === 'sendmail'
? ({ provider: 'sendmail' } as const)
: ({ provider: 'mock' } as const);
export type EmailProviderConfig = typeof emailProviderConfig;
export type PostmarkEmailProviderConfig = Extract<EmailProviderConfig, { provider: 'postmark' }>;
export type SMTPEmailProviderConfig = Extract<EmailProviderConfig, { provider: 'smtp' }>;
export type SendmailEmailProviderConfig = Extract<EmailProviderConfig, { provider: 'sendmail' }>;
export type MockEmailProviderConfig = Extract<EmailProviderConfig, { provider: 'mock' }>;
export const env = {
environment: base.ENVIRONMENT,
release: base.RELEASE ?? 'local',
http: {
port: base.PORT ?? 6260,
},
tracing: {
enabled: !!tracing.OPENTELEMETRY_COLLECTOR_ENDPOINT,
collectorEndpoint: tracing.OPENTELEMETRY_COLLECTOR_ENDPOINT,
},
redis: {
host: redis.REDIS_HOST,
port: redis.REDIS_PORT,
password: redis.REDIS_PASSWORD ?? '',
tlsEnabled: redis.REDIS_TLS_ENABLED === '1',
},
email: {
provider: emailProviderConfig,
emailFrom: base.EMAIL_FROM,
},
heartbeat: base.HEARTBEAT_ENDPOINT ? { endpoint: base.HEARTBEAT_ENDPOINT } : null,
sentry: sentry.SENTRY === '1' ? { dsn: sentry.SENTRY_DSN } : null,
log: {
level: log.LOG_LEVEL ?? 'info',
requests: log.REQUEST_LOGGING === '1',
},
prometheus:
prometheus.PROMETHEUS_METRICS === '1'
? {
labels: {
instance: prometheus.PROMETHEUS_METRICS_LABEL_INSTANCE ?? 'emails',
},
port: prometheus.PROMETHEUS_METRICS_PORT ?? 10_254,
}
: null,
} as const;

View file

@ -1,158 +0,0 @@
#!/usr/bin/env node
import { hostname } from 'os';
import {
configureTracing,
createErrorHandler,
createServer,
registerShutdown,
registerTRPC,
reportReadiness,
startHeartbeats,
startMetrics,
TracingInstance,
} from '@hive/service-common';
import * as Sentry from '@sentry/node';
import { emailsApiRouter } from './api';
import type { Context } from './context';
import { env } from './environment';
import { createEmailProvider } from './providers';
import { createScheduler } from './scheduler';
/**
 * Entrypoint of the emails service: wires optional tracing and Sentry,
 * creates the HTTP server, the Redis/BullMQ-backed scheduler, the tRPC
 * API and the health endpoints, then starts processing the queue.
 * Exits the process with code 1 on any fatal startup error.
 */
async function main() {
  let tracing: TracingInstance | undefined;

  // Tracing is opt-in via the collector endpoint environment variable.
  if (env.tracing.enabled && env.tracing.collectorEndpoint) {
    tracing = configureTracing({
      collectorEndpoint: env.tracing.collectorEndpoint,
      serviceName: 'emails',
    });
    tracing.instrumentNodeFetch();
    tracing.setup();
  }

  if (env.sentry) {
    Sentry.init({
      dist: 'emails',
      serverName: hostname(),
      enabled: !!env.sentry,
      environment: env.environment,
      dsn: env.sentry.dsn,
      release: env.release,
    });
  }

  const server = await createServer({
    name: 'emails',
    sentryErrorHandler: true,
    log: {
      level: env.log.level,
      requests: env.log.requests,
    },
  });

  if (tracing) {
    await server.register(...tracing.instrumentFastify());
  }

  const errorHandler = createErrorHandler(server);

  try {
    const emailProvider = createEmailProvider(env.email.provider, env.email.emailFrom);

    // The scheduler owns the BullMQ queue/worker pair backed by Redis.
    const { schedule, readiness, start, stop } = createScheduler({
      logger: server.log,
      redis: {
        host: env.redis.host,
        port: env.redis.port,
        password: env.redis.password,
        tlsEnabled: env.redis.tlsEnabled,
      },
      queueName: 'emails',
      emailProvider,
    });

    // Heartbeats report liveness to an external monitor when configured.
    const stopHeartbeats = env.heartbeat
      ? startHeartbeats({
          enabled: true,
          endpoint: env.heartbeat.endpoint,
          intervalInMS: 20_000,
          onError: e => server.log.error(e, `Heartbeat failed with error`),
          isReady: readiness,
        })
      : startHeartbeats({ enabled: false });

    registerShutdown({
      logger: server.log,
      async onShutdown() {
        stopHeartbeats();
        await stop();
      },
    });

    // tRPC endpoint used by other Hive services to schedule emails.
    await registerTRPC(server, {
      router: emailsApiRouter,
      createContext({ req }): Context {
        return {
          req,
          errorHandler(message: string, error: Error) {
            return errorHandler(message, error, req.log);
          },
          schedule,
        };
      },
    });

    server.route({
      method: ['GET', 'HEAD'],
      url: '/_health',
      handler(req, res) {
        void res.status(200).send();
      },
    });

    server.route({
      method: ['GET', 'HEAD'],
      url: '/_readiness',
      handler(_, res) {
        const isReady = readiness();
        reportReadiness(isReady);
        void res.status(isReady ? 200 : 400).send();
      },
    });

    // The mock provider records sent emails; expose them for dev/tests.
    if (emailProvider.id === 'mock') {
      server.route({
        method: ['GET'],
        url: '/_history',
        handler(_, res) {
          void res.status(200).send(emailProvider.history);
        },
      });
    }

    await server.listen({
      port: env.http.port,
      host: '::',
    });

    if (env.prometheus) {
      await startMetrics(env.prometheus.labels.instance, env.prometheus.port);
    }
    // Connect to Redis and begin consuming the email queue.
    await start();
  } catch (error) {
    server.log.fatal(error);
    Sentry.captureException(error, {
      level: 'fatal',
    });
    process.exit(1);
  }
}
// Last-resort handler: report to Sentry and exit non-zero so the
// orchestrator restarts the service.
main().catch(err => {
  Sentry.captureException(err, {
    level: 'fatal',
  });
  console.error(err);
  process.exit(1);
});

View file

@ -1,11 +0,0 @@
import { metrics } from '@hive/service-common';
// Prometheus counter: emails successfully handed off to the provider.
export const emailsTotal = new metrics.Counter({
  name: 'emails_total',
  help: 'Number of sent emails',
});

// Prometheus counter: email jobs that exhausted or failed an attempt.
export const emailsFailuresTotal = new metrics.Counter({
  name: 'emails_failures_total',
  help: 'Number of failures',
});

View file

@ -1,102 +0,0 @@
/**
 * Result of the `mjml` tagged template: MJML markup whose interpolations
 * have already been HTML-escaped (or deliberately passed through raw).
 */
export type MJMLValue = {
  readonly kind: 'mjml';
  readonly content: string;
};

/** Marker wrapper for content that must be interpolated without escaping. */
type RawValue = {
  readonly kind: 'raw';
  readonly content: string;
};

type SpecialValues = RawValue;

/** Everything that may appear inside an `mjml` template interpolation. */
type ValueExpression = string | SpecialValues | MJMLValue;

/**
 * Tagged template for building MJML email markup.
 *
 * Plain string interpolations are HTML-escaped, nested `mjml` values are
 * inlined as-is (already escaped), and `mjml.raw(...)` values bypass
 * escaping entirely. Binding `undefined` is a programming error and throws.
 */
export function mjml(parts: TemplateStringsArray, ...values: ValueExpression[]): MJMLValue {
  let content = '';
  let index = 0;
  for (const part of parts) {
    const token = values[index++];
    content += part;
    // The final template part has no interpolation following it.
    if (index >= parts.length) {
      continue;
    }
    if (token === undefined) {
      throw new Error('MJML tag cannot be bound an undefined value.');
    } else if (isRawValue(token)) {
      content += token.content;
    } else if (typeof token === 'string') {
      content += escapeHtml(token);
    } else if (token.kind === 'mjml') {
      content += token.content;
    } else {
      throw new TypeError('mjml: Unexpected value expression.');
    }
  }
  return {
    kind: 'mjml',
    content: content,
  };
}

/** Wrap pre-trusted markup so the `mjml` tag interpolates it unescaped. */
mjml.raw = (content: string): RawValue => ({
  kind: 'raw',
  content,
});

/**
 * Escape `&`, `"`, `'`, `<` and `>` for safe embedding in HTML/MJML.
 *
 * Fix: `&` (charCode 38) was previously not escaped, so entity-like input
 * (e.g. a name containing `&lt;`) leaked through and rendered as markup.
 * The upstream source this was adapted from escapes it as `&amp;`.
 *
 * @source https://github.com/component/escape-html
 */
function escapeHtml(input: string): string {
  const matchHtmlRegExp = /["'&<>]/;
  const match = matchHtmlRegExp.exec(input);
  if (!match) {
    // Fast path: nothing to escape.
    return input;
  }

  let escapeSequence;
  let html = '';
  let index = 0;
  let lastIndex = 0;

  for (index = match.index; index < input.length; index++) {
    switch (input.charCodeAt(index)) {
      case 34: // "
        escapeSequence = '&quot;';
        break;
      case 38: // &
        escapeSequence = '&amp;';
        break;
      case 39: // '
        escapeSequence = '&#39;';
        break;
      case 60: // <
        escapeSequence = '&lt;';
        break;
      case 62: // >
        escapeSequence = '&gt;';
        break;
      default:
        continue;
    }
    // Copy the untouched span preceding this character, then the entity.
    if (lastIndex !== index) {
      html += input.substring(lastIndex, index);
    }
    lastIndex = index + 1;
    html += escapeSequence;
  }

  return lastIndex !== index ? html + input.substring(lastIndex, index) : html;
}

/** Structural check for one of the special marker objects by its `kind`. */
function isOfKind<T extends SpecialValues>(value: unknown, kind: T['kind']): value is T {
  return !!value && typeof value === 'object' && 'kind' in value && value.kind === kind;
}

function isRawValue(value: unknown): value is RawValue {
  return isOfKind<RawValue>(value, 'raw');
}

View file

@ -1,138 +0,0 @@
import nodemailer from 'nodemailer';
import sm from 'sendmail';
import type {
EmailProviderConfig,
MockEmailProviderConfig,
PostmarkEmailProviderConfig,
SendmailEmailProviderConfig,
SMTPEmailProviderConfig,
} from './environment';
// A single outbound email as handed to a provider.
interface Email {
  to: string;
  subject: string;
  body: string;
}

// Registry of provider factories; also fixes the set of valid provider ids.
const emailProviders = {
  postmark,
  mock,
  smtp,
  sendmail,
};

// Common interface all providers implement. `history` is only populated by
// the mock provider; real providers expose an always-empty array.
export interface EmailProvider {
  id: keyof typeof emailProviders;
  send(email: Email): Promise<void>;
  history: Email[];
}
/**
 * Instantiate the delivery backend selected by the environment
 * configuration. All backends share the `EmailProvider` contract.
 */
export function createEmailProvider(config: EmailProviderConfig, emailFrom: string): EmailProvider {
  if (config.provider === 'mock') {
    return mock(config, emailFrom);
  }
  if (config.provider === 'postmark') {
    return postmark(config, emailFrom);
  }
  if (config.provider === 'smtp') {
    return smtp(config, emailFrom);
  }
  // Exhaustiveness: the only remaining variant is 'sendmail'.
  return sendmail(config, emailFrom);
}
/**
 * Provider backed by the Postmark HTTP API.
 * Sends one message per call; throws with Postmark's `Message` (or the HTTP
 * status text) when the API responds with a non-2xx status.
 */
function postmark(config: PostmarkEmailProviderConfig, emailFrom: string) {
  return {
    id: 'postmark' as const,
    async send(email: Email) {
      const response = await fetch('https://api.postmarkapp.com/email', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          Accept: 'application/json',
          'X-Postmark-Server-Token': config.token,
        },
        body: JSON.stringify({
          From: emailFrom,
          To: email.to,
          Subject: email.subject,
          HtmlBody: email.body,
          MessageStream: config.messageStream,
        }),
      });
      if (!response.ok) {
        // Surface Postmark's own error message when present.
        const details: any = await response.json();
        throw new Error(details.Message ?? response.statusText);
      }
    },
    // Real providers do not record history; only the mock does.
    history: [],
  };
}
/**
 * In-memory provider for local development and tests: nothing is actually
 * delivered, every "sent" email is appended to `history` (exposed via the
 * service's `/_history` endpoint).
 */
function mock(_config: MockEmailProviderConfig, _emailFrom: string): EmailProvider {
  const sentEmails: Email[] = [];
  return {
    id: 'mock' as const,
    send: async (email: Email) => {
      sentEmails.push(email);
    },
    history: sentEmails,
  };
}
/**
 * Provider backed by an SMTP server via nodemailer.
 * The transport is created once per provider instance and reused for all
 * sends; `smtps` selects an implicitly-TLS connection.
 */
function smtp(config: SMTPEmailProviderConfig, emailFrom: string) {
  const transporter = nodemailer.createTransport({
    host: config.host,
    port: config.port,
    secure: config.protocol === 'smtps',
    auth: {
      user: config.auth.user,
      pass: config.auth.pass,
    },
    tls: {
      // Allows self-signed certificates when configured off.
      rejectUnauthorized: config.tls.rejectUnauthorized,
    },
  });
  return {
    id: 'smtp' as const,
    async send(email: Email) {
      await transporter.sendMail({
        from: emailFrom,
        to: email.to,
        subject: email.subject,
        html: email.body,
      });
    },
    history: [],
  };
}
/**
 * Provider backed by the local `sendmail` binary (via the `sendmail`
 * package). The callback API is wrapped in a Promise so it matches the
 * async `EmailProvider.send` contract.
 */
function sendmail(_config: SendmailEmailProviderConfig, emailFrom: string) {
  const client = sm({});
  return {
    id: 'sendmail' as const,
    async send(email: Email) {
      await new Promise((resolve, reject) => {
        client(
          {
            from: emailFrom,
            to: email.to,
            subject: email.subject,
            html: email.body,
          },
          (err, reply) => {
            if (err) {
              reject(err);
            } else {
              resolve(reply);
            }
          },
        );
      });
    },
    history: [],
  };
}

View file

@ -1,232 +0,0 @@
import { Job, Queue, Worker } from 'bullmq';
import Redis, { Redis as RedisInstance } from 'ioredis';
import mjml2html from 'mjml';
import pTimeout from 'p-timeout';
import type { ServiceLogger } from '@hive/service-common';
import * as Sentry from '@sentry/node';
import { emailsFailuresTotal, emailsTotal } from './metrics';
import type { EmailProvider } from './providers';
import type { EmailInput } from './shapes';
const DAY_IN_SECONDS = 86_400;
export const clientCommandMessageReg = /ERR unknown command ['`]\s*client\s*['`]/;
/**
 * Create the Redis/BullMQ-backed email scheduler.
 *
 * Returns the service's lifecycle (`start`/`stop`), a `schedule` function
 * that enqueues an email for delivery, and a `readiness` probe. The worker
 * renders MJML bodies to HTML before handing them to the email provider.
 *
 * Fix: `stop()` previously logged "Started Usage shutdown..." — a
 * copy/paste leftover from the usage service; it now names this service.
 */
export function createScheduler(config: {
  logger: ServiceLogger;
  redis: {
    host: string;
    port: number;
    password: string;
    tlsEnabled: boolean;
  };
  queueName: string;
  emailProvider: EmailProvider;
}) {
  let redisConnection: RedisInstance | null;
  let queue: Queue | null;
  let stopped = false;
  const logger = config.logger;

  // Shared error reporter: logs locally and forwards to Sentry tagged with
  // the originating source for easier triage.
  function onError(source: string) {
    return (error: Error) => {
      logger.error(`onError called from source ${source}`, error);
      Sentry.captureException(error, {
        extra: {
          error,
          source,
        },
        level: 'error',
      });
    };
  }

  // BullMQ 'failed' handler: log the attempt details and bump the metric.
  function onFailed(job: Job<EmailInput> | undefined, error: Error) {
    logger.debug(
      `Job %s failed after %s attempts, reason: %s`,
      job?.name,
      job?.attemptsMade,
      job?.failedReason,
    );
    logger.error(error);
    emailsFailuresTotal.inc();
  }

  // Called once the Redis connection reports 'ready'.
  async function initQueueAndWorkers() {
    if (!redisConnection) {
      return;
    }
    const prefix = 'hive-emails';
    queue = new Queue(config.queueName, {
      prefix,
      connection: redisConnection,
    });
    // Wait for Queues to be ready
    await queue.waitUntilReady();

    const worker = new Worker<EmailInput>(
      config.queueName,
      async job => {
        logger.info('Sending email to %s', job.data.email);
        let body = job.data.body;
        // Poor mans MJML check :)
        if (job.data.body.includes('<mjml>')) {
          const rendered = mjml2html(body, {
            minify: false,
            minifyOptions: undefined,
          });
          // Throwing marks the job failed so BullMQ retries it.
          if (rendered.errors.length > 0) {
            throw new Error(rendered.errors.map(e => e.formattedMessage).join('\n'));
          }
          body = rendered.html;
        }
        await config.emailProvider.send({
          to: job.data.email,
          subject: job.data.subject,
          body,
        });
        logger.info('Email sent');
        emailsTotal.inc();
      },
      {
        prefix,
        connection: redisConnection,
      },
    );
    worker.on('error', onError('emailsWorker'));
    worker.on('failed', onFailed);
    // Wait for Workers
    await worker.waitUntilReady();
    logger.info('BullMQ started');
  }

  // Open the Redis connection; queue/worker creation is deferred to the
  // connection's 'ready' event.
  async function start() {
    redisConnection = new Redis({
      host: config.redis.host,
      port: config.redis.port,
      password: config.redis.password,
      retryStrategy(times) {
        // Linear backoff capped at 2s between reconnection attempts.
        return Math.min(times * 500, 2000);
      },
      reconnectOnError(error) {
        onError('redis:reconnectOnError')(error);
        // Some managed Redis offerings reject the CLIENT command; do not
        // reconnect-loop on that specific error.
        if (clientCommandMessageReg.test(error.message)) {
          return false;
        }
        return 1;
      },
      db: 0,
      maxRetriesPerRequest: null,
      enableReadyCheck: false,
      tls: config.redis.tlsEnabled ? {} : undefined,
    });

    redisConnection.on('error', err => {
      onError('redis:error')(err);
    });

    redisConnection.on('connect', () => {
      logger.info('Redis connection established');
    });

    redisConnection.on('ready', async () => {
      logger.info('Redis connection ready... creating queues and workers...');
      await initQueueAndWorkers();
    });

    redisConnection.on('close', () => {
      logger.info('Redis connection closed');
    });

    redisConnection.on('reconnecting', (timeToReconnect?: number) => {
      logger.info('Redis reconnecting in %s', timeToReconnect);
    });

    redisConnection.on('end', async () => {
      logger.info('Redis ended - no more reconnections will be made');
      await stop();
    });
  }

  // Tear down queue and Redis connection, then terminate the process.
  async function stop() {
    logger.info('Started Emails shutdown...');

    stopped = true;

    logger.info('Clearing BullMQ...');

    try {
      if (queue) {
        queue.removeAllListeners();
        await pTimeout(queue.close(), {
          milliseconds: 5000,
          message: 'BullMQ close timeout',
        });
      }
    } catch (e) {
      logger.error('Failed to stop queues', e);
    } finally {
      queue = null;
      logger.info('BullMQ stopped');
    }

    if (redisConnection) {
      logger.info('Stopping Redis...');

      try {
        redisConnection.disconnect(false);
      } catch (e) {
        logger.error('Failed to stop Redis connection', e);
      } finally {
        redisConnection = null;
        queue = null;
        logger.info('Redis stopped');
      }
    }

    logger.info('Exiting');
    // NOTE(review): stop() hard-exits the process; it is both the shutdown
    // hook and the Redis 'end' handler, so it never returns to its caller.
    process.exit(0);
  }

  // Enqueue one email. The optional stable id plus the long retention of
  // completed jobs gives at-most-once delivery per id.
  async function schedule(email: EmailInput) {
    if (!queue) {
      throw new Error('Queue not initialized');
    }
    return queue.add(email.id ?? email.subject, email, {
      jobId: email.id,
      // We don't want to remove completed jobs, because it tells us that the job has been processed
      // and we avoid sending the same email twice.
      removeOnComplete: {
        // Let's keep the job longer than a full month, just in case :)
        age: DAY_IN_SECONDS * 32,
      },
      attempts: 5,
      backoff: {
        type: 'exponential',
        delay: 1000,
      },
    });
  }

  return {
    schedule,
    start,
    stop,
    readiness() {
      if (stopped) {
        return false;
      }
      return queue !== null && redisConnection?.status === 'ready';
    },
  };
}

View file

@ -1,6 +0,0 @@
// Payload accepted by the emails tRPC API and queued for delivery.
export type EmailInput = {
  id?: string; // optional stable id, used as the BullMQ jobId for deduplication
  email: string; // recipient address
  subject: string;
  body: string; // HTML, or raw MJML markup (detected via an '<mjml>' tag)
};

View file

@ -1,18 +0,0 @@
import { button, email, mjml, paragraph } from './components';
/**
 * Renders the "audit logs export is ready" notification email.
 *
 * NOTE(review): the start/end dates are joined by a single space
 * ("for <start> <end>") — presumably the formatted values carry their own
 * separator; confirm with the caller, or add " to " between them.
 */
export function renderAuditLogsReportEmail(input: {
  organizationName: string;
  formattedStartDate: string;
  formattedEndDate: string;
  url: string;
}) {
  return email({
    title: 'Your Requested Audit Logs Are Ready',
    body: mjml`
      ${paragraph(mjml`You requested audit logs for ${input.formattedStartDate} ${input.formattedEndDate}, and they are now ready for download.`)}
      ${paragraph('Click the link below to download your CSV file:')}
      ${button({ url: input.url, text: 'Download Audit Logs' })}
      ${paragraph(`If you didn't request this, please contact support@graphql-hive.com.`)}
    `,
  });
}

View file

@ -1,52 +0,0 @@
import { mjml, type MJMLValue } from '../mjml';
export { mjml };
/** A body paragraph with the shared typography used by all Hive emails. */
export function paragraph(content: string | MJMLValue) {
  return mjml`
    <mj-text padding-bottom="10px" line-height="1.6" font-size="16px">
      ${content}
    </mj-text>
  `;
}
/** A left-aligned call-to-action button in the Hive brand color. */
export function button(input: { url: string; text: string }) {
  return mjml`
    <mj-button align="left" href="${input.url}" font-size="16px" border-radius="3px" color="#fff" background-color="#245850">
      ${input.text}
    </mj-button>
  `;
}
/**
 * Full email layout: Hive header, a titled content section, and a footer
 * with the current-year copyright. Returns the final MJML string (the
 * `.content` of the assembled template), ready for mjml2html rendering.
 */
export function email(input: { title: string | MJMLValue; body: MJMLValue }) {
  return mjml`
    <mjml>
      <mj-body>
        <mj-section background-color="#e6eded">
          <mj-column>
            <mj-text color="#245850" font-size="28px" font-weight="300">
              Hive
            </mj-text>
          </mj-column>
        </mj-section>
        <mj-section>
          <mj-column>
            <mj-text color="#245850" font-size="24px" font-weight="300" padding-bottom="20px">
              ${input.title}
            </mj-text>
            ${input.body}
          </mj-column>
        </mj-section>
        <mj-section>
          <mj-column>
            <mj-divider border-width="1px" border-color="#eeeeee"></mj-divider>
            <mj-text align="center" padding-bottom="20px" line-height="1.6" font-size="14px" color="#888888">
              © ${mjml.raw(String(new Date().getFullYear()))} Hive. All rights reserved.
            </mj-text>
          </mj-column>
        </mj-section>
      </mj-body>
    </mjml>
  `.content;
}

View file

@ -1,16 +0,0 @@
import { button, email, mjml, paragraph } from './components';
/** Renders the sign-up email-address verification email. */
export function renderEmailVerificationEmail(input: {
  subject: string;
  verificationLink: string;
  toEmail: string;
}) {
  return email({
    title: `Verify Your Email Address`,
    body: mjml`
      ${paragraph(`To complete your sign-up, please verify your email address by clicking the link below:`)}
      ${button({ url: input.verificationLink, text: 'Verify Email' })}
      ${paragraph(`If you didn't sign up, you can ignore this email.`)}
    `,
  });
}

View file

@ -1,11 +0,0 @@
import { button, email, mjml, paragraph } from './components';
/** Renders the organization membership invitation email. */
export function renderOrganizationInvitation(input: { organizationName: string; link: string }) {
  return email({
    title: `Join ${input.organizationName}`,
    body: mjml`
      ${paragraph(mjml`You've been invited to join ${input.organizationName} on GraphQL Hive.`)}
      ${button({ url: input.link, text: 'Accept the invitation' })}
    `,
  });
}

View file

@ -1,18 +0,0 @@
import { button, email, mjml, paragraph } from './components';
/** Renders the ownership-transfer confirmation email (link expires in a day). */
export function renderOrganizationOwnershipTransferEmail(input: {
  authorName: string;
  organizationName: string;
  link: string;
}) {
  return email({
    title: 'Organization Ownership Transfer Initiated',
    body: mjml`
      ${paragraph(
        mjml`${input.authorName} wants to transfer the ownership of the <strong>${input.organizationName}</strong> organization.`,
      )}
      ${button({ url: input.link, text: 'Accept the transfer' })}
      ${paragraph(`This link will expire in a day.`)}
    `,
  });
}

View file

@ -1,16 +0,0 @@
import { button, email, mjml, paragraph } from './components';
/** Renders the password-reset email. */
export function renderPasswordResetEmail(input: {
  subject: string;
  passwordResetLink: string;
  toEmail: string;
}) {
  return email({
    title: `Reset Your Password`,
    body: mjml`
      ${paragraph(`We received a request to reset your password. Click the link below to set a new password:`)}
      ${button({ url: input.passwordResetLink, text: 'Reset your password' })}
      ${paragraph(`If you didn't request a password reset, you can ignore this email.`)}
    `,
  });
}

View file

@ -1,25 +0,0 @@
import { button, email, mjml, paragraph } from './components';
/** Shared formatter so usage numbers render with locale-style separators. */
const numberFormatter = new Intl.NumberFormat();

/**
 * Renders the email sent when an organization has reached 100% of its
 * operations quota, prompting the owner to raise the limit.
 *
 * Fix: the body previously read "limit quota.." (double period); it now
 * matches the single period used by the companion warning email.
 */
export function renderRateLimitExceededEmail(input: {
  organizationName: string;
  limit: number;
  currentUsage: number;
  subscriptionManagementLink: string;
}) {
  return email({
    title: 'Rate Limit Reached',
    body: mjml`
      ${paragraph(
        mjml`Your Hive organization <strong>${
          input.organizationName
        }</strong> has reached over 100% of the operations limit quota. Used ${numberFormatter.format(input.currentUsage)} of ${numberFormatter.format(
          input.limit,
        )}.`,
      )}
      ${paragraph(`We recommend to increase the limit.`)}
      ${button({ url: input.subscriptionManagementLink, text: 'Manage your subscription' })}
    `,
  });
}

View file

@ -1,25 +0,0 @@
import { button, email, mjml, paragraph } from './components';
// Shared formatter so usage numbers render with locale-style separators.
const numberFormatter = new Intl.NumberFormat();

/**
 * Renders the email sent when an organization is approaching its
 * operations quota (but has not yet exceeded it).
 */
export function renderRateLimitWarningEmail(input: {
  organizationName: string;
  limit: number;
  currentUsage: number;
  subscriptionManagementLink: string;
}) {
  return email({
    title: 'Approaching Rate Limit',
    body: mjml`
      ${paragraph(
        mjml`Your Hive organization <strong>${
          input.organizationName
        }</strong> is approaching its operations limit quota. Used ${numberFormatter.format(input.currentUsage)} of ${numberFormatter.format(
          input.limit,
        )}.`,
      )}
      ${paragraph(`We recommend to increase the limit.`)}
      ${button({ url: input.subscriptionManagementLink, text: 'Manage your subscription' })}
    `,
  });
}

View file

@ -1,9 +0,0 @@
{
"extends": "../../../tsconfig.json",
"compilerOptions": {
"target": "ES2020",
"module": "esnext",
"rootDir": "../.."
},
"files": ["src/index.ts"]
}

View file

@ -1,10 +0,0 @@
{
"$schema": "https://turborepo.org/schema.json",
"extends": ["//"],
"tasks": {
"dev": {
"persistent": true,
"cache": false
}
}
}

View file

@ -1,4 +0,0 @@
REDIS_HOST="localhost"
REDIS_PORT="6379"
REDIS_PASSWORD=""
OPENTELEMETRY_COLLECTOR_ENDPOINT="<sync>"

View file

@ -1,4 +0,0 @@
*.log
.DS_Store
node_modules
dist

View file

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2022 The Guild
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -1,26 +0,0 @@
# `@hive/webhooks`
This service takes care of delivering WebHooks.
## Configuration
| Name | Required | Description | Example Value |
| ----------------------------------- | -------- | -------------------------------------------------------------------------------------------------------- | ---------------------------------------------------- |
| `PORT` | **Yes** | The port on which this service runs. | `6250` |
| `REDIS_HOST` | **Yes** | The host of your redis instance. | `"127.0.0.1"` |
| `REDIS_PORT` | **Yes** | The port of your redis instance. | `6379` |
| `REDIS_PASSWORD` | **Yes** | The password of your redis instance. | `"apollorocks"` |
| `REDIS_TLS_ENABLED` | **No** | Enable TLS for redis connection (rediss://). | `"0"` |
| `ENVIRONMENT` | No | The environment of your Hive app. (**Note:** This will be used for Sentry reporting.) | `staging` |
| `HEARTBEAT_ENDPOINT` | No | The endpoint for a heartbeat. | `http://127.0.0.1:6969/heartbeat` |
| `SENTRY` | No | Whether Sentry error reporting should be enabled. | `1` (enabled) or `0` (disabled) |
| `SENTRY_DSN` | No | The DSN for reporting errors to Sentry. | `https://dooobars@o557896.ingest.sentry.io/12121212` |
| `PROMETHEUS_METRICS` | No | Whether Prometheus metrics should be enabled | `1` (enabled) or `0` (disabled) |
| `PROMETHEUS_METRICS_LABEL_INSTANCE` | No | The instance label added for the prometheus metrics. | `webhooks-service` |
| `PROMETHEUS_METRICS_PORT` | No | Port on which prometheus metrics are exposed | Defaults to `10254` |
| `REQUEST_BROKER` | No | Whether Request Broker should be enabled. | `1` (enabled) or `0` (disabled) |
| `REQUEST_BROKER_ENDPOINT` | No | The endpoint address of the request broker service. | `https://broker.worker.dev` |
| `REQUEST_BROKER_SIGNATURE` | No | A secret signature needed to verify the request origin | `hbsahdbzxch123` |
| `REQUEST_LOGGING` | No | Log http requests | `1` (enabled) or `0` (disabled) |
| `LOG_LEVEL` | No | The verbosity of the service logs. One of `trace`, `debug`, `info`, `warn`, `error`, `fatal` or `silent` | `info` (default) |
| `OPENTELEMETRY_COLLECTOR_ENDPOINT` | No | OpenTelemetry Collector endpoint. The expected traces transport is HTTP (port `4318`). | `http://localhost:4318/v1/traces` |

View file

@ -1,26 +0,0 @@
{
"name": "@hive/webhooks",
"type": "module",
"license": "MIT",
"private": true,
"scripts": {
"build": "tsx ../../../scripts/runify.ts",
"dev": "tsup-node --config ../../../configs/tsup/dev.config.node.ts src/dev.ts",
"postbuild": "copyfiles -f \"node_modules/bullmq/dist/esm/commands/*.lua\" dist && copyfiles -f \"node_modules/bullmq/dist/esm/commands/includes/*.lua\" dist/includes",
"typecheck": "tsc --noEmit"
},
"devDependencies": {
"@hive/service-common": "workspace:*",
"@sentry/node": "7.120.2",
"@trpc/server": "10.45.3",
"bullmq": "5.34.8",
"copyfiles": "2.4.1",
"dotenv": "16.4.7",
"got": "14.4.7",
"ioredis": "5.8.2",
"p-timeout": "6.1.4",
"pino-pretty": "11.3.0",
"tslib": "2.8.1",
"zod": "3.25.76"
}
}

View file

@ -1,67 +0,0 @@
import { z } from 'zod';
import { handleTRPCError } from '@hive/service-common';
import type { inferRouterInputs } from '@trpc/server';
import { initTRPC } from '@trpc/server';
import type { Context } from './types';
/**
 * Shape shared by the organization, project and target references in a
 * webhook payload. `cleanId` is kept alongside `slug` for backwards
 * compatibility with older webhook consumers.
 */
const resourceSchema = z
  .object({
    id: z.string().nonempty(),
    cleanId: z.string().nonempty(),
    slug: z.string().nonempty(),
    name: z.string().nonempty(),
  })
  .required();

/** Validation schema for a webhook delivery request. */
const webhookInput = z
  .object({
    endpoint: z.string().nonempty(),
    event: z
      .object({
        organization: resourceSchema,
        project: resourceSchema,
        target: resourceSchema,
        schema: z
          .object({
            id: z.string().nonempty(),
            valid: z.boolean(),
            commit: z.string().nonempty(),
          })
          .required(),
        changes: z.array(z.any()),
        errors: z.array(z.any()),
      })
      .required(),
  })
  .required();
const t = initTRPC.context<Context>().create();
// All procedures share the common tRPC error-translation middleware.
const procedure = t.procedure.use(handleTRPCError);

/** tRPC router used by other Hive services to schedule webhook deliveries. */
export const webhooksApiRouter = t.router({
  schedule: procedure.input(webhookInput).mutation(async ({ ctx, input }) => {
    try {
      const job = await ctx.schedule(input);
      // Fall back to 'unknown' when the queued job carries no id.
      return { job: job.id ?? 'unknown' };
    } catch (error) {
      ctx.errorHandler('Failed to schedule a webhook', error as Error, ctx.req.log);
      throw error;
    }
  }),
});

export type WebhooksApi = typeof webhooksApiRouter;
export type WebhooksApiInput = inferRouterInputs<WebhooksApi>;

View file

@ -1,8 +0,0 @@
import { config } from 'dotenv';

// Dev entrypoint: load variables from a local .env file BEFORE importing
// the service, because the environment module reads process.env at import
// time. The dynamic import enforces that ordering.
config({
  debug: true,
  encoding: 'utf8',
});

await import('./index');

View file

@ -1,164 +0,0 @@
import zod from 'zod';
import { OpenTelemetryConfigurationModel } from '@hive/service-common';
/** True when `input` is a string consisting solely of ASCII digits. */
const isNumberString = (input: unknown) => zod.string().regex(/^\d+$/).safeParse(input).success;

/**
 * Coerce a number or a numeric string into a number; anything else yields
 * `undefined` so downstream zod validation rejects it.
 * (Uses strict `===` instead of the previous loose `==` comparison.)
 */
const numberFromNumberOrNumberString = (input: unknown): number | undefined => {
  if (typeof input === 'number') return input;
  if (isNumberString(input)) return Number(input);
};

/** Positive integer parsed from either a number or a numeric string. */
const NumberFromString = zod.preprocess(numberFromNumberOrNumberString, zod.number().min(1));

// treat an empty string (`''`) as undefined
const emptyString = <T extends zod.ZodType>(input: T) => {
  return zod.preprocess((value: unknown) => {
    if (value === '') return undefined;
    return value;
  }, input);
};
// Core service settings; all optional with sane defaults applied later.
const EnvironmentModel = zod.object({
  PORT: emptyString(NumberFromString.optional()),
  ENVIRONMENT: emptyString(zod.string().optional()),
  RELEASE: emptyString(zod.string().optional()),
  HEARTBEAT_ENDPOINT: emptyString(zod.string().url().optional()),
});

// Redis connection settings (host/port required).
const RedisModel = zod.object({
  REDIS_HOST: zod.string(),
  REDIS_PORT: NumberFromString,
  REDIS_PASSWORD: emptyString(zod.string().optional()),
  REDIS_TLS_ENABLED: emptyString(zod.union([zod.literal('1'), zod.literal('0')]).optional()),
});

// Request broker: when enabled ('1'), endpoint and signature become required.
const RequestBrokerModel = zod.union([
  zod.object({
    REQUEST_BROKER: emptyString(zod.literal('0').optional()),
  }),
  zod.object({
    REQUEST_BROKER: zod.literal('1'),
    REQUEST_BROKER_ENDPOINT: zod.string().min(1),
    REQUEST_BROKER_SIGNATURE: zod.string().min(1),
  }),
]);

// Sentry: when enabled ('1'), a DSN becomes required.
const SentryModel = zod.union([
  zod.object({
    SENTRY: emptyString(zod.literal('0').optional()),
  }),
  zod.object({
    SENTRY: zod.literal('1'),
    SENTRY_DSN: zod.string(),
  }),
]);

const PrometheusModel = zod.object({
  PROMETHEUS_METRICS: emptyString(zod.union([zod.literal('0'), zod.literal('1')]).optional()),
  PROMETHEUS_METRICS_LABEL_INSTANCE: emptyString(zod.string().optional()),
  PROMETHEUS_METRICS_PORT: emptyString(NumberFromString.optional()),
});

const LogModel = zod.object({
  LOG_LEVEL: emptyString(
    zod
      .union([
        zod.literal('trace'),
        zod.literal('debug'),
        zod.literal('info'),
        zod.literal('warn'),
        zod.literal('error'),
        zod.literal('fatal'),
        zod.literal('silent'),
      ])
      .optional(),
  ),
  // Request logging defaults to enabled.
  REQUEST_LOGGING: emptyString(zod.union([zod.literal('0'), zod.literal('1')]).optional()).default(
    '1',
  ),
});
// Parse each config group independently so every problem can be reported
// at once instead of failing on the first bad variable.
const configs = {
  base: EnvironmentModel.safeParse(process.env),
  redis: RedisModel.safeParse(process.env),
  sentry: SentryModel.safeParse(process.env),
  prometheus: PrometheusModel.safeParse(process.env),
  log: LogModel.safeParse(process.env),
  requestBroker: RequestBrokerModel.safeParse(process.env),
  tracing: OpenTelemetryConfigurationModel.safeParse(process.env),
};

const environmentErrors: Array<string> = [];

for (const config of Object.values(configs)) {
  if (config.success === false) {
    environmentErrors.push(JSON.stringify(config.error.format(), null, 4));
  }
}

// Abort startup when any environment variable fails validation.
if (environmentErrors.length) {
  const fullError = environmentErrors.join(`\n`);
  console.error('❌ Invalid environment variables:', fullError);
  process.exit(1);
}
/**
 * Unwrap a successful zod parse result. Hitting the failure branch is a
 * bug: all parse failures are reported and abort the process above.
 */
function extractConfig<Input, Output>(config: zod.SafeParseReturnType<Input, Output>): Output {
  if (config.success) {
    return config.data;
  }
  throw new Error('Something went wrong.');
}
const base = extractConfig(configs.base);
const redis = extractConfig(configs.redis);
const sentry = extractConfig(configs.sentry);
const prometheus = extractConfig(configs.prometheus);
const log = extractConfig(configs.log);
const requestBroker = extractConfig(configs.requestBroker);
const tracing = extractConfig(configs.tracing);
/**
 * Fully validated runtime configuration of the webhooks service, assembled
 * from the individually parsed environment models above.
 */
export const env = {
  environment: base.ENVIRONMENT,
  release: base.RELEASE ?? 'local',
  http: {
    // Default service port when PORT is not provided.
    port: base.PORT ?? 6250,
  },
  tracing: {
    // Tracing is enabled purely by the presence of a collector endpoint.
    enabled: !!tracing.OPENTELEMETRY_COLLECTOR_ENDPOINT,
    collectorEndpoint: tracing.OPENTELEMETRY_COLLECTOR_ENDPOINT,
  },
  redis: {
    host: redis.REDIS_HOST,
    port: redis.REDIS_PORT,
    password: redis.REDIS_PASSWORD ?? '',
    tlsEnabled: redis.REDIS_TLS_ENABLED === '1',
  },
  heartbeat: base.HEARTBEAT_ENDPOINT ? { endpoint: base.HEARTBEAT_ENDPOINT } : null,
  sentry: sentry.SENTRY === '1' ? { dsn: sentry.SENTRY_DSN } : null,
  log: {
    level: log.LOG_LEVEL ?? 'info',
    requests: log.REQUEST_LOGGING === '1',
  },
  prometheus:
    prometheus.PROMETHEUS_METRICS === '1'
      ? {
          labels: {
            // Fix: the default label was 'usage-service', a copy/paste
            // leftover from the usage service's environment file.
            instance: prometheus.PROMETHEUS_METRICS_LABEL_INSTANCE ?? 'webhooks',
          },
          port: prometheus.PROMETHEUS_METRICS_PORT ?? 10_254,
        }
      : null,
  requestBroker:
    requestBroker.REQUEST_BROKER === '1'
      ? {
          endpoint: requestBroker.REQUEST_BROKER_ENDPOINT,
          signature: requestBroker.REQUEST_BROKER_SIGNATURE,
        }
      : null,
} as const;

View file

@ -1,142 +0,0 @@
#!/usr/bin/env node
import { hostname } from 'os';
import {
configureTracing,
createErrorHandler,
createServer,
registerShutdown,
registerTRPC,
reportReadiness,
startHeartbeats,
startMetrics,
TracingInstance,
} from '@hive/service-common';
import * as Sentry from '@sentry/node';
import { webhooksApiRouter } from './api';
import { env } from './environment';
import { createScheduler } from './scheduler';
import type { Context } from './types';
/**
 * Entrypoint of the webhooks service: wires optional tracing and Sentry,
 * creates the HTTP server, the Redis/BullMQ-backed delivery scheduler,
 * the tRPC API and the health endpoints, then starts processing jobs.
 * Exits the process with code 1 on any fatal startup error.
 */
async function main() {
  let tracing: TracingInstance | undefined;

  // Tracing is opt-in via the collector endpoint environment variable.
  if (env.tracing.enabled && env.tracing.collectorEndpoint) {
    tracing = configureTracing({
      collectorEndpoint: env.tracing.collectorEndpoint,
      serviceName: 'webhooks',
    });
    tracing.instrumentNodeFetch();
    tracing.setup();
  }

  if (env.sentry) {
    Sentry.init({
      serverName: hostname(),
      dist: 'webhooks',
      enabled: !!env.sentry,
      environment: env.environment,
      dsn: env.sentry.dsn,
      release: env.release,
    });
  }

  const server = await createServer({
    name: 'webhooks',
    sentryErrorHandler: true,
    log: {
      level: env.log.level,
      requests: env.log.requests,
    },
  });

  if (tracing) {
    await server.register(...tracing.instrumentFastify());
  }

  const errorHandler = createErrorHandler(server);

  try {
    // Scheduler owns the BullMQ queue/worker pair; deliveries retry up to
    // 10 times with exponential backoff starting at 2s.
    const { schedule, readiness, start, stop } = createScheduler({
      logger: server.log,
      redis: {
        host: env.redis.host,
        port: env.redis.port,
        password: env.redis.password,
        tlsEnabled: env.redis.tlsEnabled,
      },
      webhookQueueName: 'webhook',
      maxAttempts: 10,
      backoffDelay: 2000,
      requestBroker: env.requestBroker,
    });

    // Heartbeats report liveness to an external monitor when configured.
    const stopHeartbeats = env.heartbeat
      ? startHeartbeats({
          enabled: true,
          endpoint: env.heartbeat.endpoint,
          intervalInMS: 20_000,
          onError: e => server.log.error(e, `Heartbeat failed with error`),
          isReady: readiness,
        })
      : startHeartbeats({ enabled: false });

    registerShutdown({
      logger: server.log,
      async onShutdown() {
        stopHeartbeats();
        await stop();
      },
    });

    // tRPC endpoint used by other Hive services to schedule deliveries.
    await registerTRPC(server, {
      router: webhooksApiRouter,
      createContext({ req }): Context {
        return { req, errorHandler, schedule };
      },
    });

    server.route({
      method: ['GET', 'HEAD'],
      url: '/_health',
      handler(req, res) {
        void res.status(200).send();
      },
    });

    server.route({
      method: ['GET', 'HEAD'],
      url: '/_readiness',
      handler(_, res) {
        const isReady = readiness();
        reportReadiness(isReady);
        void res.status(isReady ? 200 : 400).send();
      },
    });

    await server.listen({
      port: env.http.port,
      host: '::',
    });

    if (env.prometheus) {
      await startMetrics(env.prometheus.labels.instance, env.prometheus.port);
    }
    // Connect to Redis and begin consuming the webhook queue.
    await start();
  } catch (error) {
    server.log.fatal(error);
    Sentry.captureException(error, {
      level: 'fatal',
    });
    process.exit(1);
  }
}
// Last-resort handler: report to Sentry and exit non-zero so the
// orchestrator restarts the service.
main().catch(err => {
  Sentry.captureException(err, {
    level: 'fatal',
  });
  console.error(err);
  process.exit(1);
});

View file

@ -1,86 +0,0 @@
import { createHash } from 'node:crypto';
import type { Job, Queue } from 'bullmq';
import { got } from 'got';
import type { WebhookInput } from './scheduler';
import type { Config } from './types';
/**
 * Enqueue a webhook delivery job.
 *
 * The job id combines the target id with a SHA-256 checksum of the whole
 * payload, so identical events are deduplicated by BullMQ while distinct
 * payloads for the same target still get their own job. Retries use
 * exponential backoff per the service configuration.
 *
 * @returns the queued BullMQ job.
 */
export async function scheduleWebhook({
  queue,
  webhook,
  config,
}: {
  webhook: WebhookInput;
  config: Config;
  queue: Queue;
}) {
  const checksum = createHash('sha256').update(JSON.stringify(webhook)).digest('hex');
  const jobName = `${webhook.event.target.id}-${checksum}`;

  config.logger.debug(`Schedule ${jobName}`);

  // Plain await instead of the previous redundant
  // `.then(result => Promise.resolve(result))` chain.
  const job = await queue.add(jobName, webhook, {
    jobId: jobName,
    attempts: config.maxAttempts,
    backoff: { type: 'exponential', delay: config.backoffDelay },
  });

  config.logger.debug(`Scheduled ${jobName}`);
  return job;
}
/**
 * Builds the BullMQ processor function that performs the actual webhook delivery.
 *
 * Each invocation POSTs the event payload to the job's endpoint — either
 * directly, or tunneled through the configured request broker. Delivery
 * errors are re-thrown so BullMQ retries with the configured backoff; once
 * the attempt budget is spent, the job is dropped with a warning.
 */
export function createWebhookJob({ config }: { config: Config }) {
  return async function sendWebhook(job: Job<WebhookInput>) {
    // Attempt budget exhausted — stop retrying.
    if (job.attemptsMade >= config.maxAttempts) {
      config.logger.warn('Giving up on webhook (job=%s)', job.name);
      return;
    }

    config.logger.debug(
      'Calling webhook (job=%s, attempt=%d of %d)',
      job.name,
      job.attemptsMade,
      config.maxAttempts,
    );

    // Headers sent to (or forwarded for) the webhook endpoint.
    const payloadHeaders = {
      Accept: 'application/json',
      'Accept-Encoding': 'gzip, deflate, br',
      'Content-Type': 'application/json',
    };

    try {
      if (config.requestBroker) {
        // Proxy the call through the request broker rather than hitting the
        // endpoint directly; the broker request carries the real target URL.
        await got.post(config.requestBroker.endpoint, {
          headers: {
            Accept: 'text/plain',
            'x-hive-signature': config.requestBroker.signature,
          },
          timeout: {
            request: 10_000,
          },
          json: {
            url: job.data.endpoint,
            method: 'POST',
            headers: payloadHeaders,
            body: JSON.stringify(job.data.event),
            resolveResponseBody: false,
          },
        });
      } else {
        await got.post(job.data.endpoint, {
          headers: payloadHeaders,
          timeout: {
            request: 10_000,
          },
          json: job.data.event,
        });
      }
    } catch (error) {
      config.logger.error('Failed to call webhook (job=%s)', job.name, error);
      // so we can re-try
      throw error;
    }
  };
}

View file

@ -1,231 +0,0 @@
import { Job, Queue, Worker } from 'bullmq';
import Redis, { Redis as RedisInstance } from 'ioredis';
import pTimeout from 'p-timeout';
import * as Sentry from '@sentry/node';
import { createWebhookJob, scheduleWebhook } from './jobs';
import type { Config } from './types';
export const clientCommandMessageReg = /ERR unknown command ['`]\s*client\s*['`]/;
/** Payload accepted by the webhook scheduler and POSTed to the target endpoint. */
export interface WebhookInput {
  /** URL the webhook event is delivered to. */
  endpoint: string;
  event: {
    organization: {
      id: string;
      /**
       * @deprecated
       * We moved away from cleanId and replaced it with slug,
       * but we need to keep it for backwards compatibility.
       */
      cleanId: string;
      slug: string;
      name: string;
    };
    project: {
      id: string;
      /**
       * @deprecated
       * We moved away from cleanId and replaced it with slug,
       * but we need to keep it for backwards compatibility.
       */
      cleanId: string;
      slug: string;
      name: string;
    };
    target: {
      id: string;
      /**
       * @deprecated
       * We moved away from cleanId and replaced it with slug,
       * but we need to keep it for backwards compatibility.
       */
      cleanId: string;
      slug: string;
      name: string;
    };
    schema: {
      id: string;
      valid: boolean;
      commit: string;
    };
    changes: any[];
    errors: any[];
  };
}
/**
 * Creates the webhook delivery scheduler backed by Redis + BullMQ.
 *
 * `start()` opens the Redis connection; once the connection reports `ready`,
 * the queue and its worker are created. `schedule()` enqueues a webhook,
 * `stop()` tears everything down and exits the process, and `readiness()`
 * reports whether the scheduler can currently accept work.
 */
export function createScheduler(config: Config) {
  // Initialize to null explicitly so the "not yet started" state is uniform
  // (the originals were left undefined, which readiness() treated as non-null).
  let redisConnection: RedisInstance | null = null;
  let webhookQueue: Queue | null = null;
  let stopped = false;
  const logger = config.logger;

  /** Closes the BullMQ queue (bounded by a 5s timeout) and drops its listeners. */
  async function clearBull() {
    logger.info('Clearing BullMQ...');
    try {
      if (webhookQueue) {
        webhookQueue.removeAllListeners();
        await pTimeout(webhookQueue.close(), {
          milliseconds: 5000,
          message: 'BullMQ close timeout',
        });
      }
    } catch (e) {
      logger.error('Failed to stop queues', e);
    } finally {
      webhookQueue = null;
      logger.info('BullMQ stopped');
    }
  }

  /** Creates the queue and worker; called once the Redis connection is ready. */
  async function initQueueAndWorkers() {
    if (!redisConnection) {
      return;
    }
    const prefix = 'hive-webhooks';
    webhookQueue = new Queue(config.webhookQueueName, {
      prefix,
      connection: redisConnection,
    });
    // Wait for Queue to be ready
    await webhookQueue.waitUntilReady();
    const webhookJob = createWebhookJob({ config });
    const webhookWorker = new Worker<WebhookInput>(config.webhookQueueName, webhookJob, {
      prefix,
      connection: redisConnection,
    });
    webhookWorker.on('error', onError('webhookWorker'));
    webhookWorker.on('failed', onFailed);
    // Wait for Workers
    await webhookWorker.waitUntilReady();
  }

  /** Opens the Redis connection and wires up lifecycle event handlers. */
  async function start() {
    redisConnection = new Redis({
      host: config.redis.host,
      port: config.redis.port,
      password: config.redis.password,
      retryStrategy(times) {
        return Math.min(times * 500, 2000);
      },
      reconnectOnError(error) {
        onError('redis:reconnectOnError')(error);
        // Do not reconnect when the CLIENT command is unavailable — that error
        // is not transient.
        if (clientCommandMessageReg.test(error.message)) {
          return false;
        }
        return 1;
      },
      db: 0,
      maxRetriesPerRequest: null,
      enableReadyCheck: false,
      tls: config.redis.tlsEnabled ? {} : undefined,
    });
    redisConnection.on('error', err => {
      onError('redis:error')(err);
    });
    redisConnection.on('connect', () => {
      logger.info('Redis connection established');
    });
    redisConnection.on('ready', async () => {
      logger.info('Redis connection ready... creating queues and workers...');
      await initQueueAndWorkers();
    });
    redisConnection.on('close', () => {
      logger.info('Redis connection closed');
    });
    redisConnection.on('reconnecting', (timeToReconnect?: number) => {
      logger.info('Redis reconnecting in %s', timeToReconnect);
    });
    redisConnection.on('end', async () => {
      logger.info('Redis ended - no more reconnections will be made');
      await stop();
    });
  }

  /** Builds an error handler that logs and reports to Sentry, tagged with its source. */
  function onError(source: string) {
    return (error: Error) => {
      logger.error(`onError called from source ${source}`, error);
      Sentry.captureException(error, {
        extra: {
          error,
          source,
        },
        level: 'error',
      });
    };
  }

  /** Worker 'failed' handler: logs job metadata for diagnosis. */
  function onFailed(job: Job | undefined, error: Error) {
    logger.debug(
      `Job %s failed after %s attempts, reason: %s (orgId=%s, projectId=%s, targetId=%s, schemaId=%s)`,
      job?.name,
      job?.attemptsMade,
      job?.failedReason,
      job?.data?.event?.organization?.id,
      job?.data?.event?.project?.id,
      job?.data?.event?.target?.id,
      job?.data?.event?.schema?.id,
    );
    logger.error(error);
  }

  /** Shuts down BullMQ and Redis, then exits the process. */
  async function stop() {
    // Fixed log message: previously said "Usage" (copy-paste from the usage
    // service) — this is the webhooks scheduler.
    logger.info('Starting webhooks shutdown...');
    stopped = true;
    await clearBull();
    if (redisConnection) {
      logger.info('Stopping Redis...');
      try {
        redisConnection.disconnect(false);
      } catch (e) {
        logger.error('Failed to stop Redis connection', e);
      } finally {
        redisConnection = null;
        webhookQueue = null;
        logger.info('Redis stopped');
      }
    }
    // Fixed typo: was "Existing".
    logger.info('Exiting');
    process.exit(0);
  }

  /** Enqueues a webhook on the (assumed initialized) queue. */
  async function schedule(webhook: WebhookInput) {
    return scheduleWebhook({ queue: webhookQueue!, webhook, config });
  }

  return {
    schedule,
    start,
    stop,
    /** True only while the queue exists and the Redis connection is ready. */
    readiness() {
      if (stopped) {
        return false;
      }
      return (
        webhookQueue !== null && redisConnection !== null && redisConnection?.status === 'ready'
      );
    },
  };
}

View file

@ -1,26 +0,0 @@
import type { Job } from 'bullmq';
import type { FastifyRequest, ServiceLogger } from '@hive/service-common';
import type { WebhookInput } from './scheduler';
/** Runtime configuration for the webhooks service. */
export interface Config {
  logger: ServiceLogger;
  /** Connection details for the Redis instance backing the BullMQ queue. */
  redis: {
    host: string;
    port: number;
    password: string;
    tlsEnabled: boolean;
  };
  /** Name of the BullMQ queue that webhook jobs are scheduled on. */
  webhookQueueName: string;
  /** Maximum delivery attempts before a webhook job is given up. */
  maxAttempts: number;
  /** Base delay (ms) for the exponential retry backoff. */
  backoffDelay: number;
  /** Optional broker to proxy outgoing webhook requests through; null = direct delivery. */
  requestBroker: null | {
    endpoint: string;
    signature: string;
  };
}
/** Per-request context for the webhooks tRPC API. */
export type Context = {
  req: FastifyRequest;
  /** Callback used to report errors raised while handling a request. */
  errorHandler(message: string, error: Error, logger?: ServiceLogger | undefined): void;
  /** Schedules a webhook delivery; resolves with the created BullMQ job. */
  schedule(webhook: WebhookInput): Promise<Job<any, any, string>>;
};

View file

@ -1,9 +0,0 @@
{
"extends": "../../../tsconfig.json",
"compilerOptions": {
"target": "ES2020",
"module": "esnext",
"rootDir": "../.."
},
"files": ["src/index.ts"]
}

View file

@ -17,3 +17,37 @@ src/lib/*
## References
- [Graphile Worker Documentation](https://worker.graphile.org/)
## Configuration
| Name | Required | Description | Example Value |
| ---------------------------------------- | ----------------------------------------------------- | -------------------------------------------------------------------------------------------------------- | ---------------------------------------------------- |
| `PORT` | No | The port this service is running on. | `6260` |
| `POSTGRES_SSL` | No | Whether the postgres connection should be established via SSL. | `1` (enabled) or `0` (disabled) |
| `POSTGRES_HOST` | **Yes** | Host of the postgres database | `127.0.0.1` |
| `POSTGRES_PORT` | **Yes** | Port of the postgres database | `5432` |
| `POSTGRES_DB` | **Yes** | Name of the postgres database. | `registry` |
| `POSTGRES_USER` | **Yes** | User name for accessing the postgres database. | `postgres` |
| `POSTGRES_PASSWORD` | No | Password for accessing the postgres database. | `postgres` |
| `EMAIL_FROM` | **Yes** | The email address used for sending emails | `kamil@graphql-hive.com` |
| `EMAIL_PROVIDER` | **Yes** | The email provider that should be used for sending emails. | `smtp` or `postmark` or `mock` |
| `EMAIL_PROVIDER_SMTP_PROTOCOL` | No (**Yes** if `EMAIL_PROVIDER` is set to `smtp`) | The protocol used for the smtp server | `smtp` or `smtps` |
| `EMAIL_PROVIDER_SMTP_HOST` | No (**Yes** if `EMAIL_PROVIDER` is set to `smtp`) | The host of the smtp server | `127.0.0.1` |
| `EMAIL_PROVIDER_SMTP_PORT` | No (**Yes** if `EMAIL_PROVIDER` is set to `smtp`) | The port of the smtp server | `25` |
| `EMAIL_PROVIDER_SMTP_AUTH_USERNAME` | No (**Yes** if `EMAIL_PROVIDER` is set to `smtp`) | The username for the smtp server. | `letmein` |
| `EMAIL_PROVIDER_SMTP_AUTH_PASSWORD` | No (**Yes** if `EMAIL_PROVIDER` is set to `smtp`) | The password for the smtp server. | `letmein` |
| `EMAIL_PROVIDER_POSTMARK_TOKEN` | No (**Yes** if `EMAIL_PROVIDER` is set to `postmark`) | The postmark token. | `abcdefg123` |
| `EMAIL_PROVIDER_POSTMARK_MESSAGE_STREAM` | No (**Yes** if `EMAIL_PROVIDER` is set to `postmark`) | The postmark message stream. | `abcdefg123` |
| `ENVIRONMENT` | No | The environment of your Hive app. (**Note:** This will be used for Sentry reporting.) | `staging` |
| `HEARTBEAT_ENDPOINT` | No | The endpoint for a heartbeat. | `http://127.0.0.1:6969/heartbeat` |
| `SENTRY` | No | Whether Sentry error reporting should be enabled. | `1` (enabled) or `0` (disabled) |
| `SENTRY_DSN` | No | The DSN for reporting errors to Sentry. | `https://dooobars@o557896.ingest.sentry.io/12121212` |
| `PROMETHEUS_METRICS` | No | Whether Prometheus metrics should be enabled | `1` (enabled) or `0` (disabled) |
| `PROMETHEUS_METRICS_LABEL_INSTANCE` | No | The instance label added for the prometheus metrics. | `workflows` |
| `PROMETHEUS_METRICS_PORT` | No | Port on which prometheus metrics are exposed | Defaults to `10254` |
| `LOG_LEVEL` | No | The verbosity of the service logs. One of `trace`, `debug`, `info`, `warn`, `error`, `fatal` or `silent` | `info` (default) |
| `OPENTELEMETRY_COLLECTOR_ENDPOINT` | No | OpenTelemetry Collector endpoint. The expected traces transport is HTTP (port `4318`). | `http://localhost:4318/v1/traces` |
| `REQUEST_BROKER` | No | Whether Request Broker should be enabled. | `1` (enabled) or `0` (disabled) |
| `REQUEST_BROKER_ENDPOINT` | No | The address | `https://broker.worker.dev` |
| `REQUEST_BROKER_SIGNATURE` | No | A secret signature needed to verify the request origin | `hbsahdbzxch123` |
| `REQUEST_LOGGING` | No | Log http requests | `1` (enabled) or `0` (disabled) |

View file

@ -15,6 +15,7 @@
"@sentry/node": "7.120.2",
"@types/mjml": "4.7.1",
"@types/nodemailer": "7.0.4",
"@types/sendmail": "1.4.7",
"bentocache": "1.1.0",
"dotenv": "16.4.7",
"graphile-worker": "0.16.6",

View file

@ -1069,60 +1069,6 @@ importers:
specifier: 3.107.2
version: 3.107.2(@cloudflare/workers-types@4.20250913.0)
packages/services/emails:
devDependencies:
'@hive/service-common':
specifier: workspace:*
version: link:../service-common
'@sentry/node':
specifier: 7.120.2
version: 7.120.2
'@trpc/server':
specifier: 10.45.3
version: 10.45.3
'@types/mjml':
specifier: 4.7.1
version: 4.7.1
'@types/nodemailer':
specifier: 7.0.4
version: 7.0.4
'@types/sendmail':
specifier: 1.4.7
version: 1.4.7
bullmq:
specifier: 5.34.8
version: 5.34.8
copyfiles:
specifier: 2.4.1
version: 2.4.1
dotenv:
specifier: 16.4.7
version: 16.4.7
ioredis:
specifier: 5.8.2
version: 5.8.2
mjml:
specifier: 4.14.0
version: 4.14.0(encoding@0.1.13)
nodemailer:
specifier: 7.0.11
version: 7.0.11
p-timeout:
specifier: 6.1.4
version: 6.1.4
pino-pretty:
specifier: 11.3.0
version: 11.3.0
sendmail:
specifier: 1.6.1
version: 1.6.1
tslib:
specifier: 2.8.1
version: 2.8.1
zod:
specifier: 3.25.76
version: 3.25.76
packages/services/external-composition/federation-2:
devDependencies:
'@apollo/composition':
@ -1697,45 +1643,6 @@ importers:
specifier: 3.25.76
version: 3.25.76
packages/services/webhooks:
devDependencies:
'@hive/service-common':
specifier: workspace:*
version: link:../service-common
'@sentry/node':
specifier: 7.120.2
version: 7.120.2
'@trpc/server':
specifier: 10.45.3
version: 10.45.3
bullmq:
specifier: 5.34.8
version: 5.34.8
copyfiles:
specifier: 2.4.1
version: 2.4.1
dotenv:
specifier: 16.4.7
version: 16.4.7
got:
specifier: 14.4.7
version: 14.4.7(patch_hash=f7660444905ddadee251ff98241119fb54f5fec1e673a428192da361d5636299)
ioredis:
specifier: 5.8.2
version: 5.8.2
p-timeout:
specifier: 6.1.4
version: 6.1.4
pino-pretty:
specifier: 11.3.0
version: 11.3.0
tslib:
specifier: 2.8.1
version: 2.8.1
zod:
specifier: 3.25.76
version: 3.25.76
packages/services/workflows:
devDependencies:
'@graphql-hive/logger':
@ -1756,6 +1663,9 @@ importers:
'@types/nodemailer':
specifier: 7.0.4
version: 7.0.4
'@types/sendmail':
specifier: 1.4.7
version: 1.4.7
bentocache:
specifier: 1.1.0
version: 1.1.0(patch_hash=98c0f93795fdd4f5eae32ee7915de8e9a346a24c3a917262b1f4551190f1a1af)(ioredis@5.8.2)
@ -5894,36 +5804,6 @@ packages:
react: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0
react-dom: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
resolution: {integrity: sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==}
cpu: [arm64]
os: [darwin]
'@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3':
resolution: {integrity: sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==}
cpu: [x64]
os: [darwin]
'@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3':
resolution: {integrity: sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==}
cpu: [arm64]
os: [linux]
'@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3':
resolution: {integrity: sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==}
cpu: [arm]
os: [linux]
'@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3':
resolution: {integrity: sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==}
cpu: [x64]
os: [linux]
'@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3':
resolution: {integrity: sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==}
cpu: [x64]
os: [win32]
'@mswjs/interceptors@0.39.6':
resolution: {integrity: sha512-bndDP83naYYkfayr/qhBHMhk0YGwS1iv6vaEGcr0SQbO0IZtbOPqjKjds/WcG+bJA+1T5vCx6kprKOzn5Bg+Vw==}
engines: {node: '>=18'}
@ -10794,9 +10674,6 @@ packages:
builtins@5.0.1:
resolution: {integrity: sha512-qwVpFEHNfhYJIzNRBvd2C1kyo6jz3ZSMPyyuR47OPdiKWlbYnZNyDWuyR175qDnAJLiCo5fBBqPb3RiXgWlkOQ==}
bullmq@5.34.8:
resolution: {integrity: sha512-id5mmPg3K8tNXQ9VVlmUxBSeLmliIWUrB8Hd5c62PFrIiHywz4TN1PEqU6OWvYXEvoFCr8/BlnbE4JCrGqPVmg==}
bundle-name@3.0.0:
resolution: {integrity: sha512-PKA4BeSvBpQKQ8iPOGCSiell+N8P+Tf1DlwqmYhpe2gAhKPHn8EYOxVT+ShuGmhg8lN8XiSlS80yiExKXrURlw==}
engines: {node: '>=12'}
@ -11334,10 +11211,6 @@ packages:
crelt@1.0.6:
resolution: {integrity: sha512-VQ2MBenTq1fWZUH9DJNGti7kKv6EeAuYr3cLwxUWhIu1baTaXh4Ib5W2CqHVqib4/MqbYGJqiL3Zb8GJZr3l4g==}
cron-parser@4.9.0:
resolution: {integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==}
engines: {node: '>=12.0.0'}
cross-fetch@3.1.8:
resolution: {integrity: sha512-cvA+JwZoU0Xq+h6WkMvAUqPEYy92Obet6UdKLfW60qn99ftItKjB5T+BkyWOFWe2pUyfQ+IJHmpOTznqk1M6Kg==}
@ -11768,10 +11641,6 @@ packages:
engines: {node: '>=0.10'}
hasBin: true
detect-libc@2.1.0:
resolution: {integrity: sha512-vEtk+OcP7VBRtQZ1EJ3bdgzSfBjgnEalLTp5zjJrS+2Z1w2KZly4SBdac/WDU3hhsNAZ9E8SC96ME4Ey8MZ7cg==}
engines: {node: '>=8'}
detect-libc@2.1.2:
resolution: {integrity: sha512-Btj2BOOO83o3WyH59e8MgXsxEQVcarkUOpEYrubB0urwnN10yQ364rsiByU11nZlqWYZm05i/of7io4mzihBtQ==}
engines: {node: '>=8'}
@ -14459,10 +14328,6 @@ packages:
peerDependencies:
react: ^16.5.1 || ^17.0.0 || ^18.0.0 || ^19.0.0
luxon@3.5.0:
resolution: {integrity: sha512-rh+Zjr6DNfUYR3bPwJEnuwDdqMbxZW7LOQfUN4B54+Cl+0o5zaU9RJ6bcidfDtC1cWCZXQ+nvX8bf6bAji37QQ==}
engines: {node: '>=12'}
lz-string@1.5.0:
resolution: {integrity: sha512-h5bgJWpxJNswbU7qCrV0tIKQCaS3blPDrqKWx+QxzuzL1zGUzij9XCWLrSLsJPu5t+eWA/ycetzYAO5IOMcWAQ==}
hasBin: true
@ -15089,13 +14954,6 @@ packages:
ms@2.1.3:
resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==}
msgpackr-extract@3.0.3:
resolution: {integrity: sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==}
hasBin: true
msgpackr@1.11.2:
resolution: {integrity: sha512-F9UngXRlPyWCDEASDpTf6c9uNhGPTqnTeLVt7bN+bU1eajoR/8V9ys2BRaV5C/e5ihE6sJ9uPIKaYt6bFuO32g==}
multi-fork@0.0.2:
resolution: {integrity: sha512-SHWGuze0cZNiH+JGJQFlB1k7kZLGFCvW1Xo5Fcpe86KICkC3aVTJWpjUcmyYcLCB0I6gdzKLCia/bTIw2ggl8A==}
@ -15242,10 +15100,6 @@ packages:
resolution: {integrity: sha512-dRB78srN/l6gqWulah9SrxeYnxeddIG30+GOqK/9OlLVyLg3HPnr6SqOWTWOXKRwC2eGYCkZ59NNuSgvSrpgOA==}
engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0}
node-gyp-build-optional-packages@5.2.2:
resolution: {integrity: sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==}
hasBin: true
node-gyp@10.0.1:
resolution: {integrity: sha512-gg3/bHehQfZivQVfqIyy8wTdSymF9yTyP4CJifK73imyNMU8AIGQE2pUa7dNWfmMeG9cDVF2eehiRMv0LC1iAg==}
engines: {node: ^16.14.0 || >=18.0.0}
@ -25716,24 +25570,6 @@ snapshots:
react: 18.3.1
react-dom: 18.3.1(react@18.3.1)
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3':
optional: true
'@mswjs/interceptors@0.39.6':
dependencies:
'@open-draft/deferred-promise': 2.2.0
@ -26853,7 +26689,7 @@ snapshots:
'@types/shimmer': 1.2.0
import-in-the-middle: 1.7.1
require-in-the-middle: 7.3.0
semver: 7.6.3
semver: 7.7.2
shimmer: 1.2.1
transitivePeerDependencies:
- supports-color
@ -32141,18 +31977,6 @@ snapshots:
dependencies:
semver: 7.7.2
bullmq@5.34.8:
dependencies:
cron-parser: 4.9.0
ioredis: 5.8.2
msgpackr: 1.11.2
node-abort-controller: 3.1.1
semver: 7.6.2
tslib: 2.8.1
uuid: 9.0.1
transitivePeerDependencies:
- supports-color
bundle-name@3.0.0:
dependencies:
run-applescript: 5.0.0
@ -32748,10 +32572,6 @@ snapshots:
crelt@1.0.6: {}
cron-parser@4.9.0:
dependencies:
luxon: 3.5.0
cross-fetch@3.1.8(encoding@0.1.13):
dependencies:
node-fetch: 2.6.12(encoding@0.1.13)
@ -33202,9 +33022,6 @@ snapshots:
detect-libc@1.0.3: {}
detect-libc@2.1.0:
optional: true
detect-libc@2.1.2: {}
detect-newline@4.0.1: {}
@ -36482,8 +36299,6 @@ snapshots:
dependencies:
react: 19.2.2
luxon@3.5.0: {}
lz-string@1.5.0: {}
magic-string@0.25.9:
@ -37628,22 +37443,6 @@ snapshots:
ms@2.1.3: {}
msgpackr-extract@3.0.3:
dependencies:
node-gyp-build-optional-packages: 5.2.2
optionalDependencies:
'@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.3
'@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.3
'@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.3
'@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.3
'@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.3
'@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.3
optional: true
msgpackr@1.11.2:
optionalDependencies:
msgpackr-extract: 3.0.3
multi-fork@0.0.2: {}
mustache@4.2.0: {}
@ -37830,11 +37629,6 @@ snapshots:
fetch-blob: 3.2.0
formdata-polyfill: 4.0.10
node-gyp-build-optional-packages@5.2.2:
dependencies:
detect-libc: 2.1.0
optional: true
node-gyp@10.0.1:
dependencies:
env-paths: 2.2.1