mirror of
https://github.com/n8n-io/n8n
synced 2026-04-21 15:47:20 +00:00
feat(ai-builder): Improve sub-agent context passing with structured briefings and debriefings (#28317)
This commit is contained in:
parent
0ce81461ab
commit
e78f144e8e
14 changed files with 526 additions and 65 deletions
95
packages/@n8n/instance-ai/src/agent/sub-agent-briefing.ts
Normal file
95
packages/@n8n/instance-ai/src/agent/sub-agent-briefing.ts
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
import { formatPreviousAttempts, type IterationLog } from '../storage/iteration-log';
|
||||
|
||||
// ── Types ───────────────────────────────────────────────────────────────────
|
||||
|
||||
/** Compact description of one background task that is currently running in the thread. */
export interface RunningTaskSummary {
	/** Identifier of the running task. */
	taskId: string;

	/** Role of the agent that is executing the task. */
	role: string;

	/** What the task is trying to achieve, when known. */
	goal?: string;
}
|
||||
|
||||
/**
 * Input accepted by the sub-agent briefing builder.
 * Only `task` is required; each other field contributes one optional section.
 */
export interface SubAgentBriefingInput {
	/** The core task description. */
	task: string;

	/** Short recap of the conversation so far. */
	conversationContext?: string;

	/** Structured artifacts (IDs, data). */
	artifacts?: Record<string, unknown>;

	/** Extra context blocks (e.g., sandbox instructions, workflowId notes). */
	additionalContext?: string;

	/** Requirements block (e.g., DETACHED_BUILDER_REQUIREMENTS). */
	requirements?: string;

	/** Iteration log plus the thread ID and task key used to look up earlier attempts. */
	iteration?: {
		log: IterationLog;
		threadId: string;
		taskKey: string;
	};

	/** Background tasks currently running in this thread. */
	runningTasks?: RunningTaskSummary[];
}
|
||||
|
||||
// ── Builder ─────────────────────────────────────────────────────────────────
|
||||
|
||||
/** Escape text for use in XML element content and double-quoted attribute values. */
function escapeXml(value: string): string {
	return value
		.replace(/&/g, '&amp;')
		.replace(/</g, '&lt;')
		.replace(/>/g, '&gt;')
		.replace(/"/g, '&quot;');
}

/**
 * Build a structured XML-formatted briefing for a sub-agent.
 *
 * All sub-agent spawn sites (delegate, builder, research, data-table) use this
 * instead of ad-hoc string concatenation. The XML structure gives the LLM
 * clear section boundaries and makes the briefing parseable.
 *
 * Sections are emitted in a fixed order and separated by a blank line:
 * `<task>`, `<conversation-context>`, `<artifacts>` (JSON), the additional
 * context and requirements blocks (both verbatim), `<thread-state>`, and the
 * formatted previous attempts from the iteration log.
 *
 * @param input - Task text plus the optional sections to include.
 * @returns The assembled briefing text.
 */
export async function buildSubAgentBriefing(input: SubAgentBriefingInput): Promise<string> {
	const parts: string[] = [];

	// Core task — always present
	parts.push(`<task>\n${input.task}\n</task>`);

	// Conversation context — what the user discussed, decisions made
	if (input.conversationContext) {
		parts.push(`<conversation-context>\n${input.conversationContext}\n</conversation-context>`);
	}

	// Structured artifacts — IDs, data, references (skipped when the record is empty)
	if (input.artifacts && Object.keys(input.artifacts).length > 0) {
		parts.push(`<artifacts>\n${JSON.stringify(input.artifacts)}\n</artifacts>`);
	}

	// Additional context — sandbox paths, workflowId notes, etc. (caller-formatted, passed through)
	if (input.additionalContext) {
		parts.push(input.additionalContext);
	}

	// Requirements block — e.g., DETACHED_BUILDER_REQUIREMENTS (caller-formatted, passed through)
	if (input.requirements) {
		parts.push(input.requirements);
	}

	// Thread state — what else is happening in parallel
	if (input.runningTasks && input.runningTasks.length > 0) {
		// IDs, roles and goals are free text: escape them so a quote or angle bracket
		// cannot break out of the attribute or close the element early.
		const taskLines = input.runningTasks
			.map(
				(t) =>
					` <running-task taskId="${escapeXml(t.taskId)}" role="${escapeXml(t.role)}">${escapeXml(t.goal ?? '')}</running-task>`,
			)
			.join('\n');
		parts.push(`<thread-state>\n${taskLines}\n</thread-state>`);
	}

	// Iteration context — previous attempt history
	if (input.iteration) {
		try {
			const entries = await input.iteration.log.getForTask(
				input.iteration.threadId,
				input.iteration.taskKey,
			);
			const formatted = formatPreviousAttempts(entries);
			if (formatted) {
				parts.push(formatted);
			}
		} catch {
			// Non-fatal — iteration log is best-effort; the briefing is still usable without it
		}
	}

	return parts.join('\n\n');
}
|
||||
78
packages/@n8n/instance-ai/src/agent/sub-agent-debriefing.ts
Normal file
78
packages/@n8n/instance-ai/src/agent/sub-agent-debriefing.ts
Normal file
|
|
@ -0,0 +1,78 @@
|
|||
import { z } from 'zod';
|
||||
|
||||
import { toolCallSummarySchema } from '../stream/work-summary-accumulator';
|
||||
import type { WorkSummary } from '../stream/work-summary-accumulator';
|
||||
|
||||
// ── Schema (source of truth) ────────────────────────────────────────────────
|
||||
|
||||
/** Counter shape shared by the tool statistics: an integer that is never negative. */
const toolCounter = z.number().int().min(0);

/** Structured report describing how a finished sub-agent run went. */
export const subAgentDebriefingSchema = z.object({
	/** Unique sub-agent ID. */
	agentId: z.string(),
	/** Sub-agent role descriptor. */
	role: z.string(),
	/** Terminal status. */
	status: z.enum(['completed', 'failed', 'cancelled']),
	/** The agent's final text output. */
	result: z.string(),

	// ── Work summary (accumulated from stream observation) ────────────────

	/** Total number of tool invocations observed. */
	toolCallCount: toolCounter,
	/** Number of tool invocations that failed. */
	toolErrorCount: toolCounter,
	/** Per-tool call outcomes — omitted when empty to keep payloads lean. */
	toolCalls: z.array(toolCallSummarySchema).optional(),

	// ── Timing ────────────────────────────────────────────────────────────

	/** Wall-clock duration of the sub-agent run in milliseconds. */
	durationMs: z.number().optional(),

	// ── Diagnostic context ───────────────────────────────────────────────

	/** Why the agent stopped (e.g. "completed task", "blocked on credential", "retry limit"). */
	stoppingReason: z.string().optional(),
	/** Specific blockers encountered during execution. */
	blockers: z.array(z.string()).optional(),
});

/** Static type inferred from {@link subAgentDebriefingSchema}. */
export type SubAgentDebriefing = z.infer<typeof subAgentDebriefingSchema>;
|
||||
|
||||
// ── Builder ─────────────────────────────────────────────────────────────────
|
||||
|
||||
/** Raw facts the caller collected about a sub-agent run, used to assemble its debriefing. */
export interface BuildDebriefingInput {
	/** Unique sub-agent ID, copied into the debriefing. */
	agentId: string;

	/** Sub-agent role descriptor, copied into the debriefing. */
	role: string;

	/** The agent's final text output. */
	result: string;

	/** Tool call outcomes observed while consuming the agent's stream. */
	workSummary: WorkSummary;

	/** `Date.now()` timestamp (ms) taken when the run started; used to compute the duration. */
	startTime: number;

	/** Error message when the run failed; reported as the stopping reason. */
	error?: string;
}
|
||||
|
||||
/**
 * Construct a {@link SubAgentDebriefing} from the host-observed work summary
 * and timing data. The sub-agent itself is stateless — this is built by the
 * caller (delegate tool, background task manager) after stream consumption.
 *
 * The status is `'failed'` whenever an error string is supplied (including an
 * empty one) and `'completed'` otherwise; `'cancelled'` is never produced here.
 * Optional fields (`toolCalls`, `stoppingReason`, `blockers`) are left out
 * entirely when they would be empty.
 *
 * @param input - Identity, result text, work summary, start timestamp and optional error.
 * @returns The assembled debriefing.
 */
export function buildDebriefing(input: BuildDebriefingInput): SubAgentDebriefing {
	const { agentId, role, result, workSummary, startTime, error } = input;

	// Test for presence, not truthiness: an error with an empty message
	// (e.g. `new Error().message`) is still a failure.
	const status = typeof error === 'string' ? 'failed' : 'completed';

	// Clamp at zero so a start time ahead of the current clock cannot yield a negative duration.
	const durationMs = Math.max(0, Date.now() - startTime);

	// Extract blockers from tool errors that carry a message
	const blockers = workSummary.toolCalls
		.filter((c) => !c.succeeded && c.errorSummary)
		.map((c) => `${c.toolName}: ${c.errorSummary}`);

	return {
		agentId,
		role,
		status,
		result,
		toolCallCount: workSummary.totalToolCalls,
		toolErrorCount: workSummary.totalToolErrors,
		...(workSummary.toolCalls.length > 0 ? { toolCalls: workSummary.toolCalls } : {}),
		durationMs,
		...(error ? { stoppingReason: error } : {}),
		...(blockers.length > 0 ? { blockers } : {}),
	};
}
|
||||
|
|
@ -22,9 +22,21 @@ export interface SubAgentOptions {
|
|||
/** Hard protocol injected into every sub-agent — cannot be overridden by orchestrator instructions. */
|
||||
const SUB_AGENT_PROTOCOL = `## Output Protocol (MANDATORY)
|
||||
You are reporting to a parent agent, NOT a human user. Your output is machine-consumed.
|
||||
- Return ONLY structured data: IDs, statuses, errors, counts.
|
||||
- NO prose, NO narration, NO emojis, NO markdown headers (## or **bold**), NO filler phrases.
|
||||
- Do NOT describe what you are about to do or what you did. Just return the facts.
|
||||
|
||||
### Structured Result (required)
|
||||
Return a concise result summary: IDs created, statuses, counts, errors encountered.
|
||||
No emojis, no markdown headers, no filler phrases.
|
||||
|
||||
### Diagnostic Context (when relevant)
|
||||
If you encountered errors, retried operations, or made non-obvious decisions, add a brief
|
||||
diagnostic section at the end explaining:
|
||||
- What approaches you tried and why they failed
|
||||
- What blockers remain (missing credentials, permissions, API errors)
|
||||
- What assumptions you made
|
||||
|
||||
Keep diagnostics to 2-3 sentences maximum. Omit entirely when the task succeeded cleanly.
|
||||
|
||||
### Rules
|
||||
- One tool call at a time unless truly independent. Minimum tool calls needed.
|
||||
- You cannot delegate to other agents or create plans.
|
||||
- If you are stuck or need information only a human can provide, use the ask-user tool.
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import type { WorkSummary } from '../../stream/work-summary-accumulator';
|
||||
import { executeResumableStream } from '../resumable-stream-executor';
|
||||
import { streamAgentRun } from '../stream-runner';
|
||||
|
||||
|
|
@ -6,6 +7,8 @@ jest.mock('../resumable-stream-executor', () => ({
|
|||
createLlmStepTraceHooks: jest.fn(),
|
||||
}));
|
||||
|
||||
const emptyWorkSummary: WorkSummary = { toolCalls: [], totalToolCalls: 0, totalToolErrors: 0 };
|
||||
|
||||
function createLogger() {
|
||||
return { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() };
|
||||
}
|
||||
|
|
@ -38,6 +41,7 @@ describe('streamAgentRun', () => {
|
|||
jest.mocked(executeResumableStream).mockResolvedValue({
|
||||
status: 'errored',
|
||||
mastraRunId: 'mastra-run-1',
|
||||
workSummary: emptyWorkSummary,
|
||||
});
|
||||
const eventBus = createEventBus();
|
||||
const agent = {
|
||||
|
|
@ -77,6 +81,7 @@ describe('streamAgentRun', () => {
|
|||
jest.mocked(executeResumableStream).mockResolvedValue({
|
||||
status: 'completed',
|
||||
mastraRunId: 'mastra-run-1',
|
||||
workSummary: emptyWorkSummary,
|
||||
});
|
||||
const eventBus = createEventBus();
|
||||
const agent = {
|
||||
|
|
@ -116,6 +121,7 @@ describe('streamAgentRun', () => {
|
|||
mockedExecuteResumableStream.mockResolvedValue({
|
||||
status: 'suspended',
|
||||
mastraRunId: 'mastra-run-1',
|
||||
workSummary: emptyWorkSummary,
|
||||
suspension: {
|
||||
requestId: 'request-1',
|
||||
toolCallId: 'tool-call-1',
|
||||
|
|
@ -181,6 +187,7 @@ describe('streamAgentRun', () => {
|
|||
status: 'completed',
|
||||
mastraRunId: 'mastra-run-2',
|
||||
text: Promise.resolve('done'),
|
||||
workSummary: emptyWorkSummary,
|
||||
});
|
||||
|
||||
await streamAgentRun(
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import type { RunTree } from 'langsmith';
|
|||
import type { InstanceAiEventBus } from '../event-bus';
|
||||
import type { Logger } from '../logger';
|
||||
import { mapMastraChunkToEvent } from '../stream/map-chunk';
|
||||
import { WorkSummaryAccumulator, type WorkSummary } from '../stream/work-summary-accumulator';
|
||||
import { getTraceParentRun, setTraceParentOverride } from '../tracing/langsmith-tracing';
|
||||
import { asResumable, parseSuspension } from '../utils/stream-helpers';
|
||||
import type { SuspensionInfo } from '../utils/stream-helpers';
|
||||
|
|
@ -68,6 +69,8 @@ export interface ExecuteResumableStreamResult {
|
|||
text?: Promise<string>;
|
||||
suspension?: SuspensionInfo;
|
||||
confirmationEvent?: ConfirmationRequestEvent;
|
||||
/** Accumulated tool call outcomes observed during stream consumption. */
|
||||
workSummary: WorkSummary;
|
||||
}
|
||||
|
||||
export interface LlmStepTraceHooks {
|
||||
|
|
@ -1817,6 +1820,7 @@ export async function executeResumableStream(
|
|||
let activeStream = options.stream.fullStream;
|
||||
let activeMastraRunId = options.stream.runId ?? options.initialMastraRunId ?? '';
|
||||
let text = options.stream.text;
|
||||
const workSummaryAccumulator = new WorkSummaryAccumulator();
|
||||
|
||||
let currentResponseId: string | undefined;
|
||||
|
||||
|
|
@ -1847,7 +1851,12 @@ export async function executeResumableStream(
|
|||
status: 'cancelled',
|
||||
error: 'Run cancelled while streaming',
|
||||
});
|
||||
return { status: 'cancelled', mastraRunId: activeMastraRunId, text };
|
||||
return {
|
||||
status: 'cancelled',
|
||||
mastraRunId: activeMastraRunId,
|
||||
text,
|
||||
workSummary: workSummaryAccumulator.toSummary(),
|
||||
};
|
||||
}
|
||||
|
||||
await startSyntheticToolTrace(chunk, syntheticToolRecords);
|
||||
|
|
@ -1915,6 +1924,7 @@ export async function executeResumableStream(
|
|||
currentResponseId,
|
||||
);
|
||||
if (event) {
|
||||
workSummaryAccumulator.observe(event);
|
||||
let shouldPublishEvent = true;
|
||||
|
||||
if (event.type === 'confirmation-request') {
|
||||
|
|
@ -1961,11 +1971,21 @@ export async function executeResumableStream(
|
|||
});
|
||||
|
||||
if (options.context.signal.aborted) {
|
||||
return { status: 'cancelled', mastraRunId: activeMastraRunId, text };
|
||||
return {
|
||||
status: 'cancelled',
|
||||
mastraRunId: activeMastraRunId,
|
||||
text,
|
||||
workSummary: workSummaryAccumulator.toSummary(),
|
||||
};
|
||||
}
|
||||
|
||||
if (!suspension) {
|
||||
return { status: hasError ? 'errored' : 'completed', mastraRunId: activeMastraRunId, text };
|
||||
return {
|
||||
status: hasError ? 'errored' : 'completed',
|
||||
mastraRunId: activeMastraRunId,
|
||||
text,
|
||||
workSummary: workSummaryAccumulator.toSummary(),
|
||||
};
|
||||
}
|
||||
|
||||
if (options.control.mode === 'manual') {
|
||||
|
|
@ -1974,6 +1994,7 @@ export async function executeResumableStream(
|
|||
mastraRunId: activeMastraRunId,
|
||||
text,
|
||||
suspension,
|
||||
workSummary: workSummaryAccumulator.toSummary(),
|
||||
...(confirmationEvent ? { confirmationEvent } : {}),
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,129 @@
|
|||
import type { InstanceAiEvent } from '@n8n/api-types';
|
||||
|
||||
import { WorkSummaryAccumulator } from '../work-summary-accumulator';
|
||||
|
||||
/** Creates a `tool-call` event fixture for the given call ID and tool name. */
function toolCallEvent(toolCallId: string, toolName: string): InstanceAiEvent {
	const payload = { toolCallId, toolName, args: {} };
	return { type: 'tool-call', runId: 'run-1', agentId: 'agent-1', payload };
}
|
||||
|
||||
/** Creates a successful `tool-result` event fixture for the given call ID. */
function toolResultEvent(toolCallId: string): InstanceAiEvent {
	const payload = { toolCallId, result: 'ok' };
	return { type: 'tool-result', runId: 'run-1', agentId: 'agent-1', payload };
}
|
||||
|
||||
/** Creates a `tool-error` event fixture carrying the given error message. */
function toolErrorEvent(toolCallId: string, error: string): InstanceAiEvent {
	const payload = { toolCallId, error };
	return { type: 'tool-error', runId: 'run-1', agentId: 'agent-1', payload };
}
|
||||
|
||||
describe('WorkSummaryAccumulator', () => {
	/** Feeds the events, in order, to a fresh accumulator and returns its summary. */
	const summarize = (...events: InstanceAiEvent[]) => {
		const acc = new WorkSummaryAccumulator();
		for (const evt of events) {
			acc.observe(evt);
		}
		return acc.toSummary();
	};

	it('returns empty summary when no events observed', () => {
		const result = summarize();

		expect(result.totalToolCalls).toBe(0);
		expect(result.totalToolErrors).toBe(0);
		expect(result.toolCalls).toEqual([]);
	});

	it('tracks a successful tool call', () => {
		const result = summarize(toolCallEvent('tc-1', 'list-workflows'), toolResultEvent('tc-1'));

		expect(result.totalToolCalls).toBe(1);
		expect(result.totalToolErrors).toBe(0);
		expect(result.toolCalls).toEqual([
			{ toolCallId: 'tc-1', toolName: 'list-workflows', succeeded: true },
		]);
	});

	it('tracks a failed tool call', () => {
		const result = summarize(
			toolCallEvent('tc-1', 'get-workflow'),
			toolErrorEvent('tc-1', 'Not found'),
		);

		expect(result.totalToolCalls).toBe(1);
		expect(result.totalToolErrors).toBe(1);
		expect(result.toolCalls[0]).toMatchObject({
			toolCallId: 'tc-1',
			toolName: 'get-workflow',
			succeeded: false,
			errorSummary: 'Not found',
		});
	});

	it('tracks multiple tool calls with mixed outcomes', () => {
		const result = summarize(
			toolCallEvent('tc-1', 'list-workflows'),
			toolResultEvent('tc-1'),
			toolCallEvent('tc-2', 'build-workflow'),
			toolErrorEvent('tc-2', 'Compilation error'),
			toolCallEvent('tc-3', 'list-credentials'),
			toolResultEvent('tc-3'),
		);

		expect(result.totalToolCalls).toBe(3);
		expect(result.totalToolErrors).toBe(1);
	});

	it('ignores non-tool events', () => {
		const result = summarize({
			type: 'text-delta',
			runId: 'run-1',
			agentId: 'agent-1',
			payload: { text: 'hello' },
		});

		expect(result.totalToolCalls).toBe(0);
	});

	it('is idempotent — multiple toSummary calls return same data', () => {
		const acc = new WorkSummaryAccumulator();
		acc.observe(toolCallEvent('tc-1', 'list-workflows'));
		acc.observe(toolResultEvent('tc-1'));

		const first = acc.toSummary();
		const second = acc.toSummary();

		expect(first).toEqual(second);
	});

	it('truncates long error summaries to 500 chars', () => {
		const result = summarize(
			toolCallEvent('tc-1', 'build-workflow'),
			toolErrorEvent('tc-1', 'x'.repeat(1000)),
		);

		expect(result.toolCalls[0].errorSummary).toHaveLength(500);
	});

	it('de-duplicates by toolCallId (keeps latest outcome)', () => {
		const result = summarize(
			// First attempt: fails
			toolCallEvent('tc-1', 'build-workflow'),
			toolErrorEvent('tc-1', 'Failed'),
			// Resumed stream replays the same toolCallId as succeeded
			toolCallEvent('tc-1', 'build-workflow'),
			toolResultEvent('tc-1'),
		);

		expect(result.totalToolCalls).toBe(1);
		expect(result.toolCalls[0].succeeded).toBe(true);
	});
});
|
||||
|
|
@ -7,6 +7,7 @@ import {
|
|||
executeResumableStream,
|
||||
type ResumableStreamSource,
|
||||
} from '../runtime/resumable-stream-executor';
|
||||
import type { WorkSummary } from '../stream/work-summary-accumulator';
|
||||
|
||||
export interface ConsumeWithHitlOptions {
|
||||
agent: Agent;
|
||||
|
|
@ -31,6 +32,8 @@ export interface ConsumeWithHitlOptions {
|
|||
export interface ConsumeWithHitlResult {
|
||||
/** Promise that resolves to the agent's full text output (including post-resume text). */
|
||||
text: Promise<string>;
|
||||
/** Accumulated tool call outcomes observed during stream consumption. */
|
||||
workSummary: WorkSummary;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -77,5 +80,5 @@ export async function consumeStreamWithHitl(
|
|||
llmStepTraceHooks: options.llmStepTraceHooks,
|
||||
});
|
||||
|
||||
return { text: result.text ?? options.stream.text };
|
||||
return { text: result.text ?? options.stream.text, workSummary: result.workSummary };
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,83 @@
|
|||
import type { InstanceAiEvent } from '@n8n/api-types';
|
||||
import { z } from 'zod';
|
||||
|
||||
// ── Schema (source of truth) ────────────────────────────────────────────────
|
||||
|
||||
/** Outcome of a single tool invocation observed on the stream. */
export const toolCallSummarySchema = z.object({
	/** ID that ties the call to its result or error event. */
	toolCallId: z.string(),
	/** Name of the invoked tool. */
	toolName: z.string(),
	/** Whether the call is considered successful. */
	succeeded: z.boolean(),
	/** Error text recorded for a failed call. */
	errorSummary: z.string().optional(),
});
|
||||
|
||||
/** Aggregate of all tool call outcomes observed during one stream. */
export const workSummarySchema = z.object({
	/** One entry per tool call. */
	toolCalls: z.array(toolCallSummarySchema),
	/** Number of tool calls, as a non-negative integer. */
	totalToolCalls: z.number().int().min(0),
	/** Number of failed tool calls, as a non-negative integer. */
	totalToolErrors: z.number().int().min(0),
});

/** Static type inferred from {@link toolCallSummarySchema}. */
export type ToolCallSummary = z.infer<typeof toolCallSummarySchema>;

/** Static type inferred from {@link workSummarySchema}. */
export type WorkSummary = z.infer<typeof workSummarySchema>;
|
||||
|
||||
// ── Accumulator ─────────────────────────────────────────────────────────────
|
||||
|
||||
/**
 * Lightweight observer that accumulates tool call outcomes from stream events.
 * Instantiated per-stream in `executeResumableStream`, fed each mapped event,
 * and drained at the end to produce a {@link WorkSummary}.
 *
 * Keyed by `toolCallId` — duplicate IDs (e.g. from resumed streams) are
 * de-duplicated by keeping the latest outcome.
 */
export class WorkSummaryAccumulator {
	/** Latest known outcome per tool call, in first-seen order. */
	private readonly calls = new Map<string, ToolCallSummary>();

	/**
	 * Feed an event from the stream. Only tool-call / tool-result / tool-error
	 * events are processed; all others are silently ignored.
	 *
	 * Result and error events whose `toolCallId` was never announced by a
	 * `tool-call` event are ignored as well.
	 */
	observe(event: InstanceAiEvent): void {
		switch (event.type) {
			case 'tool-call': {
				const { toolCallId, toolName } = event.payload;
				if (!toolCallId) break;
				// Replaces any earlier entry for this ID, which also resets a previous error.
				this.calls.set(toolCallId, {
					toolCallId,
					toolName,
					succeeded: true, // optimistic — flipped on error
				});
				break;
			}
			case 'tool-result': {
				const { toolCallId } = event.payload;
				if (!toolCallId) break;
				const existing = this.calls.get(toolCallId);
				if (existing) {
					existing.succeeded = true;
					// A result supersedes an earlier error for the same call; drop the
					// stale message so a succeeded call never carries an error summary.
					delete existing.errorSummary;
				}
				break;
			}
			case 'tool-error': {
				const { toolCallId, error } = event.payload;
				if (!toolCallId) break;
				const existing = this.calls.get(toolCallId);
				if (existing) {
					existing.succeeded = false;
					// Keep at most 500 characters; non-string errors carry no summary.
					existing.errorSummary = typeof error === 'string' ? error.slice(0, 500) : undefined;
				}
				break;
			}
			default:
				// Ignore text-delta, reasoning-delta, confirmation-request, error, etc.
				break;
		}
	}

	/**
	 * Produce a snapshot of everything observed so far. Each entry is a shallow
	 * copy, so later `observe` calls do not alter a summary already returned.
	 * Safe to call multiple times (idempotent).
	 */
	toSummary(): WorkSummary {
		const toolCalls = [...this.calls.values()].map((c) => ({ ...c }));
		return {
			toolCalls,
			totalToolCalls: toolCalls.length,
			totalToolErrors: toolCalls.filter((c) => !c.succeeded).length,
		};
	}
}
|
||||
|
|
@ -27,8 +27,8 @@ import {
|
|||
} from './tracing-utils';
|
||||
import { createVerifyBuiltWorkflowTool } from './verify-built-workflow.tool';
|
||||
import { registerWithMastra } from '../../agent/register-with-mastra';
|
||||
import { buildSubAgentBriefing } from '../../agent/sub-agent-briefing';
|
||||
import { createLlmStepTraceHooks } from '../../runtime/resumable-stream-executor';
|
||||
import { formatPreviousAttempts } from '../../storage/iteration-log';
|
||||
import { consumeStreamWithHitl } from '../../stream/consume-with-hitl';
|
||||
import {
|
||||
buildAgentTraceInputs,
|
||||
|
|
@ -273,33 +273,30 @@ export async function startBuildWorkflowAgentTask(
|
|||
|
||||
const { workflowId } = input;
|
||||
|
||||
let iterationContext = '';
|
||||
if (context.iterationLog) {
|
||||
const taskKey = `build:${workflowId ?? 'new'}`;
|
||||
try {
|
||||
const entries = await context.iterationLog.getForTask(context.threadId, taskKey);
|
||||
iterationContext = formatPreviousAttempts(entries);
|
||||
} catch {
|
||||
// Non-fatal — iteration log is best-effort
|
||||
}
|
||||
}
|
||||
|
||||
const conversationCtx = input.conversationContext
|
||||
? `\n\n[CONVERSATION CONTEXT: ${input.conversationContext}]`
|
||||
: '';
|
||||
|
||||
let briefing: string;
|
||||
if (useSandbox) {
|
||||
if (workflowId) {
|
||||
briefing = `${input.task}${conversationCtx}\n\n[CONTEXT: Modifying existing workflow ${workflowId}. The current code is pre-loaded in ~/workspace/src/workflow.ts — read it first, then edit. Use workflowId "${workflowId}" when calling submit-workflow.]\n\n[WORK ITEM ID: ${workItemId}]\n\n${DETACHED_BUILDER_REQUIREMENTS}${iterationContext ? `\n\n${iterationContext}` : ''}`;
|
||||
} else {
|
||||
briefing = `${input.task}${conversationCtx}\n\n[WORK ITEM ID: ${workItemId}]\n\n${DETACHED_BUILDER_REQUIREMENTS}${iterationContext ? `\n\n${iterationContext}` : ''}`;
|
||||
}
|
||||
// Build additional context based on sandbox mode and existing workflow
|
||||
let additionalContext = '';
|
||||
if (useSandbox && workflowId) {
|
||||
additionalContext = `[CONTEXT: Modifying existing workflow ${workflowId}. The current code is pre-loaded in ~/workspace/src/workflow.ts — read it first, then edit. Use workflowId "${workflowId}" when calling submit-workflow.]\n\n[WORK ITEM ID: ${workItemId}]`;
|
||||
} else if (useSandbox) {
|
||||
additionalContext = `[WORK ITEM ID: ${workItemId}]`;
|
||||
} else if (workflowId) {
|
||||
briefing = `${input.task}${conversationCtx}\n\n[CONTEXT: Modifying existing workflow ${workflowId}. Use workflowId "${workflowId}" when calling build-workflow.]${iterationContext ? `\n\n${iterationContext}` : ''}`;
|
||||
} else {
|
||||
briefing = `${input.task}${conversationCtx}${iterationContext ? `\n\n${iterationContext}` : ''}`;
|
||||
additionalContext = `[CONTEXT: Modifying existing workflow ${workflowId}. Use workflowId "${workflowId}" when calling build-workflow.]`;
|
||||
}
|
||||
|
||||
const briefing = await buildSubAgentBriefing({
|
||||
task: input.task,
|
||||
conversationContext: input.conversationContext,
|
||||
additionalContext: additionalContext || undefined,
|
||||
requirements: useSandbox ? DETACHED_BUILDER_REQUIREMENTS : undefined,
|
||||
iteration: context.iterationLog
|
||||
? {
|
||||
log: context.iterationLog,
|
||||
threadId: context.threadId,
|
||||
taskKey: `build:${workflowId ?? 'new'}`,
|
||||
}
|
||||
: undefined,
|
||||
runningTasks: context.getRunningTaskSummaries?.(),
|
||||
});
|
||||
const traceContext = await createDetachedSubAgentTracing(context, {
|
||||
agentId: subAgentId,
|
||||
role: 'workflow-builder',
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import {
|
|||
withTraceContextActor,
|
||||
} from './tracing-utils';
|
||||
import { registerWithMastra } from '../../agent/register-with-mastra';
|
||||
import { buildSubAgentBriefing } from '../../agent/sub-agent-briefing';
|
||||
import { createLlmStepTraceHooks } from '../../runtime/resumable-stream-executor';
|
||||
import { consumeStreamWithHitl } from '../../stream/consume-with-hitl';
|
||||
import {
|
||||
|
|
@ -146,10 +147,11 @@ export async function startDataTableAgentTask(
|
|||
|
||||
registerWithMastra(subAgentId, subAgent, context.storage);
|
||||
|
||||
const conversationCtx = input.conversationContext
|
||||
? `\n\n[CONVERSATION CONTEXT: ${input.conversationContext}]`
|
||||
: '';
|
||||
const briefing = `${input.task}${conversationCtx}`;
|
||||
const briefing = await buildSubAgentBriefing({
|
||||
task: input.task,
|
||||
conversationContext: input.conversationContext,
|
||||
runningTasks: context.getRunningTaskSummaries?.(),
|
||||
});
|
||||
|
||||
const traceParent = getTraceParentRun();
|
||||
return await withTraceParentContext(traceParent, async () => {
|
||||
|
|
|
|||
|
|
@ -30,6 +30,23 @@ export type DelegateInput = z.infer<typeof delegateInputSchema>;
|
|||
|
||||
/** Shape of the value returned by the delegate tool: the answer plus optional run statistics. */
export const delegateOutputSchema = z.object({
	result: z.string().describe('The sub-agent synthesized answer'),
	toolCallCount: z
		.number()
		.int()
		.min(0)
		.optional()
		.describe('Total number of tool invocations the sub-agent made'),
	toolErrorCount: z
		.number()
		.int()
		.min(0)
		.optional()
		.describe('Number of tool invocations that failed'),
	durationMs: z.number().optional().describe('Wall-clock duration in milliseconds'),
	blockers: z
		.array(z.string())
		.optional()
		.describe('Specific blockers encountered during execution'),
});

/** Static type inferred from {@link delegateOutputSchema}. */
export type DelegateOutput = z.infer<typeof delegateOutputSchema>;
|
||||
|
|
|
|||
|
|
@ -14,9 +14,10 @@ import {
|
|||
withTraceRun,
|
||||
} from './tracing-utils';
|
||||
import { registerWithMastra } from '../../agent/register-with-mastra';
|
||||
import { buildSubAgentBriefing } from '../../agent/sub-agent-briefing';
|
||||
import { buildDebriefing } from '../../agent/sub-agent-debriefing';
|
||||
import { createSubAgent, SUB_AGENT_PROTOCOL } from '../../agent/sub-agent-factory';
|
||||
import { createLlmStepTraceHooks } from '../../runtime/resumable-stream-executor';
|
||||
import { formatPreviousAttempts } from '../../storage/iteration-log';
|
||||
import { consumeStreamWithHitl } from '../../stream/consume-with-hitl';
|
||||
import { getTraceParentRun, withTraceParentContext } from '../../tracing/langsmith-tracing';
|
||||
import type { OrchestrationContext } from '../../types';
|
||||
|
|
@ -66,23 +67,17 @@ async function buildDelegateBriefing(
|
|||
artifacts?: unknown,
|
||||
conversationContext?: string,
|
||||
): Promise<string> {
|
||||
const serializedArtifacts = artifacts ? `\n\nArtifacts: ${JSON.stringify(artifacts)}` : '';
|
||||
const conversationCtx = conversationContext
|
||||
? `\n\n[CONVERSATION CONTEXT: ${conversationContext}]`
|
||||
: '';
|
||||
const structured = await buildSubAgentBriefing({
|
||||
task: briefing,
|
||||
conversationContext,
|
||||
artifacts: artifacts as Record<string, unknown> | undefined,
|
||||
iteration: context.iterationLog
|
||||
? { log: context.iterationLog, threadId: context.threadId, taskKey: `delegate:${role}` }
|
||||
: undefined,
|
||||
runningTasks: context.getRunningTaskSummaries?.(),
|
||||
});
|
||||
|
||||
let iterationContext = '';
|
||||
if (context.iterationLog) {
|
||||
const taskKey = `delegate:${role}`;
|
||||
try {
|
||||
const entries = await context.iterationLog.getForTask(context.threadId, taskKey);
|
||||
iterationContext = formatPreviousAttempts(entries);
|
||||
} catch {
|
||||
// Non-fatal — iteration log is best-effort
|
||||
}
|
||||
}
|
||||
|
||||
return `${briefing}${conversationCtx}${serializedArtifacts}${iterationContext ? `\n\n${iterationContext}` : ''}\n\nRemember: ${SUB_AGENT_PROTOCOL}`;
|
||||
return `${structured}\n\nRemember: ${SUB_AGENT_PROTOCOL}`;
|
||||
}
|
||||
|
||||
export interface DetachedDelegateTaskInput {
|
||||
|
|
@ -261,6 +256,7 @@ export function createDelegateTool(context: OrchestrationContext) {
|
|||
}
|
||||
|
||||
const subAgentId = generateAgentId();
|
||||
const startTime = Date.now();
|
||||
|
||||
// 2. Publish agent-spawned
|
||||
context.eventBus.publish(context.threadId, {
|
||||
|
|
@ -311,7 +307,7 @@ export function createDelegateTool(context: OrchestrationContext) {
|
|||
);
|
||||
|
||||
// 4. Stream sub-agent with HITL support
|
||||
const resultText = await withTraceRun(context, traceRun, async () => {
|
||||
const consumeResult = await withTraceRun(context, traceRun, async () => {
|
||||
const traceParent = getTraceParentRun();
|
||||
return await withTraceParentContext(traceParent, async () => {
|
||||
const llmStepTraceHooks = createLlmStepTraceHooks(traceParent);
|
||||
|
|
@ -324,7 +320,7 @@ export function createDelegateTool(context: OrchestrationContext) {
|
|||
...(llmStepTraceHooks?.executionOptions ?? {}),
|
||||
});
|
||||
|
||||
const result = await consumeStreamWithHitl({
|
||||
return await consumeStreamWithHitl({
|
||||
agent: subAgent,
|
||||
stream: stream as {
|
||||
runId?: string;
|
||||
|
|
@ -340,15 +336,26 @@ export function createDelegateTool(context: OrchestrationContext) {
|
|||
waitForConfirmation: context.waitForConfirmation,
|
||||
llmStepTraceHooks,
|
||||
});
|
||||
|
||||
return await result.text;
|
||||
});
|
||||
});
|
||||
|
||||
const resultText = await consumeResult.text;
|
||||
const debriefing = buildDebriefing({
|
||||
agentId: subAgentId,
|
||||
role: input.role,
|
||||
result: resultText,
|
||||
workSummary: consumeResult.workSummary,
|
||||
startTime,
|
||||
});
|
||||
|
||||
await finishTraceRun(context, traceRun, {
|
||||
outputs: {
|
||||
result: resultText,
|
||||
agentId: subAgentId,
|
||||
role: input.role,
|
||||
toolCallCount: debriefing.toolCallCount,
|
||||
toolErrorCount: debriefing.toolErrorCount,
|
||||
durationMs: debriefing.durationMs,
|
||||
},
|
||||
});
|
||||
|
||||
|
|
@ -363,7 +370,13 @@ export function createDelegateTool(context: OrchestrationContext) {
|
|||
},
|
||||
});
|
||||
|
||||
return { result: resultText };
|
||||
return {
|
||||
result: resultText,
|
||||
toolCallCount: debriefing.toolCallCount,
|
||||
toolErrorCount: debriefing.toolErrorCount,
|
||||
durationMs: debriefing.durationMs,
|
||||
blockers: debriefing.blockers,
|
||||
};
|
||||
} catch (error) {
|
||||
// 8. Publish agent-completed with error
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import {
|
|||
withTraceContextActor,
|
||||
} from './tracing-utils';
|
||||
import { registerWithMastra } from '../../agent/register-with-mastra';
|
||||
import { buildSubAgentBriefing } from '../../agent/sub-agent-briefing';
|
||||
import { createLlmStepTraceHooks } from '../../runtime/resumable-stream-executor';
|
||||
import { consumeStreamWithHitl } from '../../stream/consume-with-hitl';
|
||||
import {
|
||||
|
|
@ -85,12 +86,12 @@ export async function startResearchAgentTask(
|
|||
},
|
||||
});
|
||||
|
||||
const conversationCtx = input.conversationContext
|
||||
? `\n\n[CONVERSATION CONTEXT: ${input.conversationContext}]`
|
||||
: '';
|
||||
const briefing = input.constraints
|
||||
? `${input.goal}${conversationCtx}\n\nConstraints: ${input.constraints}`
|
||||
: `${input.goal}${conversationCtx}`;
|
||||
const briefing = await buildSubAgentBriefing({
|
||||
task: input.goal,
|
||||
conversationContext: input.conversationContext,
|
||||
additionalContext: input.constraints ? `Constraints: ${input.constraints}` : undefined,
|
||||
runningTasks: context.getRunningTaskSummaries?.(),
|
||||
});
|
||||
const traceContext = await createDetachedSubAgentTracing(context, {
|
||||
agentId: subAgentId,
|
||||
role: 'web-researcher',
|
||||
|
|
|
|||
|
|
@ -838,6 +838,9 @@ export interface OrchestrationContext {
|
|||
workflowTaskService?: WorkflowTaskService;
|
||||
/** When set, LangSmith traces are routed through the AI service proxy. */
|
||||
tracingProxyConfig?: ServiceProxyConfig;
|
||||
/** Summaries of currently running background tasks in this thread.
|
||||
* Used to give sub-agents thread-state awareness (what else is happening). */
|
||||
getRunningTaskSummaries?: () => Array<{ taskId: string; role: string; goal?: string }>;
|
||||
}
|
||||
|
||||
// ── Agent factory options ────────────────────────────────────────────────────
|
||||
|
|
|
|||
Loading…
Reference in a new issue