♻️ refactor(hetero-agent): persist per-step usage to each step assistant message (#13964)

* ♻️ refactor(hetero-agent): persist per-step usage to each step assistant message

Previously, usage tokens from a multi-step Claude Code run were accumulated
across all turns and written only to the final assistant message, leaving
intermediate step messages with no usage metadata.

Each Claude Code `turn_metadata` event carries per-turn token usage
(deduped by adapter per message.id), so write it straight through to the
current step's assistant message via persistQueue (runs after any in-flight
stream_start(newStep) that swaps currentAssistantMessageId). The `result_usage`
grand-total event is intentionally dropped — applying it would overwrite the
last step with the sum of all prior steps.
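The queue-ordering trick can be sketched roughly like this (a minimal TypeScript illustration; `PersistQueue`, `updateMessage`, and the id variable are hypothetical stand-ins for the executor's real internals):

```typescript
// Sketch: serialize DB writes on a promise chain. Each queued task starts
// only after everything queued before it settles, so a usage write queued
// after a step swap reads the NEW assistant message id.
class PersistQueue {
  private chain: Promise<void> = Promise.resolve();

  /** Append a task; it runs after all previously queued tasks. */
  enqueue(task: () => Promise<void>): Promise<void> {
    this.chain = this.chain.then(task).catch((err) => console.error(err));
    return this.chain;
  }
}

let currentAssistantMessageId = 'ast-step-1';
const writes: Array<[string, object]> = [];
const updateMessage = async (id: string, patch: object) => {
  writes.push([id, patch]);
};

const queue = new PersistQueue();
// An in-flight stream_start(newStep) swaps the current assistant id…
queue.enqueue(async () => {
  currentAssistantMessageId = 'ast-step-2';
});
// …and the usage write queued afterwards reads the id at run time, so the
// per-step usage lands on the new step's message, not the old one.
const done = queue.enqueue(async () =>
  updateMessage(currentAssistantMessageId, {
    metadata: { usage: { totalTokens: 400 } },
  }),
);
```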

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* ♻️ refactor(hetero-agent): normalize usage inside CC adapter (UsageData)

Follows the same principle as LOBE-7363: provider-native shape knowledge
stays in the adapter, executor only sees normalized events. The previous
commit left Anthropic-shape fields (input_tokens, cache_creation_input_tokens,
cache_read_input_tokens) leaking into the executor via `buildUsageMetadata`.

Introduce `UsageData` in `@lobechat/heterogeneous-agents` types with LobeHub's
MessageMetadata.usage field names. The Claude Code adapter now normalizes
Anthropic usage into `UsageData` before emitting step_complete, for both
turn_metadata (per-turn) and result_usage (grand total). Executor drops
`buildUsageMetadata` and writes `{ metadata: { usage: event.data.usage } }`
directly. Future adapters (Codex, Kimi-CLI) normalize their native usage into
the same shape; executor stays provider-agnostic.
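As a rough illustration of that mapping (field names follow `MessageMetadata.usage` as described above; the function name and the omission of zero-valued cache fields are assumptions for this sketch, not the adapter's literal code):

```typescript
interface UsageData {
  inputCachedTokens?: number;
  inputCacheMissTokens: number;
  inputWriteCacheTokens?: number;
  totalInputTokens: number;
  totalOutputTokens: number;
  totalTokens: number;
}

// Anthropic's input_tokens counts only cache MISSES; cached reads and cache
// writes are reported separately, so total input is the sum of all three.
const normalizeAnthropicUsage = (raw: {
  cache_creation_input_tokens?: number;
  cache_read_input_tokens?: number;
  input_tokens?: number;
  output_tokens?: number;
}): UsageData => {
  const miss = raw.input_tokens ?? 0;
  const cached = raw.cache_read_input_tokens ?? 0;
  const written = raw.cache_creation_input_tokens ?? 0;
  const totalInputTokens = miss + cached + written;
  const totalOutputTokens = raw.output_tokens ?? 0;
  return {
    inputCacheMissTokens: miss,
    inputCachedTokens: cached || undefined, // drop zero-valued cache fields
    inputWriteCacheTokens: written || undefined,
    totalInputTokens,
    totalOutputTokens,
    totalTokens: totalInputTokens + totalOutputTokens,
  };
};
```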

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* ♻️ refactor(hetero-agent): persist per-step provider alongside model

CC / hetero-agent assistant messages were writing `model` per step but
leaving `message.provider` NULL, so pricing/usage lookups could not key on
the adapter (e.g. `claude-code`, billed via CLI subscription rather than
raw Anthropic API rates).

CC adapter now emits `provider: 'claude-code'` on every turn_metadata event
(same collection point as model + normalized usage). Executor tracks
`lastProvider` alongside `lastModel` and writes it into:

- the step-boundary update for the previous step
- `createMessage` for each new step's assistant
- the onComplete write for the final step

Provider choice is the CLI flavor (what the adapter knows), not the wrapped
model's native vendor — CC runs under its own subscription billing, so
downstream pricing must treat `claude-code` as its own provider rather than
conflating with `anthropic`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* 🐛 fix(hetero-agent): read authoritative usage from message_delta, not assistant

Under `--include-partial-messages` (enabled by the CC adapter preset), Claude
Code echoes a STALE usage snapshot from `message_start` on every content-block
`assistant` event — e.g. `output_tokens: 8` or `1` — and never updates that
snapshot as more output tokens are generated. The authoritative per-turn
total arrives on a separate `stream_event: message_delta` with the final
`input_tokens` + cache counts + cumulative `output_tokens` (e.g. 265).

The adapter previously grabbed usage from the first `assistant` event per
message.id and deduped, so DB rows ended up with `totalOutputTokens: 1` on
every CC turn.

Move turn_metadata emission from `handleAssistant` to a new `message_delta`
case in `handleStreamEvent`. `handleAssistant` still tracks the latest model
so turn_metadata (emitted later on message_delta) carries the correct model
even if `message_start` doesn't.
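In sketch form (hypothetical class and method names; the real `handleAssistant` / `handleStreamEvent` wiring is more involved):

```typescript
// Sketch: assistant events only refresh the tracked model — their usage is a
// stale echo of message_start — while message_delta owns the turn_metadata
// emission with the authoritative per-turn output total.
interface TurnMetadata {
  model?: string;
  phase: 'turn_metadata';
  usage: { totalOutputTokens: number };
}

class UsageWiring {
  private latestModel: string | undefined;

  onAssistant(model?: string): void {
    if (model) this.latestModel = model; // keep the model, drop the stale usage
  }

  onMessageDelta(usage: { output_tokens?: number }): TurnMetadata | undefined {
    const totalOutputTokens = usage.output_tokens ?? 0;
    if (totalOutputTokens === 0) return undefined; // nothing to persist
    return {
      model: this.latestModel,
      phase: 'turn_metadata',
      usage: { totalOutputTokens },
    };
  }
}
```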

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* 💄 style(extras-usage): fall back to metadata.usage when top-level is absent

The assistant Extras bar passes `message.usage` to the Usage component,
which conditionally renders a token-count badge on `!!usage.totalTokens`.
Nothing in the read path aggregates `message.metadata.usage` up to
`message.usage`, so the top-level field is always undefined for DB-read
messages — the badge never shows for CC/hetero turns (and in practice also
skips the gateway path where usage only lands in `metadata.usage`).

Prefer `usage` when the top-level field is populated, fall back to
`metadata.usage` otherwise. Both fields are the same `ModelUsage` shape, so
the Usage/TokenDetail components don't need any other change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* ♻️ refactor(extras-usage): promote metadata.usage inside conversation-flow parse

The previous fix spread a `usage ?? metadata?.usage` fallback across each
renderer site that passed usage to the Extras bar. Consolidate: `parse`
(src/store → packages/conversation-flow) is the single renderer-side
transform every consumer flows through, so promote `metadata.usage` onto the
top-level `usage` field there and revert the per-site fallbacks.

UIChatMessage exposes a canonical `usage` field, but no server-side or
client-side transform populated it — executors write to `metadata.usage`
(canonical storage, JSONB-friendly). Doing the promotion in parse keeps the
rule in one place, close to where display shapes are built, and covers both
desktop (local PGlite) and web (remote Postgres) without a backend deploy.

Top-level `usage` is preserved when already present (e.g. group-level
aggregates) — `metadata.usage` is strictly a fallback.
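The promotion rule itself is tiny; as a standalone sketch (message type narrowed for illustration, not the real `UIChatMessage`):

```typescript
interface DisplayMessage {
  id: string;
  usage?: Record<string, number>;
  metadata?: { usage?: Record<string, number> };
}

// Promote metadata.usage onto the top-level field only when the latter is
// absent — an existing top-level usage (e.g. a group aggregate) wins.
const promoteUsage = (msg: DisplayMessage): DisplayMessage =>
  !msg.usage && msg.metadata?.usage ? { ...msg, usage: msg.metadata.usage } : msg;
```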

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Arvin Xu 2026-04-19 16:19:18 +08:00 committed by GitHub
parent 2711aa9191
commit ccbb75da06
8 changed files with 459 additions and 122 deletions

@@ -418,6 +418,63 @@ describe('parse', () => {
});
});
describe('Usage promotion', () => {
it('should promote metadata.usage onto the top-level usage field', () => {
// UIChatMessage consumers (Extras token badge, tokenCounter) read from
// the top-level `usage` field, but executors only write to
// `metadata.usage`. `parse` is the single renderer-side transform that
// every read flows through, so it owns the promotion.
const usage = {
inputCacheMissTokens: 6,
inputCachedTokens: 16204,
inputWriteCacheTokens: 13964,
totalInputTokens: 30174,
totalOutputTokens: 265,
totalTokens: 30439,
};
const input = [
{
id: 'u1',
role: 'user' as const,
content: 'hi',
createdAt: 1,
},
{
id: 'a1',
role: 'assistant' as const,
content: 'hello',
parentId: 'u1',
metadata: { usage },
createdAt: 2,
},
];
const result = parse(input as any[]);
const assistant = result.flatList.find((m) => m.id === 'a1');
expect(assistant?.usage).toEqual(usage);
});
it('should not overwrite an existing top-level usage', () => {
// If a message already carries a top-level `usage` (e.g. aggregated
// group-level total), we keep it — `metadata.usage` is only a fallback.
const topLevelUsage = { totalTokens: 999, totalInputTokens: 900, totalOutputTokens: 99 };
const metaUsage = { totalTokens: 1, totalInputTokens: 1, totalOutputTokens: 0 };
const input = [
{
id: 'a1',
role: 'assistant' as const,
content: 'hi',
createdAt: 1,
usage: topLevelUsage,
metadata: { usage: metaUsage },
},
];
const result = parse(input as any[]);
expect(result.flatList[0]?.usage).toEqual(topLevelUsage);
});
});
describe('Performance', () => {
it('should parse 10000 items within 100ms', () => {
// Generate 10000 messages as flat siblings (no deep nesting to avoid stack overflow)

@@ -117,11 +117,25 @@ export function parse(messages: Message[], messageGroups?: MessageGroupMetadata[
// For non-grouped supervisor messages (e.g., supervisor summary without tools)
// Note: sub_agent scope transformation is done in pre-processing phase (before buildHelperMaps)
const processedFlatList = flatList.map((msg) => {
let next = msg;
// Transform supervisor messages
if (msg.role === 'assistant' && msg.metadata?.isSupervisor) {
return { ...msg, role: 'supervisor' as const };
if (next.role === 'assistant' && next.metadata?.isSupervisor) {
next = { ...next, role: 'supervisor' as const };
}
return msg;
// Promote `metadata.usage` (canonical storage) onto the top-level `usage`
// field that UIChatMessage consumers (Extras token badge, tokenCounter,
// etc.) read from. The DB layer stores token usage inside the metadata
// JSONB column — executors on every path (Gateway, hetero-agent CLI) write
// there — but no server-side transform lifts it out. Doing it here keeps
// the promotion in one place, close to where display shapes are built,
// and works for both desktop (local PGlite) and web (remote Postgres).
if (!next.usage && next.metadata?.usage) {
next = { ...next, usage: next.metadata.usage };
}
return next;
});
return {

@@ -182,11 +182,21 @@ describe('ClaudeCodeAdapter E2E', () => {
// 2 boundaries: msg_01 → msg_02, msg_02 → msg_03
expect(newStepStarts.length).toBe(2);
// 5. Should have usage metadata events
// 5. turn_metadata is now emitted on `stream_event: message_delta`, not on
// `assistant` events (CC echoes a stale message_start usage on every
// content block). This simplified fixture has no stream_event records, so
// no turn_metadata fires here — a dedicated test in claudeCode.test.ts
// covers the message_delta flow. We still assert the result_usage summary
// lands at session end.
const metaEvents = allEvents.filter(
(e) => e.type === 'step_complete' && e.data?.phase === 'turn_metadata',
);
expect(metaEvents.length).toBeGreaterThanOrEqual(3); // At least one per assistant turn
expect(metaEvents.length).toBe(0);
const resultUsage = allEvents.filter(
(e) => e.type === 'step_complete' && e.data?.phase === 'result_usage',
);
// No `result.usage` in this fixture, so none emitted either.
expect(resultUsage.length).toBe(0);
// 6. Should end with stream_end + agent_runtime_end (from result)
const lastTwo = types.slice(-2);

@@ -269,7 +269,12 @@ describe('ClaudeCodeAdapter', () => {
});
describe('usage and model extraction', () => {
it('emits step_complete with turn_metadata when message has model and usage', () => {
// Under `--include-partial-messages` (our preset default), CC emits a
// stale `message_start.usage` snapshot (e.g. `output_tokens: 8`) that it
// echoes verbatim on every content-block `assistant` event. The
// authoritative per-turn total only arrives later as `message_delta`.
// So turn_metadata emission is wired to `message_delta`, not `assistant`.
it('does NOT emit turn_metadata on assistant events (usage there is stale)', () => {
const adapter = new ClaudeCodeAdapter();
adapter.adapt({ subtype: 'init', type: 'system' });
@@ -278,29 +283,69 @@
id: 'msg_1',
content: [{ text: 'hello', type: 'text' }],
model: 'claude-sonnet-4-6',
usage: { input_tokens: 100, output_tokens: 50 },
usage: { input_tokens: 100, output_tokens: 1 }, // stale placeholder
},
type: 'assistant',
});
expect(
events.find((e) => e.type === 'step_complete' && e.data?.phase === 'turn_metadata'),
).toBeUndefined();
});
it('emits turn_metadata on message_delta with authoritative usage', () => {
const adapter = new ClaudeCodeAdapter();
adapter.adapt({ subtype: 'init', type: 'system' });
// stream_event:message_start primes the current message id + model
adapter.adapt({
event: {
message: { id: 'msg_1', model: 'claude-sonnet-4-6' },
type: 'message_start',
},
type: 'stream_event',
});
// message_delta carries the final per-turn usage
const events = adapter.adapt({
event: {
type: 'message_delta',
usage: { input_tokens: 100, output_tokens: 50 },
},
type: 'stream_event',
});
const meta = events.find(
(e) => e.type === 'step_complete' && e.data?.phase === 'turn_metadata',
);
expect(meta).toBeDefined();
expect(meta!.data.model).toBe('claude-sonnet-4-6');
expect(meta!.data.usage.input_tokens).toBe(100);
expect(meta!.data.usage.output_tokens).toBe(50);
expect(meta!.data.provider).toBe('claude-code');
expect(meta!.data.usage).toEqual({
inputCacheMissTokens: 100,
inputCachedTokens: undefined,
inputWriteCacheTokens: undefined,
totalInputTokens: 100,
totalOutputTokens: 50,
totalTokens: 150,
});
});
it('emits step_complete with cache token usage', () => {
it('normalizes cache creation and cache read from message_delta usage', () => {
const adapter = new ClaudeCodeAdapter();
adapter.adapt({ subtype: 'init', type: 'system' });
adapter.adapt({
event: {
message: { id: 'msg_1', model: 'claude-sonnet-4-6' },
type: 'message_start',
},
type: 'stream_event',
});
const events = adapter.adapt({
message: {
id: 'msg_1',
content: [{ text: 'hi', type: 'text' }],
model: 'claude-sonnet-4-6',
event: {
type: 'message_delta',
usage: {
cache_creation_input_tokens: 200,
cache_read_input_tokens: 300,
@@ -308,14 +353,54 @@ describe('ClaudeCodeAdapter', () => {
output_tokens: 50,
},
},
type: 'assistant',
type: 'stream_event',
});
const meta = events.find(
(e) => e.type === 'step_complete' && e.data?.phase === 'turn_metadata',
);
expect(meta!.data.usage.cache_creation_input_tokens).toBe(200);
expect(meta!.data.usage.cache_read_input_tokens).toBe(300);
expect(meta!.data.usage).toEqual({
inputCacheMissTokens: 100,
inputCachedTokens: 300,
inputWriteCacheTokens: 200,
totalInputTokens: 600,
totalOutputTokens: 50,
totalTokens: 650,
});
});
it('uses model from the latest assistant event when message_start lacks one', () => {
// Non-partial edge case: no message_start carries model, but assistant
// events always do. The adapter should still attach the right model.
const adapter = new ClaudeCodeAdapter();
adapter.adapt({ subtype: 'init', type: 'system' });
adapter.adapt({
event: { message: { id: 'msg_1' }, type: 'message_start' },
type: 'stream_event',
});
adapter.adapt({
message: {
id: 'msg_1',
content: [{ text: 'hi', type: 'text' }],
model: 'claude-opus-4-7',
usage: { input_tokens: 1, output_tokens: 1 },
},
type: 'assistant',
});
const events = adapter.adapt({
event: {
type: 'message_delta',
usage: { input_tokens: 10, output_tokens: 100 },
},
type: 'stream_event',
});
const meta = events.find(
(e) => e.type === 'step_complete' && e.data?.phase === 'turn_metadata',
);
expect(meta!.data.model).toBe('claude-opus-4-7');
});
});

@@ -41,8 +41,43 @@ import type {
StreamChunkData,
ToolCallPayload,
ToolResultData,
UsageData,
} from '../types';
/**
* Convert a raw Anthropic-shape usage object (per-turn or grand-total from
* Claude Code's `result` event) into the provider-agnostic `UsageData` shape.
* Returns undefined when no tokens were consumed, so callers can skip empty
* events without a null-check cascade.
*/
const toUsageData = (
raw:
| {
cache_creation_input_tokens?: number;
cache_read_input_tokens?: number;
input_tokens?: number;
output_tokens?: number;
}
| null
| undefined,
): UsageData | undefined => {
if (!raw) return undefined;
const inputCacheMissTokens = raw.input_tokens || 0;
const inputCachedTokens = raw.cache_read_input_tokens || 0;
const inputWriteCacheTokens = raw.cache_creation_input_tokens || 0;
const totalInputTokens = inputCacheMissTokens + inputCachedTokens + inputWriteCacheTokens;
const totalOutputTokens = raw.output_tokens || 0;
if (totalInputTokens + totalOutputTokens === 0) return undefined;
return {
inputCacheMissTokens,
inputCachedTokens: inputCachedTokens || undefined,
inputWriteCacheTokens: inputWriteCacheTokens || undefined,
totalInputTokens,
totalOutputTokens,
totalTokens: totalInputTokens + totalOutputTokens,
};
};
// ─── CLI Preset ───
export const claudeCodePreset: AgentCLIPreset = {
@@ -72,10 +107,14 @@ export class ClaudeCodeAdapter implements AgentEventAdapter {
private stepIndex = 0;
/** Track current message.id to detect step boundaries */
private currentMessageId: string | undefined;
/** Track which message.id has already emitted usage (dedup) */
private usageEmittedForMessageId: string | undefined;
/** message.id of the stream_event delta flow currently in flight */
private currentStreamEventMessageId: string | undefined;
/**
* Latest model seen for the in-flight message.id, captured from
* `message_start` (partial mode) or `assistant` events and emitted alongside
* the authoritative usage on `message_delta`.
*/
private currentStreamEventModel: string | undefined;
/** message.ids whose text has already been streamed as deltas — skip the full-block emission */
private messagesWithStreamedText = new Set<string>();
/** message.ids whose thinking has already been streamed as deltas — skip the full-block emission */
@@ -146,20 +185,13 @@ export class ClaudeCodeAdapter implements AgentEventAdapter {
events.push(...this.openMainMessage(messageId, raw.message?.model));
// Per-turn model + usage snapshot — emitted as 'step_complete'-like
// metadata event so executor can track latest model and accumulated usage.
// DEDUP: same message.id carries identical usage on every content block
// (thinking, text, tool_use). Only emit once per message.id.
if ((raw.message?.model || raw.message?.usage) && messageId !== this.usageEmittedForMessageId) {
this.usageEmittedForMessageId = messageId;
events.push(
this.makeEvent('step_complete', {
model: raw.message?.model,
phase: 'turn_metadata',
usage: raw.message?.usage,
}),
);
}
// Track the latest model — emitted alongside authoritative usage on the
// matching `message_delta`. We deliberately do NOT emit turn_metadata
// here: under `--include-partial-messages` (our default), every
// content-block `assistant` event echoes a STALE usage snapshot from
// `message_start` (e.g. `output_tokens: 8`); the per-turn total only
// arrives on `stream_event: message_delta`.
if (raw.message?.model) this.currentStreamEventModel = raw.message.model;
// Each content array here is usually ONE block (thinking OR tool_use OR text)
// but we handle multiple defensively.
@@ -270,14 +302,18 @@
}
private handleResult(raw: any): HeterogeneousAgentEvent[] {
// Emit authoritative usage from result event (overrides per-turn accumulation)
// Emit authoritative grand-total usage from CC's result event. The
// executor currently ignores this phase (it persists per-turn via
// turn_metadata), but we still emit it so other consumers — cost
// displays, logs — can read the normalized total.
const events: HeterogeneousAgentEvent[] = [];
if (raw.usage) {
const usage = toUsageData(raw.usage);
if (usage) {
events.push(
this.makeEvent('step_complete', {
costUsd: raw.total_cost_usd,
phase: 'result_usage',
usage: raw.usage,
usage,
}),
);
}
@@ -310,6 +346,7 @@ export class ClaudeCodeAdapter implements AgentEventAdapter {
case 'message_start': {
const msgId: string | undefined = event.message?.id;
this.currentStreamEventMessageId = msgId;
if (event.message?.model) this.currentStreamEventModel = event.message.model;
return this.openMainMessage(msgId, event.message?.model);
}
case 'content_block_delta': {
@@ -326,6 +363,22 @@ export class ClaudeCodeAdapter implements AgentEventAdapter {
}
return [];
}
case 'message_delta': {
// Authoritative per-turn usage. CC echoes stale message_start usage on
// every `assistant` event, so `handleAssistant` deliberately skips the
// emission and lets this branch own it. `message_delta.usage` carries
// the full final usage (input + cache + final output_tokens).
const usage = toUsageData(event.usage);
if (!usage) return [];
return [
this.makeEvent('step_complete', {
model: this.currentStreamEventModel,
phase: 'turn_metadata',
provider: 'claude-code',
usage,
}),
];
}
default: {
return [];
}

@@ -77,6 +77,48 @@ export interface ToolCallPayload {
type: string;
}
/**
* Normalized token usage for a single LLM call. Field names mirror LobeHub's
* `MessageMetadata.usage` so the executor can write this shape straight to
* `metadata.usage` with no further conversion.
*
* Each adapter is responsible for mapping its provider-native usage object
* (Anthropic `input_tokens` + cache split, OpenAI `prompt_tokens`, etc.) into
* this shape. Provider-specific shape knowledge does not leak past the adapter.
*/
export interface UsageData {
/** Input tokens served from the prompt cache (cache reads). */
inputCachedTokens?: number;
/** Input tokens that missed the prompt cache (fresh prompt bytes). */
inputCacheMissTokens: number;
/** Input tokens written into the prompt cache (cache creation). */
inputWriteCacheTokens?: number;
totalInputTokens: number;
totalOutputTokens: number;
totalTokens: number;
}
/**
* Data shape for `step_complete` events. `phase` disambiguates the subtype:
* - `turn_metadata`: per-turn snapshot of model + provider + usage (once per LLM call)
* - `result_usage`: authoritative grand total at the end of a session
*/
export interface StepCompleteData {
/** Total session cost in USD (only on `result_usage`, if the CLI reports it). */
costUsd?: number;
/** Model id for this turn (only meaningful on `turn_metadata`). */
model?: string;
phase: 'turn_metadata' | 'result_usage';
/**
* Provider identifier for this turn: the CLI / adapter name (e.g.
* `claude-code`, `codex`), not the underlying LLM vendor. CLI-wrapped agents
* bill via their own subscription, so downstream pricing logic keys on the
* CLI provider, not on the wrapped model's native vendor.
*/
provider?: string;
usage?: UsageData;
}
// ─── Adapter Interface ───
/**

@@ -161,6 +161,31 @@ const ccResult = (isError = false, result = 'done') => ({
type: 'result',
});
/**
* `stream_event: message_start` primes the adapter's in-flight message.id so a
* following `message_delta` (which has no message.id of its own) can attach
* its authoritative usage to the correct turn.
*/
const ccMessageStart = (msgId: string, model = 'claude-sonnet-4-6') => ({
event: { message: { id: msgId, model }, type: 'message_start' },
type: 'stream_event',
});
/**
* `stream_event: message_delta` carries the authoritative per-turn usage under
* `--include-partial-messages` (CC's `assistant` events only echo a stale
* message_start snapshot, so turn_metadata is driven off this event).
*/
const ccMessageDelta = (usage: {
cache_creation_input_tokens?: number;
cache_read_input_tokens?: number;
input_tokens?: number;
output_tokens?: number;
}) => ({
event: { type: 'message_delta', usage },
type: 'stream_event',
});
// ─── Tests ───
describe('heterogeneousAgentExecutor DB persistence', () => {
@@ -358,14 +383,21 @@
await runWithEvents([
ccInit(),
// Step 1: tool_use Read
// Step 1: tool_use Read (message_start primes turn + model/provider
// so the executor can stamp step 2's createMessage with them)
ccMessageStart('msg_01'),
ccToolUse('msg_01', 'toolu_1', 'Read', { file_path: '/a.ts' }),
ccMessageDelta({ input_tokens: 10, output_tokens: 5 }),
ccToolResult('toolu_1', 'content of a.ts'),
// Step 2 (new message.id): tool_use Write
ccMessageStart('msg_02'),
ccToolUse('msg_02', 'toolu_2', 'Write', { file_path: '/b.ts', content: 'new' }),
ccMessageDelta({ input_tokens: 20, output_tokens: 10 }),
ccToolResult('toolu_2', 'file written'),
// Step 3 (new message.id): final text
ccMessageStart('msg_03'),
ccText('msg_03', 'All done!'),
ccMessageDelta({ input_tokens: 30, output_tokens: 15 }),
ccResult(),
]);
@@ -384,6 +416,9 @@
// The parentId should be the tool message ID from step 1
const tool1Id = createdIds.find((id) => id.startsWith('tool-'));
expect(step2Assistant![0].parentId).toBe(tool1Id);
// createMessage should carry the adapter provider so step 2's assistant
// lands in DB with provider set from the start (no later backfill needed).
expect(step2Assistant![0].provider).toBe('claude-code');
});
it('should fall back to assistant parentId when step has no tools', async () => {
@@ -414,28 +449,33 @@
// ────────────────────────────────────────────────────
describe('final content writes (onComplete)', () => {
it('should write accumulated content + model to the final assistant message', async () => {
it('should write accumulated content + model + provider to the final assistant message', async () => {
await runWithEvents([
ccInit(),
// message_start carries the model for this turn; individual assistant
// content-block events echo the same model, so the final write should
// stamp `claude-opus-4-6` (not the init-default sonnet).
ccMessageStart('msg_01', 'claude-opus-4-6'),
ccAssistant('msg_01', [{ text: 'Hello ', type: 'text' }], {
model: 'claude-opus-4-6',
usage: { input_tokens: 100, output_tokens: 10 },
}),
ccAssistant('msg_01', [{ text: 'world!', type: 'text' }], {
usage: { input_tokens: 100, output_tokens: 20 },
model: 'claude-opus-4-6',
}),
// message_delta fires the authoritative turn_metadata (with model from
// the adapter's in-flight state)
ccMessageDelta({ input_tokens: 100, output_tokens: 20 }),
ccResult(),
]);
// Final updateMessage should include accumulated content + model
const finalWrite = mockUpdateMessage.mock.calls.find(
([id, val]: any) => id === 'ast-initial' && val.content === 'Hello world!',
);
expect(finalWrite).toBeDefined();
// lastModel is set from step_complete(turn_metadata). With usage dedup,
// only the FIRST event per message.id emits turn_metadata, so model stays
// as 'claude-opus-4-6' from the first event.
expect(finalWrite![1].model).toBe('claude-opus-4-6');
// provider is emitted by the CC adapter on turn_metadata so it rides
// along with the final content/model write.
expect(finalWrite![1].provider).toBe('claude-code');
});
it('should write accumulated reasoning', async () => {
@@ -453,40 +493,95 @@
expect(finalWrite![1].reasoning.content).toBe('Let me think about this.');
});
it('should accumulate usage across turns into metadata', async () => {
it('should persist per-step usage to each step assistant message, not accumulated', async () => {
// Deterministic ids for new-step assistant messages so we can assert per-message usage.
let astStepCounter = 0;
mockCreateMessage.mockImplementation(async (params: any) => {
if (params.role === 'assistant') {
astStepCounter++;
return { id: `ast-step-${astStepCounter}` };
}
return { id: `tool-${Date.now()}` };
});
// Realistic CC partial-messages flow: message_start primes the turn,
// assistant events echo a stale usage, message_delta carries the final.
await runWithEvents([
ccInit(),
ccAssistant('msg_01', [{ text: 'a', type: 'text' }], {
usage: {
cache_creation_input_tokens: 50,
cache_read_input_tokens: 200,
input_tokens: 100,
output_tokens: 50,
},
}),
ccMessageStart('msg_01'),
ccAssistant('msg_01', [{ text: 'a', type: 'text' }]),
ccToolUse('msg_01', 'toolu_1', 'Bash', {}),
ccToolResult('toolu_1', 'ok'),
ccAssistant('msg_02', [{ text: 'b', type: 'text' }], {
usage: { input_tokens: 300, output_tokens: 80 },
ccMessageDelta({
cache_creation_input_tokens: 50,
cache_read_input_tokens: 200,
input_tokens: 100,
output_tokens: 50,
}),
ccToolResult('toolu_1', 'ok'),
ccMessageStart('msg_02'),
ccAssistant('msg_02', [{ text: 'b', type: 'text' }]),
ccMessageDelta({ input_tokens: 300, output_tokens: 80 }),
ccResult(),
]);
// Find the final write that has usage metadata
const finalWrite = mockUpdateMessage.mock.calls.find(
const usageWrites = mockUpdateMessage.mock.calls.filter(
([, val]: any) => val.metadata?.usage?.totalTokens,
);
expect(finalWrite).toBeDefined();
const usage = finalWrite![1].metadata.usage;
// 100 + 300 input + 200 cache_read + 50 cache_create = 650 input total
expect(usage.totalInputTokens).toBe(650);
// 50 + 80 = 130 output
expect(usage.totalOutputTokens).toBe(130);
expect(usage.totalTokens).toBe(780);
// Breakdown for pricing UI (must match anthropic usage converter shape)
expect(usage.inputCacheMissTokens).toBe(400);
expect(usage.inputCachedTokens).toBe(200);
expect(usage.inputWriteCacheTokens).toBe(50);
// One usage write per step (msg_01 → ast-initial, msg_02 → ast-step-1)
expect(usageWrites.length).toBe(2);
const step1 = usageWrites.find(([id]: any) => id === 'ast-initial');
expect(step1).toBeDefined();
const u1 = step1![1].metadata.usage;
// msg_01: 100 input (miss) + 200 cached + 50 cache_create = 350; 50 output
expect(u1.totalInputTokens).toBe(350);
expect(u1.totalOutputTokens).toBe(50);
expect(u1.totalTokens).toBe(400);
expect(u1.inputCacheMissTokens).toBe(100);
expect(u1.inputCachedTokens).toBe(200);
expect(u1.inputWriteCacheTokens).toBe(50);
const step2 = usageWrites.find(([id]: any) => id === 'ast-step-1');
expect(step2).toBeDefined();
const u2 = step2![1].metadata.usage;
// msg_02: 300 input (miss, no cache); 80 output
expect(u2.totalInputTokens).toBe(300);
expect(u2.totalOutputTokens).toBe(80);
expect(u2.totalTokens).toBe(380);
expect(u2.inputCacheMissTokens).toBe(300);
// No cache tokens for this turn — these fields should be absent
expect(u2.inputCachedTokens).toBeUndefined();
expect(u2.inputWriteCacheTokens).toBeUndefined();
});
it('should ignore stale usage on assistant events (from message_start echo)', async () => {
// Regression for LOBE-7258-style bug: under partial-messages mode, CC
// echoes a stale message_start usage (e.g. output_tokens: 1) on every
// content-block assistant event. If the adapter picked that up, the DB
// would record output_tokens=1 instead of the real total. This verifies
// the stale snapshot is ignored and only the message_delta total lands.
await runWithEvents([
ccInit(),
ccMessageStart('msg_01'),
// All assistant events below carry the STALE placeholder usage
ccAssistant('msg_01', [{ text: 'hi', type: 'text' }], {
usage: { input_tokens: 6, output_tokens: 1 }, // stale
}),
ccAssistant('msg_01', [{ id: 'tu', input: {}, name: 'Read', type: 'tool_use' }], {
usage: { input_tokens: 6, output_tokens: 1 }, // stale echo
}),
// Authoritative final usage arrives on message_delta
ccMessageDelta({ input_tokens: 6, output_tokens: 265 }),
ccToolResult('tu', 'ok'),
ccResult(),
]);
const usageWrites = mockUpdateMessage.mock.calls.filter(
([, val]: any) => val.metadata?.usage?.totalTokens,
);
expect(usageWrites.length).toBe(1);
expect(usageWrites[0][1].metadata.usage.totalOutputTokens).toBe(265); // not 1
expect(usageWrites[0][1].metadata.usage.totalInputTokens).toBe(6);
});
});

@@ -297,14 +297,10 @@ export const executeHeterogeneousAgent = async (
/** Content accumulators — reset on each new step */
let accumulatedContent = '';
let accumulatedReasoning = '';
/** Extracted model + usage from each assistant event (used for final write) */
/** Latest model string — updated per turn, written alongside content on step boundaries. */
let lastModel: string | undefined;
const accumulatedUsage: Record<string, number> = {
cache_creation_input_tokens: 0,
cache_read_input_tokens: 0,
input_tokens: 0,
output_tokens: 0,
};
/** Adapter/CLI provider (e.g. `claude-code`) — carried on every turn_metadata. */
let lastProvider: string | undefined;
/**
* Deferred terminal event (agent_runtime_end or error). We don't forward
* these to the gateway handler immediately because handler triggers
@@ -384,31 +380,32 @@
continue;
}
// ─── step_complete with result_usage: authoritative total from CC result event ───
if (event.type === 'step_complete' && event.data?.phase === 'result_usage') {
if (event.data.usage) {
// Override (not accumulate) — result event has the correct totals
accumulatedUsage.input_tokens = event.data.usage.input_tokens || 0;
accumulatedUsage.output_tokens = event.data.usage.output_tokens || 0;
accumulatedUsage.cache_creation_input_tokens =
event.data.usage.cache_creation_input_tokens || 0;
accumulatedUsage.cache_read_input_tokens =
event.data.usage.cache_read_input_tokens || 0;
}
continue;
}
// ─── step_complete with turn_metadata: capture model + usage ───
// ─── step_complete with turn_metadata: persist per-step usage ───
// `turn_metadata.usage` is the per-turn delta (deduped by adapter per
// message.id) and already normalized to the MessageMetadata.usage
// shape — write it straight through to the current step's assistant
// message. Queue the write so it lands after any in-flight
// stream_start(newStep) that may still be swapping
// `currentAssistantMessageId` to the new step's message.
//
// `result_usage` (grand total across all turns) is intentionally
// ignored — applying it would overwrite the last step with the sum
// of all prior steps. Sum of turn_metadata equals result_usage for
// a healthy run.
if (event.type === 'step_complete' && event.data?.phase === 'turn_metadata') {
if (event.data.model) lastModel = event.data.model;
if (event.data.usage) {
// Accumulate token usage across turns (deduped by adapter per message.id)
accumulatedUsage.input_tokens += event.data.usage.input_tokens || 0;
accumulatedUsage.output_tokens += event.data.usage.output_tokens || 0;
accumulatedUsage.cache_creation_input_tokens +=
event.data.usage.cache_creation_input_tokens || 0;
accumulatedUsage.cache_read_input_tokens +=
event.data.usage.cache_read_input_tokens || 0;
if (event.data.provider) lastProvider = event.data.provider;
const turnUsage = event.data.usage;
if (turnUsage) {
persistQueue = persistQueue.then(async () => {
await messageService
.updateMessage(
currentAssistantMessageId,
{ metadata: { usage: turnUsage } },
{ agentId: context.agentId, topicId: context.topicId },
)
.catch(console.error);
});
}
// Don't forward turn metadata — it's internal bookkeeping
continue;
@@ -423,6 +420,7 @@ export const executeHeterogeneousAgent = async (
const prevContent = accumulatedContent;
const prevReasoning = accumulatedReasoning;
const prevModel = lastModel;
const prevProvider = lastProvider;
// Reset content accumulators synchronously so new-step chunks go to fresh state
accumulatedContent = '';
@@ -440,6 +438,7 @@ export const executeHeterogeneousAgent = async (
if (prevContent) prevUpdate.content = prevContent;
if (prevReasoning) prevUpdate.reasoning = { content: prevReasoning };
if (prevModel) prevUpdate.model = prevModel;
if (prevProvider) prevUpdate.provider = prevProvider;
if (Object.keys(prevUpdate).length > 0) {
await messageService
.updateMessage(currentAssistantMessageId, prevUpdate, {
@@ -462,6 +461,7 @@ export const executeHeterogeneousAgent = async (
content: '',
model: lastModel,
parentId: stepParentId,
provider: lastProvider,
role: 'assistant',
topicId: context.topicId ?? undefined,
});
@@ -547,33 +547,14 @@ export const executeHeterogeneousAgent = async (
// Wait for all tool persistence to finish before writing final state
await persistQueue.catch(console.error);
// Persist final content + reasoning + model + usage to the assistant message
// BEFORE the terminal event triggers fetchAndReplaceMessages.
// Persist final content + reasoning + model for the last step BEFORE the
// terminal event triggers fetchAndReplaceMessages. Usage for this step
// was already written per-turn via the turn_metadata branch.
const updateValue: Record<string, any> = {};
if (accumulatedContent) updateValue.content = accumulatedContent;
if (accumulatedReasoning) updateValue.reasoning = { content: accumulatedReasoning };
if (lastModel) updateValue.model = lastModel;
const inputCacheMiss = accumulatedUsage.input_tokens;
const inputCached = accumulatedUsage.cache_read_input_tokens;
const inputWriteCache = accumulatedUsage.cache_creation_input_tokens;
const totalInputTokens = inputCacheMiss + inputCached + inputWriteCache;
const totalOutputTokens = accumulatedUsage.output_tokens;
if (totalInputTokens + totalOutputTokens > 0) {
updateValue.metadata = {
// Use nested `usage` — the flat fields on MessageMetadata are deprecated.
// Shape mirrors the anthropic usage converter so CC CLI and Gateway turns
// render identically in pricing/usage UI.
usage: {
inputCacheMissTokens: inputCacheMiss,
inputCachedTokens: inputCached || undefined,
inputWriteCacheTokens: inputWriteCache || undefined,
totalInputTokens,
totalOutputTokens,
totalTokens: totalInputTokens + totalOutputTokens,
},
};
}
if (lastProvider) updateValue.provider = lastProvider;
if (Object.keys(updateValue).length > 0) {
await messageService