fix(ai-builder): Prevent duplicate workflow creation on parallel submits in instance AI (#28793)

This commit is contained in:
Albert Alises 2026-04-21 14:21:48 +02:00 committed by GitHub
parent 76358a60be
commit 782b2d18b2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 367 additions and 35 deletions

View file

@ -44,10 +44,8 @@ import type { BuilderWorkspace } from '../../workspace/builder-sandbox-factory';
import { readFileViaSandbox } from '../../workspace/sandbox-fs';
import { getWorkspaceRoot } from '../../workspace/sandbox-setup';
import { buildCredentialMap, type CredentialMap } from '../workflows/resolve-credentials';
import {
createSubmitWorkflowTool,
type SubmitWorkflowAttempt,
} from '../workflows/submit-workflow.tool';
import { createIdentityEnforcedSubmitWorkflowTool } from '../workflows/submit-workflow-identity';
import { type SubmitWorkflowAttempt } from '../workflows/submit-workflow.tool';
/** Trigger types that cannot be test-fired programmatically (need an external request). */
const UNTESTABLE_TRIGGERS = new Set([
@ -363,11 +361,12 @@ export async function startBuildWorkflowAgentTask(
}
const mainWorkflowPath = `${root}/src/workflow.ts`;
builderTools['submit-workflow'] = createSubmitWorkflowTool(
domainContext,
builderTools['submit-workflow'] = createIdentityEnforcedSubmitWorkflowTool({
context: domainContext,
workspace,
credMap,
async (attempt) => {
credentialMap: credMap,
root,
onAttempt: async (attempt) => {
submitAttempts.set(attempt.filePath, attempt);
submitAttemptHistory.push(attempt);
if (attempt.filePath !== mainWorkflowPath || !context.workflowTaskService) {
@ -385,7 +384,7 @@ export async function startBuildWorkflowAgentTask(
),
);
},
);
});
const tracedBuilderTools = traceSubAgentTools(
context,

View file

@ -0,0 +1,187 @@
import { wrapSubmitExecuteWithIdentity } from '../submit-workflow-identity';
import type { SubmitWorkflowInput, SubmitWorkflowOutput } from '../submit-workflow.tool';
const ROOT = '/home/daytona/workspace';
const MAIN_PATH = `${ROOT}/src/workflow.ts`;
const CHUNK_PATH = `${ROOT}/src/chunk.ts`;
function resolvePath(rawFilePath: string | undefined): string {
if (!rawFilePath) return MAIN_PATH;
if (rawFilePath.startsWith('/')) return rawFilePath;
return `${ROOT}/${rawFilePath}`;
}
/**
* Build a fake underlying `execute` that:
* - On a call without `workflowId`, returns a freshly-minted id (simulating create).
* - On a call with `workflowId`, returns that same id (simulating update).
* - Records every call it received.
* An optional `gate` promise lets tests hold dispatch mid-flight to exercise races.
*/
function makeUnderlying(opts: { idPrefix?: string; gate?: Promise<void> } = {}) {
const prefix = opts.idPrefix ?? 'wf';
let counter = 0;
const calls: SubmitWorkflowInput[] = [];
const execute = async (input: SubmitWorkflowInput): Promise<SubmitWorkflowOutput> => {
calls.push({ ...input });
if (opts.gate) await opts.gate;
if (input.workflowId) {
return { success: true, workflowId: input.workflowId };
}
counter += 1;
return { success: true, workflowId: `${prefix}_${counter}` };
};
return { execute, calls };
}
describe('wrapSubmitExecuteWithIdentity', () => {
it('parallel submits for the same filePath produce one create and N-1 updates sharing the workflowId', async () => {
let release: () => void = () => {};
const gate = new Promise<void>((res) => {
release = res;
});
const { execute, calls } = makeUnderlying({ gate });
const wrapped = wrapSubmitExecuteWithIdentity(execute, resolvePath);
const inFlight = Array.from({ length: 5 }, async () => await wrapped({}));
// Let the dispatcher land first, then release.
await Promise.resolve();
release();
const results = await Promise.all(inFlight);
const ids = results.map((r) => r.workflowId);
expect(new Set(ids).size).toBe(1);
expect(results.every((r) => r.success)).toBe(true);
const createCalls = calls.filter((c) => !c.workflowId);
const updateCalls = calls.filter((c) => c.workflowId);
expect(createCalls).toHaveLength(1);
expect(updateCalls).toHaveLength(4);
expect(updateCalls.every((c) => c.workflowId === ids[0])).toBe(true);
});
it('sequential submits for the same filePath reuse the bound workflowId', async () => {
const { execute, calls } = makeUnderlying();
const wrapped = wrapSubmitExecuteWithIdentity(execute, resolvePath);
const first = await wrapped({});
const second = await wrapped({});
expect(first.workflowId).toBe('wf_1');
expect(second.workflowId).toBe('wf_1');
expect(calls).toHaveLength(2);
expect(calls[0].workflowId).toBeUndefined();
expect(calls[1].workflowId).toBe('wf_1');
});
it('overrides an LLM-supplied workflowId once the wrapper has bound one', async () => {
const { execute, calls } = makeUnderlying();
const wrapped = wrapSubmitExecuteWithIdentity(execute, resolvePath);
const first = await wrapped({});
const second = await wrapped({ workflowId: 'llm_hallucinated_id' });
expect(first.workflowId).toBe('wf_1');
expect(second.workflowId).toBe('wf_1');
expect(calls[1].workflowId).toBe('wf_1');
});
it('different filePaths dispatch independently (chunk + main composition)', async () => {
const { execute, calls } = makeUnderlying();
const wrapped = wrapSubmitExecuteWithIdentity(execute, resolvePath);
const [mainResult, chunkResult] = await Promise.all([
wrapped({ filePath: MAIN_PATH }),
wrapped({ filePath: CHUNK_PATH }),
]);
expect(mainResult.workflowId).not.toBe(chunkResult.workflowId);
const createCalls = calls.filter((c) => !c.workflowId);
expect(createCalls).toHaveLength(2);
const mainAgain = await wrapped({ filePath: MAIN_PATH });
expect(mainAgain.workflowId).toBe(mainResult.workflowId);
const chunkAgain = await wrapped({ filePath: CHUNK_PATH });
expect(chunkAgain.workflowId).toBe(chunkResult.workflowId);
});
it('resolves differently-spelled paths to the same identity', async () => {
const { execute } = makeUnderlying();
const wrapped = wrapSubmitExecuteWithIdentity(execute, resolvePath);
const absolute = await wrapped({ filePath: MAIN_PATH });
const relative = await wrapped({ filePath: 'src/workflow.ts' });
const defaulted = await wrapped({});
expect(absolute.workflowId).toBe('wf_1');
expect(relative.workflowId).toBe('wf_1');
expect(defaulted.workflowId).toBe('wf_1');
});
it('clears the map when the first dispatch fails so subsequent calls can retry', async () => {
let call = 0;
const execute = async (input: SubmitWorkflowInput): Promise<SubmitWorkflowOutput> => {
await Promise.resolve();
call += 1;
if (call === 1) {
return { success: false, errors: ['transient failure'] };
}
if (input.workflowId) return { success: true, workflowId: input.workflowId };
return { success: true, workflowId: 'wf_recovered' };
};
const wrapped = wrapSubmitExecuteWithIdentity(execute, resolvePath);
const failed = await wrapped({});
expect(failed.success).toBe(false);
const retried = await wrapped({});
expect(retried.success).toBe(true);
expect(retried.workflowId).toBe('wf_recovered');
});
it('reports a failure to concurrent waiters when the dispatcher fails', async () => {
let release: () => void = () => {};
const gate = new Promise<void>((res) => {
release = res;
});
let call = 0;
const execute = async (): Promise<SubmitWorkflowOutput> => {
call += 1;
await gate;
if (call === 1) return { success: false, errors: ['create failed'] };
return { success: true, workflowId: 'wf_unused' };
};
const wrapped = wrapSubmitExecuteWithIdentity(execute, resolvePath);
const a = wrapped({});
const b = wrapped({});
await Promise.resolve();
release();
const [aResult, bResult] = await Promise.all([a, b]);
expect(aResult.success).toBe(false);
expect(bResult.success).toBe(false);
expect(bResult.errors?.[0]).toContain('Previous submit-workflow for this file failed');
});
it('propagates thrown errors from the dispatcher and clears the map', async () => {
let call = 0;
const execute = async (input: SubmitWorkflowInput): Promise<SubmitWorkflowOutput> => {
await Promise.resolve();
call += 1;
if (call === 1) throw new Error('boom');
if (input.workflowId) return { success: true, workflowId: input.workflowId };
return { success: true, workflowId: 'wf_after_throw' };
};
const wrapped = wrapSubmitExecuteWithIdentity(execute, resolvePath);
await expect(wrapped({})).rejects.toThrow('boom');
const retried = await wrapped({});
expect(retried.workflowId).toBe('wf_after_throw');
});
});

View file

@ -0,0 +1,130 @@
/**
* Identity-enforcing wrapper for the sandbox submit-workflow tool.
*
* The builder sub-agent can emit multiple parallel submit-workflow calls within
* a single assistant turn. When the LLM drops `workflowId` on calls 2..N, each
* call takes the create branch and persists a duplicate row with the same name.
*
* This wrapper keys identity per resolved `filePath`. The first call for a given
* path synchronously installs a deferred in the pending map before dispatching,
* so concurrent calls for the same path await the first result and inject the
* bound `workflowId` forcing the update branch.
*
* The map is scoped to the builder-task closure: it dies with the task. No
* cross-module coordinator, eviction hook, or TTL sweep is required.
*/
import { createTool } from '@mastra/core/tools';
import type { Workspace } from '@mastra/core/workspace';
import type { CredentialMap } from './resolve-credentials';
import {
createSubmitWorkflowTool,
resolveSandboxWorkflowFilePath,
submitWorkflowInputSchema,
submitWorkflowOutputSchema,
type SubmitWorkflowAttempt,
type SubmitWorkflowInput,
type SubmitWorkflowOutput,
} from './submit-workflow.tool';
import type { InstanceAiContext } from '../../types';
export type SubmitExecute = (input: SubmitWorkflowInput) => Promise<SubmitWorkflowOutput>;
/**
* Wrap a submit-workflow `execute` with per-filePath identity enforcement.
*
* - First call for a given resolved path dispatches and populates the map on success.
* - Concurrent calls for the same path await the first result and inject the bound id.
* - On dispatch failure, the map entry is cleared and waiters see a failure result.
*
* Exposed separately from the tool factory so it can be unit-tested without
* constructing a Mastra tool or a sandbox workspace.
*/
export function wrapSubmitExecuteWithIdentity(
underlying: SubmitExecute,
resolvePath: (rawFilePath: string | undefined) => string,
): SubmitExecute {
const pending = new Map<string, Promise<string>>();
return async (input) => {
const resolvedPath = resolvePath(input.filePath);
const existing = pending.get(resolvedPath);
if (existing) {
let boundId: string;
try {
boundId = await existing;
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
return {
success: false,
errors: [`Previous submit-workflow for this file failed: ${message}`],
};
}
return await underlying({ ...input, workflowId: boundId });
}
let resolveFn: ((id: string) => void) | undefined;
let rejectFn: ((reason: unknown) => void) | undefined;
const promise = new Promise<string>((res, rej) => {
resolveFn = res;
rejectFn = rej;
});
// Swallow rejections on the stored promise so Node doesn't warn about
// unhandled rejections when no concurrent waiter happens to attach.
promise.catch(() => {});
pending.set(resolvedPath, promise);
try {
const result = await underlying(input);
if (result.success && typeof result.workflowId === 'string') {
resolveFn?.(result.workflowId);
} else {
rejectFn?.(new Error(result.errors?.join(' ') ?? 'submit-workflow failed'));
pending.delete(resolvedPath);
}
return result;
} catch (error) {
rejectFn?.(error);
pending.delete(resolvedPath);
throw error;
}
};
}
/**
* Build a submit-workflow Mastra tool wired with identity enforcement.
* Convenience factory used at the builder-agent callsite.
*/
export function createIdentityEnforcedSubmitWorkflowTool(args: {
context: InstanceAiContext;
workspace: Workspace;
credentialMap?: CredentialMap;
onAttempt: (attempt: SubmitWorkflowAttempt) => Promise<void> | void;
root: string;
}) {
const underlying = createSubmitWorkflowTool(
args.context,
args.workspace,
args.credentialMap,
args.onAttempt,
);
const underlyingExecute = underlying.execute as SubmitExecute | undefined;
if (!underlyingExecute) {
throw new Error('createSubmitWorkflowTool returned a tool without an execute handler');
}
const wrappedExecute = wrapSubmitExecuteWithIdentity(underlyingExecute, (rawFilePath) =>
resolveSandboxWorkflowFilePath(rawFilePath, args.root),
);
return createTool({
id: 'submit-workflow',
description: underlying.description ?? '',
inputSchema: submitWorkflowInputSchema,
outputSchema: submitWorkflowOutputSchema,
execute: wrappedExecute,
});
}

View file

@ -148,6 +148,45 @@ export const submitWorkflowInputSchema = z.object({
name: z.string().optional().describe('Workflow name (required for new workflows)'),
});
export const submitWorkflowOutputSchema = z.object({
success: z.boolean(),
workflowId: z.string().optional(),
workflowName: z.string().optional(),
/** Node names whose credentials were mocked via pinned data. */
mockedNodeNames: z.array(z.string()).optional(),
/** Credential types that were mocked (not resolved to real credentials). */
mockedCredentialTypes: z.array(z.string()).optional(),
/** Map of node name → credential types that were mocked on that node. */
mockedCredentialsByNode: z.record(z.array(z.string())).optional(),
/** Verification-only pin data — scoped to this build, never persisted to workflow. */
verificationPinData: z.record(z.array(z.record(z.unknown()))).optional(),
errors: z.array(z.string()).optional(),
warnings: z.array(z.string()).optional(),
});
export type SubmitWorkflowInput = z.infer<typeof submitWorkflowInputSchema>;
export type SubmitWorkflowOutput = z.infer<typeof submitWorkflowOutputSchema>;
/**
* Resolve a raw `filePath` tool argument into an absolute path under the sandbox root.
* Exported so identity wrappers can key state by the same resolved path the tool uses.
*/
export function resolveSandboxWorkflowFilePath(
rawFilePath: string | undefined,
root: string,
): string {
if (!rawFilePath) {
return `${root}/src/workflow.ts`;
}
if (rawFilePath.startsWith('~/')) {
return `${root.replace(/\/workspace$/, '')}/${rawFilePath.slice(2)}`;
}
if (!rawFilePath.startsWith('/')) {
return `${root}/${rawFilePath}`;
}
return rawFilePath;
}
export function createSubmitWorkflowTool(
context: InstanceAiContext,
workspace: Workspace,
@ -161,38 +200,15 @@ export function createSubmitWorkflowTool(
'and saves it to n8n as a draft. The workflow must be explicitly published via ' +
'publish-workflow before it will run on its triggers in production.',
inputSchema: submitWorkflowInputSchema,
outputSchema: z.object({
success: z.boolean(),
workflowId: z.string().optional(),
/** Node names whose credentials were mocked via pinned data. */
mockedNodeNames: z.array(z.string()).optional(),
/** Credential types that were mocked (not resolved to real credentials). */
mockedCredentialTypes: z.array(z.string()).optional(),
/** Map of node name → credential types that were mocked on that node. */
mockedCredentialsByNode: z.record(z.array(z.string())).optional(),
/** Verification-only pin data — scoped to this build, never persisted to workflow. */
verificationPinData: z.record(z.array(z.record(z.unknown()))).optional(),
errors: z.array(z.string()).optional(),
warnings: z.array(z.string()).optional(),
}),
outputSchema: submitWorkflowOutputSchema,
execute: async ({
filePath: rawFilePath,
workflowId,
projectId,
name,
}: z.infer<typeof submitWorkflowInputSchema>) => {
// Resolve file path: relative paths resolve against workspace root, ~ is expanded
}: SubmitWorkflowInput) => {
const root = await getWorkspaceRoot(workspace);
let filePath: string;
if (!rawFilePath) {
filePath = `${root}/src/workflow.ts`;
} else if (rawFilePath.startsWith('~/')) {
filePath = `${root.replace(/\/workspace$/, '')}/${rawFilePath.slice(2)}`;
} else if (!rawFilePath.startsWith('/')) {
filePath = `${root}/${rawFilePath}`;
} else {
filePath = rawFilePath;
}
const filePath = resolveSandboxWorkflowFilePath(rawFilePath, root);
const sourceHash = hashContent(await readFileViaSandbox(workspace, filePath));
const reportAttempt = async (