fix(billing): add credit check to queued message flush and stream-agent-chat job

https://sonarly.com/issue/24307?type=bug

Users can consume unlimited AI credits beyond their assigned limit through two bypass paths: queued messages that auto-flush without any billing check, and the inherent race condition where the credit cap flag is set asynchronously by Stripe webhooks after usage has already occurred.
This commit is contained in:
Sonarly Claude Code 2026-04-12 09:43:21 +00:00
parent 55b1624210
commit 38c6fb2015
2 changed files with 58 additions and 0 deletions

View file

@ -9,9 +9,12 @@ import type {
} from 'twenty-shared/ai';
import { Repository } from 'typeorm';
import { BillingProductKey } from 'src/engine/core-modules/billing/enums/billing-product-key.enum';
import { BillingService } from 'src/engine/core-modules/billing/services/billing.service';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';
import { toDisplayCredits } from 'src/engine/core-modules/usage/utils/to-display-credits.util';
import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity';
import { AgentMessageRole } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message.entity';
@ -46,6 +49,8 @@ export class StreamAgentChatJob {
private readonly eventPublisherService: AgentChatEventPublisherService,
private readonly cancelSubscriberService: AgentChatCancelSubscriberService,
private readonly agentChatStreamingService: AgentChatStreamingService,
private readonly billingService: BillingService,
private readonly twentyConfigService: TwentyConfigService,
) {}
@Process(STREAM_AGENT_CHAT_JOB_NAME)
@ -69,6 +74,30 @@ export class StreamAgentChatJob {
return;
}
if (this.twentyConfigService.get('IS_BILLING_ENABLED')) {
const canBill = await this.billingService.canBillMeteredProduct(
data.workspaceId,
BillingProductKey.WORKFLOW_NODE_EXECUTION,
);
if (!canBill) {
this.logger.warn(
`Credits exhausted for workspace ${data.workspaceId}, aborting stream ${data.streamId}`,
);
await this.eventPublisherService.publish({
threadId: data.threadId,
workspaceId: data.workspaceId,
event: {
type: 'stream-error',
code: 'BILLING_CREDITS_EXHAUSTED',
message: 'Credits exhausted',
},
});
return;
}
}
const abortController = new AbortController();
const cancelChannel = getCancelChannel(data.threadId);

View file

@ -10,11 +10,14 @@ import {
import { FileFolder } from 'twenty-shared/types';
import { In, Like, type Repository } from 'typeorm';
import { BillingProductKey } from 'src/engine/core-modules/billing/enums/billing-product-key.enum';
import { BillingService } from 'src/engine/core-modules/billing/services/billing.service';
import { FileEntity } from 'src/engine/core-modules/file/entities/file.entity';
import { FileUrlService } from 'src/engine/core-modules/file/file-url/file-url.service';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';
import { type WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity';
import {
AgentMessageRole,
@ -57,6 +60,8 @@ export class AgentChatStreamingService {
private readonly agentChatService: AgentChatService,
private readonly eventPublisherService: AgentChatEventPublisherService,
private readonly fileUrlService: FileUrlService,
private readonly billingService: BillingService,
private readonly twentyConfigService: TwentyConfigService,
) {}
async streamAgentChat({
@ -139,6 +144,30 @@ export class AgentChatStreamingService {
workspaceId: string,
hasTitle: boolean,
): Promise<void> {
if (this.twentyConfigService.get('IS_BILLING_ENABLED')) {
const canBill = await this.billingService.canBillMeteredProduct(
workspaceId,
BillingProductKey.WORKFLOW_NODE_EXECUTION,
);
if (!canBill) {
this.logger.warn(
`Credits exhausted for workspace ${workspaceId}, skipping queued message flush for thread ${threadId}`,
);
await this.eventPublisherService.publish({
threadId,
workspaceId,
event: {
type: 'stream-error',
code: 'BILLING_CREDITS_EXHAUSTED',
message: 'Credits exhausted',
},
});
return;
}
}
const queuedMessages =
await this.agentChatService.getQueuedMessages(threadId);