🐛 fix: fail fast when tool/assistant message persist hits a missing parent (#13828)

* 🐛 fix: fail fast when tool/assistant message persist hits a missing parent

When a conversation parent was deleted mid-operation (LOBE-7154), the
runtime was silently swallowing the parent_id FK violation in three tool
persist paths and continuing with a stale parentMessageId. The next LLM
call hit the same FK without context, surfacing as a raw SQL error to
the user after burning several LLM + tool call round trips.

Changes

- packages/types: add AgentRuntimeErrorType.ConversationParentMissing
- new messagePersistErrors.ts helper: FK detection + structured error
  constructor + persist-fatal marker (keeps RuntimeExecutors smaller)
- RuntimeExecutors:
  - call_tool: publish error event + re-throw on persist failure;
    outer catch propagates when persist-fatal
  - call_tools_batch: same, mark so the per-tool outer catch doesn't
    swallow and fall back to the already-deleted parent
  - resolve_aborted_tools: same pattern
  - call_llm: preflight parent existence via findById so we fail before
    the LLM call instead of after
- tests: replace old swallow-on-fail expectations, add LOBE-7158 cases
  for each executor plus focused unit tests for the helper module

Fixes LOBE-7158

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 💄 chore: publish normalized ConversationParentMissing on persist failure

Review feedback on LOBE-7158: the three persist catches were emitting
the raw DB exception as a stream `error` event before normalizing it.
Clients treat `error` events as terminal and surface `event.data.error`
directly, so the raw SQL text leaked to users and ended the stream
before the typed `ConversationParentMissing` throw could propagate.

Move normalization ahead of the publish in call_tool, call_tools_batch,
and resolve_aborted_tools so the stream event always carries the
intended business error. Add a regression assertion on the
call_tool FK test that the error event's `errorType` is
`ConversationParentMissing` and no `Failed query` text leaks through.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Arvin Xu 2026-04-15 09:27:01 +08:00 committed by GitHub
parent 1a98e1b5aa
commit f12cf8f2ea
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 375 additions and 38 deletions

View file

@ -13,6 +13,13 @@ export enum RequestTrigger {
// ******* Runtime Biz Error ******* //
export const AgentRuntimeErrorType = {
AgentRuntimeError: 'AgentRuntimeError', // Agent Runtime module runtime error
/**
* The `parent_id` referenced by an assistant / tool message no longer exists
* in the database typically because the parent message was deleted during
* operation execution. The conversation chain is broken, so the runtime
* stops fail-fast instead of letting the next step hit another FK violation.
*/
ConversationParentMissing: 'ConversationParentMissing',
LocationNotSupportError: 'LocationNotSupportError',
AccountDeactivated: 'AccountDeactivated',

View file

@ -49,6 +49,12 @@ import {
import { dispatchClientTool } from './dispatchClientTool';
import { classifyLLMError, type LLMErrorKind } from './llmErrorClassification';
import {
createConversationParentMissingError,
isParentMessageMissingError,
isPersistFatal,
markPersistFatal,
} from './messagePersistErrors';
import { type IStreamEventManager } from './types';
const log = debug('lobe-server:agent-runtime:streaming-executors');
@ -328,6 +334,25 @@ export const createRuntimeExecutors = (
// Get parentId from payload (parentId or parentMessageId depending on payload type)
const parentId = llmPayload.parentId || (llmPayload as any).parentMessageId;
// Parent existence preflight (LOBE-7158 / LOBE-7154):
// If the parent was deleted concurrently (e.g. user deleted topic mid-run),
// assistant message creation below would hit a PG FK violation AFTER we've
// already done the LLM call and spent tokens. Check first — fail fast,
// save cost, and surface a typed error the frontend can act on instead of
// a raw SQL error.
if (parentId) {
const parentExists = await ctx.messageModel.findById(parentId);
if (!parentExists) {
const error = createConversationParentMissingError(parentId);
await streamManager.publishStreamEvent(operationId, {
data: formatErrorEventData(error, 'parent_message_preflight'),
stepIndex,
type: 'error',
});
throw error;
}
}
// Get or create assistant message
// If assistantMessageId is provided in payload, use existing message instead of creating new one
const existingAssistantMessageId = (llmPayload as any).assistantMessageId;
@ -1461,6 +1486,22 @@ export const createRuntimeExecutors = (
toolMessageId = toolMessage.id;
} catch (error) {
console.error('[StreamingToolExecutor] Failed to create tool message: %O', error);
// Normalize BEFORE publishing so clients (which treat `error` stream
// events as terminal and surface `event.data.error` directly) see the
// typed business error, not the raw SQL / driver text.
const fatal = isParentMessageMissingError(error)
? createConversationParentMissingError(payload.parentMessageId, error)
: error instanceof Error
? error
: new Error(String(error));
await streamManager.publishStreamEvent(operationId, {
data: formatErrorEventData(fatal, 'tool_message_persist'),
stepIndex,
type: 'error',
});
// Mark so the outer catch (which normally converts tool-exec errors
// into event records and returns the unchanged state) re-throws.
throw markPersistFatal(fatal);
}
const newState = structuredClone(state);
@ -1562,6 +1603,11 @@ export const createRuntimeExecutors = (
},
};
} catch (error) {
// Persist-level failures (parent FK violation etc.) must propagate so
// the step fails — otherwise the swallow-and-continue path keeps
// running the agent on a broken conversation chain. See LOBE-7158.
if (isPersistFatal(error)) throw error;
// Publish tool execution error event
await streamManager.publishStreamEvent(operationId, {
data: formatErrorEventData(error, 'tool_execution'),
@ -1764,6 +1810,23 @@ export const createRuntimeExecutors = (
`[${operationLogId}] Failed to create tool message for ${toolName}:`,
error,
);
// Normalize BEFORE publishing — clients treat `error` stream
// events as terminal and surface `event.data.error` directly, so
// a raw SQL error here would leak driver text to the user before
// the ConversationParentMissing throw is consumed. See LOBE-7158.
const fatal = isParentMessageMissingError(error)
? createConversationParentMissingError(parentMessageId, error)
: error instanceof Error
? error
: new Error(String(error));
await streamManager.publishStreamEvent(operationId, {
data: formatErrorEventData(fatal, 'tool_message_persist'),
stepIndex,
type: 'error',
});
// Marker so the outer catch (which normally just records
// per-tool exec errors) knows to propagate this one.
throw markPersistFatal(fatal);
}
// Collect tool result
@ -1786,6 +1849,13 @@ export const createRuntimeExecutors = (
toolName,
};
} catch (error) {
// Persist-level failures (e.g. parent FK violations) must propagate
// so the whole batch short-circuits. Without this the fallback to
// the already-deleted parent triggers another FK on the next step.
if (isPersistFatal(error)) {
throw error;
}
console.error(`[${operationLogId}] Tool execution failed for ${toolName}:`, error);
// Publish error event
@ -2103,6 +2173,19 @@ export const createRuntimeExecutors = (
toolName,
error,
);
// Normalize BEFORE publishing so clients surface the typed business
// error instead of the raw driver text (see LOBE-7158 review).
const fatal = isParentMessageMissingError(error)
? createConversationParentMissingError(parentMessageId, error)
: error instanceof Error
? error
: new Error(String(error));
await streamManager.publishStreamEvent(operationId, {
data: formatErrorEventData(fatal, 'tool_message_persist'),
stepIndex,
type: 'error',
});
throw fatal;
}
}

View file

@ -63,6 +63,9 @@ describe('RuntimeExecutors', () => {
mockMessageModel = {
create: vi.fn().mockResolvedValue({ id: 'msg-123' }),
// call_llm does a parent existence preflight; return a truthy row by
// default so existing tests don't have to stub it.
findById: vi.fn().mockResolvedValue({ id: 'msg-existing' }),
query: vi.fn().mockResolvedValue([]),
update: vi.fn().mockResolvedValue({}),
};
@ -236,6 +239,36 @@ describe('RuntimeExecutors', () => {
);
});
it('should throw ConversationParentMissing if parent preflight misses (LOBE-7158)', async () => {
// parent existence preflight — if the parent row was deleted between
// operation kickoff and call_llm, fail fast before spending LLM tokens
// on a chain that would hit a FK violation anyway.
mockMessageModel.findById.mockResolvedValueOnce(null);
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
messages: [{ content: 'Hello', role: 'user' }],
model: 'gpt-4',
parentId: 'gone-msg',
provider: 'openai',
tools: [],
},
type: 'call_llm' as const,
};
await expect(executors.call_llm!(instruction, state)).rejects.toMatchObject({
errorType: 'ConversationParentMissing',
parentId: 'gone-msg',
});
// LLM never got invoked
expect(initModelRuntimeFromDB).not.toHaveBeenCalled();
// No assistant message got created either
expect(mockMessageModel.create).not.toHaveBeenCalled();
});
it('should pass undefined parentId when neither parentId nor parentMessageId is provided', async () => {
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
@ -1382,8 +1415,11 @@ describe('RuntimeExecutors', () => {
expect(result.nextContext!.phase).toBe('tool_result');
});
it('should return undefined parentMessageId if messageModel.create fails', async () => {
// Setup: mock messageModel.create to throw an error
it('should re-throw when messageModel.create fails (LOBE-7158: no silent swallow)', async () => {
// Before LOBE-7158 we silently swallowed this error and returned
// `parentMessageId: undefined`, which let the operation continue into
// the next step and re-hit the same failure without context. The fix
// requires the executor to propagate so the whole step fails.
mockMessageModel.create.mockRejectedValue(new Error('Database error'));
const executors = createRuntimeExecutors(ctx);
@ -1403,11 +1439,53 @@ describe('RuntimeExecutors', () => {
type: 'call_tool' as const,
};
const result = await executors.call_tool!(instruction, state);
await expect(executors.call_tool!(instruction, state)).rejects.toThrow('Database error');
});
// parentMessageId should be undefined when message creation fails
const payload = result.nextContext!.payload as { parentMessageId?: string };
expect(payload.parentMessageId).toBeUndefined();
it('should throw ConversationParentMissing on a parent_id FK violation (LOBE-7158)', async () => {
// Simulate the drizzle + postgres-js wrapped error shape.
const fkError: any = new Error(
'Failed query: insert into "messages" ... violates foreign key constraint',
);
fkError.cause = {
code: '23503',
constraint: 'messages_parent_id_messages_id_fk',
};
mockMessageModel.create.mockRejectedValue(fkError);
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
parentMessageId: 'deleted-parent',
toolCalling: {
apiName: 'search',
arguments: '{}',
id: 'tool-call-1',
identifier: 'web-search',
type: 'default' as const,
},
},
type: 'call_tool' as const,
};
await expect(executors.call_tool!(instruction, state)).rejects.toMatchObject({
errorType: 'ConversationParentMissing',
parentId: 'deleted-parent',
});
// Stream event must carry the normalized error, not raw SQL text —
// clients treat `error` events as terminal and surface data.error
// directly, so leaking driver output here would show up to users.
const errorEventPublishes = mockStreamManager.publishStreamEvent.mock.calls.filter(
([, event]: [string, any]) => event.type === 'error',
);
expect(errorEventPublishes.length).toBeGreaterThan(0);
for (const [, event] of errorEventPublishes) {
expect(event.data.errorType).toBe('ConversationParentMissing');
expect(event.data.error).not.toMatch(/Failed query/);
}
});
it('should retry tool execution when kind is retry and eventually succeed', async () => {
@ -1858,8 +1936,11 @@ describe('RuntimeExecutors', () => {
expect(result.nextContext!.phase).toBe('tools_batch_result');
});
it('should fallback to original parentMessageId if no tool messages created', async () => {
// All tool message creations fail
it('should propagate persist failures instead of silently falling back (LOBE-7158)', async () => {
// Before LOBE-7158 we fell back to the original parentMessageId here,
// which was itself the deleted parent that caused the failure — so the
// next step would hit the same FK violation with no context. The fix
// requires the batch to short-circuit on persist failure.
mockMessageModel.create.mockRejectedValue(new Error('Database error'));
const executors = createRuntimeExecutors(ctx);
@ -1881,11 +1962,44 @@ describe('RuntimeExecutors', () => {
type: 'call_tools_batch' as const,
};
const result = await executors.call_tools_batch!(instruction, state);
await expect(executors.call_tools_batch!(instruction, state)).rejects.toThrow(
'Database error',
);
});
// Should fallback to original parentMessageId
const payload = result.nextContext!.payload as { parentMessageId?: string };
expect(payload.parentMessageId).toBe('original-parent-123');
it('should throw ConversationParentMissing on a parent_id FK violation (LOBE-7158)', async () => {
const fkError: any = new Error(
'Failed query: insert into "messages" ... violates foreign key constraint',
);
fkError.cause = {
code: '23503',
constraint: 'messages_parent_id_messages_id_fk',
};
mockMessageModel.create.mockRejectedValue(fkError);
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
const instruction = {
payload: {
parentMessageId: 'deleted-parent',
toolsCalling: [
{
apiName: 'search',
arguments: '{}',
id: 'tool-call-1',
identifier: 'web-search',
type: 'default' as const,
},
],
},
type: 'call_tools_batch' as const,
};
await expect(executors.call_tools_batch!(instruction, state)).rejects.toMatchObject({
errorType: 'ConversationParentMissing',
parentId: 'deleted-parent',
});
});
it('should continue processing other tools if one tool execution fails', async () => {
@ -1939,8 +2053,10 @@ describe('RuntimeExecutors', () => {
expect(result.nextContext!.phase).toBe('tools_batch_result');
});
it('should continue if tool message creation fails for one tool', async () => {
// First message creation succeeds, second fails
it('should fail the batch if tool message creation fails for any tool (LOBE-7158)', async () => {
// Before LOBE-7158 we swallowed per-tool persist failures and kept
// going. The fix requires the batch to abort — a FK violation on one
// tool means every concurrent tool has the same doomed parent.
mockMessageModel.create
.mockResolvedValueOnce({ id: 'tool-msg-1' })
.mockRejectedValueOnce(new Error('Database error'));
@ -1971,17 +2087,9 @@ describe('RuntimeExecutors', () => {
type: 'call_tools_batch' as const,
};
const result = await executors.call_tools_batch!(instruction, state);
// Both tools should be executed
expect(mockToolExecutionService.executeTool).toHaveBeenCalledTimes(2);
// Should still return result
expect(result.nextContext).toBeDefined();
// parentMessageId should be the first successful tool message
const payload = result.nextContext!.payload as { parentMessageId?: string };
expect(payload.parentMessageId).toBe('tool-msg-1');
await expect(executors.call_tools_batch!(instruction, state)).rejects.toThrow(
'Database error',
);
});
it('should publish tool_start and tool_end events for each tool', async () => {
@ -2687,8 +2795,10 @@ describe('RuntimeExecutors', () => {
});
});
it('should continue processing remaining tools if one fails to create', async () => {
// Mock: first call succeeds, second call fails
it('should propagate persist failures instead of silently swallowing (LOBE-7158)', async () => {
// The pre-LOBE-7158 behavior logged the error and kept walking the
// aborted-tool list. That left a half-persisted state and hid the real
// cause from ops. Now we fail fast.
mockMessageModel.create
.mockResolvedValueOnce({ id: 'tool-msg-1' })
.mockRejectedValueOnce(new Error('Database error'));
@ -2719,18 +2829,9 @@ describe('RuntimeExecutors', () => {
type: 'resolve_aborted_tools' as const,
};
const result = await executors.resolve_aborted_tools!(instruction, state);
// Should still complete and emit done event
expect(result.newState.status).toBe('done');
expect(result.events).toContainEqual(
expect.objectContaining({
type: 'done',
}),
await expect(executors.resolve_aborted_tools!(instruction, state)).rejects.toThrow(
'Database error',
);
// Only the first tool message should be added to state
expect(result.newState.messages).toHaveLength(1);
});
});

View file

@ -0,0 +1,73 @@
import { describe, expect, it } from 'vitest';
import {
createConversationParentMissingError,
isParentMessageMissingError,
isPersistFatal,
markPersistFatal,
} from '../messagePersistErrors';
describe('isParentMessageMissingError', () => {
it('matches the drizzle + postgres-js error shape (FK via .cause)', () => {
const error: any = new Error('Failed query: insert into messages ...');
error.cause = { code: '23503', constraint: 'messages_parent_id_messages_id_fk' };
expect(isParentMessageMissingError(error)).toBe(true);
});
it('matches top-level code/constraint_name variants', () => {
const error: any = new Error('x');
error.code = '23503';
error.constraint_name = 'messages_parent_id_messages_id_fk';
expect(isParentMessageMissingError(error)).toBe(true);
});
it('does not match other FK violations (different constraint)', () => {
const error: any = new Error('x');
error.cause = { code: '23503', constraint: 'messages_topic_id_topics_id_fk' };
expect(isParentMessageMissingError(error)).toBe(false);
});
it('does not match non-FK pg errors', () => {
const error: any = new Error('x');
error.cause = { code: '23505', constraint: 'messages_parent_id_messages_id_fk' };
expect(isParentMessageMissingError(error)).toBe(false);
});
it('handles null / non-object safely', () => {
expect(isParentMessageMissingError(null)).toBe(false);
expect(isParentMessageMissingError(undefined)).toBe(false);
expect(isParentMessageMissingError('string-error')).toBe(false);
expect(isParentMessageMissingError(42)).toBe(false);
});
});
describe('createConversationParentMissingError', () => {
it('carries errorType and parentId so downstream handlers can identify it', () => {
const err: any = createConversationParentMissingError('msg_abc');
expect(err).toBeInstanceOf(Error);
expect(err.errorType).toBe('ConversationParentMissing');
expect(err.parentId).toBe('msg_abc');
expect(err.message).toContain('msg_abc');
});
it('keeps the original FK error as cause for diagnostics', () => {
const cause = { code: '23503' };
const err: any = createConversationParentMissingError('msg_abc', cause);
expect(err.cause).toBe(cause);
});
});
describe('persist-fatal marker', () => {
it('round-trips through mark / is helpers', () => {
const err = new Error('boom');
expect(isPersistFatal(err)).toBe(false);
markPersistFatal(err);
expect(isPersistFatal(err)).toBe(true);
});
it('returns false for non-object values', () => {
expect(isPersistFatal(null)).toBe(false);
expect(isPersistFatal('boom')).toBe(false);
expect(isPersistFatal(undefined)).toBe(false);
});
});

View file

@ -0,0 +1,73 @@
import { AgentRuntimeErrorType } from '@lobechat/types';
/**
* Postgres error code for `foreign_key_violation`.
*
* @see https://www.postgresql.org/docs/current/errcodes-appendix.html
*/
const PG_FOREIGN_KEY_VIOLATION = '23503';
/**
* Constraint name drizzle generates for the `messages.parent_id` self-FK.
* Hard-coded because we only use it as a signature no need to reflect over
* the schema at runtime.
*/
const MESSAGES_PARENT_FK_CONSTRAINT = 'messages_parent_id_messages_id_fk';
/**
* Internal property the runtime uses to mark a thrown error as coming from
* the persist path (inside a `Promise.all` mapper that has its own outer
* catch). The outer catch re-throws anything carrying this marker so the
* whole batch short-circuits.
*/
export const PERSIST_FATAL_MARKER = 'persistFatal';
/**
* Detect whether an error returned by `messageModel.create` is a `parent_id`
* FK violation meaning the parent message no longer exists. Most commonly
* caused by the parent being deleted concurrently with agent execution
* (see LOBE-7154 / LOBE-7158).
*
* `drizzle` + `postgres-js` wrap the raw PG error as `.cause`, so the check
* looks at both the top level and the cause.
*/
export const isParentMessageMissingError = (error: unknown): boolean => {
if (!error || typeof error !== 'object') return false;
const err = error as any;
const code = err?.code ?? err?.cause?.code;
const constraint =
err?.constraint_name ??
err?.constraint ??
err?.cause?.constraint_name ??
err?.cause?.constraint;
return code === PG_FOREIGN_KEY_VIOLATION && constraint === MESSAGES_PARENT_FK_CONSTRAINT;
};
/**
* Build a structured `ConversationParentMissing` error that downstream layers
* (stream events, error classifiers, frontend) can identify and render with
* an actionable message instead of a raw SQL error.
*/
export const createConversationParentMissingError = (parentId: string, cause?: unknown) => {
const error = new Error(
`Conversation parent message ${parentId} no longer exists. It was likely deleted while the operation was running.`,
);
(error as any).errorType = AgentRuntimeErrorType.ConversationParentMissing;
(error as any).parentId = parentId;
if (cause !== undefined) (error as any).cause = cause;
return error;
};
/**
* Tag an error so the outer `Promise.all` catch propagates it instead of
* bundling it into `events` as a per-tool failure.
*/
export const markPersistFatal = <T>(error: T): T => {
if (error && typeof error === 'object') {
(error as any)[PERSIST_FATAL_MARKER] = true;
}
return error;
};
export const isPersistFatal = (error: unknown): boolean =>
!!error && typeof error === 'object' && !!(error as any)[PERSIST_FATAL_MARKER];