diff --git a/packages/cli/src/ui/components/LoadingIndicator.test.tsx b/packages/cli/src/ui/components/LoadingIndicator.test.tsx index 003a0dc070..1f529ef195 100644 --- a/packages/cli/src/ui/components/LoadingIndicator.test.tsx +++ b/packages/cli/src/ui/components/LoadingIndicator.test.tsx @@ -316,6 +316,71 @@ describe('', () => { unmount(); }); + it('should prioritize capacity retry phrase over thought.subject', async () => { + const props = { + thought: { + subject: 'Thinking about something else...', + description: 'A description', + }, + currentLoadingPhrase: 'Model capacity exhausted. Retrying (attempt 1)...', + elapsedTime: 5, + }; + const { lastFrame, unmount, waitUntilReady } = await renderWithContext( + , + StreamingState.Responding, + ); + await waitUntilReady(); + const output = lastFrame(); + expect(output).toContain( + 'Model capacity exhausted. Retrying (attempt 1)...', + ); + expect(output).not.toContain('Thinking about something else...'); + unmount(); + }); + + it('should prioritize request timeout retry phrase over thought.subject', async () => { + const props = { + thought: { + subject: 'Thinking about something else...', + description: 'A description', + }, + currentLoadingPhrase: 'Request timed out. Retrying (attempt 2)...', + elapsedTime: 5, + }; + const { lastFrame, unmount, waitUntilReady } = await renderWithContext( + , + StreamingState.Responding, + ); + await waitUntilReady(); + const output = lastFrame(); + expect(output).toContain('Request timed out. Retrying (attempt 2)...'); + expect(output).not.toContain('Thinking about something else...'); + unmount(); + }); + + it('should prioritize generic retry phrase over thought.subject', async () => { + const props = { + thought: { + subject: 'Thinking about something else...', + description: 'A description', + }, + currentLoadingPhrase: + 'Trying to reach gemini-3-flash-preview (Attempt 2/10)', + elapsedTime: 5, + }; + const { lastFrame, unmount, waitUntilReady } = await renderWithContext( + , + StreamingState.Responding, + ); + await waitUntilReady(); + const output = lastFrame(); + expect(output).toContain( + 'Trying to reach gemini-3-flash-preview (Attempt 2/10)', + ); + expect(output).not.toContain('Thinking about something else...'); + unmount(); + }); + it('should not display thought indicator for non-thought loading phrases', async () => { const { lastFrame, unmount, waitUntilReady } = await renderWithContext( = ({ return null; } - // Prioritize the interactive shell waiting phrase over the thought subject - // because it conveys an actionable state for the user (waiting for input). + // Prioritize real status phrases over the thought subject because they convey + // actionable state for the user (waiting for input, backend retrying). + const isRetryPhrase = + currentLoadingPhrase?.startsWith(CAPACITY_RETRY_PREFIX) === true || + currentLoadingPhrase?.startsWith(REQUEST_TIMEOUT_RETRY_PREFIX) === true || + currentLoadingPhrase?.startsWith(GENERIC_RETRY_PREFIX) === true; const primaryText = currentLoadingPhrase === INTERACTIVE_SHELL_WAITING_PHRASE ? currentLoadingPhrase - : thought?.subject - ? (thoughtLabel ?? thought.subject) - : currentLoadingPhrase || - (streamingState === StreamingState.Responding - ? 'Thinking...' - : undefined); + : isRetryPhrase + ? currentLoadingPhrase + : thought?.subject + ? (thoughtLabel ?? thought.subject) + : currentLoadingPhrase || + (streamingState === StreamingState.Responding + ? 'Thinking...' + : undefined); const cancelAndTimerContent = showCancelAndTimer && streamingState === StreamingState.Responding @@ -91,7 +101,7 @@ export const LoadingIndicator: React.FC = ({ if (inline) { return ( - + = ({ {primaryText && ( - + {primaryText} {primaryText === INTERACTIVE_SHELL_WAITING_PHRASE && ( @@ -148,7 +163,12 @@ export const LoadingIndicator: React.FC = ({ {primaryText && ( - + {primaryText} {primaryText === INTERACTIVE_SHELL_WAITING_PHRASE && ( diff --git a/packages/cli/src/ui/components/StatusRow.test.tsx b/packages/cli/src/ui/components/StatusRow.test.tsx index 5f14254f4b..22f962431b 100644 --- a/packages/cli/src/ui/components/StatusRow.test.tsx +++ b/packages/cli/src/ui/components/StatusRow.test.tsx @@ -78,6 +78,49 @@ describe('', () => { expect(output).toContain('Tip: Test Tip'); }); + it('passes retry loading phrase through to the loading indicator', async () => { + (useComposerStatus as Mock).mockReturnValue({ + isInteractiveShellWaiting: false, + showLoadingIndicator: true, + showTips: true, + showWit: false, + modeContentObj: null, + showMinimalContext: false, + }); + + const uiState: Partial = { + ...defaultUiState, + currentTip: 'This stale tip should not be rendered', + currentLoadingPhrase: 'Model capacity exhausted. Retrying (attempt 1)...', + thought: { subject: 'Thinking...' } as unknown as ThoughtSummary, + elapsedTime: 5, + }; + + const { lastFrame, waitUntilReady } = await renderWithProviders( + , + { + width: 100, + uiState, + }, + ); + + await waitUntilReady(); + const output = lastFrame(); + expect(output).toContain( + 'Model capacity exhausted. Retrying (attempt 1)...', + ); + expect(output).not.toContain('Thinking...'); + expect(output).not.toContain('This stale tip should not be rendered'); + expect(output).not.toContain('? for shortcuts'); + }); + it('renders correctly when interactive shell is waiting', async () => { (useComposerStatus as Mock).mockReturnValue({ isInteractiveShellWaiting: true, diff --git a/packages/cli/src/ui/components/StatusRow.tsx b/packages/cli/src/ui/components/StatusRow.tsx index f162481ce5..0900b1171d 100644 --- a/packages/cli/src/ui/components/StatusRow.tsx +++ b/packages/cli/src/ui/components/StatusRow.tsx @@ -42,6 +42,18 @@ const LAYOUT = { COLLISION_GAP: 10, }; +const RETRY_STATUS_PREFIXES = [ + 'Model capacity exhausted. Retrying', + 'Request timed out. Retrying', + 'Trying to reach ', +]; + +function isRetryStatusPhrase(phrase: string | undefined): boolean { + return RETRY_STATUS_PREFIXES.some( + (prefix) => phrase?.startsWith(prefix) === true, + ); +} + interface StatusRowProps { showUiDetails: boolean; isNarrow: boolean; @@ -59,6 +71,7 @@ export const StatusNode: React.FC<{ showWit: boolean; thought: ThoughtSummary | null; elapsedTime: number; + currentLoadingPhrase: string | undefined; currentWittyPhrase: string | undefined; activeHooks: ActiveHook[]; showLoadingIndicator: boolean; @@ -69,6 +82,7 @@ export const StatusNode: React.FC<{ showWit, thought, elapsedTime, + currentLoadingPhrase: uiCurrentLoadingPhrase, currentWittyPhrase, activeHooks, showLoadingIndicator, @@ -130,6 +144,9 @@ export const StatusNode: React.FC<{ currentLoadingPhrase = GENERIC_WORKING_LABEL; } } else { + currentLoadingPhrase = uiCurrentLoadingPhrase + ? stripAnsi(uiCurrentLoadingPhrase) + : undefined; // Sanitize thought subject to prevent terminal injection currentThought = thought ? { ...thought, subject: stripAnsi(thought.subject) } @@ -211,6 +228,10 @@ export const StatusRow: React.FC = ({ }, []); const tipContentStr = (() => { + if (isRetryStatusPhrase(uiState.currentLoadingPhrase)) { + return undefined; + } + // 1. Proactive Tip (Priority) if ( showTips && @@ -263,6 +284,7 @@ export const StatusRow: React.FC = ({ showWit={showWit} thought={uiState.thought} elapsedTime={uiState.elapsedTime} + currentLoadingPhrase={uiState.currentLoadingPhrase} currentWittyPhrase={uiState.currentWittyPhrase} activeHooks={uiState.activeHooks} showLoadingIndicator={showLoadingIndicator} diff --git a/packages/cli/src/ui/hooks/useGeminiStream.test.tsx b/packages/cli/src/ui/hooks/useGeminiStream.test.tsx index 1fa4250e71..f78e294dbd 100644 --- a/packages/cli/src/ui/hooks/useGeminiStream.test.tsx +++ b/packages/cli/src/ui/hooks/useGeminiStream.test.tsx @@ -1836,7 +1836,7 @@ describe('useGeminiStream', () => { }); describe('Retry Handling', () => { - it('should ignore retryStatus updates when not responding', async () => { + it('should retain retryStatus updates even when responding state has not caught up', async () => { const { result } = await renderHookWithDefaults(); const retryPayload = { @@ -1850,7 +1850,7 @@ describe('useGeminiStream', () => { coreEvents.emit(CoreEvent.RetryAttempt, retryPayload); }); - expect(result.current.retryStatus).toBeNull(); + expect(result.current.retryStatus).toEqual(retryPayload); }); it('should reset retryStatus when isResponding becomes false', async () => { diff --git a/packages/cli/src/ui/hooks/useGeminiStream.ts b/packages/cli/src/ui/hooks/useGeminiStream.ts index ac63733fa9..efd3190341 100644 --- a/packages/cli/src/ui/hooks/useGeminiStream.ts +++ b/packages/cli/src/ui/hooks/useGeminiStream.ts @@ -239,6 +239,25 @@ export const useGeminiStream = ( isShellFocused?: boolean, consumeUserHint?: () => string | null, ) => { + const isStreamDebugEnabled = useMemo(() => { + const value = process.env['GEMINI_STREAM_DEBUG']; + return value === '1' || value === 'true'; + }, []); + + const streamDebug = useCallback( + (message: string, details?: unknown) => { + if (!isStreamDebugEnabled) { + return; + } + if (details === undefined) { + debugLogger.debug(`[STREAM_DEBUG] ${message}`); + } else { + debugLogger.debug(`[STREAM_DEBUG] ${message}`, details); + } + }, + [isStreamDebugEnabled], + ); + const [initError, setInitError] = useState(null); const [retryStatus, setRetryStatus] = useState( null, @@ -277,7 +296,20 @@ export const useGeminiStream = ( useEffect(() => { const handleRetryAttempt = (payload: RetryAttemptPayload) => { - if (turnCancelledRef.current || !isRespondingRef.current) { + streamDebug('retry attempt event received', { + attempt: payload.attempt, + maxAttempts: payload.maxAttempts, + delayMs: Math.round(payload.delayMs), + error: payload.error, + isResponding: isRespondingRef.current, + model: payload.model, + turnCancelled: turnCancelledRef.current, + }); + if (turnCancelledRef.current) { + streamDebug('retry attempt event ignored after cancellation', { + attempt: payload.attempt, + model: payload.model, + }); return; } setRetryStatus(payload); @@ -286,7 +318,7 @@ export const useGeminiStream = ( return () => { coreEvents.off(CoreEvent.RetryAttempt, handleRetryAttempt); }; - }, [isRespondingRef]); + }, [isRespondingRef, streamDebug]); const [ toolCalls, @@ -415,6 +447,42 @@ export const useGeminiStream = ( () => calculateStreamingState(isResponding, toolCalls), [isResponding, toolCalls], ); + const activePtyId = + activeShellPtyId ?? activeBackgroundExecutionId ?? undefined; + + const summarizeToolCallStatuses = useCallback(() => { + const summary: Record = {}; + for (const toolCall of toolCalls) { + summary[toolCall.status] = (summary[toolCall.status] ?? 0) + 1; + } + return summary; + }, [toolCalls]); + + const previousStreamingStateRef = useRef(null); + useEffect(() => { + const currentActivePtyId = + activeShellPtyId ?? activeBackgroundExecutionId ?? undefined; + if ( + !isStreamDebugEnabled || + previousStreamingStateRef.current === streamingState + ) { + return; + } + previousStreamingStateRef.current = streamingState; + streamDebug(`streamingState=${streamingState}`, { + isResponding, + toolStatuses: summarizeToolCallStatuses(), + hasActivePty: currentActivePtyId != null, + }); + }, [ + isStreamDebugEnabled, + streamDebug, + streamingState, + isResponding, + summarizeToolCallStatuses, + activeShellPtyId, + activeBackgroundExecutionId, + ]); // Reset tracking when a new batch of tools starts useEffect(() => { @@ -716,9 +784,6 @@ export const useGeminiStream = ( onComplete: (result: { userSelection: 'disable' | 'keep' }) => void; } | null>(null); - const activePtyId = - activeShellPtyId ?? activeBackgroundExecutionId ?? undefined; - const prevActiveShellPtyIdRef = useRef(null); useEffect(() => { if ( @@ -1464,7 +1529,9 @@ export const useGeminiStream = ( ): Promise => { let geminiMessageBuffer = ''; const toolCallRequests: ToolCallRequestInfo[] = []; + streamDebug('processing stream events: begin'); for await (const event of stream) { + streamDebug(`event=${event.type}`); if ( event.type !== ServerGeminiEventType.Thought && thoughtRef.current !== null @@ -1475,10 +1542,16 @@ export const useGeminiStream = ( switch (event.type) { case ServerGeminiEventType.Thought: setLastGeminiActivityTime(Date.now()); + streamDebug('thought event received', { + subject: event.value.subject, + }); handleThoughtEvent(event.value, userMessageTimestamp); break; case ServerGeminiEventType.Content: setLastGeminiActivityTime(Date.now()); + streamDebug('content event received', { + length: event.value.length, + }); geminiMessageBuffer = handleContentEvent( event.value, geminiMessageBuffer, @@ -1486,6 +1559,10 @@ export const useGeminiStream = ( ); break; case ServerGeminiEventType.ToolCallRequest: + streamDebug('tool_call_request received', { + callId: event.value.callId, + name: event.value.name, + }); toolCallRequests.push(event.value); break; case ServerGeminiEventType.UserCancelled: @@ -1527,6 +1604,9 @@ export const useGeminiStream = ( ); break; case ServerGeminiEventType.Finished: + streamDebug('finished event received', { + reason: event.value?.reason, + }); handleFinishedEvent(event, userMessageTimestamp); break; case ServerGeminiEventType.Citation: @@ -1552,15 +1632,21 @@ export const useGeminiStream = ( } } if (toolCallRequests.length > 0) { + streamDebug('scheduling tool calls from stream', { + count: toolCallRequests.length, + names: toolCallRequests.map((request) => request.name), + }); if (pendingHistoryItemRef.current) { addItem(pendingHistoryItemRef.current, userMessageTimestamp); setPendingHistoryItem(null); } await scheduleToolCalls(toolCallRequests, signal); } + streamDebug('processing stream events: complete'); return StreamProcessingStatus.Completed; }, [ + streamDebug, handleContentEvent, handleThoughtEvent, thoughtRef, @@ -1602,8 +1688,13 @@ export const useGeminiStream = ( streamingState === StreamingState.Responding || streamingState === StreamingState.WaitingForConfirmation) && !options?.isContinuation - ) + ) { + streamDebug('submit skipped because stream is already active', { + isResponding: isRespondingRef.current, + streamingState, + }); return; + } const queryId = `${Date.now()}-${Math.random()}`; activeQueryIdRef.current = queryId; @@ -1624,6 +1715,7 @@ export const useGeminiStream = ( abortControllerRef.current = new AbortController(); const abortSignal = abortControllerRef.current.signal; turnCancelledRef.current = false; + setRetryStatus(null); if (!prompt_id) { prompt_id = config.getSessionId() + '########' + getPromptCount(); @@ -1660,12 +1752,18 @@ export const useGeminiStream = ( setIsResponding(true); setInitError(null); + streamDebug('submit started', { + isContinuation: !!options?.isContinuation, + promptId: prompt_id, + queryType: typeof queryToSend, + }); // Store query and prompt_id for potential retry on loop detection lastQueryRef.current = queryToSend; lastPromptIdRef.current = prompt_id!; try { + streamDebug('opening sendMessageStream'); const stream = geminiClient.sendMessageStream( queryToSend, abortSignal, @@ -1678,6 +1776,7 @@ export const useGeminiStream = ( userMessageTimestamp, abortSignal, ); + streamDebug('stream processing result', { processingStatus }); if (processingStatus === StreamProcessingStatus.UserCancelled) { return; @@ -1724,6 +1823,9 @@ export const useGeminiStream = ( } } catch (error: unknown) { spanMetadata.error = error; + streamDebug('submit caught error', { + error: getErrorMessage(error), + }); if (error instanceof UnauthorizedError) { onAuthError('Session expired or is unauthorized.'); } else if ( @@ -1750,6 +1852,10 @@ export const useGeminiStream = ( maybeAddLowVerbosityFailureNote(userMessageTimestamp); } } finally { + streamDebug('submit finished', { + queryId, + isLatestQuery: activeQueryIdRef.current === queryId, + }); if (activeQueryIdRef.current === queryId) { setIsResponding(false); } @@ -1762,6 +1868,7 @@ export const useGeminiStream = ( setModelSwitchedFromQuotaError, prepareQueryForGemini, processGeminiStreamEvents, + streamDebug, pendingHistoryItemRef, addItem, setPendingHistoryItem, @@ -1780,6 +1887,47 @@ export const useGeminiStream = ( ], ); + useEffect(() => { + if (!isStreamDebugEnabled) { + return; + } + + if (streamingState === StreamingState.Idle) { + return; + } + + const interval = setInterval(() => { + const currentActivePtyId = + activeShellPtyId ?? activeBackgroundExecutionId ?? undefined; + const now = Date.now(); + const activityTimestamp = Math.max( + lastGeminiActivityTime, + lastToolOutputTime, + lastShellOutputTime, + ); + const silenceMs = + activityTimestamp > 0 ? now - activityTimestamp : Number.NaN; + streamDebug('heartbeat', { + streamingState, + silenceMs: Number.isNaN(silenceMs) ? 'unknown' : silenceMs, + hasActivePty: currentActivePtyId != null, + toolStatuses: summarizeToolCallStatuses(), + }); + }, 15000); + + return () => clearInterval(interval); + }, [ + isStreamDebugEnabled, + streamDebug, + streamingState, + lastGeminiActivityTime, + lastToolOutputTime, + lastShellOutputTime, + activeShellPtyId, + activeBackgroundExecutionId, + summarizeToolCallStatuses, + ]); + const handleApprovalModeChange = useCallback( async (newApprovalMode: ApprovalMode) => { if ( diff --git a/packages/cli/src/ui/hooks/useLoadingIndicator.test.tsx b/packages/cli/src/ui/hooks/useLoadingIndicator.test.tsx index c723cf7ef5..303af08bf5 100644 --- a/packages/cli/src/ui/hooks/useLoadingIndicator.test.tsx +++ b/packages/cli/src/ui/hooks/useLoadingIndicator.test.tsx @@ -257,7 +257,7 @@ describe('useLoadingIndicator', () => { expect(result.current.currentLoadingPhrase).toBeUndefined(); }); - it('should hide low-verbosity retry status for early retry attempts', async () => { + it('should show generic retry status in low error verbosity mode for early retry attempts', async () => { const retryStatus = { model: 'gemini-pro', attempt: 1, @@ -273,12 +273,12 @@ describe('useLoadingIndicator', () => { 'low', ); - expect(result.current.currentLoadingPhrase).not.toBe( - "This is taking a bit longer, we're still on it.", + expect(result.current.currentLoadingPhrase).toBe( + 'Trying to reach gemini-pro (Attempt 2/5)', ); }); - it('should show a generic retry phrase in low error verbosity mode for later retries', async () => { + it('should show generic retry status in low error verbosity mode for later retries', async () => { const retryStatus = { model: 'gemini-pro', attempt: 2, @@ -295,7 +295,71 @@ describe('useLoadingIndicator', () => { ); expect(result.current.currentLoadingPhrase).toBe( - "This is taking a bit longer, we're still on it.", + 'Trying to reach gemini-pro (Attempt 3/5)', + ); + }); + + it('should show capacity retry phrase when model capacity is exhausted', async () => { + const retryStatus = { + model: 'gemini-pro', + attempt: 0, + maxAttempts: 5, + delayMs: 1000, + error: + 'No capacity available for model gemini-3-flash-preview on the server (MODEL_CAPACITY_EXHAUSTED)', + }; + const { result } = await renderLoadingIndicatorHook( + StreamingState.Responding, + false, + retryStatus, + ); + + expect(result.current.currentLoadingPhrase).toBe( + 'Model capacity exhausted. Retrying (attempt 1)...', + ); + }); + + it('should show capacity retry phrase in low error verbosity mode from attempt 1', async () => { + const retryStatus = { + model: 'gemini-pro', + attempt: 0, + maxAttempts: 5, + delayMs: 1000, + error: 'MODEL_CAPACITY_EXHAUSTED', + }; + const { result } = await renderLoadingIndicatorHook( + StreamingState.Responding, + false, + retryStatus, + true, + true, + 'low', + ); + + expect(result.current.currentLoadingPhrase).toBe( + 'Model capacity exhausted. Retrying (attempt 1)...', + ); + }); + + it('should show network stall retry phrase in low error verbosity mode from attempt 1', async () => { + const retryStatus = { + model: 'gemini-pro', + attempt: 0, + maxAttempts: 5, + delayMs: 1000, + error: 'ETIMEDOUT', + }; + const { result } = await renderLoadingIndicatorHook( + StreamingState.Responding, + false, + retryStatus, + true, + true, + 'low', + ); + + expect(result.current.currentLoadingPhrase).toBe( + 'Request timed out. Retrying (attempt 1)...', ); }); diff --git a/packages/cli/src/ui/hooks/useLoadingIndicator.ts b/packages/cli/src/ui/hooks/useLoadingIndicator.ts index 0e2dc9c2b1..ab032ec6f1 100644 --- a/packages/cli/src/ui/hooks/useLoadingIndicator.ts +++ b/packages/cli/src/ui/hooks/useLoadingIndicator.ts @@ -13,7 +13,10 @@ import { type RetryAttemptPayload, } from '@google/gemini-cli-core'; -const LOW_VERBOSITY_RETRY_HINT_ATTEMPT_THRESHOLD = 2; +const MODEL_CAPACITY_EXHAUSTED_PATTERN = + /MODEL_CAPACITY_EXHAUSTED|no capacity available/i; +const NETWORK_STALL_PATTERN = + /ETIMEDOUT|timed out|ECONNRESET|socket hang up|UND_ERR_(CONNECT|HEADERS|BODY)_TIMEOUT/i; export interface UseLoadingIndicatorProps { streamingState: StreamingState; @@ -33,7 +36,6 @@ export const useLoadingIndicator = ({ showTips = true, showWit = false, customWittyPhrases, - errorVerbosity = 'full', maxLength, }: UseLoadingIndicatorProps) => { const [timerResetKey, setTimerResetKey] = useState(0); @@ -79,13 +81,20 @@ export const useLoadingIndicator = ({ prevStreamingStateRef.current = streamingState; }, [streamingState, elapsedTimeFromTimer]); + const isModelCapacityRetry = + retryStatus?.error !== undefined && + MODEL_CAPACITY_EXHAUSTED_PATTERN.test(retryStatus.error); + const isNetworkStallRetry = + retryStatus?.error !== undefined && + NETWORK_STALL_PATTERN.test(retryStatus.error); + const retryPhrase = streamingState === StreamingState.Responding && retryStatus - ? errorVerbosity === 'low' - ? retryStatus.attempt >= LOW_VERBOSITY_RETRY_HINT_ATTEMPT_THRESHOLD - ? "This is taking a bit longer, we're still on it." - : null - : `Trying to reach ${getDisplayString(retryStatus.model)} (Attempt ${retryStatus.attempt + 1}/${retryStatus.maxAttempts})` + ? isModelCapacityRetry + ? `Model capacity exhausted. Retrying (attempt ${retryStatus.attempt + 1})...` + : isNetworkStallRetry + ? `Request timed out. Retrying (attempt ${retryStatus.attempt + 1})...` + : `Trying to reach ${getDisplayString(retryStatus.model)} (Attempt ${retryStatus.attempt + 1}/${retryStatus.maxAttempts})` : null; return { diff --git a/packages/core/src/code_assist/server.test.ts b/packages/core/src/code_assist/server.test.ts index 8e7f21c3b7..67429138df 100644 --- a/packages/core/src/code_assist/server.test.ts +++ b/packages/core/src/code_assist/server.test.ts @@ -40,6 +40,14 @@ function createTestServer(headers: Record = {}) { return { server, mockRequest, client }; } +function restoreEnv(key: string, originalValue: string | undefined) { + if (originalValue === undefined) { + delete process.env[key]; + } else { + process.env[key] = originalValue; + } +} + describe('CodeAssistServer', () => { beforeEach(() => { vi.resetAllMocks(); @@ -96,7 +104,7 @@ describe('CodeAssistServer', () => { }, responseType: 'json', body: expect.any(String), - signal: undefined, + signal: expect.any(AbortSignal), retryConfig: { retryDelay: 1000, retry: 3, @@ -118,6 +126,109 @@ describe('CodeAssistServer', () => { ); }); + it('should time out stalled requestPost calls with a retryable error code', async () => { + const originalTimeout = + process.env['GEMINI_CODE_ASSIST_REQUEST_TIMEOUT_MS']; + process.env['GEMINI_CODE_ASSIST_REQUEST_TIMEOUT_MS'] = '5'; + const { server, mockRequest } = createTestServer(); + mockRequest.mockImplementation( + ({ signal }: { signal?: AbortSignal }) => + new Promise((_resolve, reject) => { + signal?.addEventListener( + 'abort', + () => reject(signal.reason ?? new Error('aborted')), + { once: true }, + ); + }), + ); + + try { + await expect( + server.requestPost('generateContent', {}), + ).rejects.toMatchObject({ + message: 'Code Assist generateContent request timed out after 5ms', + name: 'TimeoutError', + code: 'ETIMEDOUT', + }); + } finally { + restoreEnv('GEMINI_CODE_ASSIST_REQUEST_TIMEOUT_MS', originalTimeout); + } + }); + + it('should time out stalled requestStreamingPost calls with the stream timeout env var', async () => { + const originalRequestTimeout = + process.env['GEMINI_CODE_ASSIST_REQUEST_TIMEOUT_MS']; + const originalStreamTimeout = + process.env['GEMINI_CODE_ASSIST_STREAM_TIMEOUT_MS']; + process.env['GEMINI_CODE_ASSIST_REQUEST_TIMEOUT_MS'] = '50'; + process.env['GEMINI_CODE_ASSIST_STREAM_TIMEOUT_MS'] = '5'; + const { server, mockRequest } = createTestServer(); + mockRequest.mockImplementation( + ({ signal }: { signal?: AbortSignal }) => + new Promise((_resolve, reject) => { + signal?.addEventListener( + 'abort', + () => reject(signal.reason ?? new Error('aborted')), + { once: true }, + ); + }), + ); + + try { + await expect( + server.requestStreamingPost('streamGenerateContent', {}), + ).rejects.toMatchObject({ + message: + 'Code Assist streamGenerateContent request timed out after 5ms', + name: 'TimeoutError', + code: 'ETIMEDOUT', + }); + } finally { + restoreEnv( + 'GEMINI_CODE_ASSIST_REQUEST_TIMEOUT_MS', + originalRequestTimeout, + ); + restoreEnv('GEMINI_CODE_ASSIST_STREAM_TIMEOUT_MS', originalStreamTimeout); + } + }); + + it('should fall back to the request timeout env var for streams when stream timeout is not set', async () => { + const originalRequestTimeout = + process.env['GEMINI_CODE_ASSIST_REQUEST_TIMEOUT_MS']; + const originalStreamTimeout = + process.env['GEMINI_CODE_ASSIST_STREAM_TIMEOUT_MS']; + process.env['GEMINI_CODE_ASSIST_REQUEST_TIMEOUT_MS'] = '5'; + delete process.env['GEMINI_CODE_ASSIST_STREAM_TIMEOUT_MS']; + const { server, mockRequest } = createTestServer(); + mockRequest.mockImplementation( + ({ signal }: { signal?: AbortSignal }) => + new Promise((_resolve, reject) => { + signal?.addEventListener( + 'abort', + () => reject(signal.reason ?? new Error('aborted')), + { once: true }, + ); + }), + ); + + try { + await expect( + server.requestStreamingPost('streamGenerateContent', {}), + ).rejects.toMatchObject({ + message: + 'Code Assist streamGenerateContent request timed out after 5ms', + name: 'TimeoutError', + code: 'ETIMEDOUT', + }); + } finally { + restoreEnv( + 'GEMINI_CODE_ASSIST_REQUEST_TIMEOUT_MS', + originalRequestTimeout, + ); + restoreEnv('GEMINI_CODE_ASSIST_STREAM_TIMEOUT_MS', originalStreamTimeout); + } + }); + it('should detect error in generateContent response', async () => { const { server, mockRequest } = createTestServer(); const mockResponseData = { @@ -428,7 +539,7 @@ describe('CodeAssistServer', () => { headers: { 'Content-Type': 'application/json', }, - signal: undefined, + signal: expect.any(AbortSignal), retry: false, }); diff --git a/packages/core/src/code_assist/server.ts b/packages/core/src/code_assist/server.ts index 92fc558ebb..748703d533 100644 --- a/packages/core/src/code_assist/server.ts +++ b/packages/core/src/code_assist/server.ts @@ -64,6 +64,7 @@ import { } from './telemetry.js'; import { getClientMetadata } from './experiments/client_metadata.js'; import { InvalidChunkEvent, type LlmRole } from '../telemetry/types.js'; +import { debugLogger } from '../utils/debugLogger.js'; /** HTTP options to be used in each of the requests. */ export interface HttpOptions { /** Additional HTTP headers to be sent with the request. */ @@ -73,6 +74,109 @@ export interface HttpOptions { export const CODE_ASSIST_ENDPOINT = 'https://cloudcode-pa.googleapis.com'; export const CODE_ASSIST_API_VERSION = 'v1internal'; const GENERATE_CONTENT_RETRY_DELAY_IN_MILLISECONDS = 1000; +const DEFAULT_CODE_ASSIST_REQUEST_TIMEOUT_MS = 60000; +const DEFAULT_CODE_ASSIST_STREAM_TIMEOUT_MS = 15000; +const CODE_ASSIST_REQUEST_TIMEOUT_ENV = 'GEMINI_CODE_ASSIST_REQUEST_TIMEOUT_MS'; +const CODE_ASSIST_STREAM_TIMEOUT_ENV = 'GEMINI_CODE_ASSIST_STREAM_TIMEOUT_MS'; + +function isStreamDebugEnabled(): boolean { + const value = process.env['GEMINI_STREAM_DEBUG']; + return value === '1' || value === 'true'; +} + +function streamDebug(message: string, details?: unknown): void { + if (!isStreamDebugEnabled()) { + return; + } + + if (details === undefined) { + debugLogger.debug(`[STREAM_DEBUG] ${message}`); + } else { + debugLogger.debug(`[STREAM_DEBUG] ${message}`, details); + } +} + +function parseCodeAssistTimeoutMs( + configuredValue: string | undefined, + defaultValue: number, +): number | undefined { + if (!configuredValue) { + return defaultValue; + } + + const parsed = Number(configuredValue); + if (!Number.isFinite(parsed)) { + return defaultValue; + } + + return parsed > 0 ? parsed : undefined; +} + +function getCodeAssistRequestTimeoutMs(): number | undefined { + return parseCodeAssistTimeoutMs( + process.env[CODE_ASSIST_REQUEST_TIMEOUT_ENV], + DEFAULT_CODE_ASSIST_REQUEST_TIMEOUT_MS, + ); +} + +function getCodeAssistStreamTimeoutMs(): number | undefined { + return parseCodeAssistTimeoutMs( + process.env[CODE_ASSIST_STREAM_TIMEOUT_ENV] ?? + process.env[CODE_ASSIST_REQUEST_TIMEOUT_ENV], + DEFAULT_CODE_ASSIST_STREAM_TIMEOUT_MS, + ); +} + +function createRequestTimeoutError( + method: string, + timeoutMs: number, +): Error & { code?: string } { + const error = new Error( + `Code Assist ${method} request timed out after ${timeoutMs}ms`, + ) as Error & { code?: string }; + error.name = 'TimeoutError'; + error.code = 'ETIMEDOUT'; + return error; +} + +async function withRequestTimeout( + method: string, + signal: AbortSignal | undefined, + request: (signal?: AbortSignal) => Promise, + timeoutMs: number | undefined = getCodeAssistRequestTimeoutMs(), +): Promise { + if (timeoutMs === undefined) { + return request(signal); + } + + if (signal?.aborted) { + return request(signal); + } + + const timeoutController = new AbortController(); + let didTimeout = false; + const timeoutId = setTimeout(() => { + didTimeout = true; + timeoutController.abort(createRequestTimeoutError(method, timeoutMs)); + }, timeoutMs); + + const abortFromCaller = () => { + timeoutController.abort(signal?.reason); + }; + signal?.addEventListener('abort', abortFromCaller, { once: true }); + + try { + return await request(timeoutController.signal); + } catch (error) { + if (didTimeout && !signal?.aborted) { + throw createRequestTimeoutError(method, timeoutMs); + } + throw error; + } finally { + clearTimeout(timeoutId); + signal?.removeEventListener('abort', abortFromCaller); + } +} export class CodeAssistServer implements ContentGenerator { constructor( @@ -414,28 +518,45 @@ export class CodeAssistServer implements ContentGenerator { signal?: AbortSignal, retryDelay: number = 100, ): Promise { - const res = await this.client.request({ - url: this.getMethodUrl(method), - method: 'POST', - headers: { - 'Content-Type': 'application/json', - ...this.httpOptions.headers, - }, - responseType: 'json', - body: JSON.stringify(req), - signal, - retryConfig: { - retryDelay, - retry: 3, - noResponseRetries: 3, - statusCodesToRetry: [ - [429, 429], - [499, 499], - [500, 599], - ], - }, - }); - return res.data; + const startedAt = Date.now(); + streamDebug('code_assist requestPost started', { method }); + try { + const res = await withRequestTimeout(method, signal, (requestSignal) => + this.client.request({ + url: this.getMethodUrl(method), + method: 'POST', + headers: { + 'Content-Type': 'application/json', + ...this.httpOptions.headers, + }, + responseType: 'json', + body: JSON.stringify(req), + signal: requestSignal, + retryConfig: { + retryDelay, + retry: 3, + noResponseRetries: 3, + statusCodesToRetry: [ + [429, 429], + [499, 499], + [500, 599], + ], + }, + }), + ); + streamDebug('code_assist requestPost succeeded', { + method, + durationMs: Date.now() - startedAt, + }); + return res.data; + } catch (error) { + streamDebug('code_assist requestPost failed', { + method, + durationMs: Date.now() - startedAt, + error: error instanceof Error ? error.message : String(error), + }); + throw error; + } } private async makeGetRequest( @@ -468,21 +589,43 @@ export class CodeAssistServer implements ContentGenerator { req: object, signal?: AbortSignal, ): Promise> { - const res = await this.client.request>({ - url: this.getMethodUrl(method), - method: 'POST', - params: { - alt: 'sse', - }, - headers: { - 'Content-Type': 'application/json', - ...this.httpOptions.headers, - }, - responseType: 'stream', - body: JSON.stringify(req), - signal, - retry: false, - }); + const startedAt = Date.now(); + streamDebug('code_assist requestStreamingPost started', { method }); + let res: { data: AsyncIterable }; + try { + res = await withRequestTimeout( + method, + signal, + (requestSignal) => + this.client.request>({ + url: this.getMethodUrl(method), + method: 'POST', + params: { + alt: 'sse', + }, + headers: { + 'Content-Type': 'application/json', + ...this.httpOptions.headers, + }, + responseType: 'stream', + body: JSON.stringify(req), + signal: requestSignal, + retry: false, + }), + getCodeAssistStreamTimeoutMs(), + ); + streamDebug('code_assist requestStreamingPost opened', { + method, + durationMs: Date.now() - startedAt, + }); + } catch (error) { + streamDebug('code_assist requestStreamingPost failed', { + method, + durationMs: Date.now() - startedAt, + error: error instanceof Error ? error.message : String(error), + }); + throw error; + } return (async function* (server: CodeAssistServer): AsyncGenerator { const rl = readline.createInterface({ diff --git a/packages/core/src/utils/debugLogger.test.ts b/packages/core/src/utils/debugLogger.test.ts index 26788f4332..e1e12ed32b 100644 --- a/packages/core/src/utils/debugLogger.test.ts +++ b/packages/core/src/utils/debugLogger.test.ts @@ -79,4 +79,33 @@ describe('DebugLogger', () => { expect(warnSpy).toHaveBeenCalledWith(); expect(warnSpy).toHaveBeenCalledTimes(1); }); + + it('should only write [STREAM_DEBUG] lines when stream-only file filter is enabled', () => { + const loggerInternals = debugLogger as unknown as { + logOnlyStreamDebug: boolean; + logStream: { write: (chunk: string) => void } | undefined; + }; + const originalFilter = loggerInternals.logOnlyStreamDebug; + const originalStream = loggerInternals.logStream; + const writeSpy = vi.fn(); + const debugSpy = vi.spyOn(console, 'debug').mockImplementation(() => {}); + + loggerInternals.logOnlyStreamDebug = true; + loggerInternals.logStream = { write: writeSpy }; + + try { + debugLogger.debug('[STREAM_DEBUG] keep this line'); + debugLogger.debug('drop this line'); + + expect(debugSpy).toHaveBeenCalledTimes(2); + expect(writeSpy).toHaveBeenCalledTimes(1); + expect(writeSpy.mock.calls[0]?.[0]).toContain( + '[STREAM_DEBUG] keep this line', + ); + expect(writeSpy.mock.calls[0]?.[0]).not.toContain('drop this line'); + } finally { + loggerInternals.logOnlyStreamDebug = originalFilter; + loggerInternals.logStream = originalStream; + } + }); }); diff --git a/packages/core/src/utils/debugLogger.ts b/packages/core/src/utils/debugLogger.ts index 9c5a82c123..b550ac8af0 100644 --- a/packages/core/src/utils/debugLogger.ts +++ b/packages/core/src/utils/debugLogger.ts @@ -22,8 +22,12 @@ import * as util from 'node:util'; */ class DebugLogger { private logStream: fs.WriteStream | undefined; + private readonly logOnlyStreamDebug: boolean; constructor() { + this.logOnlyStreamDebug = + process.env['GEMINI_DEBUG_LOG_ONLY_STREAM'] === '1' || + process.env['GEMINI_DEBUG_LOG_ONLY_STREAM'] === 'true'; this.logStream = process.env['GEMINI_DEBUG_LOG_FILE'] ? fs.createWriteStream(process.env['GEMINI_DEBUG_LOG_FILE'], { flags: 'a', @@ -39,6 +43,9 @@ class DebugLogger { private writeToFile(level: string, args: unknown[]) { if (this.logStream) { const message = util.format(...args); + if (this.logOnlyStreamDebug && !message.includes('[STREAM_DEBUG]')) { + return; + } const timestamp = new Date().toISOString(); const logEntry = `[${timestamp}] [${level}] ${message}\n`; this.logStream.write(logEntry); diff --git a/packages/core/src/utils/retry.test.ts b/packages/core/src/utils/retry.test.ts index 290f14eadb..e00de7ce03 100644 --- a/packages/core/src/utils/retry.test.ts +++ b/packages/core/src/utils/retry.test.ts @@ -57,6 +57,7 @@ describe('retryWithBackoff', () => { }); afterEach(() => { + delete process.env['GEMINI_STREAM_STALL_MAX_RETRY_DELAY_MS']; vi.restoreAllMocks(); vi.useRealTimers(); }); @@ -454,6 +455,49 @@ describe('retryWithBackoff', () => { expect(mockFn).toHaveBeenCalledTimes(2); }); + it('should use a short retry delay for stream stall errors', async () => { + const setTimeoutSpy = vi.spyOn(global, 'setTimeout'); + const error = new Error('connect ETIMEDOUT'); + (error as any).code = 'ETIMEDOUT'; + const mockFn = vi + .fn() + .mockRejectedValueOnce(error) + .mockResolvedValue('success'); + + const promise = retryWithBackoff(mockFn, { + initialDelayMs: 5000, + maxDelayMs: 30000, + }); + + await vi.runAllTimersAsync(); + await expect(promise).resolves.toBe('success'); + + expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), 1000); + }); + + it('should honor GEMINI_STREAM_STALL_MAX_RETRY_DELAY_MS override', async () => { + process.env['GEMINI_STREAM_STALL_MAX_RETRY_DELAY_MS'] = '250'; + const setTimeoutSpy = vi.spyOn(global, 'setTimeout'); + const error = new Error( + 'Code Assist streamGenerateContent request timed out after 10000ms', + ); + (error as any).code = 'ETIMEDOUT'; + const mockFn = vi + .fn() + .mockRejectedValueOnce(error) + .mockResolvedValue('success'); + + const promise = retryWithBackoff(mockFn, { + initialDelayMs: 5000, + maxDelayMs: 30000, + }); + + await vi.runAllTimersAsync(); + await expect(promise).resolves.toBe('success'); + + expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), 250); + }); + it('should retry on undici timeout error codes (UND_ERR_HEADERS_TIMEOUT)', async () => { const error = new Error('Headers timeout error'); (error as any).code = 'UND_ERR_HEADERS_TIMEOUT'; @@ -646,6 +690,81 @@ describe('retryWithBackoff', () => { expect(calledDelayMs).toBeLessThanOrEqual(12345 * 1.2); }); + it('should cap retry delay for MODEL_CAPACITY_EXHAUSTED errors', async () => { + const setTimeoutSpy = vi.spyOn(global, 'setTimeout'); + const mockFn = vi.fn().mockImplementation(async () => { + throw new RetryableQuotaError( + 'No capacity available', + { + code: 503, + message: 'No capacity available', + details: [ + { + '@type': 'type.googleapis.com/google.rpc.ErrorInfo', + reason: 'MODEL_CAPACITY_EXHAUSTED', + domain: 'cloudcode-pa.googleapis.com', + }, + ], + } as any, + 120, + ); + }); + + const promise = retryWithBackoff(mockFn, { + maxAttempts: 2, + initialDelayMs: 100, + }); + + // eslint-disable-next-line vitest/valid-expect + const assertionPromise = expect(promise).rejects.toThrow(); + await vi.runAllTimersAsync(); + await assertionPromise; + + const calledDelayMs = setTimeoutSpy.mock.calls[0][1] as number; + expect(calledDelayMs).toBeGreaterThanOrEqual(3000); + expect(calledDelayMs).toBeLessThanOrEqual(5000 * 1.2); + }); + + it('should honor GEMINI_MODEL_CAPACITY_MAX_RETRY_DELAY_MS override', async () => { + process.env['GEMINI_MODEL_CAPACITY_MAX_RETRY_DELAY_MS'] = '2500'; + const setTimeoutSpy = vi.spyOn(global, 'setTimeout'); + const mockFn = vi.fn().mockImplementation(async () => { + throw new RetryableQuotaError( + 'No capacity available', + { + code: 503, + message: 'No capacity available', + details: [ + { + '@type': 'type.googleapis.com/google.rpc.ErrorInfo', + reason: 'MODEL_CAPACITY_EXHAUSTED', + domain: 'cloudcode-pa.googleapis.com', + }, + ], + } as any, + 120, + ); + }); + + try { + const promise = retryWithBackoff(mockFn, { + maxAttempts: 2, + initialDelayMs: 100, + }); + + // eslint-disable-next-line vitest/valid-expect + const assertionPromise = expect(promise).rejects.toThrow(); + await vi.runAllTimersAsync(); + await assertionPromise; + + const calledDelayMs = setTimeoutSpy.mock.calls[0][1] as number; + expect(calledDelayMs).toBeGreaterThanOrEqual(2500); + expect(calledDelayMs).toBeLessThanOrEqual(2500 * 1.2); + } finally { + delete process.env['GEMINI_MODEL_CAPACITY_MAX_RETRY_DELAY_MS']; + } + }); + it.each([[AuthType.USE_GEMINI], [AuthType.USE_VERTEX_AI], [undefined]])( 'should invoke onPersistent429 callback (delegating decision) for non-Google auth users (authType: %s) on TerminalQuotaError', async (authType) => { diff --git a/packages/core/src/utils/retry.ts b/packages/core/src/utils/retry.ts index a45ba0c0b0..4541f17e0e 100644 --- a/packages/core/src/utils/retry.ts +++ b/packages/core/src/utils/retry.ts @@ -61,6 +61,23 @@ const RETRYABLE_NETWORK_CODES = [ 'ERR_STREAM_PREMATURE_CLOSE', ]; +const ERROR_INFO_TYPE = 'type.googleapis.com/google.rpc.ErrorInfo'; +const MODEL_CAPACITY_EXHAUSTED_REASON = 'MODEL_CAPACITY_EXHAUSTED'; +const DEFAULT_MODEL_CAPACITY_MAX_RETRY_DELAY_MS = 3000; +const MODEL_CAPACITY_MAX_RETRY_DELAY_ENV = + 'GEMINI_MODEL_CAPACITY_MAX_RETRY_DELAY_MS'; +const DEFAULT_STREAM_STALL_MAX_RETRY_DELAY_MS = 1000; +const STREAM_STALL_MAX_RETRY_DELAY_ENV = + 'GEMINI_STREAM_STALL_MAX_RETRY_DELAY_MS'; +const STREAM_STALL_RETRY_CODES = new Set([ + 'ECONNRESET', + 'ETIMEDOUT', + 'UND_ERR_HEADERS_TIMEOUT', + 'UND_ERR_BODY_TIMEOUT', + 'UND_ERR_CONNECT_TIMEOUT', + 'ERR_STREAM_PREMATURE_CLOSE', +]); + // Node.js builds SSL error codes by prepending ERR_SSL_ to the uppercased // OpenSSL reason string with spaces replaced by underscores (see // TLSWrap::ClearOut in node/src/crypto/crypto_tls.cc). The reason string @@ -208,6 +225,69 @@ export function isRetryableError( return false; } +function getModelCapacityMaxRetryDelayMs(): number { + const configuredValue = process.env[MODEL_CAPACITY_MAX_RETRY_DELAY_ENV]; + if (!configuredValue) { + return DEFAULT_MODEL_CAPACITY_MAX_RETRY_DELAY_MS; + } + + const parsed = Number(configuredValue); + if (!Number.isFinite(parsed) || parsed <= 0) { + return DEFAULT_MODEL_CAPACITY_MAX_RETRY_DELAY_MS; + } + return parsed; +} + +function getStreamStallMaxRetryDelayMs(): number { + const configuredValue = process.env[STREAM_STALL_MAX_RETRY_DELAY_ENV]; + if (!configuredValue) { + return DEFAULT_STREAM_STALL_MAX_RETRY_DELAY_MS; + } + + const parsed = Number(configuredValue); + if (!Number.isFinite(parsed) || parsed <= 0) { + return DEFAULT_STREAM_STALL_MAX_RETRY_DELAY_MS; + } + return parsed; +} + +function isStreamStallError(error: unknown): boolean { + const errorCode = getNetworkErrorCode(error); + if (errorCode && STREAM_STALL_RETRY_CODES.has(errorCode)) { + return true; + } + + if (!(error instanceof Error)) { + return false; + } + + const lowerMessage = error.message.toLowerCase(); + return ( + lowerMessage.includes('timed out') || + lowerMessage.includes('timeout') || + lowerMessage.includes('socket hang up') + ); +} + +function isModelCapacityExhaustedError(error: RetryableQuotaError): boolean { + const details = error.cause?.details; + if (!Array.isArray(details)) { + return false; + } + + return details.some((detail) => { + if (typeof detail !== 'object' || detail === null) { + return false; + } + + const typedDetail = detail as { '@type'?: string; reason?: string }; + return ( + typedDetail['@type'] === ERROR_INFO_TYPE && + typedDetail.reason === MODEL_CAPACITY_EXHAUSTED_REASON + ); + }); +} + /** * Retries a function with exponential backoff and jitter. * @param fn The asynchronous function to retry. @@ -374,6 +454,32 @@ export async function retryWithBackoff( : error; } + if ( + classifiedError instanceof RetryableQuotaError && + isModelCapacityExhaustedError(classifiedError) + ) { + const maxCapacityDelayMs = getModelCapacityMaxRetryDelayMs(); + const suggestedDelayMs = + classifiedError.retryDelayMs ?? initialDelayMs; + const capacityDelayMs = Math.min( + maxCapacityDelayMs, + Math.max(initialDelayMs, suggestedDelayMs), + ); + + // Positive jitter up to +20%; keep retries short for capacity errors. + const jitter = capacityDelayMs * 0.2 * Math.random(); + const delayWithJitter = capacityDelayMs + jitter; + debugLogger.warn( + `Attempt ${attempt} failed: ${classifiedError.message}. Capacity constrained; retrying after ${Math.round(delayWithJitter)}ms (capped at ${maxCapacityDelayMs}ms).`, + ); + if (onRetry) { + onRetry(attempt, error, delayWithJitter); + } + await delay(delayWithJitter, signal); + currentDelay = initialDelayMs; + continue; + } + if ( classifiedError instanceof RetryableQuotaError && classifiedError.retryDelayMs !== undefined @@ -393,16 +499,28 @@ export async function retryWithBackoff( continue; } else { const errorStatus = getErrorStatus(error); - logRetryAttempt(attempt, error, errorStatus); - - // Exponential backoff with jitter for non-quota errors - const jitter = currentDelay * 0.3 * (Math.random() * 2 - 1); - const delayWithJitter = Math.max(0, currentDelay + jitter); + const streamStall = isStreamStallError(error); + const delayWithJitter = streamStall + ? getStreamStallMaxRetryDelayMs() + : Math.max( + 0, + currentDelay + currentDelay * 0.3 * (Math.random() * 2 - 1), + ); + if (streamStall) { + debugLogger.warn( + `Attempt ${attempt} failed. Stream stalled; retrying after ${Math.round(delayWithJitter)}ms (capped at ${getStreamStallMaxRetryDelayMs()}ms).`, + error, + ); + } else { + logRetryAttempt(attempt, error, errorStatus); + } if (onRetry) { onRetry(attempt, error, delayWithJitter); } await delay(delayWithJitter, signal); - currentDelay = Math.min(maxDelayMs, currentDelay * 2); + currentDelay = streamStall + ? initialDelayMs + : Math.min(maxDelayMs, currentDelay * 2); continue; } } @@ -417,16 +535,28 @@ export async function retryWithBackoff( } const errorStatus = getErrorStatus(error); - logRetryAttempt(attempt, error, errorStatus); - - // Exponential backoff with jitter for non-quota errors - const jitter = currentDelay * 0.3 * (Math.random() * 2 - 1); - const delayWithJitter = Math.max(0, currentDelay + jitter); + const streamStall = isStreamStallError(error); + const delayWithJitter = streamStall + ? getStreamStallMaxRetryDelayMs() + : Math.max( + 0, + currentDelay + currentDelay * 0.3 * (Math.random() * 2 - 1), + ); + if (streamStall) { + debugLogger.warn( + `Attempt ${attempt} failed. Stream stalled; retrying after ${Math.round(delayWithJitter)}ms (capped at ${getStreamStallMaxRetryDelayMs()}ms).`, + error, + ); + } else { + logRetryAttempt(attempt, error, errorStatus); + } if (onRetry) { onRetry(attempt, error, delayWithJitter); } await delay(delayWithJitter, signal); - currentDelay = Math.min(maxDelayMs, currentDelay * 2); + currentDelay = streamStall + ? initialDelayMs + : Math.min(maxDelayMs, currentDelay * 2); } }