feat(agent-runtime): server-side human approval flow (#13829)

* feat(agent-runtime): implement server-side human approval flow

Port the client-mode human approval executors (request_human_approve,
call_tool resumption, handleHumanIntervention) to the server agent
runtime so that execServerAgentRuntime can correctly pause on
waiting_for_human and resume on approve / reject / reject_continue.

- request_human_approve now creates one `role='tool'` message per pending
  tool call with `pluginIntervention: { status: 'pending' }` and ships
  the `{ toolCallId → toolMessageId }` mapping on the `tools_calling`
  stream chunk.
- call_tool gains a `skipCreateToolMessage` branch that updates the
  pre-existing tool message in-place (prevents duplicate rows / parent_id
  FK violations that show up as LOBE-7154 errors).
- AgentRuntimeService.handleHumanIntervention implements all three
  paths: approve → `phase: 'human_approved_tool'`; reject → interrupted
  with `reason: 'human_rejected'`; reject_continue → `phase: 'user_input'`.
- ProcessHumanIntervention schema carries `toolMessageId` and a new
  `reject_continue` action; schema remains permissive (handler no-ops on
  missing toolMessageId) to keep legacy callers working.

Fixes LOBE-7151

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix(agent-runtime): address LOBE-7151 review (P1 reject_continue, P2 duplicate tool msg)

P1 — reject_continue with remaining pending tools must NOT resume the LLM.
Previously `handleHumanIntervention` kept `status='waiting_for_human'` but
returned `nextContext: { phase: 'user_input' }`, which `executeStep` would
hand to `runtime.step` immediately, breaking batch semantics. Now when
other tools are still pending, the rejection is persisted but no context
is returned; the `user_input` continuation only fires when this is the
last pending tool.

P2 — request_human_approve was pushing an empty placeholder
`{ role: 'tool', tool_call_id, content: '' }` into `newState.messages`
to "reflect" the newly-created pending DB row. On resume, the `call_tool`
skip-create path appends the real tool result, leaving two entries for
the same `tool_call_id` in runtime state. The downstream short-circuit
(`phase=human_approved_tool` → `call_tool`) doesn't consult
`state.messages`, so the placeholder served no purpose and has been removed.

Also fixes a TS 2339 in the skipCreateToolMessage test where
`nextContext.payload` is typed `{}` and needed an explicit cast.

Tests: 99 pass (82 RuntimeExecutors + 17 handleHumanIntervention), type-check clean.
Verified end-to-end via the human-approval eval — it now exercises a
multi-turn retry path (LLM calls the gated tool twice) and both
approvals resolve cleanly through to `completionReason=done`.

Relates to LOBE-7151

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* pin @react-pdf/renderer

* 🐛 fix(deps): pin @react-pdf/image to 3.0.4 to avoid privatized @react-pdf/svg

@react-pdf/image@3.1.0 (auto-resolved via layout@4.6.0 ← renderer@4.4.1)
declares `@react-pdf/svg@^1.1.0` as a dependency, but the svg package was
unpublished/made private on npm (returns 404). CI installs blow up with
ERR_PNPM_FETCH_404.

Upstream issue: https://github.com/diegomura/react-pdf/issues/3377

Pin image to 3.0.4 (the last release before the broken svg dep was
introduced) via pnpm.overrides until react-pdf publishes a fix.

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Arvin Xu 2026-04-15 11:07:06 +08:00 committed by GitHub
parent f12cf8f2ea
commit 9f61b58a29
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 949 additions and 38 deletions

View file

@ -280,7 +280,7 @@
"@opentelemetry/resources": "^2.2.0", "@opentelemetry/resources": "^2.2.0",
"@opentelemetry/sdk-metrics": "^2.2.0", "@opentelemetry/sdk-metrics": "^2.2.0",
"@opentelemetry/winston-transport": "^0.19.0", "@opentelemetry/winston-transport": "^0.19.0",
"@react-pdf/renderer": "^4.3.2", "@react-pdf/renderer": "4.4.1",
"@react-three/drei": "^10.7.7", "@react-three/drei": "^10.7.7",
"@react-three/fiber": "^9.5.0", "@react-three/fiber": "^9.5.0",
"@saintno/comfyui-sdk": "^0.2.49", "@saintno/comfyui-sdk": "^0.2.49",
@ -539,6 +539,7 @@
"ffmpeg-static" "ffmpeg-static"
], ],
"overrides": { "overrides": {
"@react-pdf/image": "3.0.4",
"@types/react": "19.2.13", "@types/react": "19.2.13",
"better-auth": "1.4.6", "better-auth": "1.4.6",
"better-call": "1.1.8", "better-call": "1.1.8",

View file

@ -240,6 +240,13 @@ export interface AgentInstructionCallLlm extends AgentInstructionBase {
export interface AgentInstructionCallTool extends AgentInstructionBase { export interface AgentInstructionCallTool extends AgentInstructionBase {
payload: { payload: {
parentMessageId: string; parentMessageId: string;
/**
* When true, the runtime is resuming execution for a previously pending
* tool call (e.g. after human approval). The executor must NOT insert a
* new tool message; instead it updates the existing one referenced by
* `parentMessageId` with the tool result.
*/
skipCreateToolMessage?: boolean;
toolCalling: ChatToolPayload; toolCalling: ChatToolPayload;
}; };
type: 'call_tool'; type: 'call_tool';

View file

@ -32,6 +32,8 @@ export async function POST(request: NextRequest) {
humanInput, humanInput,
approvedToolCall, approvedToolCall,
rejectionReason, rejectionReason,
rejectAndContinue,
toolMessageId,
} = body; } = body;
if (!operationId) { if (!operationId) {
@ -60,8 +62,10 @@ export async function POST(request: NextRequest) {
externalRetryCount, externalRetryCount,
humanInput, humanInput,
operationId, operationId,
rejectAndContinue,
rejectionReason, rejectionReason,
stepIndex, stepIndex,
toolMessageId,
}); });
// Step is currently being executed by another instance — tell QStash to retry later // Step is currently being executed by another instance — tell QStash to retry later

View file

@ -1467,25 +1467,44 @@ export const createRuntimeExecutors = (
type: 'tool_end', type: 'tool_end',
}); });
// Finally update database // Finally persist to database. In resumption mode (skipCreateToolMessage),
// the pending tool message already exists from request_human_approve, so
// we update it in-place rather than inserting a new row — inserting would
// either duplicate the tool_call_id or violate parent_id FK (LOBE-7154).
let toolMessageId: string | undefined; let toolMessageId: string | undefined;
try { try {
const toolMessage = await ctx.messageModel.create({ if (payload.skipCreateToolMessage) {
agentId: state.metadata!.agentId!, toolMessageId = payload.parentMessageId;
content: executionResult.content, await ctx.messageModel.updateToolMessage(toolMessageId, {
metadata: { toolExecutionTimeMs: executionTime }, content: executionResult.content,
parentId: payload.parentMessageId, metadata: { toolExecutionTimeMs: executionTime },
plugin: chatToolPayload as any, pluginError: executionResult.error,
pluginError: executionResult.error, pluginState: executionResult.state,
pluginState: executionResult.state, });
role: 'tool', log(
threadId: state.metadata?.threadId, '[%s:%d] Updated existing tool message %s (skipCreateToolMessage)',
tool_call_id: chatToolPayload.id, operationId,
topicId: state.metadata?.topicId, stepIndex,
}); toolMessageId,
toolMessageId = toolMessage.id; );
} else {
const toolMessage = await ctx.messageModel.create({
agentId: state.metadata!.agentId!,
content: executionResult.content,
metadata: { toolExecutionTimeMs: executionTime },
parentId: payload.parentMessageId,
plugin: chatToolPayload as any,
pluginError: executionResult.error,
pluginState: executionResult.state,
role: 'tool',
threadId: state.metadata?.threadId,
tool_call_id: chatToolPayload.id,
topicId: state.metadata?.topicId,
});
toolMessageId = toolMessage.id;
}
} catch (error) { } catch (error) {
console.error('[StreamingToolExecutor] Failed to create tool message: %O', error); console.error('[StreamingToolExecutor] Failed to persist tool message: %O', error);
// Normalize BEFORE publishing so clients (which treat `error` stream // Normalize BEFORE publishing so clients (which treat `error` stream
// events as terminal and surface `event.data.error` directly) see the // events as terminal and surface `event.data.error` directly) see the
// typed business error, not the raw SQL / driver text. // typed business error, not the raw SQL / driver text.
@ -2054,9 +2073,18 @@ export const createRuntimeExecutors = (
/** /**
* Human approval * Human approval
*
* Mirrors the client executor (`createAgentExecutors.ts:1072-1177`):
* - Creates one `role='tool'` message per pending tool call with
* `pluginIntervention: { status: 'pending' }` so approval UI has a target.
* - When `skipCreateToolMessage` is true (resumption via `/run` after a
* previous op already persisted them), skip creation.
* - Publishes the `toolCallId -> toolMessageId` mapping alongside the
* `tools_calling` stream chunk so the client can hydrate its local
* message map without waiting for `agent_runtime_end`.
*/ */
request_human_approve: async (instruction, state) => { request_human_approve: async (instruction, state) => {
const { pendingToolsCalling } = instruction as Extract< const { pendingToolsCalling, skipCreateToolMessage } = instruction as Extract<
AgentInstruction, AgentInstruction,
{ type: 'request_human_approve' } { type: 'request_human_approve' }
>; >;
@ -2080,12 +2108,122 @@ export const createRuntimeExecutors = (
newState.status = 'waiting_for_human'; newState.status = 'waiting_for_human';
newState.pendingToolsCalling = pendingToolsCalling; newState.pendingToolsCalling = pendingToolsCalling;
// Notify frontend to display approval UI through streaming system // Map of toolCallId -> toolMessageId, populated either by creating fresh
// pending tool messages or (in resumption mode) by looking up existing ones.
const toolMessageIds: Record<string, string> = {};
if (skipCreateToolMessage) {
// Resumption mode: tool messages already exist in DB. Look them up by
// tool_call_id so we can still ship the mapping to the client.
log('[%s:%d] Resuming with existing tool messages', operationId, stepIndex);
try {
const dbMessages = await ctx.messageModel.query({
agentId: state.metadata?.agentId,
threadId: state.metadata?.threadId,
topicId: state.metadata?.topicId,
});
for (const toolPayload of pendingToolsCalling) {
const existing = dbMessages.find(
(m: any) => m.role === 'tool' && m.tool_call_id === toolPayload.id,
);
if (existing) {
toolMessageIds[toolPayload.id] = existing.id;
}
}
} catch (error) {
console.error(
'[%s:%d] Failed to look up existing tool messages: %O',
operationId,
stepIndex,
error,
);
}
} else {
// Find parent assistant message. Prefer state.messages (already in
// memory from call_llm); fall back to DB query if the runtime has been
// rehydrated without recent messages.
let parentAssistantId: string | undefined = (state.messages ?? [])
.slice()
.reverse()
.find((m: any) => m.role === 'assistant' && m.id)?.id;
if (!parentAssistantId) {
try {
const dbMessages = await ctx.messageModel.query({
agentId: state.metadata?.agentId,
threadId: state.metadata?.threadId,
topicId: state.metadata?.topicId,
});
parentAssistantId = dbMessages
.slice()
.reverse()
.find((m: any) => m.role === 'assistant')?.id;
} catch (error) {
console.error(
'[%s:%d] Failed to query DB for parent assistant: %O',
operationId,
stepIndex,
error,
);
}
}
if (!parentAssistantId) {
throw new Error(
`[request_human_approve] No assistant message found as parent for pending tool messages (op=${operationId})`,
);
}
for (const toolPayload of pendingToolsCalling) {
const toolName = `${toolPayload.identifier}/${toolPayload.apiName}`;
try {
const toolMessage = await ctx.messageModel.create({
agentId: state.metadata!.agentId!,
content: '',
parentId: parentAssistantId,
plugin: toolPayload as any,
pluginIntervention: { status: 'pending' },
role: 'tool',
threadId: state.metadata?.threadId,
tool_call_id: toolPayload.id,
topicId: state.metadata?.topicId,
});
toolMessageIds[toolPayload.id] = toolMessage.id;
// Intentionally DO NOT push the empty placeholder into
// newState.messages. When the approval resumes, the `call_tool`
// executor (skip-create branch) appends the resolved tool message
// to state.messages itself. Pushing a placeholder here produced
// two entries for the same tool_call_id — see LOBE-7151 review P2.
log(
'[%s:%d] Created pending tool message %s for %s',
operationId,
stepIndex,
toolMessage.id,
toolName,
);
} catch (error) {
console.error(
'[%s:%d] Failed to create pending tool message for %s: %O',
operationId,
stepIndex,
toolName,
error,
);
throw error;
}
}
}
// Notify frontend to display approval UI through streaming system.
// `toolMessageIds` is a new optional field; legacy consumers ignore it.
await streamManager.publishStreamChunk(operationId, stepIndex, { await streamManager.publishStreamChunk(operationId, stepIndex, {
// Use operationId as messageId
chunkType: 'tools_calling', chunkType: 'tools_calling',
toolMessageIds,
toolsCalling: pendingToolsCalling as any, toolsCalling: pendingToolsCalling as any,
}); } as any);
const events: AgentEvent[] = [ const events: AgentEvent[] = [
{ {

View file

@ -68,6 +68,7 @@ describe('RuntimeExecutors', () => {
findById: vi.fn().mockResolvedValue({ id: 'msg-existing' }), findById: vi.fn().mockResolvedValue({ id: 'msg-existing' }),
query: vi.fn().mockResolvedValue([]), query: vi.fn().mockResolvedValue([]),
update: vi.fn().mockResolvedValue({}), update: vi.fn().mockResolvedValue({}),
updateToolMessage: vi.fn().mockResolvedValue({ success: true }),
}; };
mockStreamManager = { mockStreamManager = {
@ -1657,6 +1658,294 @@ describe('RuntimeExecutors', () => {
expect(mockToolExecutionService.executeTool).toHaveBeenCalledTimes(2); expect(mockToolExecutionService.executeTool).toHaveBeenCalledTimes(2);
}); });
// Covers the call_tool executor's resumption branch: when the instruction
// payload carries `skipCreateToolMessage: true`, `parentMessageId` is the id of
// an already-persisted pending tool message and must be updated, not duplicated.
describe('skipCreateToolMessage (resumption after human approval)', () => {
it('should update existing tool message instead of creating a new one', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
// In resumption mode this is the pending tool message id, not an assistant id.
parentMessageId: 'pending-tool-msg-1',
skipCreateToolMessage: true,
toolCalling: {
apiName: 'search',
arguments: '{}',
id: 'tool-call-1',
identifier: 'web-search',
type: 'default' as const,
},
},
type: 'call_tool' as const,
};
await executors.call_tool!(instruction, state);
// No new row may be inserted; the existing row receives the tool result.
expect(mockMessageModel.create).not.toHaveBeenCalled();
// NOTE(review): 'Tool result' / 100ms presumably come from the shared
// tool-execution mock configured outside this view — confirm.
expect(mockMessageModel.updateToolMessage).toHaveBeenCalledWith(
'pending-tool-msg-1',
expect.objectContaining({
content: 'Tool result',
metadata: { toolExecutionTimeMs: 100 },
}),
);
});
it('should return the existing toolMessageId as parentMessageId for the next LLM step', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
parentMessageId: 'pending-tool-msg-42',
skipCreateToolMessage: true,
toolCalling: {
apiName: 'search',
arguments: '{}',
id: 'tool-call-42',
identifier: 'web-search',
type: 'default' as const,
},
},
type: 'call_tool' as const,
};
const result = await executors.call_tool!(instruction, state);
// `nextContext.payload` is loosely typed, so narrow it before reading the id.
const nextPayload = result.nextContext?.payload as { parentMessageId?: string } | undefined;
// The updated tool message becomes the parent for whatever step follows.
expect(nextPayload?.parentMessageId).toBe('pending-tool-msg-42');
});
it('should fall back to creating a new tool message when skipCreateToolMessage is false', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
// Normal (non-resumption) mode: the parent is the assistant message.
parentMessageId: 'assistant-msg-7',
skipCreateToolMessage: false,
toolCalling: {
apiName: 'search',
arguments: '{}',
id: 'tool-call-7',
identifier: 'web-search',
type: 'default' as const,
},
},
type: 'call_tool' as const,
};
await executors.call_tool!(instruction, state);
// An explicit `false` behaves like the flag being absent: insert, never update.
expect(mockMessageModel.create).toHaveBeenCalledTimes(1);
expect(mockMessageModel.updateToolMessage).not.toHaveBeenCalled();
});
});
});
describe('request_human_approve executor', () => {
// Builds a minimal AgentState for the request_human_approve tests. The single
// assistant message carries an id so the executor can resolve the parent for
// pending tool messages from `state.messages`; pass `overrides` to replace any
// top-level field (e.g. `{ messages: [] }` to force the DB fallback).
const createMockState = (overrides?: Partial<AgentState>): AgentState => ({
cost: createMockCost(),
createdAt: new Date().toISOString(),
lastModified: new Date().toISOString(),
maxSteps: 100,
messages: [
{
content: 'assistant response',
id: 'assistant-msg-1',
role: 'assistant',
} as any,
],
metadata: {
agentId: 'agent-123',
threadId: 'thread-123',
topicId: 'topic-123',
},
operationId: 'op-123',
status: 'running',
stepCount: 0,
toolManifestMap: {},
usage: createMockUsage(),
// Spread last so caller-supplied fields win over the defaults above.
...overrides,
});
// Returns a fresh array of two pending tool calls (ids 'tool-call-1' and
// 'tool-call-2') so each test gets its own copy to pass as `pendingToolsCalling`.
const makePendingTools = () => [
{
apiName: 'search',
arguments: '{"q":"test"}',
id: 'tool-call-1',
identifier: 'web-search',
type: 'default' as const,
},
{
apiName: 'write',
arguments: '{"file":"a.md"}',
id: 'tool-call-2',
identifier: 'local-system',
type: 'default' as const,
},
];
// One `role='tool'` row with a pending intervention must be created per
// pending tool call, each parented to the assistant message from state.
it('should create a pending tool message for each pendingToolsCalling', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
// Two sequential create() results, one per pending tool call.
mockMessageModel.create
.mockResolvedValueOnce({ id: 'tool-msg-1' })
.mockResolvedValueOnce({ id: 'tool-msg-2' });
const instruction = {
pendingToolsCalling: makePendingTools(),
type: 'request_human_approve' as const,
};
await executors.request_human_approve!(instruction, state);
expect(mockMessageModel.create).toHaveBeenCalledTimes(2);
// First call: full shape check, including the empty placeholder content.
expect(mockMessageModel.create).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
agentId: 'agent-123',
content: '',
parentId: 'assistant-msg-1',
pluginIntervention: { status: 'pending' },
role: 'tool',
tool_call_id: 'tool-call-1',
topicId: 'topic-123',
}),
);
// Second call: same parent and pending status, different tool_call_id.
expect(mockMessageModel.create).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
parentId: 'assistant-msg-1',
pluginIntervention: { status: 'pending' },
tool_call_id: 'tool-call-2',
}),
);
});
// The returned state must be paused and must carry the pending tool calls.
it('should set state to waiting_for_human and copy pendingToolsCalling', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
mockMessageModel.create
.mockResolvedValueOnce({ id: 'tool-msg-1' })
.mockResolvedValueOnce({ id: 'tool-msg-2' });
const pending = makePendingTools();
const result = await executors.request_human_approve!(
{ pendingToolsCalling: pending, type: 'request_human_approve' as const },
state,
);
expect(result.newState.status).toBe('waiting_for_human');
// Deep equality only; the test does not require the same array instance.
expect(result.newState.pendingToolsCalling).toEqual(pending);
});
// The `tools_calling` stream chunk must include the toolCallId -> toolMessageId
// map built from the ids returned by messageModel.create.
it('should publish tools_calling chunk with toolMessageIds mapping', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
mockMessageModel.create
.mockResolvedValueOnce({ id: 'tool-msg-1' })
.mockResolvedValueOnce({ id: 'tool-msg-2' });
await executors.request_human_approve!(
{
pendingToolsCalling: makePendingTools(),
type: 'request_human_approve' as const,
},
state,
);
// publishStreamChunk(operationId, stepIndex, chunk): the chunk is argument index 2.
const chunkCall = mockStreamManager.publishStreamChunk.mock.calls.find(
(call: any[]) => call[2]?.chunkType === 'tools_calling',
);
expect(chunkCall).toBeTruthy();
expect(chunkCall![2].toolMessageIds).toEqual({
'tool-call-1': 'tool-msg-1',
'tool-call-2': 'tool-msg-2',
});
});
// Resumption mode: with `skipCreateToolMessage: true` nothing is inserted; the
// mapping is instead rebuilt from existing tool rows matched by tool_call_id.
it('should skip message creation when skipCreateToolMessage is true', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
// Rows the executor's messageModel.query lookup will see for this call.
mockMessageModel.query.mockResolvedValueOnce([
{ id: 'existing-tool-1', role: 'tool', tool_call_id: 'tool-call-1' },
{ id: 'existing-tool-2', role: 'tool', tool_call_id: 'tool-call-2' },
]);
await executors.request_human_approve!(
{
pendingToolsCalling: makePendingTools(),
skipCreateToolMessage: true,
type: 'request_human_approve' as const,
},
state,
);
expect(mockMessageModel.create).not.toHaveBeenCalled();
const chunkCall = mockStreamManager.publishStreamChunk.mock.calls.find(
(call: any[]) => call[2]?.chunkType === 'tools_calling',
);
// The published mapping points at the pre-existing message ids.
expect(chunkCall![2].toolMessageIds).toEqual({
'tool-call-1': 'existing-tool-1',
'tool-call-2': 'existing-tool-2',
});
});
// With no assistant message in state and an empty DB query result, the
// executor has no parent for the pending tool messages and must reject.
it('should throw if no parent assistant message can be found', async () => {
const executors = createRuntimeExecutors(ctx);
// Empty in-memory history forces the DB fallback lookup.
const state = createMockState({ messages: [] });
// The fallback also finds nothing.
mockMessageModel.query.mockResolvedValueOnce([]);
await expect(
executors.request_human_approve!(
{
pendingToolsCalling: makePendingTools(),
type: 'request_human_approve' as const,
},
state,
),
).rejects.toThrow(/No assistant message found/);
});
// The executor result must contain both a `human_approve_required` and a
// `tool_pending` event; only the `type` field of each is asserted.
it('should emit human_approve_required and tool_pending events', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
mockMessageModel.create
.mockResolvedValueOnce({ id: 'tool-msg-1' })
.mockResolvedValueOnce({ id: 'tool-msg-2' });
const result = await executors.request_human_approve!(
{
pendingToolsCalling: makePendingTools(),
type: 'request_human_approve' as const,
},
state,
);
expect(result.events).toContainEqual(
expect.objectContaining({ type: 'human_approve_required' }),
);
expect(result.events).toContainEqual(expect.objectContaining({ type: 'tool_pending' }));
});
// Requesting approval pauses the operation: no follow-up context is returned,
// so there is nothing for the runtime to step into until a human responds.
it('should NOT return a nextContext (operation pauses)', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
mockMessageModel.create
.mockResolvedValueOnce({ id: 'tool-msg-1' })
.mockResolvedValueOnce({ id: 'tool-msg-2' });
const result = await executors.request_human_approve!(
{
pendingToolsCalling: makePendingTools(),
type: 'request_human_approve' as const,
},
state,
);
expect(result.nextContext).toBeUndefined();
});
}); });
describe('call_tools_batch executor', () => { describe('call_tools_batch executor', () => {

View file

@ -43,7 +43,7 @@ const GetOperationStatusSchema = z.object({
}); });
const ProcessHumanInterventionSchema = z.object({ const ProcessHumanInterventionSchema = z.object({
action: z.enum(['approve', 'reject', 'input', 'select']), action: z.enum(['approve', 'reject', 'reject_continue', 'input', 'select']),
data: z data: z
.object({ .object({
approvedToolCall: z.any().optional(), approvedToolCall: z.any().optional(),
@ -54,6 +54,13 @@ const ProcessHumanInterventionSchema = z.object({
operationId: z.string(), operationId: z.string(),
reason: z.string().optional(), reason: z.string().optional(),
stepIndex: z.number().optional().default(0), stepIndex: z.number().optional().default(0),
/**
* ID of the pending `role='tool'` message targeted by this intervention.
* Needed for approve / reject / reject_continue so the server can update
* the message's intervention status and content and, on approve, hand the
* id to the `call_tool` short-circuit via `skipCreateToolMessage`. Kept
* optional in the schema for legacy callers; the handler no-ops without it.
*/
toolMessageId: z.string().optional(),
}); });
const GetPendingInterventionsSchema = z const GetPendingInterventionsSchema = z
@ -1079,7 +1086,7 @@ export const aiAgentRouter = router({
processHumanIntervention: aiAgentProcedure processHumanIntervention: aiAgentProcedure
.input(ProcessHumanInterventionSchema) .input(ProcessHumanInterventionSchema)
.mutation(async ({ input, ctx }) => { .mutation(async ({ input, ctx }) => {
const { operationId, action, data, reason, stepIndex } = input; const { operationId, action, data, reason, stepIndex, toolMessageId } = input;
log(`Processing ${action} for operation ${operationId}`); log(`Processing ${action} for operation ${operationId}`);
@ -1088,6 +1095,7 @@ export const aiAgentRouter = router({
action, action,
operationId, operationId,
stepIndex, stepIndex,
toolMessageId,
}; };
switch (action) { switch (action) {
@ -1099,10 +1107,16 @@ export const aiAgentRouter = router({
}); });
} }
interventionParams.approvedToolCall = data.approvedToolCall; interventionParams.approvedToolCall = data.approvedToolCall;
// toolMessageId is required for the server to persist the
// intervention + short-circuit into call_tool; the handler itself
// no-ops when missing, so keep the schema permissive for legacy
// callers that haven't been updated yet.
break; break;
} }
case 'reject': { case 'reject':
case 'reject_continue': {
interventionParams.rejectionReason = reason || 'Tool call rejected by user'; interventionParams.rejectionReason = reason || 'Tool call rejected by user';
interventionParams.rejectAndContinue = action === 'reject_continue';
break; break;
} }
case 'input': { case 'input': {

View file

@ -414,6 +414,8 @@ export class AgentRuntimeService {
humanInput, humanInput,
approvedToolCall, approvedToolCall,
rejectionReason, rejectionReason,
rejectAndContinue,
toolMessageId,
externalRetryCount = 0, externalRetryCount = 0,
} = params; } = params;
@ -535,7 +537,9 @@ export class AgentRuntimeService {
const interventionResult = await this.handleHumanIntervention(runtime, currentState, { const interventionResult = await this.handleHumanIntervention(runtime, currentState, {
approvedToolCall, approvedToolCall,
humanInput, humanInput,
rejectAndContinue,
rejectionReason, rejectionReason,
toolMessageId,
}); });
currentState = interventionResult.newState; currentState = interventionResult.newState;
currentContext = interventionResult.nextContext; currentContext = interventionResult.nextContext;
@ -1369,15 +1373,25 @@ export class AgentRuntimeService {
* Process human intervention * Process human intervention
*/ */
async processHumanIntervention(params: { async processHumanIntervention(params: {
action: 'approve' | 'reject' | 'input' | 'select'; action: 'approve' | 'reject' | 'reject_continue' | 'input' | 'select';
approvedToolCall?: any; approvedToolCall?: any;
humanInput?: any; humanInput?: any;
operationId: string; operationId: string;
rejectAndContinue?: boolean;
rejectionReason?: string; rejectionReason?: string;
stepIndex: number; stepIndex: number;
toolMessageId?: string;
}): Promise<{ messageId?: string }> { }): Promise<{ messageId?: string }> {
const { operationId, stepIndex, action, approvedToolCall, humanInput, rejectionReason } = const {
params; operationId,
stepIndex,
action,
approvedToolCall,
humanInput,
rejectAndContinue,
rejectionReason,
toolMessageId,
} = params;
try { try {
log( log(
@ -1395,7 +1409,13 @@ export class AgentRuntimeService {
delay: 100, delay: 100,
endpoint: `${this.baseURL}/run`, endpoint: `${this.baseURL}/run`,
operationId, operationId,
payload: { approvedToolCall, humanInput, rejectionReason }, payload: {
approvedToolCall,
humanInput,
rejectAndContinue,
rejectionReason,
toolMessageId,
},
priority: 'high', priority: 'high',
stepIndex, stepIndex,
}); });
@ -1534,23 +1554,130 @@ export class AgentRuntimeService {
} }
/** /**
* Handle human intervention logic * Handle human intervention logic.
*
* Mirrors the client-side flow in `conversationControl.ts`:
* - `approveToolCalling`: write intervention=approved, then resume via
* `phase: 'human_approved_tool'` so the runtime short-circuits into
* `call_tool` with `skipCreateToolMessage: true`.
* - `rejectToolCalling`: write intervention=rejected and halt
* (`status='interrupted'`, `interruption.reason='human_rejected'`).
* - `rejectAndContinueToolCalling`: write intervention=rejected, then
* resume via `phase: 'user_input'` once no other tools are pending, so
* the next LLM call treats the rejection content as user feedback.
*/ */
private async handleHumanIntervention( private async handleHumanIntervention(
runtime: AgentRuntime, runtime: AgentRuntime,
state: any, state: any,
intervention: { approvedToolCall?: any; humanInput?: any; rejectionReason?: string }, intervention: {
approvedToolCall?: any;
humanInput?: any;
rejectAndContinue?: boolean;
rejectionReason?: string;
toolMessageId?: string;
},
) { ) {
const { humanInput, approvedToolCall, rejectionReason } = intervention; const { humanInput, approvedToolCall, rejectAndContinue, rejectionReason, toolMessageId } =
intervention;
// ---- A. approve ----
if (approvedToolCall && state.status === 'waiting_for_human') { if (approvedToolCall && state.status === 'waiting_for_human') {
// TODO: implement approveToolCall logic if (!toolMessageId) {
return { newState: state, nextContext: undefined }; log('[handleHumanIntervention] approve requires toolMessageId, got undefined');
} else if (rejectionReason && state.status === 'waiting_for_human') { return { newState: state, nextContext: undefined };
// TODO: implement rejectToolCall logic }
return { newState: state, nextContext: undefined };
} else if (humanInput) { await this.messageModel.updateMessagePlugin(toolMessageId, {
// TODO: implement processHumanInput logic intervention: { status: 'approved' },
});
const newState = structuredClone(state);
newState.lastModified = new Date().toISOString();
newState.pendingToolsCalling = (state.pendingToolsCalling ?? []).filter(
(t: any) => t.id !== approvedToolCall.id,
);
// Keep waiting_for_human while other tools remain pending; resume to
// running when this was the last one.
newState.status = newState.pendingToolsCalling.length > 0 ? 'waiting_for_human' : 'running';
const nextContext: AgentRuntimeContext = {
payload: {
approvedToolCall,
parentMessageId: toolMessageId,
skipCreateToolMessage: true,
},
phase: 'human_approved_tool',
};
return { newState, nextContext };
}
// ---- B / C. reject ----
if (rejectionReason && state.status === 'waiting_for_human') {
if (!toolMessageId) {
log('[handleHumanIntervention] reject requires toolMessageId, got undefined');
return { newState: state, nextContext: undefined };
}
const rejectionContent = rejectionReason
? `User reject this tool calling with reason: ${rejectionReason}`
: 'User reject this tool calling without reason';
await this.messageModel.updateToolMessage(toolMessageId, { content: rejectionContent });
await this.messageModel.updateMessagePlugin(toolMessageId, {
intervention: { rejectedReason: rejectionReason, status: 'rejected' },
});
// Find the tool_call_id for this tool message so we can drop it from
// pendingToolsCalling. pendingToolsCalling holds ChatToolPayload[] whose
// id === tool_call_id; the mapping lives in messagePlugins (plugin id
// === message id, toolCallId is a separate column).
let rejectedToolCallId: string | undefined;
try {
const plugin = await this.serverDB.query.messagePlugins.findFirst({
where: (mp: any, { eq }: any) => eq(mp.id, toolMessageId),
});
rejectedToolCallId = (plugin as any)?.toolCallId ?? undefined;
} catch (error) {
log('[handleHumanIntervention] failed to look up tool plugin: %O', error);
}
const newState = structuredClone(state);
newState.lastModified = new Date().toISOString();
newState.pendingToolsCalling = rejectedToolCallId
? (state.pendingToolsCalling ?? []).filter((t: any) => t.id !== rejectedToolCallId)
: (state.pendingToolsCalling ?? []);
if (rejectAndContinue) {
// C: persist the rejection, then either (a) wait for the remaining
// pending tools to be resolved or (b) resume LLM once this is the
// last one. Returning a `phase: 'user_input'` nextContext while
// pendingToolsCalling is non-empty would cause executeStep to run
// runtime.step immediately, resuming the LLM with an unresolved
// batch — see LOBE-7151 review P1.
if (newState.pendingToolsCalling.length > 0) {
newState.status = 'waiting_for_human';
return { newState, nextContext: undefined };
}
newState.status = 'running';
const nextContext: AgentRuntimeContext = { phase: 'user_input' };
return { newState, nextContext };
}
// B: halt. Use interrupted + reason='human_rejected' to reuse the
// existing terminal-state plumbing (early-exit, completion hooks, etc).
newState.status = 'interrupted';
newState.interruption = {
canResume: false,
interruptedAt: new Date().toISOString(),
reason: 'human_rejected',
};
return { newState, nextContext: undefined };
}
// ---- human_prompt / human_select (submitToolInteraction) — out of scope
// for this change; wire up in a follow-up issue.
if (humanInput) {
return { newState: state, nextContext: undefined }; return { newState: state, nextContext: undefined };
} }

View file

@ -0,0 +1,323 @@
// @vitest-environment node
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { AgentRuntimeService } from '../AgentRuntimeService';
// Mock heavy dependencies
//
// Each vi.mock call below swaps a module imported (directly or transitively)
// by AgentRuntimeService for a minimal stub, so the service can be constructed
// in this suite without its real collaborators. Vitest hoists vi.mock calls
// above the imports, so every factory is kept self-contained.

// Only APP_URL is stubbed on appEnv.
vi.mock('@/envs/app', () => ({ appEnv: { APP_URL: 'http://localhost:3010' } }));
// MessageModel instances expose just the two write methods asserted on in this
// suite. The suite's beforeEach additionally overwrites `service.messageModel`
// with its own per-test mock carrying the same two methods.
vi.mock('@/database/models/message', () => ({
MessageModel: vi.fn().mockImplementation(() => ({
updateMessagePlugin: vi.fn().mockResolvedValue(undefined),
updateToolMessage: vi.fn().mockResolvedValue({ success: true }),
})),
}));
// Coordinator instances and the stream event manager are empty objects.
vi.mock('@/server/modules/AgentRuntime', () => ({
AgentRuntimeCoordinator: vi.fn().mockImplementation(() => ({})),
createStreamEventManager: vi.fn(() => ({})),
}));
// The executor factory returns an empty object (no executors registered).
vi.mock('@/server/modules/AgentRuntime/RuntimeExecutors', () => ({
createRuntimeExecutors: vi.fn(() => ({})),
}));
vi.mock('@/server/services/mcp', () => ({ mcpService: {} }));
// Queue service and its local implementation are replaced by empty shells.
vi.mock('@/server/services/queue', () => ({
QueueService: vi.fn().mockImplementation(() => ({})),
}));
vi.mock('@/server/services/queue/impls', () => ({ LocalQueueServiceImpl: class {} }));
// Tool execution services are replaced by constructors returning empty objects.
vi.mock('@/server/services/toolExecution', () => ({
ToolExecutionService: vi.fn().mockImplementation(() => ({})),
}));
vi.mock('@/server/services/toolExecution/builtin', () => ({
BuiltinToolsExecutor: vi.fn().mockImplementation(() => ({})),
}));
// The dynamic intervention audit list is stubbed as empty.
vi.mock('@lobechat/builtin-tools/dynamicInterventionAudits', () => ({
dynamicInterventionAudits: [],
}));
describe('AgentRuntimeService.handleHumanIntervention', () => {
// Fixtures shared by every test case; all three are reassigned in beforeEach.
let service: AgentRuntimeService;
let mockMessageModel: any;
let mockDBPluginQuery: any;

/**
 * Builds a runtime state that is paused on human approval with two pending
 * tool calls (`tool-call-1` and `tool-call-2`).
 *
 * @param overrides - Fields shallow-merged over the defaults (later keys win).
 * @returns A fresh state object on every call.
 */
const makeState = (overrides: Record<string, any> = {}) => {
  const defaults = {
    lastModified: new Date().toISOString(),
    pendingToolsCalling: [
      { apiName: 'search', arguments: '{}', id: 'tool-call-1', identifier: 'web-search' },
      { apiName: 'write', arguments: '{}', id: 'tool-call-2', identifier: 'local-system' },
    ],
    status: 'waiting_for_human',
  };

  return { ...defaults, ...overrides };
};

beforeEach(() => {
  vi.clearAllMocks();

  // By default the plugin lookup resolves any tool message to `tool-call-1`;
  // individual tests override this with mockResolvedValueOnce.
  mockDBPluginQuery = vi.fn().mockResolvedValue({ toolCallId: 'tool-call-1' });
  const messagePlugins = { findFirst: mockDBPluginQuery };
  const serverDB = { query: { messagePlugins } } as any;

  service = new AgentRuntimeService(serverDB, 'user-1', { queueService: null });

  // Swap in a message model whose calls the tests can assert on directly.
  mockMessageModel = {
    updateMessagePlugin: vi.fn().mockResolvedValue(undefined),
    updateToolMessage: vi.fn().mockResolvedValue({ success: true }),
  };
  (service as any).messageModel = mockMessageModel;
});
describe('approve path', () => {
  /**
   * Approves `tool-call-1` against `state`. By default the intervention
   * targets `tool-msg-1`; pass `{}` as `target` to omit `toolMessageId`.
   */
  const approve = (state: any, target: Record<string, any> = { toolMessageId: 'tool-msg-1' }) =>
    (service as any).handleHumanIntervention({} as any, state, {
      approvedToolCall: { id: 'tool-call-1' },
      ...target,
    });

  it('persists intervention=approved on the tool message', async () => {
    await approve(makeState());

    expect(mockMessageModel.updateMessagePlugin).toHaveBeenCalledWith('tool-msg-1', {
      intervention: { status: 'approved' },
    });
  });

  it('returns nextContext with phase=human_approved_tool and skipCreateToolMessage=true', async () => {
    const { nextContext } = await approve(makeState());

    const expectedContext = {
      payload: {
        approvedToolCall: { id: 'tool-call-1' },
        parentMessageId: 'tool-msg-1',
        skipCreateToolMessage: true,
      },
      phase: 'human_approved_tool',
    };
    expect(nextContext).toEqual(expectedContext);
  });

  it('removes the approved tool from pendingToolsCalling', async () => {
    const { newState } = await approve(makeState());

    const remaining = newState.pendingToolsCalling;
    expect(remaining).toHaveLength(1);
    expect(remaining[0].id).toBe('tool-call-2');
  });

  it('keeps state waiting_for_human while other tools still pending', async () => {
    // Default state has two pending tools, so one is left after approval.
    const { newState } = await approve(makeState());

    expect(newState.status).toBe('waiting_for_human');
  });

  it('transitions to running when last pending tool is approved', async () => {
    const onlyPending = [
      { apiName: 'search', arguments: '{}', id: 'tool-call-1', identifier: 'web-search' },
    ];
    const { newState } = await approve(makeState({ pendingToolsCalling: onlyPending }));

    expect(newState.status).toBe('running');
  });

  it('no-ops when toolMessageId is missing', async () => {
    const { nextContext } = await approve(makeState(), {});

    expect(mockMessageModel.updateMessagePlugin).not.toHaveBeenCalled();
    expect(nextContext).toBeUndefined();
  });
});
describe('reject path (pure)', () => {
  /**
   * Sends a plain rejection (no `rejectAndContinue`) for `toolMessageId`
   * against `state`.
   */
  const reject = (
    rejectionReason: string,
    toolMessageId: string = 'tool-msg-1',
    state: any = makeState(),
  ) =>
    (service as any).handleHumanIntervention({} as any, state, {
      rejectionReason,
      toolMessageId,
    });

  it('persists intervention=rejected with reason and updates content', async () => {
    await reject('privacy concern');

    expect(mockMessageModel.updateToolMessage).toHaveBeenCalledWith('tool-msg-1', {
      content: 'User reject this tool calling with reason: privacy concern',
    });
    expect(mockMessageModel.updateMessagePlugin).toHaveBeenCalledWith('tool-msg-1', {
      intervention: { rejectedReason: 'privacy concern', status: 'rejected' },
    });
  });

  // The two cases below were previously titled as if they covered the
  // "rejected without a reason" content template. Neither does: the first
  // asserts that an empty reason never reaches updateToolMessage, and the
  // second exercises the with-reason template again. Titles now state what
  // is actually asserted.
  // TODO: add coverage for the "without reason" content template once there
  // is an input that reaches it through handleHumanIntervention.
  it('does not write rejection content when rejectionReason is an empty string', async () => {
    // An empty string is falsy, so the handler does not take the reject
    // branch and the tool message content is left untouched.
    await reject('');

    expect(mockMessageModel.updateToolMessage).not.toHaveBeenCalled();
  });

  it('uses the "with reason" content template for a minimal non-empty reason', async () => {
    await reject('r');

    expect(mockMessageModel.updateToolMessage).toHaveBeenCalledWith(
      'tool-msg-1',
      expect.objectContaining({
        content: 'User reject this tool calling with reason: r',
      }),
    );
  });

  it('removes the rejected tool from pendingToolsCalling by tool_call_id lookup', async () => {
    // The plugin lookup maps tool-msg-2 to tool-call-2, so tool-call-1 remains.
    mockDBPluginQuery.mockResolvedValueOnce({ toolCallId: 'tool-call-2' });

    const { newState } = await reject('nope', 'tool-msg-2');

    expect(newState.pendingToolsCalling).toHaveLength(1);
    expect(newState.pendingToolsCalling[0].id).toBe('tool-call-1');
  });

  it('transitions to interrupted + reason=human_rejected (pure reject, no continue)', async () => {
    const { newState, nextContext } = await reject('nope');

    expect(newState.status).toBe('interrupted');
    expect(newState.interruption).toEqual(
      expect.objectContaining({
        canResume: false,
        reason: 'human_rejected',
      }),
    );
    expect(nextContext).toBeUndefined();
  });
});
describe('reject_continue path', () => {
  /** Rejects `tool-msg-1` with `rejectAndContinue: true` against `state`. */
  const rejectAndContinue = (state: any, rejectionReason: string = 'nope') =>
    (service as any).handleHumanIntervention({} as any, state, {
      rejectAndContinue: true,
      rejectionReason,
      toolMessageId: 'tool-msg-1',
    });

  it('stays paused (nextContext=undefined) when other tools are still pending', async () => {
    // Two tools are pending by default and the lookup resolves to tool-call-1,
    // leaving one undecided. Handing back a `phase: 'user_input'` context at
    // this point would resume the LLM before the rest of the batch is decided
    // (LOBE-7151 review P1).
    const pausedState = makeState();
    mockDBPluginQuery.mockResolvedValueOnce({ toolCallId: 'tool-call-1' });

    const { newState, nextContext } = await rejectAndContinue(pausedState);

    expect(newState.status).toBe('waiting_for_human');
    expect(nextContext).toBeUndefined();
  });

  it('returns nextContext with phase=user_input only when this is the last pending tool', async () => {
    const lastPending = [
      { apiName: 'search', arguments: '{}', id: 'tool-call-1', identifier: 'web-search' },
    ];
    const pausedState = makeState({ pendingToolsCalling: lastPending });
    mockDBPluginQuery.mockResolvedValueOnce({ toolCallId: 'tool-call-1' });

    const { newState, nextContext } = await rejectAndContinue(pausedState);

    expect(newState.status).toBe('running');
    expect(nextContext).toEqual({ phase: 'user_input' });
  });

  it('still persists intervention=rejected on the tool message', async () => {
    await rejectAndContinue(makeState(), 'privacy');

    expect(mockMessageModel.updateMessagePlugin).toHaveBeenCalledWith('tool-msg-1', {
      intervention: { rejectedReason: 'privacy', status: 'rejected' },
    });
  });
});
describe('no-op paths', () => {
  /** Forwards `intervention` to the handler for `state`, always targeting `tool-msg-1`. */
  const intervene = (state: any, intervention: Record<string, any>) =>
    (service as any).handleHumanIntervention({} as any, state, {
      ...intervention,
      toolMessageId: 'tool-msg-1',
    });

  it('returns state unchanged when status is not waiting_for_human (approve)', async () => {
    const runningState = makeState({ status: 'running' });

    const { newState, nextContext } = await intervene(runningState, {
      approvedToolCall: { id: 'tool-call-1' },
    });

    // Same object identity: the handler must hand the input state straight back.
    expect(newState).toBe(runningState);
    expect(nextContext).toBeUndefined();
    expect(mockMessageModel.updateMessagePlugin).not.toHaveBeenCalled();
  });

  it('returns state unchanged when status is not waiting_for_human (reject)', async () => {
    const runningState = makeState({ status: 'running' });

    const { newState, nextContext } = await intervene(runningState, {
      rejectionReason: 'nope',
    });

    expect(newState).toBe(runningState);
    expect(nextContext).toBeUndefined();
  });

  it('handles humanInput as out-of-scope (no state transition)', async () => {
    const pausedState = makeState();

    const { newState, nextContext } = await intervene(pausedState, {
      humanInput: { response: 'hi' },
    });

    expect(newState).toBe(pausedState);
    expect(nextContext).toBeUndefined();
  });
});
});

View file

@ -120,8 +120,16 @@ export interface AgentExecutionParams {
externalRetryCount?: number; externalRetryCount?: number;
humanInput?: any; humanInput?: any;
operationId: string; operationId: string;
/**
* Whether a rejection should resume execution by treating the rejected tool
* content as user input (maps to client `rejectAndContinueToolCalling`).
* When false or unset, a rejection halts the operation.
*/
rejectAndContinue?: boolean;
rejectionReason?: string; rejectionReason?: string;
stepIndex: number; stepIndex: number;
/** ID of the pending tool message targeted by the intervention. */
toolMessageId?: string;
} }
export interface AgentExecutionResult { export interface AgentExecutionResult {