mirror of
https://github.com/lobehub/lobehub
synced 2026-04-21 17:47:27 +00:00
🐛 fix(gateway): route approve/reject via lab flag (#13863)
🐛 fix(gateway): route approve/reject via lab flag, not transient server op state After the coordinator fix for `waiting_for_human` (#13860) the paused `execServerAgentRuntime` op is marked `completed` client-side as soon as the server emits `agent_runtime_end`. `startOperation` then runs `cleanupCompletedOperations(30_000)`, which deletes any op completed more than 30 seconds ago — so by the time the user sees the InterventionBar and clicks approve/reject, the running (or recently completed) server op is gone. The previous `#hasRunningServerOp` check therefore kept returning false against a live Gateway backend, flipping approve/reject into the client-mode `internal_execAgentRuntime` branch and stranding the server-side paused conversation. Switch the helper to `#shouldUseGatewayResume`, which checks the same `isGatewayModeEnabled()` lab flag used to route the initial send. The signal now mirrors how the conversation was dispatched and survives the op-cleanup window. New regression test exercises the post-coordinator-fix state: the paused `execServerAgentRuntime` op is explicitly `completed` before the approve call runs, and we still expect the Gateway branch to fire with `decision='approved'`. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
1005f442d6
commit
8109bbbbc3
2 changed files with 104 additions and 23 deletions
|
|
@ -641,8 +641,8 @@ describe('ConversationControl actions', () => {
|
|||
messagesMap: { [chatKey]: [toolMessage] },
|
||||
});
|
||||
|
||||
// Simulate a running server operation — presence of this op is
|
||||
// what flips approve/reject into server-mode.
|
||||
// The `execServerAgentRuntime` op started below no longer decides routing:
|
||||
// `#shouldUseGatewayResume` checks only the `isGatewayModeEnabled()` lab flag.
|
||||
result.current.startOperation({
|
||||
context: { agentId, topicId, threadId: null },
|
||||
metadata: { serverOperationId: 'server-op-xyz' },
|
||||
|
|
@ -650,6 +650,7 @@ describe('ConversationControl actions', () => {
|
|||
});
|
||||
});
|
||||
|
||||
vi.spyOn(result.current, 'isGatewayModeEnabled').mockReturnValue(true);
|
||||
vi.spyOn(result.current, 'optimisticUpdateMessagePlugin').mockResolvedValue(undefined);
|
||||
const executeGatewayAgentSpy = vi
|
||||
.spyOn(result.current, 'executeGatewayAgent')
|
||||
|
|
@ -688,6 +689,78 @@ describe('ConversationControl actions', () => {
|
|||
executeGatewayAgentSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('should still take the Gateway branch when the server already ended the paused op (post-coordinator-fix state)', async () => {
|
||||
const { result } = renderHook(() => useChatStore());
|
||||
|
||||
const agentId = 'server-agent';
|
||||
const topicId = 'server-topic';
|
||||
const chatKey = messageMapKey({ agentId, topicId });
|
||||
|
||||
const toolMessage = createMockMessage({
|
||||
id: 'tool-msg-1',
|
||||
plugin: {
|
||||
apiName: 'search',
|
||||
arguments: '{"q":"test"}',
|
||||
identifier: 'web-search',
|
||||
type: 'default',
|
||||
},
|
||||
role: 'tool',
|
||||
tool_call_id: 'call_xyz',
|
||||
} as any);
|
||||
|
||||
let serverOpId: string | undefined;
|
||||
act(() => {
|
||||
useChatStore.setState({
|
||||
activeAgentId: agentId,
|
||||
activeTopicId: topicId,
|
||||
dbMessagesMap: { [chatKey]: [toolMessage] },
|
||||
messagesMap: { [chatKey]: [toolMessage] },
|
||||
});
|
||||
|
||||
serverOpId = result.current.startOperation({
|
||||
context: { agentId, topicId, threadId: null },
|
||||
metadata: { serverOperationId: 'server-op-xyz' },
|
||||
type: 'execServerAgentRuntime',
|
||||
}).operationId;
|
||||
|
||||
// Simulate the coordinator's `waiting_for_human` → `agent_runtime_end`
|
||||
// signal arriving before the user clicks approve: the op is already
|
||||
// `completed` when the Gateway-branch decision runs.
|
||||
result.current.completeOperation(serverOpId!);
|
||||
});
|
||||
|
||||
expect(result.current.operations[serverOpId!]!.status).toBe('completed');
|
||||
|
||||
vi.spyOn(result.current, 'isGatewayModeEnabled').mockReturnValue(true);
|
||||
vi.spyOn(result.current, 'optimisticUpdateMessagePlugin').mockResolvedValue(undefined);
|
||||
const executeGatewayAgentSpy = vi
|
||||
.spyOn(result.current, 'executeGatewayAgent')
|
||||
.mockResolvedValue({} as any);
|
||||
const internal_execAgentRuntimeSpy = vi
|
||||
.spyOn(result.current, 'internal_execAgentRuntime')
|
||||
.mockResolvedValue(undefined);
|
||||
|
||||
await act(async () => {
|
||||
await result.current.approveToolCalling('tool-msg-1', 'group-1');
|
||||
});
|
||||
|
||||
// Critical regression guard: with `#hasRunningServerOp` the branch
|
||||
// was missed here (no running op → fell through to client-mode).
|
||||
// `#shouldUseGatewayResume` checks only the `isGatewayModeEnabled()` lab
|
||||
// flag, so an already-completed op no longer knocks us off the Gateway path.
|
||||
expect(executeGatewayAgentSpy).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
resumeApproval: expect.objectContaining({
|
||||
decision: 'approved',
|
||||
toolCallId: 'call_xyz',
|
||||
}),
|
||||
}),
|
||||
);
|
||||
expect(internal_execAgentRuntimeSpy).not.toHaveBeenCalled();
|
||||
|
||||
executeGatewayAgentSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('should leave the paused server op running when the Gateway resume call fails so retries stay on the server-mode path', async () => {
|
||||
const { result } = renderHook(() => useChatStore());
|
||||
|
||||
|
|
@ -722,6 +795,7 @@ describe('ConversationControl actions', () => {
|
|||
});
|
||||
});
|
||||
|
||||
vi.spyOn(result.current, 'isGatewayModeEnabled').mockReturnValue(true);
|
||||
vi.spyOn(result.current, 'optimisticUpdateMessagePlugin').mockResolvedValue(undefined);
|
||||
const executeGatewayAgentSpy = vi
|
||||
.spyOn(result.current, 'executeGatewayAgent')
|
||||
|
|
@ -829,6 +903,7 @@ describe('ConversationControl actions', () => {
|
|||
});
|
||||
});
|
||||
|
||||
vi.spyOn(result.current, 'isGatewayModeEnabled').mockReturnValue(true);
|
||||
vi.spyOn(result.current, 'optimisticUpdateMessagePlugin').mockResolvedValue(undefined);
|
||||
const executeGatewayAgentSpy = vi
|
||||
.spyOn(result.current, 'executeGatewayAgent')
|
||||
|
|
@ -888,6 +963,7 @@ describe('ConversationControl actions', () => {
|
|||
});
|
||||
});
|
||||
|
||||
vi.spyOn(result.current, 'isGatewayModeEnabled').mockReturnValue(true);
|
||||
vi.spyOn(result.current, 'optimisticUpdateMessagePlugin').mockResolvedValue(undefined);
|
||||
vi.spyOn(result.current, 'optimisticUpdateMessageContent').mockResolvedValue(undefined);
|
||||
const executeGatewayAgentSpy = vi
|
||||
|
|
@ -944,6 +1020,7 @@ describe('ConversationControl actions', () => {
|
|||
});
|
||||
});
|
||||
|
||||
vi.spyOn(result.current, 'isGatewayModeEnabled').mockReturnValue(true);
|
||||
vi.spyOn(result.current, 'optimisticUpdateMessagePlugin').mockResolvedValue(undefined);
|
||||
vi.spyOn(result.current, 'optimisticUpdateMessageContent').mockResolvedValue(undefined);
|
||||
const executeGatewayAgentSpy = vi
|
||||
|
|
|
|||
|
|
@ -30,29 +30,33 @@ export class ConversationControlActionImpl {
|
|||
}
|
||||
|
||||
/**
|
||||
* Detect whether the given context has a running server-mode agent op.
|
||||
* When true, approve/reject/reject_continue should skip the local
|
||||
* `internal_execAgentRuntime` path and instead start a **new** Gateway op
|
||||
* carrying a `resumeApproval` decision — mirroring the "interrupt + new op"
|
||||
* pattern from LOBE-7142 so we don't need to reach into the paused op's
|
||||
* state to resume it in-place.
|
||||
* Decide whether approve/reject/reject_continue should go through the
|
||||
* Gateway resume path (new op carrying `resumeApproval`) instead of the
|
||||
* local `internal_execAgentRuntime` path. Mirrors the "interrupt + new op"
|
||||
* pattern from LOBE-7142.
|
||||
*
|
||||
* Must forward scope/groupId/subAgentId from the incoming context:
|
||||
* `operationsByContext` is keyed on the full `messageMapKey`, so a
|
||||
* thread/group/sub-agent conversation whose call site omits these fields
|
||||
* would silently lookup the 'main' scope bucket, miss the running server
|
||||
* op, and fall back to client-mode — exactly the bug this helper exists
|
||||
* to prevent.
|
||||
* Uses the same `isGatewayModeEnabled()` lab flag that routes the initial
|
||||
* send, so approve/reject align with how the conversation was dispatched.
|
||||
*
|
||||
* We deliberately do **not** look for a living `execServerAgentRuntime`
|
||||
* op here. The server's `waiting_for_human` → `agent_runtime_end` signal
|
||||
* marks the paused op `completed` client-side, and `startOperation` runs
|
||||
* `cleanupCompletedOperations(30_000)` on every new op, which means the
|
||||
* paused op is typically gone by the time the user clicks approve — so
|
||||
* scanning for it would flip us back into client-mode against a live
|
||||
* Gateway backend.
|
||||
*/
|
||||
#hasRunningServerOp = (context: ConversationContext): boolean => {
|
||||
return this.#getRunningServerOps(context).length > 0;
|
||||
#shouldUseGatewayResume = (): boolean => {
|
||||
return this.#get().isGatewayModeEnabled();
|
||||
};
|
||||
|
||||
/**
|
||||
* Return running (non-aborting) `execServerAgentRuntime` ops in the given
|
||||
* context. Used both to detect Gateway mode and to snapshot the set of
|
||||
* paused ops before starting a resume op so we can retire them once the
|
||||
* resume has successfully taken over.
|
||||
* context. Used only to snapshot paused ops before starting a resume op
|
||||
* so we can retire them if the server-side `agent_runtime_end` signal is
|
||||
* delayed or missing — see `#completeOpsById`. In steady state with the
|
||||
* coordinator fix active, this returns an empty list by the time approve
|
||||
* runs because the server already completed the op.
|
||||
*/
|
||||
#getRunningServerOps = (context: ConversationContext) => {
|
||||
const { agentId, groupId, scope, subAgentId, topicId, threadId } = context;
|
||||
|
|
@ -210,7 +214,7 @@ export class ConversationControlActionImpl {
|
|||
// message, persists `intervention=approved`, dispatches the approved
|
||||
// tool, and streams results back on the new op. No in-place resume of
|
||||
// the paused op — simpler state + avoids stepIndex races.
|
||||
if (this.#hasRunningServerOp(effectiveContext)) {
|
||||
if (this.#shouldUseGatewayResume()) {
|
||||
const toolCallId = toolMessage.tool_call_id;
|
||||
if (!toolCallId) {
|
||||
console.warn(
|
||||
|
|
@ -221,7 +225,7 @@ export class ConversationControlActionImpl {
|
|||
}
|
||||
// Snapshot paused op IDs before the resume call; retire them only
|
||||
// after executeGatewayAgent succeeds so a transient failure leaves
|
||||
// the running marker intact and `#hasRunningServerOp` still flags
|
||||
// the running marker intact for the retry's snapshot (routing itself is
|
||||
// decided by `#shouldUseGatewayResume`, i.e. the lab flag, not this marker).
|
||||
const pausedOpIds = this.#getRunningServerOps(effectiveContext).map((op) => op.id);
|
||||
try {
|
||||
|
|
@ -612,7 +616,7 @@ export class ConversationControlActionImpl {
|
|||
// fallback guard in case the server-side human_approve agent_runtime_end
|
||||
// was missed, while preserving the paused-op marker on failure so a
|
||||
// retry still hits the Gateway branch.
|
||||
if (this.#hasRunningServerOp(effectiveContext)) {
|
||||
if (this.#shouldUseGatewayResume()) {
|
||||
const toolCallId = toolMessage.tool_call_id;
|
||||
if (!toolCallId) {
|
||||
console.warn(
|
||||
|
|
@ -667,7 +671,7 @@ export class ConversationControlActionImpl {
|
|||
// the LLM loop with the rejection content surfaced as user feedback.
|
||||
// Skip the client-mode `rejectToolCalling` chain below — that would fire
|
||||
// a duplicate halting `reject` before this continue signal.
|
||||
if (this.#hasRunningServerOp(effectiveContext)) {
|
||||
if (this.#shouldUseGatewayResume()) {
|
||||
const toolCallId = toolMessage.tool_call_id;
|
||||
if (!toolCallId) {
|
||||
console.warn(
|
||||
|
|
|
|||
Loading…
Reference in a new issue