mirror of
https://github.com/lobehub/lobehub
synced 2026-04-21 17:47:27 +00:00
🐛 fix: buffer and deduplicate events during gateway resume (#13689)
* 🐛 fix: buffer and deduplicate events during resume to prevent out-of-order display When reconnecting with empty lastEventId (page reload), live broadcast events can arrive before resume replay completes, causing content to appear out of order. Now AgentStreamClient enters resume mode: buffers all events, waits for a 500ms gap (resume replay is dense, live events are sparse), then deduplicates by event ID and emits in order. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: clear runningOperation on agent finish + resume timeout for completed sessions - RuntimeExecutors.finish clears topic metadata.runningOperation when agent reaches terminal state, so stale entries don't trigger reconnect - AgentStreamClient resume mode: add 3s timeout for empty buffer — if no events arrive after resume request, session has already completed, emit session_complete and disconnect Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: eagerly fetch messages after topic switch to avoid skeleton flash After switchTopic in Gateway mode, immediately fetch messages from DB and replace in store, so the UI renders content right away instead of showing a skeleton loading state while SWR re-fetches. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: eliminate skeleton flash on gateway topic switch Match the client-mode pattern: fetch messages from DB and replaceMessages BEFORE calling switchTopic with skipRefreshMessage: true. This ensures messages are already in the store when the topic switches, preventing a skeleton loading flash. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: flush resume buffer on session_complete before disconnect session_complete is a top-level ServerMessage (not an agent_event), so it bypassed the resume buffer. When it arrived during resume mode, disconnect() cleared the buffer and all replayed events were lost. 
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: limit resume buffering to explicit reconnect scenarios only Resume mode was triggered for ALL new connections (lastEventId always empty on first connect), delaying live streaming for normal operations. Now resume buffering requires explicit opt-in via resumeOnConnect option, which is only set by reconnectToGatewayOperation (page-reload reconnect). Normal executeGatewayAgent connections stream events immediately. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
06ac87dc45
commit
23f91d044c
4 changed files with 139 additions and 4 deletions
|
|
@ -13,6 +13,8 @@ const HEARTBEAT_INTERVAL = 30_000; // 30s
|
|||
const INITIAL_RECONNECT_DELAY = 1000; // 1s
|
||||
const MAX_RECONNECT_DELAY = 30_000; // 30s
|
||||
const MAX_MISSED_HEARTBEATS = 3;
|
||||
const RESUME_FLUSH_DELAY = 500; // 500ms debounce after last resume event
|
||||
const RESUME_TIMEOUT = 3000; // 3s: if no events after resume request, session is already done
|
||||
|
||||
// ─── Typed Event Emitter (browser-compatible, no node:events) ───
|
||||
|
||||
|
|
@ -70,9 +72,16 @@ export class AgentStreamClient extends TypedEmitter {
|
|||
private lastEventId = '';
|
||||
private sessionEnded = false;
|
||||
|
||||
// Resume buffering: when resumeOnConnect is set and lastEventId is empty, buffer events
|
||||
// until resume replay completes, then deduplicate and emit in order.
|
||||
private resumeBuffer: Array<{ event: AgentStreamEvent; id?: string }> = [];
|
||||
private resumeMode = false;
|
||||
private resumeFlushTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
private readonly gatewayUrl: string;
|
||||
private readonly operationId: string;
|
||||
private readonly autoReconnect: boolean;
|
||||
private readonly resumeOnConnect: boolean;
|
||||
private token: string;
|
||||
|
||||
constructor(options: AgentStreamClientOptions) {
|
||||
|
|
@ -81,6 +90,7 @@ export class AgentStreamClient extends TypedEmitter {
|
|||
this.operationId = options.operationId;
|
||||
this.token = options.token;
|
||||
this.autoReconnect = options.autoReconnect ?? true;
|
||||
this.resumeOnConnect = options.resumeOnConnect ?? false;
|
||||
}
|
||||
|
||||
// ─── Public API ───
|
||||
|
|
@ -196,6 +206,26 @@ export class AgentStreamClient extends TypedEmitter {
|
|||
case 'auth_success': {
|
||||
this.setStatus('connected');
|
||||
this.startHeartbeat();
|
||||
|
||||
// Enter resume mode only for explicit reconnect scenarios (page reload).
|
||||
// Buffer all events until resume replay completes, then deduplicate and emit.
|
||||
// This is NOT enabled for normal first-connect to avoid delaying live streaming.
|
||||
if (this.resumeOnConnect && !this.lastEventId) {
|
||||
this.resumeMode = true;
|
||||
this.resumeBuffer = [];
|
||||
|
||||
// Safety timeout: if no events arrive after resume, the session has already
|
||||
// completed and the DO has nothing to replay. Exit resume mode and signal completion.
|
||||
this.resumeFlushTimer = setTimeout(() => {
|
||||
if (this.resumeMode && this.resumeBuffer.length === 0) {
|
||||
this.resumeMode = false;
|
||||
this.sessionEnded = true;
|
||||
this.emit('session_complete');
|
||||
this.disconnect();
|
||||
}
|
||||
}, RESUME_TIMEOUT);
|
||||
}
|
||||
|
||||
// Request all buffered events (covers events pushed before WS connected)
|
||||
this.sendMessage({ lastEventId: this.lastEventId, type: 'resume' });
|
||||
this.emit('connected');
|
||||
|
|
@ -217,6 +247,20 @@ export class AgentStreamClient extends TypedEmitter {
|
|||
const agentEvent: AgentStreamEvent = message.event;
|
||||
if (message.id) this.lastEventId = message.id;
|
||||
|
||||
if (this.resumeMode) {
|
||||
// Buffer events during resume — will be deduplicated and emitted after replay
|
||||
this.resumeBuffer.push({ event: agentEvent, id: message.id });
|
||||
this.scheduleResumeFlush();
|
||||
|
||||
// Terminal events still end the session even in resume mode
|
||||
if (agentEvent.type === 'agent_runtime_end' || agentEvent.type === 'error') {
|
||||
this.sessionEnded = true;
|
||||
this.flushResumeBuffer();
|
||||
this.disconnect();
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
this.emit('agent_event', agentEvent);
|
||||
|
||||
// Terminal events — session is done, no need to reconnect
|
||||
|
|
@ -229,6 +273,10 @@ export class AgentStreamClient extends TypedEmitter {
|
|||
|
||||
case 'session_complete': {
|
||||
this.sessionEnded = true;
|
||||
// Flush any buffered resume events before disconnecting
|
||||
if (this.resumeMode) {
|
||||
this.flushResumeBuffer();
|
||||
}
|
||||
this.emit('session_complete');
|
||||
this.disconnect();
|
||||
break;
|
||||
|
|
@ -322,6 +370,52 @@ export class AgentStreamClient extends TypedEmitter {
|
|||
this.emit('status_changed', status);
|
||||
}
|
||||
|
||||
// ─── Resume Buffering ───
|
||||
|
||||
/**
 * (Re)arm the debounce timer that ends resume buffering.
 *
 * Any timer currently held in `resumeFlushTimer` is cancelled and replaced by
 * a new one that calls `flushResumeBuffer()` once `RESUME_FLUSH_DELAY` has
 * elapsed without another call to this method. Calling it for every buffered
 * event therefore postpones the flush until the event stream goes quiet.
 */
private scheduleResumeFlush(): void {
  const pendingTimer = this.resumeFlushTimer;
  if (pendingTimer) {
    clearTimeout(pendingTimer);
  }

  const flushWhenQuiet = (): void => {
    this.flushResumeBuffer();
  };
  this.resumeFlushTimer = setTimeout(flushWhenQuiet, RESUME_FLUSH_DELAY);
}
|
||||
|
||||
/**
 * Leave resume mode and emit the buffered events, dropping duplicates.
 *
 * No-op when the client is not in resume mode. Otherwise resume mode is
 * switched off and any pending flush timer is cancelled before anything is
 * emitted.
 *
 * Events are emitted as `agent_event` in the order they were buffered; no
 * sorting is performed. When two buffered entries share a key, only the first
 * one is kept. The key is the entry's `id` when it is truthy, otherwise a
 * composite of the event's `type`, `stepIndex` and `timestamp`.
 */
private flushResumeBuffer(): void {
  if (!this.resumeMode) return;
  this.resumeMode = false;

  const pendingTimer = this.resumeFlushTimer;
  if (pendingTimer) {
    clearTimeout(pendingTimer);
    this.resumeFlushTimer = null;
  }

  // A Map iterates in insertion order, so recording only the first entry per
  // key yields first-occurrence-wins deduplication that preserves buffer order.
  const firstByKey = new Map<string, AgentStreamEvent>();
  for (const entry of this.resumeBuffer) {
    const dedupeKey =
      entry.id || `${entry.event.type}_${entry.event.stepIndex}_${entry.event.timestamp}`;
    if (firstByKey.has(dedupeKey)) continue;
    firstByKey.set(dedupeKey, entry.event);
  }

  // Release the buffer before emitting so listeners never observe stale entries.
  this.resumeBuffer = [];

  for (const bufferedEvent of firstByKey.values()) {
    this.emit('agent_event', bufferedEvent);
  }
}
|
||||
|
||||
// ─── Helpers ───
|
||||
|
||||
private sendMessage(data: ClientMessage): void {
|
||||
|
|
@ -349,5 +443,11 @@ export class AgentStreamClient extends TypedEmitter {
|
|||
this.stopHeartbeat();
|
||||
this.clearReconnectTimer();
|
||||
this.closeWebSocket();
|
||||
if (this.resumeFlushTimer) {
|
||||
clearTimeout(this.resumeFlushTimer);
|
||||
this.resumeFlushTimer = null;
|
||||
}
|
||||
this.resumeMode = false;
|
||||
this.resumeBuffer = [];
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -156,6 +156,13 @@ export interface AgentStreamClientOptions {
|
|||
gatewayUrl: string;
|
||||
/** Operation ID to subscribe to */
|
||||
operationId: string;
|
||||
/**
|
||||
* Enable resume buffering on first connect (default: false).
|
||||
* When true, events are buffered and deduplicated after the resume replay
|
||||
* completes, preventing out-of-order display during page-reload reconnect.
|
||||
* Only set this for reconnection scenarios, not for new operations.
|
||||
*/
|
||||
resumeOnConnect?: boolean;
|
||||
/** Auth token */
|
||||
token: string;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1862,6 +1862,16 @@ export const createRuntimeExecutors = (
|
|||
|
||||
log('[%s:%d] Finishing execution: (%s)', operationId, stepIndex, reason);
|
||||
|
||||
// Clear runningOperation from topic metadata so reconnect doesn't trigger after completion
|
||||
if (ctx.topicId && ctx.userId) {
|
||||
try {
|
||||
const topicModel = new TopicModel(ctx.serverDB, ctx.userId);
|
||||
await topicModel.updateMetadata(ctx.topicId, { runningOperation: null });
|
||||
} catch (e) {
|
||||
log('[%s] Failed to clear runningOperation metadata: %O', operationId, e);
|
||||
}
|
||||
}
|
||||
|
||||
// Publish execution complete event
|
||||
await streamManager.publishStreamEvent(operationId, {
|
||||
data: {
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import type {
|
|||
} from '@/libs/agent-stream';
|
||||
import { AgentStreamClient } from '@/libs/agent-stream/client';
|
||||
import { aiAgentService } from '@/services/aiAgent';
|
||||
import { messageService } from '@/services/message';
|
||||
import { topicService } from '@/services/topic';
|
||||
import type { ChatStore } from '@/store/chat/store';
|
||||
import type { StoreSetter } from '@/store/types';
|
||||
|
|
@ -40,6 +41,10 @@ export interface ConnectGatewayParams {
|
|||
* The operation ID returned by execAgent
|
||||
*/
|
||||
operationId: string;
|
||||
/**
|
||||
* Enable resume buffering for reconnect scenarios (default: false)
|
||||
*/
|
||||
resumeOnConnect?: boolean;
|
||||
/**
|
||||
* Auth token for the Gateway
|
||||
*/
|
||||
|
|
@ -67,12 +72,12 @@ export class GatewayActionImpl {
|
|||
* Creates an AgentStreamClient, manages its lifecycle, and wires up event callbacks.
|
||||
*/
|
||||
connectToGateway = (params: ConnectGatewayParams): void => {
|
||||
const { operationId, gatewayUrl, token, onEvent, onSessionComplete } = params;
|
||||
const { operationId, gatewayUrl, token, onEvent, onSessionComplete, resumeOnConnect } = params;
|
||||
|
||||
// Disconnect existing connection for this operation if any
|
||||
this.disconnectFromGateway(operationId);
|
||||
|
||||
const client = this.createClient({ gatewayUrl, operationId, token });
|
||||
const client = this.createClient({ gatewayUrl, operationId, resumeOnConnect, token });
|
||||
|
||||
// Track connection in store
|
||||
this.#set(
|
||||
|
|
@ -202,9 +207,21 @@ export class GatewayActionImpl {
|
|||
prompt: message,
|
||||
});
|
||||
|
||||
// If server created a new topic, switch to it and clean up the _new key temp messages
|
||||
// If server created a new topic, fetch messages first then switch topic
|
||||
// (same pattern as client mode: replaceMessages before switchTopic to avoid skeleton flash)
|
||||
if (isCreateNewTopic && result.topicId) {
|
||||
await this.#get().switchTopic(result.topicId, { clearNewKey: true });
|
||||
try {
|
||||
const newContext = { ...context, topicId: result.topicId };
|
||||
const messages = await messageService.getMessages(newContext);
|
||||
this.#get().replaceMessages(messages, { context: newContext });
|
||||
} catch {
|
||||
/* non-critical */
|
||||
}
|
||||
|
||||
await this.#get().switchTopic(result.topicId, {
|
||||
clearNewKey: true,
|
||||
skipRefreshMessage: true,
|
||||
});
|
||||
}
|
||||
|
||||
// Use the server-created topicId for the execution context
|
||||
|
|
@ -303,6 +320,7 @@ export class GatewayActionImpl {
|
|||
topicService.updateTopicMetadata(topicId, { runningOperation: null }).catch(() => {});
|
||||
},
|
||||
operationId,
|
||||
resumeOnConnect: true,
|
||||
token,
|
||||
});
|
||||
};
|
||||
|
|
|
|||
Loading…
Reference in a new issue