mirror of
https://github.com/lobehub/lobehub
synced 2026-04-21 17:47:27 +00:00
🔨 fix: wire Gateway-mode stop via direct tRPC interrupt (#13815)
* ✨ feat: wire Gateway-mode stop button to WS interrupt Frontend half of [LOBE-7142](https://linear.app/lobehub/issue/LOBE-7142) — the stop button previously silently failed in Gateway mode because: 1. `stopGenerateMessage` only filtered `execAgentRuntime`, so `execServerAgentRuntime` ops (Gateway) were skipped. 2. Even if the local op got cancelled, nothing bridged the cancel to the server-side agent loop running behind the Agent Gateway WS. ## Changes **`conversationControl.ts::stopGenerateMessage`** — extend the type filter to include both op types so both client-side and Gateway-mode runs are cancelled from the same entry point. **`gateway.ts::executeGatewayAgent` + `reconnectToGatewayOperation`** — register an `onOperationCancel` handler on the local `gatewayOpId` that forwards the server-side operation id to `interruptGatewayAgent(...)`, which sends `{ type: 'interrupt' }` over the Agent Gateway WS. The closure cleanly resolves the "local op id vs server op id" mapping — no metadata lookup needed. **`operation/actions.ts::cancelOperation`** — `isAborting` flag was gated on `execAgentRuntime`. Extend to `execServerAgentRuntime` too so the UI loading state transitions out immediately on Gateway-mode stop, without waiting for the round-trip `session_complete` from the server. ## What this doesn't do (follow-ups) - **Backend**: new `POST /api/agent/interrupt` route + Redis LPUSH (LOBE-7145). Without it, the WS interrupt reaches Agent Gateway but never gets forwarded to cloud. - **Agent loop**: `AgentRuntimeService.executeStep` LPOP polling of the interrupt key (LOBE-7146). Without it, the state never flips to `interrupted` server-side. - **Agent Gateway DO** (external repo): `_forwardInterrupt` HTTP POST from the WS interrupt handler (LOBE-7147). With only this PR merged, clicking stop will clear the local UI state and send the WS frame correctly — the server-side loop keeps running until those three are merged too. ## Tests - `conversationControl.test.ts`: +1 — stopGenerateMessage cancels `execServerAgentRuntime`, invokes the onCancel handler, sets `isAborting: true`. - `gateway.test.ts`: +1 — `executeGatewayAgent` registers a handler against the local opId, handler invokes `interruptGatewayAgent` with the server opId. All 123 touched-slice tests pass; type-check clean. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🔨 chore: switch Gateway stop to direct tRPC instead of WS roundtrip Rewiring only — no new behaviour on top of the previous commit. See the discussion in PR #13815 for the full reasoning. TL;DR the WS-based path (client → Agent Gateway WS → DO forwards HTTP → cloud route → Redis LPUSH → loop LPOP) has the same end-effect as the tRPC-direct path (client → tRPC → AgentRuntimeService .interruptOperation → DB state flip), except: - the tRPC path is one hop instead of three - the tRPC path reuses infrastructure that's *already on canary* — `aiAgentService.interruptTask` → `AiAgentService.interruptTask` → `AgentRuntimeService.interruptOperation` → `coordinator.saveAgentState` with status='interrupted' — and the existing step-boundary polling in `executeStep` (AgentRuntimeService.ts:474, 565) already picks it up - zero new server code required; zero Agent Gateway (external repo) coordination required The only reason the WS path was in the original spec (LOBE-7142) was symmetry with the Phase 6.4 tool_execute/tool_result path, but `interrupt` is a one-shot control signal, not stream data — there's no actual benefit to routing it through the same channel. Mid-step abort would require threading an AbortSignal into `runtime.step(...)`, which WS doesn't help with either. Closes out the need for LOBE-7145 / LOBE-7146 / LOBE-7147. Changes: - `gateway.ts`: both `executeGatewayAgent` and `reconnectToGatewayOperation` register the cancel handler against the local op id, but the handler body now calls `aiAgentService.interruptTask({ operationId: serverOpId })` via tRPC instead of `this.interruptGatewayAgent(serverOpId)` (which sent the WS interrupt frame). - `gateway.test.ts`: adjust the one new test case to verify the tRPC call rather than the WS-path spy; add `interruptTask` to the `aiAgentService` mock. `AgentStreamClient.sendInterrupt()` and `interruptGatewayAgent()` are kept as-is — public API, might be useful elsewhere. Just not called from the cancel handler anymore. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
636a3b77c3
commit
18bc2716b2
5 changed files with 140 additions and 5 deletions
|
|
@ -85,6 +85,49 @@ describe('ConversationControl actions', () => {
|
|||
|
||||
expect(result.current.operations[operationId!].status).toBe('running');
|
||||
});
|
||||
|
||||
it('cancels Gateway-mode execServerAgentRuntime ops and invokes their cancel handler', () => {
|
||||
const { result } = renderHook(() => useChatStore());
|
||||
|
||||
act(() => {
|
||||
useChatStore.setState({
|
||||
activeAgentId: TEST_IDS.SESSION_ID,
|
||||
activeTopicId: TEST_IDS.TOPIC_ID,
|
||||
});
|
||||
});
|
||||
|
||||
let operationId!: string;
|
||||
act(() => {
|
||||
const res = result.current.startOperation({
|
||||
type: 'execServerAgentRuntime',
|
||||
context: { agentId: TEST_IDS.SESSION_ID, topicId: TEST_IDS.TOPIC_ID },
|
||||
});
|
||||
operationId = res.operationId;
|
||||
});
|
||||
|
||||
const cancelHandler = vi.fn();
|
||||
act(() => {
|
||||
result.current.onOperationCancel(operationId, cancelHandler);
|
||||
});
|
||||
|
||||
expect(result.current.operations[operationId].status).toBe('running');
|
||||
|
||||
act(() => {
|
||||
result.current.stopGenerateMessage();
|
||||
});
|
||||
|
||||
// Operation gets cancelled and the handler (which would fire the WS interrupt
|
||||
// in real code) is invoked with the operation context.
|
||||
expect(result.current.operations[operationId].status).toBe('cancelled');
|
||||
expect(cancelHandler).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
operationId,
|
||||
type: 'execServerAgentRuntime',
|
||||
}),
|
||||
);
|
||||
// isAborting flag is also flipped so the UI loading state clears immediately.
|
||||
expect(result.current.operations[operationId].metadata.isAborting).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('cancelSendMessageInServer', () => {
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import { GatewayActionImpl } from '../gateway';
|
|||
vi.mock('@/services/aiAgent', () => ({
|
||||
aiAgentService: {
|
||||
execAgentTask: vi.fn(),
|
||||
interruptTask: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
|
|
@ -281,6 +282,7 @@ describe('GatewayActionImpl', () => {
|
|||
associateMessageWithOperation: vi.fn(),
|
||||
connectToGateway: vi.fn(),
|
||||
internal_updateTopicLoading: vi.fn(),
|
||||
onOperationCancel: vi.fn(),
|
||||
replaceMessages: vi.fn(),
|
||||
switchTopic: vi.fn(),
|
||||
})) as any;
|
||||
|
|
@ -398,5 +400,67 @@ describe('GatewayActionImpl', () => {
|
|||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('registers a cancel handler that calls aiAgentService.interruptTask with the server operationId', async () => {
|
||||
const onOperationCancel = vi.fn();
|
||||
const startOperation = vi.fn(() => ({ operationId: 'gw-op-local' }));
|
||||
|
||||
const mockClient = createMockClient();
|
||||
const state: Record<string, any> = { gatewayConnections: {} };
|
||||
const set = vi.fn((updater: any) => {
|
||||
if (typeof updater === 'function') Object.assign(state, updater(state));
|
||||
else Object.assign(state, updater);
|
||||
});
|
||||
const get = vi.fn(() => ({
|
||||
...state,
|
||||
associateMessageWithOperation: vi.fn(),
|
||||
connectToGateway: vi.fn(),
|
||||
internal_updateTopicLoading: vi.fn(),
|
||||
onOperationCancel,
|
||||
replaceMessages: vi.fn(),
|
||||
startOperation,
|
||||
switchTopic: vi.fn(),
|
||||
})) as any;
|
||||
|
||||
(globalThis as any).window = {
|
||||
global_serverConfigStore: {
|
||||
getState: () => ({ serverConfig: { agentGatewayUrl: 'https://gateway.test.com' } }),
|
||||
},
|
||||
};
|
||||
|
||||
const action = new GatewayActionImpl(set as any, get, undefined);
|
||||
action.createClient = vi.fn(() => mockClient);
|
||||
const interruptTaskSpy = vi
|
||||
.mocked(aiAgentService.interruptTask)
|
||||
.mockResolvedValue({ operationId: 'server-op-xyz', success: true });
|
||||
|
||||
vi.mocked(aiAgentService.execAgentTask).mockResolvedValue({
|
||||
agentId: 'agent-1',
|
||||
assistantMessageId: 'ast-1',
|
||||
autoStarted: true,
|
||||
createdAt: new Date().toISOString(),
|
||||
message: 'ok',
|
||||
operationId: 'server-op-xyz',
|
||||
status: 'created',
|
||||
success: true,
|
||||
timestamp: new Date().toISOString(),
|
||||
token: 'test-token',
|
||||
topicId: 'topic-1',
|
||||
userMessageId: 'usr-1',
|
||||
});
|
||||
|
||||
await action.executeGatewayAgent({
|
||||
context: { agentId: 'agent-1', topicId: 'topic-1', threadId: null, scope: 'main' },
|
||||
message: 'Hello',
|
||||
});
|
||||
|
||||
// Handler was registered against the local operation id...
|
||||
expect(onOperationCancel).toHaveBeenCalledWith('gw-op-local', expect.any(Function));
|
||||
|
||||
// ...and, when invoked, fires tRPC interruptTask with the *server-side* operation id
|
||||
const [, handler] = onOperationCancel.mock.calls[0];
|
||||
await handler();
|
||||
expect(interruptTaskSpy).toHaveBeenCalledWith({ operationId: 'server-op-xyz' });
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -31,10 +31,15 @@ export class ConversationControlActionImpl {
|
|||
stopGenerateMessage = (): void => {
|
||||
const { activeAgentId, activeTopicId, cancelOperations } = this.#get();
|
||||
|
||||
// Cancel all running execAgentRuntime operations in the current context
|
||||
// Cancel running agent-runtime operations in the current context —
|
||||
// both client-side (execAgentRuntime) and Gateway-mode
|
||||
// (execServerAgentRuntime). For the Gateway-mode branch, a cancel
|
||||
// handler registered in `executeGatewayAgent` / `reconnectToGatewayOperation`
|
||||
// picks up the cancellation and forwards an `interrupt` frame over the
|
||||
// Agent Gateway WebSocket so the server-side loop aborts.
|
||||
cancelOperations(
|
||||
{
|
||||
type: 'execAgentRuntime',
|
||||
type: ['execAgentRuntime', 'execServerAgentRuntime'],
|
||||
status: 'running',
|
||||
agentId: activeAgentId,
|
||||
topicId: activeTopicId,
|
||||
|
|
|
|||
|
|
@ -275,6 +275,17 @@ export class GatewayActionImpl {
|
|||
// Associate the server-created assistant message with the gateway operation
|
||||
this.#get().associateMessageWithOperation(result.assistantMessageId, gatewayOpId);
|
||||
|
||||
// When the local operation is cancelled (e.g. user clicks stop), forward
|
||||
// the interrupt directly to the server via the existing tRPC endpoint.
|
||||
// Closure captures `result.operationId` (the server-side id) so we don't
|
||||
// depend on any metadata lookup. Fire-and-forget — errors are logged but
|
||||
// never block the local cancel flow.
|
||||
this.#get().onOperationCancel(gatewayOpId, async () => {
|
||||
await aiAgentService
|
||||
.interruptTask({ operationId: result.operationId })
|
||||
.catch((err) => console.error('[Gateway] interruptTask failed:', err));
|
||||
});
|
||||
|
||||
const eventHandler = createGatewayEventHandler(this.#get, {
|
||||
assistantMessageId: result.assistantMessageId,
|
||||
context: execContext,
|
||||
|
|
@ -344,6 +355,14 @@ export class GatewayActionImpl {
|
|||
|
||||
this.#get().associateMessageWithOperation(assistantMessageId, gatewayOpId);
|
||||
|
||||
// Forward local-op cancellation to the server-side agent loop via tRPC.
|
||||
// See note in executeGatewayAgent for details.
|
||||
this.#get().onOperationCancel(gatewayOpId, async () => {
|
||||
await aiAgentService
|
||||
.interruptTask({ operationId })
|
||||
.catch((err) => console.error('[Gateway] interruptTask failed:', err));
|
||||
});
|
||||
|
||||
const eventHandler = createGatewayEventHandler(this.#get, {
|
||||
assistantMessageId,
|
||||
context,
|
||||
|
|
|
|||
|
|
@ -389,9 +389,13 @@ export class OperationActionsImpl {
|
|||
// Ignore abort errors
|
||||
}
|
||||
|
||||
// 2. Set isAborting flag immediately for execAgentRuntime operations
|
||||
// This ensures UI (loading button) responds instantly to user cancellation
|
||||
if (operation.type === 'execAgentRuntime') {
|
||||
// 2. Set isAborting flag immediately for agent-runtime operations.
|
||||
// This ensures UI (loading button) responds instantly to user cancellation.
|
||||
// Applies to both client-side (execAgentRuntime) and Gateway-mode
|
||||
// (execServerAgentRuntime) runs — the latter needs the flag so the UI
|
||||
// transitions out of loading right away, without waiting for the
|
||||
// round-trip WS `session_complete` after the server acknowledges interrupt.
|
||||
if (operation.type === 'execAgentRuntime' || operation.type === 'execServerAgentRuntime') {
|
||||
this.#get().updateOperationMetadata(operationId, { isAborting: true });
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue