fix: add AbortSignal support to message bus requests and improve error handling for tool confirmation flows

This commit is contained in:
Chandan K T 2026-04-13 14:35:17 +00:00
parent f8bcd93eb1
commit 07c98ca294
5 changed files with 84 additions and 59 deletions

View file

@ -30,8 +30,10 @@ describe('MessageBus', () => {
const errorHandler = vi.fn();
messageBus.on('error', errorHandler);
// @ts-expect-error - Testing invalid message
await messageBus.publish({ invalid: 'message' });
await expect(
// @ts-expect-error - Testing invalid message
messageBus.publish({ invalid: 'message' }),
).rejects.toThrow('Invalid message structure');
expect(errorHandler).toHaveBeenCalledWith(
expect.objectContaining({
@ -44,11 +46,13 @@ describe('MessageBus', () => {
const errorHandler = vi.fn();
messageBus.on('error', errorHandler);
// @ts-expect-error - Testing missing correlationId
await messageBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_REQUEST,
toolCall: { name: 'test' },
});
await expect(
// @ts-expect-error - Testing missing correlationId
messageBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_REQUEST,
toolCall: { name: 'test' },
}),
).rejects.toThrow('Invalid message structure');
expect(errorHandler).toHaveBeenCalled();
});
@ -251,8 +255,10 @@ describe('MessageBus', () => {
correlationId: '123',
};
// Should not throw
await expect(messageBus.publish(request)).resolves.not.toThrow();
// Should throw
await expect(messageBus.publish(request)).rejects.toThrow(
'Policy check failed',
);
// Should emit error
expect(errorHandler).toHaveBeenCalledWith(
@ -280,7 +286,9 @@ describe('MessageBus', () => {
const requestPromise = subagentBus.request<
ToolConfirmationRequest,
ToolConfirmationResponse
>(request, MessageBusType.TOOL_CONFIRMATION_RESPONSE, 2000);
>(request, MessageBusType.TOOL_CONFIRMATION_RESPONSE, {
timeoutMs: 2000,
});
// Wait for request on root bus and respond
await new Promise<void>((resolve) => {
@ -323,7 +331,9 @@ describe('MessageBus', () => {
const requestPromise = subagentBus2.request<
ToolConfirmationRequest,
ToolConfirmationResponse
>(request, MessageBusType.TOOL_CONFIRMATION_RESPONSE, 2000);
>(request, MessageBusType.TOOL_CONFIRMATION_RESPONSE, {
timeoutMs: 2000,
});
await new Promise<void>((resolve) => {
messageBus.subscribe<ToolConfirmationRequest>(

View file

@ -143,7 +143,9 @@ export class MessageBus extends EventEmitter {
this.emitMessage(message);
}
} catch (error) {
debugLogger.error(`[MESSAGE_BUS] publish failed: ${error}`);
this.emit('error', error);
throw error;
}
}
@ -209,18 +211,35 @@ export class MessageBus extends EventEmitter {
async request<TRequest extends Message, TResponse extends Message>(
request: Omit<TRequest, 'correlationId'>,
responseType: TResponse['type'],
timeoutMs: number = 60000,
options: { timeoutMs?: number; signal?: AbortSignal } = {},
): Promise<TResponse> {
const { timeoutMs = 60000, signal } = options;
const correlationId = randomUUID();
return new Promise<TResponse>((resolve, reject) => {
const timeoutId = setTimeout(() => {
const timeoutId =
timeoutMs > 0
? setTimeout(() => {
cleanup();
reject(
new Error(`Request timed out waiting for ${responseType}`),
);
}, timeoutMs)
: undefined;
const onAbort = () => {
cleanup();
reject(new Error(`Request timed out waiting for ${responseType}`));
}, timeoutMs);
reject(new Error('Operation cancelled'));
};
if (signal?.aborted) {
return onAbort();
}
signal?.addEventListener('abort', onAbort, { once: true });
const cleanup = () => {
clearTimeout(timeoutId);
if (timeoutId) clearTimeout(timeoutId);
signal?.removeEventListener('abort', onAbort);
this.unsubscribe(responseType, responseHandler);
};

View file

@ -335,7 +335,8 @@ async function waitForConfirmation(
)
.catch((error) => {
debugLogger.warn('Error waiting for confirmation via IDE', error);
throw error;
// Return a never-resolving promise so the race continues with the bus
return new Promise<ConfirmationResult>(() => {});
});
return await Promise.race([busPromise, idePromise]);

View file

@ -33,6 +33,7 @@ import {
type AnyDeclarativeTool,
} from '../tools/tools.js';
import { getToolSuggestion } from '../utils/tool-utils.js';
import { debugLogger } from '../utils/debugLogger.js';
import { runInDevTraceSpan } from '../telemetry/trace.js';
import { logToolCall } from '../telemetry/loggers.js';
import { ToolCallEvent } from '../telemetry/types.js';
@ -173,8 +174,10 @@ export class Scheduler {
confirmed: false,
requiresUserConfirmation: true,
})
.catch(() => {
// Error updating confirmation response, swallowed intentionally
.catch((error) => {
debugLogger.error(
`Failed to publish tool confirmation response: ${error}`,
);
});
};

View file

@ -14,6 +14,7 @@ import type { AnsiOutput } from '../utils/terminalSerializer.js';
import type { MessageBus } from '../confirmation-bus/message-bus.js';
import { isRecord } from '../utils/markdownUtils.js';
import { randomUUID } from 'node:crypto';
import { debugLogger } from '../utils/debugLogger.js';
import {
MessageBusType,
type ToolConfirmationRequest,
@ -23,7 +24,6 @@ import {
import { ApprovalMode } from '../policy/types.js';
import type { SubagentProgress } from '../agents/types.js';
/**
/**
* Supported decisions for forcing tool execution behavior.
*/
@ -305,31 +305,6 @@ export abstract class BaseToolInvocation<
return;
}
let timeoutId: NodeJS.Timeout | null = null;
let unsubscribe: (() => void) | null = null;
const cleanup = () => {
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
if (unsubscribe) {
unsubscribe();
unsubscribe = null;
}
abortSignal.removeEventListener('abort', abortHandler);
};
const abortHandler = () => {
cleanup();
resolve('deny');
};
if (abortSignal.aborted) {
resolve('deny');
return;
}
const responseHandler = (response: ToolConfirmationResponse) => {
if (response.correlationId === correlationId) {
cleanup();
@ -343,30 +318,47 @@ export abstract class BaseToolInvocation<
}
};
abortSignal.addEventListener('abort', abortHandler, { once: true });
timeoutId = setTimeout(() => {
cleanup();
resolve('ask_user'); // Default to ask_user on timeout
}, 30000);
this.messageBus.subscribe(
MessageBusType.TOOL_CONFIRMATION_RESPONSE,
responseHandler,
);
unsubscribe = () => {
const unsubscribe = () => {
this.messageBus?.unsubscribe(
MessageBusType.TOOL_CONFIRMATION_RESPONSE,
responseHandler,
);
};
const cleanup = () => {
abortSignal.removeEventListener('abort', abortHandler);
unsubscribe();
};
const abortHandler = () => {
cleanup();
resolve('deny');
};
if (abortSignal.aborted) {
resolve('deny');
return;
}
abortSignal.addEventListener('abort', abortHandler, { once: true });
this.messageBus.subscribe(
MessageBusType.TOOL_CONFIRMATION_RESPONSE,
responseHandler,
);
try {
this.messageBus.publish(request).catch(() => {
this.messageBus.publish(request).catch((err) => {
debugLogger.error(
`Failed to publish tool confirmation request: ${err}`,
);
cleanup();
resolve('allow');
});
} catch {
} catch (err) {
debugLogger.error(
`Failed to publish tool confirmation request: ${err}`,
);
cleanup();
resolve('allow');
}