🐛 fix(gateway): clean up paused server op after human approve/reject (#13860)

* 🐛 fix(gateway): clean up paused server op after human approve/reject

In Gateway mode with userInterventionConfig.approvalMode='ask', the
paused execServerAgentRuntime op was never released — the loading
spinner kept spinning after the user approved, rejected, or
reject-and-continued, and reject-only silently did nothing on the
server.

- ToolAction.rejectToolCall now delegates to chatStore.rejectToolCalling
  so the Gateway resume op actually fires with decision='rejected';
  previously it only mutated local intervention state and the server's
  paused op waited forever.
- AgentRuntimeCoordinator treats waiting_for_human as end-of-stream so
  the coordinator emits agent_runtime_end when request_human_approve
  flips state, letting the client close the paused op via the normal
  terminal-event path.
- conversationControl adds #completeRunningServerOps as a fallback
  guard in the approve/reject/reject-continue Gateway branches — if
  the server-side signal is delayed or missing, the client still clears
  the orphan op before starting the resume op.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix(gateway): defer paused-op cleanup until resume starts successfully

If `executeGatewayAgent` failed (transient network/auth/server error),
the paused `execServerAgentRuntime` op was already marked completed
locally by the pre-call `#completeRunningServerOps`. Retries would
then see no running server op, miss `#hasRunningServerOp`, and fall
through to the non-Gateway client-mode path — while the backend was
still paused awaiting human input.

Snapshot the paused op IDs before the resume call and retire them
only inside the try block after `executeGatewayAgent` resolves. On
failure the running marker stays intact so a retry still lands on
the Gateway branch and can re-issue the resume.

The helper was renamed from `#completeRunningServerOps(context)` to
`#completeOpsById(ids)` to reflect the new contract: callers must
snapshot beforehand, not re-query at completion time (which would
incorrectly match the new resume op too).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix(gateway): avoid double reject dispatch in reject-and-continue

Now that `rejectToolCall` delegates to `chatStore.rejectToolCalling`,
the chained `await get().rejectToolCall(...)` inside
`rejectAndContinueToolCall` fired a full halting reject before the
continue call. In Gateway mode that meant two resume ops on the same
tool_call_id (`decision='rejected'` followed by
`decision='rejected_continue'`) racing server-side; in client mode it
duplicated reject bookkeeping that `chatStore.rejectAndContinueToolCalling`
already handles internally.

Drop the chained call and fire `onToolRejected` inline so hook
semantics are preserved. `chatStore.rejectAndContinueToolCalling` is
now the single entry point for both the rejection persist and the
continue dispatch.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Arvin Xu 2026-04-16 01:43:00 +08:00 committed by GitHub
parent 2cf65e9fb3
commit 1005f442d6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 296 additions and 35 deletions

View file

@ -6,6 +6,7 @@ import { createStore } from '../../index';
// Mock dependencies
const mockApproveToolCalling = vi.fn();
const mockRejectToolCalling = vi.fn();
const mockRejectAndContinueToolCalling = vi.fn();
vi.mock('@/store/chat', () => ({
@ -14,6 +15,7 @@ vi.mock('@/store/chat', () => ({
messagesMap: {},
operations: {},
approveToolCalling: mockApproveToolCalling,
rejectToolCalling: mockRejectToolCalling,
rejectAndContinueToolCalling: mockRejectAndContinueToolCalling,
cancelOperations: vi.fn(),
cancelOperation: vi.fn(),
@ -193,7 +195,40 @@ describe('Tool Actions', () => {
});
expect(onToolRejected).toHaveBeenCalledWith('tool-call-1', 'Reason');
// Should not proceed when hook returns false
expect(mockRejectToolCalling).not.toHaveBeenCalled();
});
it('should delegate to ChatStore.rejectToolCalling with context', async () => {
const context: ConversationContext = {
agentId: 'session-1',
topicId: null,
threadId: null,
};
const store = createStore({ context });
await act(async () => {
await store.getState().rejectToolCall('tool-call-1', 'Reason');
});
expect(mockRejectToolCalling).toHaveBeenCalledWith('tool-call-1', 'Reason', context);
});
it('should pass agent_builder scope context correctly', async () => {
const context: ConversationContext = {
agentId: 'builder-agent',
topicId: 'builder-topic',
threadId: 'builder-thread',
scope: 'agent_builder',
};
const store = createStore({ context });
await act(async () => {
await store.getState().rejectToolCall('tool-call-1', 'Reason');
});
expect(mockRejectToolCalling).toHaveBeenCalledWith('tool-call-1', 'Reason', context);
});
});
@ -261,6 +296,69 @@ describe('Tool Actions', () => {
context,
);
});
it('should NOT also call ChatStore.rejectToolCalling (avoid double-dispatch with rejected_continue)', async () => {
const context: ConversationContext = {
agentId: 'session-1',
topicId: null,
threadId: null,
};
const store = createStore({ context });
await act(async () => {
await store.getState().rejectAndContinueToolCall('tool-call-1', 'Reason');
});
// Only the continue variant should fire. If `rejectToolCalling` also
// fires, Gateway mode would kick off a halting `decision='rejected'`
// resume op before the `decision='rejected_continue'` one and race
// two resume ops on the same tool_call_id.
expect(mockRejectAndContinueToolCalling).toHaveBeenCalledTimes(1);
expect(mockRejectToolCalling).not.toHaveBeenCalled();
});
it('should still fire the onToolRejected hook', async () => {
const onToolRejected = vi.fn().mockResolvedValue(true);
const context: ConversationContext = {
agentId: 'session-1',
topicId: null,
threadId: null,
};
const hooks: ConversationHooks = { onToolRejected };
const store = createStore({ context, hooks });
await act(async () => {
await store.getState().rejectAndContinueToolCall('tool-call-1', 'Reason');
});
expect(onToolRejected).toHaveBeenCalledWith('tool-call-1', 'Reason');
expect(mockRejectAndContinueToolCalling).toHaveBeenCalledWith(
'tool-call-1',
'Reason',
context,
);
});
it('should respect onToolRejected hook returning false and skip the continue call', async () => {
const onToolRejected = vi.fn().mockResolvedValue(false);
const context: ConversationContext = {
agentId: 'session-1',
topicId: null,
threadId: null,
};
const hooks: ConversationHooks = { onToolRejected };
const store = createStore({ context, hooks });
await act(async () => {
await store.getState().rejectAndContinueToolCall('tool-call-1', 'Reason');
});
expect(onToolRejected).toHaveBeenCalledWith('tool-call-1', 'Reason');
expect(mockRejectAndContinueToolCalling).not.toHaveBeenCalled();
});
});
describe('waitForPendingArgsUpdate integration', () => {

View file

@ -3,7 +3,6 @@ import { type StateCreator } from 'zustand';
import { useChatStore } from '@/store/chat';
import { type Store as ConversationStore } from '../../action';
import { dataSelectors } from '../data/selectors';
/**
* Tool Interaction Actions
@ -72,22 +71,36 @@ export const toolSlice: StateCreator<
},
rejectAndContinueToolCall: async (toolMessageId: string, reason?: string) => {
const { context, waitForPendingArgsUpdate } = get();
const { context, hooks, waitForPendingArgsUpdate } = get();
// Wait for any pending args update to complete before rejection
await waitForPendingArgsUpdate(toolMessageId);
// First reject the tool call
await get().rejectToolCall(toolMessageId, reason);
// ===== Hook: onToolRejected =====
// Fire the hook here directly rather than going through `rejectToolCall`.
// `rejectToolCall` now delegates to `chatStore.rejectToolCalling`, so
// chaining it would (in Gateway mode) kick off a halting
// `decision='rejected'` resume op before our own
// `decision='rejected_continue'` call below, racing two resume ops on
// the same tool_call_id. In client mode it would also duplicate the
// reject bookkeeping since `chatStore.rejectAndContinueToolCalling`
// already calls `chatStore.rejectToolCalling` internally.
if (hooks.onToolRejected) {
const shouldProceed = await hooks.onToolRejected(toolMessageId, reason);
if (shouldProceed === false) return;
}
// Then delegate to ChatStore to continue the conversation with context
// Delegate to ChatStore for rejection + continuation. In Gateway mode
// this fires a single `decision='rejected_continue'` resume op; in
// client mode it persists the rejection via an internal
// `chatStore.rejectToolCalling` call before resuming the local runtime.
const chatStore = useChatStore.getState();
await chatStore.rejectAndContinueToolCalling(toolMessageId, reason, context);
},
rejectToolCall: async (toolMessageId: string, reason?: string) => {
const state = get();
const { hooks, updateMessagePlugin, updateMessageContent, waitForPendingArgsUpdate } = state;
const { context, hooks, waitForPendingArgsUpdate } = state;
// Wait for any pending args update to complete before rejection
await waitForPendingArgsUpdate(toolMessageId);
@ -98,23 +111,14 @@ export const toolSlice: StateCreator<
if (shouldProceed === false) return;
}
const toolMessage = dataSelectors.getDbMessageById(toolMessageId)(state);
if (!toolMessage) return;
// Update intervention status to rejected
await updateMessagePlugin(toolMessageId, {
intervention: {
rejectedReason: reason,
status: 'rejected',
},
});
// Update tool message content with rejection reason
const toolContent = !!reason
? `User reject this tool calling with reason: ${reason}`
: 'User reject this tool calling without reason';
await updateMessageContent(toolMessageId, toolContent);
// Delegate to global ChatStore with context for correct conversation scope.
// In Gateway mode this also starts a new op carrying resumeApproval={decision:'rejected'}
// so the server releases the paused confirmation; without this the server op stays
// awaiting confirmation and the client loading state never clears.
// `chatStore.rejectToolCalling` does its own tool-message existence guard, so the
// lookup that used to live here is redundant.
const chatStore = useChatStore.getState();
await chatStore.rejectToolCalling(toolMessageId, reason, context);
},
skipToolInteraction: async (toolMessageId: string, reason?: string) => {

View file

@ -7,14 +7,29 @@ import { type IAgentStateManager, type IStreamEventManager } from './types';
const log = debug('lobe-server:agent-runtime:coordinator');
const TERMINAL_STATUSES = new Set<AgentState['status']>(['done', 'error', 'interrupted']);
/**
* Statuses that end the event stream for the current operationId.
*
* `done` / `error` / `interrupted` are genuinely terminal the op cannot
* resume. `waiting_for_human` is *stream-terminal but state-resumable*:
* the paused state lives on until a resume op (carrying the user's
* decision) starts, but that resume runs under a **new** operationId with
* its own event stream. For the paused operationId no further events will
* arrive, so clients should stop waiting the same way they do on done.
*/
const STREAM_END_STATUSES = new Set<AgentState['status']>([
'done',
'error',
'interrupted',
'waiting_for_human',
]);
const hasEnteredTerminalState = (
const hasEnteredStreamEndState = (
previousStatus?: AgentState['status'],
nextStatus?: AgentState['status'],
): nextStatus is 'done' | 'error' | 'interrupted' => {
const wasTerminal = previousStatus ? TERMINAL_STATUSES.has(previousStatus) : false;
return Boolean(nextStatus && TERMINAL_STATUSES.has(nextStatus) && !wasTerminal);
): nextStatus is 'done' | 'error' | 'interrupted' | 'waiting_for_human' => {
const wasStreamEnd = previousStatus ? STREAM_END_STATUSES.has(previousStatus) : false;
return Boolean(nextStatus && STREAM_END_STATUSES.has(nextStatus) && !wasStreamEnd);
};
export interface AgentRuntimeCoordinatorOptions {
@ -90,7 +105,7 @@ export class AgentRuntimeCoordinator {
await this.stateManager.saveAgentState(operationId, state);
// Send a terminal event once the operation first enters a terminal state.
if (hasEnteredTerminalState(previousState?.status, state.status)) {
if (hasEnteredStreamEndState(previousState?.status, state.status)) {
await this.streamEventManager.publishAgentRuntimeEnd(
operationId,
state.stepCount ?? previousState?.stepCount ?? 0,
@ -117,7 +132,7 @@ export class AgentRuntimeCoordinator {
await this.stateManager.saveStepResult(operationId, stepResult);
// This ensures agent_runtime_end is sent after all step events.
if (hasEnteredTerminalState(previousState?.status, stepResult.newState.status)) {
if (hasEnteredStreamEndState(previousState?.status, stepResult.newState.status)) {
await this.streamEventManager.publishAgentRuntimeEnd(
operationId,
stepResult.newState.stepCount ?? stepResult.stepIndex ?? previousState?.stepCount ?? 0,

View file

@ -154,6 +154,23 @@ describe('AgentRuntimeCoordinator', () => {
);
});
it('should publish end event when status changes to waiting_for_human so the client releases its loading state', async () => {
const operationId = 'test-operation-id';
const previousState = { status: 'running', stepCount: 3 };
const newState = { status: 'waiting_for_human', stepCount: 4 };
mockStateManager.loadAgentState.mockResolvedValue(previousState);
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
operationId,
newState.stepCount,
newState,
'waiting_for_human',
);
});
it('should not publish end event when status was already done', async () => {
const operationId = 'test-operation-id';
const previousState = { status: 'done', stepCount: 5 };
@ -245,6 +262,26 @@ describe('AgentRuntimeCoordinator', () => {
);
});
it('should publish end event when status becomes waiting_for_human (paused awaiting approval)', async () => {
const operationId = 'test-operation-id';
const stepResult = {
executionTime: 1000,
newState: { status: 'waiting_for_human', stepCount: 4 },
stepIndex: 4,
};
mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 3 });
await coordinator.saveStepResult(operationId, stepResult as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
operationId,
4,
stepResult.newState,
'waiting_for_human',
);
});
it('should publish end event when status becomes interrupted', async () => {
const operationId = 'test-operation-id';
const stepResult = {

View file

@ -675,6 +675,73 @@ describe('ConversationControl actions', () => {
);
expect(internal_execAgentRuntimeSpy).not.toHaveBeenCalled();
// Fallback guard: the paused `execServerAgentRuntime` op in this
// context must be completed so the loading state doesn't bleed
// across ops when the server-side `agent_runtime_end` for
// `waiting_for_human` hasn't landed yet.
const pausedServerOps = Object.values(result.current.operations).filter(
(op: any) => op.type === 'execServerAgentRuntime',
);
expect(pausedServerOps).toHaveLength(1);
expect(pausedServerOps[0]!.status).toBe('completed');
executeGatewayAgentSpy.mockRestore();
});
it('should leave the paused server op running when the Gateway resume call fails so retries stay on the server-mode path', async () => {
const { result } = renderHook(() => useChatStore());
const agentId = 'server-agent';
const topicId = 'server-topic';
const chatKey = messageMapKey({ agentId, topicId });
const toolMessage = createMockMessage({
id: 'tool-msg-1',
plugin: {
apiName: 'search',
arguments: '{"q":"test"}',
identifier: 'web-search',
type: 'default',
},
role: 'tool',
tool_call_id: 'call_xyz',
} as any);
act(() => {
useChatStore.setState({
activeAgentId: agentId,
activeTopicId: topicId,
dbMessagesMap: { [chatKey]: [toolMessage] },
messagesMap: { [chatKey]: [toolMessage] },
});
result.current.startOperation({
context: { agentId, topicId, threadId: null },
metadata: { serverOperationId: 'server-op-xyz' },
type: 'execServerAgentRuntime',
});
});
vi.spyOn(result.current, 'optimisticUpdateMessagePlugin').mockResolvedValue(undefined);
const executeGatewayAgentSpy = vi
.spyOn(result.current, 'executeGatewayAgent')
.mockRejectedValue(new Error('network error'));
await act(async () => {
await result.current.approveToolCalling('tool-msg-1', 'group-1');
});
expect(executeGatewayAgentSpy).toHaveBeenCalled();
// On failure, the paused server op must stay `running` — otherwise a
// retry would see no running server op and fall through to the
// non-Gateway path while the backend is still awaiting human input.
const serverOps = Object.values(result.current.operations).filter(
(op: any) => op.type === 'execServerAgentRuntime',
);
expect(serverOps).toHaveLength(1);
expect(serverOps[0]!.status).toBe('running');
executeGatewayAgentSpy.mockRestore();
});

View file

@ -45,8 +45,18 @@ export class ConversationControlActionImpl {
* to prevent.
*/
#hasRunningServerOp = (context: ConversationContext): boolean => {
return this.#getRunningServerOps(context).length > 0;
};
/**
* Return running (non-aborting) `execServerAgentRuntime` ops in the given
* context. Used both to detect Gateway mode and to snapshot the set of
* paused ops before starting a resume op so we can retire them once the
* resume has successfully taken over.
*/
#getRunningServerOps = (context: ConversationContext) => {
const { agentId, groupId, scope, subAgentId, topicId, threadId } = context;
if (!agentId) return false;
if (!agentId) return [];
const ops = operationSelectors.getOperationsByContext({
agentId,
groupId,
@ -55,12 +65,28 @@ export class ConversationControlActionImpl {
threadId: threadId ?? null,
topicId: topicId ?? null,
})(this.#get());
return ops.some(
return ops.filter(
(op) =>
op.type === 'execServerAgentRuntime' && op.status === 'running' && !op.metadata?.isAborting,
);
};
/**
* Client-side fallback guard that retires paused server ops once a Gateway
* resume op has started successfully. The server emits `agent_runtime_end`
* after `human_approve_required`, but if that event is delayed or the
* backend lacks the fix the paused op would linger as "running" and keep
* the loading spinner on. Callers must snapshot the IDs *before*
* `executeGatewayAgent` and only invoke this helper after the resume call
* resolves completing eagerly on failure would erase the running marker
* while the server is still paused, causing retries to miss the Gateway
* branch and fall through to client-mode.
*/
#completeOpsById = (opIds: readonly string[]): void => {
const { completeOperation } = this.#get();
for (const id of opIds) completeOperation(id);
};
stopGenerateMessage = (): void => {
const { activeAgentId, activeTopicId, cancelOperations } = this.#get();
@ -193,6 +219,11 @@ export class ConversationControlActionImpl {
completeOperation(operationId);
return;
}
// Snapshot paused op IDs before the resume call; retire them only
// after executeGatewayAgent succeeds so a transient failure leaves
// the running marker intact and `#hasRunningServerOp` still flags
// Gateway mode on retry.
const pausedOpIds = this.#getRunningServerOps(effectiveContext).map((op) => op.id);
try {
await this.#get().executeGatewayAgent({
context: effectiveContext,
@ -204,6 +235,7 @@ export class ConversationControlActionImpl {
toolCallId,
},
});
this.#completeOpsById(pausedOpIds);
completeOperation(operationId);
} catch (error) {
const err = error as Error;
@ -575,8 +607,11 @@ export class ConversationControlActionImpl {
// Server-mode: start a **new** Gateway op carrying `decision='rejected'`.
// Server persists the rejection on the target tool message and halts the
// conversation. The paused op stays where it was; the new op's
// `agent_runtime_end` clears the loading state on the client.
// conversation. The new op's `agent_runtime_end` clears the loading
// state on the client; the snapshot-then-retire dance below is a
// fallback guard in case the server-side human_approve agent_runtime_end
// was missed, while preserving the paused-op marker on failure so a
// retry still hits the Gateway branch.
if (this.#hasRunningServerOp(effectiveContext)) {
const toolCallId = toolMessage.tool_call_id;
if (!toolCallId) {
@ -586,6 +621,7 @@ export class ConversationControlActionImpl {
completeOperation(operationId);
return;
}
const pausedOpIds = this.#getRunningServerOps(effectiveContext).map((op) => op.id);
try {
await this.#get().executeGatewayAgent({
context: effectiveContext,
@ -598,6 +634,7 @@ export class ConversationControlActionImpl {
toolCallId,
},
});
this.#completeOpsById(pausedOpIds);
} catch (error) {
console.error('[rejectToolCalling][server] Gateway resume failed:', error);
}
@ -639,6 +676,8 @@ export class ConversationControlActionImpl {
return;
}
const pausedOpIds = this.#getRunningServerOps(effectiveContext).map((op) => op.id);
const { operationId } = startOperation({
type: 'rejectToolCalling',
context: {
@ -678,6 +717,7 @@ export class ConversationControlActionImpl {
toolCallId,
},
});
this.#completeOpsById(pausedOpIds);
completeOperation(operationId);
} catch (error) {
const err = error as Error;