feat: resume failed workflows from prior artifacts on same branch (#440)

* feat: resume failed workflows from prior artifacts on same branch

When a workflow fails mid-execution (rate limits, crashes) and is
re-run on the same branch/worktree, the executor now detects the prior
failed run and resumes from the first incomplete step instead of
starting fresh.

- Adds working_path column to workflow_runs (migration 019 + combined)
- Adds findResumableRun() and resumeWorkflowRun() to db/workflows
- Resume detection runs before createWorkflowRun in the executor;
  falls through to fresh run on DB error (non-critical)
- Step skip logic at top of loop emits step_skipped_prior_success events
- Only resumes when current_step_index > 0 (something to skip)
- Completed runs and runs with 0 completed steps always get fresh start

* fix: add working_path to SQLite schema and split resume error handling

Two gaps from the initial implementation:

1. SQLite migrateColumns() never added working_path for existing
   databases. createSchema() already had it for fresh installs, but
   existing users would hit 'no such column: working_path' on first
   run. Added the column-existence guard alongside parent_conversation_id.
   Also added working_path to createSchema() CREATE TABLE for completeness.

2. findResumableRun and resumeWorkflowRun shared a single catch block
   that treated both errors as non-critical (fall through to fresh run).
   resumeWorkflowRun failing is different: a run was detected but
   couldn't be activated, so silently creating a fresh run would leave
   the user without prior artifacts. Split into two separate try-catch
   blocks: findResumableRun errors fall through (non-critical),
   resumeWorkflowRun errors propagate as { success: false }.

* fix: address resume workflow review feedback

- Add conversation_id filter to findResumableRun to prevent cross-conversation resume leaks
- Fix stepNumber initialization on resume (now starts at resumeFromStepIndex to show correct Step X/N)
- Add session-context warning to resume notification message
- Guard updateWorkflowRun(status=running) behind !resumeFromStepIndex to avoid redundant write on resumed runs
- Fix resumeWorkflowRun: move not-found throw outside try-catch to distinguish race from DB error
- Lower findResumableRun log level from error to warn (failure is non-critical at executor level)
- Add user notification when findResumableRun silently falls through to fresh run
- Log when current_step_index=0 guard drops a found-but-not-resumable run
- Use WorkflowRun directly instead of Awaited<ReturnType<...>> for workflowRun variable
- Add comment explaining no emitter.emit for step_skipped_prior_success events
- Add unit tests for findResumableRun and resumeWorkflowRun in workflows.test.ts
- Add executor tests for resume activation failure and findResumableRun error fall-through
- Fix mockQuery.mockRestore() → mockImplementation(defaultMockQuery) in resume tests
- Tighten resume message assertion to verify step numbers and context warning text

* feat: extend workflow resume to loop workflows

Loop workflows now resume from the recorded iteration instead of always
restarting from iteration 1. Derives startIteration from
workflowRun.current_step_index (set at the start of each iteration), and
notifies the user when resuming mid-loop.

* fix: harden loop workflow resume against edge cases

- Guard against startIteration > max_iterations (YAML reduced between runs):
  fail fast with a clear user message instead of silently misfiring
- Fix needsFreshSession to use startIteration instead of i===1 so the
  first resumed iteration is correctly treated as a session start
- Show resume banner for all resumed loop runs (not just startIteration > 1),
  since resuming from iteration 1 is still a resume, not a fresh start
- Add comment clarifying that current_step_index is written at iteration START
This commit is contained in:
Rasmus Widing 2026-02-18 12:29:30 +02:00 committed by GitHub
parent 2213bc5dad
commit 54697d40d1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 626 additions and 45 deletions

View file

@ -173,7 +173,8 @@ CREATE TABLE IF NOT EXISTS remote_agent_workflow_runs (
parent_conversation_id UUID REFERENCES remote_agent_conversations(id) ON DELETE SET NULL,
started_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
completed_at TIMESTAMP WITH TIME ZONE,
last_activity_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
last_activity_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
working_path TEXT
);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_conversation

View file

@ -0,0 +1,7 @@
-- Add working_path to workflow_runs for resume detection
-- Version: 19.0
-- Description: Stores the cwd (worktree path) for each workflow run so
-- re-runs on the same branch can find prior failed runs and resume.
-- IF NOT EXISTS keeps the migration idempotent if it is re-applied.
ALTER TABLE remote_agent_workflow_runs
ADD COLUMN IF NOT EXISTS working_path TEXT;

View file

@ -174,6 +174,10 @@ export class SqliteAdapter implements IDatabase {
'ALTER TABLE remote_agent_workflow_runs ADD COLUMN parent_conversation_id TEXT'
);
}
if (!wfColNames.has('working_path')) {
this.db.run('ALTER TABLE remote_agent_workflow_runs ADD COLUMN working_path TEXT');
}
} catch (e: unknown) {
getLog().warn({ err: e as Error }, 'migration_workflow_runs_columns_failed');
}
@ -280,7 +284,8 @@ export class SqliteAdapter implements IDatabase {
parent_conversation_id TEXT REFERENCES remote_agent_conversations(id) ON DELETE SET NULL,
started_at TEXT DEFAULT (datetime('now')),
completed_at TEXT,
last_activity_at TEXT DEFAULT (datetime('now'))
last_activity_at TEXT DEFAULT (datetime('now')),
working_path TEXT
);
-- Workflow events table

View file

@ -20,6 +20,8 @@ import {
completeWorkflowRun,
failWorkflowRun,
updateWorkflowActivity,
findResumableRun,
resumeWorkflowRun,
} from './workflows';
describe('workflows database', () => {
@ -31,6 +33,7 @@ describe('workflows database', () => {
id: 'workflow-run-123',
workflow_name: 'feature-development',
conversation_id: 'conv-456',
parent_conversation_id: null,
codebase_id: 'codebase-789',
current_step_index: 0,
status: 'running',
@ -39,6 +42,7 @@ describe('workflows database', () => {
started_at: new Date('2025-01-01T00:00:00Z'),
completed_at: null,
last_activity_at: new Date('2025-01-01T00:00:00Z'),
working_path: null,
};
describe('createWorkflowRun', () => {
@ -55,7 +59,7 @@ describe('workflows database', () => {
expect(result).toEqual(mockWorkflowRun);
expect(mockQuery).toHaveBeenCalledWith(
expect.stringContaining('INSERT INTO remote_agent_workflow_runs'),
['feature-development', 'conv-456', 'codebase-789', 'Add dark mode support', '{}']
['feature-development', 'conv-456', 'codebase-789', 'Add dark mode support', '{}', null]
);
});
@ -83,6 +87,7 @@ describe('workflows database', () => {
'codebase-789',
'Add dark mode support',
JSON.stringify({ github_context: 'Issue #42 context' }),
null,
]
);
});
@ -100,7 +105,7 @@ describe('workflows database', () => {
expect(result.codebase_id).toBeNull();
expect(mockQuery).toHaveBeenCalledWith(
expect.stringContaining('INSERT INTO remote_agent_workflow_runs'),
['feature-development', 'conv-456', null, 'Add dark mode support', '{}']
['feature-development', 'conv-456', null, 'Add dark mode support', '{}', null]
);
});
});
@ -384,4 +389,74 @@ describe('workflows database', () => {
expect(mockQuery).toHaveBeenCalled();
});
});
  // Unit tests for the resume-detection lookup (db/workflows.findResumableRun).
  describe('findResumableRun', () => {
    test('returns the most recent failed run matching workflow name, path, and conversation', async () => {
      const failedRun = {
        ...mockWorkflowRun,
        status: 'failed' as const,
        working_path: '/repo/path',
      };
      mockQuery.mockResolvedValueOnce(createQueryResult([failedRun]));
      const result = await findResumableRun('feature-development', '/repo/path', 'conv-456');
      expect(result).toEqual(failedRun);
      // Verify the SQL filters on all three keys plus failed status, and that
      // the newest run wins (ORDER BY started_at DESC + LIMIT in the query).
      const [query, params] = mockQuery.mock.calls[0] as [string, unknown[]];
      expect(query).toContain("status = 'failed'");
      expect(query).toContain('working_path = $2');
      expect(query).toContain('conversation_id = $3');
      expect(query).toContain('ORDER BY started_at DESC');
      expect(params).toEqual(['feature-development', '/repo/path', 'conv-456']);
    });
    test('returns null when no failed run exists', async () => {
      // Empty result set → null (not an error): caller starts a fresh run.
      mockQuery.mockResolvedValueOnce(createQueryResult([]));
      const result = await findResumableRun('feature-development', '/repo/path', 'conv-456');
      expect(result).toBeNull();
    });
    test('throws on database error', async () => {
      // DB failures are wrapped with a stable prefix so the executor can log
      // and fall through to a fresh run (non-critical at that level).
      mockQuery.mockRejectedValueOnce(new Error('Connection refused'));
      await expect(findResumableRun('test', '/path', 'conv-456')).rejects.toThrow(
        'Failed to find resumable run: Connection refused'
      );
    });
  });
  // Unit tests for reactivating a prior failed run (db/workflows.resumeWorkflowRun).
  describe('resumeWorkflowRun', () => {
    test('updates run to running, clears completed_at, and returns updated row', async () => {
      const updatedRun = { ...mockWorkflowRun, status: 'running' as const, completed_at: null };
      mockQuery.mockResolvedValueOnce(createQueryResult([updatedRun]));
      const result = await resumeWorkflowRun('workflow-run-123');
      expect(result.status).toBe('running');
      expect(result.completed_at).toBeNull();
      // The UPDATE must flip status, null out completion, and return the row
      // in one statement (RETURNING *) keyed by the run id.
      const [query, params] = mockQuery.mock.calls[0] as [string, unknown[]];
      expect(query).toContain("status = 'running'");
      expect(query).toContain('completed_at = NULL');
      expect(query).toContain('RETURNING *');
      expect(params).toEqual(['workflow-run-123']);
    });
    test('throws when no row matched (run not found)', async () => {
      // Zero rows updated means the run vanished between find and resume —
      // surfaced as a distinct "not found" error, not a generic DB failure.
      mockQuery.mockResolvedValueOnce(createQueryResult([]));
      await expect(resumeWorkflowRun('nonexistent-id')).rejects.toThrow(
        'Workflow run not found (id: nonexistent-id)'
      );
    });
    test('throws on database error', async () => {
      // Query-level failures are wrapped with a stable prefix for callers.
      mockQuery.mockRejectedValueOnce(new Error('Lock timeout'));
      await expect(resumeWorkflowRun('workflow-run-123')).rejects.toThrow(
        'Failed to resume workflow run: Lock timeout'
      );
    });
  });
});

View file

@ -18,6 +18,7 @@ export async function createWorkflowRun(data: {
codebase_id?: string;
user_message: string;
metadata?: Record<string, unknown>;
working_path?: string;
}): Promise<WorkflowRun> {
// Serialize metadata with validation to catch circular references early
let metadataJson: string;
@ -52,8 +53,8 @@ export async function createWorkflowRun(data: {
try {
const result = await pool.query<WorkflowRun>(
`INSERT INTO remote_agent_workflow_runs
(workflow_name, conversation_id, codebase_id, user_message, metadata)
VALUES ($1, $2, $3, $4, $5)
(workflow_name, conversation_id, codebase_id, user_message, metadata, working_path)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *`,
[
data.workflow_name,
@ -61,6 +62,7 @@ export async function createWorkflowRun(data: {
data.codebase_id ?? null,
data.user_message,
metadataJson,
data.working_path ?? null,
]
);
const row = result.rows[0];
@ -107,6 +109,54 @@ export async function getActiveWorkflowRun(conversationId: string): Promise<Work
}
}
/**
 * Find the most recent prior failed run that is a candidate for resuming:
 * same workflow name, same worktree path, and same conversation (the
 * conversation filter prevents cross-conversation resume leaks).
 *
 * @param workflowName - Workflow definition name to match.
 * @param workingPath - cwd (worktree path) recorded on the prior run.
 * @param conversationId - DB conversation id the run must belong to.
 * @returns The newest matching failed run, or null when none exists.
 * @throws Error (wrapped, prefix "Failed to find resumable run:") on query
 *   failure — the executor treats this as non-critical and starts fresh.
 */
export async function findResumableRun(
  workflowName: string,
  workingPath: string,
  conversationId: string
): Promise<WorkflowRun | null> {
  try {
    const result = await pool.query<WorkflowRun>(
      `SELECT * FROM remote_agent_workflow_runs
       WHERE workflow_name = $1
         AND working_path = $2
         AND conversation_id = $3
         AND status = 'failed'
       ORDER BY started_at DESC
       LIMIT 1`,
      [workflowName, workingPath, conversationId]
    );
    return result.rows[0] ?? null;
  } catch (error) {
    const err = error as Error;
    // Log every field the query filters on — conversationId was previously
    // omitted, making failures hard to correlate with the triggering conversation.
    getLog().warn({ err, workflowName, workingPath, conversationId }, 'find_resumable_run_failed');
    throw new Error(`Failed to find resumable run: ${err.message}`);
  }
}
/**
 * Reactivate a previously failed workflow run so execution can continue:
 * flips status back to 'running', clears completed_at, bumps
 * last_activity_at, and returns the updated row.
 *
 * The zero-rows check lives outside the try-catch on purpose, so callers can
 * tell a database failure apart from the logical race where the run was
 * deleted or activated between find and resume.
 *
 * @param id - Workflow run id to reactivate.
 * @returns The updated run row.
 * @throws Error when the UPDATE fails, or when no row matched the id.
 */
export async function resumeWorkflowRun(id: string): Promise<WorkflowRun> {
  let updateResult;
  try {
    updateResult = await pool.query<WorkflowRun>(
      `UPDATE remote_agent_workflow_runs
       SET status = 'running', completed_at = NULL, last_activity_at = NOW()
       WHERE id = $1
       RETURNING *`,
      [id]
    );
  } catch (error) {
    const err = error as Error;
    getLog().error({ err, workflowRunId: id }, 'resume_workflow_run_failed');
    throw new Error(`Failed to resume workflow run: ${err.message}`);
  }
  const resumedRow = updateResult.rows[0];
  if (resumedRow) {
    return resumedRow;
  }
  // Logical race: run was deleted or already activated between find and resume
  getLog().warn({ workflowRunId: id }, 'resume_workflow_run_not_found');
  throw new Error(`Workflow run not found (id: ${id})`);
}
/**
* Find the most recent workflow run for a worker platform conversation ID.
* Joins with conversations table to resolve platform_conversation_id DB id.

View file

@ -17,8 +17,16 @@ import type { WorkflowDefinition } from './types';
import { createQueryResult } from '../test/mocks/database';
import { createMockLogger } from '../test/mocks/logger';
// Mock at the connection level to avoid polluting db/workflows module
const mockQuery = mock((query: string) => {
// Default mock implementation — extracted so tests can restore it after overriding
function defaultMockQuery(query: string): Promise<ReturnType<typeof createQueryResult>> {
// For findResumableRun query (status='failed' + working_path): no resumable run by default
if (query.includes("status = 'failed'") && query.includes('working_path')) {
return Promise.resolve(createQueryResult([]));
}
// For resumeWorkflowRun UPDATE (status='running' + completed_at = NULL): empty by default
if (query.includes("status = 'running'") && query.includes('completed_at = NULL')) {
return Promise.resolve(createQueryResult([]));
}
// For getActiveWorkflowRun query, return no active workflow by default
if (query.includes("status = 'running'")) {
return Promise.resolve(createQueryResult([]));
@ -38,6 +46,9 @@ const mockQuery = mock((query: string) => {
metadata: {},
started_at: new Date(),
completed_at: null,
last_activity_at: null,
working_path: null,
parent_conversation_id: null,
},
])
);
@ -52,7 +63,10 @@ const mockQuery = mock((query: string) => {
}
// Default: empty result for UPDATE queries and other operations
return Promise.resolve(createQueryResult([]));
});
}
// Mock at the connection level to avoid polluting db/workflows module
const mockQuery = mock(defaultMockQuery);
mock.module('../db/connection', () => ({
pool: {
@ -156,11 +170,11 @@ describe('Workflow Executor', () => {
mock.restore();
});
// Helper: Get count of workflow status updates in database
// Helper: Get count of workflow status UPDATE operations in database
function getWorkflowStatusUpdates(status: 'failed' | 'completed'): unknown[][] {
return mockQuery.mock.calls.filter(
(call: unknown[]) =>
(call[0] as string).includes('remote_agent_workflow_runs') &&
(call[0] as string).includes('UPDATE remote_agent_workflow_runs') &&
(call[0] as string).includes(`status = '${status}'`)
);
}
@ -4208,4 +4222,315 @@ describe('app defaults command loading', () => {
expect(notFoundMessages.length).toBeGreaterThan(0);
});
});
describe('workflow resume', () => {
  // Minimal two-step workflow: a prior run with current_step_index=1
  // (step 0 completed) can resume at the second step.
  const twoStepWorkflow: WorkflowDefinition = {
    name: 'test-workflow',
    description: 'Two-step workflow for resume testing',
    provider: 'claude',
    steps: [{ command: 'command-one' }, { command: 'command-two' }],
  };

  it('creates a fresh run when no prior failed run exists', async () => {
    // mockQuery already returns [] for status='failed' queries by default
    await executeWorkflow(
      mockPlatform,
      'conv-123',
      testDir,
      twoStepWorkflow,
      'User message',
      'db-conv-id'
    );
    const insertCalls = mockQuery.mock.calls.filter((call: unknown[]) =>
      (call[0] as string).includes('INSERT INTO remote_agent_workflow_runs')
    );
    expect(insertCalls.length).toBe(1);
  });

  it('resumes a prior failed run when found with completed steps', async () => {
    const priorRunId = 'prior-run-id';
    const priorRun = {
      id: priorRunId,
      workflow_name: 'test-workflow',
      conversation_id: 'conv-123',
      parent_conversation_id: null,
      codebase_id: null,
      current_step_index: 1, // step 0 completed
      status: 'failed' as const,
      user_message: 'original message',
      metadata: {},
      started_at: new Date(),
      completed_at: new Date(),
      last_activity_at: new Date(),
      working_path: testDir,
    };
    const resumedRun = { ...priorRun, status: 'running' as const, completed_at: null };
    mockQuery.mockImplementation((query: string) => {
      // query is already typed string — no cast needed.
      if (query.includes("status = 'failed'") && query.includes('working_path')) {
        return Promise.resolve(createQueryResult([priorRun]));
      }
      if (query.includes("status = 'running'") && query.includes('completed_at = NULL')) {
        return Promise.resolve(createQueryResult([resumedRun]));
      }
      if (query.includes("status = 'running'")) {
        return Promise.resolve(createQueryResult([]));
      }
      if (query.includes('INSERT INTO remote_agent_workflow_runs')) {
        return Promise.resolve(createQueryResult([resumedRun]));
      }
      if (query.includes('remote_agent_codebases') && query.includes('WHERE id')) {
        return Promise.resolve(createQueryResult([]));
      }
      if (query.includes('INSERT INTO remote_agent_workflow_events')) {
        return Promise.resolve(createQueryResult([]));
      }
      return Promise.resolve(createQueryResult([]));
    });
    // try/finally guarantees the default mock is restored even when an
    // expect() below throws; the previous end-of-test restore was skipped on
    // failure, leaking this implementation into later tests.
    try {
      await executeWorkflow(
        mockPlatform,
        'conv-123',
        testDir,
        twoStepWorkflow,
        'User message',
        'db-conv-id'
      );
      // No INSERT — resume used existing run
      const insertCalls = mockQuery.mock.calls.filter((call: unknown[]) =>
        (call[0] as string).includes('INSERT INTO remote_agent_workflow_runs')
      );
      expect(insertCalls.length).toBe(0);
      // UPDATE status='running' was called (resumeWorkflowRun)
      const resumeCalls = mockQuery.mock.calls.filter(
        (call: unknown[]) =>
          (call[0] as string).includes("status = 'running'") &&
          (call[0] as string).includes('completed_at = NULL')
      );
      expect(resumeCalls.length).toBe(1);
      // Resume message was sent to user with correct step numbers
      const sendMessageCalls = (mockPlatform.sendMessage as ReturnType<typeof mock>).mock.calls;
      const resumeMessages = sendMessageCalls.filter(
        (call: unknown[]) =>
          typeof call[1] === 'string' && (call[1] as string).includes('▶️ **Resuming**')
      );
      expect(resumeMessages.length).toBe(1);
      const resumeMsg = resumeMessages[0][1] as string;
      expect(resumeMsg).toContain('from step 2'); // resumeFromStepIndex=1 → step 2
      expect(resumeMsg).toContain('skipping 1 already-completed step(s)');
      expect(resumeMsg).toContain('session context from prior steps is not restored');
      // step_skipped_prior_success event was emitted for step 0
      const skipEventCalls = mockQuery.mock.calls.filter(
        (call: unknown[]) =>
          (call[0] as string).includes('INSERT INTO remote_agent_workflow_events') &&
          JSON.stringify(call[1]).includes('step_skipped_prior_success')
      );
      expect(skipEventCalls.length).toBe(1);
    } finally {
      mockQuery.mockImplementation(defaultMockQuery);
    }
  });

  it('creates a fresh run when prior failed run has current_step_index=0', async () => {
    // A run that failed on step 0 (nothing completed) — not worth resuming
    const priorRun = {
      id: 'prior-run-id',
      workflow_name: 'test-workflow',
      conversation_id: 'conv-123',
      parent_conversation_id: null,
      codebase_id: null,
      current_step_index: 0, // no steps completed
      status: 'failed' as const,
      user_message: 'original message',
      metadata: {},
      started_at: new Date(),
      completed_at: new Date(),
      last_activity_at: new Date(),
      working_path: testDir,
    };
    mockQuery.mockImplementation((query: string) => {
      if (query.includes("status = 'failed'") && query.includes('working_path')) {
        return Promise.resolve(createQueryResult([priorRun]));
      }
      if (query.includes("status = 'running'")) {
        return Promise.resolve(createQueryResult([]));
      }
      if (query.includes('INSERT INTO remote_agent_workflow_runs')) {
        return Promise.resolve(
          createQueryResult([
            {
              id: 'new-run-id',
              workflow_name: 'test-workflow',
              conversation_id: 'conv-123',
              parent_conversation_id: null,
              codebase_id: null,
              current_step_index: 0,
              status: 'running' as const,
              user_message: 'User message',
              metadata: {},
              started_at: new Date(),
              completed_at: null,
              last_activity_at: null,
              working_path: testDir,
            },
          ])
        );
      }
      if (query.includes('remote_agent_codebases') && query.includes('WHERE id')) {
        return Promise.resolve(createQueryResult([]));
      }
      if (query.includes('INSERT INTO remote_agent_workflow_events')) {
        return Promise.resolve(createQueryResult([]));
      }
      return Promise.resolve(createQueryResult([]));
    });
    // Restore default mock even if an assertion fails (see note in prior test).
    try {
      await executeWorkflow(
        mockPlatform,
        'conv-123',
        testDir,
        twoStepWorkflow,
        'User message',
        'db-conv-id'
      );
      // Fresh INSERT was called (current_step_index=0 → no steps to skip → fresh run)
      const insertCalls = mockQuery.mock.calls.filter((call: unknown[]) =>
        (call[0] as string).includes('INSERT INTO remote_agent_workflow_runs')
      );
      expect(insertCalls.length).toBe(1);
    } finally {
      mockQuery.mockImplementation(defaultMockQuery);
    }
  });

  it('fails workflow when resume activation (resumeWorkflowRun) throws', async () => {
    const priorRun = {
      id: 'prior-run-id',
      workflow_name: 'test-workflow',
      conversation_id: 'conv-123',
      parent_conversation_id: null,
      codebase_id: null,
      current_step_index: 1, // step 0 completed — activation will be attempted
      status: 'failed' as const,
      user_message: 'original message',
      metadata: {},
      started_at: new Date(),
      completed_at: new Date(),
      last_activity_at: new Date(),
      working_path: testDir,
    };
    mockQuery.mockImplementation((query: string) => {
      if (query.includes("status = 'failed'") && query.includes('working_path')) {
        return Promise.resolve(createQueryResult([priorRun]));
      }
      if (query.includes("status = 'running'") && query.includes('completed_at = NULL')) {
        return Promise.reject(new Error('Database connection lost'));
      }
      if (query.includes("status = 'running'")) {
        return Promise.resolve(createQueryResult([]));
      }
      if (query.includes('INSERT INTO remote_agent_workflow_events')) {
        return Promise.resolve(createQueryResult([]));
      }
      return Promise.resolve(createQueryResult([]));
    });
    // Restore default mock even if an assertion fails (see note in prior test).
    try {
      const result = await executeWorkflow(
        mockPlatform,
        'conv-123',
        testDir,
        twoStepWorkflow,
        'User message',
        'db-conv-id'
      );
      expect(result.success).toBe(false);
      expect(result.error).toBe('Database error resuming workflow run');
      // Error message sent to user
      const sendCalls = (mockPlatform.sendMessage as ReturnType<typeof mock>).mock.calls;
      const errorMessages = sendCalls.filter(
        (call: unknown[]) =>
          typeof call[1] === 'string' && (call[1] as string).includes('could not activate it')
      );
      expect(errorMessages.length).toBeGreaterThan(0);
      // No new run was created
      const insertCalls = mockQuery.mock.calls.filter((call: unknown[]) =>
        (call[0] as string).includes('INSERT INTO remote_agent_workflow_runs')
      );
      expect(insertCalls.length).toBe(0);
    } finally {
      mockQuery.mockImplementation(defaultMockQuery);
    }
  });

  it('falls through to fresh run when findResumableRun throws (non-critical)', async () => {
    mockQuery.mockImplementation((query: string) => {
      if (query.includes("status = 'failed'") && query.includes('working_path')) {
        return Promise.reject(new Error('DB timeout'));
      }
      return defaultMockQuery(query);
    });
    // Restore default mock even if an assertion fails (see note in prior test).
    try {
      await executeWorkflow(
        mockPlatform,
        'conv-123',
        testDir,
        twoStepWorkflow,
        'User message',
        'db-conv-id'
      );
      // Fresh run was created (not blocked by resume check failure)
      const insertCalls = mockQuery.mock.calls.filter((call: unknown[]) =>
        (call[0] as string).includes('INSERT INTO remote_agent_workflow_runs')
      );
      expect(insertCalls.length).toBe(1);
      // Warning message was sent so the user knows resume was skipped
      const sendCalls = (mockPlatform.sendMessage as ReturnType<typeof mock>).mock.calls;
      const warnMessages = sendCalls.filter(
        (call: unknown[]) =>
          typeof call[1] === 'string' &&
          (call[1] as string).includes('Could not check for a prior run to resume')
      );
      expect(warnMessages.length).toBeGreaterThan(0);
    } finally {
      mockQuery.mockImplementation(defaultMockQuery);
    }
  });
});
});

View file

@ -1078,7 +1078,44 @@ async function executeLoopWorkflow(
let currentSessionId: string | undefined;
let metadataTrackingFailed = false;
for (let i = 1; i <= loop.max_iterations; i++) {
// Resume: current_step_index is written at the START of each iteration, so resuming from it
// re-runs the last recorded iteration (which may have failed mid-way).
const startIteration = workflowRun.current_step_index > 0 ? workflowRun.current_step_index : 1;
const isResume = workflowRun.current_step_index > 0;
// Guard: stored iteration exceeds current max (e.g. YAML max_iterations reduced between runs)
if (startIteration > loop.max_iterations) {
getLog().warn(
{ workflowRunId: workflowRun.id, startIteration, maxIterations: loop.max_iterations },
'loop_resume_index_exceeds_max_iterations'
);
await sendCriticalMessage(
platform,
conversationId,
`❌ **Cannot resume** \`${workflow.name}\`: prior run reached iteration ${String(startIteration)} but current \`max_iterations\` is ${String(loop.max_iterations)}. Increase \`max_iterations\` or start a fresh run.`,
workflowContext
);
await workflowDb.failWorkflowRun(
workflowRun.id,
`Resume aborted: prior iteration ${String(startIteration)} exceeds current max_iterations ${String(loop.max_iterations)}`
);
return;
}
if (isResume) {
getLog().info(
{ workflowRunId: workflowRun.id, startIteration, maxIterations: loop.max_iterations },
'loop_workflow_resuming'
);
await safeSendMessage(
platform,
conversationId,
`▶️ **Resuming** \`${workflow.name}\` from iteration ${String(startIteration)} — skipping ${String(startIteration - 1)} already-completed iteration(s).\n\nNote: AI session context from prior iterations is not restored.`,
workflowContext
);
}
for (let i = startIteration; i <= loop.max_iterations; i++) {
// Update metadata with current iteration (non-critical - log but don't fail on db error)
try {
await workflowDb.updateWorkflowRun(workflowRun.id, {
@ -1126,8 +1163,9 @@ async function executeLoopWorkflow(
data: { iteration: i, maxIterations: loop.max_iterations },
});
// Determine session handling
const needsFreshSession = loop.fresh_context === true || i === 1;
// Determine session handling — treat the first executed iteration as a session start,
// whether that is iteration 1 (fresh run) or a later iteration (resume).
const needsFreshSession = loop.fresh_context === true || i === startIteration;
const resumeSessionId = needsFreshSession ? undefined : currentSessionId;
if (needsFreshSession && i > 1) {
@ -1561,28 +1599,82 @@ export async function executeWorkflow(
}
}
// Create workflow run record
let workflowRun;
// Resume detection: check for prior failed run on same workflow + worktree
let resumeFromStepIndex: number | undefined;
let workflowRun: WorkflowRun | undefined;
// Step 1: Find prior failed run — non-critical, fall through on DB error
let resumableRun: Awaited<ReturnType<typeof workflowDb.findResumableRun>> = null;
try {
workflowRun = await workflowDb.createWorkflowRun({
workflow_name: workflow.name,
conversation_id: conversationDbId,
codebase_id: codebaseId,
user_message: userMessage,
metadata: issueContext ? { github_context: issueContext } : {},
});
resumableRun = await workflowDb.findResumableRun(workflow.name, cwd, conversationDbId);
} catch (error) {
const err = error as Error;
getLog().error(
{ err, workflowName: workflow.name, conversationId },
'db_create_workflow_run_failed'
);
await sendCriticalMessage(
getLog().warn({ err, workflowName: workflow.name, cwd }, 'workflow_resume_check_failed');
// Non-critical: fall through to create a new run; notify user so they know resume was skipped
// (workflowName is already captured in the warn log above for correlation)
await safeSendMessage(
platform,
conversationId,
'❌ **Workflow failed**: Unable to start workflow (database error). Please try again later.'
'⚠️ Could not check for a prior run to resume (database error). Starting a fresh run instead.'
);
return { success: false, error: 'Database error creating workflow run' };
}
// Step 2: Activate the resume — propagate as error if this fails (resume detected but couldn't activate)
if (resumableRun && resumableRun.current_step_index > 0) {
try {
workflowRun = await workflowDb.resumeWorkflowRun(resumableRun.id);
resumeFromStepIndex = resumableRun.current_step_index;
getLog().info({ workflowRunId: workflowRun.id, resumeFromStepIndex }, 'workflow_resuming');
await safeSendMessage(
platform,
conversationId,
`▶️ **Resuming** \`${workflow.name}\` from step ${String(resumeFromStepIndex + 1)} — skipping ${String(resumeFromStepIndex)} already-completed step(s).\n\nNote: AI session context from prior steps is not restored. Steps that depend on prior context may need to re-read artifacts.`
);
} catch (error) {
const err = error as Error;
getLog().error(
{ err, workflowName: workflow.name, resumableRunId: resumableRun.id },
'workflow_resume_activate_failed'
);
await sendCriticalMessage(
platform,
conversationId,
'❌ **Workflow failed**: Found a prior run to resume but could not activate it (database error). Please try again later.'
);
return { success: false, error: 'Database error resuming workflow run' };
}
} else if (resumableRun) {
// Found a prior failed run but no steps completed (current_step_index=0) — not worth resuming
getLog().info(
{ workflowRunId: resumableRun.id, currentStepIndex: resumableRun.current_step_index },
'workflow_resume_skipped_no_completed_steps'
);
}
if (!workflowRun) {
// Create workflow run record
try {
workflowRun = await workflowDb.createWorkflowRun({
workflow_name: workflow.name,
conversation_id: conversationDbId,
codebase_id: codebaseId,
user_message: userMessage,
working_path: cwd,
metadata: issueContext ? { github_context: issueContext } : {},
});
} catch (error) {
const err = error as Error;
getLog().error(
{ err, workflowName: workflow.name, conversationId },
'db_create_workflow_run_failed'
);
await sendCriticalMessage(
platform,
conversationId,
'❌ **Workflow failed**: Unable to start workflow (database error). Please try again later.'
);
return { success: false, error: 'Database error creating workflow run' };
}
}
// Resolve external artifact and log directories
@ -1621,20 +1713,22 @@ export async function executeWorkflow(
data: { workflowName: workflow.name, totalSteps, isLoop },
});
// Set status to running now that execution has started
try {
await workflowDb.updateWorkflowRun(workflowRun.id, { status: 'running' });
} catch (dbError) {
getLog().error(
{ err: dbError as Error, workflowRunId: workflowRun.id },
'db_workflow_status_update_failed'
);
await sendCriticalMessage(
platform,
conversationId,
'Workflow blocked: Unable to update status. Please try again.'
);
return { success: false, error: 'Database error setting workflow to running' };
// Set status to running now that execution has started (skip for resumed runs — already running)
if (!resumeFromStepIndex) {
try {
await workflowDb.updateWorkflowRun(workflowRun.id, { status: 'running' });
} catch (dbError) {
getLog().error(
{ err: dbError as Error, workflowRunId: workflowRun.id },
'db_workflow_status_update_failed'
);
await sendCriticalMessage(
platform,
conversationId,
'Workflow blocked: Unable to update status. Please try again.'
);
return { success: false, error: 'Database error setting workflow to running' };
}
}
// Context for error logging
@ -1739,12 +1833,35 @@ export async function executeWorkflow(
}
let currentSessionId: string | undefined;
let stepNumber = 0; // For user-facing step count
// Start at the resume index so user-facing "Step X/N" reflects actual position
let stepNumber = resumeFromStepIndex ?? 0;
// Execute steps sequentially (for step-based workflows)
// After the loop check above, TypeScript knows workflow.steps exists
const steps = workflow.steps;
for (let i = 0; i < steps.length; i++) {
// Resume: skip steps that completed in a prior run
if (resumeFromStepIndex !== undefined && i < resumeFromStepIndex) {
const skippedStep = steps[i];
const skippedName = isParallelBlock(skippedStep)
? `parallel(${skippedStep.parallel.map(s => s.command).join(', ')})`
: skippedStep.command;
getLog().info(
{ workflowRunId: workflowRun.id, stepIndex: i, stepName: skippedName },
'workflow.step_skipped_prior_success'
);
// No emitter.emit here — skip events during resume have no in-process subscribers;
// the workflow progress card uses DB events for historical display only
void workflowEventDb.createWorkflowEvent({
workflow_run_id: workflowRun.id,
event_type: 'step_skipped_prior_success',
step_index: i,
step_name: skippedName,
data: { resumedFrom: resumeFromStepIndex },
});
continue;
}
const step = steps[i];
if (isParallelBlock(step)) {

View file

@ -117,6 +117,7 @@ export interface WorkflowRun {
started_at: Date;
completed_at: Date | null;
last_activity_at: Date | null; // For staleness detection
working_path: string | null; // cwd at run creation time; used for resume detection
}
/**