feat: add resumable stream support for agent chat (#19107)

## Overview
Add resumable stream support for agent chat to allow clients to
reconnect and resume streaming responses if the connection is
interrupted (e.g., during page refresh).

## Changes

### Backend (Twenty Server)
- Add `activeStreamId` column to `AgentChatThreadEntity` to track
ongoing streams
- Create `AgentChatResumableStreamService` to manage Redis-backed
resumable streams using the `resumable-stream` library with ioredis
- Extend `AgentChatController` with:
  - `GET /:threadId/stream` endpoint to resume an existing stream
  - `DELETE /:threadId/stream` endpoint to stop an active stream
- Update `AgentChatStreamingService` to store streams in Redis and track
active stream IDs
- Add `resumable-stream@^2.2.12` dependency to package.json

### Frontend (Twenty Front)
- Update `useAgentChat` hook to:
  - Use a persistent transport with `prepareReconnectToStreamRequest` for
    resumable streams
  - Export `resumeStream` function from useChat
  - Add `handleStop` callback to clear active stream on DELETE endpoint
  - Use thread ID as stable message ID instead of including message count
- Add stream resumption logic in `AgentChatAiSdkStreamEffect` component
to automatically call `resumeStream()` when switching threads

## Database Migration
New migration `1774003611071-add-active-stream-id-to-agent-chat-thread`
adds the `activeStreamId` column to store the current resumable stream
identifier.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Félix Malfait 2026-03-30 18:19:25 +02:00 committed by GitHub
parent ca00e8dece
commit 8fa3962e1c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 983 additions and 283 deletions

View file

@ -86,6 +86,8 @@ jobs:
run: |
echo "Attempting to merge main into current branch..."
git config user.email "ci@twenty.com"
git config user.name "CI"
git fetch origin main
CURRENT_BRANCH=$(git rev-parse --abbrev-ref HEAD)
@ -99,8 +101,8 @@ jobs:
echo "❌ Merge failed due to conflicts"
echo "⚠️ Falling back to comparing current branch against main without merge"
# Abort the failed merge
git merge --abort
# Abort the failed merge (may not exist if merge never started)
git merge --abort 2>/dev/null || true
echo "merged=false" >> $GITHUB_OUTPUT
echo "BRANCH_STATE=conflicts" >> $GITHUB_ENV
@ -402,6 +404,23 @@ jobs:
# Clean up temp directory
rm -rf /tmp/current-branch-files
- name: Validate downloaded schema files
id: validate-schemas
run: |
valid=true
for file in main-schema-introspection.json current-schema-introspection.json \
main-metadata-schema-introspection.json current-metadata-schema-introspection.json \
main-rest-api.json current-rest-api.json \
main-rest-metadata-api.json current-rest-metadata-api.json; do
if [ ! -f "$file" ] || ! jq empty "$file" 2>/dev/null; then
echo "::warning::Invalid or missing schema file: $file"
valid=false
fi
done
echo "valid=$valid" >> $GITHUB_OUTPUT
- name: Login to Docker Hub
uses: docker/login-action@v3
with:
@ -414,6 +433,7 @@ jobs:
echo "Using OpenAPITools/openapi-diff via Docker"
- name: Generate GraphQL Schema Diff Reports
if: steps.validate-schemas.outputs.valid == 'true'
run: |
echo "=== INSTALLING GRAPHQL INSPECTOR CLI ==="
npm install -g @graphql-inspector/cli
@ -424,7 +444,6 @@ jobs:
echo "Checking GraphQL schema for changes..."
if graphql-inspector diff main-schema-introspection.json current-schema-introspection.json >/dev/null 2>&1; then
echo "✅ No changes in GraphQL schema"
# Don't create a diff file for no changes
else
echo "⚠️ Changes detected in GraphQL schema, generating report..."
echo "# GraphQL Schema Changes" > graphql-schema-diff.md
@ -442,7 +461,6 @@ jobs:
echo "Checking GraphQL metadata schema for changes..."
if graphql-inspector diff main-metadata-schema-introspection.json current-metadata-schema-introspection.json >/dev/null 2>&1; then
echo "✅ No changes in GraphQL metadata schema"
# Don't create a diff file for no changes
else
echo "⚠️ Changes detected in GraphQL metadata schema, generating report..."
echo "# GraphQL Metadata Schema Changes" > graphql-metadata-diff.md
@ -461,6 +479,7 @@ jobs:
ls -la *-diff.md 2>/dev/null || echo "No diff files generated (no changes detected)"
- name: Check REST API Breaking Changes
if: steps.validate-schemas.outputs.valid == 'true'
run: |
echo "=== CHECKING REST API FOR BREAKING CHANGES ==="
@ -529,6 +548,7 @@ jobs:
fi
- name: Check REST Metadata API Breaking Changes
if: steps.validate-schemas.outputs.valid == 'true'
run: |
echo "=== CHECKING REST METADATA API FOR BREAKING CHANGES ==="

View file

@ -1,28 +1,29 @@
import { AGENT_CHAT_ENSURE_THREAD_FOR_DRAFT_EVENT_NAME } from '@/ai/constants/AgentChatEnsureThreadForDraftEventName';
import { AGENT_CHAT_REFETCH_MESSAGES_EVENT_NAME } from '@/ai/constants/AgentChatRefetchMessagesEventName';
import { useAgentChat } from '@/ai/hooks/useAgentChat';
import { useCreateAgentChatThread } from '@/ai/hooks/useCreateAgentChatThread';
import { useEnsureAgentChatThreadExistsForDraft } from '@/ai/hooks/useEnsureAgentChatThreadExistsForDraft';
import { useEnsureAgentChatThreadIdForSend } from '@/ai/hooks/useEnsureAgentChatThreadIdForSend';
import { agentChatDisplayedThreadState } from '@/ai/states/agentChatDisplayedThreadState';
import { agentChatErrorState } from '@/ai/states/agentChatErrorState';
import { agentChatFetchedMessagesComponentFamilyState } from '@/ai/states/agentChatFetchedMessagesComponentFamilyState';
import { agentChatIsInitialScrollPendingOnThreadChangeState } from '@/ai/states/agentChatIsInitialScrollPendingOnThreadChangeState';
import { agentChatIsLoadingState } from '@/ai/states/agentChatIsLoadingState';
import { agentChatIsStreamingState } from '@/ai/states/agentChatIsStreamingState';
import { normalizeAiSdkError } from '@/ai/utils/normalizeAiSdkError';
import { agentChatMessagesComponentFamilyState } from '@/ai/states/agentChatMessagesComponentFamilyState';
import { agentChatMessagesLoadingState } from '@/ai/states/agentChatMessagesLoadingState';
import { agentChatThreadsLoadingState } from '@/ai/states/agentChatThreadsLoadingState';
import { agentChatDisplayedThreadState } from '@/ai/states/agentChatDisplayedThreadState';
import { agentChatFetchedMessagesComponentFamilyState } from '@/ai/states/agentChatFetchedMessagesComponentFamilyState';
import { agentChatIsInitialScrollPendingOnThreadChangeState } from '@/ai/states/agentChatIsInitialScrollPendingOnThreadChangeState';
import { mergeAgentChatFetchedAndStreamingMessages } from '@/ai/utils/mergeAgentChatFetchedAndStreamingMessages';
import { AGENT_CHAT_REFETCH_MESSAGES_EVENT_NAME } from '@/ai/constants/AgentChatRefetchMessagesEventName';
import { dispatchBrowserEvent } from '@/browser-event/utils/dispatchBrowserEvent';
import { currentAIChatThreadState } from '@/ai/states/currentAIChatThreadState';
import { mergeAgentChatFetchedAndStreamingMessages } from '@/ai/utils/mergeAgentChatFetchedAndStreamingMessages';
import { normalizeAiSdkError } from '@/ai/utils/normalizeAiSdkError';
import { useListenToBrowserEvent } from '@/browser-event/hooks/useListenToBrowserEvent';
import { dispatchBrowserEvent } from '@/browser-event/utils/dispatchBrowserEvent';
import { useAtomComponentFamilyStateValue } from '@/ui/utilities/state/jotai/hooks/useAtomComponentFamilyStateValue';
import { useAtomStateValue } from '@/ui/utilities/state/jotai/hooks/useAtomStateValue';
import { useSetAtomComponentFamilyState } from '@/ui/utilities/state/jotai/hooks/useSetAtomComponentFamilyState';
import { useSetAtomState } from '@/ui/utilities/state/jotai/hooks/useSetAtomState';
import { useCallback, useEffect } from 'react';
import { isValidUuid } from 'twenty-shared/utils';
export const AgentChatAiSdkStreamEffect = () => {
const currentAIChatThread = useAtomStateValue(currentAIChatThreadState);
@ -54,6 +55,32 @@ export const AgentChatAiSdkStreamEffect = () => {
onStreamingComplete,
);
// Attempt to resume an active stream when navigating to an existing
// thread. We call resumeStream() manually instead of using useChat's
// resume:true option so that the stop button can coexist with
// resumption (resume:true is incompatible with abort signals).
// Only resume when the thread already has fetched messages — this
// avoids resuming on newly created threads where the thread ID
// transitions from a placeholder to a real UUID mid-conversation.
useEffect(() => {
if (
currentAIChatThread === null ||
!isValidUuid(currentAIChatThread) ||
agentChatFetchedMessages.length === 0 ||
chatState.status === 'streaming' ||
chatState.status === 'submitted'
) {
return;
}
chatState.resumeStream();
// We intentionally omit chatState.resumeStream and status from deps
// to avoid resume loops. We do include agentChatFetchedMessages.length
// so that resume fires once messages are fetched (they may arrive
// after the thread ID is set).
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [currentAIChatThread, agentChatFetchedMessages.length]);
const setAgentChatMessages = useSetAtomComponentFamilyState(
agentChatMessagesComponentFamilyState,
{ threadId: currentAIChatThread },
@ -76,6 +103,7 @@ export const AgentChatAiSdkStreamEffect = () => {
agentChatFetchedMessages,
chatState.messages,
);
setAgentChatMessages(mergedMessages);
if (currentAIChatThread !== agentChatDisplayedThread) {
@ -87,7 +115,6 @@ export const AgentChatAiSdkStreamEffect = () => {
}, [
agentChatFetchedMessages,
chatState.messages,
chatState.status,
setAgentChatMessages,
currentAIChatThread,
agentChatDisplayedThread,

View file

@ -4,7 +4,7 @@ import { useSetAtomState } from '@/ui/utilities/state/jotai/hooks/useSetAtomStat
import { styled } from '@linaria/react';
import { t } from '@lingui/core/macro';
import React, { useRef } from 'react';
import { IconPlus } from 'twenty-ui/display';
import { IconPaperclip } from 'twenty-ui/display';
import { IconButton } from 'twenty-ui/input';
import { themeCssVariables } from 'twenty-ui/theme-constants';
@ -52,7 +52,7 @@ export const AgentChatFileUploadButton = () => {
onClick={() => {
fileInputRef.current?.click();
}}
Icon={IconPlus}
Icon={IconPaperclip}
ariaLabel={t`Attach files`}
/>
</StyledFileUploadContainer>

View file

@ -2,7 +2,6 @@ import type { MessageDescriptor } from '@lingui/core';
import { msg } from '@lingui/core/macro';
import {
type IconComponent,
IconJetpack,
IconLayoutDashboard,
IconPlus,
IconSettingsAutomation,
@ -16,14 +15,6 @@ export type SuggestedPrompt = {
};
export const DEFAULT_SUGGESTED_PROMPTS: SuggestedPrompt[] = [
{
id: 'demo-workspace',
label: msg`Setup a tailor-made workspace`,
Icon: IconJetpack,
prefillPrompts: [
msg`Seed a demo workspace using the dedicated tool, ask me questions about my activity and goals to tailor the seeded data to my needs, and then help me explore the workspace and data you created.`,
],
},
{
id: 'dashboard',
label: msg`Create a dashboard`,

View file

@ -27,10 +27,11 @@ import { useSetAtomState } from '@/ui/utilities/state/jotai/hooks/useSetAtomStat
import { useChat } from '@ai-sdk/react';
import { DefaultChatTransport } from 'ai';
import { useStore } from 'jotai';
import { useCallback, useState } from 'react';
import { useCallback, useMemo, useState } from 'react';
import { type ExtendedUIMessage } from 'twenty-shared/ai';
import { isDefined } from 'twenty-shared/utils';
import { isDefined, isValidUuid } from 'twenty-shared/utils';
import { REACT_APP_SERVER_BASE_URL } from '~/config';
export const useAgentChat = (
uiMessages: ExtendedUIMessage[],
ensureThreadIdForSend: () => Promise<string | null>,
@ -107,40 +108,65 @@ export const useAgentChat = (
}
};
const { sendMessage, messages, status, error, regenerate, stop } = useChat({
transport: new DefaultChatTransport({
api: `${REST_API_BASE_URL}/agent-chat/stream`,
headers: () => ({
Authorization: `Bearer ${getTokenPair()?.accessOrWorkspaceAgnosticToken.token}`,
}),
fetch: async (input, init) => {
const response = await fetch(input, init);
// eslint-disable-next-line react-hooks/exhaustive-deps
const transport = useMemo(
() =>
new DefaultChatTransport({
api: `${REST_API_BASE_URL}/agent-chat/stream`,
headers: () => ({
Authorization: `Bearer ${getTokenPair()?.accessOrWorkspaceAgnosticToken.token}`,
}),
prepareReconnectToStreamRequest: ({ id }) => ({
api: `${REST_API_BASE_URL}/agent-chat/${id}/stream`,
headers: {
Authorization: `Bearer ${getTokenPair()?.accessOrWorkspaceAgnosticToken.token}`,
},
}),
fetch: async (input, init) => {
const response = await fetch(input, init);
if (response.status === 401) {
const retriedResponse = await retryFetchWithRenewedToken(input, init);
if (response.status === 401) {
const retriedResponse = await retryFetchWithRenewedToken(
input,
init,
);
return retriedResponse ?? response;
}
// For non-2xx responses, parse the error body and throw with the code
if (!response.ok) {
const errorBody = await response.json().catch(() => ({}));
const error = new Error(
errorBody.messages?.[0] ||
`Request failed with status ${response.status}`,
) as Error & { code?: string };
if (isDefined(errorBody.code)) {
error.code = errorBody.code;
return retriedResponse ?? response;
}
throw error;
}
return response;
},
}),
if (!response.ok) {
const errorBody = await response.json().catch(() => ({}));
const error = new Error(
errorBody.messages?.[0] ||
`Request failed with status ${response.status}`,
) as Error & { code?: string };
if (isDefined(errorBody.code)) {
error.code = errorBody.code;
}
throw error;
}
return response;
},
}),
// Intentionally created once — closures inside (getTokenPair, etc.)
// read fresh values via function references, not stale captures.
[],
);
const {
sendMessage,
messages,
status,
error,
regenerate,
stop,
resumeStream,
} = useChat({
transport,
messages: uiMessages,
id: `${currentAIChatThread}-${uiMessages.length}`,
id: currentAIChatThread ?? undefined,
experimental_throttle: 100,
onFinish: ({ message }) => {
type UsageMetadata = {
@ -183,7 +209,8 @@ export const useAgentChat = (
);
setPendingThreadIdAfterFirstSend((pendingId) => {
const threadIdForTitle = pendingId ?? currentAIChatThread;
const threadIdForTitle =
pendingId ?? store.get(currentAIChatThreadState.atom);
if (isDefined(titlePart) && titlePart.type === 'data-thread-title') {
setCurrentAIChatThreadTitle(titlePart.data.title);
if (isDefined(threadIdForTitle)) {
@ -283,9 +310,32 @@ export const useAgentChat = (
onBrowserEvent: handleSendMessage,
});
const handleStop = useCallback(async () => {
stop();
const threadId = store.get(currentAIChatThreadState.atom);
if (!isDefined(threadId) || !isValidUuid(threadId)) {
return;
}
const tokenPair = getTokenPair();
if (!isDefined(tokenPair)) {
return;
}
fetch(`${REST_API_BASE_URL}/agent-chat/${threadId}/stream`, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${tokenPair.accessOrWorkspaceAgnosticToken.token}`,
},
}).catch(() => {});
}, [stop, store]);
useListenToBrowserEvent({
eventName: AGENT_CHAT_STOP_EVENT_NAME,
onBrowserEvent: stop,
onBrowserEvent: handleStop,
});
useListenToBrowserEvent({
@ -296,7 +346,8 @@ export const useAgentChat = (
return {
messages,
handleSendMessage,
handleStop: stop,
handleStop,
resumeStream,
isLoading,
error,
status,

View file

@ -1,16 +1,15 @@
import { type ExtendedUIMessage } from 'twenty-shared/ai';
// The SDK's messages array is always [...seedMessages, ...newMessages]
// where seedMessages are the fetchedMessages we passed to useChat.
// SDK-generated IDs differ from server-persisted IDs, so comparing
// by ID doesn't work. Instead, slice by position: everything beyond
// the fetched count is a new streaming message.
export const mergeAgentChatFetchedAndStreamingMessages = (
fetchedMessages: ExtendedUIMessage[],
streamingMessages: ExtendedUIMessage[],
): ExtendedUIMessage[] => {
const fetchedMessageIds = new Set(
fetchedMessages.map((message) => message.id),
);
const newStreamingMessages = streamingMessages.slice(fetchedMessages.length);
const streamingOnlyMessages = streamingMessages.filter(
(message) => !fetchedMessageIds.has(message.id),
);
return [...fetchedMessages, ...streamingOnlyMessages];
return [...fetchedMessages, ...newStreamingMessages];
};

View file

@ -168,6 +168,7 @@
"react-dom": "18.3.1",
"redis": "^4.7.0",
"reflect-metadata": "0.2.2",
"resumable-stream": "^2.2.12",
"rxjs": "7.8.1",
"semver": "7.6.3",
"sharp": "0.32.6",

View file

@ -0,0 +1,19 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
/**
 * Adds the `activeStreamId` column to `core.agentChatThread`.
 *
 * The column stores the identifier of the resumable stream currently
 * attached to the thread; NULL means no stream is active. See the
 * resumable-stream support introduced alongside this migration.
 */
export class AddActiveStreamIdToAgentChatThread1774003611071
  implements MigrationInterface
{
  name = 'AddActiveStreamIdToAgentChatThread1774003611071';

  // Forward: add a nullable varchar column (no default — NULL = no active stream).
  public async up(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.query(
      `ALTER TABLE "core"."agentChatThread" ADD "activeStreamId" character varying`,
    );
  }

  // Rollback: drop the column; any recorded in-flight stream ids are discarded.
  public async down(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.query(
      `ALTER TABLE "core"."agentChatThread" DROP COLUMN "activeStreamId"`,
    );
  }
}

View file

@ -20,6 +20,7 @@ import { HandleWorkspaceMemberDeletedJob } from 'src/engine/core-modules/workspa
import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity';
import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module';
import { AiAgentMonitorModule } from 'src/engine/metadata-modules/ai/ai-agent-monitor/ai-agent-monitor.module';
import { AiChatModule } from 'src/engine/metadata-modules/ai/ai-chat/ai-chat.module';
import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-source.module';
import { LogicFunctionModule } from 'src/engine/metadata-modules/logic-function/logic-function.module';
import { NavigationMenuItemModule } from 'src/engine/metadata-modules/navigation-menu-item/navigation-menu-item.module';
@ -67,6 +68,7 @@ import { WorkflowModule } from 'src/modules/workflow/workflow.module';
SubscriptionsModule,
AuditJobModule,
AiAgentMonitorModule,
AiChatModule,
LogicFunctionModule,
EnterpriseModule,
],

View file

@ -17,4 +17,5 @@ export const MESSAGE_QUEUE_PRIORITY = {
[MessageQueue.deleteCascadeQueue]: 6,
[MessageQueue.cronQueue]: 7,
[MessageQueue.aiQueue]: 5,
[MessageQueue.aiStreamQueue]: 2,
};

View file

@ -0,0 +1,7 @@
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
/**
 * Per-queue worker option overrides, keyed by queue name.
 *
 * The map is Partial: queues without an entry yield `undefined`, and the
 * worker explorer passes that through as-is (presumably falling back to the
 * queue service's default concurrency — confirm in MessageQueueExplorer).
 */
export const QUEUE_WORKER_OPTIONS: Partial<
  Record<MessageQueue, { concurrency: number }>
> = {
  // AI stream jobs are long-lived and I/O-bound, so allow many in parallel.
  [MessageQueue.aiStreamQueue]: { concurrency: 20 },
};

View file

@ -19,4 +19,5 @@ export enum MessageQueue {
logicFunctionQueue = 'logic-function-queue',
triggerQueue = 'trigger-queue',
aiQueue = 'ai-queue',
aiStreamQueue = 'ai-stream-queue',
}

View file

@ -18,6 +18,8 @@ import { type MessageQueueWorkerOptions } from 'src/engine/core-modules/message-
import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service';
import { MessageQueueMetadataAccessor } from 'src/engine/core-modules/message-queue/message-queue-metadata.accessor';
import { type MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { type MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { QUEUE_WORKER_OPTIONS } from 'src/engine/core-modules/message-queue/message-queue-worker-options.constant';
import { getQueueToken } from 'src/engine/core-modules/message-queue/utils/get-queue-token.util';
import { shouldCaptureException } from 'src/engine/utils/global-exception-handler.util';
@ -67,6 +69,7 @@ export class MessageQueueExplorer implements OnModuleInit {
this.handleProcessorGroupCollection(
processorGroupCollection,
messageQueueService,
QUEUE_WORKER_OPTIONS[queueName as MessageQueue],
);
}
}

View file

@ -19,6 +19,7 @@ import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.mod
import { ToolProviderModule } from 'src/engine/core-modules/tool-provider/tool-provider.module';
import { UserWorkspaceEntity } from 'src/engine/core-modules/user-workspace/user-workspace.entity';
import { UserWorkspaceModule } from 'src/engine/core-modules/user-workspace/user-workspace.module';
import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity';
import { FeatureFlagGuard } from 'src/engine/guards/feature-flag.guard';
import { SettingsPermissionGuard } from 'src/engine/guards/settings-permission.guard';
import { WorkspaceAuthGuard } from 'src/engine/guards/workspace-auth.guard';
@ -35,7 +36,10 @@ import { WorkflowToolsModule } from 'src/modules/workflow/workflow-tools/workflo
import { AgentChatController } from './controllers/agent-chat.controller';
import { AgentChatThreadDTO } from './dtos/agent-chat-thread.dto';
import { AgentChatThreadEntity } from './entities/agent-chat-thread.entity';
import { StreamAgentChatJob } from './jobs/stream-agent-chat.job';
import { AgentChatResolver } from './resolvers/agent-chat.resolver';
import { AgentChatCancelSubscriberService } from './services/agent-chat-cancel-subscriber.service';
import { AgentChatResumableStreamService } from './services/agent-chat-resumable-stream.service';
import { AgentChatStreamingService } from './services/agent-chat-streaming.service';
import { AgentChatService } from './services/agent-chat.service';
import { AgentTitleGenerationService } from './services/agent-title-generation.service';
@ -48,6 +52,7 @@ import { SystemPromptBuilderService } from './services/system-prompt-builder.ser
AgentChatThreadEntity,
FileEntity,
UserWorkspaceEntity,
WorkspaceEntity,
]),
NestjsQueryGraphQLModule.forFeature({
imports: [
@ -98,11 +103,14 @@ import { SystemPromptBuilderService } from './services/system-prompt-builder.ser
],
controllers: [AgentChatController],
providers: [
AgentChatCancelSubscriberService,
AgentChatResolver,
AgentChatResumableStreamService,
AgentChatService,
AgentChatStreamingService,
AgentTitleGenerationService,
ChatExecutionService,
StreamAgentChatJob,
SystemPromptBuilderService,
],
exports: [

View file

@ -1,6 +1,9 @@
import {
Body,
Controller,
Delete,
Get,
Param,
Post,
Res,
UseFilters,
@ -9,8 +12,12 @@ import {
import { PermissionFlagType } from 'twenty-shared/constants';
import { InjectRepository } from '@nestjs/typeorm';
import { UI_MESSAGE_STREAM_HEADERS } from 'ai';
import type { Response } from 'express';
import type { ExtendedUIMessage } from 'twenty-shared/ai';
import { isDefined } from 'twenty-shared/utils';
import type { Repository } from 'typeorm';
import { RestApiExceptionFilter } from 'src/engine/api/rest/rest-api-exception.filter';
import {
@ -20,6 +27,7 @@ import {
import { BillingProductKey } from 'src/engine/core-modules/billing/enums/billing-product-key.enum';
import { BillingRestApiExceptionFilter } from 'src/engine/core-modules/billing/filters/billing-api-exception.filter';
import { BillingService } from 'src/engine/core-modules/billing/services/billing.service';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';
import type { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity';
import { AuthUserWorkspaceId } from 'src/engine/decorators/auth/auth-user-workspace-id.decorator';
@ -33,6 +41,9 @@ import {
} from 'src/engine/metadata-modules/ai/ai-agent/agent.exception';
import { AgentRestApiExceptionFilter } from 'src/engine/metadata-modules/ai/ai-agent/filters/agent-api-exception.filter';
import type { BrowsingContextType } from 'src/engine/metadata-modules/ai/ai-agent/types/browsingContext.type';
import { AgentChatThreadEntity } from 'src/engine/metadata-modules/ai/ai-chat/entities/agent-chat-thread.entity';
import { getCancelChannel } from 'src/engine/metadata-modules/ai/ai-chat/utils/get-cancel-channel.util';
import { AgentChatResumableStreamService } from 'src/engine/metadata-modules/ai/ai-chat/services/agent-chat-resumable-stream.service';
import { AgentChatStreamingService } from 'src/engine/metadata-modules/ai/ai-chat/services/agent-chat-streaming.service';
import { AiModelRegistryService } from 'src/engine/metadata-modules/ai/ai-models/services/ai-model-registry.service';
@ -46,9 +57,13 @@ import { AiModelRegistryService } from 'src/engine/metadata-modules/ai/ai-models
export class AgentChatController {
constructor(
private readonly agentStreamingService: AgentChatStreamingService,
private readonly resumableStreamService: AgentChatResumableStreamService,
private readonly billingService: BillingService,
private readonly twentyConfigService: TwentyConfigService,
private readonly aiModelRegistryService: AiModelRegistryService,
private readonly redisClientService: RedisClientService,
@InjectRepository(AgentChatThreadEntity)
private readonly threadRepository: Repository<AgentChatThreadEntity>,
) {}
@Post('stream')
@ -93,7 +108,7 @@ export class AgentChatController {
}
}
this.agentStreamingService.streamAgentChat({
return this.agentStreamingService.streamAgentChat({
threadId: body.threadId,
messages: body.messages,
browsingContext: body.browsingContext ?? null,
@ -103,4 +118,66 @@ export class AgentChatController {
response,
});
}
/**
 * GET /:threadId/stream — resume an in-flight agent chat stream.
 *
 * Returns 200 with the resumed UI-message SSE stream when the thread has
 * an active resumable stream, or 204 (no body) when there is nothing to
 * resume. Clients call this after a reconnect (e.g. page refresh).
 */
@Get(':threadId/stream')
@UseGuards(SettingsPermissionGuard(PermissionFlagType.AI))
async resumeAgentChatStream(
  @Param('threadId') threadId: string,
  @AuthUserWorkspaceId() userWorkspaceId: string,
  @Res() response: Response,
) {
  // Scope the lookup to the caller's userWorkspaceId so a user can only
  // resume streams on their own threads.
  const thread = await this.threadRepository.findOne({
    where: { id: threadId, userWorkspaceId },
  });

  // No such thread, or no stream currently marked active on it: 204.
  if (!isDefined(thread) || !isDefined(thread.activeStreamId)) {
    response.status(204).end();
    return;
  }

  const resumedNodeReadable =
    await this.resumableStreamService.resumeExistingStreamAsNodeReadable(
      thread.activeStreamId,
    );

  // The stream may have finished or expired between the DB read and the
  // resume attempt — treat that the same as "nothing to resume".
  if (!isDefined(resumedNodeReadable)) {
    response.status(204).end();
    return;
  }

  // Manual response handling (@Res): set SSE headers, then pipe the
  // resumed stream directly into the HTTP response.
  response.writeHead(200, UI_MESSAGE_STREAM_HEADERS);
  resumedNodeReadable.pipe(response);
}
/**
 * DELETE /:threadId/stream — stop the thread's active stream, if any.
 *
 * Idempotent: always resolves to `{ success: true }`, including when the
 * thread is unknown to the caller or no stream is active.
 */
@Delete(':threadId/stream')
@UseGuards(SettingsPermissionGuard(PermissionFlagType.AI))
async stopAgentChatStream(
  @Param('threadId') threadId: string,
  @AuthUserWorkspaceId() userWorkspaceId: string,
) {
  // Scoped to the caller's userWorkspaceId — users can only stop their
  // own threads' streams.
  const thread = await this.threadRepository.findOne({
    where: { id: threadId, userWorkspaceId },
  });

  if (!isDefined(thread) || !isDefined(thread.activeStreamId)) {
    return { success: true };
  }

  // Publish a cancel signal via Redis pub/sub. The BullMQ worker
  // processing this thread's stream subscribes to this channel and
  // will abort the LLM connection when the message arrives — stopping
  // token generation and billing immediately.
  const redis = this.redisClientService.getClient();

  await redis.publish(getCancelChannel(threadId), 'cancel');

  // Clear the active-stream marker so subsequent GET /stream resume
  // attempts return 204 instead of re-attaching to a cancelled stream.
  await this.threadRepository.update(
    { id: threadId, userWorkspaceId },
    { activeStreamId: null },
  );

  return { success: true };
}
}

View file

@ -51,6 +51,9 @@ export class AgentChatThreadEntity {
@Column({ type: 'bigint', default: 0 })
totalOutputCredits: number;
@Column({ type: 'varchar', nullable: true })
activeStreamId: string | null;
@OneToMany(() => AgentTurnEntity, (turn) => turn.thread)
turns: EntityRelation<AgentTurnEntity[]>;

View file

@ -0,0 +1,387 @@
import { Logger, Scope } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { createUIMessageStream, JsonToSseTransformStream } from 'ai';
import type {
CodeExecutionData,
ExtendedUIMessage,
ExtendedUIMessagePart,
} from 'twenty-shared/ai';
import { Repository } from 'typeorm';
import { AgentChatCancelSubscriberService } from 'src/engine/metadata-modules/ai/ai-chat/services/agent-chat-cancel-subscriber.service';
import { toDisplayCredits } from 'src/engine/core-modules/usage/utils/to-display-credits.util';
import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { AgentMessageRole } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message.entity';
import type { BrowsingContextType } from 'src/engine/metadata-modules/ai/ai-agent/types/browsingContext.type';
import { computeCostBreakdown } from 'src/engine/metadata-modules/ai/ai-billing/utils/compute-cost-breakdown.util';
import { convertDollarsToBillingCredits } from 'src/engine/metadata-modules/ai/ai-billing/utils/convert-dollars-to-billing-credits.util';
import { extractCacheCreationTokens } from 'src/engine/metadata-modules/ai/ai-billing/utils/extract-cache-creation-tokens.util';
import type { AIModelConfig } from 'src/engine/metadata-modules/ai/ai-models/types/ai-model-config.type';
import { AgentChatThreadEntity } from 'src/engine/metadata-modules/ai/ai-chat/entities/agent-chat-thread.entity';
import { AgentChatResumableStreamService } from 'src/engine/metadata-modules/ai/ai-chat/services/agent-chat-resumable-stream.service';
import { AgentChatService } from 'src/engine/metadata-modules/ai/ai-chat/services/agent-chat.service';
import { ChatExecutionService } from 'src/engine/metadata-modules/ai/ai-chat/services/chat-execution.service';
import { getCancelChannel } from 'src/engine/metadata-modules/ai/ai-chat/utils/get-cancel-channel.util';
export const STREAM_AGENT_CHAT_JOB_NAME = 'StreamAgentChatJob';
export type StreamAgentChatJobData = {
threadId: string;
streamId: string;
userWorkspaceId: string;
workspaceId: string;
messages: ExtendedUIMessage[];
browsingContext: BrowsingContextType | null;
modelId?: string;
lastUserMessageText: string;
lastUserMessageParts: ExtendedUIMessagePart[];
hasTitle: boolean;
};
@Processor({ queueName: MessageQueue.aiStreamQueue, scope: Scope.REQUEST })
export class StreamAgentChatJob {
private readonly logger = new Logger(StreamAgentChatJob.name);
constructor(
@InjectRepository(AgentChatThreadEntity)
private readonly threadRepository: Repository<AgentChatThreadEntity>,
@InjectRepository(WorkspaceEntity)
private readonly workspaceRepository: Repository<WorkspaceEntity>,
private readonly agentChatService: AgentChatService,
private readonly chatExecutionService: ChatExecutionService,
private readonly resumableStreamService: AgentChatResumableStreamService,
private readonly cancelSubscriberService: AgentChatCancelSubscriberService,
) {}
/**
 * Entry point for a queued agent-chat streaming job.
 *
 * Loads the workspace, wires up cancellation (Redis pub/sub -> local
 * AbortController), runs the stream, and — win or lose — unsubscribes and
 * clears the thread's `activeStreamId` marker. Errors are reported to the
 * client through the resumable stream rather than rethrown.
 */
@Process(STREAM_AGENT_CHAT_JOB_NAME)
async handle(data: StreamAgentChatJobData): Promise<void> {
  const workspace = await this.workspaceRepository.findOne({
    where: { id: data.workspaceId },
  });

  if (!workspace) {
    this.logger.error(`Workspace ${data.workspaceId} not found`);
    // Surface the failure on the stream so the client sees an error
    // instead of a silently dead stream.
    await this.resumableStreamService.writeStreamError(data.streamId, {
      code: 'WORKSPACE_NOT_FOUND',
      message: `Workspace ${data.workspaceId} not found`,
    });

    return;
  }

  // A "cancel" message on this thread's channel (published by the
  // DELETE /:threadId/stream endpoint) aborts the in-flight execution.
  const abortController = new AbortController();
  const cancelChannel = getCancelChannel(data.threadId);

  await this.cancelSubscriberService.subscribe(cancelChannel, () => {
    abortController.abort();
  });

  try {
    await this.executeStream(data, workspace, abortController.signal);
  } catch (error) {
    this.logger.error(
      `Stream ${data.streamId} failed: ${error instanceof Error ? error.message : String(error)}`,
    );
    // Best-effort error report; .catch keeps cleanup from being skipped
    // if the stream writer itself fails.
    await this.resumableStreamService
      .writeStreamError(data.streamId, {
        code: 'STREAM_EXECUTION_FAILED',
        message:
          error instanceof Error ? error.message : 'Stream execution failed',
      })
      .catch(() => {});
  } finally {
    await this.cancelSubscriberService.unsubscribe(cancelChannel);
    // Clear activeStreamId only if it still points at THIS job's stream;
    // the extra WHERE guard avoids clobbering a newer stream that may
    // have replaced it in the meantime. Best-effort (errors swallowed).
    await this.threadRepository
      .createQueryBuilder()
      .update(AgentChatThreadEntity)
      .set({ activeStreamId: null })
      .where('id = :id AND "activeStreamId" = :streamId', {
        id: data.threadId,
        streamId: data.streamId,
      })
      .execute()
      .catch(() => {});
  }
}
/**
 * Kicks off the side work for a stream (persisting the user message and
 * generating a thread title) and then builds and pipes the LLM stream.
 *
 * Both side tasks start immediately and run concurrently with the stream;
 * their promises are handed to buildAndPipeStream for later consumption.
 */
private async executeStream(
  data: StreamAgentChatJobData,
  workspace: WorkspaceEntity,
  abortSignal: AbortSignal,
): Promise<void> {
  // Persist the user's message, keeping only text/file parts.
  const userMessagePromise = this.agentChatService.addMessage({
    threadId: data.threadId,
    uiMessage: {
      role: AgentMessageRole.USER,
      parts: data.lastUserMessageParts.filter(
        (part): part is ExtendedUIMessagePart =>
          part.type === 'text' || part.type === 'file',
      ),
    },
  });

  // Attach a no-op handler so a rejection here never becomes an
  // unhandled-rejection; the same promise is still awaited downstream.
  userMessagePromise.catch(() => {});

  // Skip title generation when the thread already has one; failures
  // degrade to "no title" rather than failing the stream.
  const titlePromise = data.hasTitle
    ? Promise.resolve(null)
    : this.agentChatService
        .generateTitleIfNeeded(data.threadId, data.lastUserMessageText)
        .catch(() => null);

  await this.buildAndPipeStream({
    workspace,
    data,
    userMessagePromise,
    titlePromise,
    abortSignal,
  });
}
// Builds the UI-message stream for one chat turn and pipes it into the
// resumable-stream service so clients can attach/re-attach via Redis.
// Resolves when the stream finishes (or the abort signal fires); rejects
// if persisting the finished turn or creating the resumable stream fails.
private async buildAndPipeStream({
  workspace,
  data,
  userMessagePromise,
  titlePromise,
  abortSignal,
}: {
  workspace: WorkspaceEntity;
  data: StreamAgentChatJobData;
  userMessagePromise: Promise<{ turnId: string }>;
  titlePromise: Promise<string | null>;
  abortSignal: AbortSignal;
}): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    // Mutated from the messageMetadata callbacks as the stream progresses,
    // then read in onFinish when persisting totals to the thread.
    let streamUsage = {
      inputTokens: 0,
      outputTokens: 0,
      inputCredits: 0,
      outputCredits: 0,
    };
    let lastStepConversationSize = 0;
    let totalCacheCreationTokens = 0;
    // On abort, settle the promise so the caller's cleanup (finally) runs.
    abortSignal.addEventListener('abort', () => resolve(), { once: true });
    const uiStream = createUIMessageStream<ExtendedUIMessage>({
      execute: async ({ writer }) => {
        // Forwards code-execution progress events into the UI stream,
        // keyed by executionId so the client can update in place.
        const onCodeExecutionUpdate = (
          codeExecutionData: CodeExecutionData,
        ) => {
          writer.write({
            type: 'data-code-execution' as const,
            id: `code-execution-${codeExecutionData.executionId}`,
            data: codeExecutionData,
          });
        };
        const { stream, modelConfig } =
          await this.chatExecutionService.streamChat({
            workspace,
            userWorkspaceId: data.userWorkspaceId,
            messages: data.messages,
            browsingContext: data.browsingContext,
            modelId: data.modelId,
            onCodeExecutionUpdate,
            abortSignal,
          });
        // Emit the generated thread title as soon as it is ready; onFinish
        // awaits this so the title write lands before the stream closes.
        const titleWritePromise = titlePromise.then((generatedTitle) => {
          if (generatedTitle) {
            writer.write({
              type: 'data-thread-title' as const,
              id: `thread-title-${data.threadId}`,
              data: { title: generatedTitle },
            });
          }
        });
        writer.merge(
          stream.toUIMessageStream({
            onError: (error) => {
              return error instanceof Error ? error.message : String(error);
            },
            sendStart: false,
            // Delegates per-part usage/cost accounting; the onUpdate*
            // callbacks write back into the closure-held counters above.
            messageMetadata: ({ part }) => {
              return this.computeMessageMetadata({
                part,
                modelConfig,
                lastStepConversationSize,
                totalCacheCreationTokens,
                onUpdateUsage: (usage) => {
                  streamUsage = usage;
                },
                onUpdateConversationSize: (size) => {
                  lastStepConversationSize = size;
                },
                onUpdateCacheCreationTokens: (tokens) => {
                  totalCacheCreationTokens = tokens;
                },
              });
            },
            onFinish: async ({ responseMessage }) => {
              try {
                // Persist the assistant message and usage totals, then make
                // sure the title (if any) has been written before resolving.
                await this.handleStreamFinish({
                  responseMessage,
                  threadId: data.threadId,
                  streamUsage,
                  lastStepConversationSize,
                  modelConfig,
                  userMessagePromise,
                });
                await titleWritePromise;
                resolve();
              } catch (error) {
                reject(error);
              }
            },
            sendReasoning: true,
          }),
        );
      },
    });
    // Convert JSON chunks to SSE framing before handing off to Redis.
    const sseStream = uiStream.pipeThrough(new JsonToSseTransformStream());
    this.resumableStreamService
      .createResumableStream(data.streamId, () => sseStream)
      .catch(reject);
  });
}
// Computes per-part usage accounting for the UI message stream.
// 'finish-step' parts update the running cache/conversation counters via
// the onUpdate* callbacks and produce no metadata; the final 'finish' part
// reports total usage and returns the metadata object attached to the
// assistant message. All other part types yield undefined.
private computeMessageMetadata({
  part,
  modelConfig,
  lastStepConversationSize,
  totalCacheCreationTokens,
  onUpdateUsage,
  onUpdateConversationSize,
  onUpdateCacheCreationTokens,
}: {
  part: {
    type: string;
    usage?: {
      inputTokens?: number;
      inputTokenDetails?: { cacheReadTokens?: number };
    };
    totalUsage?: {
      inputTokens?: number;
      outputTokens?: number;
      inputTokenDetails?: { cacheReadTokens?: number };
      outputTokenDetails?: { reasoningTokens?: number };
    };
    providerMetadata?: Record<string, Record<string, unknown> | undefined>;
  };
  modelConfig: AIModelConfig;
  lastStepConversationSize: number;
  totalCacheCreationTokens: number;
  onUpdateUsage: (usage: {
    inputTokens: number;
    outputTokens: number;
    inputCredits: number;
    outputCredits: number;
  }) => void;
  onUpdateConversationSize: (size: number) => void;
  onUpdateCacheCreationTokens: (tokens: number) => void;
}) {
  if (part.type === 'finish-step') {
    const stepInputTokens = part.usage?.inputTokens ?? 0;
    const stepCachedTokens =
      part.usage?.inputTokenDetails?.cacheReadTokens ?? 0;
    const stepCacheCreationTokens = extractCacheCreationTokens(
      part.providerMetadata,
    );

    // Accumulate cache-creation tokens across steps; conversation size is
    // the latest step's full input (fresh + cached + cache-creation).
    onUpdateCacheCreationTokens(
      totalCacheCreationTokens + stepCacheCreationTokens,
    );
    onUpdateConversationSize(
      stepInputTokens + stepCachedTokens + stepCacheCreationTokens,
    );

    return undefined;
  }

  if (part.type !== 'finish') {
    return undefined;
  }

  // Final accounting: convert the total token usage into billing credits.
  const breakdown = computeCostBreakdown(modelConfig, {
    inputTokens: part.totalUsage?.inputTokens,
    outputTokens: part.totalUsage?.outputTokens,
    cachedInputTokens: part.totalUsage?.inputTokenDetails?.cacheReadTokens,
    reasoningTokens: part.totalUsage?.outputTokenDetails?.reasoningTokens,
    cacheCreationTokens: totalCacheCreationTokens,
  });

  const inputCredits = Math.round(
    convertDollarsToBillingCredits(breakdown.inputCostInDollars),
  );
  const outputCredits = Math.round(
    convertDollarsToBillingCredits(breakdown.outputCostInDollars),
  );
  const outputTokens = part.totalUsage?.outputTokens ?? 0;

  onUpdateUsage({
    inputTokens: breakdown.tokenCounts.totalInputTokens,
    outputTokens,
    inputCredits,
    outputCredits,
  });

  return {
    createdAt: new Date().toISOString(),
    usage: {
      inputTokens: breakdown.tokenCounts.totalInputTokens,
      outputTokens,
      cachedInputTokens: breakdown.tokenCounts.cachedInputTokens,
      inputCredits: toDisplayCredits(inputCredits),
      outputCredits: toDisplayCredits(outputCredits),
      conversationSize: lastStepConversationSize,
    },
    model: {
      contextWindowTokens: modelConfig.contextWindowTokens,
    },
  };
}
// Persists the finished assistant turn and rolls its usage into the
// thread's running totals. No-op when the model produced no output parts
// (e.g. aborted before any content was generated).
private async handleStreamFinish({
  responseMessage,
  threadId,
  streamUsage,
  lastStepConversationSize,
  modelConfig,
  userMessagePromise,
}: {
  responseMessage: Omit<ExtendedUIMessage, 'id'>;
  threadId: string;
  streamUsage: {
    inputTokens: number;
    outputTokens: number;
    inputCredits: number;
    outputCredits: number;
  };
  lastStepConversationSize: number;
  modelConfig: AIModelConfig;
  userMessagePromise: Promise<{ turnId: string }>;
}): Promise<void> {
  if (responseMessage.parts.length === 0) {
    return;
  }

  // The user-message save was fired earlier without awaiting; its turnId
  // groups this assistant reply with the triggering user message.
  const { turnId } = await userMessagePromise;

  await this.agentChatService.addMessage({
    threadId,
    uiMessage: responseMessage,
    turnId,
  });

  // Accumulate usage atomically in SQL instead of read-modify-write.
  await this.threadRepository.update(threadId, {
    totalInputTokens: () => `"totalInputTokens" + ${streamUsage.inputTokens}`,
    totalOutputTokens: () =>
      `"totalOutputTokens" + ${streamUsage.outputTokens}`,
    totalInputCredits: () =>
      `"totalInputCredits" + ${streamUsage.inputCredits}`,
    totalOutputCredits: () =>
      `"totalOutputCredits" + ${streamUsage.outputCredits}`,
    contextWindowTokens: modelConfig.contextWindowTokens,
    conversationSize: lastStepConversationSize,
  });
}
}

View file

@ -0,0 +1,54 @@
import { Injectable, Logger, type OnModuleDestroy } from '@nestjs/common';
import type { Redis } from 'ioredis';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
// Single shared Redis subscriber connection per process for AI stream
// cancellation. Multiplexes all cancel channels onto one connection
// so we use exactly 1 Redis connection regardless of how many
// concurrent streams are running.
@Injectable()
export class AgentChatCancelSubscriberService implements OnModuleDestroy {
  private readonly logger = new Logger(AgentChatCancelSubscriberService.name);

  // One shared subscriber connection, created lazily; all cancel channels
  // are multiplexed onto it so concurrent streams cost one connection.
  private subscriber: Redis | null = null;
  private readonly callbacks = new Map<string, () => void>();

  constructor(private readonly redisClientService: RedisClientService) {}

  // Lazily create the shared subscriber connection and attach the single
  // 'message' handler that dispatches to the per-channel callback.
  private ensureSubscriber(): Redis {
    if (this.subscriber) {
      return this.subscriber;
    }

    const connection = this.redisClientService.getClient().duplicate();

    connection.on('message', (channel: string) => {
      const onCancel = this.callbacks.get(channel);

      if (!onCancel) {
        return;
      }

      // Cancellation is one-shot: fire the callback, then drop both the
      // registration and the Redis subscription for this channel.
      onCancel();
      this.callbacks.delete(channel);
      this.subscriber?.unsubscribe(channel).catch(() => {});
    });

    this.subscriber = connection;

    return connection;
  }

  // Register a cancel callback for the channel and subscribe to it.
  async subscribe(channel: string, onCancel: () => void): Promise<void> {
    this.callbacks.set(channel, onCancel);
    await this.ensureSubscriber().subscribe(channel);
  }

  // Drop the callback and the Redis subscription; safe if never subscribed.
  async unsubscribe(channel: string): Promise<void> {
    this.callbacks.delete(channel);
    await this.subscriber?.unsubscribe(channel).catch(() => {});
  }

  // Tear down the shared connection on shutdown.
  async onModuleDestroy(): Promise<void> {
    if (this.subscriber) {
      await this.subscriber.quit().catch(() => {});
      this.subscriber = null;
    }
    this.callbacks.clear();
  }
}

View file

@ -0,0 +1,104 @@
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { Readable } from 'stream';
import { type ReadableStream as NodeWebReadableStream } from 'stream/web';
import type { Redis } from 'ioredis';
import { createResumableStreamContext } from 'resumable-stream/ioredis';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
@Injectable()
export class AgentChatResumableStreamService implements OnModuleDestroy {
  private streamContext: ReturnType<typeof createResumableStreamContext>;
  private streamPublisher: Redis;
  private streamSubscriber: Redis;
  private redisClient: Redis;

  constructor(private readonly redisClientService: RedisClientService) {
    const baseClient = this.redisClientService.getClient();

    // Dedicated connections: a Redis client in subscriber mode cannot issue
    // regular commands, so pub/sub and key access each get their own.
    this.streamPublisher = baseClient.duplicate();
    this.streamSubscriber = baseClient.duplicate();
    this.redisClient = baseClient.duplicate();

    this.streamContext = createResumableStreamContext({
      waitUntil: () => {},
      publisher: this.streamPublisher,
      subscriber: this.streamSubscriber,
    });
  }

  async onModuleDestroy() {
    // Close all three connections even if one quit() rejects. The previous
    // sequential bare awaits would abort on the first rejection and leak
    // the remaining connections on shutdown.
    await Promise.allSettled([
      this.streamPublisher.quit(),
      this.streamSubscriber.quit(),
      this.redisClient.quit(),
    ]);
  }

  // Create a new resumable stream under streamId and drain it in the
  // background so every chunk is published to Redis and stays available
  // for later resume consumers.
  async createResumableStream(
    streamId: string,
    streamFactory: () => ReadableStream<string>,
  ) {
    const resumableStream = await this.streamContext.createNewResumableStream(
      streamId,
      streamFactory,
    );

    if (!resumableStream) {
      return;
    }

    const reader = resumableStream.getReader();

    void (async () => {
      try {
        while (true) {
          const { done } = await reader.read();

          if (done) {
            break;
          }
        }
      } catch {
        // Stream interrupted — chunks already published are still resumable.
      } finally {
        // Release the lock so the underlying stream can be cleaned up.
        reader.releaseLock();
      }
    })();
  }

  // Resume a previously created stream as a Node Readable suitable for
  // piping into an Express response; null when no such stream exists.
  async resumeExistingStreamAsNodeReadable(
    streamId: string,
  ): Promise<Readable | null> {
    const webStream = await this.streamContext.resumeExistingStream(streamId);

    if (!webStream) {
      return null;
    }

    return Readable.fromWeb(webStream as NodeWebReadableStream);
  }

  // Record a short-lived (60 s) error marker for a stream so the HTTP
  // layer polling for the stream can report the failure to the client.
  async writeStreamError(
    streamId: string,
    error: { code: string; message: string },
  ): Promise<void> {
    await this.redisClient.set(
      `ai-stream:error:${streamId}`,
      JSON.stringify(error),
      'EX',
      60,
    );
  }

  // Read back a previously written error marker; null when none exists.
  async readStreamError(
    streamId: string,
  ): Promise<{ code: string; message: string } | null> {
    const raw = await this.redisClient.get(`ai-stream:error:${streamId}`);

    if (!raw) {
      return null;
    }

    try {
      return JSON.parse(raw);
    } catch {
      // Corrupt or truncated payload — treat as "no recorded error"
      // rather than letting the resume endpoint crash on JSON.parse.
      return null;
    }
  }
}

View file

@ -1,30 +1,28 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { type Readable } from 'stream';
import { createUIMessageStream, pipeUIMessageStreamToResponse } from 'ai';
import { generateId, UI_MESSAGE_STREAM_HEADERS } from 'ai';
import { type Response } from 'express';
import {
type CodeExecutionData,
type ExtendedUIMessage,
} from 'twenty-shared/ai';
import { type ExtendedUIMessage } from 'twenty-shared/ai';
import { type Repository } from 'typeorm';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { type WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity';
import { AgentMessageRole } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message.entity';
import {
AgentException,
AgentExceptionCode,
} from 'src/engine/metadata-modules/ai/ai-agent/agent.exception';
import { type BrowsingContextType } from 'src/engine/metadata-modules/ai/ai-agent/types/browsingContext.type';
import { computeCostBreakdown } from 'src/engine/metadata-modules/ai/ai-billing/utils/compute-cost-breakdown.util';
import { convertDollarsToBillingCredits } from 'src/engine/metadata-modules/ai/ai-billing/utils/convert-dollars-to-billing-credits.util';
import { extractCacheCreationTokens } from 'src/engine/metadata-modules/ai/ai-billing/utils/extract-cache-creation-tokens.util';
import { toDisplayCredits } from 'src/engine/core-modules/usage/utils/to-display-credits.util';
import { type AIModelConfig } from 'src/engine/metadata-modules/ai/ai-models/types/ai-model-config.type';
import { AgentChatThreadEntity } from 'src/engine/metadata-modules/ai/ai-chat/entities/agent-chat-thread.entity';
import {
STREAM_AGENT_CHAT_JOB_NAME,
type StreamAgentChatJobData,
} from 'src/engine/metadata-modules/ai/ai-chat/jobs/stream-agent-chat.job';
import { AgentChatService } from './agent-chat.service';
import { ChatExecutionService } from './chat-execution.service';
import { AgentChatResumableStreamService } from './agent-chat-resumable-stream.service';
export type StreamAgentChatOptions = {
threadId: string;
@ -36,13 +34,19 @@ export type StreamAgentChatOptions = {
modelId?: string;
};
const STREAM_READY_TIMEOUT_MS = 5_000;
const STREAM_READY_POLL_INTERVAL_MS = 50;
@Injectable()
export class AgentChatStreamingService {
private readonly logger = new Logger(AgentChatStreamingService.name);
constructor(
@InjectRepository(AgentChatThreadEntity)
private readonly threadRepository: Repository<AgentChatThreadEntity>,
private readonly agentChatService: AgentChatService,
private readonly chatExecutionService: ChatExecutionService,
@InjectMessageQueue(MessageQueue.aiStreamQueue)
private readonly messageQueueService: MessageQueueService,
private readonly resumableStreamService: AgentChatResumableStreamService,
) {}
async streamAgentChat({
@ -68,215 +72,92 @@ export class AgentChatStreamingService {
);
}
// Fire user-message save without awaiting to avoid delaying time-to-first-letter.
// The promise is awaited inside onFinish where we need the turnId.
const streamId = generateId();
const lastUserMessage = messages[messages.length - 1];
const lastUserText =
lastUserMessage?.parts.find((part) => part.type === 'text')?.text ?? '';
const userMessagePromise = this.agentChatService.addMessage({
threadId: thread.id,
uiMessage: {
role: AgentMessageRole.USER,
parts:
lastUserMessage?.parts.filter(
(part) => part.type === 'text' || part.type === 'file',
) ?? [],
await this.messageQueueService.add<StreamAgentChatJobData>(
STREAM_AGENT_CHAT_JOB_NAME,
{
threadId: thread.id,
streamId,
userWorkspaceId,
workspaceId: workspace.id,
messages,
browsingContext,
modelId,
lastUserMessageText: lastUserText,
lastUserMessageParts: lastUserMessage?.parts ?? [],
hasTitle: !!thread.title,
},
);
await this.threadRepository.update(thread.id, {
activeStreamId: streamId,
});
// Prevent unhandled rejection if onFinish never runs (e.g. stream
// setup error or empty response early-return). The real error still
// surfaces when awaited in onFinish.
userMessagePromise.catch(() => {});
// Title generation runs in parallel with AI streaming so it's
// typically ready by the time onFinish fires
const titlePromise = thread.title
? Promise.resolve(null)
: this.agentChatService
.generateTitleIfNeeded(thread.id, lastUserText)
.catch(() => null);
try {
const uiStream = createUIMessageStream<ExtendedUIMessage>({
execute: async ({ writer }) => {
const onCodeExecutionUpdate = (data: CodeExecutionData) => {
writer.write({
type: 'data-code-execution' as const,
id: `code-execution-${data.executionId}`,
data,
});
};
const result = await this.waitForResumableStream(streamId);
const { stream, modelConfig } =
await this.chatExecutionService.streamChat({
workspace,
userWorkspaceId,
messages,
browsingContext,
onCodeExecutionUpdate,
modelId,
});
if ('error' in result) {
response.status(500).json(result.error);
let streamUsage = {
inputTokens: 0,
outputTokens: 0,
inputCredits: 0,
outputCredits: 0,
};
let lastStepConversationSize = 0;
let totalCacheCreationTokens = 0;
return;
}
writer.merge(
stream.toUIMessageStream({
onError: (error) => {
return error instanceof Error ? error.message : String(error);
},
sendStart: false,
messageMetadata: ({ part }) => {
if (part.type === 'finish-step') {
const stepInput = part.usage?.inputTokens ?? 0;
const stepCached =
part.usage?.inputTokenDetails?.cacheReadTokens ?? 0;
const stepCacheCreation = extractCacheCreationTokens(
(
part as {
providerMetadata?: Record<
string,
Record<string, unknown> | undefined
>;
}
).providerMetadata,
);
if (!result.readable) {
this.logger.error(
`Stream ${streamId} did not become available within timeout`,
);
response
.status(500)
.json({ code: 'WORKER_UNREACHABLE', message: 'Stream timed out' });
totalCacheCreationTokens += stepCacheCreation;
lastStepConversationSize =
stepInput + stepCached + stepCacheCreation;
}
return;
}
if (part.type === 'finish') {
const { inputCredits, outputCredits, tokenCounts } =
computeStreamCosts(
modelConfig,
part.totalUsage,
totalCacheCreationTokens,
);
streamUsage = {
inputTokens: tokenCounts.totalInputTokens,
outputTokens: tokenCounts.outputTokens,
inputCredits,
outputCredits,
};
return {
createdAt: new Date().toISOString(),
usage: {
inputTokens: tokenCounts.totalInputTokens,
outputTokens: tokenCounts.outputTokens,
cachedInputTokens: tokenCounts.cachedInputTokens,
inputCredits: toDisplayCredits(inputCredits),
outputCredits: toDisplayCredits(outputCredits),
conversationSize: lastStepConversationSize,
},
model: {
contextWindowTokens: modelConfig.contextWindowTokens,
},
};
}
return undefined;
},
onFinish: async ({ responseMessage }) => {
if (responseMessage.parts.length === 0) {
return;
}
const userMessage = await userMessagePromise;
await this.agentChatService.addMessage({
threadId: thread.id,
uiMessage: responseMessage,
turnId: userMessage.turnId,
});
await this.threadRepository.update(thread.id, {
totalInputTokens: () =>
`"totalInputTokens" + ${streamUsage.inputTokens}`,
totalOutputTokens: () =>
`"totalOutputTokens" + ${streamUsage.outputTokens}`,
totalInputCredits: () =>
`"totalInputCredits" + ${streamUsage.inputCredits}`,
totalOutputCredits: () =>
`"totalOutputCredits" + ${streamUsage.outputCredits}`,
contextWindowTokens: modelConfig.contextWindowTokens,
conversationSize: lastStepConversationSize,
});
const generatedTitle = await titlePromise;
if (generatedTitle) {
writer.write({
type: 'data-thread-title' as const,
id: `thread-title-${thread.id}`,
data: { title: generatedTitle },
});
}
},
sendReasoning: true,
}),
);
},
});
pipeUIMessageStreamToResponse({
stream: uiStream,
response,
// Consume the stream independently so onFinish fires even if
// the client disconnects (e.g., page refresh mid-stream)
consumeSseStream: ({ stream }) => {
stream.pipeTo(new WritableStream()).catch(() => {});
},
});
response.writeHead(200, UI_MESSAGE_STREAM_HEADERS);
result.readable.pipe(response);
} catch (error) {
response.end();
throw error;
}
}
}
function computeStreamCosts(
modelConfig: AIModelConfig,
totalUsage:
| {
inputTokens?: number;
outputTokens?: number;
inputTokenDetails?: { cacheReadTokens?: number };
outputTokenDetails?: { reasoningTokens?: number };
private async waitForResumableStream(
streamId: string,
): Promise<
| { readable: Readable }
| { error: { code: string; message: string } }
| { readable: null }
> {
const maxAttempts = Math.ceil(
STREAM_READY_TIMEOUT_MS / STREAM_READY_POLL_INTERVAL_MS,
);
for (let attempt = 0; attempt < maxAttempts; attempt++) {
const streamError =
await this.resumableStreamService.readStreamError(streamId);
if (streamError) {
return { error: streamError };
}
| undefined,
cacheCreationTokens: number,
) {
const breakdown = computeCostBreakdown(modelConfig, {
inputTokens: totalUsage?.inputTokens,
outputTokens: totalUsage?.outputTokens,
cachedInputTokens: totalUsage?.inputTokenDetails?.cacheReadTokens,
reasoningTokens: totalUsage?.outputTokenDetails?.reasoningTokens,
cacheCreationTokens,
});
return {
inputCredits: Math.round(
convertDollarsToBillingCredits(breakdown.inputCostInDollars),
),
outputCredits: Math.round(
convertDollarsToBillingCredits(breakdown.outputCostInDollars),
),
tokenCounts: {
totalInputTokens: breakdown.tokenCounts.totalInputTokens,
outputTokens: totalUsage?.outputTokens ?? 0,
cachedInputTokens: breakdown.tokenCounts.cachedInputTokens,
},
};
const readable =
await this.resumableStreamService.resumeExistingStreamAsNodeReadable(
streamId,
);
if (readable) {
return { readable };
}
await new Promise((resolve) =>
setTimeout(resolve, STREAM_READY_POLL_INTERVAL_MS),
);
}
return { readable: null };
}
}

View file

@ -2,7 +2,9 @@ import { Injectable, Logger } from '@nestjs/common';
import {
convertToModelMessages,
type LanguageModelUsage,
stepCountIs,
type StepResult,
streamText,
type SystemModelMessage,
type ToolSet,
@ -63,6 +65,7 @@ export type ChatExecutionOptions = {
browsingContext: BrowsingContextType | null;
onCodeExecutionUpdate?: CodeExecutionStreamEmitter;
modelId?: string;
abortSignal?: AbortSignal;
};
export type ChatExecutionResult = {
@ -93,6 +96,7 @@ export class ChatExecutionService {
browsingContext,
onCodeExecutionUpdate,
modelId,
abortSignal,
}: ChatExecutionOptions): Promise<ChatExecutionResult> {
const { actorContext, roleId, userId, userContext } =
await this.agentActorContextService.buildUserAndAgentActorContext(
@ -223,12 +227,68 @@ export class ChatExecutionService {
const modelMessages = await convertToModelMessages(processedMessages);
const billUsageFromSteps = (steps: StepResult<ToolSet>[]) => {
const usage = steps.reduce<LanguageModelUsage>(
(acc, step) => ({
inputTokens: (acc.inputTokens ?? 0) + (step.usage.inputTokens ?? 0),
outputTokens:
(acc.outputTokens ?? 0) + (step.usage.outputTokens ?? 0),
totalTokens: (acc.totalTokens ?? 0) + (step.usage.totalTokens ?? 0),
inputTokenDetails: {
noCacheTokens:
(acc.inputTokenDetails?.noCacheTokens ?? 0) +
(step.usage.inputTokenDetails?.noCacheTokens ?? 0),
cacheReadTokens:
(acc.inputTokenDetails?.cacheReadTokens ?? 0) +
(step.usage.inputTokenDetails?.cacheReadTokens ?? 0),
cacheWriteTokens:
(acc.inputTokenDetails?.cacheWriteTokens ?? 0) +
(step.usage.inputTokenDetails?.cacheWriteTokens ?? 0),
},
outputTokenDetails: {
textTokens:
(acc.outputTokenDetails?.textTokens ?? 0) +
(step.usage.outputTokenDetails?.textTokens ?? 0),
reasoningTokens:
(acc.outputTokenDetails?.reasoningTokens ?? 0) +
(step.usage.outputTokenDetails?.reasoningTokens ?? 0),
},
}),
{
inputTokens: 0,
outputTokens: 0,
totalTokens: 0,
inputTokenDetails: {
noCacheTokens: 0,
cacheReadTokens: 0,
cacheWriteTokens: 0,
},
outputTokenDetails: { textTokens: 0, reasoningTokens: 0 },
},
);
const cacheCreationTokens = extractCacheCreationTokensFromSteps(steps);
this.aiBillingService.calculateAndBillUsage(
registeredModel.modelId,
{ usage, cacheCreationTokens },
workspace.id,
UsageOperationType.AI_CHAT_TOKEN,
null,
userWorkspaceId,
);
};
const stream = streamText({
model: registeredModel.model,
messages: [systemMessage, ...modelMessages],
tools: activeTools,
abortSignal,
stopWhen: stepCountIs(AGENT_CONFIG.MAX_STEPS),
experimental_telemetry: AI_TELEMETRY_CONFIG,
onAbort: ({ steps }) => {
billUsageFromSteps(steps);
},
experimental_repairToolCall: async ({
toolCall,
tools: toolsForRepair,
@ -246,19 +306,13 @@ export class ChatExecutionService {
});
Promise.all([stream.usage, stream.steps])
.then(([usage, steps]) => {
const cacheCreationTokens = extractCacheCreationTokensFromSteps(steps);
this.aiBillingService.calculateAndBillUsage(
registeredModel.modelId,
{ usage, cacheCreationTokens },
workspace.id,
UsageOperationType.AI_CHAT_TOKEN,
null,
userWorkspaceId,
);
.then(([, steps]) => {
billUsageFromSteps(steps);
})
.catch((error) => {
if (error?.name === 'AbortError') {
return;
}
this.exceptionHandlerService.captureExceptions([error]);
});

View file

@ -0,0 +1,2 @@
/** Redis pub/sub channel carrying the cancel signal for a thread's stream. */
export const getCancelChannel = (threadId: string): string =>
  'ai-stream:cancel:' + threadId;

View file

@ -55145,6 +55145,13 @@ __metadata:
languageName: node
linkType: hard
"resumable-stream@npm:^2.2.12":
version: 2.2.12
resolution: "resumable-stream@npm:2.2.12"
checksum: 10c0/6fa3ddb31d7d88e720c8313532799e7e913dec31890fd1132d4a324718fe55eeb82782c4a571c3be3f6ce8e0396253eb5b775983f035a6a88d01a886154420c2
languageName: node
linkType: hard
"retext-latin@npm:^4.0.0":
version: 4.0.0
resolution: "retext-latin@npm:4.0.0"
@ -59932,6 +59939,7 @@ __metadata:
react-dom: "npm:18.3.1"
redis: "npm:^4.7.0"
reflect-metadata: "npm:0.2.2"
resumable-stream: "npm:^2.2.12"
rimraf: "npm:^5.0.5"
rxjs: "npm:7.8.1"
semver: "npm:7.6.3"