Fix: Add stale workflow cleanup and defense-in-depth error handling (#237)

* Fix: Add stale workflow cleanup and defense-in-depth error handling

Problem: Workflows could get stuck in "running" state indefinitely when
the async generator disconnected but the AI subprocess continued working.
This blocked new workflow invocations with "Workflow already running" errors.

Root cause: No cleanup mechanism existed for workflows that failed to
complete due to disconnection between the executor and the Claude SDK.

Solution (defense-in-depth):
1. Activity-based staleness detection: Workflows inactive for 15+ minutes
   are auto-failed when a new workflow is triggered on the same conversation
2. Top-level error handling: All errors in workflow execution are caught
   and the workflow is properly marked as failed (prevents stuck state)
3. Manual cancel command: /workflow cancel lets users force-fail stuck
   workflows immediately

Changes:
- Add last_activity_at column via migration for staleness tracking
- Add updateWorkflowActivity() to track activity during execution
- Add staleness check before blocking concurrent workflows
- Wrap workflow execution in try-catch to ensure failure is recorded
- Add /workflow cancel subcommand to command handler
- Update test to match new error handling behavior

Fixes #232

* docs: Add /workflow cancel command to documentation

* Improve error handling and add comprehensive tests for stale workflow cleanup

Error handling improvements:
- Add workflow ID and error context to updateWorkflowActivity logs
- Add stack trace, error name, and cause to top-level catch block
- Separate DB failure recording from file logging for clearer error messages
- Add try-catch around staleness cleanup with user-facing error message
- Check sendCriticalMessage return value and log when user not notified

Test coverage additions:
- Add staleness detection tests (stale vs non-stale, fallback to started_at)
- Add /workflow cancel command tests
- Add updateWorkflowActivity function tests (including non-throwing behavior)

All 845 tests pass, type-check clean, lint clean.
This commit is contained in:
Rasmus Widing 2026-01-15 21:31:38 +02:00 committed by GitHub
parent 7c1fd0a034
commit 779f9af63e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 1218 additions and 26 deletions

View file

@ -610,6 +610,7 @@ if (streamingMode === 'batch') {
- Stored in `.archon/workflows/` (or `.claude/workflows/`, `.agents/workflows/`)
- Multi-step AI execution chains
- Discovered at runtime, routed by AI
- Commands: `/workflow list`, `/workflow reload`, `/workflow cancel`
### Example Commands in This Repo

606
docs/new-developer-guide.md Normal file
View file

@ -0,0 +1,606 @@
# Archon: New Developer Guide
> **TL;DR**: Archon lets you control AI coding assistants (Claude Code, Codex) from your phone via Telegram, Slack, Discord, or GitHub. Think of it as a remote control for AI pair programming.
---
## The Problem We Solve
```
┌─────────────────────────────────────────────────────────────────────┐
│ WITHOUT ARCHON │
│ │
│ You're on the train, phone in hand... │
│ │
│ ┌──────────┐ ❌ Can't SSH ┌──────────────────┐ │
│ │ Phone │ ──────────────────────│ Dev Machine │ │
│ │ │ ❌ No terminal │ (Claude Code) │ │
│ └──────────┘ ❌ No IDE └──────────────────┘ │
│ │
│ "I wish I could just message Claude to fix that bug..." │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ WITH ARCHON │
│ │
│ ┌──────────┐ ┌──────────────────┐ │
│ │ Phone │ ─────Telegram────────▶│ Archon Server │ │
│ │ │ "fix issue #42" │ │ │
│ └──────────┘ │ ┌────────────┐ │ │
│ │ │ │Claude Code │ │ │
│ │ │ │ SDK │ │ │
│ │ │ └─────┬──────┘ │ │
│ │ │ │ │ │
│ │ │ ┌─────▼──────┐ │ │
│ │◀────"PR created #127"───────│ │ Git Repo │ │ │
│ │ │ │ (worktree) │ │ │
│ │ └────────────┘ │ │
│ └──────────────────┘ │
│ │
│ You just fixed a bug from your phone. │
└─────────────────────────────────────────────────────────────────────┘
```
---
## Core Concept: Message → AI → Code → Response
```
┌──────────────────────────────────────────────────────────────────────────┐
│ │
│ USER ARCHON CODEBASE │
│ │
│ ┌─────────┐ ┌─────────────────┐ ┌──────────┐ │
│ │Telegram │ │ │ │ │ │
│ │ Slack │───Message─▶│ Orchestrator │───Claude──▶│ Git Repo │ │
│ │ Discord │ │ │ Code │ │ │
│ │ GitHub │◀──Response─│ (routes to │◀──────────│ (files) │ │
│ └─────────┘ │ AI client) │ └──────────┘ │
│ └─────────────────┘ │
│ │
│ That's it. You message, AI works on code, you get results. │
│ │
└──────────────────────────────────────────────────────────────────────────┘
```
---
## The Three Ways to Use Archon
### 1. Direct Chat (Simple Questions)
Just talk to the AI like you would in Claude Code terminal:
```
┌─────────────────────────────────────────────────────────────────┐
│ TELEGRAM CHAT │
├─────────────────────────────────────────────────────────────────┤
│ │
│ You: What does the handleMessage function do? │
│ │
│ Archon: Looking at src/orchestrator/orchestrator.ts... │
│ │
│ The handleMessage function is the main entry point │
│ that routes incoming messages. It: │
│ 1. Checks if it's a slash command │
│ 2. Loads conversation context from database │
│ 3. Routes to AI client for processing │
│ 4. Streams responses back to platform │
│ │
│ See: src/orchestrator/orchestrator.ts:45-120 │
│ │
└─────────────────────────────────────────────────────────────────┘
```
### 2. Slash Commands (Specific Operations)
Deterministic commands that don't involve AI:
```
┌─────────────────────────────────────────────────────────────────┐
│ SLASH COMMANDS │
├─────────────────────────────────────────────────────────────────┤
│ │
│ /clone https://github.com/user/repo Clone a repository │
│ /status Show current state │
│ /repos List available repos │
│ /setcwd /path/to/dir Change working dir │
│ /reset Clear AI session │
│ /help Show all commands │
│ │
└─────────────────────────────────────────────────────────────────┘
```
### 3. Workflows (Multi-Step Automation)
This is where Archon shines - automated multi-step AI workflows:
```
┌─────────────────────────────────────────────────────────────────┐
│ GITHUB ISSUE #42
├─────────────────────────────────────────────────────────────────┤
│ │
│ Title: Login button doesn't work on mobile │
│ │
│ ─────────────────────────────────────────────────────────────── │
│ │
@user commented: │
@archon fix this issue │
│ │
│ ─────────────────────────────────────────────────────────────── │
│ │
@archon commented: │
│ 🔍 Investigation Complete │
│ │
│ Root Cause: Touch event handler missing on mobile │
│ File: src/components/LoginButton.tsx:45 │
│ Fix: Add onTouchEnd handler alongside onClick │
│ │
│ Creating PR... │
│ │
│ ─────────────────────────────────────────────────────────────── │
│ │
@archon commented: │
│ ✅ Fix implemented: PR #127
│ - Added touch event handling │
│ - Added mobile viewport tests │
│ - All tests passing │
│ │
└─────────────────────────────────────────────────────────────────┘
```
---
## How Workflows Work (The Magic)
A workflow is a YAML file that chains AI prompts together:
```
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ .archon/workflows/fix-github-issue.yaml │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ name: fix-github-issue │ │
│ │ description: Investigate and fix a GitHub issue │ │
│ │ │ │
│ │ steps: │ │
│ │ - command: investigate-issue ◀── Step 1: Research │ │
│ │ - command: implement-issue ◀── Step 2: Fix │ │
│ │ clearContext: true │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ │ │
│ ▼ │
│ │
│ EXECUTION FLOW: │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ ┌────────────┐ │
│ │ investigate- │ │ implement- │ │ │ │
│ │ issue.md │─────▶│ issue.md │─────▶│ PR #127 │ │
│ │ │ │ │ │ │ │
│ │ - Read issue │ │ - Read artifact │ │ Created! │ │
│ │ - Explore code │ │ - Make changes │ │ │ │
│ │ - Find root │ │ - Run tests │ │ │ │
│ │ cause │ │ - Commit │ │ │ │
│ │ - Save artifact │ │ - Create PR │ │ │ │
│ └──────────────────┘ └──────────────────┘ └────────────┘ │
│ │
│ Each "command" is a markdown file with AI instructions. │
│ The workflow executor runs them in sequence. │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
---
## The Router: How Archon Picks Workflows
When you send a message, an AI "router" decides what to do:
```
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ USER MESSAGE ROUTER DECISION │
│ │
│ "fix this issue" ───────▶ fix-github-issue │
│ "review this PR" ───────▶ comprehensive-pr-review │
│ "what does X do?" ───────▶ assist (catch-all) │
│ "resolve the conflicts" ───────▶ resolve-conflicts │
│ │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ HOW IT WORKS: │
│ │
│ ┌──────────┐ ┌─────────────────────────────────────┐ │
│ │ Message │────▶│ Router AI reads workflow descriptions│ │
│ │ │ │ and picks the best match │ │
│ └──────────┘ └──────────────────┬──────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ /invoke-workflow fix-github-issue │ │
│ └─────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
---
## Available Workflows
```
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ WORKFLOW TRIGGER PHRASES WHAT IT DOES │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ fix-github-issue "fix this issue" Investigate │ │
│ │ "implement #42" + Fix + PR │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ comprehensive-pr-review "review this PR" 5 parallel │ │
│ │ "code review" review agents │ │
│ │ + auto-fix │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ resolve-conflicts "resolve conflicts" Auto-resolve │ │
│ │ "fix merge conflicts" git conflicts │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ ralph-fresh "run ralph-fresh" PRD loop │ │
│ │ ralph-stateful "run ralph-stateful" (autonomous) │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ assist (anything else) General help │ │
│ │ "what does X do?" questions, │ │
│ │ "help me debug" debugging │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
---
## Parallel Agents: The PR Review Example
The `comprehensive-pr-review` workflow runs 5 AI agents simultaneously:
```
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ USER: "review this PR" │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Step 1: pr-review-scope Determine what changed │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Step 2: sync-pr-with-main Rebase onto latest main │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Step 3: PARALLEL BLOCK (5 agents running at once) │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ code-review │ │ error- │ │ test- │ │ │
│ │ │ agent │ │ handling │ │ coverage │ │ │
│ │ │ │ │ agent │ │ agent │ │ │
│ │ │ Style, │ │ Catch blocks │ │ Missing │ │ │
│ │ │ patterns, │ │ Silent fails │ │ tests? │ │ │
│ │ │ bugs │ │ Logging │ │ Edge cases │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ comment- │ │ docs- │ │ │
│ │ │ quality │ │ impact │ │ │
│ │ │ agent │ │ agent │ │ │
│ │ │ │ │ │ │ │
│ │ │ Outdated? │ │ README? │ │ │
│ │ │ Accurate? │ │ CLAUDE.md? │ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Step 4: synthesize-review Combine all findings │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Step 5: implement-review-fixes Auto-fix CRITICAL/HIGH issues │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
---
## The Ralph Loop: Autonomous PRD Implementation
For larger features, Ralph executes user stories one-by-one until complete:
```
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ PRD FILE: .archon/ralph/my-feature/prd.json │
│ │
│ { │
│ "stories": [ │
│ { "id": "S1", "title": "Add button", "passes": true }, │
│ { "id": "S2", "title": "Add handler", "passes": true }, │
│ { "id": "S3", "title": "Add tests", "passes": false }, ◀─ NEXT │
│ { "id": "S4", "title": "Add docs", "passes": false } │
│ ] │
│ } │
│ │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ RALPH LOOP EXECUTION: │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Iteration 1 │ │
│ │ ─────────────────────────────────────────────────────────────── │ │
│ │ 1. Read prd.json → Find S3 (first with passes: false) │ │
│ │ 2. Implement S3: "Add tests" │ │
│ │ 3. Run: bun run type-check && bun test │ │
│ │ 4. Commit: "feat: S3 - Add tests" │ │
│ │ 5. Update prd.json: S3.passes = true │ │
│ │ 6. More stories remain → Continue │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Iteration 2 │ │
│ │ ─────────────────────────────────────────────────────────────── │ │
│ │ 1. Read prd.json → Find S4 (next with passes: false) │ │
│ │ 2. Implement S4: "Add docs" │ │
│ │ 3. Run validation │ │
│ │ 4. Commit │ │
│ │ 5. Update prd.json: S4.passes = true │ │
│ │ 6. ALL stories pass → Create PR │ │
│ │ 7. Output: <promise>COMPLETE</promise> │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ LOOP STOPS │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
---
## Platform Integration
```
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ TELEGRAM HOW IT WORKS │
│ ───────────────────────────────────────────────────────────────── │
│ ┌──────────────────┐ - Bot polls for messages │
│ │ @archon_bot │ - Real-time streaming (default) │
│ │ │ - DM the bot directly │
│ │ "fix issue #42" │ - Good for mobile use │
│ └──────────────────┘ │
│ │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ SLACK HOW IT WORKS │
│ ───────────────────────────────────────────────────────────────── │
│ ┌──────────────────┐ - Socket Mode (no webhooks) │
│ │ #dev-channel │ - @mention in threads │
│ │ │ - DM the bot │
│ │ @archon review │ - Good for team visibility │
│ │ this PR │ │
│ └──────────────────┘ │
│ │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ DISCORD HOW IT WORKS │
│ ───────────────────────────────────────────────────────────────── │
│ ┌──────────────────┐ - WebSocket connection │
│ │ #coding-help │ - @mention to activate │
│ │ │ - Thread support │
│ │ @Archon what │ - Good for communities │
│ │ does this do? │ │
│ └──────────────────┘ │
│ │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ GITHUB HOW IT WORKS │
│ ───────────────────────────────────────────────────────────────── │
│ ┌──────────────────┐ - Webhook on issues/PRs │
│ │ Issue #42 │ - @archon in comments │
│ │ │ - Batch mode (single comment) │
│ │ @archon fix │ - Auto-creates PRs │
│ │ this issue │ - Good for automation │
│ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
---
## Isolation: Git Worktrees
Each conversation gets its own isolated copy of the repo:
```
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ ~/.archon/worktrees/ │
│ │ │
│ ├── my-app/ │
│ │ ├── issue-42/ ◀── Conversation about issue #42
│ │ │ └── (full repo) Working on fix for mobile bug │
│ │ │ │
│ │ ├── pr-127/ ◀── Conversation about PR #127
│ │ │ └── (full repo) Reviewing code changes │
│ │ │ │
│ │ └── task-dark-mode/ ◀── Manual feature work │
│ │ └── (full repo) Adding dark mode feature │
│ │ │
│ └── other-repo/ │
│ └── issue-15/ │
│ └── (full repo) │
│ │
│ WHY WORKTREES? │
│ ───────────────────────────────────────────────────────────────── │
│ - Multiple conversations can work simultaneously │
│ - No branch conflicts between parallel work │
│ - Each gets isolated file changes │
│ - Cleaned up when issue/PR closes │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
---
## Configuration Hierarchy
```
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ CONFIGURATION LAYERS (later overrides earlier) │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ 1. DEFAULTS (hardcoded) │ │
│ │ assistant: claude │ │
│ │ streaming.telegram: stream │ │
│ │ streaming.github: batch │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ 2. GLOBAL CONFIG (~/.archon/config.yaml) │ │
│ │ botName: MyBot │ │
│ │ defaultAssistant: claude │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ 3. REPO CONFIG (.archon/config.yaml) │ │
│ │ assistant: codex # This repo prefers Codex │ │
│ │ commands: │ │
│ │ folder: .claude/commands/custom │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ 4. ENVIRONMENT VARIABLES (highest priority) │ │
│ │ TELEGRAM_STREAMING_MODE=batch │ │
│ │ DEFAULT_AI_ASSISTANT=claude │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
---
## Directory Structure
```
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ YOUR REPO ARCHON SERVER │
│ │
│ my-app/ ~/.archon/ │
│ ├── .archon/ ├── config.yaml (global cfg) │
│ │ ├── config.yaml ├── workspaces/ (cloned repos)│
│ │ ├── commands/ │ └── user/repo/ │
│ │ │ ├── investigate-issue.md └── worktrees/ (isolation) │
│ │ │ ├── implement-issue.md └── my-app/ │
│ │ │ └── assist.md ├── issue-42/ │
│ │ ├── workflows/ └── pr-127/ │
│ │ │ ├── fix-github-issue.yaml │
│ │ │ └── assist.yaml │
│ │ └── artifacts/ │
│ │ └── issues/ │
│ │ └── issue-42.md │
│ ├── src/ │
│ └── ... │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
---
## Quick Reference: Common Interactions
```
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ WHAT YOU WANT WHAT TO SAY │
│ │
│ Fix a GitHub issue "@archon fix this issue" │
│ Review a PR "@archon review this PR" │
│ Ask a question "What does handleMessage do?" │
│ Resolve conflicts "@archon resolve the conflicts" │
│ See current state "/status" │
│ Clone a repo "/clone https://github.com/u/r" │
│ Switch repos "/repos" then pick one │
│ List available workflows "/workflow list" │
│ Reload workflow definitions "/workflow reload" │
│ Cancel stuck workflow "/workflow cancel" │
│ Start fresh "/reset" │
│ Get help "/help" │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
---
## Summary
```
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ ARCHON = Remote Control for AI Coding Assistants │
│ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Phone/Slack/GitHub ──▶ Archon Server ──▶ AI (Claude/Codex) │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ Workflows Git Worktrees │ │
│ │ (automation) (isolation) │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ KEY CAPABILITIES: │
│ ───────────────── │
│ ✓ Message from anywhere (phone, tablet, desktop) │
│ ✓ Automated multi-step workflows │
│ ✓ Parallel AI agents for complex tasks │
│ ✓ Isolated environments per conversation │
│ ✓ Custom prompts versioned in Git │
│ ✓ GitHub integration (issues/PRs/comments) │
│ │
│ WHEN TO USE: │
│ ───────────── │
│ ✓ You want to fix bugs from your phone │
│ ✓ You want automated PR reviews │
│ ✓ You want GitHub issue automation │
│ ✓ You want parallel development without conflicts │
│ ✓ You want custom AI workflows for your team │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
---
## Next Steps
1. **Read**: `docs/getting-started.md` - Set up your first instance
2. **Explore**: `.archon/workflows/` - See example workflows
3. **Customize**: `.archon/commands/` - Create your own prompts
4. **Configure**: `.archon/config.yaml` - Tweak settings
Welcome to remote agentic coding!

View file

@ -0,0 +1,16 @@
-- Migration: Add last_activity_at column for staleness detection
-- This enables activity-based staleness detection for stuck workflows
-- Add last_activity_at column
ALTER TABLE remote_agent_workflow_runs
ADD COLUMN IF NOT EXISTS last_activity_at TIMESTAMP WITH TIME ZONE DEFAULT NOW();
-- Backfill existing rows: use completed_at if available, otherwise started_at
UPDATE remote_agent_workflow_runs
SET last_activity_at = COALESCE(completed_at, started_at)
WHERE last_activity_at IS NULL;
-- Partial index for efficient staleness queries on running workflows
CREATE INDEX IF NOT EXISTS idx_workflow_runs_last_activity
ON remote_agent_workflow_runs(last_activity_at)
WHERE status = 'running';

View file

@ -18,6 +18,7 @@ import {
updateWorkflowRun,
completeWorkflowRun,
failWorkflowRun,
updateWorkflowActivity,
} from './workflows';
describe('workflows database', () => {
@ -36,6 +37,7 @@ describe('workflows database', () => {
metadata: {},
started_at: new Date('2025-01-01T00:00:00Z'),
completed_at: null,
last_activity_at: new Date('2025-01-01T00:00:00Z'),
};
describe('createWorkflowRun', () => {
@ -304,4 +306,27 @@ describe('workflows database', () => {
);
});
});
describe('updateWorkflowActivity', () => {
test('updates last_activity_at timestamp', async () => {
mockQuery.mockResolvedValueOnce(createQueryResult([]));
await updateWorkflowActivity('workflow-run-123');
expect(mockQuery).toHaveBeenCalledWith(
'UPDATE remote_agent_workflow_runs SET last_activity_at = NOW() WHERE id = $1',
['workflow-run-123']
);
});
test('does not throw on database error (non-throwing design)', async () => {
mockQuery.mockRejectedValueOnce(new Error('Connection lost'));
// Should not throw - just logs the error
await updateWorkflowActivity('workflow-run-123');
// Verify the query was attempted
expect(mockQuery).toHaveBeenCalled();
});
});
});

View file

@ -156,3 +156,26 @@ export async function failWorkflowRun(id: string, error: string): Promise<void>
throw new Error(`Failed to fail workflow run: ${err.message}`);
}
}
/**
* Update last_activity_at timestamp for a workflow run.
* Used for activity-based staleness detection.
* Non-throwing: logs errors but doesn't fail the workflow.
*/
export async function updateWorkflowActivity(id: string): Promise<void> {
try {
await pool.query(
'UPDATE remote_agent_workflow_runs SET last_activity_at = NOW() WHERE id = $1',
[id]
);
} catch (error) {
const err = error as Error;
// Non-critical - log with full context but don't throw
// Note: If this fails repeatedly, staleness detection may be degraded
console.error('[DB:Workflows] Failed to update activity:', {
workflowId: id,
error: err.message,
errorName: err.name,
});
}
}

View file

@ -26,6 +26,10 @@ const mockDeleteCodebase = mock(() => Promise.resolve());
const mockGetActiveSession = mock(() => Promise.resolve(null));
const mockDeactivateSession = mock(() => Promise.resolve());
// Workflow database mocks
const mockGetActiveWorkflowRun = mock(() => Promise.resolve(null));
const mockFailWorkflowRun = mock(() => Promise.resolve());
// Spies for internal modules (use spyOn instead of mock.module to avoid global pollution)
let spyIsPathWithinWorkspace: ReturnType<typeof spyOn>;
let spyExecFileAsync: ReturnType<typeof spyOn>;
@ -63,6 +67,11 @@ mock.module('../db/sessions', () => ({
deactivateSession: mockDeactivateSession,
}));
mock.module('../db/workflows', () => ({
getActiveWorkflowRun: mockGetActiveWorkflowRun,
failWorkflowRun: mockFailWorkflowRun,
}));
// Mock isolation-environments database
const mockIsolationEnvDbCreate = mock(() =>
Promise.resolve({
@ -137,6 +146,9 @@ function clearAllMocks(): void {
mockDeleteCodebase.mockClear();
mockGetActiveSession.mockClear();
mockDeactivateSession.mockClear();
// Workflow db mocks
mockGetActiveWorkflowRun.mockClear();
mockFailWorkflowRun.mockClear();
// Isolation mocks
mockIsolationCreate.mockClear();
mockIsolationDestroy.mockClear();
@ -1579,5 +1591,63 @@ describe('CommandHandler', () => {
expect(result.message).toContain('Repository not found: NonExistent');
});
});
describe('/workflow cancel', () => {
const conversationWithCodebase: Conversation = {
...baseConversation,
codebase_id: 'codebase-123',
};
beforeEach(() => {
// Mock getCodebase to return a valid codebase
mockGetCodebase.mockResolvedValue({
id: 'codebase-123',
repository_url: 'https://github.com/test/repo',
default_cwd: '/workspace/test-repo',
commands: {},
created_at: new Date(),
updated_at: new Date(),
});
});
test('should cancel active workflow and return success message', async () => {
mockGetActiveWorkflowRun.mockResolvedValueOnce({
id: 'wf-123',
workflow_name: 'test-workflow',
conversation_id: 'conv-123',
status: 'running',
started_at: new Date(),
completed_at: null,
current_step_index: 0,
user_message: 'test',
metadata: {},
last_activity_at: new Date(),
});
const result = await handleCommand(conversationWithCodebase, '/workflow cancel');
expect(result.success).toBe(true);
expect(result.message).toContain('Cancelled workflow');
expect(result.message).toContain('test-workflow');
expect(mockFailWorkflowRun).toHaveBeenCalledWith('wf-123', 'Cancelled by user');
});
test('should return message when no active workflow exists', async () => {
mockGetActiveWorkflowRun.mockResolvedValueOnce(null);
const result = await handleCommand(conversationWithCodebase, '/workflow cancel');
expect(result.success).toBe(true);
expect(result.message).toBe('No active workflow to cancel.');
expect(mockFailWorkflowRun).not.toHaveBeenCalled();
});
test('should fail when no codebase is configured', async () => {
const result = await handleCommand(baseConversation, '/workflow cancel');
expect(result.success).toBe(false);
expect(result.message).toContain('No codebase configured');
});
});
});
});

View file

@ -23,6 +23,7 @@ import {
import { getArchonWorkspacesPath, getCommandFolderSearchPaths } from '../utils/archon-paths';
import { discoverWorkflows } from '../workflows';
import { isSingleStep } from '../workflows/types';
import * as workflowDb from '../db/workflows';
/**
* Convert an absolute path to a relative path from the repository root
@ -207,6 +208,7 @@ Worktrees:
Workflows:
/workflow list - Show available workflows
/workflow reload - Reload workflow definitions
/workflow cancel - Cancel running workflow
Note: Workflows are YAML files in .archon/workflows/
Session:
@ -1346,11 +1348,28 @@ Setup:
};
}
case 'cancel': {
// Cancel (force-fail) any running workflow for this conversation
const activeWorkflow = await workflowDb.getActiveWorkflowRun(conversation.id);
if (!activeWorkflow) {
return {
success: true,
message: 'No active workflow to cancel.',
};
}
await workflowDb.failWorkflowRun(activeWorkflow.id, 'Cancelled by user');
return {
success: true,
message: `Cancelled workflow: \`${activeWorkflow.workflow_name}\``,
};
}
default:
return {
success: false,
message:
'Usage:\n /workflow list - Show available workflows\n /workflow reload - Reload workflow definitions',
'Usage:\n /workflow list - Show available workflows\n /workflow reload - Reload workflow definitions\n /workflow cancel - Cancel running workflow',
};
}
}

View file

@ -1108,22 +1108,29 @@ describe('Workflow Executor', () => {
expect(logContext).toHaveProperty('platformType');
});
it('should throw on fatal authentication errors', async () => {
it('should mark workflow as failed on fatal authentication errors (no throw)', async () => {
const sendMessageMock = mock(() =>
Promise.reject(new Error('401 Unauthorized: Invalid token'))
);
mockPlatform.sendMessage = sendMessageMock;
await expect(
executeWorkflow(
mockPlatform,
'conv-123',
testDir,
{ name: 'test-workflow', description: 'Test', steps: [{ command: 'command-one' }] },
'User message',
'db-conv-id'
)
).rejects.toThrow('Platform authentication/permission error');
// With top-level error handling, executeWorkflow should NOT throw
// Instead it marks the workflow as failed and returns normally
await executeWorkflow(
mockPlatform,
'conv-123',
testDir,
{ name: 'test-workflow', description: 'Test', steps: [{ command: 'command-one' }] },
'User message',
'db-conv-id'
);
// Verify workflow was marked as failed in database
const failCalls = mockQuery.mock.calls.filter(
(call: unknown[]) =>
(call[0] as string).includes('UPDATE') && (call[0] as string).includes("'failed'")
);
expect(failCalls.length).toBeGreaterThan(0);
});
it('should continue workflow when tool message send fails in streaming mode', async () => {
@ -1288,6 +1295,346 @@ describe('Workflow Executor', () => {
// Should have retried
expect(completionCalls.length).toBeGreaterThanOrEqual(1);
});
describe('staleness detection', () => {
it('should fail stale workflow and start new one when last_activity_at > 15 min ago', async () => {
// Mock getActiveWorkflowRun to return a stale workflow (20 min inactive)
const staleTime = new Date(Date.now() - 20 * 60 * 1000); // 20 minutes ago
mockQuery.mockImplementation((query: string) => {
if (query.includes("status = 'running'")) {
return Promise.resolve(
createQueryResult([
{
id: 'stale-workflow-id',
workflow_name: 'old-workflow',
conversation_id: 'conv-123',
status: 'running' as const,
started_at: staleTime,
last_activity_at: staleTime,
completed_at: null,
current_step_index: 0,
user_message: 'old message',
metadata: {},
},
])
);
}
if (query.includes('INSERT INTO remote_agent_workflow_runs')) {
return Promise.resolve(
createQueryResult([
{
id: 'new-workflow-run-id',
workflow_name: 'test-workflow',
conversation_id: 'conv-123',
status: 'running' as const,
started_at: new Date(),
last_activity_at: new Date(),
completed_at: null,
current_step_index: 0,
user_message: 'test user message',
metadata: {},
},
])
);
}
return Promise.resolve(createQueryResult([]));
});
await executeWorkflow(
mockPlatform,
'conv-123',
testDir,
{ name: 'test-workflow', description: 'Test', steps: [{ command: 'command-one' }] },
'User message',
'db-conv-id'
);
// Verify stale workflow was marked as failed
const failCalls = mockQuery.mock.calls.filter(
(call: unknown[]) =>
(call[0] as string).includes('UPDATE') &&
(call[0] as string).includes("'failed'") &&
(call[1] as string[])?.includes('stale-workflow-id')
);
expect(failCalls.length).toBeGreaterThan(0);
// Verify new workflow was created
const insertCalls = mockQuery.mock.calls.filter((call: unknown[]) =>
(call[0] as string).includes('INSERT INTO remote_agent_workflow_runs')
);
expect(insertCalls.length).toBeGreaterThan(0);
// Reset mock
mockQuery.mockImplementation((query: string) => {
if (query.includes("status = 'running'")) {
return Promise.resolve(createQueryResult([]));
}
if (query.includes('INSERT INTO remote_agent_workflow_runs')) {
return Promise.resolve(
createQueryResult([
{
id: 'test-workflow-run-id',
workflow_name: 'test-workflow',
conversation_id: 'conv-123',
status: 'running' as const,
started_at: new Date(),
completed_at: null,
current_step_index: 0,
user_message: 'test user message',
metadata: {},
},
])
);
}
return Promise.resolve(createQueryResult([]));
});
});
it('should block new workflow when active workflow is not stale', async () => {
// Mock getActiveWorkflowRun to return a recent workflow (5 min inactive)
const recentTime = new Date(Date.now() - 5 * 60 * 1000); // 5 minutes ago
mockQuery.mockImplementation((query: string) => {
if (query.includes("status = 'running'")) {
return Promise.resolve(
createQueryResult([
{
id: 'active-workflow-id',
workflow_name: 'active-workflow',
conversation_id: 'conv-123',
status: 'running' as const,
started_at: recentTime,
last_activity_at: recentTime,
completed_at: null,
current_step_index: 0,
user_message: 'active message',
metadata: {},
},
])
);
}
return Promise.resolve(createQueryResult([]));
});
await executeWorkflow(
mockPlatform,
'conv-123',
testDir,
{ name: 'test-workflow', description: 'Test', steps: [{ command: 'command-one' }] },
'User message',
'db-conv-id'
);
// Verify rejection message was sent
const sendMessage = mockPlatform.sendMessage as ReturnType<typeof mock>;
const calls = sendMessage.mock.calls;
const blockingMessages = calls.filter((call: unknown[]) =>
(call[1] as string).includes('Workflow already running')
);
expect(blockingMessages.length).toBe(1);
// Verify no INSERT was made (new workflow not created)
const insertCalls = mockQuery.mock.calls.filter((call: unknown[]) =>
(call[0] as string).includes('INSERT INTO remote_agent_workflow_runs')
);
expect(insertCalls.length).toBe(0);
// Reset mock
mockQuery.mockImplementation((query: string) => {
if (query.includes("status = 'running'")) {
return Promise.resolve(createQueryResult([]));
}
if (query.includes('INSERT INTO remote_agent_workflow_runs')) {
return Promise.resolve(
createQueryResult([
{
id: 'test-workflow-run-id',
workflow_name: 'test-workflow',
conversation_id: 'conv-123',
status: 'running' as const,
started_at: new Date(),
completed_at: null,
current_step_index: 0,
user_message: 'test user message',
metadata: {},
},
])
);
}
return Promise.resolve(createQueryResult([]));
});
});
it('should fallback to started_at when last_activity_at is null', async () => {
// Mock getActiveWorkflowRun to return a workflow with null last_activity_at but old started_at
const staleTime = new Date(Date.now() - 20 * 60 * 1000); // 20 minutes ago
mockQuery.mockImplementation((query: string) => {
if (query.includes("status = 'running'")) {
return Promise.resolve(
createQueryResult([
{
id: 'stale-workflow-id',
workflow_name: 'old-workflow',
conversation_id: 'conv-123',
status: 'running' as const,
started_at: staleTime,
last_activity_at: null, // null - should fallback to started_at
completed_at: null,
current_step_index: 0,
user_message: 'old message',
metadata: {},
},
])
);
}
if (query.includes('INSERT INTO remote_agent_workflow_runs')) {
return Promise.resolve(
createQueryResult([
{
id: 'new-workflow-run-id',
workflow_name: 'test-workflow',
conversation_id: 'conv-123',
status: 'running' as const,
started_at: new Date(),
last_activity_at: new Date(),
completed_at: null,
current_step_index: 0,
user_message: 'test user message',
metadata: {},
},
])
);
}
return Promise.resolve(createQueryResult([]));
});
await executeWorkflow(
mockPlatform,
'conv-123',
testDir,
{ name: 'test-workflow', description: 'Test', steps: [{ command: 'command-one' }] },
'User message',
'db-conv-id'
);
// Verify stale workflow was marked as failed (fallback to started_at worked)
const failCalls = mockQuery.mock.calls.filter(
(call: unknown[]) =>
(call[0] as string).includes('UPDATE') &&
(call[0] as string).includes("'failed'") &&
(call[1] as string[])?.includes('stale-workflow-id')
);
expect(failCalls.length).toBeGreaterThan(0);
// Reset mock
mockQuery.mockImplementation((query: string) => {
if (query.includes("status = 'running'")) {
return Promise.resolve(createQueryResult([]));
}
if (query.includes('INSERT INTO remote_agent_workflow_runs')) {
return Promise.resolve(
createQueryResult([
{
id: 'test-workflow-run-id',
workflow_name: 'test-workflow',
conversation_id: 'conv-123',
status: 'running' as const,
started_at: new Date(),
completed_at: null,
current_step_index: 0,
user_message: 'test user message',
metadata: {},
},
])
);
}
return Promise.resolve(createQueryResult([]));
});
});
it('should show cleanup error message when failWorkflowRun fails for stale workflow', async () => {
// Mock getActiveWorkflowRun to return a stale workflow
const staleTime = new Date(Date.now() - 20 * 60 * 1000);
let failWorkflowCallCount = 0;
mockQuery.mockImplementation((query: string) => {
if (query.includes("status = 'running'")) {
return Promise.resolve(
createQueryResult([
{
id: 'stale-workflow-id',
workflow_name: 'old-workflow',
conversation_id: 'conv-123',
status: 'running' as const,
started_at: staleTime,
last_activity_at: staleTime,
completed_at: null,
current_step_index: 0,
user_message: 'old message',
metadata: {},
},
])
);
}
// Fail the staleness cleanup query
if (query.includes('UPDATE') && query.includes("'failed'")) {
failWorkflowCallCount++;
if (failWorkflowCallCount === 1) {
// First fail call is for staleness cleanup
return Promise.reject(new Error('Database connection lost'));
}
}
return Promise.resolve(createQueryResult([]));
});
await executeWorkflow(
mockPlatform,
'conv-123',
testDir,
{ name: 'test-workflow', description: 'Test', steps: [{ command: 'command-one' }] },
'User message',
'db-conv-id'
);
// Verify user received cleanup error message
const sendMessage = mockPlatform.sendMessage as ReturnType<typeof mock>;
const calls = sendMessage.mock.calls;
const cleanupErrorMessages = calls.filter((call: unknown[]) =>
(call[1] as string).includes('Workflow blocked') &&
(call[1] as string).includes('/workflow cancel')
);
expect(cleanupErrorMessages.length).toBe(1);
// Verify no new workflow was created
const insertCalls = mockQuery.mock.calls.filter((call: unknown[]) =>
(call[0] as string).includes('INSERT INTO remote_agent_workflow_runs')
);
expect(insertCalls.length).toBe(0);
// Reset mock
mockQuery.mockImplementation((query: string) => {
if (query.includes("status = 'running'")) {
return Promise.resolve(createQueryResult([]));
}
if (query.includes('INSERT INTO remote_agent_workflow_runs')) {
return Promise.resolve(
createQueryResult([
{
id: 'test-workflow-run-id',
workflow_name: 'test-workflow',
conversation_id: 'conv-123',
status: 'running' as const,
started_at: new Date(),
completed_at: null,
current_step_index: 0,
user_message: 'test user message',
metadata: {},
},
])
);
}
return Promise.resolve(createQueryResult([]));
});
});
});
});
describe('loop workflow execution', () => {

View file

@ -480,6 +480,9 @@ async function executeStepInternal(
let droppedMessageCount = 0;
for await (const msg of aiClient.sendQuery(substitutedPrompt, cwd, resumeSessionId)) {
// Update activity timestamp on each message (non-blocking, non-critical)
void workflowDb.updateWorkflowActivity(workflowRun.id);
if (msg.type === 'assistant' && msg.content) {
if (streamingMode === 'stream') {
const sent = await safeSendMessage(platform, conversationId, msg.content, messageContext);
@ -724,6 +727,9 @@ async function executeLoopWorkflow(
let droppedMessageCount = 0;
for await (const msg of aiClient.sendQuery(substitutedPrompt, cwd, resumeSessionId)) {
// Update activity timestamp on each message (non-blocking, non-critical)
void workflowDb.updateWorkflowActivity(workflowRun.id);
if (msg.type === 'assistant' && msg.content) {
fullOutput += msg.content;
if (streamingMode === 'stream') {
@ -857,15 +863,44 @@ export async function executeWorkflow(
console.log(`[WorkflowExecutor] Using configured command folder: ${configuredCommandFolder}`);
}
// Check for concurrent workflow execution
// Check for concurrent workflow execution with staleness detection
const activeWorkflow = await workflowDb.getActiveWorkflowRun(conversationDbId);
if (activeWorkflow) {
await sendCriticalMessage(
platform,
conversationId,
`❌ **Workflow already running**: A \`${activeWorkflow.workflow_name}\` workflow is already running for this issue. Please wait for it to complete before starting another.`
);
return;
// Check staleness based on last activity, not start time
const lastActivity = activeWorkflow.last_activity_at ?? activeWorkflow.started_at;
const minutesSinceActivity = (Date.now() - new Date(lastActivity).getTime()) / (1000 * 60);
const STALE_MINUTES = 15; // Workflow is stale if no activity for 15 minutes
if (minutesSinceActivity > STALE_MINUTES) {
console.log(
`[WorkflowExecutor] Marking stale workflow as failed: ${activeWorkflow.id} (${Math.floor(minutesSinceActivity)} min inactive)`
);
try {
await workflowDb.failWorkflowRun(
activeWorkflow.id,
`Workflow timed out after ${Math.floor(minutesSinceActivity)} minutes of inactivity`
);
} catch (cleanupError) {
console.error('[WorkflowExecutor] Failed to cleanup stale workflow:', {
staleWorkflowId: activeWorkflow.id,
error: (cleanupError as Error).message,
});
await sendCriticalMessage(
platform,
conversationId,
'❌ **Workflow blocked**: A stale workflow exists but cleanup failed. Try `/workflow cancel` first.'
);
return;
}
// Continue to create new workflow
} else {
await sendCriticalMessage(
platform,
conversationId,
`❌ **Workflow already running**: A \`${activeWorkflow.workflow_name}\` workflow is already running for this issue. Please wait for it to complete before starting another.`
);
return;
}
}
// Create workflow run record
@ -893,13 +928,15 @@ export async function executeWorkflow(
return;
}
console.log(`[WorkflowExecutor] Starting workflow: ${workflow.name} (${workflowRun.id})`);
await logWorkflowStart(cwd, workflowRun.id, workflow.name, userMessage);
// Wrap execution in try-catch to ensure workflow is marked as failed on any error
try {
console.log(`[WorkflowExecutor] Starting workflow: ${workflow.name} (${workflowRun.id})`);
await logWorkflowStart(cwd, workflowRun.id, workflow.name, userMessage);
// Context for error logging
const workflowContext: SendMessageContext = {
workflowId: workflowRun.id,
};
// Context for error logging
const workflowContext: SendMessageContext = {
workflowId: workflowRun.id,
};
// Notify user - use type narrowing from discriminated union
const stepsInfo = workflow.steps
@ -1105,5 +1142,52 @@ export async function executeWorkflow(
});
}
console.log(`[WorkflowExecutor] Workflow completed: ${workflow.name}`);
console.log(`[WorkflowExecutor] Workflow completed: ${workflow.name}`);
} catch (error) {
// Top-level error handler: ensure workflow is marked as failed
const err = error as Error;
console.error('[WorkflowExecutor] Workflow execution failed with unhandled error:', {
error: err.message,
errorName: err.name,
stack: err.stack,
cause: err.cause,
workflow: workflow.name,
workflowId: workflowRun.id,
});
// Record failure in database (non-blocking - log but don't re-throw on DB error)
try {
await workflowDb.failWorkflowRun(workflowRun.id, err.message);
} catch (dbError) {
console.error('[WorkflowExecutor] Failed to record workflow failure in database:', {
workflowId: workflowRun.id,
originalError: err.message,
dbError: (dbError as Error).message,
});
}
// Log to file (separate from database - non-blocking)
try {
await logWorkflowError(cwd, workflowRun.id, err.message);
} catch (logError) {
console.error('[WorkflowExecutor] Failed to write workflow error to log file:', {
workflowId: workflowRun.id,
logError: (logError as Error).message,
});
}
// Notify user about the failure
const delivered = await sendCriticalMessage(
platform,
conversationId,
`❌ **Workflow failed**: ${err.message}`
);
if (!delivered) {
console.error('[WorkflowExecutor] ALERT: User was NOT notified of workflow failure', {
workflowId: workflowRun.id,
originalError: err.message,
});
}
// Don't re-throw - orchestrator already has error handling
}
}

View file

@ -104,6 +104,7 @@ export interface WorkflowRun {
metadata: Record<string, unknown>;
started_at: Date;
completed_at: Date | null;
last_activity_at: Date | null; // For staleness detection
}
/**