feat(redis): add support for dedicated queue client configuration (#14840)

Fix https://github.com/twentyhq/core-team-issues/issues/452
Fix https://github.com/twentyhq/core-team-issues/issues/923

---------

Co-authored-by: Félix Malfait <felix@twenty.com>
This commit is contained in:
Antoine Moreaux 2025-10-02 19:14:12 +02:00 committed by GitHub
parent d2fad6754d
commit d4160bf064
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 49 additions and 185 deletions

View file

@ -163,7 +163,6 @@
"passport-microsoft": "2.1.0",
"path-to-regexp": "^8.2.0",
"pg": "8.12.0",
"pg-boss": "9.0.3",
"planer": "1.2.0",
"pluralize": "8.0.0",
"psl": "^1.9.0",

View file

@ -1,120 +0,0 @@
import { type OnModuleDestroy, type OnModuleInit } from '@nestjs/common';
import PgBoss from 'pg-boss';
import {
type QueueCronJobOptions,
type QueueJobOptions,
} from 'src/engine/core-modules/message-queue/drivers/interfaces/job-options.interface';
import { type MessageQueueJob } from 'src/engine/core-modules/message-queue/interfaces/message-queue-job.interface';
import { type MessageQueueWorkerOptions } from 'src/engine/core-modules/message-queue/interfaces/message-queue-worker-options.interface';
import { type MessageQueueDriver } from 'src/engine/core-modules/message-queue/drivers/interfaces/message-queue-driver.interface';
import { type MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { getJobKey } from 'src/engine/core-modules/message-queue/utils/get-job-key.util';
export type PgBossDriverOptions = PgBoss.ConstructorOptions;
const DEFAULT_PG_BOSS_CRON_PATTERN_WHEN_NOT_PROVIDED = '*/1 * * * *';
export class PgBossDriver
implements MessageQueueDriver, OnModuleInit, OnModuleDestroy
{
private pgBoss: PgBoss;
constructor(options: PgBossDriverOptions) {
this.pgBoss = new PgBoss(options);
}
async onModuleInit() {
await this.pgBoss.start();
}
async onModuleDestroy() {
await this.pgBoss.stop();
}
async work<T>(
queueName: string,
handler: (job: MessageQueueJob<T>) => Promise<void>,
options?: MessageQueueWorkerOptions,
) {
return this.pgBoss.work<T>(
`${queueName}.*`,
options?.concurrency
? {
teamConcurrency: options.concurrency,
}
: {},
async (job) => {
// PGBoss work with wildcard job name
const jobName = job.name.split('.')?.[1];
if (!jobName) {
throw new Error('Job name could not be splited from the job.');
}
await handler({
data: job.data,
id: job.id,
name: jobName,
});
},
);
}
async addCron<T>({
queueName,
jobName,
data,
options,
jobId,
}: {
queueName: MessageQueue;
jobName: string;
data: T;
options: QueueCronJobOptions;
jobId?: string;
}): Promise<void> {
const name = `${queueName}.${getJobKey({ jobName, jobId })}`;
await this.pgBoss.schedule(
name,
options.repeat.pattern ?? DEFAULT_PG_BOSS_CRON_PATTERN_WHEN_NOT_PROVIDED,
data as object,
);
}
async removeCron({
queueName,
jobName,
jobId,
}: {
queueName: MessageQueue;
jobName: string;
jobId?: string;
}): Promise<void> {
const name = `${queueName}.${getJobKey({ jobName, jobId })}`;
await this.pgBoss.unschedule(name);
}
async add<T>(
queueName: MessageQueue,
jobName: string,
data: T,
options?: QueueJobOptions,
): Promise<void> {
await this.pgBoss.send(
`${queueName}.${jobName}`,
data as object,
options
? {
...options,
singletonKey: options?.id,
useSingletonQueue: true, // When used with singletonKey, ensures only one job can be queued. See https://logsnag.com/blog/deep-dive-into-background-jobs-with-pg-boss-and-typescript
}
: {},
);
}
}

View file

@ -1,17 +1,10 @@
import { type BullMQDriverOptions } from 'src/engine/core-modules/message-queue/drivers/bullmq.driver';
import { type PgBossDriverOptions } from 'src/engine/core-modules/message-queue/drivers/pg-boss.driver';
export enum MessageQueueDriverType {
PgBoss = 'pg-boss',
BullMQ = 'bull-mq',
Sync = 'sync',
}
export interface PgBossDriverFactoryOptions {
type: MessageQueueDriverType.PgBoss;
options: PgBossDriverOptions;
}
export interface BullMQDriverFactoryOptions {
type: MessageQueueDriverType.BullMQ;
options: BullMQDriverOptions;
@ -24,6 +17,5 @@ export interface SyncDriverFactoryOptions {
}
export type MessageQueueModuleOptions =
| PgBossDriverFactoryOptions
| BullMQDriverFactoryOptions
| SyncDriverFactoryOptions;

View file

@ -8,21 +8,20 @@ import {
import { type MessageQueueDriver } from 'src/engine/core-modules/message-queue/drivers/interfaces/message-queue-driver.interface';
import { BullMQDriver } from 'src/engine/core-modules/message-queue/drivers/bullmq.driver';
import { SyncDriver } from 'src/engine/core-modules/message-queue/drivers/sync.driver';
import { MessageQueueDriverType } from 'src/engine/core-modules/message-queue/interfaces';
import {
MessageQueue,
QUEUE_DRIVER,
} from 'src/engine/core-modules/message-queue/message-queue.constants';
import { PgBossDriver } from 'src/engine/core-modules/message-queue/drivers/pg-boss.driver';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { BullMQDriver } from 'src/engine/core-modules/message-queue/drivers/bullmq.driver';
import { SyncDriver } from 'src/engine/core-modules/message-queue/drivers/sync.driver';
import { getQueueToken } from 'src/engine/core-modules/message-queue/utils/get-queue-token.util';
import {
type ASYNC_OPTIONS_TYPE,
ConfigurableModuleClass,
type OPTIONS_TYPE,
} from 'src/engine/core-modules/message-queue/message-queue.module-definition';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { getQueueToken } from 'src/engine/core-modules/message-queue/utils/get-queue-token.util';
@Global()
@Module({})
@ -94,9 +93,6 @@ export class MessageQueueCoreModule extends ConfigurableModuleClass {
static async createDriver({ type, options }: typeof OPTIONS_TYPE) {
switch (type) {
case MessageQueueDriverType.PgBoss: {
return new PgBossDriver(options);
}
case MessageQueueDriverType.BullMQ: {
return new BullMQDriver(options);
}

View file

@ -18,28 +18,11 @@ export const messageQueueModuleFactory = async (
const driverType = MessageQueueDriverType.BullMQ;
switch (driverType) {
/*
case MessageQueueDriverType.Sync: {
return {
type: MessageQueueDriverType.Sync,
options: {},
} satisfies SyncDriverFactoryOptions;
}
case MessageQueueDriverType.PgBoss: {
const connectionString = twentyConfigService.get('PG_DATABASE_URL');
return {
type: MessageQueueDriverType.PgBoss,
options: {
connectionString,
},
} satisfies PgBossDriverFactoryOptions;
}*/
case MessageQueueDriverType.BullMQ: {
return {
type: MessageQueueDriverType.BullMQ,
options: {
connection: redisClientService.getClient(),
connection: redisClientService.getQueueClient(),
},
} satisfies BullMQDriverFactoryOptions;
}

View file

@ -1,15 +1,35 @@
import { Injectable, type OnModuleDestroy } from '@nestjs/common';
import IORedis from 'ioredis';
import { isDefined } from 'twenty-shared/utils';
import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';
@Injectable()
export class RedisClientService implements OnModuleDestroy {
private redisClient: IORedis | null = null;
private redisQueueClient: IORedis | null = null;
constructor(private readonly twentyConfigService: TwentyConfigService) {}
getQueueClient() {
if (!this.redisQueueClient) {
const redisQueueUrl =
this.twentyConfigService.get('REDIS_QUEUE_URL') ??
this.twentyConfigService.get('REDIS_URL');
if (!redisQueueUrl) {
throw new Error('REDIS_QUEUE_URL or REDIS_URL must be defined');
}
this.redisQueueClient = new IORedis(redisQueueUrl, {
maxRetriesPerRequest: null,
});
}
return this.redisQueueClient;
}
getClient() {
if (!this.redisClient) {
const redisUrl = this.twentyConfigService.get('REDIS_URL');
@ -27,7 +47,11 @@ export class RedisClientService implements OnModuleDestroy {
}
async onModuleDestroy() {
if (this.redisClient) {
if (isDefined(this.redisQueueClient)) {
await this.redisQueueClient.quit();
this.redisQueueClient = null;
}
if (isDefined(this.redisClient)) {
await this.redisClient.quit();
this.redisClient = null;
}

View file

@ -841,7 +841,22 @@ export class ConfigVariables {
@ConfigVariablesMetadata({
group: ConfigVariablesGroup.ServerConfig,
isSensitive: true,
description: 'URL for cache storage (e.g., Redis connection URL)',
description: 'Redis connection URL used for cache and queues by default',
isEnvOnly: true,
type: ConfigVariableType.STRING,
})
@IsUrl({
protocols: ['redis', 'rediss'],
require_tld: false,
allow_underscores: true,
})
REDIS_URL: string;
@ConfigVariablesMetadata({
group: ConfigVariablesGroup.ServerConfig,
isSensitive: true,
description:
'Optional separate Redis connection for queues with a different eviction policy (advanced production use case, most self-hosters do not need this)',
isEnvOnly: true,
type: ConfigVariableType.STRING,
})
@ -851,7 +866,7 @@ export class ConfigVariables {
require_tld: false,
allow_underscores: true,
})
REDIS_URL: string;
REDIS_QUEUE_URL: string;
@ConfigVariablesMetadata({
group: ConfigVariablesGroup.ServerConfig,

View file

@ -28989,7 +28989,7 @@ __metadata:
languageName: node
linkType: hard
"cron-parser@npm:^4.0.0, cron-parser@npm:^4.2.0, cron-parser@npm:^4.9.0":
"cron-parser@npm:^4.2.0, cron-parser@npm:^4.9.0":
version: 4.9.0
resolution: "cron-parser@npm:4.9.0"
dependencies:
@ -45469,21 +45469,6 @@ __metadata:
languageName: node
linkType: hard
"pg-boss@npm:9.0.3":
version: 9.0.3
resolution: "pg-boss@npm:9.0.3"
dependencies:
cron-parser: "npm:^4.0.0"
delay: "npm:^5.0.0"
lodash.debounce: "npm:^4.0.8"
p-map: "npm:^4.0.0"
pg: "npm:^8.5.1"
serialize-error: "npm:^8.1.0"
uuid: "npm:^9.0.0"
checksum: 10c0/7021733e75c59ddcccc0effeceb745e689b3b975dea817910696c0b1b0e986cb32ddd886418b9db8b4cfa0fceaf826754bd9c1a3438095893a39cccf1149d793
languageName: node
linkType: hard
"pg-cloudflare@npm:^1.1.1":
version: 1.1.1
resolution: "pg-cloudflare@npm:1.1.1"
@ -45556,7 +45541,7 @@ __metadata:
languageName: node
linkType: hard
"pg@npm:8.12.0, pg@npm:^8.5.1":
"pg@npm:8.12.0":
version: 8.12.0
resolution: "pg@npm:8.12.0"
dependencies:
@ -49321,15 +49306,6 @@ __metadata:
languageName: node
linkType: hard
"serialize-error@npm:^8.1.0":
version: 8.1.0
resolution: "serialize-error@npm:8.1.0"
dependencies:
type-fest: "npm:^0.20.2"
checksum: 10c0/8cfd89f43ca93e283c5f1d16178a536bdfac9bc6029f4a9df988610cc399bc4f2478d1f10ce40b9dff66b863a5158a19b438fbec929045c96d92174f6bca1e88
languageName: node
linkType: hard
"serialize-javascript@npm:^6.0.2":
version: 6.0.2
resolution: "serialize-javascript@npm:6.0.2"
@ -52685,7 +52661,6 @@ __metadata:
passport-microsoft: "npm:2.1.0"
path-to-regexp: "npm:^8.2.0"
pg: "npm:8.12.0"
pg-boss: "npm:9.0.3"
planer: "npm:1.2.0"
pluralize: "npm:8.0.0"
psl: "npm:^1.9.0"