feat(core): Broadcast workflow updates from MCP tools to open editors (#28709)

This commit is contained in:
Daria 2026-04-21 14:26:56 +02:00 committed by GitHub
parent 8e49800421
commit b1ca129496
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 232 additions and 5 deletions

View file

@ -274,6 +274,18 @@ export class CollaborationService {
return await this.state.getWriteLock(workflowId);
}
/**
* Throws if any user currently holds the write lock for the given workflow.
*/
async ensureWorkflowEditable(workflowId: Workflow['id']): Promise<void> {
const lock = await this.state.getWriteLock(workflowId);
if (lock) {
throw new LockedError(
'Cannot modify workflow while it is being edited by a user in the editor.',
);
}
}
/**
* Validates that if a write lock exists for a workflow, the requesting client holds it.
* Throws ConflictError (409) if same user but different tab holds the lock.

View file

@ -1,9 +1,11 @@
import { mockInstance } from '@n8n/backend-test-utils';
import { User } from '@n8n/db';
import { User, WorkflowEntity } from '@n8n/db';
import { createArchiveWorkflowTool } from '../tools/workflow-builder/delete-workflow.tool';
import { CollaborationService } from '@/collaboration/collaboration.service';
import { Telemetry } from '@/telemetry';
import { WorkflowFinderService } from '@/workflows/workflow-finder.service';
import { WorkflowService } from '@/workflows/workflow.service';
jest.mock('@n8n/ai-workflow-builder', () => ({
@ -25,19 +27,44 @@ const parseResult = (result: { content: Array<{ type: string; text?: string }> }
describe('archive-workflow MCP tool', () => {
const user = Object.assign(new User(), { id: 'user-1' });
let workflowFinderService: WorkflowFinderService;
let workflowService: WorkflowService;
let telemetry: Telemetry;
let collaborationService: CollaborationService;
const mockExistingWorkflow = Object.assign(new WorkflowEntity(), {
id: 'wf-1',
name: 'My Workflow',
nodes: [],
connections: {},
isArchived: false,
settings: { availableInMCP: true },
});
beforeEach(() => {
jest.clearAllMocks();
workflowFinderService = mockInstance(WorkflowFinderService, {
findWorkflowForUser: jest.fn().mockResolvedValue(mockExistingWorkflow),
});
workflowService = mockInstance(WorkflowService);
telemetry = mockInstance(Telemetry, {
track: jest.fn(),
});
collaborationService = mockInstance(CollaborationService, {
ensureWorkflowEditable: jest.fn().mockResolvedValue(undefined),
broadcastWorkflowUpdate: jest.fn().mockResolvedValue(undefined),
});
});
const createTool = () => createArchiveWorkflowTool(user, workflowService, telemetry);
const createTool = () =>
createArchiveWorkflowTool(
user,
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
describe('smoke tests', () => {
test('creates tool with correct name and destructiveHint=true', () => {
@ -72,10 +99,43 @@ describe('archive-workflow MCP tool', () => {
expect(response.workflowId).toBe('wf-1');
expect(response.name).toBe('My Workflow');
expect(result.isError).toBeUndefined();
expect(collaborationService.broadcastWorkflowUpdate).toHaveBeenCalledWith('wf-1', user.id);
});
test('returns error when workflow has active write lock', async () => {
(collaborationService.ensureWorkflowEditable as jest.Mock).mockRejectedValue(
new Error('Cannot modify workflow while it is being edited by a user in the editor.'),
);
const tool = createTool();
const result = await tool.handler({ workflowId: 'wf-1' }, {} as never);
const response = parseResult(result);
expect(result.isError).toBe(true);
expect(response.error).toContain('being edited by a user');
expect(workflowService.archive).not.toHaveBeenCalled();
});
test('succeeds even when broadcastWorkflowUpdate rejects', async () => {
(workflowService.archive as jest.Mock).mockResolvedValue({
id: 'wf-1',
name: 'My Workflow',
});
(collaborationService.broadcastWorkflowUpdate as jest.Mock).mockRejectedValue(
new Error('Cache unavailable'),
);
const tool = createTool();
const result = await tool.handler({ workflowId: 'wf-1' }, {} as never);
const response = parseResult(result);
expect(response.archived).toBe(true);
expect(result.isError).toBeUndefined();
});
test('returns error when workflow not found or no permission to archive', async () => {
(workflowService.archive as jest.Mock).mockResolvedValue(null);
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(null);
const tool = createTool();
const result = await tool.handler({ workflowId: 'wf-missing' }, {} as never);
@ -83,7 +143,7 @@ describe('archive-workflow MCP tool', () => {
const response = parseResult(result);
expect(result.isError).toBe(true);
expect(response.error).toContain('not found or');
expect(response.error).toContain('permission to archive');
expect(response.error).toContain('permission to access');
});
test('returns error when service throws', async () => {

View file

@ -16,6 +16,7 @@ import { McpService } from '../mcp.service';
import { WorkflowBuilderToolsService } from '../tools/workflow-builder/workflow-builder-tools.service';
import { ActiveExecutions } from '@/active-executions';
import { CollaborationService } from '@/collaboration/collaboration.service';
import { CredentialsService } from '@/credentials/credentials.service';
import { DataTableProxyService } from '@/modules/data-table/data-table-proxy.service';
import { NodeTypes } from '@/node-types';
@ -71,6 +72,7 @@ describe('McpService', () => {
mockInstance(ExecutionRepository),
mockInstance(ExecutionService),
mockInstance(DataTableProxyService),
mockInstance(CollaborationService),
);
});
@ -110,6 +112,7 @@ describe('McpService', () => {
mockInstance(ExecutionRepository),
mockInstance(ExecutionService),
mockInstance(DataTableProxyService),
mockInstance(CollaborationService),
);
expect(queueMcpService.isQueueMode).toBe(true);
@ -314,6 +317,7 @@ describe('McpService', () => {
mockInstance(ExecutionRepository),
mockInstance(ExecutionService),
mockInstance(DataTableProxyService),
mockInstance(CollaborationService),
);
const server = await service.getServer(user);
@ -355,6 +359,7 @@ describe('McpService', () => {
mockInstance(ExecutionRepository),
mockInstance(ExecutionService),
mockInstance(DataTableProxyService),
mockInstance(CollaborationService),
);
const server = await service.getServer(user);

View file

@ -5,6 +5,7 @@ import { v4 as uuid } from 'uuid';
import { createWorkflow } from './mock.utils';
import { createPublishWorkflowTool } from '../tools/publish-workflow.tool';
import { CollaborationService } from '@/collaboration/collaboration.service';
import { Telemetry } from '@/telemetry';
import { WorkflowFinderService } from '@/workflows/workflow-finder.service';
import { WorkflowService } from '@/workflows/workflow.service';
@ -14,6 +15,7 @@ describe('publish-workflow MCP tool', () => {
let workflowFinderService: WorkflowFinderService;
let workflowService: WorkflowService;
let telemetry: Telemetry;
let collaborationService: CollaborationService;
beforeEach(() => {
workflowFinderService = mockInstance(WorkflowFinderService);
@ -21,6 +23,10 @@ describe('publish-workflow MCP tool', () => {
telemetry = mockInstance(Telemetry, {
track: jest.fn(),
});
collaborationService = mockInstance(CollaborationService, {
ensureWorkflowEditable: jest.fn().mockResolvedValue(undefined),
broadcastWorkflowUpdate: jest.fn().mockResolvedValue(undefined),
});
});
describe('smoke tests', () => {
@ -30,6 +36,7 @@ describe('publish-workflow MCP tool', () => {
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
expect(tool.name).toBe('publish_workflow');
@ -52,6 +59,7 @@ describe('publish-workflow MCP tool', () => {
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
const result = await tool.handler(
@ -68,6 +76,35 @@ describe('publish-workflow MCP tool', () => {
});
});
describe('write lock', () => {
test('returns error when workflow has active write lock', async () => {
const workflow = createWorkflow({ settings: { availableInMCP: true } });
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
(collaborationService.ensureWorkflowEditable as jest.Mock).mockRejectedValue(
new Error('Cannot modify workflow while it is being edited by a user in the editor.'),
);
const tool = createPublishWorkflowTool(
user,
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
const result = await tool.handler(
{ workflowId: 'wf-1', versionId: undefined },
{} as Parameters<typeof tool.handler>[1],
);
expect(result.structuredContent).toMatchObject({
success: false,
error: expect.stringContaining('being edited by a user'),
});
expect(workflowService.activateWorkflow).not.toHaveBeenCalled();
});
});
describe('successful publish', () => {
test('publishes workflow successfully', async () => {
const workflow = createWorkflow({ settings: { availableInMCP: true } });
@ -82,6 +119,7 @@ describe('publish-workflow MCP tool', () => {
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
const result = await tool.handler(
@ -99,6 +137,8 @@ describe('publish-workflow MCP tool', () => {
versionId: undefined,
source: 'n8n-mcp',
});
expect(collaborationService.broadcastWorkflowUpdate).toHaveBeenCalledWith('wf-1', user.id);
});
test('publishes specific version when versionId provided', async () => {
@ -114,6 +154,7 @@ describe('publish-workflow MCP tool', () => {
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
const result = await tool.handler(
@ -148,6 +189,7 @@ describe('publish-workflow MCP tool', () => {
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
await tool.handler(
@ -180,6 +222,7 @@ describe('publish-workflow MCP tool', () => {
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
await tool.handler(
@ -216,6 +259,7 @@ describe('publish-workflow MCP tool', () => {
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
const result = await tool.handler(

View file

@ -1,6 +1,7 @@
import { mockInstance } from '@n8n/backend-test-utils';
import { User } from '@n8n/db';
import { CollaborationService } from '@/collaboration/collaboration.service';
import { Telemetry } from '@/telemetry';
import { WorkflowFinderService } from '@/workflows/workflow-finder.service';
import { WorkflowService } from '@/workflows/workflow.service';
@ -13,6 +14,7 @@ describe('unpublish-workflow MCP tool', () => {
let workflowFinderService: WorkflowFinderService;
let workflowService: WorkflowService;
let telemetry: Telemetry;
let collaborationService: CollaborationService;
beforeEach(() => {
workflowFinderService = mockInstance(WorkflowFinderService);
@ -20,6 +22,10 @@ describe('unpublish-workflow MCP tool', () => {
telemetry = mockInstance(Telemetry, {
track: jest.fn(),
});
collaborationService = mockInstance(CollaborationService, {
ensureWorkflowEditable: jest.fn().mockResolvedValue(undefined),
broadcastWorkflowUpdate: jest.fn().mockResolvedValue(undefined),
});
});
describe('smoke tests', () => {
@ -29,6 +35,7 @@ describe('unpublish-workflow MCP tool', () => {
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
expect(tool.name).toBe('unpublish_workflow');
@ -51,6 +58,7 @@ describe('unpublish-workflow MCP tool', () => {
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
const result = await tool.handler(
@ -66,6 +74,35 @@ describe('unpublish-workflow MCP tool', () => {
});
});
describe('write lock', () => {
test('returns error when workflow has active write lock', async () => {
const workflow = createWorkflow({ settings: { availableInMCP: true } });
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
(collaborationService.ensureWorkflowEditable as jest.Mock).mockRejectedValue(
new Error('Cannot modify workflow while it is being edited by a user in the editor.'),
);
const tool = createUnpublishWorkflowTool(
user,
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
const result = await tool.handler(
{ workflowId: 'wf-1' },
{} as Parameters<typeof tool.handler>[1],
);
expect(result.structuredContent).toMatchObject({
success: false,
error: expect.stringContaining('being edited by a user'),
});
expect(workflowService.deactivateWorkflow).not.toHaveBeenCalled();
});
});
describe('successful unpublish', () => {
test('unpublishes workflow successfully', async () => {
const workflow = createWorkflow({ settings: { availableInMCP: true } });
@ -79,6 +116,7 @@ describe('unpublish-workflow MCP tool', () => {
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
const result = await tool.handler(
@ -94,6 +132,8 @@ describe('unpublish-workflow MCP tool', () => {
expect(workflowService.deactivateWorkflow).toHaveBeenCalledWith(user, 'wf-1', {
source: 'n8n-mcp',
});
expect(collaborationService.broadcastWorkflowUpdate).toHaveBeenCalledWith('wf-1', user.id);
});
});
@ -110,6 +150,7 @@ describe('unpublish-workflow MCP tool', () => {
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
await tool.handler({ workflowId: 'wf-1' }, {} as Parameters<typeof tool.handler>[1]);
@ -138,6 +179,7 @@ describe('unpublish-workflow MCP tool', () => {
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
await tool.handler(
@ -174,6 +216,7 @@ describe('unpublish-workflow MCP tool', () => {
workflowFinderService,
workflowService,
telemetry,
collaborationService,
);
const result = await tool.handler(

View file

@ -4,6 +4,7 @@ import type { INode } from 'n8n-workflow';
import { createUpdateWorkflowTool } from '../tools/workflow-builder/update-workflow.tool';
import { CollaborationService } from '@/collaboration/collaboration.service';
import { CredentialsService } from '@/credentials/credentials.service';
import { NodeTypes } from '@/node-types';
import { UrlService } from '@/services/url.service';
@ -87,6 +88,7 @@ describe('update-workflow MCP tool', () => {
let credentialsService: CredentialsService;
let sharedWorkflowRepository: SharedWorkflowRepository;
let nodeTypes: ReturnType<typeof mockInstance<NodeTypes>>;
let collaborationService: CollaborationService;
const mockExistingWorkflow = Object.assign(new WorkflowEntity(), {
id: 'wf-1',
@ -121,6 +123,10 @@ describe('update-workflow MCP tool', () => {
findOneOrFail: jest.fn().mockResolvedValue({ projectId: 'project-1' }),
});
nodeTypes = mockInstance(NodeTypes);
collaborationService = mockInstance(CollaborationService, {
ensureWorkflowEditable: jest.fn().mockResolvedValue(undefined),
broadcastWorkflowUpdate: jest.fn().mockResolvedValue(undefined),
});
mockParseAndValidate.mockImplementation(async () => ({
workflow: { ...mockWorkflowJson, nodes: mockNodes.map((n) => ({ ...n })) },
@ -139,6 +145,7 @@ describe('update-workflow MCP tool', () => {
nodeTypes,
credentialsService,
sharedWorkflowRepository,
collaborationService,
);
// Helper to call handler with proper typing (optional fields default to undefined)
@ -182,6 +189,19 @@ describe('update-workflow MCP tool', () => {
});
describe('handler tests', () => {
test('returns error when workflow has active write lock', async () => {
(collaborationService.ensureWorkflowEditable as jest.Mock).mockRejectedValue(
new Error('Cannot modify workflow while it is being edited by a user in the editor.'),
);
const result = await callHandler({ workflowId: 'wf-1', code: 'const wf = ...' });
const response = parseResult(result);
expect(result.isError).toBe(true);
expect(response.error).toContain('being edited by a user');
expect(workflowService.update).not.toHaveBeenCalled();
});
test('successfully updates workflow and returns expected response', async () => {
const result = await callHandler({ workflowId: 'wf-1', code: 'const wf = ...' });
@ -192,6 +212,8 @@ describe('update-workflow MCP tool', () => {
expect(response.url).toBe('https://n8n.example.com/workflow/wf-1');
expect(response.autoAssignedCredentials).toEqual([]);
expect(result.isError).toBeUndefined();
expect(collaborationService.broadcastWorkflowUpdate).toHaveBeenCalledWith('wf-1', user.id);
});
test('sets correct workflow entity defaults', async () => {

View file

@ -47,6 +47,7 @@ import { createValidateWorkflowCodeTool } from './tools/workflow-builder/validat
import { WorkflowBuilderToolsService } from './tools/workflow-builder/workflow-builder-tools.service';
import { ActiveExecutions } from '@/active-executions';
import { CollaborationService } from '@/collaboration/collaboration.service';
import { CredentialsService } from '@/credentials/credentials.service';
import { DataTableProxyService } from '@/modules/data-table/data-table-proxy.service';
import { NodeTypes } from '@/node-types';
@ -102,6 +103,7 @@ export class McpService {
private readonly executionRepository: ExecutionRepository,
private readonly executionService: ExecutionService,
private readonly dataTableProxyService: DataTableProxyService,
private readonly collaborationService: CollaborationService,
) {}
async getServer(user: User) {
@ -174,6 +176,7 @@ export class McpService {
this.workflowFinderService,
this.workflowService,
this.telemetry,
this.collaborationService,
);
server.registerTool(
publishWorkflowTool.name,
@ -186,6 +189,7 @@ export class McpService {
this.workflowFinderService,
this.workflowService,
this.telemetry,
this.collaborationService,
);
server.registerTool(
unpublishWorkflowTool.name,
@ -352,7 +356,13 @@ export class McpService {
searchFoldersTool.handler,
);
const archiveTool = createArchiveWorkflowTool(user, this.workflowService, this.telemetry);
const archiveTool = createArchiveWorkflowTool(
user,
this.workflowFinderService,
this.workflowService,
this.telemetry,
this.collaborationService,
);
server.registerTool(archiveTool.name, archiveTool.config, archiveTool.handler);
const updateTool = createUpdateWorkflowTool(
@ -364,6 +374,7 @@ export class McpService {
this.nodeTypes,
this.credentialsService,
this.sharedWorkflowRepository,
this.collaborationService,
);
server.registerTool(updateTool.name, updateTool.config, updateTool.handler);

View file

@ -7,6 +7,7 @@ import { WorkflowAccessError } from '../mcp.errors';
import type { ToolDefinition, UserCalledMCPToolEventPayload } from '../mcp.types';
import { getMcpWorkflow } from './workflow-validation.utils';
import type { CollaborationService } from '@/collaboration/collaboration.service';
import type { Telemetry } from '@/telemetry';
import type { WorkflowFinderService } from '@/workflows/workflow-finder.service';
import type { WorkflowService } from '@/workflows/workflow.service';
@ -40,6 +41,7 @@ export const createPublishWorkflowTool = (
workflowFinderService: WorkflowFinderService,
workflowService: WorkflowService,
telemetry: Telemetry,
collaborationService: CollaborationService,
): ToolDefinition<typeof inputSchema.shape> => ({
name: 'publish_workflow',
config: {
@ -65,11 +67,15 @@ export const createPublishWorkflowTool = (
try {
await getMcpWorkflow(workflowId, user, ['workflow:publish'], workflowFinderService);
await collaborationService.ensureWorkflowEditable(workflowId);
const activatedWorkflow = await workflowService.activateWorkflow(user, workflowId, {
versionId,
source: 'n8n-mcp',
});
void collaborationService.broadcastWorkflowUpdate(workflowId, user.id).catch(() => {});
const output: PublishWorkflowOutput = {
success: true,
workflowId: activatedWorkflow.id,

View file

@ -7,6 +7,7 @@ import { WorkflowAccessError } from '../mcp.errors';
import type { ToolDefinition, UserCalledMCPToolEventPayload } from '../mcp.types';
import { getMcpWorkflow } from './workflow-validation.utils';
import type { CollaborationService } from '@/collaboration/collaboration.service';
import type { Telemetry } from '@/telemetry';
import type { WorkflowFinderService } from '@/workflows/workflow-finder.service';
import type { WorkflowService } from '@/workflows/workflow.service';
@ -32,6 +33,7 @@ export const createUnpublishWorkflowTool = (
workflowFinderService: WorkflowFinderService,
workflowService: WorkflowService,
telemetry: Telemetry,
collaborationService: CollaborationService,
): ToolDefinition<typeof inputSchema.shape> => ({
name: 'unpublish_workflow',
config: {
@ -57,10 +59,14 @@ export const createUnpublishWorkflowTool = (
try {
await getMcpWorkflow(workflowId, user, ['workflow:unpublish'], workflowFinderService);
await collaborationService.ensureWorkflowEditable(workflowId);
await workflowService.deactivateWorkflow(user, workflowId, {
source: 'n8n-mcp',
});
void collaborationService.broadcastWorkflowUpdate(workflowId, user.id).catch(() => {});
const output: UnpublishWorkflowOutput = {
success: true,
workflowId,

View file

@ -5,9 +5,13 @@ import { USER_CALLED_MCP_TOOL_EVENT } from '../../mcp.constants';
import type { ToolDefinition, UserCalledMCPToolEventPayload } from '../../mcp.types';
import { MCP_ARCHIVE_WORKFLOW_TOOL } from './constants';
import type { CollaborationService } from '@/collaboration/collaboration.service';
import type { Telemetry } from '@/telemetry';
import type { WorkflowFinderService } from '@/workflows/workflow-finder.service';
import type { WorkflowService } from '@/workflows/workflow.service';
import { getMcpWorkflow } from '../workflow-validation.utils';
const inputSchema = {
workflowId: z.string().describe('The ID of the workflow to archive'),
} satisfies z.ZodRawShape;
@ -23,8 +27,10 @@ const outputSchema = {
*/
export const createArchiveWorkflowTool = (
user: User,
workflowFinderService: WorkflowFinderService,
workflowService: WorkflowService,
telemetry: Telemetry,
collaborationService: CollaborationService,
): ToolDefinition<typeof inputSchema> => ({
name: MCP_ARCHIVE_WORKFLOW_TOOL.toolName,
config: {
@ -47,12 +53,18 @@ export const createArchiveWorkflowTool = (
};
try {
await getMcpWorkflow(workflowId, user, ['workflow:delete'], workflowFinderService);
await collaborationService.ensureWorkflowEditable(workflowId);
const workflow = await workflowService.archive(user, workflowId, { skipArchived: true });
if (!workflow) {
throw new Error("Workflow not found or you don't have permission to archive it.");
}
void collaborationService.broadcastWorkflowUpdate(workflowId, user.id).catch(() => {});
telemetryPayload.results = {
success: true,
data: { workflowId },

View file

@ -7,6 +7,7 @@ import type { ToolDefinition, UserCalledMCPToolEventPayload } from '../../mcp.ty
import { CODE_BUILDER_VALIDATE_TOOL, MCP_UPDATE_WORKFLOW_TOOL } from './constants';
import { autoPopulateNodeCredentials, stripNullCredentialStubs } from './credentials-auto-assign';
import type { CollaborationService } from '@/collaboration/collaboration.service';
import type { CredentialsService } from '@/credentials/credentials.service';
import type { NodeTypes } from '@/node-types';
import type { UrlService } from '@/services/url.service';
@ -79,6 +80,7 @@ export const createUpdateWorkflowTool = (
nodeTypes: NodeTypes,
credentialsService: CredentialsService,
sharedWorkflowRepository: SharedWorkflowRepository,
collaborationService: CollaborationService,
): ToolDefinition<typeof inputSchema> => ({
name: MCP_UPDATE_WORKFLOW_TOOL.toolName,
config: {
@ -119,6 +121,8 @@ export const createUpdateWorkflowTool = (
workflowFinderService,
);
await collaborationService.ensureWorkflowEditable(existingWorkflow.id);
const { ParseValidateHandler, stripImportStatements } = await import(
'@n8n/ai-workflow-builder'
);
@ -177,6 +181,8 @@ export const createUpdateWorkflowTool = (
source: 'n8n-mcp',
});
void collaborationService.broadcastWorkflowUpdate(workflowId, user.id).catch(() => {});
const baseUrl = urlService.getInstanceBaseUrl();
const workflowUrl = `${baseUrl}/workflow/${updatedWorkflow.id}`;