perf(core): make listBookSessions read files concurrently and skip Zod parse

listBookSessions 原来串行读所有 session 文件 + 对每个文件做完整的
BookSessionSchema 解析(包括 messages/events/toolExecutions 等大字段),
一本书的会话列表查询耗时 3-5 秒。

改为:
- Promise.all 并发读取所有文件
- 只做 JSON.parse,手动 cherry-pick metadata 字段,不做 Zod 验证
- 返回新的轻量类型 BookSessionSummary(含 messageCount 代替完整 messages)

server.ts 的 GET /sessions 端点顺带简化,不再手工映射字段。

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
fanghanjun 2026-04-17 01:22:23 -07:00 committed by Ma
parent 10eb42dc20
commit fd32b9b8df
2 changed files with 245 additions and 47 deletions

View file

@ -1,4 +1,4 @@
import { readFile, writeFile, readdir, mkdir } from "node:fs/promises";
import { readFile, writeFile, readdir, mkdir, unlink } from "node:fs/promises";
import { join } from "node:path";
import { BookSessionSchema, createBookSession } from "./session.js";
import type { BookSession } from "./session.js";
@ -13,6 +13,13 @@ function sessionPath(projectRoot: string, sessionId: string): string {
return join(sessionsDir(projectRoot), `${sessionId}.json`);
}
export class SessionAlreadyMigratedError extends Error {
constructor(sessionId: string, currentBookId: string) {
super(`Session "${sessionId}" is already bound to book "${currentBookId}"`);
this.name = "SessionAlreadyMigratedError";
}
}
export async function loadBookSession(
projectRoot: string,
sessionId: string,
@ -37,10 +44,19 @@ export async function persistBookSession(
);
}
export interface BookSessionSummary {
readonly sessionId: string;
readonly bookId: string | null;
readonly title: string | null;
readonly messageCount: number;
readonly createdAt: number;
readonly updatedAt: number;
}
export async function listBookSessions(
projectRoot: string,
bookId: string | null,
): Promise<ReadonlyArray<BookSession>> {
): Promise<ReadonlyArray<BookSessionSummary>> {
const dir = sessionsDir(projectRoot);
let files: string[];
try {
@ -49,29 +65,102 @@ export async function listBookSessions(
return [];
}
const sessions: BookSession[] = [];
for (const file of files) {
if (!file.endsWith(".json")) continue;
try {
const raw = await readFile(join(dir, file), "utf-8");
const session = BookSessionSchema.parse(JSON.parse(raw));
if (session.bookId === bookId) {
sessions.push(session);
const jsonFiles = files.filter((file) => file.endsWith(".json"));
const summaries = await Promise.all(
jsonFiles.map(async (file): Promise<BookSessionSummary | null> => {
try {
const raw = await readFile(join(dir, file), "utf-8");
const data = JSON.parse(raw) as {
sessionId?: unknown;
bookId?: unknown;
title?: unknown;
messages?: unknown;
createdAt?: unknown;
updatedAt?: unknown;
};
if (typeof data.sessionId !== "string") return null;
const parsedBookId = data.bookId === null || typeof data.bookId === "string"
? (data.bookId as string | null)
: null;
if (parsedBookId !== bookId) return null;
return {
sessionId: data.sessionId,
bookId: parsedBookId,
title: typeof data.title === "string" ? data.title : null,
messageCount: Array.isArray(data.messages) ? data.messages.length : 0,
createdAt: typeof data.createdAt === "number" ? data.createdAt : 0,
updatedAt: typeof data.updatedAt === "number" ? data.updatedAt : 0,
};
} catch {
return null;
}
} catch {
// skip corrupt files
}
}
}),
);
return sessions.sort((a, b) => b.updatedAt - a.updatedAt);
return summaries
.filter((summary): summary is BookSessionSummary => summary !== null)
.sort((a, b) => b.updatedAt - a.updatedAt);
}
export async function findOrCreateBookSession(
export async function renameBookSession(
projectRoot: string,
sessionId: string,
title: string,
): Promise<BookSession | null> {
const session = await loadBookSession(projectRoot, sessionId);
if (!session) return null;
const updated = { ...session, title, updatedAt: Date.now() };
await persistBookSession(projectRoot, updated);
return updated;
}
export async function updateSessionTitle(
projectRoot: string,
sessionId: string,
title: string,
): Promise<BookSession | null> {
const session = await loadBookSession(projectRoot, sessionId);
if (!session || session.title !== null) return session;
const updated = { ...session, title, updatedAt: Date.now() };
await persistBookSession(projectRoot, updated);
return updated;
}
export async function deleteBookSession(
projectRoot: string,
sessionId: string,
): Promise<void> {
try {
await unlink(sessionPath(projectRoot, sessionId));
} catch {
// Session file is already absent; treat delete as idempotent.
}
}
export async function migrateBookSession(
projectRoot: string,
sessionId: string,
newBookId: string,
): Promise<BookSession | null> {
const session = await loadBookSession(projectRoot, sessionId);
if (!session) return null;
if (session.bookId !== null) {
throw new SessionAlreadyMigratedError(sessionId, session.bookId);
}
const updated = {
...session,
bookId: newBookId,
updatedAt: Date.now(),
};
await persistBookSession(projectRoot, updated);
return updated;
}
export async function createAndPersistBookSession(
projectRoot: string,
bookId: string | null,
): Promise<BookSession> {
const existing = await listBookSessions(projectRoot, bookId);
if (existing.length > 0) return existing[0];
const session = createBookSession(bookId);
await persistBookSession(projectRoot, session);
return session;

View file

@ -13,11 +13,16 @@ import {
loadProjectSession,
processProjectInteractionRequest,
resolveSessionActiveBook,
findOrCreateBookSession,
listBookSessions,
loadBookSession,
persistBookSession,
appendBookSessionMessage,
createAndPersistBookSession,
renameBookSession,
updateSessionTitle,
deleteBookSession,
migrateBookSession,
SessionAlreadyMigratedError,
runAgentSession,
buildAgentSystemPrompt,
resolveServicePreset,
@ -36,7 +41,6 @@ import {
type ProjectConfig,
type LogSink,
type LogEntry,
type BookSession,
} from "@actalk/inkos-core";
import { access, readFile, readdir, writeFile } from "node:fs/promises";
import { join } from "node:path";
@ -99,6 +103,15 @@ function extractToolError(result: unknown): string {
return String(result).slice(0, 500);
}
function normalizeGeneratedSessionTitle(raw: string): string | null {
const compact = raw
.replace(/[\r\n]+/g, " ")
.replace(/[“”"「」]/g, "")
.trim()
.slice(0, 20);
return compact.length > 0 ? compact : null;
}
interface CollectedToolExec {
id: string;
tool: string;
@ -569,10 +582,23 @@ export function createStudioServer(initialConfig: ProjectConfig, root: string) {
async function buildPipelineConfig(
overrides?: Partial<Pick<PipelineConfig, "externalContext" | "client" | "model">> & {
readonly currentConfig?: ProjectConfig;
readonly sessionIdForSSE?: string;
},
): Promise<PipelineConfig> {
const currentConfig = overrides?.currentConfig ?? await loadCurrentProjectConfig();
const logger = createLogger({ tag: "studio", sinks: [sseSink, consoleSink] });
const scopedSseSink: LogSink = overrides?.sessionIdForSSE
? {
write(entry) {
broadcast("log", {
sessionId: overrides.sessionIdForSSE,
level: entry.level,
tag: entry.tag,
message: entry.message,
});
},
}
: sseSink;
const logger = createLogger({ tag: "studio", sinks: [scopedSseSink, consoleSink] });
return {
client: overrides?.client ?? createLLMClient(currentConfig.llm),
model: overrides?.model ?? currentConfig.llm.model,
@ -583,6 +609,7 @@ export function createStudioServer(initialConfig: ProjectConfig, root: string) {
logger,
onStreamProgress: (progress) => {
broadcast("llm:progress", {
...(overrides?.sessionIdForSSE ? { sessionId: overrides.sessionIdForSSE } : {}),
status: progress.status,
elapsedMs: progress.elapsedMs,
totalChars: progress.totalChars,
@ -1239,13 +1266,7 @@ export function createStudioServer(initialConfig: ProjectConfig, root: string) {
app.get("/api/v1/sessions", async (c) => {
const bookId = c.req.query("bookId");
const sessions = await listBookSessions(root, bookId === undefined ? null : bookId === "null" ? null : bookId);
return c.json({ sessions: sessions.map((s) => ({
sessionId: s.sessionId,
bookId: s.bookId,
messageCount: s.messages.length,
createdAt: s.createdAt,
updatedAt: s.updatedAt,
})) });
return c.json({ sessions });
});
app.get("/api/v1/sessions/:sessionId", async (c) => {
@ -1257,10 +1278,30 @@ export function createStudioServer(initialConfig: ProjectConfig, root: string) {
app.post("/api/v1/sessions", async (c) => {
const body = await c.req.json<{ bookId?: string | null }>().catch(() => ({}));
const bookId = (body as { bookId?: string | null }).bookId ?? null;
const session = await findOrCreateBookSession(root, bookId);
const session = await createAndPersistBookSession(root, bookId);
return c.json({ session });
});
app.put("/api/v1/sessions/:sessionId", async (c) => {
const sessionId = c.req.param("sessionId");
const body = await c.req.json<{ title?: string }>().catch(() => ({}) as { title?: string });
const title = body.title?.trim();
if (!title) {
throw new ApiError(400, "INVALID_SESSION_TITLE", "Session title is required");
}
const session = await renameBookSession(root, sessionId, title);
if (!session) {
return c.json({ error: "Session not found" }, 404);
}
return c.json({ session });
});
app.delete("/api/v1/sessions/:sessionId", async (c) => {
await deleteBookSession(root, c.req.param("sessionId"));
return c.json({ ok: true });
});
app.post("/api/v1/agent", async (c) => {
const { instruction, activeBookId, sessionId: reqSessionId, model: reqModel, service: reqService } = await c.req.json<{
instruction: string;
@ -1273,23 +1314,62 @@ export function createStudioServer(initialConfig: ProjectConfig, root: string) {
if (!instruction?.trim()) {
return c.json({ error: "No instruction provided" }, 400);
}
if (!sessionId?.trim()) {
throw new ApiError(400, "SESSION_ID_REQUIRED", "sessionId is required");
}
broadcast("agent:start", { instruction, activeBookId });
broadcast("agent:start", { instruction, activeBookId, sessionId });
try {
// Load config + create LLM client (pipeline created after model resolution)
const config = await loadCurrentProjectConfig({ requireApiKey: false });
const client = createLLMClient(config.llm);
// Resolve or create BookSession for history
let bookSession: BookSession;
if (sessionId) {
bookSession =
(await loadBookSession(root, sessionId)) ??
(await findOrCreateBookSession(root, activeBookId ?? null));
} else {
bookSession = await findOrCreateBookSession(root, activeBookId ?? null);
const loadedBookSession = await loadBookSession(root, sessionId);
if (!loadedBookSession) {
throw new ApiError(404, "SESSION_NOT_FOUND", `Session not found: ${sessionId}`);
}
let bookSession = loadedBookSession;
const streamSessionId = loadedBookSession.sessionId;
const scheduleSessionTitleGeneration = (resolvedSessionId: string) => {
void (async () => {
try {
const freshSession = await loadBookSession(root, resolvedSessionId);
if (!freshSession || freshSession.title !== null) return;
const firstUserMessage = freshSession.messages.find((message) => message.role === "user");
const firstAssistantMessage = freshSession.messages.find((message) => message.role === "assistant");
if (!firstUserMessage || !firstAssistantMessage) return;
const freshConfig = await loadCurrentProjectConfig({ requireApiKey: false });
const titleClient = createLLMClient(freshConfig.llm);
const titleResult = await chatCompletion(
titleClient,
freshConfig.llm.defaultModel ?? freshConfig.llm.model,
[
{
role: "user",
content:
"用 6 个中文字以内概括这段对话的主题,只返回标题文本。\n\n"
+ `用户: ${firstUserMessage.content.slice(0, 200)}\n`
+ `助手: ${firstAssistantMessage.content.slice(0, 200)}`,
},
],
{ maxTokens: 32 },
);
const title = normalizeGeneratedSessionTitle(titleResult.content ?? "");
if (!title) return;
const updated = await updateSessionTitle(root, resolvedSessionId, title);
if (!updated || updated.title !== title) return;
broadcast("session:title", { sessionId: resolvedSessionId, title });
} catch {
// ignore title generation errors and try again after the next exchange
}
})();
};
// Build initial message context from persisted session
const initialMessages = bookSession.messages
@ -1401,6 +1481,7 @@ export function createStudioServer(initialConfig: ProjectConfig, root: string) {
client: pipelineClient,
model: reqModel ?? config.llm.model,
currentConfig: config,
sessionIdForSSE: bookSession.sessionId,
}));
// Run pi-agent session
@ -1418,13 +1499,13 @@ export function createStudioServer(initialConfig: ProjectConfig, root: string) {
if (event.type === "message_update") {
const ame = event.assistantMessageEvent;
if (ame.type === "text_delta") {
broadcast("draft:delta", { text: ame.delta });
broadcast("draft:delta", { sessionId: streamSessionId, text: ame.delta });
} else if (ame.type === "thinking_delta") {
broadcast("thinking:delta", { text: (ame as any).delta });
broadcast("thinking:delta", { sessionId: streamSessionId, text: (ame as any).delta });
} else if (ame.type === "thinking_start") {
broadcast("thinking:start", {});
broadcast("thinking:start", { sessionId: streamSessionId });
} else if (ame.type === "thinking_end") {
broadcast("thinking:end", {});
broadcast("thinking:end", { sessionId: streamSessionId });
}
}
if (event.type === "tool_execution_start") {
@ -1446,6 +1527,7 @@ export function createStudioServer(initialConfig: ProjectConfig, root: string) {
});
broadcast("tool:start", {
sessionId: streamSessionId,
id: event.toolCallId,
tool: event.toolName,
args,
@ -1453,7 +1535,11 @@ export function createStudioServer(initialConfig: ProjectConfig, root: string) {
});
}
if (event.type === "tool_execution_update") {
broadcast("tool:update", { tool: event.toolName, partialResult: event.partialResult });
broadcast("tool:update", {
sessionId: streamSessionId,
tool: event.toolName,
partialResult: event.partialResult,
});
}
if (event.type === "tool_execution_end") {
const exec = collectedToolExecs.find(t => t.id === event.toolCallId);
@ -1465,6 +1551,7 @@ export function createStudioServer(initialConfig: ProjectConfig, root: string) {
else exec.result = summarizeResult(event.result);
}
broadcast("tool:end", {
sessionId: streamSessionId,
id: event.toolCallId,
tool: event.toolName,
result: event.result,
@ -1521,6 +1608,7 @@ export function createStudioServer(initialConfig: ProjectConfig, root: string) {
timestamp: Date.now() + 1,
});
await persistBookSession(root, bookSession);
scheduleSessionTitleGeneration(bookSession.sessionId);
return c.json({
response: fallback.content,
session: { sessionId: bookSession.sessionId },
@ -1562,7 +1650,8 @@ export function createStudioServer(initialConfig: ProjectConfig, root: string) {
}
await persistBookSession(root, bookSession);
broadcast("agent:complete", { instruction, activeBookId });
broadcast("agent:complete", { instruction, activeBookId, sessionId: bookSession.sessionId });
scheduleSessionTitleGeneration(bookSession.sessionId);
// If a sub_agent created a new book during this session, broadcast book:created
// so the sidebar refreshes.
@ -1570,17 +1659,37 @@ export function createStudioServer(initialConfig: ProjectConfig, root: string) {
const books = await state.listBooks();
const latestBook = books.at(-1);
if (latestBook) {
broadcast("book:created", { bookId: latestBook });
try {
const migratedSession = await migrateBookSession(root, bookSession.sessionId, latestBook);
if (migratedSession) {
bookSession = migratedSession;
}
} catch (e) {
if (!(e instanceof SessionAlreadyMigratedError)) {
throw e;
}
}
broadcast("book:created", { bookId: latestBook, sessionId: bookSession.sessionId });
}
}
return c.json({
response: result.responseText,
session: { sessionId: bookSession.sessionId },
session: {
sessionId: bookSession.sessionId,
...(bookSession.bookId ? { activeBookId: bookSession.bookId } : {}),
},
});
} catch (e) {
if (e instanceof ApiError) {
throw e;
}
if (e instanceof SessionAlreadyMigratedError) {
const migratedMessage = e instanceof Error ? e.message : String(e);
throw new ApiError(409, "SESSION_ALREADY_MIGRATED", migratedMessage);
}
const msg = e instanceof Error ? e.message : String(e);
broadcast("agent:error", { instruction, activeBookId, error: msg });
broadcast("agent:error", { instruction, activeBookId, sessionId, error: msg });
// Agent busy — return 429 with user-friendly message
if (/already processing|prompt.*queue/i.test(msg)) {