mirror of
https://github.com/n8n-io/n8n
synced 2026-04-21 15:47:20 +00:00
fix: Update working memory using tools (#28467)
This commit is contained in:
parent
9ef55ca4f9
commit
39189c3985
10 changed files with 302 additions and 392 deletions
|
|
@ -367,10 +367,11 @@ At end of turn, `saveToMemory()` uses `list.turnDelta()` and
|
|||
`saveMessagesToThread`. If **semantic recall** is configured with an embedder
|
||||
and `memory.saveEmbeddings`, new messages are embedded and stored.
|
||||
|
||||
**Working memory:** when configured, the runtime parses `<working_memory>` …
|
||||
`</working_memory>` regions from assistant text, validates structured JSON if a
|
||||
schema exists, strips the tags from the visible message, and asynchronously
|
||||
persists via `memory.saveWorkingMemory`.
|
||||
**Working memory:** when configured, the runtime injects an `updateWorkingMemory`
|
||||
tool into the agent's tool set. The current state is included in the system prompt
|
||||
so the model can read it; when new information should be persisted the model calls
|
||||
the tool, which validates the input and asynchronously persists via
|
||||
`memory.saveWorkingMemory`.
|
||||
|
||||
**Thread titles:** `titleGeneration` triggers `generateThreadTitle` (fire-and-forget)
|
||||
after a successful save when persistence and memory are present.
|
||||
|
|
@ -414,7 +415,7 @@ src/
|
|||
tool-adapter.ts — buildToolMap, executeTool, toAiSdkTools, suspend / agent-result guards
|
||||
stream.ts — convertChunk, toTokenUsage
|
||||
runtime-helpers.ts — normalizeInput, usage merge, stream error helpers, …
|
||||
working-memory.ts — instruction text, parse/filter for working_memory tags
|
||||
working-memory.ts — instruction text, updateWorkingMemory tool builder
|
||||
strip-orphaned-tool-messages.ts
|
||||
title-generation.ts
|
||||
logger.ts
|
||||
|
|
|
|||
|
|
@ -224,7 +224,7 @@ describe('custom BuiltMemory backend', () => {
|
|||
expect(findLastTextContent(result.messages)?.toLowerCase()).not.toContain('aurora');
|
||||
|
||||
// Thread 2 working memory should be independent
|
||||
expect(store.workingMemory.get(thread2)).not.toContain('aurora');
|
||||
expect(store.workingMemory.get(thread2)).toBeFalsy();
|
||||
});
|
||||
|
||||
it('thread-scoped working memory allows recall within the same thread when history is truncated', async () => {
|
||||
|
|
|
|||
|
|
@ -32,26 +32,24 @@ describe('freeform working memory', () => {
|
|||
expect(findLastTextContent(result.messages)?.toLowerCase()).toContain('berlin');
|
||||
});
|
||||
|
||||
it('working memory tags are stripped from visible response', async () => {
|
||||
it('working memory is updated when new information is provided', async () => {
|
||||
const memory = new Memory().storage('memory').lastMessages(10).freeform(template);
|
||||
|
||||
const agent = new Agent('strip-test')
|
||||
const agent = new Agent('wm-update-test')
|
||||
.model(getModel('anthropic'))
|
||||
.instructions('You are a helpful assistant. Be concise.')
|
||||
.memory(memory);
|
||||
|
||||
const threadId = `strip-${Date.now()}`;
|
||||
const threadId = `wm-update-${Date.now()}`;
|
||||
const options = { persistence: { threadId, resourceId: 'test-user' } };
|
||||
|
||||
const result = await agent.generate('My name is Bob.', options);
|
||||
|
||||
const allText = result.messages
|
||||
.flatMap((m) => ('content' in m ? m.content : []))
|
||||
.filter((c) => c.type === 'text')
|
||||
.map((c) => (c as { text: string }).text)
|
||||
.join(' ');
|
||||
expect(allText).not.toContain('<working_memory>');
|
||||
expect(allText).not.toContain('</working_memory>');
|
||||
const toolCalls = result.messages.flatMap((m) =>
|
||||
'content' in m ? m.content.filter((c) => c.type === 'tool-call') : [],
|
||||
) as Array<{ type: 'tool-call'; toolName: string }>;
|
||||
const wmToolCall = toolCalls.find((c) => c.toolName === 'updateWorkingMemory');
|
||||
expect(wmToolCall).toBeDefined();
|
||||
});
|
||||
|
||||
it('working memory persists across threads with same resourceId', async () => {
|
||||
|
|
|
|||
|
|
@ -1,62 +1,74 @@
|
|||
import { z } from 'zod';
|
||||
|
||||
import {
|
||||
parseWorkingMemory,
|
||||
buildWorkingMemoryInstruction,
|
||||
buildWorkingMemoryTool,
|
||||
templateFromSchema,
|
||||
WorkingMemoryStreamFilter,
|
||||
UPDATE_WORKING_MEMORY_TOOL_NAME,
|
||||
WORKING_MEMORY_DEFAULT_INSTRUCTION,
|
||||
} from '../runtime/working-memory';
|
||||
import type { StreamChunk } from '../types';
|
||||
|
||||
describe('parseWorkingMemory', () => {
|
||||
it('extracts content between tags at end of text', () => {
|
||||
const text = 'Hello world.\n<working_memory>\n# Name: Alice\n</working_memory>';
|
||||
const result = parseWorkingMemory(text);
|
||||
expect(result.cleanText).toBe('Hello world.');
|
||||
expect(result.workingMemory).toBe('# Name: Alice');
|
||||
});
|
||||
|
||||
it('extracts content between tags in middle of text', () => {
|
||||
const text = 'Before.\n<working_memory>\ndata\n</working_memory>\nAfter.';
|
||||
const result = parseWorkingMemory(text);
|
||||
expect(result.cleanText).toBe('Before.\nAfter.');
|
||||
expect(result.workingMemory).toBe('data');
|
||||
});
|
||||
|
||||
it('returns null when no tags present', () => {
|
||||
const text = 'Just a normal response.';
|
||||
const result = parseWorkingMemory(text);
|
||||
expect(result.cleanText).toBe('Just a normal response.');
|
||||
expect(result.workingMemory).toBeNull();
|
||||
});
|
||||
|
||||
it('handles empty working memory', () => {
|
||||
const text = 'Response.\n<working_memory>\n</working_memory>';
|
||||
const result = parseWorkingMemory(text);
|
||||
expect(result.cleanText).toBe('Response.');
|
||||
expect(result.workingMemory).toBe('');
|
||||
});
|
||||
|
||||
it('handles multiline content with markdown', () => {
|
||||
const wm = '# User Context\n- **Name**: Alice\n- **City**: Berlin';
|
||||
const text = `Response text.\n<working_memory>\n${wm}\n</working_memory>`;
|
||||
const result = parseWorkingMemory(text);
|
||||
expect(result.workingMemory).toBe(wm);
|
||||
});
|
||||
});
|
||||
|
||||
describe('buildWorkingMemoryInstruction', () => {
|
||||
it('generates freeform instruction', () => {
|
||||
it('mentions the updateWorkingMemory tool name', () => {
|
||||
const result = buildWorkingMemoryInstruction('# Context\n- Name:', false);
|
||||
expect(result).toContain('<working_memory>');
|
||||
expect(result).toContain('</working_memory>');
|
||||
expect(result).toContain('# Context\n- Name:');
|
||||
expect(result).toContain(UPDATE_WORKING_MEMORY_TOOL_NAME);
|
||||
});
|
||||
|
||||
it('generates structured instruction mentioning JSON', () => {
|
||||
const result = buildWorkingMemoryInstruction('{"userName": ""}', true);
|
||||
it('instructs the model to call the tool only when something changed', () => {
|
||||
const result = buildWorkingMemoryInstruction('# Context\n- Name:', false);
|
||||
expect(result).toContain('Only call it when something has actually changed');
|
||||
});
|
||||
|
||||
it('includes the template in the instruction', () => {
|
||||
const template = '# Context\n- Name:\n- City:';
|
||||
const result = buildWorkingMemoryInstruction(template, false);
|
||||
expect(result).toContain(template);
|
||||
});
|
||||
|
||||
it('mentions JSON for structured variant', () => {
|
||||
const result = buildWorkingMemoryInstruction('{"name": ""}', true);
|
||||
expect(result).toContain('JSON');
|
||||
expect(result).toContain('<working_memory>');
|
||||
});
|
||||
|
||||
describe('custom instruction', () => {
|
||||
it('replaces the default instruction body when provided', () => {
|
||||
const custom = 'Always update working memory after every message.';
|
||||
const result = buildWorkingMemoryInstruction('# Template', false, custom);
|
||||
expect(result).toContain(custom);
|
||||
expect(result).not.toContain(WORKING_MEMORY_DEFAULT_INSTRUCTION);
|
||||
});
|
||||
|
||||
it('still includes the ## Working Memory heading', () => {
|
||||
const result = buildWorkingMemoryInstruction('# Template', false, 'Custom text.');
|
||||
expect(result).toContain('## Working Memory');
|
||||
});
|
||||
|
||||
it('still includes the template block', () => {
|
||||
const template = '# Context\n- Name:\n- City:';
|
||||
const result = buildWorkingMemoryInstruction(template, false, 'Custom text.');
|
||||
expect(result).toContain(template);
|
||||
});
|
||||
|
||||
it('still includes the format hint for structured memory', () => {
|
||||
const result = buildWorkingMemoryInstruction('{}', true, 'Custom text.');
|
||||
expect(result).toContain('JSON');
|
||||
});
|
||||
|
||||
it('still includes the format hint for freeform memory', () => {
|
||||
const result = buildWorkingMemoryInstruction('# Template', false, 'Custom text.');
|
||||
expect(result).toContain('Update the template with any new information learned');
|
||||
});
|
||||
|
||||
it('uses the default instruction when undefined is passed explicitly', () => {
|
||||
const withDefault = buildWorkingMemoryInstruction('# Template', false, undefined);
|
||||
const withoutArg = buildWorkingMemoryInstruction('# Template', false);
|
||||
expect(withDefault).toBe(withoutArg);
|
||||
});
|
||||
|
||||
it('WORKING_MEMORY_DEFAULT_INSTRUCTION appears in the output when no custom instruction is set', () => {
|
||||
const result = buildWorkingMemoryInstruction('# Template', false);
|
||||
expect(result).toContain(WORKING_MEMORY_DEFAULT_INSTRUCTION);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
|
@ -69,7 +81,6 @@ describe('templateFromSchema', () => {
|
|||
const result = templateFromSchema(schema);
|
||||
expect(result).toContain('userName');
|
||||
expect(result).toContain('favoriteColor');
|
||||
// Should be valid JSON
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = JSON.parse(result);
|
||||
|
|
@ -80,118 +91,117 @@ describe('templateFromSchema', () => {
|
|||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* Helper that feeds chunks through a WorkingMemoryStreamFilter and collects
|
||||
* the output text and any persisted working memory content.
|
||||
*/
|
||||
async function runStreamFilter(
|
||||
chunks: string[],
|
||||
): Promise<{ outputText: string; persisted: string[] }> {
|
||||
const persisted: string[] = [];
|
||||
const stream = new TransformStream<StreamChunk>();
|
||||
const writer = stream.writable.getWriter();
|
||||
// eslint-disable-next-line @typescript-eslint/require-await
|
||||
const filter = new WorkingMemoryStreamFilter(writer, async (content) => {
|
||||
persisted.push(content);
|
||||
describe('buildWorkingMemoryTool — freeform', () => {
|
||||
it('returns a BuiltTool with the correct name', () => {
|
||||
const tool = buildWorkingMemoryTool({
|
||||
structured: false,
|
||||
persist: async () => {},
|
||||
});
|
||||
expect(tool.name).toBe(UPDATE_WORKING_MEMORY_TOOL_NAME);
|
||||
});
|
||||
|
||||
// Read the readable side concurrently to avoid backpressure deadlock
|
||||
const reader = stream.readable.getReader();
|
||||
const readAll = (async () => {
|
||||
let outputText = '';
|
||||
while (true) {
|
||||
const result = await reader.read();
|
||||
if (result.done) break;
|
||||
const chunk = result.value as StreamChunk;
|
||||
if (chunk.type === 'text-delta') outputText += chunk.delta;
|
||||
}
|
||||
return outputText;
|
||||
})();
|
||||
|
||||
for (const chunk of chunks) {
|
||||
await filter.write({ type: 'text-delta', delta: chunk });
|
||||
}
|
||||
await filter.flush();
|
||||
await writer.close();
|
||||
|
||||
const outputText = await readAll;
|
||||
return { outputText, persisted };
|
||||
}
|
||||
|
||||
describe('WorkingMemoryStreamFilter with tag split across multiple chunks', () => {
|
||||
it('handles tag split mid-open-tag', async () => {
|
||||
const { outputText, persisted } = await runStreamFilter([
|
||||
'Hello <work',
|
||||
'ing_memory>state</working_memory>',
|
||||
]);
|
||||
expect(outputText).toBe('Hello ');
|
||||
expect(persisted).toEqual(['state']);
|
||||
it('has a description', () => {
|
||||
const tool = buildWorkingMemoryTool({
|
||||
structured: false,
|
||||
persist: async () => {},
|
||||
});
|
||||
expect(tool.description).toBeTruthy();
|
||||
});
|
||||
|
||||
it('handles tag split mid-close-tag', async () => {
|
||||
const { outputText, persisted } = await runStreamFilter([
|
||||
'<working_memory>state</worki',
|
||||
'ng_memory> after',
|
||||
]);
|
||||
expect(persisted).toEqual(['state']);
|
||||
expect(outputText).toBe(' after');
|
||||
it('has a freeform input schema with a memory field', () => {
|
||||
const tool = buildWorkingMemoryTool({
|
||||
structured: false,
|
||||
persist: async () => {},
|
||||
});
|
||||
expect(tool.inputSchema).toBeDefined();
|
||||
const schema = tool.inputSchema as z.ZodObject<z.ZodRawShape>;
|
||||
const result = schema.safeParse({ memory: 'hello' });
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
|
||||
it('handles tag spread across 3+ chunks', async () => {
|
||||
const { outputText, persisted } = await runStreamFilter([
|
||||
'<wor',
|
||||
'king_mem',
|
||||
'ory>data</working_memory>',
|
||||
]);
|
||||
expect(persisted).toEqual(['data']);
|
||||
expect(outputText).toBe('');
|
||||
it('rejects input without memory field', () => {
|
||||
const tool = buildWorkingMemoryTool({
|
||||
structured: false,
|
||||
persist: async () => {},
|
||||
});
|
||||
const schema = tool.inputSchema as z.ZodObject<z.ZodRawShape>;
|
||||
const result = schema.safeParse({ other: 'value' });
|
||||
expect(result.success).toBe(false);
|
||||
});
|
||||
|
||||
it('handles partial < that is not a tag', async () => {
|
||||
const { outputText, persisted } = await runStreamFilter(['Hello <', 'div>world']);
|
||||
expect(outputText).toBe('Hello <div>world');
|
||||
expect(persisted).toEqual([]);
|
||||
it('handler calls persist with the memory string', async () => {
|
||||
const persisted: string[] = [];
|
||||
const tool = buildWorkingMemoryTool({
|
||||
structured: false,
|
||||
// eslint-disable-next-line @typescript-eslint/require-await
|
||||
persist: async (content) => {
|
||||
persisted.push(content);
|
||||
},
|
||||
});
|
||||
const result = await tool.handler!({ memory: 'test content' }, {} as never);
|
||||
expect(persisted).toEqual(['test content']);
|
||||
expect(result).toMatchObject({ success: true });
|
||||
});
|
||||
});
|
||||
|
||||
describe('parseWorkingMemory with invalid structured content', () => {
|
||||
it('strips tags and extracts content regardless of JSON validity', () => {
|
||||
const invalidJson = '{not valid json!!!}';
|
||||
const text = `Here is my response.\n<working_memory>\n${invalidJson}\n</working_memory>`;
|
||||
const result = parseWorkingMemory(text);
|
||||
|
||||
expect(result.cleanText).toBe('Here is my response.');
|
||||
expect(result.workingMemory).toBe(invalidJson);
|
||||
describe('buildWorkingMemoryTool — structured', () => {
|
||||
const schema = z.object({
|
||||
userName: z.string().optional().describe("The user's name"),
|
||||
location: z.string().optional().describe('Where the user lives'),
|
||||
});
|
||||
|
||||
it('strips tags with content that fails Zod schema validation', () => {
|
||||
// Content is valid JSON but wrong shape for the schema
|
||||
const wrongShape = '{"unexpected": true}';
|
||||
const text = `Response text.\n<working_memory>\n${wrongShape}\n</working_memory>`;
|
||||
const result = parseWorkingMemory(text);
|
||||
it('uses the Zod schema as input schema', () => {
|
||||
const tool = buildWorkingMemoryTool({
|
||||
structured: true,
|
||||
schema,
|
||||
persist: async () => {},
|
||||
});
|
||||
const inputSchema = tool.inputSchema as typeof schema;
|
||||
const result = inputSchema.safeParse({ userName: 'Alice', location: 'Berlin' });
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
|
||||
// Tags are stripped from response regardless
|
||||
expect(result.cleanText).toBe('Response text.');
|
||||
// Raw content is returned — caller decides whether it passes validation
|
||||
expect(result.workingMemory).toBe(wrongShape);
|
||||
it('handler serializes input to JSON and calls persist', async () => {
|
||||
const persisted: string[] = [];
|
||||
const tool = buildWorkingMemoryTool({
|
||||
structured: true,
|
||||
schema,
|
||||
// eslint-disable-next-line @typescript-eslint/require-await
|
||||
persist: async (content) => {
|
||||
persisted.push(content);
|
||||
},
|
||||
});
|
||||
|
||||
// Verify the content would indeed fail schema validation
|
||||
expect(result.workingMemory).not.toBeNull();
|
||||
const input = { userName: 'Alice', location: 'Berlin' };
|
||||
await tool.handler!(input, {} as never);
|
||||
|
||||
expect(persisted).toHaveLength(1);
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = JSON.parse(result.workingMemory!);
|
||||
parsed = JSON.parse(persisted[0]) as unknown;
|
||||
} catch {
|
||||
parsed = undefined;
|
||||
}
|
||||
expect(parsed).toBeDefined();
|
||||
expect(parsed).toMatchObject(input);
|
||||
});
|
||||
|
||||
it('strips tags even when content is completely non-JSON', () => {
|
||||
const text =
|
||||
'My reply.\n<working_memory>\nthis is just plain text, not JSON at all\n</working_memory>';
|
||||
const result = parseWorkingMemory(text);
|
||||
it('handler returns success confirmation', async () => {
|
||||
const tool = buildWorkingMemoryTool({
|
||||
structured: true,
|
||||
schema,
|
||||
persist: async () => {},
|
||||
});
|
||||
const result = await tool.handler!({ userName: 'Alice' }, {} as never);
|
||||
expect(result).toMatchObject({ success: true });
|
||||
});
|
||||
|
||||
expect(result.cleanText).toBe('My reply.');
|
||||
expect(result.workingMemory).toBe('this is just plain text, not JSON at all');
|
||||
it('falls back to freeform when no schema provided despite structured:true', () => {
|
||||
const tool = buildWorkingMemoryTool({
|
||||
structured: true,
|
||||
persist: async () => {},
|
||||
});
|
||||
const inputSchema = tool.inputSchema as z.ZodObject<z.ZodRawShape>;
|
||||
const result = inputSchema.safeParse({ memory: 'fallback text' });
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -106,6 +106,10 @@ export type {
|
|||
ModelLimits,
|
||||
} from './sdk/catalog';
|
||||
export { SqliteMemory } from './storage/sqlite-memory';
|
||||
export {
|
||||
UPDATE_WORKING_MEMORY_TOOL_NAME,
|
||||
WORKING_MEMORY_DEFAULT_INSTRUCTION,
|
||||
} from './runtime/working-memory';
|
||||
export type { SqliteMemoryConfig } from './storage/sqlite-memory';
|
||||
export { PostgresMemory } from './storage/postgres-memory';
|
||||
export type { PostgresMemoryConfig } from './storage/postgres-memory';
|
||||
|
|
|
|||
|
|
@ -31,7 +31,6 @@ import type {
|
|||
XaiThinkingConfig,
|
||||
} from '../types';
|
||||
import { AgentEventBus } from './event-bus';
|
||||
import { createFilteredLogger } from './logger';
|
||||
import { saveMessagesToThread } from './memory-store';
|
||||
import { AgentMessageList, type SerializedMessageList } from './message-list';
|
||||
import { fromAiFinishReason, fromAiMessages } from './messages';
|
||||
|
|
@ -57,7 +56,7 @@ import {
|
|||
toAiSdkProviderTools,
|
||||
toAiSdkTools,
|
||||
} from './tool-adapter';
|
||||
import { parseWorkingMemory, WorkingMemoryStreamFilter } from './working-memory';
|
||||
import { buildWorkingMemoryTool } from './working-memory';
|
||||
import { AgentEvent } from '../types/runtime/event';
|
||||
import type {
|
||||
AgentPersistenceOptions,
|
||||
|
|
@ -75,19 +74,6 @@ import type {
|
|||
import type { JSONObject, JSONValue } from '../types/utils/json';
|
||||
import { isZodSchema } from '../utils/zod';
|
||||
|
||||
const logger = createFilteredLogger();
|
||||
|
||||
/** Type guard for text content parts in LLM messages. */
|
||||
function isTextPart(part: unknown): part is { type: 'text'; text: string } {
|
||||
return (
|
||||
typeof part === 'object' &&
|
||||
part !== null &&
|
||||
'type' in part &&
|
||||
(part as Record<string, unknown>).type === 'text' &&
|
||||
'text' in part
|
||||
);
|
||||
}
|
||||
|
||||
export interface AgentRuntimeConfig {
|
||||
name: string;
|
||||
model: ModelConfig;
|
||||
|
|
@ -102,6 +88,7 @@ export interface AgentRuntimeConfig {
|
|||
structured: boolean;
|
||||
schema?: z.ZodObject<z.ZodRawShape>;
|
||||
scope?: 'resource' | 'thread';
|
||||
instruction?: string;
|
||||
};
|
||||
semanticRecall?: SemanticRecallConfig;
|
||||
structuredOutput?: z.ZodType;
|
||||
|
|
@ -628,7 +615,7 @@ export class AgentRuntime {
|
|||
runId?: string,
|
||||
): Promise<GenerateResult> {
|
||||
const { model, toolMap, aiTools, providerOptions, hasTools, outputSpec } =
|
||||
this.buildLoopContext(options);
|
||||
this.buildLoopContext({ ...options, persistence: options?.persistence });
|
||||
|
||||
let totalUsage: TokenUsage | undefined;
|
||||
let lastFinishReason: FinishReason = 'stop';
|
||||
|
|
@ -760,19 +747,6 @@ export class AgentRuntime {
|
|||
);
|
||||
}
|
||||
|
||||
// Extract and strip working memory from assistant response
|
||||
if (
|
||||
this.config.workingMemory &&
|
||||
this.config.memory?.saveWorkingMemory &&
|
||||
options?.persistence
|
||||
) {
|
||||
this.extractAndPersistWorkingMemory(list, {
|
||||
threadId: options.persistence.threadId,
|
||||
resourceId: options.persistence.resourceId,
|
||||
scope: this.config.workingMemory?.scope ?? 'resource',
|
||||
});
|
||||
}
|
||||
|
||||
await this.saveToMemory(list, options);
|
||||
await this.flushTelemetry(options);
|
||||
|
||||
|
|
@ -850,22 +824,10 @@ export class AgentRuntime {
|
|||
runId?: string,
|
||||
): Promise<void> {
|
||||
const { model, toolMap, aiTools, providerOptions, hasTools, outputSpec } =
|
||||
this.buildLoopContext(options);
|
||||
|
||||
// Wrap writer with working memory filter if configured
|
||||
const wmParamsStream = this.resolveWorkingMemoryParams(options?.persistence);
|
||||
const wmFilter = wmParamsStream?.persistFn
|
||||
? new WorkingMemoryStreamFilter(writer, async (content: string) => {
|
||||
await wmParamsStream.persistFn(content);
|
||||
})
|
||||
: undefined;
|
||||
this.buildLoopContext({ ...options, persistence: options?.persistence });
|
||||
|
||||
const writeChunk = async (chunk: StreamChunk): Promise<void> => {
|
||||
if (wmFilter) {
|
||||
await wmFilter.write(chunk);
|
||||
} else {
|
||||
await writer.write(chunk);
|
||||
}
|
||||
await writer.write(chunk);
|
||||
};
|
||||
|
||||
let totalUsage: TokenUsage | undefined;
|
||||
|
|
@ -877,7 +839,6 @@ export class AgentRuntime {
|
|||
const closeStreamWithError = async (error: unknown, status: AgentRunState): Promise<void> => {
|
||||
await this.cleanupRun(runId);
|
||||
this.updateState({ status });
|
||||
if (wmFilter) await wmFilter.flush();
|
||||
await writer.write({ type: 'error', error });
|
||||
await writer.write({ type: 'finish', finishReason: 'error' });
|
||||
await writer.close();
|
||||
|
|
@ -1065,8 +1026,6 @@ export class AgentRuntime {
|
|||
this.emitTurnEnd(newMessages, extractToolResults(list.responseDelta()));
|
||||
}
|
||||
|
||||
if (wmFilter) await wmFilter.flush();
|
||||
|
||||
const costUsage = this.applyCost(totalUsage);
|
||||
const parentCost = costUsage?.cost ?? 0;
|
||||
const subCost = collectedSubAgentUsage.reduce((sum, s) => sum + (s.usage.cost ?? 0), 0);
|
||||
|
|
@ -1083,19 +1042,6 @@ export class AgentRuntime {
|
|||
});
|
||||
|
||||
try {
|
||||
// Extract and strip working memory from assistant response
|
||||
if (
|
||||
this.config.workingMemory &&
|
||||
this.config.memory?.saveWorkingMemory &&
|
||||
options?.persistence
|
||||
) {
|
||||
this.extractAndPersistWorkingMemory(list, {
|
||||
threadId: options.persistence.threadId,
|
||||
resourceId: options.persistence.resourceId,
|
||||
scope: this.config.workingMemory?.scope ?? 'resource',
|
||||
});
|
||||
}
|
||||
|
||||
await this.saveToMemory(list, options);
|
||||
|
||||
if (this.config.titleGeneration && options?.persistence && this.config.memory) {
|
||||
|
|
@ -1187,43 +1133,6 @@ export class AgentRuntime {
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract <working_memory> tags from the last assistant message in the turn delta,
|
||||
* strip them from the message, and persist the working memory content.
|
||||
*/
|
||||
private extractAndPersistWorkingMemory(
|
||||
list: AgentMessageList,
|
||||
params: { threadId: string; resourceId: string; scope: 'resource' | 'thread' },
|
||||
): void {
|
||||
const delta = list.responseDelta();
|
||||
for (let i = delta.length - 1; i >= 0; i--) {
|
||||
const msg = delta[i];
|
||||
if (!isLlmMessage(msg) || msg.role !== 'assistant') continue;
|
||||
for (const part of msg.content) {
|
||||
if (!isTextPart(part)) continue;
|
||||
const { cleanText, workingMemory } = parseWorkingMemory(part.text);
|
||||
if (workingMemory !== null) {
|
||||
// Validate structured working memory if schema is configured
|
||||
if (this.config.workingMemory?.structured && this.config.workingMemory.schema) {
|
||||
try {
|
||||
this.config.workingMemory.schema.parse(JSON.parse(workingMemory));
|
||||
} catch {
|
||||
// Validation failed — keep previous state, still strip tags
|
||||
part.text = cleanText;
|
||||
return;
|
||||
}
|
||||
}
|
||||
part.text = cleanText;
|
||||
// Fire-and-forget persist
|
||||
this.config.memory!.saveWorkingMemory!(params, workingMemory).catch((error: unknown) => {
|
||||
logger.warn('Failed to persist working memory', { error });
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Build the providerOptions object for thinking/reasoning config. */
|
||||
private buildThinkingProviderOptions(): Record<string, Record<string, unknown>> | undefined {
|
||||
if (!this.config.thinking) return undefined;
|
||||
|
|
@ -1691,13 +1600,19 @@ export class AgentRuntime {
|
|||
}
|
||||
|
||||
/** Build common LLM call dependencies shared by both the generate and stream loops. */
|
||||
private buildLoopContext(execOptions?: ExecutionOptions) {
|
||||
const aiTools = toAiSdkTools(this.config.tools);
|
||||
private buildLoopContext(
|
||||
execOptions?: ExecutionOptions & { persistence?: AgentPersistenceOptions },
|
||||
) {
|
||||
const wmTool = this.buildWorkingMemoryToolForRun(execOptions?.persistence);
|
||||
const allUserTools = wmTool
|
||||
? [...(this.config.tools ?? []), wmTool]
|
||||
: (this.config.tools ?? []);
|
||||
const aiTools = toAiSdkTools(allUserTools);
|
||||
const aiProviderTools = toAiSdkProviderTools(this.config.providerTools);
|
||||
const allTools = { ...aiTools, ...aiProviderTools };
|
||||
return {
|
||||
model: createModel(this.config.model),
|
||||
toolMap: buildToolMap(this.config.tools),
|
||||
toolMap: buildToolMap(allUserTools),
|
||||
aiTools: allTools,
|
||||
providerOptions: this.buildCallProviderOptions(execOptions?.providerOptions),
|
||||
hasTools: Object.keys(allTools).length > 0,
|
||||
|
|
@ -1707,6 +1622,20 @@ export class AgentRuntime {
|
|||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the updateWorkingMemory BuiltTool for the current run.
|
||||
* Returns undefined when working memory is not configured or persistence is unavailable.
|
||||
*/
|
||||
private buildWorkingMemoryToolForRun(persistence: AgentPersistenceOptions | undefined) {
|
||||
const wmParams = this.resolveWorkingMemoryParams(persistence);
|
||||
if (!wmParams) return undefined;
|
||||
return buildWorkingMemoryTool({
|
||||
structured: wmParams.structured,
|
||||
schema: wmParams.schema,
|
||||
persist: wmParams.persistFn,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist a suspended run state and update the current state snapshot.
|
||||
* Returns the runId (reuses existingRunId when resuming to prevent dangling runs).
|
||||
|
|
@ -1804,6 +1733,7 @@ export class AgentRuntime {
|
|||
template: wmParams.template,
|
||||
structured: wmParams.structured,
|
||||
state: wmState,
|
||||
...(wmParams.instruction !== undefined && { instruction: wmParams.instruction }),
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -1832,6 +1762,7 @@ export class AgentRuntime {
|
|||
template: this.config.workingMemory.template,
|
||||
structured: this.config.workingMemory.structured,
|
||||
schema: this.config.workingMemory.schema,
|
||||
instruction: this.config.workingMemory.instruction,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@ export interface WorkingMemoryContext {
|
|||
structured: boolean;
|
||||
/** The current persisted state, or null if not yet loaded. Falls back to template. */
|
||||
state: string | null;
|
||||
/** Custom instruction text. When absent the default instruction is used. */
|
||||
instruction?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -144,10 +146,11 @@ export class AgentMessageList {
|
|||
const wmInstruction = buildWorkingMemoryInstruction(
|
||||
this.workingMemory.template,
|
||||
this.workingMemory.structured,
|
||||
this.workingMemory.instruction,
|
||||
);
|
||||
const wmState = this.workingMemory.state ?? this.workingMemory.template;
|
||||
systemPrompt +=
|
||||
wmInstruction + '\n\nCurrent working memory state:\n```\n' + wmState + '\n```';
|
||||
wmInstruction + '\n\nCurrent working memory state:\n```\n' + wmState + '\n```\n';
|
||||
}
|
||||
|
||||
const systemMessage: ModelMessage = instructionProviderOptions
|
||||
|
|
|
|||
|
|
@ -1,58 +1,48 @@
|
|||
import type { z } from 'zod';
|
||||
import { z } from 'zod';
|
||||
|
||||
import type { StreamChunk } from '../types';
|
||||
import { createFilteredLogger } from './logger';
|
||||
|
||||
const logger = createFilteredLogger();
|
||||
import type { BuiltTool } from '../types';
|
||||
|
||||
type ZodObjectSchema = z.ZodObject<z.ZodRawShape>;
|
||||
|
||||
const OPEN_TAG = '<working_memory>';
|
||||
const CLOSE_TAG = '</working_memory>';
|
||||
export const UPDATE_WORKING_MEMORY_TOOL_NAME = 'updateWorkingMemory';
|
||||
|
||||
/**
|
||||
* Extract working memory content from an LLM response.
|
||||
* Returns the clean text (tags stripped) and the extracted working memory (or null).
|
||||
* The default instruction block injected into the system prompt when working memory
|
||||
* is configured. Exported so callers can reference it when building custom instructions.
|
||||
*/
|
||||
export function parseWorkingMemory(text: string): {
|
||||
cleanText: string;
|
||||
workingMemory: string | null;
|
||||
} {
|
||||
const openIdx = text.indexOf(OPEN_TAG);
|
||||
if (openIdx === -1) return { cleanText: text, workingMemory: null };
|
||||
|
||||
const closeIdx = text.indexOf(CLOSE_TAG, openIdx);
|
||||
if (closeIdx === -1) return { cleanText: text, workingMemory: null };
|
||||
|
||||
const contentStart = openIdx + OPEN_TAG.length;
|
||||
const rawContent = text.slice(contentStart, closeIdx);
|
||||
const workingMemory = rawContent.replace(/^\n/, '').replace(/\n$/, '');
|
||||
|
||||
const before = text.slice(0, openIdx).replace(/\n$/, '');
|
||||
const after = text.slice(closeIdx + CLOSE_TAG.length).replace(/^\n/, '');
|
||||
const cleanText = (before + (after ? '\n' + after : '')).trim();
|
||||
|
||||
return { cleanText, workingMemory };
|
||||
}
|
||||
export const WORKING_MEMORY_DEFAULT_INSTRUCTION = [
|
||||
'You have persistent working memory that survives across conversations.',
|
||||
'Your current working memory state is shown below.',
|
||||
`When you learn new information about the user or conversation that should be remembered, call the \`${UPDATE_WORKING_MEMORY_TOOL_NAME}\` tool.`,
|
||||
'Only call it when something has actually changed — do NOT call it if nothing new was learned.',
|
||||
].join('\n');
|
||||
|
||||
/**
|
||||
* Generate the system prompt instruction for working memory.
|
||||
* Tells the LLM to call the updateWorkingMemory tool when it has new information to persist.
|
||||
*
|
||||
* @param template - The working memory template or schema.
|
||||
* @param structured - Whether the working memory is structured (JSON schema).
|
||||
* @param instruction - Custom instruction text to replace the default. Defaults to
|
||||
* {@link WORKING_MEMORY_DEFAULT_INSTRUCTION}.
|
||||
*/
|
||||
export function buildWorkingMemoryInstruction(template: string, structured: boolean): string {
|
||||
export function buildWorkingMemoryInstruction(
|
||||
template: string,
|
||||
structured: boolean,
|
||||
instruction?: string,
|
||||
): string {
|
||||
const format = structured
|
||||
? 'Emit the updated state as valid JSON matching the schema'
|
||||
? 'The memory argument must be valid JSON matching the schema'
|
||||
: 'Update the template with any new information learned';
|
||||
|
||||
const body = instruction ?? WORKING_MEMORY_DEFAULT_INSTRUCTION;
|
||||
|
||||
return [
|
||||
'',
|
||||
'## Working Memory',
|
||||
'',
|
||||
'You have persistent working memory that survives across conversations.',
|
||||
'The current state will be shown to you in a system message.',
|
||||
'IMPORTANT: Always respond to the user first with your normal reply.',
|
||||
`Then, at the very end of your response, emit your updated working memory inside ${OPEN_TAG}...${CLOSE_TAG} tags on a new line.`,
|
||||
`${format}. If nothing changed, emit the current state unchanged.`,
|
||||
'The working memory block must be the last thing in your response, after your reply to the user.',
|
||||
body,
|
||||
`${format}.`,
|
||||
'',
|
||||
'Current template:',
|
||||
'```',
|
||||
|
|
@ -73,111 +63,51 @@ export function templateFromSchema(schema: ZodObjectSchema): string {
|
|||
return JSON.stringify(obj, null, 2);
|
||||
}
|
||||
|
||||
type PersistFn = (content: string) => Promise<void>;
|
||||
export interface WorkingMemoryToolConfig {
|
||||
/** Whether this is structured (Zod-schema-driven) working memory. */
|
||||
structured: boolean;
|
||||
/** Zod schema for structured working memory input validation. */
|
||||
schema?: ZodObjectSchema;
|
||||
/** Called with the serialized working memory string to persist it. */
|
||||
persist: (content: string) => Promise<void>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a stream writer to intercept <working_memory> tags from text-delta chunks.
|
||||
* All non-text-delta chunks pass through unchanged.
|
||||
* Text inside the tags is buffered and persisted when the closing tag is detected.
|
||||
* Build the updateWorkingMemory BuiltTool that the agent calls to persist working memory.
|
||||
*
|
||||
* For freeform working memory the input schema is `{ memory: string }`.
|
||||
* For structured working memory the input schema is the configured Zod object schema,
|
||||
* whose values are serialized to JSON before persisting.
|
||||
*/
|
||||
export class WorkingMemoryStreamFilter {
|
||||
private writer: WritableStreamDefaultWriter<StreamChunk>;
|
||||
|
||||
private persist: PersistFn;
|
||||
|
||||
private state: 'normal' | 'inside' = 'normal';
|
||||
|
||||
private buffer = '';
|
||||
|
||||
private pendingText = '';
|
||||
|
||||
constructor(writer: WritableStreamDefaultWriter<StreamChunk>, persist: PersistFn) {
|
||||
this.writer = writer;
|
||||
this.persist = persist;
|
||||
export function buildWorkingMemoryTool(config: WorkingMemoryToolConfig): BuiltTool {
|
||||
if (config.structured && config.schema) {
|
||||
const schema = config.schema;
|
||||
return {
|
||||
name: UPDATE_WORKING_MEMORY_TOOL_NAME,
|
||||
description:
|
||||
'Update your persistent working memory with new information about the user or conversation. Only call this when something has actually changed.',
|
||||
inputSchema: schema,
|
||||
handler: async (input: unknown) => {
|
||||
const content = JSON.stringify(input, null, 2);
|
||||
await config.persist(content);
|
||||
return { success: true, message: 'Working memory updated.' };
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async write(chunk: StreamChunk): Promise<void> {
|
||||
if (chunk.type !== 'text-delta') {
|
||||
await this.writer.write(chunk);
|
||||
return;
|
||||
}
|
||||
const freeformSchema = z.object({
|
||||
memory: z.string().describe('The updated working memory content.'),
|
||||
});
|
||||
|
||||
this.pendingText += chunk.delta;
|
||||
|
||||
while (this.pendingText.length > 0) {
|
||||
if (this.state === 'normal') {
|
||||
const openIdx = this.pendingText.indexOf(OPEN_TAG);
|
||||
if (openIdx === -1) {
|
||||
// No full open tag found. Check if the tail is a valid prefix of OPEN_TAG.
|
||||
const lastLt = this.pendingText.lastIndexOf('<');
|
||||
if (
|
||||
lastLt !== -1 &&
|
||||
this.pendingText.length - lastLt < OPEN_TAG.length &&
|
||||
OPEN_TAG.startsWith(this.pendingText.slice(lastLt))
|
||||
) {
|
||||
// Potential partial tag at end — forward everything before it, hold the rest
|
||||
if (lastLt > 0) {
|
||||
await this.writer.write({
|
||||
type: 'text-delta',
|
||||
delta: this.pendingText.slice(0, lastLt),
|
||||
});
|
||||
}
|
||||
this.pendingText = this.pendingText.slice(lastLt);
|
||||
} else {
|
||||
// No partial tag concern — forward everything
|
||||
await this.writer.write({ type: 'text-delta', delta: this.pendingText });
|
||||
this.pendingText = '';
|
||||
}
|
||||
break;
|
||||
}
|
||||
// Forward text before the tag
|
||||
if (openIdx > 0) {
|
||||
await this.writer.write({
|
||||
type: 'text-delta',
|
||||
delta: this.pendingText.slice(0, openIdx),
|
||||
});
|
||||
}
|
||||
this.state = 'inside';
|
||||
this.pendingText = this.pendingText.slice(openIdx + OPEN_TAG.length);
|
||||
this.buffer = '';
|
||||
} else {
|
||||
// Inside tag — look for closing tag
|
||||
const closeIdx = this.pendingText.indexOf(CLOSE_TAG);
|
||||
if (closeIdx === -1) {
|
||||
// Check if the tail is a valid prefix of CLOSE_TAG — hold it back
|
||||
const lastLt = this.pendingText.lastIndexOf('<');
|
||||
if (
|
||||
lastLt !== -1 &&
|
||||
this.pendingText.length - lastLt < CLOSE_TAG.length &&
|
||||
CLOSE_TAG.startsWith(this.pendingText.slice(lastLt))
|
||||
) {
|
||||
this.buffer += this.pendingText.slice(0, lastLt);
|
||||
this.pendingText = this.pendingText.slice(lastLt);
|
||||
} else {
|
||||
this.buffer += this.pendingText;
|
||||
this.pendingText = '';
|
||||
}
|
||||
break;
|
||||
}
|
||||
this.buffer += this.pendingText.slice(0, closeIdx);
|
||||
this.pendingText = this.pendingText.slice(closeIdx + CLOSE_TAG.length);
|
||||
this.state = 'normal';
|
||||
const content = this.buffer.replace(/^\n/, '').replace(/\n$/, '');
|
||||
this.persist(content).catch((error: unknown) => {
|
||||
logger.warn('Failed to persist working memory', { error });
|
||||
});
|
||||
this.buffer = '';
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async flush(): Promise<void> {
|
||||
if (this.state === 'normal' && this.pendingText.length > 0) {
|
||||
await this.writer.write({ type: 'text-delta', delta: this.pendingText });
|
||||
}
|
||||
// Reset all state so the filter is clean for reuse after abort/completion.
|
||||
this.pendingText = '';
|
||||
this.buffer = '';
|
||||
this.state = 'normal';
|
||||
}
|
||||
return {
|
||||
name: UPDATE_WORKING_MEMORY_TOOL_NAME,
|
||||
description:
|
||||
'Update your persistent working memory with new information about the user or conversation. Only call this when something has actually changed.',
|
||||
inputSchema: freeformSchema,
|
||||
handler: async (input: unknown) => {
|
||||
const { memory } = input as z.infer<typeof freeformSchema>;
|
||||
await config.persist(memory);
|
||||
return { success: true, message: 'Working memory updated.' };
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,6 +37,8 @@ export class Memory {
|
|||
|
||||
private workingMemoryScope: 'resource' | 'thread' = 'resource';
|
||||
|
||||
private workingMemoryInstruction?: string;
|
||||
|
||||
private memoryBackend?: BuiltMemory;
|
||||
|
||||
private titleGenerationConfig?: TitleGenerationConfig;
|
||||
|
|
@ -102,6 +104,26 @@ export class Memory {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Override the default instruction text injected into the system prompt for working memory.
|
||||
*
|
||||
* The instruction tells the model when and how to call the `updateWorkingMemory` tool.
|
||||
* When omitted, `WORKING_MEMORY_DEFAULT_INSTRUCTION` is used.
|
||||
*
|
||||
* Example:
|
||||
* ```typescript
|
||||
* import { WORKING_MEMORY_DEFAULT_INSTRUCTION } from '@n8n/agents';
|
||||
*
|
||||
* memory.instruction(
|
||||
* WORKING_MEMORY_DEFAULT_INSTRUCTION + '\nAlways update after every user message.',
|
||||
* );
|
||||
* ```
|
||||
*/
|
||||
instruction(text: string): this {
|
||||
this.workingMemoryInstruction = text;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enable automatic title generation for new threads.
|
||||
*
|
||||
|
|
@ -167,12 +189,18 @@ export class Memory {
|
|||
structured: true,
|
||||
schema: this.workingMemorySchema,
|
||||
scope: this.workingMemoryScope,
|
||||
...(this.workingMemoryInstruction !== undefined && {
|
||||
instruction: this.workingMemoryInstruction,
|
||||
}),
|
||||
};
|
||||
} else if (this.workingMemoryTemplate !== undefined) {
|
||||
workingMemory = {
|
||||
template: this.workingMemoryTemplate,
|
||||
structured: false,
|
||||
scope: this.workingMemoryScope,
|
||||
...(this.workingMemoryInstruction !== undefined && {
|
||||
instruction: this.workingMemoryInstruction,
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -114,6 +114,11 @@ export interface MemoryConfig {
|
|||
structured: boolean;
|
||||
schema?: z.ZodObject<z.ZodRawShape>;
|
||||
scope: 'resource' | 'thread';
|
||||
/**
|
||||
* Custom instruction text injected into the system prompt in place of the default.
|
||||
* When omitted the runtime uses {@link WORKING_MEMORY_DEFAULT_INSTRUCTION}.
|
||||
*/
|
||||
instruction?: string;
|
||||
};
|
||||
semanticRecall?: SemanticRecallConfig;
|
||||
titleGeneration?: TitleGenerationConfig;
|
||||
|
|
|
|||
Loading…
Reference in a new issue