fix(MCP Client Node): Ensure MCP connections close when MCP Client node execution ends (#25742)

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Hyuncheol Park 2026-04-09 21:51:28 +09:00 committed by GitHub
parent 0f4d558b36
commit 752a4e47d4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 937 additions and 181 deletions

View file

@ -238,90 +238,94 @@ export class McpClient implements INodeType {
throw mapToNodeOperationError(node, client.error);
}
const inputMode = this.getNodeParameter('inputMode', 0, 'manual') as 'manual' | 'json';
const items = this.getInputData();
const returnData: INodeExecutionData[] = [];
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
try {
const tool = this.getNodeParameter('tool.value', itemIndex) as string;
const options = this.getNodeParameter('options', itemIndex);
let parameters: IDataObject = {};
if (inputMode === 'manual') {
parameters = this.getNodeParameter('parameters.value', itemIndex) as IDataObject;
} else {
parameters = this.getNodeParameter('jsonInput', itemIndex) as IDataObject;
}
const result = (await client.result.callTool(
{
name: tool,
arguments: parameters,
},
undefined,
{
timeout: options.timeout ? Number(options.timeout) : undefined,
},
)) as CallToolResult;
let binaryIndex = 0;
const binary: IBinaryKeyData = {};
const content: IDataObject[] = [];
const convertToBinary = options.convertToBinary ?? true;
for (const contentItem of result.content) {
if (contentItem.type === 'text') {
content.push({
...contentItem,
text: jsonParse(contentItem.text, { fallbackValue: contentItem.text }),
});
continue;
try {
const inputMode = this.getNodeParameter('inputMode', 0, 'manual') as 'manual' | 'json';
const items = this.getInputData();
const returnData: INodeExecutionData[] = [];
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
try {
const tool = this.getNodeParameter('tool.value', itemIndex) as string;
const options = this.getNodeParameter('options', itemIndex);
let parameters: IDataObject = {};
if (inputMode === 'manual') {
parameters = this.getNodeParameter('parameters.value', itemIndex) as IDataObject;
} else {
parameters = this.getNodeParameter('jsonInput', itemIndex) as IDataObject;
}
if (convertToBinary && (contentItem.type === 'image' || contentItem.type === 'audio')) {
binary[`data_${binaryIndex}`] = await this.helpers.prepareBinaryData(
Buffer.from(contentItem.data, 'base64'),
undefined,
contentItem.mimeType,
);
binaryIndex++;
continue;
const result = (await client.result.callTool(
{
name: tool,
arguments: parameters,
},
undefined,
{
timeout: options.timeout ? Number(options.timeout) : undefined,
},
)) as CallToolResult;
let binaryIndex = 0;
const binary: IBinaryKeyData = {};
const content: IDataObject[] = [];
const convertToBinary = options.convertToBinary ?? true;
for (const contentItem of result.content) {
if (contentItem.type === 'text') {
content.push({
...contentItem,
text: jsonParse(contentItem.text, { fallbackValue: contentItem.text }),
});
continue;
}
if (convertToBinary && (contentItem.type === 'image' || contentItem.type === 'audio')) {
binary[`data_${binaryIndex}`] = await this.helpers.prepareBinaryData(
Buffer.from(contentItem.data, 'base64'),
undefined,
contentItem.mimeType,
);
binaryIndex++;
continue;
}
content.push(contentItem as IDataObject);
}
content.push(contentItem as IDataObject);
}
returnData.push({
json: {
content: content.length > 0 ? content : undefined,
},
binary: Object.keys(binary).length > 0 ? binary : undefined,
pairedItem: {
item: itemIndex,
},
});
} catch (e) {
const errorMessage =
e instanceof ZodError ? prettifyError(e) : e instanceof Error ? e.message : String(e);
if (this.continueOnFail()) {
returnData.push({
json: {
error: {
message: errorMessage,
issues: e instanceof ZodError ? e.issues : undefined,
},
content: content.length > 0 ? content : undefined,
},
binary: Object.keys(binary).length > 0 ? binary : undefined,
pairedItem: {
item: itemIndex,
},
});
continue;
} catch (e) {
const errorMessage =
e instanceof ZodError ? prettifyError(e) : e instanceof Error ? e.message : String(e);
if (this.continueOnFail()) {
returnData.push({
json: {
error: {
message: errorMessage,
issues: e instanceof ZodError ? e.issues : undefined,
},
},
pairedItem: {
item: itemIndex,
},
});
continue;
}
throw new NodeOperationError(node, errorMessage, {
itemIndex,
});
}
throw new NodeOperationError(node, errorMessage, {
itemIndex,
});
}
}
return [returnData];
return [returnData];
} finally {
await client.result.close();
}
}
}

View file

@ -1,9 +1,11 @@
import type { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { mockDeep } from 'jest-mock-extended';
import type { IExecuteFunctions } from 'n8n-workflow';
import { mock, mockDeep } from 'jest-mock-extended';
import type { IExecuteFunctions, ILoadOptionsFunctions } from 'n8n-workflow';
import * as sharedUtils from '../../shared/utils';
import { getTools } from '../listSearch';
import { McpClient } from '../McpClient.node';
import { getToolParameters } from '../resourceMapping';
describe('McpClient', () => {
const getAuthHeaders = jest.spyOn(sharedUtils, 'getAuthHeaders');
@ -218,4 +220,203 @@ describe('McpClient', () => {
[{ json: { error: { message: 'Tool call failed' } }, pairedItem: { item: 0 } }],
]);
});
it('should close client connection after successful execution', async () => {
executeFunctions.getNodeParameter.mockImplementation(
(key, _idx, defaultValue) => defaultParams[key as keyof typeof defaultParams] ?? defaultValue,
);
client.callTool.mockResolvedValue({
content: [{ type: 'text', text: 'result' }],
});
await new McpClient().execute.call(executeFunctions);
expect(client.close).toHaveBeenCalledTimes(1);
});
it('should close client connection even when tool call fails', async () => {
executeFunctions.getNodeParameter.mockImplementation(
(key, _idx, defaultValue) => defaultParams[key as keyof typeof defaultParams] ?? defaultValue,
);
client.callTool.mockRejectedValue(new Error('Tool call failed'));
await expect(new McpClient().execute.call(executeFunctions)).rejects.toThrow(
'Tool call failed',
);
expect(client.close).toHaveBeenCalledTimes(1);
});
describe('listSearch: getTools', () => {
it('should close client after listing tools', async () => {
client.listTools.mockResolvedValue({
tools: [
{
name: 'tool1',
description: 'A tool',
inputSchema: { type: 'object' },
},
],
});
const loadOptionsFunctions = mock<ILoadOptionsFunctions>({
getNode: jest.fn().mockReturnValue({
id: '123',
name: 'MCP Client',
type: '@n8n/n8n-nodes-langchain.mcpClient',
typeVersion: 1,
position: [0, 0],
parameters: {},
}),
getNodeParameter: jest.fn().mockImplementation((key: string) => {
const params: Record<string, unknown> = {
authentication: 'none',
serverTransport: 'httpStreamable',
endpointUrl: 'https://test.com/mcp',
};
return params[key];
}),
});
await getTools.call(loadOptionsFunctions);
expect(client.close).toHaveBeenCalledTimes(1);
});
it('should close client when listTools throws', async () => {
client.listTools.mockRejectedValue(new Error('listTools failed'));
const loadOptionsFunctions = mock<ILoadOptionsFunctions>({
getNode: jest.fn().mockReturnValue({
id: '123',
name: 'MCP Client',
type: '@n8n/n8n-nodes-langchain.mcpClient',
typeVersion: 1,
position: [0, 0],
parameters: {},
}),
getNodeParameter: jest.fn().mockImplementation((key: string) => {
const params: Record<string, unknown> = {
authentication: 'none',
serverTransport: 'httpStreamable',
endpointUrl: 'https://test.com/mcp',
};
return params[key];
}),
});
await expect(getTools.call(loadOptionsFunctions)).rejects.toThrow('listTools failed');
expect(client.close).toHaveBeenCalledTimes(1);
});
});
describe('resourceMapping: getToolParameters', () => {
it('should close client after getting tool parameters', async () => {
client.listTools.mockResolvedValue({
tools: [
{
name: 'tool1',
description: 'A tool',
inputSchema: {
type: 'object',
properties: {
input: { type: 'string' },
},
},
},
],
});
const loadOptionsFunctions = mock<ILoadOptionsFunctions>({
getNode: jest.fn().mockReturnValue({
id: '123',
name: 'MCP Client',
type: '@n8n/n8n-nodes-langchain.mcpClient',
typeVersion: 1,
position: [0, 0],
parameters: {},
}),
getNodeParameter: jest.fn().mockImplementation((key: string) => {
const params: Record<string, unknown> = {
tool: 'tool1',
authentication: 'none',
serverTransport: 'httpStreamable',
endpointUrl: 'https://test.com/mcp',
};
return params[key];
}),
});
await getToolParameters.call(loadOptionsFunctions);
expect(client.close).toHaveBeenCalledTimes(1);
});
it('should close client when getAllTools throws', async () => {
client.listTools.mockRejectedValue(new Error('getAllTools failed'));
const loadOptionsFunctions = mock<ILoadOptionsFunctions>({
getNode: jest.fn().mockReturnValue({
id: '123',
name: 'MCP Client',
type: '@n8n/n8n-nodes-langchain.mcpClient',
typeVersion: 1,
position: [0, 0],
parameters: {},
}),
getNodeParameter: jest.fn().mockImplementation((key: string) => {
const params: Record<string, unknown> = {
tool: 'tool1',
authentication: 'none',
serverTransport: 'httpStreamable',
endpointUrl: 'https://test.com/mcp',
};
return params[key];
}),
});
await expect(getToolParameters.call(loadOptionsFunctions)).rejects.toThrow(
'getAllTools failed',
);
expect(client.close).toHaveBeenCalledTimes(1);
});
it('should close client when tool not found', async () => {
client.listTools.mockResolvedValue({
tools: [
{
name: 'other_tool',
description: 'A different tool',
inputSchema: { type: 'object' },
},
],
});
const loadOptionsFunctions = mock<ILoadOptionsFunctions>({
getNode: jest.fn().mockReturnValue({
id: '123',
name: 'MCP Client',
type: '@n8n/n8n-nodes-langchain.mcpClient',
typeVersion: 1,
position: [0, 0],
parameters: {},
}),
getNodeParameter: jest.fn().mockImplementation((key: string) => {
const params: Record<string, unknown> = {
tool: 'nonexistent_tool',
authentication: 'none',
serverTransport: 'httpStreamable',
endpointUrl: 'https://test.com/mcp',
};
return params[key];
}),
});
await expect(getToolParameters.call(loadOptionsFunctions)).rejects.toThrow('Tool not found');
expect(client.close).toHaveBeenCalledTimes(1);
});
});
});

View file

@ -31,18 +31,22 @@ export async function getTools(
throw mapToNodeOperationError(node, client.error);
}
const result = await client.result.listTools({ cursor: paginationToken });
const tools = filter
? result.tools.filter((tool) => tool.name.toLowerCase().includes(filter.toLowerCase()))
: result.tools;
try {
const result = await client.result.listTools({ cursor: paginationToken });
const tools = filter
? result.tools.filter((tool) => tool.name.toLowerCase().includes(filter.toLowerCase()))
: result.tools;
return {
results: tools.map((tool) => ({
name: tool.name,
value: tool.name,
description: tool.description,
inputSchema: tool.inputSchema,
})),
paginationToken: result.nextCursor,
};
return {
results: tools.map((tool) => ({
name: tool.name,
value: tool.name,
description: tool.description,
inputSchema: tool.inputSchema,
})),
paginationToken: result.nextCursor,
};
} finally {
await client.result.close();
}
}

View file

@ -35,14 +35,18 @@ export async function getToolParameters(
throw mapToNodeOperationError(node, client.error);
}
const result = await getAllTools(client.result);
const tool = result.find((tool) => tool.name === toolId);
if (!tool) {
throw new NodeOperationError(this.getNode(), 'Tool not found');
}
try {
const result = await getAllTools(client.result);
const tool = result.find((tool) => tool.name === toolId);
if (!tool) {
throw new NodeOperationError(this.getNode(), 'Tool not found');
}
const fields = convertJsonSchemaToResourceMapperFields(tool.inputSchema);
return {
fields,
};
const fields = convertJsonSchemaToResourceMapperFields(tool.inputSchema);
return {
fields,
};
} finally {
await client.result.close();
}
}

View file

@ -100,15 +100,20 @@ async function connectAndGetTools(
return { client, mcpTools: null, error: client.error };
}
const allTools = await getAllTools(client.result);
const mcpTools = getSelectedTools({
tools: allTools,
mode: config.mode,
includeTools: config.includeTools,
excludeTools: config.excludeTools,
});
try {
const allTools = await getAllTools(client.result);
const mcpTools = getSelectedTools({
tools: allTools,
mode: config.mode,
includeTools: config.includeTools,
excludeTools: config.excludeTools,
});
return { client: client.result, mcpTools, error: null };
return { client: client.result, mcpTools, error: null };
} catch (error) {
await client.result.close();
throw error;
}
}
export class McpClientTool implements INodeType {
@ -429,51 +434,55 @@ export class McpClientTool implements INodeType {
throw new NodeOperationError(node, error.error, { itemIndex });
}
if (!mcpTools?.length) {
throw new NodeOperationError(node, 'MCP Server returned no tools', { itemIndex });
}
try {
if (!mcpTools?.length) {
throw new NodeOperationError(node, 'MCP Server returned no tools', { itemIndex });
}
// Check for tool name in item.json.tool (for toolkit execution from agent)
if (!item.json.tool || typeof item.json.tool !== 'string') {
throw new NodeOperationError(node, 'Tool name not found in item.json.tool or item.tool', {
itemIndex,
});
}
const toolName = item.json.tool;
for (const tool of mcpTools) {
const prefixedName = buildMcpToolName(node.name, tool.name);
if (toolName === prefixedName) {
// Extract the tool name from arguments before passing to MCP
const { tool: _, ...toolArguments } = item.json;
const schema: JSONSchema7 = tool.inputSchema;
// When additionalProperties is not explicitly true, filter to schema-defined properties.
// Otherwise, pass all arguments through
const sanitizedToolArguments: IDataObject =
schema.additionalProperties !== true
? pick(toolArguments, Object.keys(schema.properties ?? {}))
: toolArguments;
const params: {
name: string;
arguments: IDataObject;
} = {
name: tool.name,
arguments: sanitizedToolArguments,
};
const result = await client.callTool(params, CallToolResultSchema, {
timeout: config.timeout,
signal: this.getExecutionCancelSignal(),
});
returnData.push({
json: {
response: result.content as IDataObject,
},
pairedItem: {
item: itemIndex,
},
// Check for tool name in item.json.tool (for toolkit execution from agent)
if (!item.json.tool || typeof item.json.tool !== 'string') {
throw new NodeOperationError(node, 'Tool name not found in item.json.tool or item.tool', {
itemIndex,
});
}
const toolName = item.json.tool;
for (const tool of mcpTools) {
const prefixedName = buildMcpToolName(node.name, tool.name);
if (toolName === prefixedName) {
// Extract the tool name from arguments before passing to MCP
const { tool: _, ...toolArguments } = item.json;
const schema: JSONSchema7 = tool.inputSchema;
// When additionalProperties is not explicitly true, filter to schema-defined properties.
// Otherwise, pass all arguments through
const sanitizedToolArguments: IDataObject =
schema.additionalProperties !== true
? pick(toolArguments, Object.keys(schema.properties ?? {}))
: toolArguments;
const params: {
name: string;
arguments: IDataObject;
} = {
name: tool.name,
arguments: sanitizedToolArguments,
};
const result = await client.callTool(params, CallToolResultSchema, {
timeout: config.timeout,
signal: this.getExecutionCancelSignal(),
});
returnData.push({
json: {
response: result.content as IDataObject,
},
pairedItem: {
item: itemIndex,
},
});
}
}
} finally {
await client.close();
}
}

View file

@ -63,6 +63,26 @@ describe('McpClientTool', () => {
getTools.call(mock<ILoadOptionsFunctions>({ getNode: jest.fn(() => node) })),
).rejects.toEqual(new NodeOperationError(node, 'Could not connect to your MCP server'));
});
it('should close client after listing tools', async () => {
jest.spyOn(Client.prototype, 'connect').mockResolvedValue();
jest.spyOn(Client.prototype, 'listTools').mockResolvedValue({
tools: [
{
name: 'MyTool',
description: 'MyTool does something',
inputSchema: { type: 'object', properties: { input: { type: 'string' } } },
},
],
});
const closeSpy = jest.spyOn(Client.prototype, 'close').mockResolvedValue();
await getTools.call(
mock<ILoadOptionsFunctions>({ getNode: jest.fn(() => mock<INode>({ typeVersion: 1 })) }),
);
expect(closeSpy).toHaveBeenCalled();
});
});
describe('supplyData', () => {
@ -526,6 +546,34 @@ describe('McpClientTool', () => {
);
});
it('should call client.close() when closeFunction is invoked', async () => {
jest.spyOn(Client.prototype, 'connect').mockResolvedValue();
jest.spyOn(Client.prototype, 'listTools').mockResolvedValue({
tools: [
{
name: 'MyTool1',
description: 'MyTool1 does something',
inputSchema: { type: 'object', properties: { input: { type: 'string' } } },
},
],
});
const closeSpy = jest.spyOn(Client.prototype, 'close').mockResolvedValue();
const supplyDataResult = await new McpClientTool().supplyData.call(
mock<ISupplyDataFunctions>({
getNode: jest.fn(() => mock<INode>({ typeVersion: 1, name: 'McpClientTool' })),
logger: { debug: jest.fn(), error: jest.fn() },
addInputData: jest.fn(() => ({ index: 0 })),
}),
0,
);
expect(supplyDataResult.closeFunction).toBeDefined();
await supplyDataResult.closeFunction?.();
expect(closeSpy).toHaveBeenCalledTimes(1);
});
it('should support setting a timeout', async () => {
jest.spyOn(Client.prototype, 'connect').mockResolvedValue();
const callToolSpy = jest
@ -1237,6 +1285,193 @@ describe('McpClientTool', () => {
);
expect(connectSpy).not.toHaveBeenCalled();
});
it('should close client connection after execution', async () => {
jest.spyOn(Client.prototype, 'connect').mockResolvedValue();
jest.spyOn(Client.prototype, 'callTool').mockResolvedValue({
content: [{ type: 'text', text: 'Weather is sunny' }],
});
jest.spyOn(Client.prototype, 'listTools').mockResolvedValue({
tools: [
{
name: 'get_weather',
description: 'Gets the weather',
inputSchema: { type: 'object', properties: { location: { type: 'string' } } },
},
],
});
const closeSpy = jest.spyOn(Client.prototype, 'close').mockResolvedValue();
const mockNode = mock<INode>({
typeVersion: 1,
type: 'mcpClientTool',
name: 'McpClientTool',
});
const mockExecuteFunctions = mock<any>({
getNode: jest.fn(() => mockNode),
getInputData: jest.fn(() => [
{
json: {
tool: 'get_weather',
location: 'Berlin',
},
},
]),
getNodeParameter: jest.fn((key) => {
const params: Record<string, any> = {
include: 'all',
includeTools: [],
excludeTools: [],
authentication: 'none',
sseEndpoint: 'https://test.com/sse',
'options.timeout': 60000,
};
return params[key];
}),
});
await new McpClientTool().execute.call(mockExecuteFunctions);
expect(closeSpy).toHaveBeenCalled();
});
it('should close client connection even when tool call fails', async () => {
jest.spyOn(Client.prototype, 'connect').mockResolvedValue();
jest.spyOn(Client.prototype, 'callTool').mockRejectedValue(new Error('Tool call failed'));
jest.spyOn(Client.prototype, 'listTools').mockResolvedValue({
tools: [
{
name: 'get_weather',
description: 'Gets the weather',
inputSchema: { type: 'object', properties: { location: { type: 'string' } } },
},
],
});
const closeSpy = jest.spyOn(Client.prototype, 'close').mockResolvedValue();
const mockNode = mock<INode>({ typeVersion: 1, type: 'mcpClientTool' });
const mockExecuteFunctions = mock<any>({
getNode: jest.fn(() => mockNode),
getInputData: jest.fn(() => [
{
json: {
tool: 'get_weather',
location: 'Berlin',
},
},
]),
getNodeParameter: jest.fn((key) => {
const params: Record<string, any> = {
include: 'all',
includeTools: [],
excludeTools: [],
authentication: 'none',
sseEndpoint: 'https://test.com/sse',
'options.timeout': 60000,
};
return params[key];
}),
});
await expect(new McpClientTool().execute.call(mockExecuteFunctions)).rejects.toThrow();
expect(closeSpy).toHaveBeenCalled();
});
it('should close client when mcpTools is empty', async () => {
jest.spyOn(Client.prototype, 'connect').mockResolvedValue();
jest.spyOn(Client.prototype, 'listTools').mockResolvedValue({ tools: [] });
const closeSpy = jest.spyOn(Client.prototype, 'close').mockResolvedValue();
const mockNode = mock<INode>({ typeVersion: 1, type: 'mcpClientTool' });
const mockExecuteFunctions = mock<any>({
getNode: jest.fn(() => mockNode),
getInputData: jest.fn(() => [{ json: { tool: 'get_weather' } }]),
getNodeParameter: jest.fn((key) => {
const params: Record<string, any> = {
include: 'all',
includeTools: [],
excludeTools: [],
authentication: 'none',
sseEndpoint: 'https://test.com/sse',
'options.timeout': 60000,
};
return params[key];
}),
});
await expect(new McpClientTool().execute.call(mockExecuteFunctions)).rejects.toThrow(
'MCP Server returned no tools',
);
expect(closeSpy).toHaveBeenCalled();
});
it('should close client when item.json.tool is missing', async () => {
jest.spyOn(Client.prototype, 'connect').mockResolvedValue();
jest.spyOn(Client.prototype, 'listTools').mockResolvedValue({
tools: [
{
name: 'get_weather',
description: 'Gets the weather',
inputSchema: { type: 'object', properties: { location: { type: 'string' } } },
},
],
});
const closeSpy = jest.spyOn(Client.prototype, 'close').mockResolvedValue();
const mockNode = mock<INode>({ typeVersion: 1, type: 'mcpClientTool' });
const mockExecuteFunctions = mock<any>({
getNode: jest.fn(() => mockNode),
getInputData: jest.fn(() => [{ json: { location: 'Berlin' } }]),
getNodeParameter: jest.fn((key) => {
const params: Record<string, any> = {
include: 'all',
includeTools: [],
excludeTools: [],
authentication: 'none',
sseEndpoint: 'https://test.com/sse',
'options.timeout': 60000,
};
return params[key];
}),
});
await expect(new McpClientTool().execute.call(mockExecuteFunctions)).rejects.toThrow(
'Tool name not found',
);
expect(closeSpy).toHaveBeenCalled();
});
it('should close client when getAllTools throws after connection succeeds', async () => {
jest.spyOn(Client.prototype, 'connect').mockResolvedValue();
jest.spyOn(Client.prototype, 'listTools').mockRejectedValue(new Error('listTools failed'));
const closeSpy = jest.spyOn(Client.prototype, 'close').mockResolvedValue();
const mockNode = mock<INode>({ typeVersion: 1, type: 'mcpClientTool' });
const mockExecuteFunctions = mock<any>({
getNode: jest.fn(() => mockNode),
getInputData: jest.fn(() => [{ json: { tool: 'get_weather' } }]),
getNodeParameter: jest.fn((key) => {
const params: Record<string, any> = {
include: 'all',
includeTools: [],
excludeTools: [],
authentication: 'none',
sseEndpoint: 'https://test.com/sse',
'options.timeout': 60000,
};
return params[key];
}),
});
await expect(new McpClientTool().execute.call(mockExecuteFunctions)).rejects.toThrow(
'listTools failed',
);
expect(closeSpy).toHaveBeenCalled();
});
});
describe('supplyData tool name prefixing', () => {

View file

@ -35,11 +35,15 @@ export async function getTools(this: ILoadOptionsFunctions): Promise<INodeProper
throw mapToNodeOperationError(node, client.error);
}
const tools = await getAllTools(client.result);
return tools.map((tool) => ({
name: tool.name,
value: tool.name,
description: tool.description,
inputSchema: tool.inputSchema,
}));
try {
const tools = await getAllTools(client.result);
return tools.map((tool) => ({
name: tool.name,
value: tool.name,
description: tool.description,
inputSchema: tool.inputSchema,
}));
} finally {
await client.result.close();
}
}

View file

@ -557,6 +557,286 @@ describe('WorkflowExecute.runNode - Real Implementation', () => {
expect(closeFunction1).toHaveBeenCalled();
expect(closeFunction2).toHaveBeenCalled();
});
it('should call close functions when execute returns an EngineRequest', async () => {
const engineRequest = { actions: [{ type: 'test' }], metadata: {} };
const closeFunction1 = jest.fn().mockResolvedValue(undefined);
const closeFunction2 = jest.fn().mockResolvedValue(undefined);
mockNodeType.execute = jest.fn().mockResolvedValue(engineRequest);
const mockContextInstance = {
hints: [],
};
mockExecuteContext.mockImplementation(
(
_workflow,
_node,
_additionalData,
_mode,
_runExecutionData,
_runIndex,
_connectionInputData,
_inputData,
_executionData,
closeFunctions,
) => {
closeFunctions.push(closeFunction1, closeFunction2);
return mockContextInstance as unknown as ExecuteContext;
},
);
const result = await workflowExecute.runNode(
mockWorkflow,
mockExecutionData,
mockRunExecutionData,
0,
mockAdditionalData,
'manual',
);
expect(result).toEqual(engineRequest);
expect(closeFunction1).toHaveBeenCalled();
expect(closeFunction2).toHaveBeenCalled();
});
it('should call close functions when execute throws an error', async () => {
const closeFunction1 = jest.fn().mockResolvedValue(undefined);
const closeFunction2 = jest.fn().mockResolvedValue(undefined);
mockNodeType.execute = jest.fn().mockRejectedValue(new Error('Execution failed'));
const mockContextInstance = {
hints: [],
};
mockExecuteContext.mockImplementation(
(
_workflow,
_node,
_additionalData,
_mode,
_runExecutionData,
_runIndex,
_connectionInputData,
_inputData,
_executionData,
closeFunctions,
) => {
closeFunctions.push(closeFunction1, closeFunction2);
return mockContextInstance as unknown as ExecuteContext;
},
);
await expect(
workflowExecute.runNode(
mockWorkflow,
mockExecutionData,
mockRunExecutionData,
0,
mockAdditionalData,
'manual',
),
).rejects.toThrow('Execution failed');
expect(closeFunction1).toHaveBeenCalled();
expect(closeFunction2).toHaveBeenCalled();
});
it('should call all close functions via Promise.allSettled even when some fail', async () => {
const mockData = [[{ json: { result: 'test' } }]];
const closeFunction1 = jest.fn().mockResolvedValue(undefined);
const closeFunction2 = jest.fn().mockRejectedValue(new Error('Close error 1'));
const closeFunction3 = jest.fn().mockResolvedValue(undefined);
mockNodeType.execute = jest.fn().mockResolvedValue(mockData);
const mockContextInstance = {
hints: [],
};
mockExecuteContext.mockImplementation(
(
_workflow,
_node,
_additionalData,
_mode,
_runExecutionData,
_runIndex,
_connectionInputData,
_inputData,
_executionData,
closeFunctions,
) => {
closeFunctions.push(closeFunction1, closeFunction2, closeFunction3);
return mockContextInstance as unknown as ExecuteContext;
},
);
await expect(
workflowExecute.runNode(
mockWorkflow,
mockExecutionData,
mockRunExecutionData,
0,
mockAdditionalData,
'manual',
),
).rejects.toThrow('Close error 1');
expect(closeFunction1).toHaveBeenCalled();
expect(closeFunction2).toHaveBeenCalled();
expect(closeFunction3).toHaveBeenCalled();
});
it('should throw close function error when EngineRequest is returned', async () => {
const engineRequest = { actions: [{ type: 'test' }], metadata: {} };
const closeFunction1 = jest.fn().mockRejectedValue(new Error('Close error on EngineRequest'));
mockNodeType.execute = jest.fn().mockResolvedValue(engineRequest);
const mockContextInstance = {
hints: [],
};
mockExecuteContext.mockImplementation(
(
_workflow,
_node,
_additionalData,
_mode,
_runExecutionData,
_runIndex,
_connectionInputData,
_inputData,
_executionData,
closeFunctions,
) => {
closeFunctions.push(closeFunction1);
return mockContextInstance as unknown as ExecuteContext;
},
);
await expect(
workflowExecute.runNode(
mockWorkflow,
mockExecutionData,
mockRunExecutionData,
0,
mockAdditionalData,
'manual',
),
).rejects.toThrow('Close error on EngineRequest');
expect(closeFunction1).toHaveBeenCalled();
});
it('should call close functions after custom operation completes', async () => {
const mockData = [[{ json: { result: 'custom operation result' } }]];
const mockCustomOperation = jest.fn().mockResolvedValue(mockData);
const closeFunction1 = jest.fn().mockResolvedValue(undefined);
const closeFunction2 = jest.fn().mockResolvedValue(undefined);
const customOpNode = {
...mockNode,
parameters: {
resource: 'testResource',
operation: 'testOperation',
},
};
const customOpNodeType = {
...mockNodeType,
customOperations: {
testResource: {
testOperation: mockCustomOperation,
},
},
execute: undefined,
};
mockWorkflow.nodeTypes.getByNameAndVersion = jest.fn().mockReturnValue(customOpNodeType);
const customOpExecutionData = {
...mockExecutionData,
node: customOpNode,
};
const mockContextInstance = { hints: [] };
mockExecuteContext.mockImplementation(
(
_workflow,
_node,
_additionalData,
_mode,
_runExecutionData,
_runIndex,
_connectionInputData,
_inputData,
_executionData,
closeFunctions,
) => {
closeFunctions.push(closeFunction1, closeFunction2);
return mockContextInstance as unknown as ExecuteContext;
},
);
const result = await workflowExecute.runNode(
mockWorkflow,
customOpExecutionData,
mockRunExecutionData,
0,
mockAdditionalData,
'manual',
);
expect(mockCustomOperation).toHaveBeenCalled();
expect(result).toEqual({ data: mockData, hints: [] });
expect(closeFunction1).toHaveBeenCalled();
expect(closeFunction2).toHaveBeenCalled();
});
it('should not mask execution error with close function error', async () => {
const closeFunction1 = jest.fn().mockRejectedValue(new Error('Close error'));
mockNodeType.execute = jest.fn().mockRejectedValue(new Error('Execution failed'));
const mockContextInstance = {
hints: [],
};
mockExecuteContext.mockImplementation(
(
_workflow,
_node,
_additionalData,
_mode,
_runExecutionData,
_runIndex,
_connectionInputData,
_inputData,
_executionData,
closeFunctions,
) => {
closeFunctions.push(closeFunction1);
return mockContextInstance as unknown as ExecuteContext;
},
);
await expect(
workflowExecute.runNode(
mockWorkflow,
mockExecutionData,
mockRunExecutionData,
0,
mockAdditionalData,
'manual',
),
).rejects.toThrow('Execution failed');
expect(closeFunction1).toHaveBeenCalled();
});
});
describe('poll node type handling', () => {

View file

@ -1035,42 +1035,57 @@ export class WorkflowExecute {
);
let data: INodeExecutionData[][] | EngineRequest | null;
let executionSucceeded = false;
let closingError: Error | undefined;
if (customOperation) {
data = await customOperation.call(context);
} else if (nodeType.execute) {
data =
nodeType instanceof Node
? await nodeType.execute(context, subNodeExecutionResults)
: await nodeType.execute.call(context, subNodeExecutionResults);
} else {
throw new UnexpectedError(
"Can't execute node. There is no custom operation and the node has not execute function.",
);
try {
if (customOperation) {
data = await customOperation.call(context);
} else if (nodeType.execute) {
data =
nodeType instanceof Node
? await nodeType.execute(context, subNodeExecutionResults)
: await nodeType.execute.call(context, subNodeExecutionResults);
} else {
throw new UnexpectedError(
"Can't execute node. There is no custom operation and the node has not execute function.",
);
}
executionSucceeded = true;
} finally {
if (closeFunctions.length > 0) {
const closeFunctionsResults = await Promise.allSettled(
closeFunctions.map(async (fn) => await fn()),
);
// Only throw close function errors if the execution itself succeeded,
// to avoid masking the original execution error.
if (executionSucceeded) {
const closingErrors = closeFunctionsResults
.filter((result): result is PromiseRejectedResult => result.status === 'rejected')
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
.map((result) => result.reason);
if (closingErrors.length > 0) {
closingError =
closingErrors[0] instanceof Error
? closingErrors[0]
: new ApplicationError("Error on execution node's close function(s)", {
extra: { nodeName: node.name },
tags: { nodeType: node.type },
cause: closingErrors,
});
}
}
}
}
if (closingError) throw closingError;
if (isEngineRequest(data)) {
return data;
}
const closeFunctionsResults = await Promise.allSettled(
closeFunctions.map(async (fn) => await fn()),
);
const closingErrors = closeFunctionsResults
.filter((result): result is PromiseRejectedResult => result.status === 'rejected')
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
.map((result) => result.reason);
if (closingErrors.length > 0) {
if (closingErrors[0] instanceof Error) throw closingErrors[0];
throw new ApplicationError("Error on execution node's close function(s)", {
extra: { nodeName: node.name },
tags: { nodeType: node.type },
cause: closingErrors,
});
}
return { data, hints: context.hints };
}