♻️ refactor: unify retry logic to async-retry (#10579)

* ♻️ refactor: unify retry logic to async-retry

- Refactor MCPService.listTools() to use async-retry with exponential backoff
- Refactor asyncifyPolling() to use async-retry internally while maintaining the same API
- Add async-retry as dependency to root package and model-runtime package

🔗 Related: LOBE-1370

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

*  test: update MCPService.listTools tests for async-retry

- Update test expectation: throw original error when retries exceeded
- Remove skipCache parameter test (now handled internally by async-retry)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Arvin Xu 2025-12-03 19:14:40 +08:00 committed by GitHub
parent d15c845213
commit 95f31bc57c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 166 additions and 156 deletions

View file

@ -200,6 +200,7 @@
"ahooks": "^3.9.6",
"antd": "^5.29.1",
"antd-style": "^3.7.1",
"async-retry": "^1.3.3",
"better-auth": "^1.4.3",
"brotli-wasm": "^3.0.1",
"chroma-js": "^3.2.0",
@ -330,6 +331,7 @@
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/react": "^16.3.0",
"@testing-library/user-event": "^14.6.1",
"@types/async-retry": "^1.4.9",
"@types/chroma-js": "^3.1.2",
"@types/crypto-js": "^4.2.2",
"@types/debug": "^4.1.12",

View file

@ -17,6 +17,7 @@
"@lobechat/const": "workspace:*",
"@lobechat/types": "workspace:*",
"@lobechat/utils": "workspace:*",
"async-retry": "^1.3.3",
"debug": "^4.4.3",
"model-bank": "workspace:*",
"openai": "^4.104.0"

View file

@ -1,3 +1,5 @@
import retry from 'async-retry';
export interface TaskResult<T> {
data?: T;
error?: any;
@ -23,14 +25,14 @@ export interface AsyncifyPollingOptions<T, R> {
checkStatus: (result: T) => TaskResult<R>;
// Retry configuration
initialInterval?: number;
initialInterval?: number;
// Optional logger
logger?: {
debug?: (...args: any[]) => void;
error?: (...args: any[]) => void;
};
};
// Default 1.5
maxConsecutiveFailures?: number;
maxConsecutiveFailures?: number;
// Default 500ms
maxInterval?: number; // Default 3
maxRetries?: number; // Default Infinity
@ -42,6 +44,24 @@ export interface AsyncifyPollingOptions<T, R> {
pollingQuery: () => Promise<T>;
}
// Internal error class to signal that polling should continue
class PendingError extends Error {
constructor() {
super('Task is pending, continue polling');
this.name = 'PendingError';
}
}
// Internal error class to signal that task has failed and should not retry
class TaskFailedError extends Error {
originalError: any;
constructor(error: any) {
super(error instanceof Error ? error.message : String(error));
this.name = 'TaskFailedError';
this.originalError = error;
}
}
/**
* Convert polling pattern to async/await pattern
*
@ -62,114 +82,117 @@ export async function asyncifyPolling<T, R>(options: AsyncifyPollingOptions<T, R
logger,
} = options;
let retries = 0;
let consecutiveFailures = 0;
while (retries < maxRetries) {
let pollingResult: T;
// async-retry uses Infinity for retries when maxRetries is Infinity
// but we need to handle this case properly
const retriesConfig = maxRetries === Infinity ? 1_000_000 : maxRetries - 1;
try {
// Execute polling function
pollingResult = await pollingQuery();
try {
return await retry(
async (bail, attemptNumber) => {
const retries = attemptNumber - 1;
// Reset consecutive failures counter on successful execution
consecutiveFailures = 0;
} catch (error) {
// Polling function execution failed (network error, etc.)
consecutiveFailures++;
try {
// Execute polling function
const pollingResult = await pollingQuery();
logger?.error?.(
`Failed to execute polling function (attempt ${retries + 1}/${maxRetries === Infinity ? '∞' : maxRetries}, consecutive failures: ${consecutiveFailures}/${maxConsecutiveFailures}):`,
error,
);
// Reset consecutive failures counter on successful execution
consecutiveFailures = 0;
// Handle custom error processing if provided
if (onPollingError) {
const errorResult = onPollingError({
consecutiveFailures,
error,
retries,
});
// Check task status
const statusResult = checkStatus(pollingResult);
if (!errorResult.isContinuePolling) {
// Custom error handler decided to stop polling
throw errorResult.error || error;
}
logger?.debug?.(`Task status: ${statusResult.status} (attempt ${attemptNumber})`);
// Custom error handler decided to continue polling
logger?.debug?.('Custom error handler decided to continue polling');
} else {
// Default behavior: check if maximum consecutive failures reached
if (consecutiveFailures >= maxConsecutiveFailures) {
throw new Error(
`Failed to execute polling function after ${consecutiveFailures} consecutive attempts: ${error}`,
switch (statusResult.status) {
case 'success': {
return statusResult.data as R;
}
case 'failed': {
// Task logic failed, throw error immediately (not counted as consecutive failure)
bail(new TaskFailedError(statusResult.error || new Error('Task failed')));
// This return is never reached due to bail, but needed for type safety
return undefined as R;
}
default: {
// 'pending' or unknown status - continue polling by throwing PendingError
throw new PendingError();
}
}
} catch (error) {
// Re-throw internal errors that should be handled by async-retry
if (error instanceof PendingError) {
throw error;
}
// Polling function execution failed (network error, etc.)
consecutiveFailures++;
logger?.error?.(
`Failed to execute polling function (attempt ${attemptNumber}/${maxRetries === Infinity ? '∞' : maxRetries}, consecutive failures: ${consecutiveFailures}/${maxConsecutiveFailures}):`,
error,
);
// Handle custom error processing if provided
if (onPollingError) {
const errorResult = onPollingError({
consecutiveFailures,
error,
retries,
});
if (!errorResult.isContinuePolling) {
// Custom error handler decided to stop polling
bail(errorResult.error || (error as Error));
return undefined as R;
}
// Custom error handler decided to continue polling
logger?.debug?.('Custom error handler decided to continue polling');
throw error; // Rethrow to trigger retry
} else {
// Default behavior: check if maximum consecutive failures reached
if (consecutiveFailures >= maxConsecutiveFailures) {
bail(
new Error(
`Failed to execute polling function after ${consecutiveFailures} consecutive attempts: ${error}`,
),
);
return undefined as R;
}
}
// Rethrow to trigger retry
throw error;
}
}
// Wait before retry and continue to next loop iteration
if (retries < maxRetries - 1) {
const currentInterval = Math.min(
initialInterval * Math.pow(backoffMultiplier, retries),
maxInterval,
);
logger?.debug?.(`Waiting ${currentInterval}ms before next retry`);
await new Promise((resolve) => {
setTimeout(resolve, currentInterval);
});
}
retries++;
continue;
},
{
factor: backoffMultiplier,
maxTimeout: maxInterval,
minTimeout: initialInterval,
onRetry: (error, attempt) => {
if (!(error instanceof PendingError)) {
logger?.debug?.(`Retrying after error (attempt ${attempt})`);
}
},
randomize: false, // Disable jitter for predictable intervals
retries: retriesConfig,
},
);
} catch (error) {
// Handle TaskFailedError by throwing the original error
if (error instanceof TaskFailedError) {
throw error.originalError;
}
// Check task status
const statusResult = checkStatus(pollingResult);
logger?.debug?.(`Task status: ${statusResult.status} (attempt ${retries + 1})`);
switch (statusResult.status) {
case 'success': {
return statusResult.data as R;
}
case 'failed': {
// Task logic failed, throw error immediately (not counted as consecutive failure)
throw statusResult.error || new Error('Task failed');
}
case 'pending': {
// Continue polling
break;
}
default: {
// Unknown status, treat as pending
break;
}
// Handle max retries exceeded
if (error instanceof PendingError) {
throw new Error(`Task timeout after ${maxRetries} attempts`);
}
// Wait before next retry if not the last attempt
if (retries < maxRetries - 1) {
// Calculate dynamic retry interval with exponential backoff
const currentInterval = Math.min(
initialInterval * Math.pow(backoffMultiplier, retries),
maxInterval,
);
logger?.debug?.(`Waiting ${currentInterval}ms before next retry`);
// Wait for retry interval
await new Promise((resolve) => {
setTimeout(resolve, currentInterval);
});
}
retries++;
throw error;
}
// Maximum retries reached
throw new Error(`Task timeout after ${maxRetries} attempts`);
}

View file

@ -324,12 +324,13 @@ describe('MCPService', () => {
expect(result).toHaveLength(1);
});
it('should throw TRPCError when NoValidSessionId retry exceeds limit', async () => {
it('should throw original error when NoValidSessionId retry exceeds limit', async () => {
// Fail more than 3 times
mockClient.listTools.mockRejectedValue(new Error('NoValidSessionId'));
await expect(mcpService.listTools(mockParams)).rejects.toThrow(TRPCError);
expect(mockClient.listTools).toHaveBeenCalledTimes(5); // initial + 4 retry attempts (last one fails condition)
await expect(mcpService.listTools(mockParams)).rejects.toThrow('NoValidSessionId');
// async-retry: 1 initial + 3 retries = 4 attempts
expect(mockClient.listTools).toHaveBeenCalledTimes(4);
});
it('should throw TRPCError on other errors without retry', async () => {
@ -340,23 +341,6 @@ describe('MCPService', () => {
expect(mockClient.listTools).toHaveBeenCalledTimes(1);
});
it('should pass skipCache option to getClient', async () => {
const mockTools = [
{
name: 'tool1',
description: 'Test tool',
inputSchema: { type: 'object' },
},
];
mockClient.listTools.mockResolvedValue(mockTools);
await mcpService.listTools(mockParams, { skipCache: true });
// Verify getClient was called with skipCache
expect(mcpService.getClient).toHaveBeenCalledWith(mockParams, true);
});
it('should throw TRPCError with correct error message', async () => {
const error = new Error('Custom error message');
mockClient.listTools.mockRejectedValue(error);

View file

@ -4,6 +4,7 @@ import { LobeChatPluginApi, LobeChatPluginManifest, PluginSchema } from '@lobehu
import { DeploymentOption } from '@lobehub/market-sdk';
import { McpError } from '@modelcontextprotocol/sdk/types.js';
import { TRPCError } from '@trpc/server';
import retry from 'async-retry';
import debug from 'debug';
import {
@ -36,48 +37,47 @@ export class MCPService {
// --- MCP Interaction ---
// listTools now accepts MCPClientParams
async listTools(
params: MCPClientParams,
{ retryTime, skipCache }: { retryTime?: number; skipCache?: boolean } = {},
): Promise<LobeChatPluginApi[]> {
const client = await this.getClient(params, skipCache); // Get client using params
async listTools(params: MCPClientParams): Promise<LobeChatPluginApi[]> {
const loggableParams = this.sanitizeForLogging(params);
log(`Listing tools using client for params: %O`, loggableParams);
try {
const result = await client.listTools();
log(
`Tools listed successfully for params: %O, result count: %d`,
loggableParams,
result.length,
);
return result.map<LobeChatPluginApi>((item) => ({
// Assuming identifier is the unique name/id
description: item.description,
name: item.name,
parameters: item.inputSchema as PluginSchema,
}));
} catch (error) {
let nextReTryTime = retryTime || 0;
return retry(
async (bail, attemptNumber) => {
// Skip cache on retry attempts
const skipCache = attemptNumber > 1;
const client = await this.getClient(params, skipCache);
log(`Listing tools using client for params: %O (attempt ${attemptNumber})`, loggableParams);
if ((error as Error).message === 'NoValidSessionId' && nextReTryTime <= 3) {
if (!nextReTryTime) {
nextReTryTime = 1;
} else {
nextReTryTime += 1;
try {
const result = await client.listTools();
log(
`Tools listed successfully for params: %O, result count: %d`,
loggableParams,
result.length,
);
return result.map<LobeChatPluginApi>((item) => ({
// Assuming identifier is the unique name/id
description: item.description,
name: item.name,
parameters: item.inputSchema as PluginSchema,
}));
} catch (error) {
// Only retry for NoValidSessionId errors
if ((error as Error).message !== 'NoValidSessionId') {
console.error(`Error listing tools for params %O:`, loggableParams, error);
bail(
new TRPCError({
cause: error,
code: 'INTERNAL_SERVER_ERROR',
message: `Error listing tools from MCP server: ${(error as Error).message}`,
}),
);
return []; // This line will never be reached due to bail, but needed for type safety
}
throw error; // Rethrow to trigger retry
}
return this.listTools(params, { retryTime: nextReTryTime, skipCache: true });
}
console.error(`Error listing tools for params %O:`, loggableParams, error);
// Propagate a TRPCError for better handling upstream
throw new TRPCError({
cause: error,
code: 'INTERNAL_SERVER_ERROR',
message: `Error listing tools from MCP server: ${(error as Error).message}`,
});
}
},
{ maxRetryTime: 1000, minTimeout: 100, retries: 3 },
);
}
// listTools now accepts MCPClientParams