🔨 chore: wire server-mode human approval through conversationControl (#13830)

 feat(chat): server-mode human approval via new Gateway op + resumeApproval

When the current agent runtime is Gateway-mode (execServerAgentRuntime),
approve / reject / reject_continue now start a **new** Gateway op carrying
a `resumeApproval` decision instead of resuming the paused op in place
over tRPC — mirroring the "interrupt + new op" pattern from LOBE-7142
(stop/interrupt). This sidesteps the stepIndex / executeStep early-exit
race that was blocking the in-place resume path and matches the Linear
spec for LOBE-7152. Client mode is unchanged.

### Client

- `conversationControl.ts`
  - `approveToolCalling` / `rejectToolCalling` / `rejectAndContinueToolCalling`:
    server-mode branch calls `executeGatewayAgent({ message: '',
    parentMessageId: toolMessageId, resumeApproval: { decision, ... } })`.
    The local runtime never spins up; the new op's `agent_runtime_end`
    clears loading.
  - `#hasRunningServerOp` replaces the old `#getServerOperationId` helper
    (we no longer need the paused op's id). Forwards scope/groupId/
    subAgentId from `ConversationContext` into the operation lookup so
    group/thread conversations correctly resolve their running server op
    — `operationsByContext` is keyed on the full `messageMapKey`.
- `gateway.ts` — `executeGatewayAgent` takes an optional `resumeApproval`
  and forwards it to `aiAgentService.execAgentTask`.
- `services/aiAgent.ts` — `ExecAgentTaskParams.resumeApproval` with new
  `ResumeApprovalParam` shape (decision + parentMessageId + toolCallId
  + optional rejectionReason).
- `gatewayEventHandler.ts` — kept the `toolMessageIds` branch that fetches
  pending tool messages on `tools_calling`.
- `services/agentRuntime/{type,index}.ts` — removed the short-lived
  `toolMessageId` / `reject_continue` additions; this flow no longer
  routes through `processHumanIntervention`.
- `store/chat/slices/operation/selectors.ts` — `getOperationsByContext` /
  `hasRunningOperationByContext` now take `MessageMapKeyInput` so scope/
  group/subAgent fields are honoured end-to-end.

### Server

- `ExecAgentSchema` / `InternalExecAgentParams.resumeApproval` — optional
  `{ decision, parentMessageId, rejectionReason?, toolCallId }`.
- `AiAgentService.execAgent`
  - `resumeApproval` implies resume semantics (skip user-message creation,
    reuse `parentMessageId` as the target tool message). Folded into a
    single `effectiveResume` flag so the existing resume branches apply.
  - Validates parent is a `role='tool'` message whose `tool_call_id`
    matches the request — guards stale / double-clicks.
  - Writes the decision to DB before `historyMessages` is fetched so the
    runtime sees the updated tool message on the first step:
    * `approved` → `intervention: { status: 'approved' }`
    * `rejected` / `rejected_continue` → tool content =
      "User reject this tool calling [with reason: X]",
      `intervention: { status: 'rejected', rejectedReason }`.
  - Branches initial runtime context:
    * `approved` → `phase: 'human_approved_tool'` + `approvedToolCall`
      payload rebuilt from the tool message plugin → runtime executes
      the tool.
    * `rejected` / `rejected_continue` → `phase: 'user_input'` with
      empty content → LLM re-reads history (now including the rejected
      tool) and responds. Both decisions share this path: the client
      split is only about optimistic writes and button UX; once the
      rejection is persisted there's nothing meaningful to differentiate
      server-side.
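
The decision handling above can be condensed into a small pure function. This is a sketch for illustration: `applyDecision` is hypothetical and the real service performs these writes through `MessageModel`, but the content strings, intervention statuses, and phases match the diff below:

```typescript
// Sketch of the server-side decision mapping in execAgent. The real
// service performs DB writes; here the writes are returned as plain data.
type Decision = 'approved' | 'rejected' | 'rejected_continue';

interface DecisionWrites {
  content?: string; // tool message content (rejections only)
  intervention: { rejectedReason?: string; status: 'approved' | 'rejected' };
  phase: 'human_approved_tool' | 'user_input'; // initial runtime phase
}

function applyDecision(decision: Decision, rejectionReason?: string): DecisionWrites {
  if (decision === 'approved') {
    // approved never writes tool content; it arrives when the tool runs
    return { intervention: { status: 'approved' }, phase: 'human_approved_tool' };
  }
  // rejected / rejected_continue share one server-side path
  return {
    content: rejectionReason
      ? `User reject this tool calling with reason: ${rejectionReason}`
      : 'User reject this tool calling without reason',
    intervention: { rejectedReason: rejectionReason, status: 'rejected' },
    phase: 'user_input',
  };
}
```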

### Tests

- `conversationControl.test.ts` — rewrote the three server-mode blocks
  to spy `executeGatewayAgent` and assert the `resumeApproval` payload
  shape. Added a regression test covering group-scope lookup so dropping
  scope/groupId from `#hasRunningServerOp` breaks the suite.
- `execAgent.resumeApproval.test.ts` (new) — covers approved and the
  unified rejected branches (parameterized), the no-reason fallback, and
  the role/tool_call_id validation guards.
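
The two validation guards exercised by that test file can be sketched as a standalone check (`validateResumeTarget` is a hypothetical helper; the conditions and error messages mirror the ones in the service):

```typescript
// Sketch of the stale/double-click guards. The message shape is a minimal
// stand-in for the DB record the service loads via parentMessageId.
interface ToolMessage {
  id: string;
  role: string;
  tool_call_id?: string;
}

function validateResumeTarget(msg: ToolMessage, toolCallId: string): void {
  if (msg.role !== 'tool') {
    throw new Error(
      `resumeApproval.parentMessageId must point at a role='tool' message, got role='${msg.role}'`,
    );
  }
  // Only enforce the match when a stored tool_call_id exists
  if (msg.tool_call_id && msg.tool_call_id !== toolCallId) {
    throw new Error(
      `resumeApproval.toolCallId mismatch for message ${msg.id}: ` +
        `stored=${msg.tool_call_id}, requested=${toolCallId}`,
    );
  }
}
```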

Relates to LOBE-7152.

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Arvin Xu 2026-04-15 21:17:22 +08:00 committed by GitHub
parent 75626de0b3
commit 843cb8f30b
9 changed files with 1005 additions and 19 deletions


@@ -114,6 +114,24 @@ const ExecAgentSchema = z
parentMessageId: z.string().optional(),
/** The user input/prompt */
prompt: z.string(),
/**
* Resume a previous op paused on `human_approve_required`. When set, the
* new op writes the decision to the target tool message and either runs
* the approved tool (`approved`) or surfaces the rejection as user
* feedback so the LLM can respond (`rejected` / `rejected_continue`).
*/
resumeApproval: z
.object({
decision: z.enum(['approved', 'rejected', 'rejected_continue']),
/** ID of the pending `role='tool'` message this decision targets. */
parentMessageId: z.string(),
/** Optional user-supplied rejection reason (only meaningful for rejected variants). */
rejectionReason: z.string().optional(),
/** tool_call_id of the pending tool call being approved/rejected. */
toolCallId: z.string(),
})
.optional(),
/** The agent slug to run (either agentId or slug is required) */
slug: z.string().optional(),
/**
@@ -554,6 +572,7 @@ export const aiAgentRouter = router({
existingMessageIds = [],
fileIds,
parentMessageId,
resumeApproval,
userInterventionConfig,
} = input;
@@ -570,8 +589,10 @@ export const aiAgentRouter = router({
fileIds,
parentMessageId,
prompt,
// When parentMessageId is provided, this is a regeneration/continue — skip user message creation
// When parentMessageId is provided, this is a regeneration/continue or a
// human-approval resume — either way, skip user message creation.
resume: !!parentMessageId,
resumeApproval,
slug,
userInterventionConfig,
});


@@ -0,0 +1,301 @@
import type * as ModelBankModule from 'model-bank';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { AiAgentService } from '../index';
const {
mockCreateOperation,
mockFindById,
mockMessageCreate,
mockMessageQuery,
mockUpdateMessagePlugin,
mockUpdateToolMessage,
} = vi.hoisted(() => ({
mockCreateOperation: vi.fn(),
mockFindById: vi.fn(),
mockMessageCreate: vi.fn(),
mockMessageQuery: vi.fn(),
mockUpdateMessagePlugin: vi.fn(),
mockUpdateToolMessage: vi.fn(),
}));
vi.mock('@/libs/trusted-client', () => ({
generateTrustedClientToken: vi.fn().mockReturnValue(undefined),
getTrustedClientTokenForSession: vi.fn().mockResolvedValue(undefined),
isTrustedClientEnabled: vi.fn().mockReturnValue(false),
}));
vi.mock('@/database/models/message', () => ({
MessageModel: vi.fn().mockImplementation(() => ({
create: mockMessageCreate,
findById: mockFindById,
query: mockMessageQuery,
update: vi.fn().mockResolvedValue({}),
updateMessagePlugin: mockUpdateMessagePlugin,
updateToolMessage: mockUpdateToolMessage,
})),
}));
vi.mock('@/database/models/agent', () => ({
AgentModel: vi.fn().mockImplementation(() => ({ queryAgents: vi.fn().mockResolvedValue([]) })),
}));
vi.mock('@/server/services/agent', () => ({
AgentService: vi.fn().mockImplementation(() => ({
getAgentConfig: vi.fn().mockResolvedValue({
chatConfig: {},
id: 'agent-1',
knowledgeBases: [],
model: 'gpt-4',
plugins: [],
provider: 'openai',
systemRole: 'You are a helpful assistant',
}),
})),
}));
vi.mock('@/database/models/plugin', () => ({
PluginModel: vi.fn().mockImplementation(() => ({ query: vi.fn().mockResolvedValue([]) })),
}));
vi.mock('@/database/models/topic', () => ({
TopicModel: vi.fn().mockImplementation(() => ({
create: vi.fn().mockResolvedValue({ id: 'topic-1' }),
updateMetadata: vi.fn(),
})),
}));
vi.mock('@/database/models/thread', () => ({
ThreadModel: vi.fn().mockImplementation(() => ({
create: vi.fn(),
findById: vi.fn(),
update: vi.fn(),
})),
}));
vi.mock('@/database/models/user', () => ({
UserModel: vi.fn().mockImplementation(() => ({
getUserSettings: vi.fn().mockResolvedValue(undefined),
})),
}));
vi.mock('@/database/models/userMemory/persona', () => ({
UserPersonaModel: vi.fn().mockImplementation(() => ({
getLatestPersonaDocument: vi.fn().mockResolvedValue(undefined),
})),
}));
vi.mock('@/server/services/agentRuntime', () => ({
AgentRuntimeService: vi.fn().mockImplementation(() => ({
createOperation: mockCreateOperation,
})),
}));
vi.mock('@/server/services/market', () => ({
MarketService: vi.fn().mockImplementation(() => ({
getLobehubSkillManifests: vi.fn().mockResolvedValue([]),
})),
}));
vi.mock('@/server/services/klavis', () => ({
KlavisService: vi.fn().mockImplementation(() => ({
getKlavisManifests: vi.fn().mockResolvedValue([]),
})),
}));
vi.mock('@/server/services/file', () => ({
FileService: vi.fn().mockImplementation(() => ({ uploadFromUrl: vi.fn() })),
}));
vi.mock('@/server/modules/Mecha', () => ({
createServerAgentToolsEngine: vi.fn().mockReturnValue({
generateToolsDetailed: vi.fn().mockReturnValue({ enabledToolIds: [], tools: [] }),
getEnabledPluginManifests: vi.fn().mockReturnValue(new Map()),
}),
}));
vi.mock('@/server/services/toolExecution/deviceProxy', () => ({
deviceProxy: { isConfigured: false, queryDeviceList: vi.fn().mockResolvedValue([]) },
}));
vi.mock('model-bank', async (importOriginal) => {
const actual = await importOriginal<typeof ModelBankModule>();
return {
...actual,
LOBE_DEFAULT_MODEL_LIST: [
{
abilities: { functionCall: true, vision: true },
id: 'gpt-4',
providerId: 'openai',
},
],
};
});
describe('AiAgentService.execAgent - resumeApproval', () => {
let service: AiAgentService;
const pendingToolMessage = {
id: 'tool-msg-1',
plugin: {
apiName: 'runCommand',
arguments: '{"command":"echo"}',
identifier: 'lobe-local-system',
type: 'builtin',
},
role: 'tool',
sessionId: 'session-1',
threadId: 'thread-1',
tool_call_id: 'call_xyz',
topicId: 'topic-1',
};
beforeEach(() => {
vi.clearAllMocks();
mockCreateOperation.mockResolvedValue({
autoStarted: true,
messageId: 'queue-msg-1',
operationId: 'op-123',
success: true,
});
mockFindById.mockResolvedValue(pendingToolMessage);
mockMessageQuery.mockResolvedValue([{ content: 'hi', id: 'history-1', role: 'user' }]);
mockMessageCreate.mockResolvedValue({ id: 'assistant-msg-new' });
mockUpdateMessagePlugin.mockResolvedValue(undefined);
mockUpdateToolMessage.mockResolvedValue(undefined);
service = new AiAgentService({} as any, 'user-1');
});
const baseParams = {
agentId: 'agent-1',
appContext: { sessionId: 'session-1', threadId: 'thread-1', topicId: 'topic-1' },
parentMessageId: 'tool-msg-1',
prompt: '',
};
describe('decision=approved', () => {
it('persists intervention=approved and seeds initialContext for human_approved_tool', async () => {
await service.execAgent({
...baseParams,
resumeApproval: {
decision: 'approved',
parentMessageId: 'tool-msg-1',
toolCallId: 'call_xyz',
},
});
expect(mockUpdateMessagePlugin).toHaveBeenCalledWith('tool-msg-1', {
intervention: { status: 'approved' },
});
// `approved` decision never writes tool content — the content arrives
// when the approved tool actually executes.
expect(mockUpdateToolMessage).not.toHaveBeenCalled();
expect(mockCreateOperation).toHaveBeenCalledWith(
expect.objectContaining({
initialContext: expect.objectContaining({
payload: expect.objectContaining({
approvedToolCall: expect.objectContaining({
apiName: 'runCommand',
arguments: '{"command":"echo"}',
id: 'call_xyz',
identifier: 'lobe-local-system',
}),
parentMessageId: 'tool-msg-1',
skipCreateToolMessage: true,
}),
phase: 'human_approved_tool',
}),
}),
);
});
});
// Server handles `rejected` and `rejected_continue` identically — both
// persist the rejection and resume the LLM with the updated history so it
// can respond to the user. The client-side split is only about optimistic
// writes / button UX; beyond the DB write there's nothing meaningful to
// differentiate once the decision is persisted.
describe.each([
['rejected' as const, 'not appropriate', 'with reason: not appropriate'],
['rejected_continue' as const, 'too risky', 'with reason: too risky'],
])('decision=%s', (decision, rejectionReason, expectedSuffix) => {
it(`persists rejection + resumes LLM with user_input phase`, async () => {
await service.execAgent({
...baseParams,
resumeApproval: {
decision,
parentMessageId: 'tool-msg-1',
rejectionReason,
toolCallId: 'call_xyz',
},
});
expect(mockUpdateToolMessage).toHaveBeenCalledWith('tool-msg-1', {
content: `User reject this tool calling ${expectedSuffix}`,
});
expect(mockUpdateMessagePlugin).toHaveBeenCalledWith('tool-msg-1', {
intervention: { rejectedReason: rejectionReason, status: 'rejected' },
});
expect(mockCreateOperation).toHaveBeenCalledWith(
expect.objectContaining({
initialContext: expect.objectContaining({
payload: expect.objectContaining({
message: [{ content: '' }],
parentMessageId: 'tool-msg-1',
}),
phase: 'user_input',
}),
}),
);
});
});
it('falls back to the no-reason rejection string when rejectionReason is omitted', async () => {
await service.execAgent({
...baseParams,
resumeApproval: {
decision: 'rejected',
parentMessageId: 'tool-msg-1',
toolCallId: 'call_xyz',
},
});
expect(mockUpdateToolMessage).toHaveBeenCalledWith('tool-msg-1', {
content: 'User reject this tool calling without reason',
});
});
describe('validation guards', () => {
it('throws when the parent message is not role=tool', async () => {
mockFindById.mockResolvedValue({ ...pendingToolMessage, role: 'user' });
await expect(
service.execAgent({
...baseParams,
resumeApproval: {
decision: 'approved',
parentMessageId: 'tool-msg-1',
toolCallId: 'call_xyz',
},
}),
).rejects.toThrow(/role='tool'/);
});
it('throws when the stored tool_call_id does not match the resume request', async () => {
mockFindById.mockResolvedValue({ ...pendingToolMessage, tool_call_id: 'call_other' });
await expect(
service.execAgent({
...baseParams,
resumeApproval: {
decision: 'approved',
parentMessageId: 'tool-msg-1',
toolCallId: 'call_xyz',
},
}),
).rejects.toThrow(/toolCallId mismatch/);
});
});
});


@@ -136,6 +136,21 @@ interface InternalExecAgentParams extends ExecAgentParams {
queueRetryDelay?: string;
/** Whether to continue execution from an existing persisted message */
resume?: boolean;
/**
* When present, this execAgent call acts as the "continue" step for a
* previous op that hit `human_approve_required`. The service writes the
* decision to the target tool message and either runs the approved tool
* (`approved`) or surfaces the rejection as user feedback so the LLM can
* respond (`rejected` / `rejected_continue`). `parentMessageId` must
* point at the pending tool message.
*/
resumeApproval?: {
decision: 'approved' | 'rejected' | 'rejected_continue';
parentMessageId: string;
rejectionReason?: string;
toolCallId: string;
};
/** Abort startup before the agent runtime operation is created */
signal?: AbortSignal;
/**
@@ -249,6 +264,7 @@ export class AiAgentService {
queueRetryDelay,
parentMessageId,
resume,
resumeApproval,
} = params;
// Validate that either agentId or slug is provided
@@ -350,7 +366,14 @@
let resumeParentMessage;
if (resume) {
// `resumeApproval` implies the same "load parent message + skip user
// message creation" semantics as `resume`. Callers that go through the
// tRPC router get `resume: true` via the router, but the service-level
// API allows resumeApproval alone — fold both into a single effective
// flag so downstream resume branches don't need to know about approval.
const effectiveResume = resume || !!resumeApproval;
if (effectiveResume) {
if (!parentMessageId) {
throw new Error('parentMessageId is required when resume is true');
}
@@ -385,6 +408,56 @@
}
}
// 2.6. Human-approval resume: write the user's decision to the target tool
// message in the DB so the history fetched below (step 11) + the runtime
// state both reflect the decision before the first step runs. Validates
// the parent is actually a pending tool message tied to the tool call we
// were asked about — guards against stale / double-clicks.
if (resumeApproval) {
if (!resumeParentMessage) {
throw new Error('resumeApproval requires parentMessageId to point at a tool message');
}
if (resumeParentMessage.role !== 'tool') {
throw new Error(
`resumeApproval.parentMessageId must point at a role='tool' message, got role='${resumeParentMessage.role}'`,
);
}
const existingToolCallId = (resumeParentMessage as any).tool_call_id;
if (existingToolCallId && existingToolCallId !== resumeApproval.toolCallId) {
throw new Error(
`resumeApproval.toolCallId mismatch for message ${resumeApproval.parentMessageId}: ` +
`stored=${existingToolCallId}, requested=${resumeApproval.toolCallId}`,
);
}
const { decision, rejectionReason } = resumeApproval;
if (decision === 'approved') {
await this.messageModel.updateMessagePlugin(resumeApproval.parentMessageId, {
intervention: { status: 'approved' },
});
} else {
// rejected / rejected_continue both write the same rejection content
// + intervention state, and both resume the LLM with `phase: 'user_input'`
// (see 16b below) so it can respond to the rejection.
const rejectionContent = rejectionReason
? `User reject this tool calling with reason: ${rejectionReason}`
: 'User reject this tool calling without reason';
await this.messageModel.updateToolMessage(resumeApproval.parentMessageId, {
content: rejectionContent,
});
await this.messageModel.updateMessagePlugin(resumeApproval.parentMessageId, {
intervention: { rejectedReason: rejectionReason, status: 'rejected' },
});
}
log(
'execAgent: resumeApproval decision=%s applied to tool message %s (toolCallId=%s)',
decision,
resumeApproval.parentMessageId,
resumeApproval.toolCallId,
);
}
// 3. Handle topic creation: if no topicId provided, create a new topic; otherwise reuse existing
let topicId = appContext?.topicId;
const topicBoundDeviceId = requestedDeviceId;
@@ -1146,7 +1219,7 @@ export class AiAgentService {
// 13. Create user message in database
// Include threadId if provided (for SubAgent task execution in isolated Thread)
const userMessageRecord = resume
const userMessageRecord = effectiveResume
? undefined
: await this.messageModel.create({
agentId: resolvedAgentId,
@@ -1195,7 +1268,7 @@
};
// Combine history messages with user message
const allMessages = resume ? historyMessages : [...historyMessages, userMessage];
const allMessages = effectiveResume ? historyMessages : [...historyMessages, userMessage];
log('execAgent: prepared evalContext for executor');
@@ -1206,12 +1279,12 @@
const operationId = `op_${timestamp}_${resolvedAgentId}_${topicId}_${nanoid(8)}`;
// 16. Create initial context
const initialContext: AgentRuntimeContext = {
let initialContext: AgentRuntimeContext = {
payload: {
// Pass assistant message ID so agent runtime knows which message to update
assistantMessageId: assistantMessageRecord.id,
isFirstMessage: true,
message: resume ? [{ content: '' }] : [{ content: prompt }],
message: effectiveResume ? [{ content: '' }] : [{ content: prompt }],
// Pass user message ID as parentMessageId for reference
parentMessageId: parentMessageId ?? userMessageRecord?.id ?? '',
// Include tools for initial LLM call
@@ -1226,6 +1299,61 @@
},
};
// 16b. Human-approval resume — override initialContext based on the
// user's decision. The DB write above has already persisted the
// intervention status, so `allMessages` reflects the decision for the
// LLM / runner on the first step.
//
// `rejected` and `rejected_continue` share the same server-side path:
// both surface the rejection to the LLM as user feedback via
// `phase: 'user_input'`. The client-side split (halt vs. continue) is
// only about the UX of the button and the optimistic writes — once the
// decision is persisted, there's nothing meaningful to do differently
// server-side, and letting the LLM produce a brief acknowledgement keeps
// the conversation cleanly terminated either way.
if (resumeApproval && resumeParentMessage) {
if (resumeApproval.decision === 'approved') {
// Ask the runtime to execute the approved tool directly. Matches the
// `phase: 'human_approved_tool'` contract used by the in-place
// handleHumanIntervention flow — the runner generates a `call_tool`
// instruction keyed on this payload.
const toolPlugin = (resumeParentMessage as any).plugin as
| { apiName?: string; arguments?: string; identifier?: string; type?: string }
| undefined;
initialContext = {
payload: {
approvedToolCall: {
apiName: toolPlugin?.apiName,
arguments: toolPlugin?.arguments,
id: resumeApproval.toolCallId,
identifier: toolPlugin?.identifier,
type: toolPlugin?.type ?? 'default',
},
assistantMessageId: assistantMessageRecord.id,
parentMessageId: resumeApproval.parentMessageId,
skipCreateToolMessage: true,
} as any,
phase: 'human_approved_tool' as const,
session: {
messageCount: allMessages.length,
sessionId: operationId,
status: 'idle' as const,
stepCount: 0,
},
};
} else {
initialContext = {
...initialContext,
payload: {
...(initialContext.payload as any),
isFirstMessage: false,
message: [{ content: '' }],
parentMessageId: resumeApproval.parentMessageId,
},
};
}
}
// 17. Log final operation parameters summary
log(
'execAgent: creating operation %s with params: model=%s, provider=%s, tools=%d, messages=%d, manifests=%d',


@@ -4,6 +4,26 @@ import { lambdaClient } from '@/libs/trpc/client';
export type { ExecAgentResult };
/**
* Resume instruction for an operation that hit `human_approve_required`. When
* present, the new op acts as the "continue" step: server reads the target tool
* message, writes the user's decision, and either re-dispatches the tool
* (approved) or feeds the rejection back to the LLM as user feedback
* (rejected / rejected_continue).
*
* Kept as a top-level field (not folded into `appContext`) so the server schema
* can validate it independently.
*/
export interface ResumeApprovalParam {
decision: 'approved' | 'rejected' | 'rejected_continue';
/** ID of the pending `role='tool'` message this decision targets. */
parentMessageId: string;
/** Optional user-supplied rejection reason (only meaningful for rejected variants). */
rejectionReason?: string;
/** tool_call_id of the pending tool call being approved/rejected. */
toolCallId: string;
}
export interface ExecAgentTaskParams {
agentId?: string;
appContext?: {
@@ -27,6 +47,8 @@ export interface ExecAgentTaskParams {
/** Parent message ID for regeneration/continue (skip user message creation, branch from this message) */
parentMessageId?: string;
prompt: string;
/** Resume a previous op paused on `human_approve_required` instead of starting from a fresh user prompt. */
resumeApproval?: ResumeApprovalParam;
slug?: string;
}


@@ -10,6 +10,22 @@ import { resetTestEnvironment } from './helpers';
// Keep zustand mock as it's needed globally
vi.mock('zustand/traditional');
// Mock the tRPC client & agentRuntimeService so the import chain doesn't pull
// server-only code (cloud business packages, redis envs) into the test env.
vi.mock('@/libs/trpc/client', () => ({
lambdaClient: {
aiAgent: {
processHumanIntervention: { mutate: vi.fn().mockResolvedValue({ success: true }) },
},
},
}));
vi.mock('@/services/agentRuntime', () => ({
agentRuntimeService: {
handleHumanIntervention: vi.fn().mockResolvedValue({ success: true }),
},
}));
beforeEach(() => {
resetTestEnvironment();
});
@@ -594,6 +610,309 @@ describe('ConversationControl actions', () => {
// Should not call internal_execAgentRuntime when tool message not found
expect(internal_execAgentRuntimeSpy).not.toHaveBeenCalled();
});
describe('server-mode branch', () => {
it('should start a new Gateway op with resumeApproval.decision=approved and NOT run local runtime', async () => {
const { result } = renderHook(() => useChatStore());
const agentId = 'server-agent';
const topicId = 'server-topic';
const chatKey = messageMapKey({ agentId, topicId });
const toolMessage = createMockMessage({
id: 'tool-msg-1',
plugin: {
apiName: 'search',
arguments: '{"q":"test"}',
identifier: 'web-search',
type: 'default',
},
role: 'tool',
// `tool_call_id` is what the server uses to locate the pending tool
// call; the new Gateway op carries it forward via `resumeApproval`.
tool_call_id: 'call_xyz',
} as any);
act(() => {
useChatStore.setState({
activeAgentId: agentId,
activeTopicId: topicId,
dbMessagesMap: { [chatKey]: [toolMessage] },
messagesMap: { [chatKey]: [toolMessage] },
});
// Simulate a running server operation — presence of this op is
// what flips approve/reject into server-mode.
result.current.startOperation({
context: { agentId, topicId, threadId: null },
metadata: { serverOperationId: 'server-op-xyz' },
type: 'execServerAgentRuntime',
});
});
vi.spyOn(result.current, 'optimisticUpdateMessagePlugin').mockResolvedValue(undefined);
const executeGatewayAgentSpy = vi
.spyOn(result.current, 'executeGatewayAgent')
.mockResolvedValue({} as any);
const internal_execAgentRuntimeSpy = vi
.spyOn(result.current, 'internal_execAgentRuntime')
.mockResolvedValue(undefined);
await act(async () => {
await result.current.approveToolCalling('tool-msg-1', 'group-1');
});
expect(executeGatewayAgentSpy).toHaveBeenCalledWith(
expect.objectContaining({
message: '',
parentMessageId: 'tool-msg-1',
resumeApproval: {
decision: 'approved',
parentMessageId: 'tool-msg-1',
toolCallId: 'call_xyz',
},
}),
);
expect(internal_execAgentRuntimeSpy).not.toHaveBeenCalled();
executeGatewayAgentSpy.mockRestore();
});
it('should fall through to client-mode runtime when no server operation is running', async () => {
const { result } = renderHook(() => useChatStore());
const agentId = 'local-agent';
const topicId = 'local-topic';
const chatKey = messageMapKey({ agentId, topicId });
const toolMessage = createMockMessage({
id: 'tool-msg-1',
plugin: { identifier: 'x', type: 'default', arguments: '{}', apiName: 'y' },
role: 'tool',
tool_call_id: 'call_local',
} as any);
act(() => {
useChatStore.setState({
activeAgentId: agentId,
activeTopicId: topicId,
dbMessagesMap: { [chatKey]: [toolMessage] },
messagesMap: { [chatKey]: [toolMessage] },
});
});
vi.spyOn(result.current, 'optimisticUpdateMessagePlugin').mockResolvedValue(undefined);
vi.spyOn(result.current, 'internal_createAgentState').mockReturnValue({
state: {} as any,
context: { phase: 'init' } as any,
agentConfig: createMockResolvedAgentConfig(),
});
const executeGatewayAgentSpy = vi
.spyOn(result.current, 'executeGatewayAgent')
.mockResolvedValue({} as any);
const internal_execAgentRuntimeSpy = vi
.spyOn(result.current, 'internal_execAgentRuntime')
.mockResolvedValue(undefined);
await act(async () => {
await result.current.approveToolCalling('tool-msg-1', 'group-1');
});
expect(executeGatewayAgentSpy).not.toHaveBeenCalled();
expect(internal_execAgentRuntimeSpy).toHaveBeenCalled();
executeGatewayAgentSpy.mockRestore();
});
it('resolves the running server op in a group scope context (scope/groupId forwarded to the lookup)', async () => {
// Regression: operationsByContext is keyed by the full messageMapKey
// including scope/groupId. If #hasRunningServerOp were to drop those
// fields, a group conversation's approve/reject would miss the op and
// fall back to client mode. Assert the server-mode branch fires with
// the group context intact.
const { result } = renderHook(() => useChatStore());
const agentId = 'server-agent';
const groupId = 'group-1';
const topicId = 'server-topic';
const scope = 'group' as const;
const chatKey = messageMapKey({ agentId, groupId, scope, topicId });
const toolMessage = createMockMessage({
id: 'tool-msg-1',
plugin: { apiName: 'y', arguments: '{}', identifier: 'x', type: 'default' },
role: 'tool',
tool_call_id: 'call_group',
} as any);
act(() => {
useChatStore.setState({
activeAgentId: agentId,
activeTopicId: topicId,
dbMessagesMap: { [chatKey]: [toolMessage] },
messagesMap: { [chatKey]: [toolMessage] },
});
// Server op is indexed under the group-scope key. Without scope
// forwarding the lookup would hit the default 'main' bucket instead.
result.current.startOperation({
context: { agentId, groupId, scope, topicId, threadId: null },
metadata: { serverOperationId: 'server-op-group' },
type: 'execServerAgentRuntime',
});
});
vi.spyOn(result.current, 'optimisticUpdateMessagePlugin').mockResolvedValue(undefined);
const executeGatewayAgentSpy = vi
.spyOn(result.current, 'executeGatewayAgent')
.mockResolvedValue({} as any);
const internal_execAgentRuntimeSpy = vi
.spyOn(result.current, 'internal_execAgentRuntime')
.mockResolvedValue(undefined);
await act(async () => {
await result.current.approveToolCalling('tool-msg-1', 'group-1', {
agentId,
groupId,
scope,
topicId,
threadId: null,
});
});
expect(executeGatewayAgentSpy).toHaveBeenCalledWith(
expect.objectContaining({
resumeApproval: expect.objectContaining({ decision: 'approved' }),
}),
);
expect(internal_execAgentRuntimeSpy).not.toHaveBeenCalled();
executeGatewayAgentSpy.mockRestore();
});
});
});
describe('rejectToolCalling server-mode branch', () => {
it('starts a new Gateway op with resumeApproval.decision=rejected', async () => {
const { result } = renderHook(() => useChatStore());
const agentId = 'server-agent';
const topicId = 'server-topic';
const chatKey = messageMapKey({ agentId, topicId });
const toolMessage = createMockMessage({
id: 'tool-msg-1',
role: 'tool',
tool_call_id: 'call_xyz',
} as any);
act(() => {
useChatStore.setState({
activeAgentId: agentId,
activeTopicId: topicId,
dbMessagesMap: { [chatKey]: [toolMessage] },
messagesMap: { [chatKey]: [toolMessage] },
});
result.current.startOperation({
context: { agentId, topicId, threadId: null },
metadata: { serverOperationId: 'server-op-xyz' },
type: 'execServerAgentRuntime',
});
});
vi.spyOn(result.current, 'optimisticUpdateMessagePlugin').mockResolvedValue(undefined);
vi.spyOn(result.current, 'optimisticUpdateMessageContent').mockResolvedValue(undefined);
const executeGatewayAgentSpy = vi
.spyOn(result.current, 'executeGatewayAgent')
.mockResolvedValue({} as any);
await act(async () => {
await result.current.rejectToolCalling('tool-msg-1', 'not appropriate');
});
expect(executeGatewayAgentSpy).toHaveBeenCalledWith(
expect.objectContaining({
message: '',
parentMessageId: 'tool-msg-1',
resumeApproval: {
decision: 'rejected',
parentMessageId: 'tool-msg-1',
rejectionReason: 'not appropriate',
toolCallId: 'call_xyz',
},
}),
);
executeGatewayAgentSpy.mockRestore();
});
});
describe('rejectAndContinueToolCalling server-mode branch', () => {
it('starts a new Gateway op with resumeApproval.decision=rejected_continue and skips both local runtime and client rejectToolCalling', async () => {
const { result } = renderHook(() => useChatStore());
const agentId = 'server-agent';
const topicId = 'server-topic';
const chatKey = messageMapKey({ agentId, topicId });
const toolMessage = createMockMessage({
id: 'tool-msg-1',
role: 'tool',
tool_call_id: 'call_xyz',
} as any);
act(() => {
useChatStore.setState({
activeAgentId: agentId,
activeTopicId: topicId,
dbMessagesMap: { [chatKey]: [toolMessage] },
messagesMap: { [chatKey]: [toolMessage] },
});
result.current.startOperation({
context: { agentId, topicId, threadId: null },
metadata: { serverOperationId: 'server-op-xyz' },
type: 'execServerAgentRuntime',
});
});
vi.spyOn(result.current, 'optimisticUpdateMessagePlugin').mockResolvedValue(undefined);
vi.spyOn(result.current, 'optimisticUpdateMessageContent').mockResolvedValue(undefined);
const executeGatewayAgentSpy = vi
.spyOn(result.current, 'executeGatewayAgent')
.mockResolvedValue({} as any);
const internal_execAgentRuntimeSpy = vi
.spyOn(result.current, 'internal_execAgentRuntime')
.mockResolvedValue(undefined);
// Ensure client rejectToolCalling is NOT invoked in server-mode path —
// otherwise the server would see a duplicate halting `reject` before
// this continue signal lands.
const rejectToolCallingSpy = vi
.spyOn(result.current, 'rejectToolCalling')
.mockResolvedValue(undefined);
await act(async () => {
await result.current.rejectAndContinueToolCalling('tool-msg-1', 'too risky');
});
expect(executeGatewayAgentSpy).toHaveBeenCalledWith(
expect.objectContaining({
message: '',
parentMessageId: 'tool-msg-1',
resumeApproval: {
decision: 'rejected_continue',
parentMessageId: 'tool-msg-1',
rejectionReason: 'too risky',
toolCallId: 'call_xyz',
},
}),
);
expect(internal_execAgentRuntimeSpy).not.toHaveBeenCalled();
expect(rejectToolCallingSpy).not.toHaveBeenCalled();
executeGatewayAgentSpy.mockRestore();
});
});
describe('submitToolInteraction', () => {


@@ -3,6 +3,7 @@ import { type AgentRuntimeContext } from '@lobechat/agent-runtime';
import { MESSAGE_CANCEL_FLAT } from '@lobechat/const';
import { type ConversationContext } from '@lobechat/types';
import { operationSelectors } from '@/store/chat/slices/operation/selectors';
import { type ChatStore } from '@/store/chat/store';
import { type StoreSetter } from '@/store/types';
@@ -28,6 +29,38 @@ export class ConversationControlActionImpl {
this.#get = get;
}
/**
* Detect whether the given context has a running server-mode agent op.
* When true, approve/reject/reject_continue should skip the local
* `internal_execAgentRuntime` path and instead start a **new** Gateway op
* carrying a `resumeApproval` decision mirroring the "interrupt + new op"
* pattern from LOBE-7142 so we don't need to reach into the paused op's
* state to resume it in-place.
*
* Must forward scope/groupId/subAgentId from the incoming context:
* `operationsByContext` is keyed on the full `messageMapKey`, so a
* thread/group/sub-agent conversation whose call site omits these fields
* would silently look up the 'main' scope bucket, miss the running server
* op, and fall back to client mode: exactly the bug this helper exists
* to prevent.
*/
#hasRunningServerOp = (context: ConversationContext): boolean => {
const { agentId, groupId, scope, subAgentId, topicId, threadId } = context;
if (!agentId) return false;
const ops = operationSelectors.getOperationsByContext({
agentId,
groupId,
scope,
subAgentId,
threadId: threadId ?? null,
topicId: topicId ?? null,
})(this.#get());
return ops.some(
(op) =>
op.type === 'execServerAgentRuntime' && op.status === 'running' && !op.metadata?.isAborting,
);
};
stopGenerateMessage = (): void => {
const { activeAgentId, activeTopicId, cancelOperations } = this.#get();
@@ -146,6 +179,43 @@ export class ConversationControlActionImpl {
optimisticContext,
);
// 2.5. Server-mode: start a **new** Gateway op carrying the approval
// decision via `resumeApproval`. The server reads the target tool
// message, persists `intervention=approved`, dispatches the approved
// tool, and streams results back on the new op. No in-place resume of
// the paused op — simpler state + avoids stepIndex races.
if (this.#hasRunningServerOp(effectiveContext)) {
const toolCallId = toolMessage.tool_call_id;
if (!toolCallId) {
console.warn(
'[approveToolCalling][server] tool message missing tool_call_id; skipping resume',
);
completeOperation(operationId);
return;
}
try {
await this.#get().executeGatewayAgent({
context: effectiveContext,
message: '',
parentMessageId: toolMessageId,
resumeApproval: {
decision: 'approved',
parentMessageId: toolMessageId,
toolCallId,
},
});
completeOperation(operationId);
} catch (error) {
const err = error as Error;
console.error('[approveToolCalling][server] Gateway resume failed:', err);
this.#get().failOperation(operationId, {
type: 'approveToolCalling',
message: err.message || 'Unknown error',
});
}
return;
}
// 3. Get current messages for state construction using context
const chatKey = messageMapKey({ agentId, topicId, threadId, scope });
const currentMessages = displayMessageSelectors.getDisplayMessagesByKey(chatKey)(this.#get());
@@ -503,6 +573,36 @@ export class ConversationControlActionImpl {
optimisticContext,
);
// Server-mode: start a **new** Gateway op carrying `decision='rejected'`.
// Server persists the rejection on the target tool message and halts the
// conversation. The paused op stays where it was; the new op's
// `agent_runtime_end` clears the loading state on the client.
if (this.#hasRunningServerOp(effectiveContext)) {
const toolCallId = toolMessage.tool_call_id;
if (!toolCallId) {
console.warn(
'[rejectToolCalling][server] tool message missing tool_call_id; skipping resume',
);
completeOperation(operationId);
return;
}
try {
await this.#get().executeGatewayAgent({
context: effectiveContext,
message: '',
parentMessageId: messageId,
resumeApproval: {
decision: 'rejected',
parentMessageId: messageId,
rejectionReason: reason,
toolCallId,
},
});
} catch (error) {
console.error('[rejectToolCalling][server] Gateway resume failed:', error);
}
}
completeOperation(operationId);
};
@@ -511,9 +611,6 @@ export class ConversationControlActionImpl {
reason?: string,
context?: ConversationContext,
): Promise<void> => {
// Pass context to rejectToolCalling for proper context isolation
await this.#get().rejectToolCalling(messageId, reason, context);
const toolMessage = dbMessageSelectors.getDbMessageById(messageId)(this.#get());
if (!toolMessage) return;
@@ -528,6 +625,75 @@ export class ConversationControlActionImpl {
const { agentId, topicId, threadId, scope } = effectiveContext;
// Server-mode: start a **new** Gateway op with `decision='rejected_continue'`.
// Server persists the rejection on the target tool message and resumes
// the LLM loop with the rejection content surfaced as user feedback.
// Skip the client-mode `rejectToolCalling` chain below — that would fire
// a duplicate halting `reject` before this continue signal lands.
if (this.#hasRunningServerOp(effectiveContext)) {
const toolCallId = toolMessage.tool_call_id;
if (!toolCallId) {
console.warn(
'[rejectAndContinueToolCalling][server] tool message missing tool_call_id; skipping resume',
);
return;
}
const { operationId } = startOperation({
type: 'rejectToolCalling',
context: {
agentId,
topicId: topicId ?? undefined,
threadId: threadId ?? undefined,
scope,
messageId,
},
});
const optimisticContext = { operationId };
await this.#get().optimisticUpdateMessagePlugin(
messageId,
{ intervention: { rejectedReason: reason, status: 'rejected' } as any },
optimisticContext,
);
const toolContent = reason
? `User reject this tool calling with reason: ${reason}`
: 'User reject this tool calling without reason';
await this.#get().optimisticUpdateMessageContent(
messageId,
toolContent,
undefined,
optimisticContext,
);
try {
await this.#get().executeGatewayAgent({
context: effectiveContext,
message: '',
parentMessageId: messageId,
resumeApproval: {
decision: 'rejected_continue',
parentMessageId: messageId,
rejectionReason: reason,
toolCallId,
},
});
completeOperation(operationId);
} catch (error) {
const err = error as Error;
console.error('[rejectAndContinueToolCalling][server] Gateway resume failed:', err);
this.#get().failOperation(operationId, {
type: 'rejectToolCalling',
message: err.message || 'Unknown error',
});
}
return;
}
// Client-mode path: reject first (persists rejection + updates content),
// then spin up a local runtime with phase='user_input' to continue.
await this.#get().rejectToolCalling(messageId, reason, context);
// Create an operation to manage the continue execution
const { operationId } = startOperation({
type: 'rejectToolCalling',


@@ -7,7 +7,7 @@ import type {
ConnectionStatus,
} from '@/libs/agent-stream';
import { AgentStreamClient } from '@/libs/agent-stream/client';
import { aiAgentService } from '@/services/aiAgent';
import { aiAgentService, type ResumeApprovalParam } from '@/services/aiAgent';
import { messageService } from '@/services/message';
import { topicService } from '@/services/topic';
import type { ChatStore } from '@/store/chat/store';
@@ -217,8 +217,15 @@ export class GatewayActionImpl {
onComplete?: () => void;
/** Parent message ID for regeneration/continue (skip user message creation, branch from this message) */
parentMessageId?: string;
/**
* Resume a paused op waiting on `human_approve_required`. Forwarded to
* `aiAgentService.execAgentTask` so the new server-side op knows to apply
* the user's decision to the target tool message instead of starting from
* a fresh user prompt.
*/
resumeApproval?: ResumeApprovalParam;
}): Promise<ExecAgentResult> => {
const { context, fileIds, message, onComplete, parentMessageId } = params;
const { context, fileIds, message, onComplete, parentMessageId, resumeApproval } = params;
const agentGatewayUrl =
window.global_serverConfigStore!.getState().serverConfig.agentGatewayUrl!;
@@ -240,6 +247,7 @@ export class GatewayActionImpl {
fileIds,
parentMessageId,
prompt: message,
resumeApproval,
});
// If server created a new topic, fetch messages first then switch topic
@@ -266,9 +274,13 @@ export class GatewayActionImpl {
this.#get().internal_updateTopicLoading(result.topicId, true);
}
// Create a dedicated operation for gateway execution with correct context
// Create a dedicated operation for gateway execution with correct context.
// Stash the server operation id in metadata so human-intervention flows
// (approve/reject/reject_continue) can look it up and call the server
// without needing an out-of-band lookup.
const { operationId: gatewayOpId } = this.#get().startOperation({
context: execContext,
metadata: { serverOperationId: result.operationId },
type: 'execServerAgentRuntime',
});
@@ -347,9 +359,11 @@ export class GatewayActionImpl {
topicId,
};
// Create a local operation for UI loading state
// Create a local operation for UI loading state, stashing the server op id
// so intervention flows can find it after reconnect as well.
const { operationId: gatewayOpId } = this.#get().startOperation({
context,
metadata: { serverOperationId: operationId },
type: 'execServerAgentRuntime',
});


@@ -136,6 +136,15 @@ export const createGatewayEventHandler = (
currentAssistantMessageId,
data.toolsCalling.map(() => true),
);
// If the server attached a `toolMessageIds` map, it has persisted
// pending tool messages (human approval path). Fetch the latest
// messages so ApprovalActions can read them by id instead of
// waiting for `agent_runtime_end` (which won't fire while paused
// in `waiting_for_human`).
if ((data as any).toolMessageIds) {
fetchAndReplaceMessages(get, context).catch(console.error);
}
}
});
break;


@@ -1,5 +1,5 @@ import { type ChatStoreState } from '@/store/chat/initialState';
import { type ChatStoreState } from '@/store/chat/initialState';
import { messageMapKey } from '@/store/chat/utils/messageMapKey';
import { messageMapKey, type MessageMapKeyInput } from '@/store/chat/utils/messageMapKey';
import { type Operation, type OperationType } from './types';
import { AI_RUNTIME_OPERATION_TYPES, INPUT_LOADING_OPERATION_TYPES } from './types';
@@ -165,13 +165,19 @@ const getCurrentOperationProgress = (s: ChatStoreState): number | undefined => {
};
/**
* Get operations by context (agentId, topicId, threadId)
* Useful for filtering operations for a specific conversation context
* Get operations by context (agentId, topicId, threadId, scope, groupId, subAgentId).
*
* Operations are indexed by `operationsByContext` under the full `messageMapKey`,
* which keys on scope/group/subAgent in addition to agent+topic. Callers that
* live inside a group or thread/sub-agent conversation MUST pass the matching
* scope/group info; omitting them computes the 'main' scope key, which silently
* returns an empty list and causes flows like approve/reject to fall back to the
* wrong branch. Same-shape input as messageMapKey for consistency.
*/
const getOperationsByContext =
(context: { agentId: string; threadId?: string | null; topicId?: string | null }) =>
(context: MessageMapKeyInput) =>
(s: ChatStoreState): Operation[] => {
const contextKey = messageMapKey({ agentId: context.agentId, topicId: context.topicId });
const contextKey = messageMapKey(context);
const operationIds = s.operationsByContext[contextKey] || [];
return operationIds
.map((id) => s.operations[id])
@@ -189,7 +195,7 @@ const getOperationsByContext =
* Use this for loading states in components that display a specific conversation
*/
const hasRunningOperationByContext =
(context: { agentId: string; threadId?: string | null; topicId?: string | null }) =>
(context: MessageMapKeyInput) =>
(s: ChatStoreState): boolean => {
const operations = getOperationsByContext(context)(s);
return operations.some((op) => op.status === 'running' && !op.metadata.isAborting);
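The keying pitfall the selector doc comment warns about can be sketched with a toy `messageMapKey`. This is a minimal stand-in, not the real implementation from `@/store/chat/utils/messageMapKey` — the field names mirror `MessageMapKeyInput`, but the join format and the `'main'` default are assumptions for illustration.

```typescript
// Toy stand-in for messageMapKey (NOT the real implementation): it folds
// every context field into the bucket key, so a partial context computes a
// different key than the one the operation was registered under.
interface MessageMapKeyInput {
  agentId: string;
  topicId?: string | null;
  threadId?: string | null;
  scope?: string; // assumed to default to 'main' when omitted
  groupId?: string | null;
  subAgentId?: string | null;
}

const messageMapKey = (c: MessageMapKeyInput): string =>
  [
    c.scope ?? 'main',
    c.agentId,
    c.topicId ?? '',
    c.threadId ?? '',
    c.groupId ?? '',
    c.subAgentId ?? '',
  ].join('|');

// An op registered under a group/thread-scoped context...
const registeredKey = messageMapKey({
  agentId: 'a1',
  topicId: 't1',
  scope: 'thread',
  groupId: 'g1',
});

// ...is invisible to a lookup whose caller dropped scope/groupId:
const partialKey = messageMapKey({ agentId: 'a1', topicId: 't1' });

console.log(registeredKey === partialKey); // false — the lookup misses the op
```

This is why `#hasRunningServerOp` forwards scope/groupId/subAgentId verbatim from `ConversationContext` instead of rebuilding a narrower context.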