mirror of
https://github.com/lobehub/lobehub
synced 2026-04-21 17:47:27 +00:00
🔨 chore: integrate Gateway connection management into chat store (#13636)
* ✨ feat: integrate Gateway connection management into chat store Add GatewayActionImpl to aiChat slice for managing Agent Gateway WebSocket connections per operationId. Includes connect, disconnect, interrupt, and status tracking. Also type the execAgentTask return value. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ✨ feat: add Gateway mode branch in sendMessage for server-side agent execution When agentGatewayUrl is set in server config (enableQueueAgentRuntime), sendMessage now triggers server-side agent execution via execAgentTask and receives events through the Agent Gateway WebSocket, instead of running the agent loop client-side. Includes: - Expose agentGatewayUrl in GlobalServerConfig when queue mode is enabled - Gateway event handler mapping stream events to UI message updates - Fallback to client-side agent loop when Gateway is not configured Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: emit disconnected event on intentional disconnect disconnect() was only calling setStatus('disconnected') but not emitting the 'disconnected' event. This caused the store's cleanup listener to never fire after terminal events (agent_runtime_end), leaving stale connections in gatewayConnections. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ✨ feat: enhance Gateway event handler for multi-step agent streaming Support multi-step agent execution display (LLM → tool calls → next LLM) using hybrid approach: real-time streaming for current step, DB refresh at step transitions. Fixes LOBE-6874 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ✨ feat: wire up Gateway JWT token from execAgent to connectToGateway Pass the RS256 JWT token returned by execAgentTask to connectToGateway for WebSocket authentication. Also use ExecAgentResult from @lobechat/types instead of local duplicate definition. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: handle wss:// protocol in AgentStreamClient buildWsUrl When gatewayUrl already uses ws:// or wss:// protocol, use it directly instead of stripping and re-adding the protocol prefix. Previously, wss://host would become ws://wss://host (double protocol). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: queue gateway events to ensure stream_chunk waits for refreshMessages Use a sequential Promise chain to process gateway events, so that stream_chunk dispatches only run after stream_start's refreshMessages resolves. Previously, chunks arrived before the new assistant message existed in dbMessagesMap, causing updates to be silently dropped. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: pass operationId context to internal_dispatchMessage in gateway handler Without operationId, internal_dispatchMessage falls back to global state to compute the messageMapKey, which may differ from the key where refreshMessages stored the server-created messages. Passing operationId ensures the correct conversation context is resolved. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: resolve gateway streaming display issues - Use fetchAndReplaceMessages (direct DB fetch + replaceMessages) instead of refreshMessages which mutates an orphaned SWR key - Create dedicated execServerAgentRuntime operation with correct topicId context for internal_dispatchMessage to resolve the right messageMapKey - Complete operation on agent_runtime_end instead of relying on onSessionComplete callback - Keep loading state active between steps (only clear on agent_runtime_end) so users don't think the session ended during tool execution gaps Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: maintain loading state across gateway step transitions - Create dedicated execServerAgentRuntime operation with correct topicId - Use fetchAndReplaceMessages instead of orphaned refreshMessages SWR key - Re-apply loading after tool_end refresh so UI stays active between steps - Complete operation on agent_runtime_end - Add record-app-screen.sh for automated screen recording - Output recordings to .records/ (gitignored) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: show loading on assistant message immediately in stream_start Set loading on the current assistant message BEFORE awaiting fetchAndReplaceMessages, so the UI shows a loading indicator while waiting for the DB response instead of appearing frozen. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: drive gateway loading state via operation system instead of messageLoadingIds Associate the assistant message with the gateway operation via associateMessageWithOperation so the Conversation store's operation-based loading detection (isGenerating) works correctly. This shows the proper loading skeleton on the assistant message while waiting for gateway events. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ♻️ refactor: remove unused internal_toggleMessageLoading from gateway handler Loading state is now fully driven by the operation system via associateMessageWithOperation + completeOperation. The old messageLoadingIds-based approach is no longer needed. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: rewrite record-app-screen.sh to use CDP screenshot assembly Replace broken ffmpeg avfoundation live recording (corrupts on kill) with agent-browser CDP screenshot capture + ffmpeg assembly on stop. This works reliably on any screen including external monitors. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ✨ feat: add Gateway Mode lab toggle and fix CI type error - Add enableGatewayMode to UserLabSchema as experimental feature - Add lab selector and settings UI toggle in Advanced > Labs - Gateway mode now requires both server config (agentGatewayUrl) AND user opt-in via Labs toggle - Fix TS2322: result.token (string | undefined) → fallback to '' - Add i18n keys for gateway mode feature Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ✨ feat: hide Gateway Mode toggle when agentGatewayUrl is not configured Only show the lab toggle when the server has AGENT_GATEWAY_URL set, so users without gateway infrastructure don't see the option. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 💄 style: move Gateway Mode toggle below Input Markdown in labs section Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: remove default AGENT_GATEWAY_URL value and make schema optional Without an explicit env var, the gateway URL should be undefined so the lab toggle and gateway mode are not available. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 📝 docs: update SKILL.md to reference record-app-screen.sh Replace outdated record-gateway-demo.sh references with the renamed record-app-screen.sh and its start/stop lifecycle documentation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 📝 docs: add record-app-screen reference doc and slim down SKILL.md Move detailed recording documentation to references/record-app-screen.md and keep SKILL.md concise with a link to the full reference. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * 🐛 fix: guard GatewayStreamNotifier with AGENT_GATEWAY_URL check AGENT_GATEWAY_URL is now optional, so check both URL and service token before wrapping with GatewayStreamNotifier to avoid TS2345. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ♻️ refactor: extract gateway execution logic to GatewayActionImpl Move server-side gateway execution logic from conversationLifecycle.ts into GatewayActionImpl.startGatewayExecution(). The sendMessage flow now does a simple early return when gateway mode is active, keeping the existing client-mode code path untouched. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ♻️ refactor: split gateway into isGatewayModeEnabled check + executeGatewayAgent Replace fire-and-forget startGatewayExecution with explicit check/execute pattern. Caller does: if (check) { await execute(); return; } — giving proper error handling and clearer control flow. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
84eff30be1
commit
3e7ee1fbfc
23 changed files with 1514 additions and 26 deletions
|
|
@ -1006,6 +1006,7 @@ Ready-to-use scripts in `.agents/skills/local-testing/scripts/`:
|
|||
| `electron-dev.sh` | Manage Electron dev env (start/stop/status/restart) |
|
||||
| `capture-app-window.sh` | Capture screenshot of a specific app window |
|
||||
| `record-electron-demo.sh` | Record Electron app demo with ffmpeg |
|
||||
| `record-app-screen.sh` | Record app screen (video + screenshots, start/stop) |
|
||||
| `test-discord-bot.sh` | Send message to Discord bot via osascript |
|
||||
| `test-slack-bot.sh` | Send message to Slack bot via osascript |
|
||||
| `test-telegram-bot.sh` | Send message to Telegram bot via osascript |
|
||||
|
|
@ -1068,25 +1069,16 @@ Each script: activates the app, navigates to the channel/contact, pastes the mes
|
|||
|
||||
# Screen Recording
|
||||
|
||||
Record automated demos by combining `ffmpeg` screen capture with `agent-browser` automation. The script `.agents/skills/local-testing/scripts/record-electron-demo.sh` handles the full lifecycle for Electron.
|
||||
|
||||
### Usage
|
||||
Record automated demos using `record-app-screen.sh` (start/stop lifecycle, CDP screenshots + ffmpeg assembly). See [references/record-app-screen.md](references/record-app-screen.md) for full documentation.
|
||||
|
||||
```bash
|
||||
# Run the built-in demo (queue-edit feature)
|
||||
./.agents/skills/local-testing/scripts/record-electron-demo.sh
|
||||
|
||||
# Run a custom automation script
|
||||
./.agents/skills/local-testing/scripts/record-electron-demo.sh ./my-demo.sh /tmp/my-demo.mp4
|
||||
./.agents/skills/local-testing/scripts/electron-dev.sh start
|
||||
./.agents/skills/local-testing/scripts/record-app-screen.sh start my-demo
|
||||
# ... run automation ...
|
||||
./.agents/skills/local-testing/scripts/record-app-screen.sh stop
|
||||
```
|
||||
|
||||
The script automatically:
|
||||
|
||||
1. Starts Electron with CDP and waits for SPA to load
|
||||
2. Detects window position, screen, and Retina scale via Swift/CGWindowList
|
||||
3. Records only the Electron window region using `ffmpeg -f avfoundation` with crop
|
||||
4. Runs the demo (built-in or custom script receiving CDP port as `$1`)
|
||||
5. Stops recording and cleans up
|
||||
Outputs to `.records/` directory (gitignored): `<name>.mp4` (video) + `<name>/` (screenshots every 3s).
|
||||
|
||||
---
|
||||
|
||||
|
|
|
|||
142
.agents/skills/local-testing/references/record-app-screen.md
Normal file
142
.agents/skills/local-testing/references/record-app-screen.md
Normal file
|
|
@ -0,0 +1,142 @@
|
|||
# record-app-screen.sh
|
||||
|
||||
General-purpose screen recording tool for the Electron app. Captures CDP screenshots as video frames and gallery snapshots, then assembles into an MP4 on stop.
|
||||
|
||||
## Why CDP Screenshots Instead of ffmpeg Screen Capture
|
||||
|
||||
- **Works on any screen** — CDP screenshots capture the browser viewport directly, so external monitors, Retina scaling, and window positioning are all handled automatically
|
||||
- **No signal handling issues** — ffmpeg-static (npm) produces corrupt MP4 files when killed (missing moov atom). CDP screenshots avoid this entirely
|
||||
- **Consistent output** — Screenshots are resolution-independent and don't require crop coordinate calculations
|
||||
|
||||
## Commands
|
||||
|
||||
```bash
|
||||
# Start recording (Electron must be running with CDP)
|
||||
.agents/skills/local-testing/scripts/record-app-screen.sh start [output_name]
|
||||
|
||||
# Stop recording and assemble video
|
||||
.agents/skills/local-testing/scripts/record-app-screen.sh stop
|
||||
|
||||
# Check if recording is active
|
||||
.agents/skills/local-testing/scripts/record-app-screen.sh status
|
||||
```
|
||||
|
||||
### Arguments
|
||||
|
||||
| Argument | Default | Description |
|
||||
| ------------- | --------------------------- | -------------------------- |
|
||||
| `output_name` | `recording-YYYYMMDD-HHMMSS` | Base name for output files |
|
||||
|
||||
### Environment Variables
|
||||
|
||||
| Variable | Default | Description |
|
||||
| ---------------------- | ------- | -------------------------------------- |
|
||||
| `CDP_PORT` | `9222` | Chrome DevTools Protocol port |
|
||||
| `SCREENSHOT_INTERVAL` | `3` | Seconds between gallery screenshots |
|
||||
| `VIDEO_FRAME_INTERVAL` | `0.5` | Seconds between video frames (\~2 fps) |
|
||||
|
||||
## Output Structure
|
||||
|
||||
```
|
||||
.records/
|
||||
<name>.mp4 # Video assembled from frames (~2 fps)
|
||||
<name>/ # Gallery screenshots (every 3s)
|
||||
0000.png
|
||||
0001.png
|
||||
0002.png
|
||||
...
|
||||
```
|
||||
|
||||
The `.records/` directory is at the project root and is gitignored.
|
||||
|
||||
## How It Works
|
||||
|
||||
### Start
|
||||
|
||||
1. Creates two background loops:
|
||||
- **Video frames** — `agent-browser screenshot` every `VIDEO_FRAME_INTERVAL` seconds into a temp directory (`/tmp/record-frames-XXXXXX/`)
|
||||
- **Gallery screenshots** — `agent-browser screenshot` every `SCREENSHOT_INTERVAL` seconds into `.records/<name>/`
|
||||
2. Saves PIDs and paths to `/tmp/record-app-screen.pids` and `/tmp/record-app-screen.state`
|
||||
|
||||
### Stop
|
||||
|
||||
1. Kills both background loops
|
||||
2. Assembles video frames into MP4 using ffmpeg:
|
||||
```
|
||||
ffmpeg -framerate 2 -i frame_%06d.png -c:v libx264 -crf 23 -pix_fmt yuv420p <output>.mp4
|
||||
```
|
||||
3. Cleans up temp frame directory
|
||||
4. Reports file sizes and paths
|
||||
|
||||
## Usage Examples
|
||||
|
||||
### Basic Test Recording
|
||||
|
||||
```bash
|
||||
# Start Electron
|
||||
.agents/skills/local-testing/scripts/electron-dev.sh start
|
||||
|
||||
# Start recording
|
||||
.agents/skills/local-testing/scripts/record-app-screen.sh start my-test
|
||||
|
||||
# Run automation
|
||||
agent-browser --cdp 9222 click @e61
|
||||
agent-browser --cdp 9222 type @e42 "hello"
|
||||
agent-browser --cdp 9222 press Enter
|
||||
sleep 10
|
||||
|
||||
# Stop and get results
|
||||
.agents/skills/local-testing/scripts/record-app-screen.sh stop
|
||||
# → .records/my-test.mp4 + .records/my-test/*.png
|
||||
```
|
||||
|
||||
### Gateway Streaming Demo
|
||||
|
||||
```bash
|
||||
.agents/skills/local-testing/scripts/electron-dev.sh start
|
||||
|
||||
# Inject gateway URL
|
||||
agent-browser --cdp 9222 eval --stdin << 'EOF'
|
||||
(function() {
|
||||
var store = window.global_serverConfigStore;
|
||||
store.setState({ serverConfig: { ...store.getState().serverConfig,
|
||||
agentGatewayUrl: 'https://agent-gateway.lobehub.com' } });
|
||||
return 'ready';
|
||||
})()
|
||||
EOF
|
||||
|
||||
# Record
|
||||
.agents/skills/local-testing/scripts/record-app-screen.sh start gateway-demo
|
||||
|
||||
# Navigate to agent, send message, wait for completion...
|
||||
# (automation commands here)
|
||||
|
||||
.agents/skills/local-testing/scripts/record-app-screen.sh stop
|
||||
open .records/gateway-demo.mp4
|
||||
```
|
||||
|
||||
### Check Active Recording
|
||||
|
||||
```bash
|
||||
.agents/skills/local-testing/scripts/record-app-screen.sh status
|
||||
# [record] Active recording
|
||||
# Frames: 42 captured (running: yes)
|
||||
# Screenshots: 14 captured (running: yes)
|
||||
# Output: .records/my-test.mp4
|
||||
```
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- **ffmpeg** — For video assembly. Install via `bun add -g ffmpeg-static` or `brew install ffmpeg`
|
||||
- **agent-browser** — For CDP screenshots. Install via `npm i -g agent-browser`
|
||||
- **Electron app running** — With CDP enabled (use `electron-dev.sh start`)
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
| Problem | Solution |
|
||||
| ----------------------------------- | ------------------------------------------------------------------------------------------------------------ |
|
||||
| "No active recording found" on stop | PID file was cleaned up. Check if background processes are still running with `ps aux \| grep agent-browser` |
|
||||
| "A recording is already active" | Run `stop` first, or manually clean: `rm /tmp/record-app-screen.pids /tmp/record-app-screen.state` |
|
||||
| Video is 0 bytes | No frames were captured. Ensure Electron is running and CDP port is correct |
|
||||
| Screenshots are blank/white | SPA may not have loaded yet. Wait for `electron-dev.sh` to report "Renderer ready" |
|
||||
| ffmpeg assembly fails | Check `/tmp/ffmpeg-assemble.log`. Ensure ffmpeg is installed and frames exist |
|
||||
189
.agents/skills/local-testing/scripts/record-app-screen.sh
Executable file
189
.agents/skills/local-testing/scripts/record-app-screen.sh
Executable file
|
|
@ -0,0 +1,189 @@
|
|||
#!/usr/bin/env bash
|
||||
#
|
||||
# record-app-screen.sh — Record the Electron app window (video + screenshots)
|
||||
#
|
||||
# Captures screenshots via agent-browser (CDP), then assembles into video on stop.
|
||||
# Works on any screen (including external monitors) since it uses CDP, not screen capture.
|
||||
#
|
||||
# Usage:
|
||||
# ./record-app-screen.sh start [output_name] # Begin recording
|
||||
# ./record-app-screen.sh stop # Stop and save
|
||||
# ./record-app-screen.sh status # Check recording state
|
||||
#
|
||||
# Outputs to .records/ directory:
|
||||
# .records/<name>.mp4 — Video assembled from screenshots (~2 fps)
|
||||
# .records/<name>/ — Screenshots every SCREENSHOT_INTERVAL seconds
|
||||
#
|
||||
# Prerequisites:
|
||||
# - ffmpeg installed (bun add -g ffmpeg-static, or brew install ffmpeg)
|
||||
# - agent-browser CLI installed
|
||||
# - Electron app already running with CDP enabled
|
||||
#
|
||||
# Environment variables:
|
||||
# CDP_PORT — Chrome DevTools Protocol port (default: 9222)
|
||||
# SCREENSHOT_INTERVAL — Seconds between gallery screenshots (default: 3)
|
||||
# VIDEO_FRAME_INTERVAL — Seconds between video frames (default: 0.5)
|
||||
#
|
||||
# Examples:
|
||||
# ./electron-dev.sh start
|
||||
# ./record-app-screen.sh start gateway-demo
|
||||
# # ... run automation via agent-browser ...
|
||||
# ./record-app-screen.sh stop
|
||||
#
|
||||
set -euo pipefail
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
|
||||
PROJECT_DIR="$(cd "$SCRIPT_DIR/../../../.." && pwd)"
|
||||
|
||||
RECORDS_DIR="$PROJECT_DIR/.records"
|
||||
PID_FILE="/tmp/record-app-screen.pids"
|
||||
STATE_FILE="/tmp/record-app-screen.state"
|
||||
|
||||
CDP_PORT="${CDP_PORT:-9222}"
|
||||
SCREENSHOT_INTERVAL="${SCREENSHOT_INTERVAL:-3}"
|
||||
VIDEO_FRAME_INTERVAL="${VIDEO_FRAME_INTERVAL:-0.5}"
|
||||
|
||||
AB="agent-browser --cdp $CDP_PORT"
|
||||
|
||||
# ─── Commands ───
|
||||
|
||||
cmd_start() {
|
||||
local output_name="${1:-recording-$(date +%Y%m%d-%H%M%S)}"
|
||||
local output_video="$RECORDS_DIR/${output_name}.mp4"
|
||||
local screenshot_dir="$RECORDS_DIR/${output_name}"
|
||||
local frames_dir
|
||||
frames_dir=$(mktemp -d /tmp/record-frames-XXXXXX)
|
||||
|
||||
if [ -f "$PID_FILE" ]; then
|
||||
echo "[record] A recording is already active. Run '$0 stop' first."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
mkdir -p "$RECORDS_DIR" "$screenshot_dir"
|
||||
|
||||
# Video frames loop (~2 fps via agent-browser CDP screenshots)
|
||||
(
|
||||
local idx=0
|
||||
while true; do
|
||||
local fname
|
||||
fname=$(printf "%s/frame_%06d.png" "$frames_dir" "$idx")
|
||||
$AB screenshot "$fname" 2>/dev/null || true
|
||||
idx=$((idx + 1))
|
||||
sleep "$VIDEO_FRAME_INTERVAL"
|
||||
done
|
||||
) &
|
||||
local frames_pid=$!
|
||||
|
||||
# Gallery screenshots loop (every N seconds for human review)
|
||||
(
|
||||
local idx=0
|
||||
while true; do
|
||||
local fname
|
||||
fname=$(printf "%s/%04d.png" "$screenshot_dir" "$idx")
|
||||
$AB screenshot "$fname" 2>/dev/null || true
|
||||
idx=$((idx + 1))
|
||||
sleep "$SCREENSHOT_INTERVAL"
|
||||
done
|
||||
) &
|
||||
local screenshot_pid=$!
|
||||
|
||||
# Save state
|
||||
echo "$frames_pid $screenshot_pid" > "$PID_FILE"
|
||||
echo "$output_video $frames_dir $screenshot_dir" > "$STATE_FILE"
|
||||
|
||||
echo "[record] Started!"
|
||||
echo " Video frames: every ${VIDEO_FRAME_INTERVAL}s (PID $frames_pid)"
|
||||
echo " Screenshots: every ${SCREENSHOT_INTERVAL}s → $screenshot_dir/"
|
||||
echo " Stop with: $0 stop"
|
||||
}
|
||||
|
||||
cmd_stop() {
|
||||
if [ ! -f "$PID_FILE" ] || [ ! -f "$STATE_FILE" ]; then
|
||||
echo "[record] No active recording found."
|
||||
return 0
|
||||
fi
|
||||
|
||||
local frames_pid screenshot_pid
|
||||
read -r frames_pid screenshot_pid < "$PID_FILE"
|
||||
|
||||
local output_video frames_dir screenshot_dir
|
||||
read -r output_video frames_dir screenshot_dir < "$STATE_FILE"
|
||||
|
||||
# Stop both capture loops
|
||||
kill "$frames_pid" 2>/dev/null || true
|
||||
kill "$screenshot_pid" 2>/dev/null || true
|
||||
wait "$frames_pid" 2>/dev/null || true
|
||||
wait "$screenshot_pid" 2>/dev/null || true
|
||||
|
||||
# Assemble frames into video
|
||||
local frame_count
|
||||
frame_count=$(ls -1 "$frames_dir"/frame_*.png 2>/dev/null | wc -l | tr -d ' ')
|
||||
|
||||
if [ "$frame_count" -gt 0 ]; then
|
||||
echo "[record] Assembling $frame_count frames into video..."
|
||||
ffmpeg -y -framerate 2 -i "$frames_dir/frame_%06d.png" \
|
||||
-c:v libx264 -crf 23 -pix_fmt yuv420p -an \
|
||||
"$output_video" > /tmp/ffmpeg-assemble.log 2>&1
|
||||
|
||||
if [ ! -s "$output_video" ]; then
|
||||
echo " [warn] Video assembly failed. Check /tmp/ffmpeg-assemble.log"
|
||||
echo " Frames preserved in: $frames_dir/"
|
||||
fi
|
||||
else
|
||||
echo " [warn] No frames captured."
|
||||
fi
|
||||
|
||||
rm -rf "$frames_dir" 2>/dev/null
|
||||
rm -f "$PID_FILE" "$STATE_FILE"
|
||||
|
||||
local video_size screenshot_count
|
||||
video_size=$(ls -lh "$output_video" 2>/dev/null | awk '{print $5}' || echo "?")
|
||||
screenshot_count=$(ls -1 "$screenshot_dir"/*.png 2>/dev/null | wc -l | tr -d ' ' || echo "0")
|
||||
|
||||
echo "[record] Stopped!"
|
||||
echo " Video: $output_video ($video_size)"
|
||||
echo " Screenshots: ${screenshot_count} files in $screenshot_dir/"
|
||||
echo " Play: open $output_video"
|
||||
}
|
||||
|
||||
cmd_status() {
|
||||
if [ ! -f "$PID_FILE" ]; then
|
||||
echo "[record] No active recording."
|
||||
return 0
|
||||
fi
|
||||
|
||||
local frames_pid screenshot_pid
|
||||
read -r frames_pid screenshot_pid < "$PID_FILE"
|
||||
|
||||
local frames_ok="no" screenshot_ok="no"
|
||||
kill -0 "$frames_pid" 2>/dev/null && frames_ok="yes"
|
||||
kill -0 "$screenshot_pid" 2>/dev/null && screenshot_ok="yes"
|
||||
|
||||
if [ -f "$STATE_FILE" ]; then
|
||||
local output_video frames_dir screenshot_dir
|
||||
read -r output_video frames_dir screenshot_dir < "$STATE_FILE"
|
||||
local frame_count ss_count
|
||||
frame_count=$(ls -1 "$frames_dir"/frame_*.png 2>/dev/null | wc -l | tr -d ' ' || echo "0")
|
||||
ss_count=$(ls -1 "$screenshot_dir"/*.png 2>/dev/null | wc -l | tr -d ' ' || echo "0")
|
||||
echo "[record] Active recording"
|
||||
echo " Frames: $frame_count captured (running: $frames_ok)"
|
||||
echo " Screenshots: $ss_count captured (running: $screenshot_ok)"
|
||||
echo " Output: $output_video"
|
||||
fi
|
||||
}
|
||||
|
||||
# ─── Main ───
|
||||
|
||||
case "${1:-}" in
|
||||
start) shift; cmd_start "$@" ;;
|
||||
stop) cmd_stop ;;
|
||||
status) cmd_status ;;
|
||||
*)
|
||||
echo "Usage: $0 {start [name] | stop | status}"
|
||||
echo ""
|
||||
echo " start [name] Start recording (default: recording-YYYYMMDD-HHMMSS)"
|
||||
echo " stop Stop recording and save outputs"
|
||||
echo " status Check if recording is active"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
|
|
@ -25,6 +25,9 @@ Desktop.ini
|
|||
*.code-workspace
|
||||
.vscode/sessions.json
|
||||
prd
|
||||
# Recordings
|
||||
.records/
|
||||
|
||||
# Temporary files
|
||||
.temp/
|
||||
temp/
|
||||
|
|
|
|||
|
|
@ -47,6 +47,12 @@ export interface ServerModelProviderConfig {
|
|||
export type ServerLanguageModel = Partial<Record<GlobalLLMProviderKey, ServerModelProviderConfig>>;
|
||||
|
||||
export interface GlobalServerConfig {
|
||||
/**
|
||||
* Agent Gateway URL for WebSocket-based agent execution.
|
||||
* When set, the SPA can offload agent execution to the server and receive
|
||||
* events via the Gateway instead of running the agent loop client-side.
|
||||
*/
|
||||
agentGatewayUrl?: string;
|
||||
aiProvider: ServerLanguageModel;
|
||||
defaultAgent?: PartialDeep<UserDefaultAgent>;
|
||||
disableEmailPassword?: boolean;
|
||||
|
|
|
|||
|
|
@ -38,6 +38,10 @@ export const UserGuideSchema = z.object({
|
|||
export type UserGuide = z.infer<typeof UserGuideSchema>;
|
||||
|
||||
export const UserLabSchema = z.object({
|
||||
/**
|
||||
* enable server-side agent execution via Gateway WebSocket
|
||||
*/
|
||||
enableGatewayMode: z.boolean().optional(),
|
||||
/**
|
||||
* enable multi-agent group chat mode
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ export const getAppConfig = () => {
|
|||
MARKET_TRUSTED_CLIENT_ID: z.string().optional(),
|
||||
|
||||
AGENT_GATEWAY_SERVICE_TOKEN: z.string().optional(),
|
||||
AGENT_GATEWAY_URL: z.string().url(),
|
||||
AGENT_GATEWAY_URL: z.string().url().optional(),
|
||||
/**
|
||||
* Enable Queue-based Agent Runtime
|
||||
* When true, use QStash for async agent execution (production)
|
||||
|
|
@ -121,7 +121,7 @@ export const getAppConfig = () => {
|
|||
MARKET_TRUSTED_CLIENT_ID: process.env.MARKET_TRUSTED_CLIENT_ID,
|
||||
|
||||
AGENT_GATEWAY_SERVICE_TOKEN: process.env.AGENT_GATEWAY_SERVICE_TOKEN,
|
||||
AGENT_GATEWAY_URL: process.env.AGENT_GATEWAY_URL || 'https://agent-gateway.lobehub.com',
|
||||
AGENT_GATEWAY_URL: process.env.AGENT_GATEWAY_URL,
|
||||
enableQueueAgentRuntime: process.env.AGENT_RUNTIME_MODE === 'queue',
|
||||
TELEMETRY_DISABLED: process.env.TELEMETRY_DISABLED === '1',
|
||||
},
|
||||
|
|
|
|||
|
|
@ -126,6 +126,7 @@ export class AgentStreamClient extends TypedEmitter {
|
|||
this.intentionalDisconnect = true;
|
||||
this.cleanup();
|
||||
this.setStatus('disconnected');
|
||||
this.emit('disconnected');
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -168,6 +169,12 @@ export class AgentStreamClient extends TypedEmitter {
|
|||
}
|
||||
|
||||
private buildWsUrl(): string {
|
||||
// If the URL already has a ws/wss protocol, use it directly
|
||||
if (this.gatewayUrl.startsWith('ws://') || this.gatewayUrl.startsWith('wss://')) {
|
||||
const base = this.gatewayUrl.replace(/\/+$/, '');
|
||||
return `${base}/ws?operationId=${encodeURIComponent(this.operationId)}`;
|
||||
}
|
||||
// Otherwise convert http(s) to ws(s)
|
||||
const wsProtocol = this.gatewayUrl.startsWith('https') ? 'wss' : 'ws';
|
||||
const host = this.gatewayUrl.replace(/^https?:\/\//, '');
|
||||
return `${wsProtocol}://${host}/ws?operationId=${encodeURIComponent(this.operationId)}`;
|
||||
|
|
|
|||
|
|
@ -5,6 +5,10 @@ export type {
|
|||
AgentStreamEvent,
|
||||
AgentStreamEventType,
|
||||
ConnectionStatus,
|
||||
StepCompleteData,
|
||||
StreamChunkData,
|
||||
StreamChunkType,
|
||||
StreamStartData,
|
||||
ToolEndData,
|
||||
ToolStartData,
|
||||
} from './types';
|
||||
|
|
|
|||
|
|
@ -44,6 +44,33 @@ export interface StreamChunkData {
|
|||
toolsCalling?: any[];
|
||||
}
|
||||
|
||||
// ─── Typed Event Data ───
|
||||
|
||||
export interface StreamStartData {
|
||||
assistantMessage: { id: string };
|
||||
model?: string;
|
||||
provider?: string;
|
||||
}
|
||||
|
||||
export interface ToolStartData {
|
||||
parentMessageId: string;
|
||||
toolCalling: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export interface ToolEndData {
|
||||
executionTime?: number;
|
||||
isSuccess: boolean;
|
||||
payload?: Record<string, unknown>;
|
||||
result?: unknown;
|
||||
}
|
||||
|
||||
export interface StepCompleteData {
|
||||
finalState?: unknown;
|
||||
phase: string;
|
||||
reason?: string;
|
||||
reasonDetail?: string;
|
||||
}
|
||||
|
||||
// ─── WebSocket Protocol Messages ───
|
||||
|
||||
// Client → Server
|
||||
|
|
|
|||
|
|
@ -2,6 +2,9 @@ export default {
|
|||
'features.assistantMessageGroup.desc':
|
||||
'Group agent messages and their tool call results together for display',
|
||||
'features.assistantMessageGroup.title': 'Agent Message Grouping',
|
||||
'features.gatewayMode.desc':
|
||||
'Execute agent tasks on the server via Gateway WebSocket instead of running locally. Enables faster execution and reduces client resource usage.',
|
||||
'features.gatewayMode.title': 'Server-Side Agent Execution (Gateway)',
|
||||
'features.groupChat.desc': 'Enable multi-agent group chat coordination.',
|
||||
'features.groupChat.title': 'Group Chat (Multi-Agent)',
|
||||
'features.inputMarkdown.desc':
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import { useTranslation } from 'react-i18next';
|
|||
import { FORM_STYLE } from '@/const/layoutTokens';
|
||||
import SettingHeader from '@/routes/(main)/settings/features/SettingHeader';
|
||||
import { autoUpdateService } from '@/services/electron/autoUpdate';
|
||||
import { useServerConfigStore } from '@/store/serverConfig';
|
||||
import { useUserStore } from '@/store/user';
|
||||
import { labPreferSelectors, preferenceSelectors, settingsSelectors } from '@/store/user/selectors';
|
||||
|
||||
|
|
@ -34,11 +35,16 @@ const Page = memo(() => {
|
|||
const [setSettings, isUserStateInit] = useUserStore((s) => [s.setSettings, s.isUserStateInit]);
|
||||
const [loading, setLoading] = useState(false);
|
||||
|
||||
const [isPreferenceInit, enableInputMarkdown, updateLab] = useUserStore((s) => [
|
||||
preferenceSelectors.isPreferenceInit(s),
|
||||
labPreferSelectors.enableInputMarkdown(s),
|
||||
s.updateLab,
|
||||
]);
|
||||
const [isPreferenceInit, enableInputMarkdown, enableGatewayMode, updateLab] = useUserStore(
|
||||
(s) => [
|
||||
preferenceSelectors.isPreferenceInit(s),
|
||||
labPreferSelectors.enableInputMarkdown(s),
|
||||
labPreferSelectors.enableGatewayMode(s),
|
||||
s.updateLab,
|
||||
],
|
||||
);
|
||||
|
||||
const hasGatewayUrl = useServerConfigStore((s) => !!s.serverConfig.agentGatewayUrl);
|
||||
|
||||
const [channel, setChannel] = useState<UpdateChannelValue>('stable');
|
||||
|
||||
|
|
@ -112,6 +118,23 @@ const Page = memo(() => {
|
|||
label: tLabs('features.inputMarkdown.title'),
|
||||
minWidth: undefined,
|
||||
},
|
||||
...(hasGatewayUrl
|
||||
? [
|
||||
{
|
||||
children: (
|
||||
<Switch
|
||||
checked={enableGatewayMode}
|
||||
loading={!isPreferenceInit}
|
||||
onChange={(checked: boolean) => updateLab({ enableGatewayMode: checked })}
|
||||
/>
|
||||
),
|
||||
className: styles.labItem,
|
||||
desc: tLabs('features.gatewayMode.desc'),
|
||||
label: tLabs('features.gatewayMode.title'),
|
||||
minWidth: undefined,
|
||||
},
|
||||
]
|
||||
: []),
|
||||
],
|
||||
title: tLabs('title'),
|
||||
};
|
||||
|
|
|
|||
|
|
@ -85,6 +85,11 @@ export const getServerGlobalConfig = async () => {
|
|||
),
|
||||
enableUploadFileToServer: !!fileEnv.S3_SECRET_ACCESS_KEY,
|
||||
|
||||
// Expose Agent Gateway URL to client when queue-based agent runtime is enabled
|
||||
...(appEnv.enableQueueAgentRuntime && appEnv.AGENT_GATEWAY_URL
|
||||
? { agentGatewayUrl: appEnv.AGENT_GATEWAY_URL }
|
||||
: undefined),
|
||||
|
||||
image: cleanObject({
|
||||
defaultImageNum: imageEnv.AI_IMAGE_DEFAULT_IMAGE_NUM,
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ export const createStreamEventManager = (): IStreamEventManager => {
|
|||
}
|
||||
|
||||
// Wrap with Gateway notifier when configured
|
||||
if (appEnv.AGENT_GATEWAY_SERVICE_TOKEN) {
|
||||
if (appEnv.AGENT_GATEWAY_URL && appEnv.AGENT_GATEWAY_SERVICE_TOKEN) {
|
||||
log('Wrapping with GatewayStreamNotifier (%s)', appEnv.AGENT_GATEWAY_URL);
|
||||
return new GatewayStreamNotifier(
|
||||
manager,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,9 @@
|
|||
import type { ExecAgentResult } from '@lobechat/types';
|
||||
|
||||
import { lambdaClient } from '@/libs/trpc/client';
|
||||
|
||||
export type { ExecAgentResult };
|
||||
|
||||
export interface ExecAgentTaskParams {
|
||||
agentId?: string;
|
||||
appContext?: {
|
||||
|
|
@ -91,9 +95,10 @@ export interface UpdateClientTaskThreadStatusParams {
|
|||
|
||||
class AiAgentService {
|
||||
/**
|
||||
* Execute a single Agent task
|
||||
* Execute a single Agent task.
|
||||
* Returns the operationId needed to connect to the Agent Gateway.
|
||||
*/
|
||||
async execAgentTask(params: ExecAgentTaskParams) {
|
||||
async execAgentTask(params: ExecAgentTaskParams): Promise<ExecAgentResult> {
|
||||
return await lambdaClient.aiAgent.execAgent.mutate(params);
|
||||
}
|
||||
|
||||
|
|
|
|||
239
src/store/chat/slices/aiChat/actions/__tests__/gateway.test.ts
Normal file
239
src/store/chat/slices/aiChat/actions/__tests__/gateway.test.ts
Normal file
|
|
@ -0,0 +1,239 @@
|
|||
import { afterEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import type { AgentStreamEvent } from '@/libs/agent-stream';
|
||||
|
||||
import type { GatewayConnection } from '../gateway';
|
||||
import { GatewayActionImpl } from '../gateway';
|
||||
|
||||
// ─── Mock Client Factory ───
|
||||
|
||||
function createMockClient(): GatewayConnection['client'] & {
|
||||
emitEvent: (event: string, ...args: any[]) => void;
|
||||
} {
|
||||
const listeners = new Map<string, Set<(...args: any[]) => void>>();
|
||||
|
||||
return {
|
||||
connect: vi.fn(),
|
||||
disconnect: vi.fn(),
|
||||
emitEvent(event: string, ...args: any[]) {
|
||||
listeners.get(event)?.forEach((listener) => listener(...args));
|
||||
},
|
||||
on: vi.fn((event: string, listener: (...args: any[]) => void) => {
|
||||
let set = listeners.get(event);
|
||||
if (!set) {
|
||||
set = new Set();
|
||||
listeners.set(event, set);
|
||||
}
|
||||
set.add(listener);
|
||||
}),
|
||||
sendInterrupt: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
// ─── Test Helpers ───
|
||||
|
||||
function createTestAction() {
|
||||
const state: Record<string, any> = { gatewayConnections: {} };
|
||||
const set = vi.fn((updater: any) => {
|
||||
if (typeof updater === 'function') {
|
||||
Object.assign(state, updater(state));
|
||||
} else {
|
||||
Object.assign(state, updater);
|
||||
}
|
||||
});
|
||||
const get = vi.fn(() => state as any);
|
||||
|
||||
const action = new GatewayActionImpl(set as any, get, undefined);
|
||||
|
||||
// Inject mock client factory
|
||||
const mockClient = createMockClient();
|
||||
action.createClient = vi.fn(() => mockClient);
|
||||
|
||||
return { action, get, mockClient, set, state };
|
||||
}
|
||||
|
||||
describe('GatewayActionImpl', () => {
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe('connectToGateway', () => {
|
||||
it('should create client and add to store', () => {
|
||||
const { action, mockClient, state } = createTestAction();
|
||||
|
||||
action.connectToGateway({
|
||||
gatewayUrl: 'https://gateway.test.com',
|
||||
operationId: 'op-1',
|
||||
token: 'test-token',
|
||||
});
|
||||
|
||||
expect(state.gatewayConnections['op-1']).toBeDefined();
|
||||
expect(state.gatewayConnections['op-1'].status).toBe('connecting');
|
||||
expect(mockClient.connect).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it('should wire up status_changed listener', () => {
|
||||
const { action, mockClient, state } = createTestAction();
|
||||
|
||||
action.connectToGateway({
|
||||
gatewayUrl: 'https://gateway.test.com',
|
||||
operationId: 'op-1',
|
||||
token: 'test-token',
|
||||
});
|
||||
|
||||
mockClient.emitEvent('status_changed', 'connected');
|
||||
expect(state.gatewayConnections['op-1'].status).toBe('connected');
|
||||
});
|
||||
|
||||
it('should forward agent events to onEvent callback', () => {
|
||||
const { action, mockClient } = createTestAction();
|
||||
const events: AgentStreamEvent[] = [];
|
||||
|
||||
action.connectToGateway({
|
||||
gatewayUrl: 'https://gateway.test.com',
|
||||
onEvent: (e) => events.push(e),
|
||||
operationId: 'op-1',
|
||||
token: 'test-token',
|
||||
});
|
||||
|
||||
const testEvent: AgentStreamEvent = {
|
||||
data: { content: 'hello' },
|
||||
operationId: 'op-1',
|
||||
stepIndex: 0,
|
||||
timestamp: Date.now(),
|
||||
type: 'stream_chunk',
|
||||
};
|
||||
mockClient.emitEvent('agent_event', testEvent);
|
||||
|
||||
expect(events).toHaveLength(1);
|
||||
expect(events[0]).toEqual(testEvent);
|
||||
});
|
||||
|
||||
it('should cleanup on session_complete', () => {
|
||||
const { action, mockClient, state } = createTestAction();
|
||||
const onComplete = vi.fn();
|
||||
|
||||
action.connectToGateway({
|
||||
gatewayUrl: 'https://gateway.test.com',
|
||||
onSessionComplete: onComplete,
|
||||
operationId: 'op-1',
|
||||
token: 'test-token',
|
||||
});
|
||||
|
||||
mockClient.emitEvent('session_complete');
|
||||
expect(state.gatewayConnections['op-1']).toBeUndefined();
|
||||
expect(onComplete).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it('should cleanup on disconnected', () => {
|
||||
const { action, mockClient, state } = createTestAction();
|
||||
|
||||
action.connectToGateway({
|
||||
gatewayUrl: 'https://gateway.test.com',
|
||||
operationId: 'op-1',
|
||||
token: 'test-token',
|
||||
});
|
||||
|
||||
mockClient.emitEvent('disconnected');
|
||||
expect(state.gatewayConnections['op-1']).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should cleanup on auth_failed', () => {
|
||||
const { action, mockClient, state } = createTestAction();
|
||||
|
||||
action.connectToGateway({
|
||||
gatewayUrl: 'https://gateway.test.com',
|
||||
operationId: 'op-1',
|
||||
token: 'test-token',
|
||||
});
|
||||
|
||||
mockClient.emitEvent('auth_failed', 'invalid token');
|
||||
expect(state.gatewayConnections['op-1']).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should disconnect existing connection before creating new one', () => {
|
||||
const { action, state } = createTestAction();
|
||||
|
||||
// First connection with its own mock
|
||||
const firstMock = createMockClient();
|
||||
action.createClient = vi.fn(() => firstMock);
|
||||
action.connectToGateway({
|
||||
gatewayUrl: 'https://gateway.test.com',
|
||||
operationId: 'op-1',
|
||||
token: 'token-1',
|
||||
});
|
||||
|
||||
// Second connection
|
||||
const secondMock = createMockClient();
|
||||
action.createClient = vi.fn(() => secondMock);
|
||||
action.connectToGateway({
|
||||
gatewayUrl: 'https://gateway.test.com',
|
||||
operationId: 'op-1',
|
||||
token: 'token-2',
|
||||
});
|
||||
|
||||
expect(firstMock.disconnect).toHaveBeenCalled();
|
||||
expect(state.gatewayConnections['op-1'].client).toBe(secondMock);
|
||||
});
|
||||
});
|
||||
|
||||
describe('disconnectFromGateway', () => {
|
||||
it('should disconnect and cleanup', () => {
|
||||
const { action, mockClient, state } = createTestAction();
|
||||
|
||||
action.connectToGateway({
|
||||
gatewayUrl: 'https://gateway.test.com',
|
||||
operationId: 'op-1',
|
||||
token: 'test-token',
|
||||
});
|
||||
|
||||
action.disconnectFromGateway('op-1');
|
||||
expect(mockClient.disconnect).toHaveBeenCalled();
|
||||
expect(state.gatewayConnections['op-1']).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should be a no-op for unknown operationId', () => {
|
||||
const { action } = createTestAction();
|
||||
action.disconnectFromGateway('nonexistent');
|
||||
});
|
||||
});
|
||||
|
||||
describe('interruptGatewayAgent', () => {
|
||||
it('should send interrupt to the client', () => {
|
||||
const { action, mockClient } = createTestAction();
|
||||
|
||||
action.connectToGateway({
|
||||
gatewayUrl: 'https://gateway.test.com',
|
||||
operationId: 'op-1',
|
||||
token: 'test-token',
|
||||
});
|
||||
|
||||
action.interruptGatewayAgent('op-1');
|
||||
expect(mockClient.sendInterrupt).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it('should be a no-op for unknown operationId', () => {
|
||||
const { action } = createTestAction();
|
||||
action.interruptGatewayAgent('nonexistent');
|
||||
});
|
||||
});
|
||||
|
||||
describe('getGatewayConnectionStatus', () => {
|
||||
it('should return status for active connection', () => {
|
||||
const { action } = createTestAction();
|
||||
|
||||
action.connectToGateway({
|
||||
gatewayUrl: 'https://gateway.test.com',
|
||||
operationId: 'op-1',
|
||||
token: 'test-token',
|
||||
});
|
||||
|
||||
expect(action.getGatewayConnectionStatus('op-1')).toBe('connecting');
|
||||
});
|
||||
|
||||
it('should return undefined for unknown operationId', () => {
|
||||
const { action } = createTestAction();
|
||||
expect(action.getGatewayConnectionStatus('nonexistent')).toBeUndefined();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,367 @@
|
|||
import { describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import type { AgentStreamEvent } from '@/libs/agent-stream';
|
||||
|
||||
import { createGatewayEventHandler } from '../gatewayEventHandler';
|
||||
|
||||
vi.mock('@/services/message', () => ({
|
||||
messageService: { getMessages: vi.fn().mockResolvedValue([]) },
|
||||
}));
|
||||
|
||||
// ─── Test Helpers ───
|
||||
|
||||
function createMockStore() {
|
||||
return {
|
||||
associateMessageWithOperation: vi.fn(),
|
||||
completeOperation: vi.fn(),
|
||||
internal_dispatchMessage: vi.fn(),
|
||||
internal_toggleToolCallingStreaming: vi.fn(),
|
||||
replaceMessages: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
function createHandler(
|
||||
store: ReturnType<typeof createMockStore>,
|
||||
overrides?: { assistantMessageId?: string },
|
||||
) {
|
||||
const get = vi.fn(() => store) as any;
|
||||
return createGatewayEventHandler(get, {
|
||||
assistantMessageId: overrides?.assistantMessageId ?? 'msg-initial',
|
||||
context: { agentId: 'agent-1', scope: 'session', topicId: 'topic-1' } as any,
|
||||
operationId: 'op-1',
|
||||
});
|
||||
}
|
||||
|
||||
function makeEvent(type: AgentStreamEvent['type'], data?: any): AgentStreamEvent {
|
||||
return { data, id: '1', operationId: 'op-1', stepIndex: 0, timestamp: Date.now(), type };
|
||||
}
|
||||
|
||||
/** Flush the async processing queue by draining microtasks + setTimeout queue */
|
||||
const flush = async () => {
|
||||
for (let i = 0; i < 5; i++) {
|
||||
await new Promise((r) => setTimeout(r, 15));
|
||||
}
|
||||
};
|
||||
|
||||
// ─── Tests ───
|
||||
|
||||
describe('createGatewayEventHandler', () => {
|
||||
describe('stream_start', () => {
|
||||
it('should associate new message with operation', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-step2' } }));
|
||||
await flush();
|
||||
|
||||
expect(store.associateMessageWithOperation).toHaveBeenCalledWith('msg-step2', 'op-1');
|
||||
expect(store.replaceMessages).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should keep current ID if event data has no assistantMessage', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('stream_start', {}));
|
||||
await flush();
|
||||
|
||||
// No new message to associate, but fetch still happens
|
||||
expect(store.associateMessageWithOperation).not.toHaveBeenCalled();
|
||||
expect(store.replaceMessages).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should reset accumulators on each stream_start', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
// Accumulate some content
|
||||
handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'hello' }));
|
||||
await flush();
|
||||
expect(store.internal_dispatchMessage).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ value: { content: 'hello' } }),
|
||||
{ operationId: 'op-1' },
|
||||
);
|
||||
|
||||
// New stream_start resets
|
||||
handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-step2' } }));
|
||||
handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'world' }));
|
||||
await flush();
|
||||
|
||||
// Content should be 'world', not 'helloworld'
|
||||
expect(store.internal_dispatchMessage).toHaveBeenLastCalledWith(
|
||||
expect.objectContaining({
|
||||
id: 'msg-step2',
|
||||
value: { content: 'world' },
|
||||
}),
|
||||
{ operationId: 'op-1' },
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('stream_chunk', () => {
|
||||
it('should accumulate text content and pass operationId context', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Hello' }));
|
||||
handler(makeEvent('stream_chunk', { chunkType: 'text', content: ' world' }));
|
||||
await flush();
|
||||
|
||||
expect(store.internal_dispatchMessage).toHaveBeenLastCalledWith(
|
||||
{
|
||||
id: 'msg-initial',
|
||||
type: 'updateMessage',
|
||||
value: { content: 'Hello world' },
|
||||
},
|
||||
{ operationId: 'op-1' },
|
||||
);
|
||||
});
|
||||
|
||||
it('should accumulate reasoning content', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('stream_chunk', { chunkType: 'reasoning', reasoning: 'Think' }));
|
||||
handler(makeEvent('stream_chunk', { chunkType: 'reasoning', reasoning: 'ing...' }));
|
||||
await flush();
|
||||
|
||||
expect(store.internal_dispatchMessage).toHaveBeenLastCalledWith(
|
||||
{
|
||||
id: 'msg-initial',
|
||||
type: 'updateMessage',
|
||||
value: { reasoning: { content: 'Thinking...' } },
|
||||
},
|
||||
{ operationId: 'op-1' },
|
||||
);
|
||||
});
|
||||
|
||||
it('should dispatch tools and toggle tool calling streaming', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
const toolsCalling = [{ id: 'tc-1' }, { id: 'tc-2' }];
|
||||
handler(makeEvent('stream_chunk', { chunkType: 'tools_calling', toolsCalling }));
|
||||
await flush();
|
||||
|
||||
expect(store.internal_dispatchMessage).toHaveBeenCalledWith(
|
||||
{
|
||||
id: 'msg-initial',
|
||||
type: 'updateMessage',
|
||||
value: { tools: toolsCalling },
|
||||
},
|
||||
{ operationId: 'op-1' },
|
||||
);
|
||||
|
||||
expect(store.internal_toggleToolCallingStreaming).toHaveBeenCalledWith('msg-initial', [
|
||||
true,
|
||||
true,
|
||||
]);
|
||||
});
|
||||
|
||||
it('should ignore chunk with no data', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('stream_chunk', undefined));
|
||||
await flush();
|
||||
|
||||
expect(store.internal_dispatchMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('stream_end', () => {
|
||||
it('should clear tool streaming only', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('stream_end'));
|
||||
await flush();
|
||||
|
||||
expect(store.internal_toggleToolCallingStreaming).toHaveBeenCalledWith(
|
||||
'msg-initial',
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('tool_start', () => {
|
||||
it('should be a no-op (loading already active from stream_start)', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('tool_start', { parentMessageId: 'msg-initial', toolCalling: {} }));
|
||||
await flush();
|
||||
|
||||
expect(store.internal_dispatchMessage).not.toHaveBeenCalled();
|
||||
expect(store.replaceMessages).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('tool_end', () => {
|
||||
it('should refresh messages to pull tool results', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('tool_end', { isSuccess: true }));
|
||||
await flush();
|
||||
|
||||
expect(store.replaceMessages).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('step_complete', () => {
|
||||
it('should refresh on execution_complete phase', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('step_complete', { phase: 'execution_complete', reason: 'done' }));
|
||||
await flush();
|
||||
|
||||
expect(store.replaceMessages).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should not refresh on other phases', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('step_complete', { phase: 'human_approval' }));
|
||||
await flush();
|
||||
|
||||
expect(store.replaceMessages).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('agent_runtime_end', () => {
|
||||
it('should complete operation and refresh messages', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('agent_runtime_end'));
|
||||
await flush();
|
||||
|
||||
expect(store.completeOperation).toHaveBeenCalledWith('op-1');
|
||||
expect(store.replaceMessages).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('error', () => {
|
||||
it('should dispatch error to current message with operationId context', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('error', { message: 'Something went wrong' }));
|
||||
await flush();
|
||||
|
||||
expect(store.internal_dispatchMessage).toHaveBeenCalledWith(
|
||||
{
|
||||
id: 'msg-initial',
|
||||
type: 'updateMessage',
|
||||
value: {
|
||||
error: { body: { message: 'Something went wrong' }, type: 'AgentRuntimeError' },
|
||||
},
|
||||
},
|
||||
{ operationId: 'op-1' },
|
||||
);
|
||||
});
|
||||
|
||||
it('should dispatch error to switched message ID', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-step2' } }));
|
||||
handler(makeEvent('error', { error: 'Timeout' }));
|
||||
await flush();
|
||||
|
||||
expect(store.internal_dispatchMessage).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ id: 'msg-step2' }),
|
||||
{ operationId: 'op-1' },
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('sequential processing', () => {
|
||||
it('should process stream_chunk only after stream_start refresh completes', async () => {
|
||||
const store = createMockStore();
|
||||
const callOrder: string[] = [];
|
||||
|
||||
const { messageService } = await import('@/services/message');
|
||||
(messageService.getMessages as any).mockImplementation(async () => {
|
||||
callOrder.push('refresh_start');
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
callOrder.push('refresh_end');
|
||||
return [];
|
||||
});
|
||||
store.internal_dispatchMessage.mockImplementation(() => {
|
||||
callOrder.push('dispatch');
|
||||
});
|
||||
store.associateMessageWithOperation.mockImplementation(() => {
|
||||
callOrder.push('associate');
|
||||
});
|
||||
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-new' } }));
|
||||
handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Hello' }));
|
||||
await flush();
|
||||
|
||||
const refreshEndIdx = callOrder.indexOf('refresh_end');
|
||||
const dispatchIdx = callOrder.indexOf('dispatch');
|
||||
expect(refreshEndIdx).toBeGreaterThan(-1);
|
||||
expect(dispatchIdx).toBeGreaterThan(refreshEndIdx);
|
||||
});
|
||||
});
|
||||
|
||||
describe('multi-step integration', () => {
|
||||
it('should handle full LLM → tools → LLM cycle', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
// Step 1: LLM call
|
||||
handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-1' } }));
|
||||
await flush();
|
||||
expect(store.associateMessageWithOperation).toHaveBeenCalledWith('msg-1', 'op-1');
|
||||
|
||||
handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Let me search.' }));
|
||||
await flush();
|
||||
expect(store.internal_dispatchMessage).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ id: 'msg-1', value: { content: 'Let me search.' } }),
|
||||
{ operationId: 'op-1' },
|
||||
);
|
||||
|
||||
const tools = [{ id: 'tc-1' }];
|
||||
handler(makeEvent('stream_chunk', { chunkType: 'tools_calling', toolsCalling: tools }));
|
||||
await flush();
|
||||
expect(store.internal_toggleToolCallingStreaming).toHaveBeenCalledWith('msg-1', [true]);
|
||||
|
||||
handler(makeEvent('stream_end'));
|
||||
await flush();
|
||||
// Loading stays active between steps — only tool streaming is cleared
|
||||
expect(store.internal_toggleToolCallingStreaming).toHaveBeenCalledWith('msg-1', undefined);
|
||||
|
||||
// Tool execution
|
||||
handler(makeEvent('tool_start', { parentMessageId: 'msg-1', toolCalling: tools[0] }));
|
||||
handler(makeEvent('tool_end', { isSuccess: true }));
|
||||
await flush();
|
||||
expect(store.replaceMessages).toHaveBeenCalled();
|
||||
|
||||
// Step 2: Next LLM call with new assistant message
|
||||
vi.clearAllMocks();
|
||||
handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-2' } }));
|
||||
await flush();
|
||||
expect(store.replaceMessages).toHaveBeenCalled();
|
||||
expect(store.associateMessageWithOperation).toHaveBeenCalledWith('msg-2', 'op-1');
|
||||
|
||||
handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Here are the results.' }));
|
||||
await flush();
|
||||
expect(store.internal_dispatchMessage).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ id: 'msg-2', value: { content: 'Here are the results.' } }),
|
||||
{ operationId: 'op-1' },
|
||||
);
|
||||
|
||||
handler(makeEvent('stream_end'));
|
||||
handler(makeEvent('agent_runtime_end'));
|
||||
await flush();
|
||||
expect(store.completeOperation).toHaveBeenCalledWith('op-1');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -48,7 +48,6 @@ import {
|
|||
parseSelectedToolsFromEditorData,
|
||||
processCommands,
|
||||
} from './commandBus';
|
||||
|
||||
/**
|
||||
* Extended params for sendMessage with context
|
||||
*/
|
||||
|
|
@ -584,6 +583,31 @@ export class ConversationLifecycleActionImpl {
|
|||
}
|
||||
|
||||
// ── AI execution ──
|
||||
|
||||
// Gateway mode: server-side execution via WebSocket (opt-in via Labs toggle)
|
||||
if (this.#get().isGatewayModeEnabled()) {
|
||||
try {
|
||||
await this.#get().executeGatewayAgent({
|
||||
assistantMessageId: data.assistantMessageId,
|
||||
context: execContext,
|
||||
message,
|
||||
parentOperationId: operationId,
|
||||
topicId: data.topicId,
|
||||
userMessageId: data.userMessageId,
|
||||
});
|
||||
} catch (e) {
|
||||
console.error('[Gateway] Failed to start server-side agent:', e);
|
||||
if (data.topicId) this.#get().internal_updateTopicLoading(data.topicId, false);
|
||||
}
|
||||
|
||||
return {
|
||||
assistantMessageId: data.assistantMessageId,
|
||||
createdThreadId: data.createdThreadId,
|
||||
userMessageId: data.userMessageId,
|
||||
};
|
||||
}
|
||||
|
||||
// Client mode: run agent loop locally
|
||||
{
|
||||
const displayMessages = displayMessageSelectors.getDisplayMessagesByKey(
|
||||
messageMapKey(execContext),
|
||||
|
|
|
|||
239
src/store/chat/slices/aiChat/actions/gateway.ts
Normal file
239
src/store/chat/slices/aiChat/actions/gateway.ts
Normal file
|
|
@ -0,0 +1,239 @@
|
|||
import type { ConversationContext } from '@lobechat/types';
|
||||
|
||||
import type {
|
||||
AgentStreamClientOptions,
|
||||
AgentStreamEvent,
|
||||
ConnectionStatus,
|
||||
} from '@/libs/agent-stream';
|
||||
import { AgentStreamClient } from '@/libs/agent-stream/client';
|
||||
import { aiAgentService } from '@/services/aiAgent';
|
||||
import type { ChatStore } from '@/store/chat/store';
|
||||
import type { StoreSetter } from '@/store/types';
|
||||
import { useUserStore } from '@/store/user';
|
||||
|
||||
import { createGatewayEventHandler } from './gatewayEventHandler';
|
||||
|
||||
type Setter = StoreSetter<ChatStore>;
|
||||
|
||||
// ─── Types ───
|
||||
|
||||
export interface GatewayConnection {
|
||||
client: Pick<AgentStreamClient, 'connect' | 'disconnect' | 'on' | 'sendInterrupt'>;
|
||||
status: ConnectionStatus;
|
||||
}
|
||||
|
||||
export interface ConnectGatewayParams {
|
||||
/**
|
||||
* Gateway WebSocket URL (e.g. https://agent-gateway.lobehub.com)
|
||||
*/
|
||||
gatewayUrl: string;
|
||||
/**
|
||||
* Callback for each agent event received
|
||||
*/
|
||||
onEvent?: (event: AgentStreamEvent) => void;
|
||||
/**
|
||||
* Called when the session completes (agent_runtime_end or session_complete)
|
||||
*/
|
||||
onSessionComplete?: () => void;
|
||||
/**
|
||||
* The operation ID returned by execAgent
|
||||
*/
|
||||
operationId: string;
|
||||
/**
|
||||
* Auth token for the Gateway
|
||||
*/
|
||||
token: string;
|
||||
}
|
||||
|
||||
// ─── Action Implementation ───
|
||||
|
||||
export class GatewayActionImpl {
|
||||
readonly #get: () => ChatStore;
|
||||
readonly #set: Setter;
|
||||
|
||||
/** Overridable factory for testing */
|
||||
createClient: (options: AgentStreamClientOptions) => GatewayConnection['client'] = (options) =>
|
||||
new AgentStreamClient(options);
|
||||
|
||||
constructor(set: Setter, get: () => ChatStore, _api?: unknown) {
|
||||
void _api;
|
||||
this.#set = set;
|
||||
this.#get = get;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to the Agent Gateway for a specific operation.
|
||||
* Creates an AgentStreamClient, manages its lifecycle, and wires up event callbacks.
|
||||
*/
|
||||
connectToGateway = (params: ConnectGatewayParams): void => {
|
||||
const { operationId, gatewayUrl, token, onEvent, onSessionComplete } = params;
|
||||
|
||||
// Disconnect existing connection for this operation if any
|
||||
this.disconnectFromGateway(operationId);
|
||||
|
||||
const client = this.createClient({ gatewayUrl, operationId, token });
|
||||
|
||||
// Track connection in store
|
||||
this.#set(
|
||||
(state) => ({
|
||||
gatewayConnections: {
|
||||
...state.gatewayConnections,
|
||||
[operationId]: { client, status: 'connecting' },
|
||||
},
|
||||
}),
|
||||
false,
|
||||
'connectToGateway',
|
||||
);
|
||||
|
||||
// Wire up status changes
|
||||
client.on('status_changed', (status) => {
|
||||
this.#set(
|
||||
(state) => {
|
||||
const conn = state.gatewayConnections[operationId];
|
||||
if (!conn) return state;
|
||||
return {
|
||||
gatewayConnections: { ...state.gatewayConnections, [operationId]: { ...conn, status } },
|
||||
};
|
||||
},
|
||||
false,
|
||||
'gateway/statusChanged',
|
||||
);
|
||||
});
|
||||
|
||||
// Forward agent events to caller
|
||||
if (onEvent) {
|
||||
client.on('agent_event', onEvent);
|
||||
}
|
||||
|
||||
// Handle session completion
|
||||
client.on('session_complete', () => {
|
||||
this.internal_cleanupGatewayConnection(operationId);
|
||||
onSessionComplete?.();
|
||||
});
|
||||
|
||||
// Handle disconnection (terminal events auto-disconnect the client)
|
||||
client.on('disconnected', () => {
|
||||
this.internal_cleanupGatewayConnection(operationId);
|
||||
});
|
||||
|
||||
// Handle auth failures
|
||||
client.on('auth_failed', (reason) => {
|
||||
console.error(`[Gateway] Auth failed for operation ${operationId}: ${reason}`);
|
||||
this.internal_cleanupGatewayConnection(operationId);
|
||||
});
|
||||
|
||||
client.connect();
|
||||
};
|
||||
|
||||
/**
|
||||
* Disconnect from the Gateway for a specific operation.
|
||||
*/
|
||||
disconnectFromGateway = (operationId: string): void => {
|
||||
const conn = this.#get().gatewayConnections[operationId];
|
||||
if (!conn) return;
|
||||
|
||||
conn.client.disconnect();
|
||||
this.internal_cleanupGatewayConnection(operationId);
|
||||
};
|
||||
|
||||
/**
|
||||
* Send an interrupt command to stop the agent for a specific operation.
|
||||
*/
|
||||
interruptGatewayAgent = (operationId: string): void => {
|
||||
const conn = this.#get().gatewayConnections[operationId];
|
||||
if (!conn) return;
|
||||
|
||||
conn.client.sendInterrupt();
|
||||
};
|
||||
|
||||
/**
|
||||
* Get the connection status for a specific operation.
|
||||
*/
|
||||
getGatewayConnectionStatus = (operationId: string): ConnectionStatus | undefined => {
|
||||
return this.#get().gatewayConnections[operationId]?.status;
|
||||
};
|
||||
|
||||
/**
|
||||
* Check if Gateway mode is available and enabled.
|
||||
* Returns true if both server config and user lab toggle are set.
|
||||
*/
|
||||
isGatewayModeEnabled = (): boolean => {
|
||||
const agentGatewayUrl =
|
||||
window.global_serverConfigStore?.getState()?.serverConfig?.agentGatewayUrl;
|
||||
const enableGatewayMode = useUserStore.getState().preference.lab?.enableGatewayMode;
|
||||
|
||||
return !!agentGatewayUrl && !!enableGatewayMode;
|
||||
};
|
||||
|
||||
/**
|
||||
* Execute agent task via Gateway WebSocket.
|
||||
* Call isGatewayModeEnabled() first to check availability.
|
||||
*/
|
||||
executeGatewayAgent = async (params: {
|
||||
assistantMessageId: string;
|
||||
context: ConversationContext;
|
||||
message: string;
|
||||
parentOperationId: string;
|
||||
topicId?: string;
|
||||
userMessageId: string;
|
||||
}): Promise<void> => {
|
||||
const { assistantMessageId, context, message, parentOperationId, topicId, userMessageId } =
|
||||
params;
|
||||
|
||||
const agentGatewayUrl =
|
||||
window.global_serverConfigStore!.getState().serverConfig.agentGatewayUrl!;
|
||||
|
||||
const result = await aiAgentService.execAgentTask({
|
||||
agentId: context.agentId,
|
||||
appContext: {
|
||||
groupId: context.groupId,
|
||||
scope: context.scope,
|
||||
threadId: context.threadId,
|
||||
topicId: context.topicId,
|
||||
},
|
||||
existingMessageIds: [userMessageId, assistantMessageId],
|
||||
prompt: message,
|
||||
});
|
||||
|
||||
// Create a dedicated operation for gateway execution with correct context
|
||||
const { operationId: gatewayOpId } = this.#get().startOperation({
|
||||
context,
|
||||
parentOperationId,
|
||||
type: 'execServerAgentRuntime',
|
||||
});
|
||||
|
||||
// Associate the initial assistant message with the gateway operation
|
||||
// so the UI shows loading/generating state via the operation system
|
||||
this.#get().associateMessageWithOperation(assistantMessageId, gatewayOpId);
|
||||
|
||||
const eventHandler = createGatewayEventHandler(this.#get, {
|
||||
assistantMessageId,
|
||||
context,
|
||||
operationId: gatewayOpId,
|
||||
});
|
||||
|
||||
this.#get().connectToGateway({
|
||||
gatewayUrl: agentGatewayUrl,
|
||||
onEvent: eventHandler,
|
||||
onSessionComplete: () => {
|
||||
this.#get().completeOperation(gatewayOpId);
|
||||
if (topicId) this.#get().internal_updateTopicLoading(topicId, false);
|
||||
},
|
||||
operationId: result.operationId,
|
||||
token: result.token || '',
|
||||
});
|
||||
};
|
||||
|
||||
private internal_cleanupGatewayConnection = (operationId: string): void => {
|
||||
this.#set(
|
||||
(state) => {
|
||||
const { [operationId]: _, ...rest } = state.gatewayConnections;
|
||||
return { gatewayConnections: rest };
|
||||
},
|
||||
false,
|
||||
'gateway/cleanup',
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
export type GatewayAction = Pick<GatewayActionImpl, keyof GatewayActionImpl>;
|
||||
198
src/store/chat/slices/aiChat/actions/gatewayEventHandler.ts
Normal file
198
src/store/chat/slices/aiChat/actions/gatewayEventHandler.ts
Normal file
|
|
@ -0,0 +1,198 @@
|
|||
import type { ConversationContext } from '@lobechat/types';
|
||||
|
||||
import type {
|
||||
AgentStreamEvent,
|
||||
StepCompleteData,
|
||||
StreamChunkData,
|
||||
StreamStartData,
|
||||
} from '@/libs/agent-stream';
|
||||
import { messageService } from '@/services/message';
|
||||
import type { ChatStore } from '@/store/chat/store';
|
||||
|
||||
/**
|
||||
* Fetch messages from DB and replace them in the chat store's dbMessagesMap.
|
||||
* This updates the ConversationArea component via React subscription:
|
||||
* dbMessagesMap → ConversationArea (messages prop) → ConversationStore → UI
|
||||
*/
|
||||
const fetchAndReplaceMessages = async (get: () => ChatStore, context: ConversationContext) => {
|
||||
const messages = await messageService.getMessages(context);
|
||||
get().replaceMessages(messages, { context });
|
||||
};
|
||||
|
||||
/**
|
||||
* Creates a handler function that processes Agent Gateway events
|
||||
* and maps them to the chat store's message update actions.
|
||||
*
|
||||
* Supports multi-step agent execution (LLM → tool calls → next LLM → ...)
|
||||
* using a hybrid approach:
|
||||
* - Current LLM step: real-time streaming via stream_chunk
|
||||
* - Step transitions: fetchAndReplaceMessages from DB at stream_start / tool_end / step_complete
|
||||
*
|
||||
* The handler queues incoming events and processes them sequentially,
|
||||
* ensuring that stream_chunk waits for stream_start's DB fetch to resolve
|
||||
* before dispatching updates.
|
||||
*/
|
||||
export const createGatewayEventHandler = (
|
||||
get: () => ChatStore,
|
||||
params: {
|
||||
assistantMessageId: string;
|
||||
context: ConversationContext;
|
||||
operationId: string;
|
||||
},
|
||||
) => {
|
||||
const { context, operationId } = params;
|
||||
|
||||
// Dispatch context — ensures internal_dispatchMessage resolves the correct messageMapKey
|
||||
const dispatchContext = { operationId };
|
||||
|
||||
// Mutable — switches to new assistant message ID on each stream_start
|
||||
let currentAssistantMessageId = params.assistantMessageId;
|
||||
|
||||
// Accumulated content from stream chunks (reset on each stream_start)
|
||||
let accumulatedContent = '';
|
||||
let accumulatedReasoning = '';
|
||||
|
||||
// Sequential processing queue — ensures stream_chunk waits for stream_start's fetch
|
||||
let processingChain: Promise<void> = Promise.resolve();
|
||||
|
||||
const enqueue = (fn: () => Promise<void> | void): void => {
|
||||
processingChain = processingChain.then(fn, fn);
|
||||
};
|
||||
|
||||
return (event: AgentStreamEvent) => {
|
||||
switch (event.type) {
|
||||
case 'stream_start': {
|
||||
enqueue(async () => {
|
||||
const data = event.data as StreamStartData | undefined;
|
||||
|
||||
const newAssistantMessageId = data?.assistantMessage?.id;
|
||||
|
||||
// Switch to the new assistant message created by the server for this step
|
||||
if (newAssistantMessageId) {
|
||||
currentAssistantMessageId = newAssistantMessageId;
|
||||
// Associate the new message with the operation so UI shows generating state
|
||||
get().associateMessageWithOperation(currentAssistantMessageId, operationId);
|
||||
}
|
||||
|
||||
// Reset accumulators for the new stream
|
||||
accumulatedContent = '';
|
||||
accumulatedReasoning = '';
|
||||
|
||||
// Fetch from DB so the new message exists in dbMessagesMap before chunks arrive
|
||||
await fetchAndReplaceMessages(get, context).catch(console.error);
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
case 'stream_chunk': {
|
||||
enqueue(() => {
|
||||
const data = event.data as StreamChunkData | undefined;
|
||||
if (!data) return;
|
||||
|
||||
if (data.chunkType === 'text' && data.content) {
|
||||
accumulatedContent += data.content;
|
||||
get().internal_dispatchMessage(
|
||||
{
|
||||
id: currentAssistantMessageId,
|
||||
type: 'updateMessage',
|
||||
value: { content: accumulatedContent },
|
||||
},
|
||||
dispatchContext,
|
||||
);
|
||||
}
|
||||
|
||||
if (data.chunkType === 'reasoning' && data.reasoning) {
|
||||
accumulatedReasoning += data.reasoning;
|
||||
get().internal_dispatchMessage(
|
||||
{
|
||||
id: currentAssistantMessageId,
|
||||
type: 'updateMessage',
|
||||
value: { reasoning: { content: accumulatedReasoning } },
|
||||
},
|
||||
dispatchContext,
|
||||
);
|
||||
}
|
||||
|
||||
if (data.chunkType === 'tools_calling' && data.toolsCalling) {
|
||||
get().internal_dispatchMessage(
|
||||
{
|
||||
id: currentAssistantMessageId,
|
||||
type: 'updateMessage',
|
||||
value: { tools: data.toolsCalling },
|
||||
},
|
||||
dispatchContext,
|
||||
);
|
||||
|
||||
// Drive tool calling animation
|
||||
get().internal_toggleToolCallingStreaming(
|
||||
currentAssistantMessageId,
|
||||
data.toolsCalling.map(() => true),
|
||||
);
|
||||
}
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
case 'stream_end': {
|
||||
enqueue(() => {
|
||||
// Only clear tool calling streaming — keep message loading active
|
||||
// until agent_runtime_end so users don't think the session ended
|
||||
// during tool execution gaps between steps
|
||||
get().internal_toggleToolCallingStreaming(currentAssistantMessageId, undefined);
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
case 'tool_start': {
|
||||
// Server creates tool messages in DB.
|
||||
// Loading is already active from stream_start (not cleared by stream_end).
|
||||
break;
|
||||
}
|
||||
|
||||
case 'tool_end': {
|
||||
enqueue(async () => {
|
||||
await fetchAndReplaceMessages(get, context).catch(console.error);
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
case 'step_complete': {
|
||||
const data = event.data as StepCompleteData | undefined;
|
||||
|
||||
// Refresh on execution_complete to ensure final step state is consistent
|
||||
if (data?.phase === 'execution_complete') {
|
||||
enqueue(async () => {
|
||||
await fetchAndReplaceMessages(get, context).catch(console.error);
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'agent_runtime_end': {
|
||||
enqueue(async () => {
|
||||
get().internal_toggleToolCallingStreaming(currentAssistantMessageId, undefined);
|
||||
get().completeOperation(operationId);
|
||||
await fetchAndReplaceMessages(get, context).catch(console.error);
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
case 'error': {
|
||||
enqueue(() => {
|
||||
const errorMsg = event.data?.message || event.data?.error || 'Unknown error';
|
||||
get().internal_dispatchMessage(
|
||||
{
|
||||
id: currentAssistantMessageId,
|
||||
type: 'updateMessage',
|
||||
value: {
|
||||
error: { body: { message: errorMsg }, type: 'AgentRuntimeError' },
|
||||
},
|
||||
},
|
||||
dispatchContext,
|
||||
);
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
|
@ -7,6 +7,8 @@ import { type ConversationControlAction } from './conversationControl';
|
|||
import { ConversationControlActionImpl } from './conversationControl';
|
||||
import { type ConversationLifecycleAction } from './conversationLifecycle';
|
||||
import { ConversationLifecycleActionImpl } from './conversationLifecycle';
|
||||
import { type GatewayAction } from './gateway';
|
||||
import { GatewayActionImpl } from './gateway';
|
||||
import { type ChatMemoryAction } from './memory';
|
||||
import { ChatMemoryActionImpl } from './memory';
|
||||
import { type StreamingExecutorAction } from './streamingExecutor';
|
||||
|
|
@ -17,6 +19,7 @@ import { StreamingStatesActionImpl } from './streamingStates';
|
|||
export type ChatAIChatAction = ChatMemoryAction &
|
||||
ConversationLifecycleAction &
|
||||
ConversationControlAction &
|
||||
GatewayAction &
|
||||
StreamingExecutorAction &
|
||||
StreamingStatesAction;
|
||||
|
||||
|
|
@ -34,6 +37,7 @@ export const chatAiChat: StateCreator<
|
|||
new ChatMemoryActionImpl(...params),
|
||||
new ConversationLifecycleActionImpl(...params),
|
||||
new ConversationControlActionImpl(...params),
|
||||
new GatewayActionImpl(...params),
|
||||
new StreamingExecutorActionImpl(...params),
|
||||
new StreamingStatesActionImpl(...params),
|
||||
]);
|
||||
|
|
|
|||
|
|
@ -1,6 +1,11 @@
|
|||
import { type ChatInputEditor } from '@/features/ChatInput';
|
||||
import type { GatewayConnection } from '@/store/chat/slices/aiChat/actions/gateway';
|
||||
|
||||
export interface ChatAIChatState {
|
||||
/**
|
||||
* Active Agent Gateway WebSocket connections, keyed by operationId
|
||||
*/
|
||||
gatewayConnections: Record<string, GatewayConnection>;
|
||||
inputFiles: File[];
|
||||
inputMessage: string;
|
||||
mainInputEditor: ChatInputEditor | null;
|
||||
|
|
@ -13,6 +18,7 @@ export interface ChatAIChatState {
|
|||
}
|
||||
|
||||
export const initialAiChatState: ChatAIChatState = {
|
||||
gatewayConnections: {},
|
||||
inputFiles: [],
|
||||
inputMessage: '',
|
||||
mainInputEditor: null,
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import { DEFAULT_PREFERENCE } from '@lobechat/const';
|
|||
import { type UserState } from '@/store/user/initialState';
|
||||
|
||||
export const labPreferSelectors = {
|
||||
enableGatewayMode: (s: UserState): boolean => s.preference.lab?.enableGatewayMode ?? false,
|
||||
enableInputMarkdown: (s: UserState): boolean =>
|
||||
s.preference.lab?.enableInputMarkdown ?? DEFAULT_PREFERENCE.lab!.enableInputMarkdown!,
|
||||
};
|
||||
|
|
|
|||
Loading…
Reference in a new issue