diff --git a/packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/jobs/stream-agent-chat.job.ts b/packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/jobs/stream-agent-chat.job.ts index ee69d8a3708..1ccb6698b00 100644 --- a/packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/jobs/stream-agent-chat.job.ts +++ b/packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/jobs/stream-agent-chat.job.ts @@ -9,9 +9,12 @@ import type { } from 'twenty-shared/ai'; import { Repository } from 'typeorm'; +import { BillingProductKey } from 'src/engine/core-modules/billing/enums/billing-product-key.enum'; +import { BillingService } from 'src/engine/core-modules/billing/services/billing.service'; import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service'; import { toDisplayCredits } from 'src/engine/core-modules/usage/utils/to-display-credits.util'; import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity'; import { AgentMessageRole } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message.entity'; @@ -46,6 +49,8 @@ export class StreamAgentChatJob { private readonly eventPublisherService: AgentChatEventPublisherService, private readonly cancelSubscriberService: AgentChatCancelSubscriberService, private readonly agentChatStreamingService: AgentChatStreamingService, + private readonly billingService: BillingService, + private readonly twentyConfigService: TwentyConfigService, ) {} @Process(STREAM_AGENT_CHAT_JOB_NAME) @@ -69,6 +74,30 @@ export class StreamAgentChatJob { return; } + if (this.twentyConfigService.get('IS_BILLING_ENABLED')) { + const canBill = await this.billingService.canBillMeteredProduct( + data.workspaceId, + BillingProductKey.WORKFLOW_NODE_EXECUTION, + ); + + if (!canBill) { + this.logger.warn( + `Credits exhausted for workspace ${data.workspaceId}, aborting stream ${data.streamId}`, + ); + await this.eventPublisherService.publish({ + threadId: data.threadId, + workspaceId: data.workspaceId, + event: { + type: 'stream-error', + code: 'BILLING_CREDITS_EXHAUSTED', + message: 'Credits exhausted', + }, + }); + + return; + } + } + const abortController = new AbortController(); const cancelChannel = getCancelChannel(data.threadId); diff --git a/packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/services/agent-chat-streaming.service.ts b/packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/services/agent-chat-streaming.service.ts index 5c34a13a17b..d7448ce46fe 100644 --- a/packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/services/agent-chat-streaming.service.ts +++ b/packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/services/agent-chat-streaming.service.ts @@ -10,11 +10,14 @@ import { import { FileFolder } from 'twenty-shared/types'; import { In, Like, type Repository } from 'typeorm'; +import { BillingProductKey } from 'src/engine/core-modules/billing/enums/billing-product-key.enum'; +import { BillingService } from 'src/engine/core-modules/billing/services/billing.service'; import { FileEntity } from 'src/engine/core-modules/file/entities/file.entity'; import { FileUrlService } from 'src/engine/core-modules/file/file-url/file-url.service'; import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service'; import { type WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity'; import { AgentMessageRole, @@ -57,6 +60,8 @@ export class AgentChatStreamingService { private readonly agentChatService: AgentChatService, private readonly eventPublisherService: AgentChatEventPublisherService, private readonly fileUrlService: FileUrlService, + private readonly billingService: BillingService, + private readonly twentyConfigService: TwentyConfigService, ) {} async streamAgentChat({ @@ -139,6 +144,30 @@ export class AgentChatStreamingService { workspaceId: string, hasTitle: boolean, ): Promise { + if (this.twentyConfigService.get('IS_BILLING_ENABLED')) { + const canBill = await this.billingService.canBillMeteredProduct( + workspaceId, + BillingProductKey.WORKFLOW_NODE_EXECUTION, + ); + + if (!canBill) { + this.logger.warn( + `Credits exhausted for workspace ${workspaceId}, skipping queued message flush for thread ${threadId}`, + ); + await this.eventPublisherService.publish({ + threadId, + workspaceId, + event: { + type: 'stream-error', + code: 'BILLING_CREDITS_EXHAUSTED', + message: 'Credits exhausted', + }, + }); + + return; + } + } + const queuedMessages = await this.agentChatService.getQueuedMessages(threadId);