From ccbb75da06bd8272842170821277731fa146e36d Mon Sep 17 00:00:00 2001 From: Arvin Xu Date: Sun, 19 Apr 2026 16:19:18 +0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor(hetero-agent):=20?= =?UTF-8?q?persist=20per-step=20usage=20to=20each=20step=20assistant=20mes?= =?UTF-8?q?sage=20(#13964)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ♻️ refactor(hetero-agent): persist per-step usage to each step assistant message Previously, usage tokens from a multi-step Claude Code run were accumulated across all turns and written only to the final assistant message, leaving intermediate step messages with no usage metadata. Each Claude Code `turn_metadata` event carries per-turn token usage (deduped by adapter per message.id), so write it straight through to the current step's assistant message via persistQueue (runs after any in-flight stream_start(newStep) that swaps currentAssistantMessageId). The `result_usage` grand-total event is intentionally dropped — applying it would overwrite the last step with the sum of all prior steps. Co-Authored-By: Claude Opus 4.7 (1M context) * ♻️ refactor(hetero-agent): normalize usage inside CC adapter (UsageData) Follows the same principle as LOBE-7363: provider-native shape knowledge stays in the adapter, executor only sees normalized events. The previous commit left Anthropic-shape fields (input_tokens, cache_creation_input_tokens, cache_read_input_tokens) leaking into the executor via `buildUsageMetadata`. Introduce `UsageData` in `@lobechat/heterogeneous-agents` types with LobeHub's MessageMetadata.usage field names. The Claude Code adapter now normalizes Anthropic usage into `UsageData` before emitting step_complete, for both turn_metadata (per-turn) and result_usage (grand total). Executor drops `buildUsageMetadata` and writes `{ metadata: { usage: event.data.usage } }` directly. Future adapters (Codex, Kimi-CLI) normalize their native usage into the same shape; executor stays provider-agnostic. Co-Authored-By: Claude Opus 4.7 (1M context) * ♻️ refactor(hetero-agent): persist per-step provider alongside model CC / hetero-agent assistant messages were writing `model` per step but leaving `message.provider` NULL, so pricing/usage lookups could not key on the adapter (e.g. `claude-code`, billed via CLI subscription rather than raw Anthropic API rates). CC adapter now emits `provider: 'claude-code'` on every turn_metadata event (same collection point as model + normalized usage). Executor tracks `lastProvider` alongside `lastModel` and writes it into: - the step-boundary update for the previous step - `createMessage` for each new step's assistant - the onComplete write for the final step Provider choice is the CLI flavor (what the adapter knows), not the wrapped model's native vendor — CC runs under its own subscription billing, so downstream pricing must treat `claude-code` as its own provider rather than conflating with `anthropic`. Co-Authored-By: Claude Opus 4.7 (1M context) * 🐛 fix(hetero-agent): read authoritative usage from message_delta, not assistant Under `--include-partial-messages` (enabled by the CC adapter preset), Claude Code echoes a STALE usage snapshot from `message_start` on every content-block `assistant` event — e.g. `output_tokens: 8` or `1` — and never updates that snapshot as more output tokens are generated. The authoritative per-turn total arrives on a separate `stream_event: message_delta` with the final `input_tokens` + cache counts + cumulative `output_tokens` (e.g. 265). The adapter previously grabbed usage from the first `assistant` event per message.id and deduped, so DB rows ended up with `totalOutputTokens: 1` on every CC turn. Move turn_metadata emission from `handleAssistant` to a new `message_delta` case in `handleStreamEvent`. `handleAssistant` still tracks the latest model so turn_metadata (emitted later on message_delta) carries the correct model even if `message_start` doesn't. Co-Authored-By: Claude Opus 4.7 (1M context) * 💄 style(extras-usage): fall back to metadata.usage when top-level is absent The assistant Extras bar passes `message.usage` to the Usage component, which conditionally renders a token-count badge on `!!usage.totalTokens`. Nothing in the read path aggregates `message.metadata.usage` up to `message.usage`, so the top-level field is always undefined for DB-read messages — the badge never shows for CC/hetero turns (and in practice also skips the gateway path where usage only lands in `metadata.usage`). Prefer `usage` when the top-level field is populated, fall back to `metadata.usage` otherwise. Both fields are the same `ModelUsage` shape, so the Usage/TokenDetail components don't need any other change. Co-Authored-By: Claude Opus 4.7 (1M context) * ♻️ refactor(extras-usage): promote metadata.usage inside conversation-flow parse The previous fix spread a `usage ?? metadata?.usage` fallback across each renderer site that passed usage to the Extras bar. Consolidate: `parse` (src/store → packages/conversation-flow) is the single renderer-side transform every consumer flows through, so promote `metadata.usage` onto the top-level `usage` field there and revert the per-site fallbacks. UIChatMessage exposes a canonical `usage` field, but no server-side or client-side transform populated it — executors write to `metadata.usage` (canonical storage, JSONB-friendly). Doing the promotion in parse keeps the rule in one place, close to where display shapes are built, and covers both desktop (local PGlite) and web (remote Postgres) without a backend deploy. Top-level `usage` is preserved when already present (e.g. group-level aggregates) — `metadata.usage` is strictly a fallback. Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: Claude Opus 4.7 (1M context) --- .../src/__tests__/parse.test.ts | 57 +++++++ packages/conversation-flow/src/parse.ts | 20 ++- .../src/adapters/claudeCode.e2e.test.ts | 14 +- .../src/adapters/claudeCode.test.ts | 109 ++++++++++-- .../src/adapters/claudeCode.ts | 91 +++++++--- packages/heterogeneous-agents/src/types.ts | 42 +++++ .../heterogeneousAgentExecutor.test.ts | 161 ++++++++++++++---- .../actions/heterogeneousAgentExecutor.ts | 87 ++++------ 8 files changed, 459 insertions(+), 122 deletions(-) diff --git a/packages/conversation-flow/src/__tests__/parse.test.ts b/packages/conversation-flow/src/__tests__/parse.test.ts index 8464487a48..0bd1f2ca40 100644 --- a/packages/conversation-flow/src/__tests__/parse.test.ts +++ b/packages/conversation-flow/src/__tests__/parse.test.ts @@ -418,6 +418,63 @@ describe('parse', () => { }); }); + describe('Usage promotion', () => { + it('should promote metadata.usage onto the top-level usage field', () => { + // UIChatMessage consumers (Extras token badge, tokenCounter) read from + // the top-level `usage` field, but executors only write to + // `metadata.usage`. `parse` is the single renderer-side transform that + // every read flows through, so it owns the promotion. + const usage = { + inputCacheMissTokens: 6, + inputCachedTokens: 16204, + inputWriteCacheTokens: 13964, + totalInputTokens: 30174, + totalOutputTokens: 265, + totalTokens: 30439, + }; + const input = [ + { + id: 'u1', + role: 'user' as const, + content: 'hi', + createdAt: 1, + }, + { + id: 'a1', + role: 'assistant' as const, + content: 'hello', + parentId: 'u1', + metadata: { usage }, + createdAt: 2, + }, + ]; + + const result = parse(input as any[]); + const assistant = result.flatList.find((m) => m.id === 'a1'); + expect(assistant?.usage).toEqual(usage); + }); + + it('should not overwrite an existing top-level usage', () => { + // If a message already carries a top-level `usage` (e.g. aggregated + // group-level total), we keep it — `metadata.usage` is only a fallback. + const topLevelUsage = { totalTokens: 999, totalInputTokens: 900, totalOutputTokens: 99 }; + const metaUsage = { totalTokens: 1, totalInputTokens: 1, totalOutputTokens: 0 }; + const input = [ + { + id: 'a1', + role: 'assistant' as const, + content: 'hi', + createdAt: 1, + usage: topLevelUsage, + metadata: { usage: metaUsage }, + }, + ]; + + const result = parse(input as any[]); + expect(result.flatList[0]?.usage).toEqual(topLevelUsage); + }); + }); + describe('Performance', () => { it('should parse 10000 items within 100ms', () => { // Generate 10000 messages as flat siblings (no deep nesting to avoid stack overflow) diff --git a/packages/conversation-flow/src/parse.ts b/packages/conversation-flow/src/parse.ts index d669e52655..719fd2c399 100644 --- a/packages/conversation-flow/src/parse.ts +++ b/packages/conversation-flow/src/parse.ts @@ -117,11 +117,25 @@ export function parse(messages: Message[], messageGroups?: MessageGroupMetadata[ // For non-grouped supervisor messages (e.g., supervisor summary without tools) // Note: sub_agent scope transformation is done in pre-processing phase (before buildHelperMaps) const processedFlatList = flatList.map((msg) => { + let next = msg; + // Transform supervisor messages - if (msg.role === 'assistant' && msg.metadata?.isSupervisor) { - return { ...msg, role: 'supervisor' as const }; + if (next.role === 'assistant' && next.metadata?.isSupervisor) { + next = { ...next, role: 'supervisor' as const }; } - return msg; + + // Promote `metadata.usage` (canonical storage) onto the top-level `usage` + // field that UIChatMessage consumers (Extras token badge, tokenCounter, + // etc.) read from. The DB layer stores token usage inside the metadata + // JSONB column — executors on every path (Gateway, hetero-agent CLI) write + // there — but no server-side transform lifts it out. Doing it here keeps + // the promotion in one place, close to where display shapes are built, + // and works for both desktop (local PGlite) and web (remote Postgres). + if (!next.usage && next.metadata?.usage) { + next = { ...next, usage: next.metadata.usage }; + } + + return next; }); return { diff --git a/packages/heterogeneous-agents/src/adapters/claudeCode.e2e.test.ts b/packages/heterogeneous-agents/src/adapters/claudeCode.e2e.test.ts index 249dff8c44..8b57de5e6c 100644 --- a/packages/heterogeneous-agents/src/adapters/claudeCode.e2e.test.ts +++ b/packages/heterogeneous-agents/src/adapters/claudeCode.e2e.test.ts @@ -182,11 +182,21 @@ describe('ClaudeCodeAdapter E2E', () => { // 2 boundaries: msg_01 → msg_02, msg_02 → msg_03 expect(newStepStarts.length).toBe(2); - // 5. Should have usage metadata events + // 5. turn_metadata is now emitted on `stream_event: message_delta`, not on + // `assistant` events (CC echoes a stale message_start usage on every + // content block). This simplified fixture has no stream_event records, so + // no turn_metadata fires here — a dedicated test in claudeCode.test.ts + // covers the message_delta flow. We still assert the result_usage summary + // lands at session end. const metaEvents = allEvents.filter( (e) => e.type === 'step_complete' && e.data?.phase === 'turn_metadata', ); - expect(metaEvents.length).toBeGreaterThanOrEqual(3); // At least one per assistant turn + expect(metaEvents.length).toBe(0); + const resultUsage = allEvents.filter( + (e) => e.type === 'step_complete' && e.data?.phase === 'result_usage', + ); + // No `result.usage` in this fixture, so none emitted either. + expect(resultUsage.length).toBe(0); // 6. Should end with stream_end + agent_runtime_end (from result) const lastTwo = types.slice(-2); diff --git a/packages/heterogeneous-agents/src/adapters/claudeCode.test.ts b/packages/heterogeneous-agents/src/adapters/claudeCode.test.ts index 4c3cc262b0..c8657d32a6 100644 --- a/packages/heterogeneous-agents/src/adapters/claudeCode.test.ts +++ b/packages/heterogeneous-agents/src/adapters/claudeCode.test.ts @@ -269,7 +269,12 @@ describe('ClaudeCodeAdapter', () => { }); describe('usage and model extraction', () => { - it('emits step_complete with turn_metadata when message has model and usage', () => { + // Under `--include-partial-messages` (our preset default), CC emits a + // stale `message_start.usage` snapshot (e.g. `output_tokens: 8`) that it + // echoes verbatim on every content-block `assistant` event. The + // authoritative per-turn total only arrives later as `message_delta`. + // So turn_metadata emission is wired to `message_delta`, not `assistant`. + it('does NOT emit turn_metadata on assistant events (usage there is stale)', () => { const adapter = new ClaudeCodeAdapter(); adapter.adapt({ subtype: 'init', type: 'system' }); @@ -278,29 +283,69 @@ describe('ClaudeCodeAdapter', () => { id: 'msg_1', content: [{ text: 'hello', type: 'text' }], model: 'claude-sonnet-4-6', - usage: { input_tokens: 100, output_tokens: 50 }, + usage: { input_tokens: 100, output_tokens: 1 }, // stale placeholder }, type: 'assistant', }); + expect( + events.find((e) => e.type === 'step_complete' && e.data?.phase === 'turn_metadata'), + ).toBeUndefined(); + }); + + it('emits turn_metadata on message_delta with authoritative usage', () => { + const adapter = new ClaudeCodeAdapter(); + adapter.adapt({ subtype: 'init', type: 'system' }); + + // stream_event:message_start primes the current message id + model + adapter.adapt({ + event: { + message: { id: 'msg_1', model: 'claude-sonnet-4-6' }, + type: 'message_start', + }, + type: 'stream_event', + }); + + // message_delta carries the final per-turn usage + const events = adapter.adapt({ + event: { + type: 'message_delta', + usage: { input_tokens: 100, output_tokens: 50 }, + }, + type: 'stream_event', + }); + const meta = events.find( (e) => e.type === 'step_complete' && e.data?.phase === 'turn_metadata', ); expect(meta).toBeDefined(); expect(meta!.data.model).toBe('claude-sonnet-4-6'); - expect(meta!.data.usage.input_tokens).toBe(100); - expect(meta!.data.usage.output_tokens).toBe(50); + expect(meta!.data.provider).toBe('claude-code'); + expect(meta!.data.usage).toEqual({ + inputCacheMissTokens: 100, + inputCachedTokens: undefined, + inputWriteCacheTokens: undefined, + totalInputTokens: 100, + totalOutputTokens: 50, + totalTokens: 150, + }); }); - it('emits step_complete with cache token usage', () => { + it('normalizes cache creation and cache read from message_delta usage', () => { const adapter = new ClaudeCodeAdapter(); adapter.adapt({ subtype: 'init', type: 'system' }); + adapter.adapt({ + event: { + message: { id: 'msg_1', model: 'claude-sonnet-4-6' }, + type: 'message_start', + }, + type: 'stream_event', + }); + const events = adapter.adapt({ - message: { - id: 'msg_1', - content: [{ text: 'hi', type: 'text' }], - model: 'claude-sonnet-4-6', + event: { + type: 'message_delta', usage: { cache_creation_input_tokens: 200, cache_read_input_tokens: 300, @@ -308,14 +353,54 @@ describe('ClaudeCodeAdapter', () => { output_tokens: 50, }, }, - type: 'assistant', + type: 'stream_event', }); const meta = events.find( (e) => e.type === 'step_complete' && e.data?.phase === 'turn_metadata', ); - expect(meta!.data.usage.cache_creation_input_tokens).toBe(200); - expect(meta!.data.usage.cache_read_input_tokens).toBe(300); + expect(meta!.data.usage).toEqual({ + inputCacheMissTokens: 100, + inputCachedTokens: 300, + inputWriteCacheTokens: 200, + totalInputTokens: 600, + totalOutputTokens: 50, + totalTokens: 650, + }); + }); + + it('uses model from the latest assistant event when message_start lacks one', () => { + // Non-partial edge case: no message_start carries model, but assistant + // events always do. The adapter should still attach the right model. + const adapter = new ClaudeCodeAdapter(); + adapter.adapt({ subtype: 'init', type: 'system' }); + + adapter.adapt({ + event: { message: { id: 'msg_1' }, type: 'message_start' }, + type: 'stream_event', + }); + adapter.adapt({ + message: { + id: 'msg_1', + content: [{ text: 'hi', type: 'text' }], + model: 'claude-opus-4-7', + usage: { input_tokens: 1, output_tokens: 1 }, + }, + type: 'assistant', + }); + + const events = adapter.adapt({ + event: { + type: 'message_delta', + usage: { input_tokens: 10, output_tokens: 100 }, + }, + type: 'stream_event', + }); + + const meta = events.find( + (e) => e.type === 'step_complete' && e.data?.phase === 'turn_metadata', + ); + expect(meta!.data.model).toBe('claude-opus-4-7'); }); }); diff --git a/packages/heterogeneous-agents/src/adapters/claudeCode.ts b/packages/heterogeneous-agents/src/adapters/claudeCode.ts index c3a0286ca7..209c79c1e1 100644 --- a/packages/heterogeneous-agents/src/adapters/claudeCode.ts +++ b/packages/heterogeneous-agents/src/adapters/claudeCode.ts @@ -41,8 +41,43 @@ import type { StreamChunkData, ToolCallPayload, ToolResultData, + UsageData, } from '../types'; +/** + * Convert a raw Anthropic-shape usage object (per-turn or grand-total from + * Claude Code's `result` event) into the provider-agnostic `UsageData` shape. + * Returns undefined when no tokens were consumed, so callers can skip empty + * events without a null-check cascade. + */ +const toUsageData = ( + raw: + | { + cache_creation_input_tokens?: number; + cache_read_input_tokens?: number; + input_tokens?: number; + output_tokens?: number; + } + | null + | undefined, +): UsageData | undefined => { + if (!raw) return undefined; + const inputCacheMissTokens = raw.input_tokens || 0; + const inputCachedTokens = raw.cache_read_input_tokens || 0; + const inputWriteCacheTokens = raw.cache_creation_input_tokens || 0; + const totalInputTokens = inputCacheMissTokens + inputCachedTokens + inputWriteCacheTokens; + const totalOutputTokens = raw.output_tokens || 0; + if (totalInputTokens + totalOutputTokens === 0) return undefined; + return { + inputCacheMissTokens, + inputCachedTokens: inputCachedTokens || undefined, + inputWriteCacheTokens: inputWriteCacheTokens || undefined, + totalInputTokens, + totalOutputTokens, + totalTokens: totalInputTokens + totalOutputTokens, + }; +}; + // ─── CLI Preset ─── export const claudeCodePreset: AgentCLIPreset = { @@ -72,10 +107,14 @@ export class ClaudeCodeAdapter implements AgentEventAdapter { private stepIndex = 0; /** Track current message.id to detect step boundaries */ private currentMessageId: string | undefined; - /** Track which message.id has already emitted usage (dedup) */ - private usageEmittedForMessageId: string | undefined; /** message.id of the stream_event delta flow currently in flight */ private currentStreamEventMessageId: string | undefined; + /** + * Latest model seen for the in-flight message.id — captured from + * `message_start` (partial mode) or `assistant` events, emitted alongside + * authoritative usage on `message_delta`. + */ + private currentStreamEventModel: string | undefined; /** message.ids whose text has already been streamed as deltas — skip the full-block emission */ private messagesWithStreamedText = new Set(); /** message.ids whose thinking has already been streamed as deltas — skip the full-block emission */ @@ -146,20 +185,13 @@ export class ClaudeCodeAdapter implements AgentEventAdapter { events.push(...this.openMainMessage(messageId, raw.message?.model)); - // Per-turn model + usage snapshot — emitted as 'step_complete'-like - // metadata event so executor can track latest model and accumulated usage. - // DEDUP: same message.id carries identical usage on every content block - // (thinking, text, tool_use). Only emit once per message.id. - if ((raw.message?.model || raw.message?.usage) && messageId !== this.usageEmittedForMessageId) { - this.usageEmittedForMessageId = messageId; - events.push( - this.makeEvent('step_complete', { - model: raw.message?.model, - phase: 'turn_metadata', - usage: raw.message?.usage, - }), - ); - } + // Track the latest model — emitted alongside authoritative usage on the + // matching `message_delta`. We deliberately do NOT emit turn_metadata + // here: under `--include-partial-messages` (our default), every + // content-block `assistant` event echoes a STALE usage snapshot from + // `message_start` (e.g. `output_tokens: 8`); the per-turn total only + // arrives on `stream_event: message_delta`. + if (raw.message?.model) this.currentStreamEventModel = raw.message.model; // Each content array here is usually ONE block (thinking OR tool_use OR text) // but we handle multiple defensively. @@ -270,14 +302,18 @@ export class ClaudeCodeAdapter implements AgentEventAdapter { } private handleResult(raw: any): HeterogeneousAgentEvent[] { - // Emit authoritative usage from result event (overrides per-turn accumulation) + // Emit authoritative grand-total usage from CC's result event. The + // executor currently ignores this phase (it persists per-turn via + // turn_metadata), but we still emit it so other consumers — cost + // displays, logs — can read the normalized total. const events: HeterogeneousAgentEvent[] = []; - if (raw.usage) { + const usage = toUsageData(raw.usage); + if (usage) { events.push( this.makeEvent('step_complete', { costUsd: raw.total_cost_usd, phase: 'result_usage', - usage: raw.usage, + usage, }), ); } @@ -310,6 +346,7 @@ export class ClaudeCodeAdapter implements AgentEventAdapter { case 'message_start': { const msgId: string | undefined = event.message?.id; this.currentStreamEventMessageId = msgId; + if (event.message?.model) this.currentStreamEventModel = event.message.model; return this.openMainMessage(msgId, event.message?.model); } case 'content_block_delta': { @@ -326,6 +363,22 @@ export class ClaudeCodeAdapter implements AgentEventAdapter { } return []; } + case 'message_delta': { + // Authoritative per-turn usage. CC echoes stale message_start usage on + // every `assistant` event, so `handleAssistant` deliberately skips the + // emission and lets this branch own it. `message_delta.usage` carries + // the full final usage (input + cache + final output_tokens). + const usage = toUsageData(event.usage); + if (!usage) return []; + return [ + this.makeEvent('step_complete', { + model: this.currentStreamEventModel, + phase: 'turn_metadata', + provider: 'claude-code', + usage, + }), + ]; + } default: { return []; } diff --git a/packages/heterogeneous-agents/src/types.ts b/packages/heterogeneous-agents/src/types.ts index aecf94df3a..98243f531a 100644 --- a/packages/heterogeneous-agents/src/types.ts +++ b/packages/heterogeneous-agents/src/types.ts @@ -77,6 +77,48 @@ export interface ToolCallPayload { type: string; } +/** + * Normalized token usage for a single LLM call. Field names mirror LobeHub's + * `MessageMetadata.usage` so the executor can write this shape straight to + * `metadata.usage` with no further conversion. + * + * Each adapter is responsible for mapping its provider-native usage object + * (Anthropic `input_tokens` + cache split, OpenAI `prompt_tokens`, etc.) into + * this shape. Provider-specific shape knowledge does not leak past the adapter. + */ +export interface UsageData { + /** Input tokens served from the prompt cache (cache reads). */ + inputCachedTokens?: number; + /** Input tokens that missed the prompt cache (fresh prompt bytes). */ + inputCacheMissTokens: number; + /** Input tokens written into the prompt cache (cache creation). */ + inputWriteCacheTokens?: number; + totalInputTokens: number; + totalOutputTokens: number; + totalTokens: number; +} + +/** + * Data shape for `step_complete` events. `phase` disambiguates the subtype: + * - `turn_metadata`: per-turn snapshot of model + provider + usage (once per LLM call) + * - `result_usage`: authoritative grand total at the end of a session + */ +export interface StepCompleteData { + /** Total session cost in USD (only on `result_usage`, if the CLI reports it). */ + costUsd?: number; + /** Model id for this turn (only meaningful on `turn_metadata`). */ + model?: string; + phase: 'turn_metadata' | 'result_usage'; + /** + * Provider identifier for this turn — the CLI / adapter name (e.g. + * `claude-code`, `codex`), not the underlying LLM vendor. CLI-wrapped agents + * bill via their own subscription so downstream pricing logic keys on the + * CLI provider, not on the wrapped model's native vendor. + */ + provider?: string; + usage?: UsageData; +} + // ─── Adapter Interface ─── /** diff --git a/src/store/chat/slices/aiChat/actions/__tests__/heterogeneousAgentExecutor.test.ts b/src/store/chat/slices/aiChat/actions/__tests__/heterogeneousAgentExecutor.test.ts index 25ff34ac64..8d3d50986f 100644 --- a/src/store/chat/slices/aiChat/actions/__tests__/heterogeneousAgentExecutor.test.ts +++ b/src/store/chat/slices/aiChat/actions/__tests__/heterogeneousAgentExecutor.test.ts @@ -161,6 +161,31 @@ const ccResult = (isError = false, result = 'done') => ({ type: 'result', }); +/** + * `stream_event: message_start` — primes adapter's in-flight message.id so a + * following `message_delta` (which has no message.id of its own) can attach + * its authoritative usage to the correct turn. + */ +const ccMessageStart = (msgId: string, model = 'claude-sonnet-4-6') => ({ + event: { message: { id: msgId, model }, type: 'message_start' }, + type: 'stream_event', +}); + +/** + * `stream_event: message_delta` — the authoritative per-turn usage under + * `--include-partial-messages` (CC's `assistant` events only echo a stale + * message_start snapshot, so turn_metadata is driven off this event). + */ +const ccMessageDelta = (usage: { + cache_creation_input_tokens?: number; + cache_read_input_tokens?: number; + input_tokens?: number; + output_tokens?: number; +}) => ({ + event: { type: 'message_delta', usage }, + type: 'stream_event', +}); + // ─── Tests ─── describe('heterogeneousAgentExecutor DB persistence', () => { @@ -358,14 +383,21 @@ describe('heterogeneousAgentExecutor DB persistence', () => { await runWithEvents([ ccInit(), - // Step 1: tool_use Read + // Step 1: tool_use Read (message_start primes turn + model/provider + // so the executor can stamp step 2's createMessage with them) + ccMessageStart('msg_01'), ccToolUse('msg_01', 'toolu_1', 'Read', { file_path: '/a.ts' }), + ccMessageDelta({ input_tokens: 10, output_tokens: 5 }), ccToolResult('toolu_1', 'content of a.ts'), // Step 2 (new message.id): tool_use Write + ccMessageStart('msg_02'), ccToolUse('msg_02', 'toolu_2', 'Write', { file_path: '/b.ts', content: 'new' }), + ccMessageDelta({ input_tokens: 20, output_tokens: 10 }), ccToolResult('toolu_2', 'file written'), // Step 3 (new message.id): final text + ccMessageStart('msg_03'), ccText('msg_03', 'All done!'), + ccMessageDelta({ input_tokens: 30, output_tokens: 15 }), ccResult(), ]); @@ -384,6 +416,9 @@ describe('heterogeneousAgentExecutor DB persistence', () => { // The parentId should be the tool message ID from step 1 const tool1Id = createdIds.find((id) => id.startsWith('tool-')); expect(step2Assistant![0].parentId).toBe(tool1Id); + // createMessage should carry the adapter provider so step 2's assistant + // lands in DB with provider set from the start (no later backfill needed). + expect(step2Assistant![0].provider).toBe('claude-code'); }); it('should fall back to assistant parentId when step has no tools', async () => { @@ -414,28 +449,33 @@ describe('heterogeneousAgentExecutor DB persistence', () => { // ──────────────────────────────────────────────────── describe('final content writes (onComplete)', () => { - it('should write accumulated content + model to the final assistant message', async () => { + it('should write accumulated content + model + provider to the final assistant message', async () => { await runWithEvents([ ccInit(), + // message_start carries the model for this turn; individual assistant + // content-block events echo the same model, so the final write should + // stamp `claude-opus-4-6` (not the init-default sonnet). + ccMessageStart('msg_01', 'claude-opus-4-6'), ccAssistant('msg_01', [{ text: 'Hello ', type: 'text' }], { model: 'claude-opus-4-6', - usage: { input_tokens: 100, output_tokens: 10 }, }), ccAssistant('msg_01', [{ text: 'world!', type: 'text' }], { - usage: { input_tokens: 100, output_tokens: 20 }, + model: 'claude-opus-4-6', }), + // message_delta fires the authoritative turn_metadata (with model from + // the adapter's in-flight state) + ccMessageDelta({ input_tokens: 100, output_tokens: 20 }), ccResult(), ]); - // Final updateMessage should include accumulated content + model const finalWrite = mockUpdateMessage.mock.calls.find( ([id, val]: any) => id === 'ast-initial' && val.content === 'Hello world!', ); expect(finalWrite).toBeDefined(); - // lastModel is set from step_complete(turn_metadata). With usage dedup, - // only the FIRST event per message.id emits turn_metadata, so model stays - // as 'claude-opus-4-6' from the first event. expect(finalWrite![1].model).toBe('claude-opus-4-6'); + // provider is emitted by the CC adapter on turn_metadata so it rides + // along with the final content/model write. + expect(finalWrite![1].provider).toBe('claude-code'); }); it('should write accumulated reasoning', async () => { @@ -453,40 +493,95 @@ describe('heterogeneousAgentExecutor DB persistence', () => { expect(finalWrite![1].reasoning.content).toBe('Let me think about this.'); }); - it('should accumulate usage across turns into metadata', async () => { + it('should persist per-step usage to each step assistant message, not accumulated', async () => { + // Deterministic ids for new-step assistant messages so we can assert per-message usage. + let astStepCounter = 0; + mockCreateMessage.mockImplementation(async (params: any) => { + if (params.role === 'assistant') { + astStepCounter++; + return { id: `ast-step-${astStepCounter}` }; + } + return { id: `tool-${Date.now()}` }; + }); + + // Realistic CC partial-messages flow: message_start primes the turn, + // assistant events echo a stale usage, message_delta carries the final. await runWithEvents([ ccInit(), - ccAssistant('msg_01', [{ text: 'a', type: 'text' }], { - usage: { - cache_creation_input_tokens: 50, - cache_read_input_tokens: 200, - input_tokens: 100, - output_tokens: 50, - }, - }), + ccMessageStart('msg_01'), + ccAssistant('msg_01', [{ text: 'a', type: 'text' }]), ccToolUse('msg_01', 'toolu_1', 'Bash', {}), - ccToolResult('toolu_1', 'ok'), - ccAssistant('msg_02', [{ text: 'b', type: 'text' }], { - usage: { input_tokens: 300, output_tokens: 80 }, + ccMessageDelta({ + cache_creation_input_tokens: 50, + cache_read_input_tokens: 200, + input_tokens: 100, + output_tokens: 50, }), + ccToolResult('toolu_1', 'ok'), + ccMessageStart('msg_02'), + ccAssistant('msg_02', [{ text: 'b', type: 'text' }]), + ccMessageDelta({ input_tokens: 300, output_tokens: 80 }), ccResult(), ]); - // Find the final write that has usage metadata - const finalWrite = mockUpdateMessage.mock.calls.find( + const usageWrites = mockUpdateMessage.mock.calls.filter( ([, val]: any) => val.metadata?.usage?.totalTokens, ); - expect(finalWrite).toBeDefined(); - const usage = finalWrite![1].metadata.usage; - // 100 + 300 input + 200 cache_read + 50 cache_create = 650 input total - expect(usage.totalInputTokens).toBe(650); - // 50 + 80 = 130 output - expect(usage.totalOutputTokens).toBe(130); - expect(usage.totalTokens).toBe(780); - // Breakdown for pricing UI (must match anthropic usage converter shape) - expect(usage.inputCacheMissTokens).toBe(400); - expect(usage.inputCachedTokens).toBe(200); - expect(usage.inputWriteCacheTokens).toBe(50); + // One usage write per step (msg_01 → ast-initial, msg_02 → ast-step-1) + expect(usageWrites.length).toBe(2); + + const step1 = usageWrites.find(([id]: any) => id === 'ast-initial'); + expect(step1).toBeDefined(); + const u1 = step1![1].metadata.usage; + // msg_01: 100 input (miss) + 200 cached + 50 cache_create = 350; 50 output + expect(u1.totalInputTokens).toBe(350); + expect(u1.totalOutputTokens).toBe(50); + expect(u1.totalTokens).toBe(400); + expect(u1.inputCacheMissTokens).toBe(100); + expect(u1.inputCachedTokens).toBe(200); + expect(u1.inputWriteCacheTokens).toBe(50); + + const step2 = usageWrites.find(([id]: any) => id === 'ast-step-1'); + expect(step2).toBeDefined(); + const u2 = step2![1].metadata.usage; + // msg_02: 300 input (miss, no cache); 80 output + expect(u2.totalInputTokens).toBe(300); + expect(u2.totalOutputTokens).toBe(80); + expect(u2.totalTokens).toBe(380); + expect(u2.inputCacheMissTokens).toBe(300); + // No cache tokens for this turn — these fields should be absent + expect(u2.inputCachedTokens).toBeUndefined(); + expect(u2.inputWriteCacheTokens).toBeUndefined(); + }); + + it('should ignore stale usage on assistant events (from message_start echo)', async () => { + // Regression for LOBE-7258-style bug: under partial-messages mode, CC + // echoes a stale message_start usage (e.g. output_tokens: 1) on every + // content-block assistant event. If the adapter picked that up, the DB + // would record output_tokens=1 instead of the real total. This verifies + // the stale snapshot is ignored and only the message_delta total lands. + await runWithEvents([ + ccInit(), + ccMessageStart('msg_01'), + // All assistant events below carry the STALE placeholder usage + ccAssistant('msg_01', [{ text: 'hi', type: 'text' }], { + usage: { input_tokens: 6, output_tokens: 1 }, // stale + }), + ccAssistant('msg_01', [{ id: 'tu', input: {}, name: 'Read', type: 'tool_use' }], { + usage: { input_tokens: 6, output_tokens: 1 }, // stale echo + }), + // Authoritative final usage arrives on message_delta + ccMessageDelta({ input_tokens: 6, output_tokens: 265 }), + ccToolResult('tu', 'ok'), + ccResult(), + ]); + + const usageWrites = mockUpdateMessage.mock.calls.filter( + ([, val]: any) => val.metadata?.usage?.totalTokens, + ); + expect(usageWrites.length).toBe(1); + expect(usageWrites[0][1].metadata.usage.totalOutputTokens).toBe(265); // not 1 + expect(usageWrites[0][1].metadata.usage.totalInputTokens).toBe(6); }); }); diff --git a/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts b/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts index de7094e040..aab0230fd2 100644 --- a/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts +++ b/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts @@ -297,14 +297,10 @@ export const executeHeterogeneousAgent = async ( /** Content accumulators — reset on each new step */ let accumulatedContent = ''; let accumulatedReasoning = ''; - /** Extracted model + usage from each assistant event (used for final write) */ + /** Latest model string — updated per turn, written alongside content on step boundaries. */ let lastModel: string | undefined; - const accumulatedUsage: Record = { - cache_creation_input_tokens: 0, - cache_read_input_tokens: 0, - input_tokens: 0, - output_tokens: 0, - }; + /** Adapter/CLI provider (e.g. `claude-code`) — carried on every turn_metadata. */ + let lastProvider: string | undefined; /** * Deferred terminal event (agent_runtime_end or error). We don't forward * these to the gateway handler immediately because handler triggers @@ -384,31 +380,32 @@ export const executeHeterogeneousAgent = async ( continue; } - // ─── step_complete with result_usage: authoritative total from CC result event ─── - if (event.type === 'step_complete' && event.data?.phase === 'result_usage') { - if (event.data.usage) { - // Override (not accumulate) — result event has the correct totals - accumulatedUsage.input_tokens = event.data.usage.input_tokens || 0; - accumulatedUsage.output_tokens = event.data.usage.output_tokens || 0; - accumulatedUsage.cache_creation_input_tokens = - event.data.usage.cache_creation_input_tokens || 0; - accumulatedUsage.cache_read_input_tokens = - event.data.usage.cache_read_input_tokens || 0; - } - continue; - } - - // ─── step_complete with turn_metadata: capture model + usage ─── + // ─── step_complete with turn_metadata: persist per-step usage ─── + // `turn_metadata.usage` is the per-turn delta (deduped by adapter per + // message.id) and already normalized to the MessageMetadata.usage + // shape — write it straight through to the current step's assistant + // message. Queue the write so it lands after any in-flight + // stream_start(newStep) that may still be swapping + // `currentAssistantMessageId` to the new step's message. + // + // `result_usage` (grand total across all turns) is intentionally + // ignored — applying it would overwrite the last step with the sum + // of all prior steps. Sum of turn_metadata equals result_usage for + // a healthy run. if (event.type === 'step_complete' && event.data?.phase === 'turn_metadata') { if (event.data.model) lastModel = event.data.model; - if (event.data.usage) { - // Accumulate token usage across turns (deduped by adapter per message.id) - accumulatedUsage.input_tokens += event.data.usage.input_tokens || 0; - accumulatedUsage.output_tokens += event.data.usage.output_tokens || 0; - accumulatedUsage.cache_creation_input_tokens += - event.data.usage.cache_creation_input_tokens || 0; - accumulatedUsage.cache_read_input_tokens += - event.data.usage.cache_read_input_tokens || 0; + if (event.data.provider) lastProvider = event.data.provider; + const turnUsage = event.data.usage; + if (turnUsage) { + persistQueue = persistQueue.then(async () => { + await messageService + .updateMessage( + currentAssistantMessageId, + { metadata: { usage: turnUsage } }, + { agentId: context.agentId, topicId: context.topicId }, + ) + .catch(console.error); + }); } // Don't forward turn metadata — it's internal bookkeeping continue; @@ -423,6 +420,7 @@ export const executeHeterogeneousAgent = async ( const prevContent = accumulatedContent; const prevReasoning = accumulatedReasoning; const prevModel = lastModel; + const prevProvider = lastProvider; // Reset content accumulators synchronously so new-step chunks go to fresh state accumulatedContent = ''; @@ -440,6 +438,7 @@ export const executeHeterogeneousAgent = async ( if (prevContent) prevUpdate.content = prevContent; if (prevReasoning) prevUpdate.reasoning = { content: prevReasoning }; if (prevModel) prevUpdate.model = prevModel; + if (prevProvider) prevUpdate.provider = prevProvider; if (Object.keys(prevUpdate).length > 0) { await messageService .updateMessage(currentAssistantMessageId, prevUpdate, { @@ -462,6 +461,7 @@ export const executeHeterogeneousAgent = async ( content: '', model: lastModel, parentId: stepParentId, + provider: lastProvider, role: 'assistant', topicId: context.topicId ?? undefined, }); @@ -547,33 +547,14 @@ export const executeHeterogeneousAgent = async ( // Wait for all tool persistence to finish before writing final state await persistQueue.catch(console.error); - // Persist final content + reasoning + model + usage to the assistant message - // BEFORE the terminal event triggers fetchAndReplaceMessages. + // Persist final content + reasoning + model for the last step BEFORE the + // terminal event triggers fetchAndReplaceMessages. Usage for this step + // was already written per-turn via the turn_metadata branch. const updateValue: Record = {}; if (accumulatedContent) updateValue.content = accumulatedContent; if (accumulatedReasoning) updateValue.reasoning = { content: accumulatedReasoning }; if (lastModel) updateValue.model = lastModel; - const inputCacheMiss = accumulatedUsage.input_tokens; - const inputCached = accumulatedUsage.cache_read_input_tokens; - const inputWriteCache = accumulatedUsage.cache_creation_input_tokens; - const totalInputTokens = inputCacheMiss + inputCached + inputWriteCache; - const totalOutputTokens = accumulatedUsage.output_tokens; - - if (totalInputTokens + totalOutputTokens > 0) { - updateValue.metadata = { - // Use nested `usage` — the flat fields on MessageMetadata are deprecated. - // Shape mirrors the anthropic usage converter so CC CLI and Gateway turns - // render identically in pricing/usage UI. - usage: { - inputCacheMissTokens: inputCacheMiss, - inputCachedTokens: inputCached || undefined, - inputWriteCacheTokens: inputWriteCache || undefined, - totalInputTokens, - totalOutputTokens, - totalTokens: totalInputTokens + totalOutputTokens, - }, - }; - } + if (lastProvider) updateValue.provider = lastProvider; if (Object.keys(updateValue).length > 0) { await messageService