fix(workflows): preserve MCP failure status in filtered message + observability

Address review feedback on PR #1327:

- parseMcpFailureServerNames now returns {name, segment} entries so the
  forwarded "MCP server connection failed: ..." message preserves the
  per-server status detail (e.g. "(timeout)", "(disconnected)") that the
  bare-name reconstruction was dropping.
- loadConfiguredMcpServerNames now debug-logs read failures as
  dag.mcp_filter_config_read_failed instead of swallowing them silently.
  A transient EMFILE/EBUSY at filter time would otherwise silently
  reclassify a real workflow-MCP failure as plugin noise.
- Add 4 integration tests through executeDagWorkflow covering the mixed
  workflow/plugin split, all-plugin suppression, no-mcp:-config nodes,
  and the unchanged ⚠️ warning path.
- Drop a WHAT comment above configuredMcpNames and a temporal phrase
  ("before this filter landed") that would rot on merge.
- Document the filtering boundary in guides/mcp-servers.md and add a
  troubleshooting row for users debugging silently-suppressed plugin MCPs.
This commit is contained in:
Rasmus Widing 2026-04-21 12:24:10 +03:00
parent efe6918fa3
commit 7a32dde55a
3 changed files with 167 additions and 38 deletions

View file

@ -194,8 +194,9 @@ and cannot touch the filesystem or run shell commands.
## Connection Failure Handling
MCP server connections are established when the node starts executing. If a server
fails to connect, you'll see a message like:
MCP server connections are established when the node starts executing. If a
server the **workflow** configured via `mcp:` fails to connect, you'll see a
message like:
```
MCP server connection failed: github (failed)
@ -204,6 +205,13 @@ MCP server connection failed: github (failed)
The node continues executing but without the tools from the failed server.
Check your config file path, server command, and environment variables if this happens.
User-level Claude plugin MCPs inherited from `~/.claude/` (e.g. `telegram`,
`notion`) routinely fail to connect inside the headless workflow subprocess
and are **not** surfaced here — they're not actionable for the workflow author.
They appear only in debug logs as `dag.mcp_plugin_connection_suppressed`. Run
the CLI with `--verbose` (or set `LOG_LEVEL=debug` on the server) if you need
to see them.
## Workflow Examples
### GitHub Issue Triage
@ -378,6 +386,7 @@ bun run cli workflow run archon-smart-pr-review "Review PR #123"
| `MCP config must be a JSON object` | Top-level value is array or string | Wrap in `{ "server-name": { ... } }` |
| `undefined env vars: VAR_NAME` | Environment variable not set | Export the variable or add it to your `.env` |
| `MCP server connection failed` | Server process crashed or URL unreachable | Check command/URL, test the server standalone |
| Plugin MCP missing from workflow output | User-level plugin MCPs (from `~/.claude/`) are filtered out of workflow warnings | Run with `--verbose` and look for `dag.mcp_plugin_connection_suppressed` |
| `mcp config but uses Codex` | Node resolved to Codex provider | Set `provider: claude` on the node or switch default |
| `Haiku model with MCP servers` | Haiku doesn't support tool search | Use `model: sonnet` or `model: opus` instead |

View file

@ -5759,12 +5759,15 @@ describe('executeDagWorkflow -- script nodes', () => {
// ---------------------------------------------------------------------------
describe('parseMcpFailureServerNames', () => {
it('extracts names from a well-formed message', async () => {
it('extracts entries (name + segment) from a well-formed message', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
const names = parseMcpFailureServerNames(
const entries = parseMcpFailureServerNames(
'MCP server connection failed: telegram (disconnected), github (timeout)'
);
expect(names).toEqual(['telegram', 'github']);
expect(entries).toEqual([
{ name: 'telegram', segment: 'telegram (disconnected)' },
{ name: 'github', segment: 'github (timeout)' },
]);
});
it('returns empty array for unrelated messages', async () => {
@ -5773,24 +5776,29 @@ describe('parseMcpFailureServerNames', () => {
expect(parseMcpFailureServerNames('')).toEqual([]);
});
it('deduplicates repeated entries', async () => {
it('deduplicates repeated entries (first segment wins)', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
const names = parseMcpFailureServerNames(
const entries = parseMcpFailureServerNames(
'MCP server connection failed: foo (a), foo (b), bar (c)'
);
expect(names).toEqual(['foo', 'bar']);
expect(entries).toEqual([
{ name: 'foo', segment: 'foo (a)' },
{ name: 'bar', segment: 'bar (c)' },
]);
});
it('handles a single entry without status parens gracefully', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
expect(parseMcpFailureServerNames('MCP server connection failed: solo')).toEqual(['solo']);
expect(parseMcpFailureServerNames('MCP server connection failed: solo')).toEqual([
{ name: 'solo', segment: 'solo' },
]);
});
it('drops empty segments from trailing/leading commas', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
expect(parseMcpFailureServerNames('MCP server connection failed: a (x), , b (y)')).toEqual([
'a',
'b',
{ name: 'a', segment: 'a (x)' },
{ name: 'b', segment: 'b (y)' },
]);
});
});
@ -5851,3 +5859,108 @@ describe('loadConfiguredMcpServerNames', () => {
expect(names.size).toBe(0);
});
});
// ---------------------------------------------------------------------------
// MCP plugin-noise filtering — end-to-end through executeDagWorkflow
// ---------------------------------------------------------------------------
describe('executeDagWorkflow -- MCP failure filtering', () => {
let testDir: string;
beforeEach(async () => {
testDir = join(tmpdir(), `dag-mcp-filter-${Date.now()}-${Math.random().toString(36).slice(2)}`);
const commandsDir = join(testDir, '.archon', 'commands');
await mkdir(commandsDir, { recursive: true });
await writeFile(join(commandsDir, 'my-cmd.md'), 'cmd prompt');
mockSendQueryDag.mockClear();
mockGetAgentProviderDag.mockClear();
});
afterEach(async () => {
mockGetAgentProviderDag.mockImplementation(() => ({
sendQuery: mockSendQueryDag,
getType: () => 'claude',
getCapabilities: mockClaudeCapabilities,
}));
try {
await rm(testDir, { recursive: true, force: true });
} catch {
// ignore cleanup errors
}
});
async function runWithSystemChunk(
systemContent: string,
nodeMcpPath?: string
): Promise<IWorkflowPlatform> {
mockSendQueryDag.mockImplementation(function* () {
yield { type: 'system', content: systemContent };
yield { type: 'assistant', content: 'ok' };
yield { type: 'result', sessionId: 'sess' };
});
const platform = createMockPlatform();
await executeDagWorkflow(
createMockDeps(),
platform,
'conv-mcp-filter',
testDir,
{
name: 'mcp-filter-test',
nodes: [{ id: 'review', command: 'my-cmd', ...(nodeMcpPath ? { mcp: nodeMcpPath } : {}) }],
},
makeWorkflowRun(),
'claude',
undefined,
join(testDir, 'artifacts'),
join(testDir, 'logs'),
'main',
'docs/',
minimalConfig
);
return platform;
}
function mcpMessages(platform: IWorkflowPlatform): string[] {
const calls = (platform.sendMessage as Mock<typeof platform.sendMessage>).mock.calls;
return calls
.map(c => c[1] as string)
.filter(m => m.startsWith('MCP server connection failed:') || m.startsWith('⚠️'));
}
it('forwards only workflow-configured failures and preserves status detail', async () => {
await writeFile(join(testDir, 'mcp.json'), JSON.stringify({ 'workflow-server': {} }));
const platform = await runWithSystemChunk(
'MCP server connection failed: workflow-server (timeout), telegram (disconnected)',
'mcp.json'
);
const sent = mcpMessages(platform);
expect(sent).toEqual(['MCP server connection failed: workflow-server (timeout)']);
});
it('suppresses MCP message entirely when all failures are user plugins', async () => {
await writeFile(join(testDir, 'mcp.json'), JSON.stringify({ 'workflow-server': {} }));
const platform = await runWithSystemChunk(
'MCP server connection failed: telegram (disconnected), notion (timeout)',
'mcp.json'
);
expect(mcpMessages(platform)).toEqual([]);
});
it('suppresses everything when node has no mcp: config (all failures are plugin noise)', async () => {
const platform = await runWithSystemChunk(
'MCP server connection failed: telegram (disconnected)'
);
expect(mcpMessages(platform)).toEqual([]);
});
it('forwards ⚠️ provider warnings verbatim', async () => {
const platform = await runWithSystemChunk('⚠️ Haiku does not support MCP');
expect(mcpMessages(platform)).toEqual(['⚠️ Haiku does not support MCP']);
});
});

View file

@ -81,23 +81,33 @@ function getLog(): ReturnType<typeof createLogger> {
const MCP_FAILURE_PREFIX = 'MCP server connection failed: ';
/** A failed MCP server entry parsed from the SDK message. `segment` is the
* original substring (e.g. `"telegram (disconnected)"`) so callers can
* reconstruct a filtered message without losing the status detail. */
export interface McpFailureEntry {
name: string;
segment: string;
}
/**
* Parse the SDK's "MCP server connection failed: a (status), b (status)"
* message into the list of failed server names (ordered, deduped). Best-effort
* malformed or prefix-free messages return an empty array.
* message. Best-effort malformed or prefix-free messages return `[]`.
* Entries are ordered and deduped by name; the segment of the first
* occurrence wins.
*/
export function parseMcpFailureServerNames(message: string): string[] {
export function parseMcpFailureServerNames(message: string): McpFailureEntry[] {
if (!message.startsWith(MCP_FAILURE_PREFIX)) return [];
const seen = new Set<string>();
const names: string[] = [];
for (const entry of message.slice(MCP_FAILURE_PREFIX.length).split(', ')) {
const name = entry.split(' (')[0]?.trim();
const entries: McpFailureEntry[] = [];
for (const raw of message.slice(MCP_FAILURE_PREFIX.length).split(', ')) {
const segment = raw.trim();
const name = segment.split(' (')[0]?.trim();
if (name && !seen.has(name)) {
seen.add(name);
names.push(name);
entries.push({ name, segment });
}
}
return names;
return entries;
}
/**
@ -108,6 +118,10 @@ export function parseMcpFailureServerNames(message: string): string[] {
* user) from user-plugin failures (silent debug log). We intentionally do not
* validate or env-expand here the provider owns full loading and will
* surface its own parse errors via the warning channel if the file is broken.
*
* Read failures are debug-logged so a transient I/O error (EMFILE/EBUSY) that
* leaves us with an empty set and silently reclassifies a real workflow-MCP
* failure as plugin noise is at least observable.
*/
export async function loadConfiguredMcpServerNames(
nodeMcpPath: string | undefined,
@ -122,7 +136,8 @@ export async function loadConfiguredMcpServerNames(
return new Set();
}
return new Set(Object.keys(parsed as Record<string, unknown>));
} catch {
} catch (err) {
getLog().debug({ err, nodeMcpPath, fullPath }, 'dag.mcp_filter_config_read_failed');
return new Set();
}
}
@ -538,9 +553,6 @@ async function executeNodeInternal(
const nodeStartTime = Date.now();
const nodeContext: SendMessageContext = { workflowId: workflowRun.id, nodeName: node.id };
// Pre-compute the workflow-configured MCP server names. Used by the system
// message handler below to distinguish workflow-owned MCP failures (surface
// to the user) from user-plugin noise (debug log, suppress).
const configuredMcpNames = await loadConfiguredMcpServerNames(node.mcp, cwd);
getLog().info({ nodeId: node.id, provider }, 'dag_node_started');
@ -871,23 +883,18 @@ async function executeNodeInternal(
break; // Result is the "I'm done" signal — don't wait for subprocess to exit
} else if (msg.type === 'system' && msg.content) {
// Providers yield system chunks for user-actionable issues (missing env
// vars, Haiku+MCP, structured output failures, etc.). Two sub-cases:
//
// 1. MCP connection failures — only surface servers the *workflow*
// configured via `mcp:`. User-level plugin MCPs (e.g. `telegram`
// inherited from `~/.claude/`) often fail to connect in the
// headless subprocess and produced spurious warnings before this
// filter landed. Plugin failures are debug-logged, not shown.
//
// 2. Other provider warnings (⚠️) — always surface; these are things
// the workflow author can act on.
// vars, Haiku+MCP, structured output failures, etc.). MCP-failure
// chunks need filtering: user-level plugin MCPs inherited from
// `~/.claude/` (e.g. `telegram`) routinely fail to connect inside the
// headless subprocess and aren't actionable for the workflow author.
// Other warnings (⚠️) are always actionable and surface verbatim.
if (msg.content.startsWith(MCP_FAILURE_PREFIX)) {
const failedNames = parseMcpFailureServerNames(msg.content);
const workflowFailures = failedNames.filter(n => configuredMcpNames.has(n));
const pluginFailures = failedNames.filter(n => !configuredMcpNames.has(n));
const failedEntries = parseMcpFailureServerNames(msg.content);
const workflowFailures = failedEntries.filter(e => configuredMcpNames.has(e.name));
const pluginFailures = failedEntries.filter(e => !configuredMcpNames.has(e.name));
if (workflowFailures.length > 0) {
const filteredMsg = `${MCP_FAILURE_PREFIX}${workflowFailures.join(', ')}`;
const filteredMsg = `${MCP_FAILURE_PREFIX}${workflowFailures.map(e => e.segment).join(', ')}`;
getLog().warn(
{ nodeId: node.id, systemContent: filteredMsg },
'dag.provider_warning_forwarded'
@ -907,7 +914,7 @@ async function executeNodeInternal(
}
if (pluginFailures.length > 0) {
getLog().debug(
{ nodeId: node.id, pluginFailures },
{ nodeId: node.id, pluginFailures: pluginFailures.map(e => e.name) },
'dag.mcp_plugin_connection_suppressed'
);
}