🔨 chore(agent-runtime): add ToolResultWaiter for BLPOP-based tool result await (#13763)

*  feat(agent-runtime): add ToolResultWaiter for Redis BLPOP-based tool result await

Introduce ToolResultWaiter — a Promise-based wrapper around Redis BLPOP
that server-side agent loops will use to block-await client-side tool
execution results delivered via the callback API (LPUSH on another
connection).

Design highlights:
- Takes two ioredis clients: a dedicated blocking connection for BLPOP
  (must not be shared with business traffic) and a normal producing
  connection for side effects (cancel sentinel).
- `waitForResult(id, timeoutMs)` returns the parsed payload or null on
  timeout / cancel, never throws for timeout (caller decides fallback).
- `waitForResults(ids[], timeoutMs)` fans out via Promise.all, aligning
  results with input order.
- `cancel(id)` LPUSHes a poison-pill sentinel to wake a pending waiter,
  used when the agent loop is terminated mid-tool.

Covered by unit tests (6 cases: push-before / push-after / timeout /
batch / cancel / malformed payload).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix(agent-runtime): use multi-key BLPOP in waitForResults to avoid N×timeout latency

Promise.all-ing waitForResult over a shared blocking Redis connection
actually serializes: BLPOP holds the socket, so calls run back-to-back
rather than concurrently. A batch of N where some results never arrive
would take up to N × timeoutMs to resolve, stalling tool-call loops
and delaying cancellation.

Rewrite waitForResults to use Redis's multi-key BLPOP in a loop with a
shared deadline: each iteration blocks on all remaining keys with the
remaining budget, wakes when any one arrives, drops that key, and
re-enters with the rest. Total latency is bounded by one timeoutMs
regardless of N. Single-key waitForResult now delegates to this path.

Covered by a new regression test asserting that an N=3 batch of
never-arriving keys completes in ~1 timeout window, not N×.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Arvin Xu 2026-04-13 10:47:49 +08:00 committed by GitHub
parent c60563fffc
commit eff527de65
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 327 additions and 0 deletions

View file

@ -0,0 +1,125 @@
import debug from 'debug';
import type { Redis } from 'ioredis';
const log = debug('lobe-server:agent-runtime:tool-result-waiter');
export interface ToolResultPayload {
content: string | null;
error?: {
message: string;
type?: string;
};
success: boolean;
toolCallId: string;
}
const CANCEL_SENTINEL = '__tool_result_cancelled__';
const resultKey = (toolCallId: string) => `tool_result:${toolCallId}`;
/**
* Block-awaits tool results that arrive via Redis LPUSH (from the tool-result
* callback API). Wraps Redis BLPOP with Promise semantics + cancellation.
*
* The constructor expects a dedicated blocking Redis connection (use
* `ioredis.duplicate()`); BLPOP blocks the underlying socket so it must not
* share a connection with business traffic, and it must not be used by more
* than one waiter at the same time.
*/
export class ToolResultWaiter {
private readonly blockingClient: Redis;
private readonly producingClient: Redis;
/**
* @param blockingClient Dedicated connection used exclusively for BLPOP.
* @param producingClient Connection used for LPUSH side effects (e.g.
* `cancel`). Typically the shared agent runtime client.
*/
constructor(blockingClient: Redis, producingClient: Redis) {
this.blockingClient = blockingClient;
this.producingClient = producingClient;
}
/**
* Wait for a single tool result.
*
* @returns The parsed payload, or `null` on timeout/cancel.
*/
async waitForResult(toolCallId: string, timeoutMs: number): Promise<ToolResultPayload | null> {
const [result] = await this.waitForResults([toolCallId], timeoutMs);
return result ?? null;
}
/**
* Wait for a batch of tool results sharing a single blocking connection.
*
* Uses Redis's multi-key BLPOP (`BLPOP key1 key2 ... timeout`) in a loop
* with a shared deadline, so total wait is bounded by `timeoutMs` rather
* than `N * timeoutMs`. Results are aligned with the input order; slots
* that time out or receive a cancel sentinel are `null`.
*/
async waitForResults(
toolCallIds: string[],
timeoutMs: number,
): Promise<Array<ToolResultPayload | null>> {
if (toolCallIds.length === 0) return [];
const idByKey = new Map<string, string>();
for (const id of toolCallIds) idByKey.set(resultKey(id), id);
const results = new Map<string, ToolResultPayload | null>();
const pendingKeys = new Set(idByKey.keys());
const deadline = Date.now() + timeoutMs;
while (pendingKeys.size > 0) {
const remainingMs = deadline - Date.now();
if (remainingMs <= 0) break;
const timeoutSeconds = Math.max(1, Math.ceil(remainingMs / 1000));
const keys = [...pendingKeys];
log('BLPOP multi %o timeout=%ds', keys, timeoutSeconds);
// ioredis variadic signature: blpop(key1, key2, ..., timeoutSeconds).
const popped = (await (
this.blockingClient.blpop as unknown as (
...args: (string | number)[]
) => Promise<[string, string] | null>
)(...keys, timeoutSeconds)) as [string, string] | null;
if (!popped) {
log('BLPOP multi timed out with %d key(s) remaining', pendingKeys.size);
break;
}
const [key, raw] = popped;
const id = idByKey.get(key);
if (!id) continue; // Defensive: unexpected key, skip
pendingKeys.delete(key);
if (raw === CANCEL_SENTINEL) {
log('BLPOP %s cancelled', key);
results.set(id, null);
continue;
}
try {
results.set(id, JSON.parse(raw) as ToolResultPayload);
} catch (error) {
log('Failed to parse tool result for %s: %O', id, error);
results.set(id, null);
}
}
return toolCallIds.map((id) => results.get(id) ?? null);
}
/**
* Cancel a pending waiter by LPUSHing a poison-pill so the BLPOP wakes up.
* Safe to call even if no waiter is active the sentinel will expire.
*/
async cancel(toolCallId: string): Promise<void> {
const key = resultKey(toolCallId);
await this.producingClient.pipeline().lpush(key, CANCEL_SENTINEL).expire(key, 60).exec();
log('Cancel sentinel pushed to %s', key);
}
}

View file

@ -0,0 +1,202 @@
import type { Redis } from 'ioredis';
import { describe, expect, it, vi } from 'vitest';
import type { ToolResultPayload } from '../ToolResultWaiter';
import { ToolResultWaiter } from '../ToolResultWaiter';
const tick = () => new Promise((resolve) => setTimeout(resolve, 0));
/**
* Minimal in-memory Redis stub that supports the subset used by ToolResultWaiter:
* - multi-key `blpop(key1, key2, ..., timeoutSeconds)` on the blocking client,
* - `pipeline().lpush().expire().exec()` on the producer.
*
* `blpop` resolves immediately if any of the passed keys has a value;
* otherwise it registers a single multi-key waiter and sleeps with real
* `setTimeout`. `lpush` wakes the first waiter that is interested in the key.
*/
function createMockRedisPair() {
const lists = new Map<string, string[]>();
const waiters: Array<{
keys: string[];
wake: (key: string, value: string) => void;
}> = [];
const tryDeliverFromLists = () => {
for (let i = 0; i < waiters.length; i++) {
const w = waiters[i];
for (const key of w.keys) {
const list = lists.get(key);
if (list && list.length > 0) {
const value = list.pop()!;
waiters.splice(i, 1);
w.wake(key, value);
return true;
}
}
}
return false;
};
const lpush = (key: string, ...values: string[]): number => {
const list = lists.get(key) ?? [];
list.unshift(...values);
lists.set(key, list);
tryDeliverFromLists();
return list.length;
};
const blockingClient = {
blpop: vi.fn(async (...args: (string | number)[]) => {
const timeoutSeconds = args.at(-1) as number;
const keys = args.slice(0, -1) as string[];
for (const key of keys) {
const list = lists.get(key);
if (list && list.length > 0) {
const value = list.pop()!;
return [key, value] as [string, string];
}
}
return new Promise<[string, string] | null>((resolve) => {
const w = {
keys,
wake: (key: string, value: string) => {
clearTimeout(timer);
resolve([key, value]);
},
};
const timer = setTimeout(() => {
const idx = waiters.indexOf(w);
if (idx >= 0) waiters.splice(idx, 1);
resolve(null);
}, timeoutSeconds * 1000);
waiters.push(w);
});
}),
} as unknown as Redis;
const producingClient = {
pipeline: vi.fn(() => {
const ops: Array<() => void> = [];
const chain: any = {
exec: async () => {
ops.forEach((op) => op());
return [];
},
expire: (_key: string, _seconds: number) => chain,
lpush: (key: string, value: string) => {
ops.push(() => lpush(key, value));
return chain;
},
};
return chain;
}),
} as unknown as Redis;
return { blockingClient, lpush, producingClient };
}
describe('ToolResultWaiter', () => {
it('returns the parsed payload when a result is LPUSHed before BLPOP', async () => {
const { blockingClient, lpush, producingClient } = createMockRedisPair();
const payload: ToolResultPayload = {
content: 'hello',
success: true,
toolCallId: 'call-1',
};
lpush('tool_result:call-1', JSON.stringify(payload));
const waiter = new ToolResultWaiter(blockingClient, producingClient);
const result = await waiter.waitForResult('call-1', 5000);
expect(result).toEqual(payload);
});
it('returns the parsed payload when LPUSHed after BLPOP starts waiting', async () => {
const { blockingClient, lpush, producingClient } = createMockRedisPair();
const waiter = new ToolResultWaiter(blockingClient, producingClient);
const payload: ToolResultPayload = {
content: 'delayed',
success: true,
toolCallId: 'call-2',
};
const pending = waiter.waitForResult('call-2', 5000);
await tick();
lpush('tool_result:call-2', JSON.stringify(payload));
await expect(pending).resolves.toEqual(payload);
});
it('returns null on timeout', async () => {
const { blockingClient, producingClient } = createMockRedisPair();
const waiter = new ToolResultWaiter(blockingClient, producingClient);
const result = await waiter.waitForResult('call-timeout', 50);
expect(result).toBeNull();
});
it('waitForResults aligns with input order and fills timeouts with null', async () => {
const { blockingClient, lpush, producingClient } = createMockRedisPair();
const waiter = new ToolResultWaiter(blockingClient, producingClient);
lpush('tool_result:a', JSON.stringify({ content: 'A', success: true, toolCallId: 'a' }));
lpush('tool_result:c', JSON.stringify({ content: 'C', success: true, toolCallId: 'c' }));
const results = await waiter.waitForResults(['a', 'b', 'c'], 50);
expect(results[0]?.content).toBe('A');
expect(results[1]).toBeNull();
expect(results[2]?.content).toBe('C');
});
it('waitForResults uses multi-key BLPOP (total latency ≈ one timeout, not N × timeout)', async () => {
const { blockingClient, producingClient } = createMockRedisPair();
const waiter = new ToolResultWaiter(blockingClient, producingClient);
// None of the keys ever receive a value. In the old serial impl this
// would take ~3s (1s clamp × 3 keys). The multi-key loop should finish
// in roughly one clamped-timeout window.
const start = Date.now();
const results = await waiter.waitForResults(['x', 'y', 'z'], 50);
const elapsed = Date.now() - start;
expect(results).toEqual([null, null, null]);
expect(elapsed).toBeLessThan(1500);
});
it('waitForResults wakes as results arrive and re-enters BLPOP with remaining keys', async () => {
const { blockingClient, lpush, producingClient } = createMockRedisPair();
const waiter = new ToolResultWaiter(blockingClient, producingClient);
const pending = waiter.waitForResults(['a', 'b'], 5000);
await tick();
lpush('tool_result:a', JSON.stringify({ content: 'A', success: true, toolCallId: 'a' }));
await tick();
lpush('tool_result:b', JSON.stringify({ content: 'B', success: true, toolCallId: 'b' }));
const results = await pending;
expect(results[0]?.content).toBe('A');
expect(results[1]?.content).toBe('B');
});
it('cancel() wakes a blocked BLPOP and returns null', async () => {
const { blockingClient, producingClient } = createMockRedisPair();
const waiter = new ToolResultWaiter(blockingClient, producingClient);
const pending = waiter.waitForResult('call-cancel', 5000);
await tick();
await waiter.cancel('call-cancel');
await expect(pending).resolves.toBeNull();
});
it('returns null when the stored value is not valid JSON', async () => {
const { blockingClient, lpush, producingClient } = createMockRedisPair();
lpush('tool_result:bad', 'not-json');
const waiter = new ToolResultWaiter(blockingClient, producingClient);
const result = await waiter.waitForResult('bad', 5000);
expect(result).toBeNull();
});
});