mirror of
https://github.com/n8n-io/n8n
synced 2026-04-21 15:47:20 +00:00
feat(core): Add isolate pooling for VM expression engine (#27573)
This commit is contained in:
parent
ccd04b0edf
commit
5ee0e842b8
40 changed files with 1161 additions and 416 deletions
24
.github/workflows/test-e2e-ci-reusable.yml
vendored
24
.github/workflows/test-e2e-ci-reusable.yml
vendored
|
|
@ -13,6 +13,28 @@ on:
|
|||
required: false
|
||||
type: boolean
|
||||
default: false
|
||||
n8n-env:
|
||||
description: 'JSON string of n8n env vars to inject into test containers, e.g. {"N8N_EXPRESSION_ENGINE":"vm"}'
|
||||
required: false
|
||||
type: string
|
||||
default: ''
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
branch:
|
||||
description: 'GitHub branch/ref to test'
|
||||
required: false
|
||||
type: string
|
||||
default: ''
|
||||
playwright-only:
|
||||
description: 'Only Playwright files changed — run impacted tests only'
|
||||
required: false
|
||||
type: boolean
|
||||
default: false
|
||||
n8n-env:
|
||||
description: 'JSON string of n8n env vars to inject into test containers (e.g. {"N8N_EXPRESSION_ENGINE":"vm"})'
|
||||
required: false
|
||||
type: string
|
||||
default: ''
|
||||
|
||||
env:
|
||||
DOCKER_IMAGE: ghcr.io/${{ github.repository }}:ci-${{ github.run_id }}
|
||||
|
|
@ -90,6 +112,7 @@ jobs:
|
|||
runner: blacksmith-2vcpu-ubuntu-2204
|
||||
workers: '1'
|
||||
pre-generated-matrix: '[{"shard":1,"images":""}]'
|
||||
n8n-env: ${{ inputs.n8n-env }}
|
||||
|
||||
# Multi-main: postgres + redis + caddy + 2 mains + 1 worker
|
||||
# Only runs for internal PRs (not community/fork PRs)
|
||||
|
|
@ -109,6 +132,7 @@ jobs:
|
|||
workers: '1'
|
||||
use-custom-orchestration: true
|
||||
pre-generated-matrix: ${{ needs.prepare.outputs.matrix }}
|
||||
n8n-env: ${{ inputs.n8n-env }}
|
||||
secrets: inherit
|
||||
|
||||
# Community PR tests: Local mode with SQLite (no container building, no secrets required)
|
||||
|
|
|
|||
7
.github/workflows/test-e2e-reusable.yml
vendored
7
.github/workflows/test-e2e-reusable.yml
vendored
|
|
@ -62,7 +62,11 @@ on:
|
|||
required: false
|
||||
default: ''
|
||||
type: string
|
||||
|
||||
n8n-env:
|
||||
description: 'JSON string of n8n env vars to inject into test containers, e.g. {"N8N_EXPRESSION_ENGINE":"vm"}'
|
||||
required: false
|
||||
default: ''
|
||||
type: string
|
||||
|
||||
env:
|
||||
NODE_OPTIONS: ${{ contains(inputs.runner, '2vcpu') && '--max-old-space-size=6144' || '' }}
|
||||
|
|
@ -149,6 +153,7 @@ jobs:
|
|||
N8N_LICENSE_ACTIVATION_KEY: ${{ secrets.N8N_LICENSE_ACTIVATION_KEY }}
|
||||
N8N_LICENSE_CERT: ${{ secrets.N8N_LICENSE_CERT }}
|
||||
N8N_ENCRYPTION_KEY: ${{ secrets.N8N_ENCRYPTION_KEY }}
|
||||
N8N_TEST_ENV: ${{ inputs.n8n-env }}
|
||||
|
||||
- name: Upload Failure Artifacts
|
||||
if: ${{ failure() && inputs.upload-failure-artifacts }}
|
||||
|
|
|
|||
26
packages/@n8n/config/src/configs/expression-engine.config.ts
Normal file
26
packages/@n8n/config/src/configs/expression-engine.config.ts
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
import z from 'zod';
|
||||
|
||||
import { Config, Env } from '../decorators';
|
||||
|
||||
const expressionEngineSchema = z.enum(['legacy', 'vm']);
|
||||
|
||||
@Config
|
||||
export class ExpressionEngineConfig {
|
||||
/**
|
||||
* Which expression engine to use.
|
||||
* - `legacy` runs expressions without isolation.
|
||||
* - `vm` runs expressions in a V8 isolate.
|
||||
*
|
||||
* `vm` is currently **experimental**. Use at your own risk.
|
||||
*/
|
||||
@Env('N8N_EXPRESSION_ENGINE', expressionEngineSchema)
|
||||
engine: 'legacy' | 'vm' = 'legacy';
|
||||
|
||||
/** Number of V8 isolates ready in the pool. */
|
||||
@Env('N8N_EXPRESSION_ENGINE_POOL_SIZE')
|
||||
poolSize: number = 1;
|
||||
|
||||
/** Max number of AST-transformed expressions to cache. */
|
||||
@Env('N8N_EXPRESSION_ENGINE_MAX_CODE_CACHE_SIZE')
|
||||
maxCodeCacheSize: number = 1024;
|
||||
}
|
||||
|
|
@ -15,6 +15,7 @@ import { DynamicBannersConfig } from './configs/dynamic-banners.config';
|
|||
import { EndpointsConfig } from './configs/endpoints.config';
|
||||
import { EventBusConfig } from './configs/event-bus.config';
|
||||
import { ExecutionsConfig } from './configs/executions.config';
|
||||
import { ExpressionEngineConfig } from './configs/expression-engine.config';
|
||||
import { ExternalHooksConfig } from './configs/external-hooks.config';
|
||||
import { GenericConfig } from './configs/generic.config';
|
||||
import { HiringBannerConfig } from './configs/hiring-banner.config';
|
||||
|
|
@ -66,6 +67,7 @@ export { NodesConfig } from './configs/nodes.config';
|
|||
export { CronLoggingConfig } from './configs/logging.config';
|
||||
export { WorkflowHistoryCompactionConfig } from './configs/workflow-history-compaction.config';
|
||||
export { ChatHubConfig } from './configs/chat-hub.config';
|
||||
export { ExpressionEngineConfig } from './configs/expression-engine.config';
|
||||
export { PasswordConfig } from './configs/password.config';
|
||||
|
||||
const protocolSchema = z.enum(['http', 'https']);
|
||||
|
|
@ -242,4 +244,7 @@ export class GlobalConfig {
|
|||
|
||||
@Nested
|
||||
chatHub: ChatHubConfig;
|
||||
|
||||
@Nested
|
||||
expressionEngine: ExpressionEngineConfig;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -461,6 +461,11 @@ describe('GlobalConfig', () => {
|
|||
trimmingTimeWindowDays: 2,
|
||||
trimOnStartUp: false,
|
||||
},
|
||||
expressionEngine: {
|
||||
engine: 'legacy',
|
||||
poolSize: 1,
|
||||
maxCodeCacheSize: 1024,
|
||||
},
|
||||
} satisfies GlobalConfigShape;
|
||||
|
||||
it('should use all default values when no env variables are defined', () => {
|
||||
|
|
|
|||
|
|
@ -6,14 +6,19 @@ import { TimeoutError, MemoryLimitError } from '../types';
|
|||
|
||||
describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
||||
let evaluator: ExpressionEvaluator;
|
||||
const caller = {};
|
||||
|
||||
beforeAll(async () => {
|
||||
const bridge = new IsolatedVmBridge({ timeout: 5000 });
|
||||
evaluator = new ExpressionEvaluator({ bridge, maxCodeCacheSize: 1024 });
|
||||
evaluator = new ExpressionEvaluator({
|
||||
createBridge: () => new IsolatedVmBridge({ timeout: 5000 }),
|
||||
maxCodeCacheSize: 1024,
|
||||
});
|
||||
await evaluator.initialize();
|
||||
await evaluator.acquire(caller);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await evaluator.release(caller);
|
||||
await evaluator.dispose();
|
||||
});
|
||||
|
||||
|
|
@ -22,7 +27,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
$json: { email: 'test@example.com' },
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate('{{ $json.email }}', data);
|
||||
const result = evaluator.evaluate('{{ $json.email }}', data, caller);
|
||||
|
||||
expect(result).toBe('test@example.com');
|
||||
});
|
||||
|
|
@ -38,7 +43,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
},
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate('{{ $json.user.profile.name }}', data);
|
||||
const result = evaluator.evaluate('{{ $json.user.profile.name }}', data, caller);
|
||||
|
||||
expect(result).toBe('John Doe');
|
||||
});
|
||||
|
|
@ -50,7 +55,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
},
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate('{{ $json.items[1].id }}', data);
|
||||
const result = evaluator.evaluate('{{ $json.items[1].id }}', data, caller);
|
||||
|
||||
expect(result).toBe(2);
|
||||
});
|
||||
|
|
@ -63,7 +68,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
},
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate('{{ $json.price * $json.quantity }}', data);
|
||||
const result = evaluator.evaluate('{{ $json.price * $json.quantity }}', data, caller);
|
||||
|
||||
expect(result).toBe(300);
|
||||
});
|
||||
|
|
@ -78,7 +83,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
const result = evaluator.evaluate(
|
||||
'{{ DateTime.fromISO($json.date).toFormat("MMMM dd, yyyy") }}',
|
||||
data,
|
||||
{},
|
||||
caller,
|
||||
);
|
||||
|
||||
expect(result).toBe('January 15, 2024');
|
||||
|
|
@ -91,7 +96,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
},
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate('{{ $items() }}', data);
|
||||
const result = evaluator.evaluate('{{ $items() }}', data, caller);
|
||||
|
||||
expect(result).toBe('items-result');
|
||||
});
|
||||
|
|
@ -101,7 +106,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
$json: { zero: 0 },
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate('{{ $json.zero }}', data);
|
||||
const result = evaluator.evaluate('{{ $json.zero }}', data, caller);
|
||||
|
||||
expect(result).toBe(0);
|
||||
});
|
||||
|
|
@ -111,7 +116,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
$json: { empty: '' },
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate('{{ $json.empty }}', data);
|
||||
const result = evaluator.evaluate('{{ $json.empty }}', data, caller);
|
||||
|
||||
expect(result).toBe('');
|
||||
});
|
||||
|
|
@ -121,7 +126,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
$json: { items: ['first', 'second'] },
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate('{{ $json.items[0] }}', data);
|
||||
const result = evaluator.evaluate('{{ $json.items[0] }}', data, caller);
|
||||
|
||||
expect(result).toBe('first');
|
||||
});
|
||||
|
|
@ -131,7 +136,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
$json: { numbers: [42, 99] },
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate('{{ $json.numbers[0] }}', data);
|
||||
const result = evaluator.evaluate('{{ $json.numbers[0] }}', data, caller);
|
||||
|
||||
expect(result).toBe(42);
|
||||
});
|
||||
|
|
@ -141,7 +146,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
$json: { items: [1, 2, 3] },
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate('{{ $json.items.length }}', data);
|
||||
const result = evaluator.evaluate('{{ $json.items.length }}', data, caller);
|
||||
|
||||
expect(result).toBe(3);
|
||||
});
|
||||
|
|
@ -151,7 +156,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
$json: { field: null },
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate('{{ $json.field }}', data);
|
||||
const result = evaluator.evaluate('{{ $json.field }}', data, caller);
|
||||
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
|
@ -161,7 +166,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
$json: { active: true },
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate('{{ $json.active }}', data);
|
||||
const result = evaluator.evaluate('{{ $json.active }}', data, caller);
|
||||
|
||||
expect(result).toBe(true);
|
||||
});
|
||||
|
|
@ -175,7 +180,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
};
|
||||
|
||||
// Access element deep in the array via lazy proxy
|
||||
const result = evaluator.evaluate('{{ $json.items[150].id }}', data);
|
||||
const result = evaluator.evaluate('{{ $json.items[150].id }}', data, caller);
|
||||
|
||||
expect(result).toBe(150);
|
||||
});
|
||||
|
|
@ -188,6 +193,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
const result = evaluator.evaluate(
|
||||
'{{ DateTime.fromMillis($json.ts).toFormat("HH:mm ZZ") }}',
|
||||
data,
|
||||
caller,
|
||||
{ timezone: 'America/New_York' },
|
||||
);
|
||||
|
||||
|
|
@ -203,6 +209,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
const result = evaluator.evaluate(
|
||||
'{{ DateTime.fromMillis($json.ts).toFormat("HH:mm ZZ") }}',
|
||||
data,
|
||||
caller,
|
||||
{ timezone: 'Asia/Tokyo' },
|
||||
);
|
||||
|
||||
|
|
@ -213,7 +220,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
describe('Luxon type serialization at boundary', () => {
|
||||
it('should return DateTime as ISO string', () => {
|
||||
const data = { $json: {} };
|
||||
const result = evaluator.evaluate('{{ DateTime.now() }}', data);
|
||||
const result = evaluator.evaluate('{{ DateTime.now() }}', data, caller);
|
||||
expect(typeof result).toBe('string');
|
||||
const dt = DateTime.fromISO(result as string);
|
||||
expect(dt.isValid).toBe(true);
|
||||
|
|
@ -221,7 +228,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
|
||||
it('should return Duration as ISO string', () => {
|
||||
const data = { $json: {} };
|
||||
const result = evaluator.evaluate('{{ Duration.fromMillis(3600000) }}', data);
|
||||
const result = evaluator.evaluate('{{ Duration.fromMillis(3600000) }}', data, caller);
|
||||
expect(typeof result).toBe('string');
|
||||
const duration = Duration.fromISO(result as string);
|
||||
expect(duration.isValid).toBe(true);
|
||||
|
|
@ -233,6 +240,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
const result = evaluator.evaluate(
|
||||
'{{ Interval.after(DateTime.fromISO("2024-01-01"), 86400000) }}',
|
||||
data,
|
||||
caller,
|
||||
);
|
||||
expect(typeof result).toBe('string');
|
||||
const interval = Interval.fromISO(result as string);
|
||||
|
|
@ -245,6 +253,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
const result = evaluator.evaluate(
|
||||
'{{ ({ date: DateTime.fromISO("2024-01-15") }) }}',
|
||||
data,
|
||||
caller,
|
||||
) as Record<string, unknown>;
|
||||
expect(typeof result.date).toBe('string');
|
||||
const dt = DateTime.fromISO(result.date as string);
|
||||
|
|
@ -254,20 +263,20 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
|
||||
it('should not affect primitive return values', () => {
|
||||
const data = { $json: { count: 42 } };
|
||||
expect(evaluator.evaluate('{{ $json.count }}', data)).toBe(42);
|
||||
expect(evaluator.evaluate('{{ $json.count > 10 }}', data)).toBe(true);
|
||||
expect(evaluator.evaluate('{{ "hello" }}', data)).toBe('hello');
|
||||
expect(evaluator.evaluate('{{ $json.count }}', data, caller)).toBe(42);
|
||||
expect(evaluator.evaluate('{{ $json.count > 10 }}', data, caller)).toBe(true);
|
||||
expect(evaluator.evaluate('{{ "hello" }}', data, caller)).toBe('hello');
|
||||
});
|
||||
|
||||
it('should return null for invalid DateTime', () => {
|
||||
const data = { $json: {} };
|
||||
const result = evaluator.evaluate('{{ DateTime.invalid("test") }}', data);
|
||||
const result = evaluator.evaluate('{{ DateTime.invalid("test") }}', data, caller);
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
||||
it('should preserve Date objects (structured-cloneable)', () => {
|
||||
const data = { $json: {} };
|
||||
const result = evaluator.evaluate('{{ new Date(2024, 0, 15) }}', data);
|
||||
const result = evaluator.evaluate('{{ new Date(2024, 0, 15) }}', data, caller);
|
||||
expect(result).toBeInstanceOf(Date);
|
||||
expect((result as Date).getFullYear()).toBe(2024);
|
||||
expect((result as Date).getMonth()).toBe(0);
|
||||
|
|
@ -278,15 +287,15 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
it('should throw on invalid timezone', async () => {
|
||||
const data = { $json: { x: 1 } };
|
||||
|
||||
expect(() => evaluator.evaluate('{{ $json.x }}', data, { timezone: 'Not/A/Timezone' })).toThrow(
|
||||
'Invalid timezone: "Not/A/Timezone"',
|
||||
);
|
||||
expect(() =>
|
||||
evaluator.evaluate('{{ $json.x }}', data, caller, { timezone: 'Not/A/Timezone' }),
|
||||
).toThrow('Invalid timezone: "Not/A/Timezone"');
|
||||
});
|
||||
|
||||
it('should create $now with the provided timezone', async () => {
|
||||
const data = { $json: {} };
|
||||
|
||||
const zone = evaluator.evaluate('{{ $now.zoneName }}', data, {
|
||||
const zone = evaluator.evaluate('{{ $now.zoneName }}', data, caller, {
|
||||
timezone: 'America/New_York',
|
||||
});
|
||||
|
||||
|
|
@ -296,7 +305,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
it('should create $today with the provided timezone', async () => {
|
||||
const data = { $json: {} };
|
||||
|
||||
const zone = evaluator.evaluate('{{ $today.zoneName }}', data, {
|
||||
const zone = evaluator.evaluate('{{ $today.zoneName }}', data, caller, {
|
||||
timezone: 'Asia/Tokyo',
|
||||
});
|
||||
|
||||
|
|
@ -312,15 +321,20 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
const systemOffset = evaluator.evaluate(
|
||||
'{{ DateTime.fromMillis($json.ts).toFormat("ZZ") }}',
|
||||
data,
|
||||
caller,
|
||||
);
|
||||
|
||||
// Evaluate with explicit timezone (changes Settings.defaultZone)
|
||||
evaluator.evaluate('{{ DateTime.fromMillis($json.ts).toFormat("HH:mm ZZ") }}', data, {
|
||||
evaluator.evaluate('{{ DateTime.fromMillis($json.ts).toFormat("HH:mm ZZ") }}', data, caller, {
|
||||
timezone: 'Asia/Tokyo',
|
||||
});
|
||||
|
||||
// Evaluate WITHOUT timezone — should reset to system default, not keep Tokyo
|
||||
const result = evaluator.evaluate('{{ DateTime.fromMillis($json.ts).toFormat("ZZ") }}', data);
|
||||
const result = evaluator.evaluate(
|
||||
'{{ DateTime.fromMillis($json.ts).toFormat("ZZ") }}',
|
||||
data,
|
||||
caller,
|
||||
);
|
||||
|
||||
expect(result).toBe(systemOffset);
|
||||
});
|
||||
|
|
@ -330,7 +344,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
$json: { name: 'Alice', age: 30, city: 'Berlin' },
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate('{{ Object.keys($json).join(",") }}', data);
|
||||
const result = evaluator.evaluate('{{ Object.keys($json).join(",") }}', data, caller);
|
||||
|
||||
expect(result).toBe('name,age,city');
|
||||
});
|
||||
|
|
@ -351,7 +365,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
' throw e;' +
|
||||
'})() }}';
|
||||
|
||||
expect(() => evaluator.evaluate(expression, data)).toThrow(
|
||||
expect(() => evaluator.evaluate(expression, data, caller)).toThrow(
|
||||
expect.objectContaining({
|
||||
name: 'ExpressionExtensionError',
|
||||
message: 'test error',
|
||||
|
|
@ -365,7 +379,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
const data = { $json: {} };
|
||||
let error: Error | undefined;
|
||||
try {
|
||||
evaluator.evaluate('{{ (() => { throw null })() }}', data);
|
||||
evaluator.evaluate('{{ (() => { throw null })() }}', data, caller);
|
||||
} catch (e) {
|
||||
error = e as Error;
|
||||
}
|
||||
|
|
@ -377,7 +391,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
const data = { $json: {} };
|
||||
let error: Error | undefined;
|
||||
try {
|
||||
evaluator.evaluate('{{ (() => { throw undefined })() }}', data);
|
||||
evaluator.evaluate('{{ (() => { throw undefined })() }}', data, caller);
|
||||
} catch (e) {
|
||||
error = e as Error;
|
||||
}
|
||||
|
|
@ -392,6 +406,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
evaluator.evaluate(
|
||||
'{{ (() => { var e = Object.create(null); e.foo = "bar"; throw e; })() }}',
|
||||
data,
|
||||
caller,
|
||||
);
|
||||
} catch (e) {
|
||||
error = e as Error;
|
||||
|
|
@ -404,7 +419,11 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
const data = { $json: {} };
|
||||
let error: Error | undefined;
|
||||
try {
|
||||
evaluator.evaluate('{{ (() => { throw { hasOwnProperty: null, foo: "bar" }; })() }}', data);
|
||||
evaluator.evaluate(
|
||||
'{{ (() => { throw { hasOwnProperty: null, foo: "bar" }; })() }}',
|
||||
data,
|
||||
caller,
|
||||
);
|
||||
} catch (e) {
|
||||
error = e as Error;
|
||||
}
|
||||
|
|
@ -417,7 +436,11 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
|
||||
// E() inside the isolate swallows TypeErrors (failed attack attempts).
|
||||
// The expression should return undefined, not throw.
|
||||
const result = evaluator.evaluate('{{ (() => { throw new TypeError("test") })() }}', data);
|
||||
const result = evaluator.evaluate(
|
||||
'{{ (() => { throw new TypeError("test") })() }}',
|
||||
data,
|
||||
caller,
|
||||
);
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
});
|
||||
|
|
@ -429,7 +452,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
},
|
||||
};
|
||||
|
||||
expect(() => evaluator.evaluate('{{ $json.brokenProp }}', { $json: json })).toThrow(
|
||||
expect(() => evaluator.evaluate('{{ $json.brokenProp }}', { $json: json }, caller)).toThrow(
|
||||
'property access failed',
|
||||
);
|
||||
});
|
||||
|
|
@ -443,7 +466,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
},
|
||||
};
|
||||
|
||||
expect(() => evaluator.evaluate('{{ $json.myFn() }}', data)).toThrow('function threw');
|
||||
expect(() => evaluator.evaluate('{{ $json.myFn() }}', data, caller)).toThrow('function threw');
|
||||
});
|
||||
|
||||
it('should propagate errors from $items() when result properties are accessed', () => {
|
||||
|
|
@ -455,7 +478,7 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
|
||||
// Without throwIfErrorSentinel in the $items wrapper, the sentinel is
|
||||
// returned as a value and .length reads undefined on it — silently swallowed
|
||||
expect(() => evaluator.evaluate('{{ $items().length }}', data)).toThrow('items failed');
|
||||
expect(() => evaluator.evaluate('{{ $items().length }}', data, caller)).toThrow('items failed');
|
||||
});
|
||||
|
||||
it('should propagate errors thrown during array element access across the isolate boundary', () => {
|
||||
|
|
@ -470,7 +493,9 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
|
||||
const data = { $json: { items } };
|
||||
|
||||
expect(() => evaluator.evaluate('{{ $json.items[0] }}', data)).toThrow('element access failed');
|
||||
expect(() => evaluator.evaluate('{{ $json.items[0] }}', data, caller)).toThrow(
|
||||
'element access failed',
|
||||
);
|
||||
});
|
||||
|
||||
it('should propagate errors thrown during an "in" operator check across the isolate boundary', () => {
|
||||
|
|
@ -484,9 +509,9 @@ describe('Integration: ExpressionEvaluator + IsolatedVmBridge', () => {
|
|||
// The bridge calls __getValueAtPath(['$json', 'brokenProp']) which throws.
|
||||
// Without throwIfErrorSentinel in the has trap, the sentinel is returned
|
||||
// as a non-undefined value so 'brokenProp' in $json incorrectly returns true.
|
||||
expect(() => evaluator.evaluate('{{ "brokenProp" in $json }}', { $json: json })).toThrow(
|
||||
'in-check access failed',
|
||||
);
|
||||
expect(() =>
|
||||
evaluator.evaluate('{{ "brokenProp" in $json }}', { $json: json }, caller),
|
||||
).toThrow('in-check access failed');
|
||||
});
|
||||
});
|
||||
|
||||
|
|
@ -513,3 +538,88 @@ describe('Integration: IsolatedVmBridge error handling', () => {
|
|||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe('Integration: Concurrent execution pooling', () => {
|
||||
let evaluator: ExpressionEvaluator;
|
||||
|
||||
beforeAll(async () => {
|
||||
evaluator = new ExpressionEvaluator({
|
||||
createBridge: () => new IsolatedVmBridge({ timeout: 5000 }),
|
||||
maxCodeCacheSize: 1024,
|
||||
poolSize: 2,
|
||||
});
|
||||
await evaluator.initialize();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await evaluator.dispose();
|
||||
});
|
||||
|
||||
beforeEach(async () => {
|
||||
await evaluator.waitForReplenishment();
|
||||
});
|
||||
|
||||
it('should hold separate bridges for separate callers', async () => {
|
||||
const caller1 = {};
|
||||
const caller2 = {};
|
||||
await evaluator.acquire(caller1);
|
||||
await evaluator.acquire(caller2);
|
||||
|
||||
const data1 = { $json: { value: 'from-ctx-1' } };
|
||||
const data2 = { $json: { value: 'from-ctx-2' } };
|
||||
|
||||
const result1 = evaluator.evaluate('{{ $json.value }}', data1, caller1);
|
||||
const result2 = evaluator.evaluate('{{ $json.value }}', data2, caller2);
|
||||
|
||||
expect(result1).toBe('from-ctx-1');
|
||||
expect(result2).toBe('from-ctx-2');
|
||||
|
||||
await evaluator.release(caller1);
|
||||
await evaluator.release(caller2);
|
||||
});
|
||||
|
||||
it('should reuse the same bridge for the same caller', async () => {
|
||||
const caller = {};
|
||||
await evaluator.acquire(caller);
|
||||
|
||||
const result1 = evaluator.evaluate('{{ $json.a }}', { $json: { a: 'first' } }, caller);
|
||||
const result2 = evaluator.evaluate('{{ $json.b }}', { $json: { b: 'second' } }, caller);
|
||||
|
||||
expect(result1).toBe('first');
|
||||
expect(result2).toBe('second');
|
||||
|
||||
await evaluator.release(caller);
|
||||
});
|
||||
|
||||
it('should replenish after acquire', async () => {
|
||||
const caller1 = {};
|
||||
await evaluator.acquire(caller1);
|
||||
|
||||
await evaluator.waitForReplenishment();
|
||||
|
||||
// Pool should have a fresh bridge available for a second caller
|
||||
const caller2 = {};
|
||||
await evaluator.acquire(caller2);
|
||||
const result = evaluator.evaluate('{{ $json.y }}', { $json: { y: 'replenished' } }, caller2);
|
||||
expect(result).toBe('replenished');
|
||||
|
||||
await evaluator.release(caller1);
|
||||
await evaluator.release(caller2);
|
||||
});
|
||||
|
||||
it('should replenish after release', async () => {
|
||||
const caller1 = {};
|
||||
await evaluator.acquire(caller1);
|
||||
await evaluator.release(caller1);
|
||||
|
||||
await evaluator.waitForReplenishment();
|
||||
|
||||
// Pool should have a fresh bridge available
|
||||
const caller2 = {};
|
||||
await evaluator.acquire(caller2);
|
||||
const result = evaluator.evaluate('{{ $json.y }}', { $json: { y: 'replenished' } }, caller2);
|
||||
expect(result).toBe('replenished');
|
||||
|
||||
await evaluator.release(caller2);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -675,6 +675,6 @@ export class IsolatedVmBridge implements RuntimeBridge {
|
|||
* @returns true if disposed, false otherwise
|
||||
*/
|
||||
isDisposed(): boolean {
|
||||
return this.disposed;
|
||||
return this.disposed || this.isolate.isDisposed;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,67 +46,104 @@ describe('ExpressionEvaluator cache', () => {
|
|||
});
|
||||
|
||||
it('should emit cache miss on first evaluation', async () => {
|
||||
const evaluator = new ExpressionEvaluator({ bridge, observability, maxCodeCacheSize: 1024 });
|
||||
const evaluator = new ExpressionEvaluator({
|
||||
createBridge: () => bridge,
|
||||
observability,
|
||||
maxCodeCacheSize: 1024,
|
||||
});
|
||||
await evaluator.initialize();
|
||||
evaluator.evaluate('={{ $json.email }}', {});
|
||||
const caller = {};
|
||||
await evaluator.acquire(caller);
|
||||
evaluator.evaluate('={{ $json.email }}', {}, caller);
|
||||
expect(observability.metrics.counter).toHaveBeenCalledWith('expression.code_cache.miss', 1);
|
||||
});
|
||||
|
||||
it('should emit cache hit on repeated evaluation', async () => {
|
||||
const evaluator = new ExpressionEvaluator({ bridge, observability, maxCodeCacheSize: 1024 });
|
||||
const evaluator = new ExpressionEvaluator({
|
||||
createBridge: () => bridge,
|
||||
observability,
|
||||
maxCodeCacheSize: 1024,
|
||||
});
|
||||
await evaluator.initialize();
|
||||
evaluator.evaluate('={{ $json.email }}', {});
|
||||
evaluator.evaluate('={{ $json.email }}', {});
|
||||
const caller = {};
|
||||
await evaluator.acquire(caller);
|
||||
evaluator.evaluate('={{ $json.email }}', {}, caller);
|
||||
evaluator.evaluate('={{ $json.email }}', {}, caller);
|
||||
expect(observability.metrics.counter).toHaveBeenCalledWith('expression.code_cache.hit', 1);
|
||||
});
|
||||
|
||||
it('should emit eviction when cache is full', async () => {
|
||||
const evaluator = new ExpressionEvaluator({
|
||||
bridge,
|
||||
createBridge: () => bridge,
|
||||
observability,
|
||||
maxCodeCacheSize: 2,
|
||||
});
|
||||
await evaluator.initialize();
|
||||
evaluator.evaluate('={{ $json.a }}', {});
|
||||
evaluator.evaluate('={{ $json.b }}', {});
|
||||
evaluator.evaluate('={{ $json.c }}', {}); // evicts first
|
||||
const caller = {};
|
||||
await evaluator.acquire(caller);
|
||||
evaluator.evaluate('={{ $json.a }}', {}, caller);
|
||||
evaluator.evaluate('={{ $json.b }}', {}, caller);
|
||||
evaluator.evaluate('={{ $json.c }}', {}, caller); // evicts first
|
||||
expect(observability.metrics.counter).toHaveBeenCalledWith('expression.code_cache.eviction', 1);
|
||||
});
|
||||
|
||||
it('should work without observability', async () => {
|
||||
const evaluator = new ExpressionEvaluator({ bridge, maxCodeCacheSize: 1024 });
|
||||
const evaluator = new ExpressionEvaluator({
|
||||
createBridge: () => bridge,
|
||||
maxCodeCacheSize: 1024,
|
||||
});
|
||||
await evaluator.initialize();
|
||||
const caller = {};
|
||||
await evaluator.acquire(caller);
|
||||
expect(() => {
|
||||
evaluator.evaluate('={{ $json.email }}', {});
|
||||
evaluator.evaluate('={{ $json.email }}', {});
|
||||
evaluator.evaluate('={{ $json.email }}', {}, caller);
|
||||
evaluator.evaluate('={{ $json.email }}', {}, caller);
|
||||
}).not.toThrow();
|
||||
});
|
||||
|
||||
it('should emit cache size gauge on cache miss', async () => {
|
||||
const evaluator = new ExpressionEvaluator({ bridge, observability, maxCodeCacheSize: 1024 });
|
||||
const evaluator = new ExpressionEvaluator({
|
||||
createBridge: () => bridge,
|
||||
observability,
|
||||
maxCodeCacheSize: 1024,
|
||||
});
|
||||
await evaluator.initialize();
|
||||
evaluator.evaluate('={{ $json.email }}', {});
|
||||
const caller = {};
|
||||
await evaluator.acquire(caller);
|
||||
evaluator.evaluate('={{ $json.email }}', {}, caller);
|
||||
expect(observability.metrics.gauge).toHaveBeenCalledWith('expression.code_cache.size', 1);
|
||||
});
|
||||
|
||||
it('should emit cache size gauge of 0 on dispose', async () => {
|
||||
const evaluator = new ExpressionEvaluator({ bridge, observability, maxCodeCacheSize: 1024 });
|
||||
const evaluator = new ExpressionEvaluator({
|
||||
createBridge: () => bridge,
|
||||
observability,
|
||||
maxCodeCacheSize: 1024,
|
||||
});
|
||||
await evaluator.initialize();
|
||||
evaluator.evaluate('={{ $json.email }}', {});
|
||||
const caller = {};
|
||||
await evaluator.acquire(caller);
|
||||
evaluator.evaluate('={{ $json.email }}', {}, caller);
|
||||
vi.clearAllMocks();
|
||||
await evaluator.dispose();
|
||||
expect(observability.metrics.gauge).toHaveBeenCalledWith('expression.code_cache.size', 0);
|
||||
});
|
||||
|
||||
it('should evict least recently used and report miss on re-access', async () => {
|
||||
const evaluator = new ExpressionEvaluator({ bridge, observability, maxCodeCacheSize: 2 });
|
||||
const evaluator = new ExpressionEvaluator({
|
||||
createBridge: () => bridge,
|
||||
observability,
|
||||
maxCodeCacheSize: 2,
|
||||
});
|
||||
await evaluator.initialize();
|
||||
evaluator.evaluate('={{ $json.a }}', {});
|
||||
evaluator.evaluate('={{ $json.b }}', {});
|
||||
evaluator.evaluate('={{ $json.c }}', {});
|
||||
const caller = {};
|
||||
await evaluator.acquire(caller);
|
||||
evaluator.evaluate('={{ $json.a }}', {}, caller);
|
||||
evaluator.evaluate('={{ $json.b }}', {}, caller);
|
||||
evaluator.evaluate('={{ $json.c }}', {}, caller);
|
||||
expect(observability.metrics.counter).toHaveBeenCalledWith('expression.code_cache.eviction', 1);
|
||||
vi.clearAllMocks();
|
||||
evaluator.evaluate('={{ $json.a }}', {});
|
||||
evaluator.evaluate('={{ $json.a }}', {}, caller);
|
||||
expect(observability.metrics.counter).toHaveBeenCalledWith('expression.code_cache.miss', 1);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -4,7 +4,9 @@ import type {
|
|||
EvaluatorConfig,
|
||||
WorkflowData,
|
||||
EvaluateOptions,
|
||||
RuntimeBridge,
|
||||
} from '../types';
|
||||
import { IsolatePool, PoolDisposedError, PoolExhaustedError } from '../pool/isolate-pool';
|
||||
import { LruCache } from './lru-cache';
|
||||
|
||||
export class ExpressionEvaluator implements IExpressionEvaluator {
|
||||
|
|
@ -19,25 +21,61 @@ export class ExpressionEvaluator implements IExpressionEvaluator {
|
|||
// Cache hit rate in production: ~99.9% (same expressions repeat within a workflow)
|
||||
private codeCache: LruCache<string, string>;
|
||||
|
||||
private pool: IsolatePool;
|
||||
|
||||
private bridgesByCaller = new WeakMap<object, RuntimeBridge>();
|
||||
|
||||
private readonly createBridge: () => Promise<RuntimeBridge>;
|
||||
|
||||
constructor(config: EvaluatorConfig) {
|
||||
this.config = config;
|
||||
this.codeCache = new LruCache<string, string>(config.maxCodeCacheSize, () => {
|
||||
this.config.observability?.metrics.counter('expression.code_cache.eviction', 1);
|
||||
});
|
||||
this.createBridge = async () => {
|
||||
const bridge = config.createBridge();
|
||||
await bridge.initialize();
|
||||
return bridge;
|
||||
};
|
||||
this.pool = new IsolatePool(this.createBridge, config.poolSize ?? 1, (error) => {
|
||||
console.error('[IsolatePool] Failed to replenish bridge:', error);
|
||||
config.observability?.metrics.counter('expression.pool.replenish_failed', 1);
|
||||
});
|
||||
}
|
||||
|
||||
async initialize(): Promise<void> {
|
||||
await this.config.bridge.initialize();
|
||||
await this.pool.initialize();
|
||||
}
|
||||
|
||||
evaluate(expression: string, data: WorkflowData, options?: EvaluateOptions): unknown {
|
||||
async acquire(caller: object): Promise<void> {
|
||||
if (this.bridgesByCaller.has(caller)) return;
|
||||
let bridge: RuntimeBridge;
|
||||
try {
|
||||
bridge = this.pool.acquire();
|
||||
} catch (error) {
|
||||
if (error instanceof PoolDisposedError) throw error;
|
||||
if (!(error instanceof PoolExhaustedError)) throw error;
|
||||
bridge = await this.createBridge();
|
||||
}
|
||||
this.config.observability?.metrics.counter('expression.pool.acquired', 1);
|
||||
this.bridgesByCaller.set(caller, bridge);
|
||||
}
|
||||
|
||||
evaluate(
|
||||
expression: string,
|
||||
data: WorkflowData,
|
||||
caller: object,
|
||||
options?: EvaluateOptions,
|
||||
): unknown {
|
||||
if (this.disposed) throw new Error('Evaluator disposed');
|
||||
|
||||
const bridge = this.getBridge(caller);
|
||||
|
||||
// Transform template expression → sanitized JavaScript (cached)
|
||||
const transformedCode = this.getTransformedCode(expression);
|
||||
|
||||
try {
|
||||
const result = this.config.bridge.execute(transformedCode, data, {
|
||||
const result = bridge.execute(transformedCode, data, {
|
||||
timezone: options?.timezone,
|
||||
});
|
||||
|
||||
|
|
@ -54,6 +92,32 @@ export class ExpressionEvaluator implements IExpressionEvaluator {
|
|||
}
|
||||
}
|
||||
|
||||
private getBridge(caller: object): RuntimeBridge {
|
||||
const bridge = this.bridgesByCaller.get(caller);
|
||||
if (!bridge) {
|
||||
throw new Error('No bridge acquired for this context. Call acquire() first.');
|
||||
}
|
||||
|
||||
// If the isolate died mid-execution (e.g. OOM), all remaining expressions
|
||||
// in this execution are expected to fail. Recovery is per-execution, not per-expression.
|
||||
if (bridge.isDisposed()) {
|
||||
throw new Error('Isolate for this caller is no longer available');
|
||||
}
|
||||
|
||||
return bridge;
|
||||
}
|
||||
|
||||
async release(caller: object): Promise<void> {
|
||||
const bridge = this.bridgesByCaller.get(caller);
|
||||
if (!bridge) return;
|
||||
this.bridgesByCaller.delete(caller);
|
||||
await this.pool.release(bridge);
|
||||
}
|
||||
|
||||
async waitForReplenishment(): Promise<void> {
|
||||
await this.pool.waitForReplenishment();
|
||||
}
|
||||
|
||||
/**
|
||||
* Transform a template expression to executable JavaScript via tournament.
|
||||
*
|
||||
|
|
@ -94,7 +158,7 @@ export class ExpressionEvaluator implements IExpressionEvaluator {
|
|||
this.disposed = true;
|
||||
this.codeCache.clear();
|
||||
this.config.observability?.metrics.gauge('expression.code_cache.size', 0);
|
||||
await this.config.bridge.dispose();
|
||||
await this.pool.dispose();
|
||||
}
|
||||
|
||||
isDisposed(): boolean {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,123 @@
|
|||
import { describe, it, expect, vi } from 'vitest';
|
||||
import type { RuntimeBridge } from '../../types';
|
||||
import { IsolatePool, PoolDisposedError, PoolExhaustedError } from '../isolate-pool';
|
||||
|
||||
function createMockBridge(): RuntimeBridge {
|
||||
return {
|
||||
initialize: vi.fn().mockResolvedValue(undefined),
|
||||
execute: vi.fn().mockReturnValue('result'),
|
||||
dispose: vi.fn().mockResolvedValue(undefined),
|
||||
isDisposed: vi.fn().mockReturnValue(false),
|
||||
};
|
||||
}
|
||||
|
||||
function createFactory() {
|
||||
return vi.fn().mockImplementation(async () => createMockBridge());
|
||||
}
|
||||
|
||||
describe('IsolatePool', () => {
|
||||
it('should initialize with the configured number of bridges', async () => {
|
||||
const factory = createFactory();
|
||||
const pool = new IsolatePool(factory, 3);
|
||||
await pool.initialize();
|
||||
expect(factory).toHaveBeenCalledTimes(3);
|
||||
await pool.dispose();
|
||||
});
|
||||
|
||||
it('should acquire a bridge synchronously', async () => {
|
||||
const factory = createFactory();
|
||||
const pool = new IsolatePool(factory, 2);
|
||||
await pool.initialize();
|
||||
const bridge = pool.acquire();
|
||||
expect(bridge).toBeDefined();
|
||||
expect(bridge.execute).toBeDefined();
|
||||
await pool.dispose();
|
||||
});
|
||||
|
||||
it('should throw when pool is exhausted', async () => {
|
||||
const factory = createFactory();
|
||||
const pool = new IsolatePool(factory, 1);
|
||||
await pool.initialize();
|
||||
pool.acquire();
|
||||
expect(() => pool.acquire()).toThrow(PoolExhaustedError);
|
||||
await pool.dispose();
|
||||
});
|
||||
|
||||
it('should dispose and replace bridge on release', async () => {
|
||||
const factory = createFactory();
|
||||
const pool = new IsolatePool(factory, 1);
|
||||
await pool.initialize();
|
||||
|
||||
const bridge = pool.acquire();
|
||||
await pool.release(bridge);
|
||||
expect(bridge.dispose).toHaveBeenCalled();
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
const newBridge = pool.acquire();
|
||||
expect(newBridge).toBeDefined();
|
||||
expect(newBridge).not.toBe(bridge);
|
||||
await pool.dispose();
|
||||
});
|
||||
|
||||
it('should dispose all bridges on pool disposal', async () => {
|
||||
const factory = createFactory();
|
||||
const pool = new IsolatePool(factory, 3);
|
||||
await pool.initialize();
|
||||
await pool.dispose();
|
||||
const bridges = await Promise.all(
|
||||
factory.mock.results.map((r) => r.value as Promise<RuntimeBridge>),
|
||||
);
|
||||
expect(factory).toHaveBeenCalledTimes(3);
|
||||
for (const bridge of bridges) {
|
||||
expect(bridge.dispose).toHaveBeenCalled();
|
||||
}
|
||||
});
|
||||
|
||||
it('should throw on acquire after disposal', async () => {
|
||||
const factory = createFactory();
|
||||
const pool = new IsolatePool(factory, 1);
|
||||
await pool.initialize();
|
||||
await pool.dispose();
|
||||
expect(() => pool.acquire()).toThrow(PoolDisposedError);
|
||||
});
|
||||
|
||||
it('should handle partial failure during initialization', async () => {
|
||||
let callCount = 0;
|
||||
const factory = vi.fn().mockImplementation(async () => {
|
||||
callCount++;
|
||||
if (callCount === 2) throw new Error('Failed to create bridge');
|
||||
return createMockBridge();
|
||||
});
|
||||
|
||||
const pool = new IsolatePool(factory, 3);
|
||||
await pool.initialize();
|
||||
|
||||
// 2 of 3 succeeded
|
||||
expect(pool.acquire()).toBeDefined();
|
||||
expect(pool.acquire()).toBeDefined();
|
||||
expect(() => pool.acquire()).toThrow(PoolExhaustedError);
|
||||
await pool.dispose();
|
||||
});
|
||||
|
||||
it('should throw if all bridges fail during initialization', async () => {
|
||||
const factory = vi.fn().mockRejectedValue(new Error('fail'));
|
||||
const pool = new IsolatePool(factory, 3);
|
||||
await expect(pool.initialize()).rejects.toThrow('IsolatePool failed to create any bridges');
|
||||
});
|
||||
|
||||
it('should kick off replenishment after acquire', async () => {
|
||||
const factory = createFactory();
|
||||
const pool = new IsolatePool(factory, 1);
|
||||
await pool.initialize();
|
||||
|
||||
pool.acquire();
|
||||
|
||||
// Wait for async replenishment
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
const replenished = pool.acquire();
|
||||
expect(replenished).toBeDefined();
|
||||
await pool.dispose();
|
||||
});
|
||||
});
|
||||
110
packages/@n8n/expression-runtime/src/pool/isolate-pool.ts
Normal file
110
packages/@n8n/expression-runtime/src/pool/isolate-pool.ts
Normal file
|
|
@ -0,0 +1,110 @@
|
|||
import type { RuntimeBridge } from '../types';
|
||||
|
||||
export class PoolDisposedError extends Error {
|
||||
constructor() {
|
||||
super('Pool is disposed');
|
||||
this.name = 'PoolDisposedError';
|
||||
}
|
||||
}
|
||||
|
||||
export class PoolExhaustedError extends Error {
|
||||
constructor() {
|
||||
super('No isolate bridge available in pool');
|
||||
this.name = 'PoolExhaustedError';
|
||||
}
|
||||
}
|
||||
|
||||
export class IsolatePool {
|
||||
private bridges: RuntimeBridge[] = [];
|
||||
private disposed = false;
|
||||
/** Number of bridges currently being created. */
|
||||
private warming = 0;
|
||||
/** In-flight isolate replenishment promises. */
|
||||
private replenishPromises = new Set<Promise<void>>();
|
||||
|
||||
constructor(
|
||||
private readonly createBridge: () => Promise<RuntimeBridge>,
|
||||
private readonly size: number,
|
||||
private readonly onReplenishFailed?: (error: unknown) => void,
|
||||
) {}
|
||||
|
||||
async initialize() {
|
||||
const results = await Promise.allSettled(
|
||||
Array.from({ length: this.size }, () => this.createBridge()),
|
||||
);
|
||||
|
||||
for (const result of results) {
|
||||
if (result.status === 'fulfilled') {
|
||||
this.bridges.push(result.value);
|
||||
} else {
|
||||
console.error('[IsolatePool] Failed to create bridge during init:', result.reason);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.bridges.length === 0) {
|
||||
throw new Error('IsolatePool failed to create any bridges');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Pops a warm bridge from the pool. Kickstarts replenishment.
|
||||
* Throws if disposed or pool is empty. Callers are expected to handle the empty case by falling back to cold-start bridge creation.
|
||||
*/
|
||||
acquire(): RuntimeBridge {
|
||||
if (this.disposed) throw new PoolDisposedError();
|
||||
const bridge = this.bridges.shift();
|
||||
if (!bridge) throw new PoolExhaustedError();
|
||||
void this.replenish();
|
||||
return bridge;
|
||||
}
|
||||
|
||||
async release(bridge: RuntimeBridge) {
|
||||
if (!bridge.isDisposed()) await bridge.dispose();
|
||||
this.replenish();
|
||||
}
|
||||
|
||||
async dispose(): Promise<void> {
|
||||
this.disposed = true;
|
||||
await Promise.all([...this.replenishPromises]);
|
||||
await Promise.all(this.bridges.map((b) => b.dispose()));
|
||||
this.bridges = [];
|
||||
}
|
||||
|
||||
private static readonly MAX_REPLENISH_RETRIES = 3;
|
||||
private static readonly REPLENISH_RETRY_BASE_MS = 500;
|
||||
|
||||
async waitForReplenishment(): Promise<void> {
|
||||
if (this.replenishPromises.size > 0) {
|
||||
await Promise.all([...this.replenishPromises]);
|
||||
}
|
||||
}
|
||||
|
||||
private replenish(attempt = 0) {
|
||||
if (this.disposed) return;
|
||||
if (this.bridges.length + this.warming >= this.size) return;
|
||||
|
||||
this.warming++;
|
||||
let promise: Promise<void>;
|
||||
promise = this.createBridge()
|
||||
.then((bridge) => {
|
||||
this.warming--;
|
||||
this.replenishPromises.delete(promise);
|
||||
if (this.disposed) {
|
||||
void bridge.dispose();
|
||||
return;
|
||||
}
|
||||
this.bridges.push(bridge);
|
||||
})
|
||||
.catch((error: unknown) => {
|
||||
this.warming--;
|
||||
this.replenishPromises.delete(promise);
|
||||
this.onReplenishFailed?.(error);
|
||||
|
||||
if (attempt < IsolatePool.MAX_REPLENISH_RETRIES) {
|
||||
const delay = IsolatePool.REPLENISH_RETRY_BASE_MS * 2 ** attempt;
|
||||
setTimeout(() => this.replenish(attempt + 1), delay).unref();
|
||||
}
|
||||
});
|
||||
this.replenishPromises.add(promise);
|
||||
}
|
||||
}
|
||||
|
|
@ -14,10 +14,8 @@ import type { RuntimeBridge } from './bridge';
|
|||
* will be added in later slices.
|
||||
*/
|
||||
export interface EvaluatorConfig {
|
||||
/**
|
||||
* Runtime bridge implementation.
|
||||
*/
|
||||
bridge: RuntimeBridge;
|
||||
/** Factory function to create a bridge instance. */
|
||||
createBridge: () => RuntimeBridge;
|
||||
|
||||
/**
|
||||
* Observability provider for metrics, traces, and logs.
|
||||
|
|
@ -35,6 +33,13 @@ export interface EvaluatorConfig {
|
|||
* Maximum number of tournament-transformed expressions to cache (LRU).
|
||||
*/
|
||||
maxCodeCacheSize: number;
|
||||
|
||||
/**
|
||||
* Number of bridges to pre-warm in the pool. Defaults to 1 if not provided.
|
||||
* Can be set to the execution concurrency limit (N8N_EXPRESSION_ENGINE_POOL_SIZE)
|
||||
* to give each concurrent execution a pre-warmed bridge.
|
||||
*/
|
||||
poolSize?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -54,13 +59,28 @@ export interface IExpressionEvaluator {
|
|||
*
|
||||
* @param expression - Expression string (e.g., "{{ $json.email }}")
|
||||
* @param data - Workflow data context
|
||||
* @param options - Evaluation options
|
||||
* @param caller - Owner object that acquired the bridge (same object passed to acquire())
|
||||
* @param options - Optional evaluation options (e.g. timezone)
|
||||
* @returns Result of the expression
|
||||
*
|
||||
* Note: Synchronous for Slice 1 (Node.js vm module).
|
||||
* Will be async for Slice 2 (isolated-vm).
|
||||
*/
|
||||
evaluate(expression: string, data: WorkflowData, options?: EvaluateOptions): unknown;
|
||||
evaluate(
|
||||
expression: string,
|
||||
data: WorkflowData,
|
||||
caller: object,
|
||||
options?: EvaluateOptions,
|
||||
): unknown;
|
||||
|
||||
/**
|
||||
* Acquire a bridge for an owner object (e.g. an Expression instance).
|
||||
* Must be called before evaluate(). The same object must be passed as
|
||||
* the caller argument to evaluate().
|
||||
*/
|
||||
acquire(owner: object): Promise<void>;
|
||||
|
||||
/**
|
||||
* Release the bridge held for an owner object.
|
||||
*/
|
||||
release(owner: object): Promise<void>;
|
||||
|
||||
/**
|
||||
* Dispose of the evaluator and free resources.
|
||||
|
|
@ -87,12 +107,6 @@ export type WorkflowData = Record<string, unknown>;
|
|||
* Note: Slice 1 is minimal. Tournament options will be added later.
|
||||
*/
|
||||
export interface EvaluateOptions {
|
||||
/**
|
||||
* Custom timeout for this evaluation (in milliseconds).
|
||||
* Overrides the bridge's default timeout.
|
||||
*/
|
||||
timeout?: number;
|
||||
|
||||
/**
|
||||
* IANA timezone for this evaluation (e.g., 'America/New_York').
|
||||
* Sets luxon Settings.defaultZone inside the isolate before execution.
|
||||
|
|
|
|||
|
|
@ -776,6 +776,11 @@ export class ActiveWorkflowManager {
|
|||
|
||||
/**
|
||||
* Count all triggers in the workflow, excluding Manual Trigger and other n8n-internal triggers.
|
||||
*
|
||||
* TODO: This method calls getWorkflowWebhooks, which evaluates webhook description expressions
|
||||
* (path, httpMethod, etc.) that may reference user-authored expressions via $parameter. It
|
||||
* should acquire an isolate before calling getWorkflowWebhooks, but countTriggers is sync.
|
||||
* addWebhooks and removeWorkflow are async and can be fixed straightforwardly.
|
||||
*/
|
||||
private countTriggers(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData) {
|
||||
const triggerFilter = (nodeType: INodeType) =>
|
||||
|
|
|
|||
|
|
@ -136,6 +136,7 @@ describe('Start - AuthRolesService initialization', () => {
|
|||
},
|
||||
cache: { backend: 'memory' },
|
||||
taskRunners: {},
|
||||
expressionEngine: { engine: 'legacy', poolSize: 1, maxCodeCacheSize: 1024 },
|
||||
};
|
||||
// @ts-expect-error - Accessing protected method for testing
|
||||
start.initCrashJournal = jest.fn().mockResolvedValue(undefined);
|
||||
|
|
@ -186,6 +187,7 @@ describe('Start - AuthRolesService initialization', () => {
|
|||
},
|
||||
cache: { backend: 'memory' },
|
||||
taskRunners: {},
|
||||
expressionEngine: { engine: 'legacy', poolSize: 1, maxCodeCacheSize: 1024 },
|
||||
};
|
||||
|
||||
await start.init();
|
||||
|
|
@ -219,6 +221,7 @@ describe('Start - AuthRolesService initialization', () => {
|
|||
},
|
||||
cache: { backend: 'memory' },
|
||||
taskRunners: {},
|
||||
expressionEngine: { engine: 'legacy', poolSize: 1, maxCodeCacheSize: 1024 },
|
||||
};
|
||||
|
||||
await start.init();
|
||||
|
|
@ -243,6 +246,7 @@ describe('Start - AuthRolesService initialization', () => {
|
|||
},
|
||||
cache: { backend: 'memory' },
|
||||
taskRunners: {},
|
||||
expressionEngine: { engine: 'legacy' as const, poolSize: 1, maxCodeCacheSize: 1024 },
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import {
|
|||
ExecutionContextHookRegistry,
|
||||
} from 'n8n-core';
|
||||
import { ObjectStoreConfig } from 'n8n-core/dist/binary-data/object-store/object-store.config';
|
||||
import { ensureError, sleep, UnexpectedError } from 'n8n-workflow';
|
||||
import { ensureError, Expression, sleep, UnexpectedError } from 'n8n-workflow';
|
||||
|
||||
import type { AbstractServer } from '@/abstract-server';
|
||||
import { N8N_VERSION, N8N_RELEASE_DATE } from '@/constants';
|
||||
|
|
@ -174,6 +174,9 @@ export abstract class BaseCommand<F = never> {
|
|||
await Container.get(PostHogClient).init();
|
||||
await Container.get(TelemetryEventRelay).init();
|
||||
Container.get(WorkflowFailureNotificationEventRelay).init();
|
||||
|
||||
const { engine, poolSize, maxCodeCacheSize } = this.globalConfig.expressionEngine;
|
||||
await Expression.initExpressionEngine({ engine, poolSize, maxCodeCacheSize });
|
||||
}
|
||||
|
||||
protected async stopProcess() {
|
||||
|
|
@ -197,7 +200,11 @@ export abstract class BaseCommand<F = never> {
|
|||
|
||||
protected async exitSuccessFully() {
|
||||
try {
|
||||
await Promise.all([CrashJournal.cleanup(), this.dbConnection.close()]);
|
||||
await Promise.all([
|
||||
CrashJournal.cleanup(),
|
||||
this.dbConnection.close(),
|
||||
Expression.disposeExpressionEngine(),
|
||||
]);
|
||||
} finally {
|
||||
process.exit();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -502,15 +502,20 @@ export class CredentialsHelper extends ICredentialsHelper {
|
|||
});
|
||||
|
||||
// Resolve expressions if any are set
|
||||
decryptedData = workflow.expression.getComplexParameterValue(
|
||||
mockNode,
|
||||
decryptedData as INodeParameters,
|
||||
mode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
undefined,
|
||||
decryptedData,
|
||||
) as ICredentialDataDecryptedObject;
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
decryptedData = workflow.expression.getComplexParameterValue(
|
||||
mockNode,
|
||||
decryptedData as INodeParameters,
|
||||
mode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
undefined,
|
||||
decryptedData,
|
||||
) as ICredentialDataDecryptedObject;
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
}
|
||||
|
||||
return decryptedData;
|
||||
|
|
|
|||
|
|
@ -37,18 +37,23 @@ export class ResolverConfigExpressionService {
|
|||
const additionalData = await getBase();
|
||||
const additionalKeys = getNonWorkflowAdditionalKeys(additionalData);
|
||||
|
||||
return workflow.expression.getComplexParameterValue(
|
||||
// Use a mock node (mandatory) to resolve expressions in the config
|
||||
{
|
||||
id: '1',
|
||||
name: 'Mock Node',
|
||||
} as INode,
|
||||
config,
|
||||
'manual',
|
||||
additionalKeys,
|
||||
undefined,
|
||||
undefined,
|
||||
config,
|
||||
) as INodeParameters;
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
return workflow.expression.getComplexParameterValue(
|
||||
// Use a mock node (mandatory) to resolve expressions in the config
|
||||
{
|
||||
id: '1',
|
||||
name: 'Mock Node',
|
||||
} as INode,
|
||||
config,
|
||||
'manual',
|
||||
additionalKeys,
|
||||
undefined,
|
||||
undefined,
|
||||
config,
|
||||
) as INodeParameters;
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -348,6 +348,7 @@ export class CredentialsTester {
|
|||
|
||||
let response: INodeExecutionData[][] | null | undefined;
|
||||
try {
|
||||
await workflow.expression.acquireIsolate();
|
||||
response = await routingNode.runNode();
|
||||
} catch (error) {
|
||||
this.errorReporter.error(error);
|
||||
|
|
@ -393,6 +394,7 @@ export class CredentialsTester {
|
|||
message: error.message.toString(),
|
||||
};
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
delete mockNodesData[nodeTypeCopy.description.name];
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,15 +8,17 @@ import { Readable } from 'node:stream';
|
|||
|
||||
import { extractWebhookLastNodeResponse } from '../webhook-last-node-response-extractor';
|
||||
|
||||
import type { WebhookExecutionContext } from '@/webhooks/webhook-execution-context';
|
||||
|
||||
describe('extractWebhookLastNodeResponse', () => {
|
||||
let context: MockProxy<WebhookExecutionContext>;
|
||||
let lastNodeTaskData: MockProxy<ITaskData>;
|
||||
let binaryDataService: MockProxy<BinaryDataService>;
|
||||
|
||||
const defaultOptions = {
|
||||
responsePropertyName: undefined,
|
||||
responseContentType: undefined,
|
||||
responseBinaryPropertyName: undefined,
|
||||
} as const;
|
||||
|
||||
beforeEach(() => {
|
||||
context = mock<WebhookExecutionContext>();
|
||||
lastNodeTaskData = mock<ITaskData>();
|
||||
binaryDataService = mock<BinaryDataService>();
|
||||
|
||||
|
|
@ -35,9 +37,10 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryJson',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
|
|
@ -56,9 +59,10 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryJson',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
|
|
@ -71,14 +75,15 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
main: [[{ json: jsonData }]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression
|
||||
.mockReturnValueOnce('nested.value')
|
||||
.mockReturnValueOnce(undefined);
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryJson',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
{
|
||||
responsePropertyName: 'nested.value',
|
||||
responseContentType: undefined,
|
||||
responseBinaryPropertyName: undefined,
|
||||
},
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
|
|
@ -97,14 +102,15 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
main: [[{ json: jsonData }]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression
|
||||
.mockReturnValueOnce(undefined)
|
||||
.mockReturnValueOnce('application/xml');
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryJson',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
{
|
||||
responsePropertyName: undefined,
|
||||
responseContentType: 'application/xml',
|
||||
responseBinaryPropertyName: undefined,
|
||||
},
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
|
|
@ -127,9 +133,10 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryJson',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
|
|
@ -147,10 +154,10 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryJson',
|
||||
lastNodeTaskData,
|
||||
true, // checkAllMainOutputs = true
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
|
|
@ -169,9 +176,10 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryJson',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
|
|
@ -194,12 +202,15 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
main: [[nodeExecutionData]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression.mockReturnValue('data');
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
{
|
||||
responsePropertyName: undefined,
|
||||
responseContentType: undefined,
|
||||
responseBinaryPropertyName: 'data',
|
||||
},
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
|
|
@ -229,12 +240,15 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
main: [[nodeExecutionData]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression.mockReturnValue('data');
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
{
|
||||
responsePropertyName: undefined,
|
||||
responseContentType: undefined,
|
||||
responseBinaryPropertyName: 'data',
|
||||
},
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
|
|
@ -254,9 +268,10 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
|
|
@ -273,9 +288,10 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
|
|
@ -292,12 +308,15 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
main: [[nodeExecutionData]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression.mockReturnValue(undefined);
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
{
|
||||
responsePropertyName: undefined,
|
||||
responseContentType: undefined,
|
||||
responseBinaryPropertyName: undefined,
|
||||
},
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
|
|
@ -314,12 +333,15 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
main: [[nodeExecutionData]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression.mockReturnValue(123);
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
{
|
||||
responsePropertyName: undefined,
|
||||
responseContentType: undefined,
|
||||
responseBinaryPropertyName: 123,
|
||||
},
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
|
|
@ -336,12 +358,15 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
main: [[nodeExecutionData]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression.mockReturnValue('nonExistentProperty');
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
{
|
||||
responsePropertyName: undefined,
|
||||
responseContentType: undefined,
|
||||
responseBinaryPropertyName: 'nonExistentProperty',
|
||||
},
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
|
|
@ -367,12 +392,15 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression.mockReturnValue('data');
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
{
|
||||
responsePropertyName: undefined,
|
||||
responseContentType: undefined,
|
||||
responseBinaryPropertyName: 'data',
|
||||
},
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
|
|
@ -396,13 +424,15 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression.mockReturnValue('data');
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
true, // checkAllMainOutputs = true
|
||||
{
|
||||
responsePropertyName: undefined,
|
||||
responseContentType: undefined,
|
||||
responseBinaryPropertyName: 'data',
|
||||
},
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
|
|
@ -421,9 +451,10 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
|
|
@ -434,7 +465,12 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
|
||||
describe('responseDataType: noData', () => {
|
||||
it('should return undefined body and contentType', async () => {
|
||||
const result = await extractWebhookLastNodeResponse(context, 'noData', lastNodeTaskData);
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
'noData',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
|
|
@ -456,7 +492,12 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
main: [[{ json: jsonData1 }, { json: jsonData2 }, { json: jsonData3 }]],
|
||||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(context, 'allEntries', lastNodeTaskData);
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
'allEntries',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
|
|
@ -473,7 +514,12 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
main: [[]],
|
||||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(context, 'allEntries', lastNodeTaskData);
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
'allEntries',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
|
|
@ -496,7 +542,12 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
],
|
||||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(context, 'allEntries', lastNodeTaskData);
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
'allEntries',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
|
|
@ -520,10 +571,10 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'allEntries',
|
||||
lastNodeTaskData,
|
||||
true, // checkAllMainOutputs = true
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
|
|
@ -548,7 +599,12 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
],
|
||||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(context, 'allEntries', lastNodeTaskData);
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
'allEntries',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
|
|
@ -565,7 +621,12 @@ describe('extractWebhookLastNodeResponse', () => {
|
|||
main: [[], [], []],
|
||||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(context, 'allEntries', lastNodeTaskData);
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
'allEntries',
|
||||
lastNodeTaskData,
|
||||
false,
|
||||
defaultOptions,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
|
|
|
|||
|
|
@ -140,45 +140,50 @@ export class LiveWebhooks implements IWebhookManager {
|
|||
projectId: ownerProjectId,
|
||||
});
|
||||
|
||||
const webhookData = this.webhookService
|
||||
.getNodeWebhooks(workflow, workflow.getNode(webhook.node) as INode, additionalData)
|
||||
.find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData;
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
const webhookData = this.webhookService
|
||||
.getNodeWebhooks(workflow, workflow.getNode(webhook.node) as INode, additionalData)
|
||||
.find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData;
|
||||
|
||||
// Get the node which has the webhook defined to know where to start from and to
|
||||
// get additional data
|
||||
const workflowStartNode = workflow.getNode(webhookData.node);
|
||||
// Get the node which has the webhook defined to know where to start from and to
|
||||
// get additional data
|
||||
const workflowStartNode = workflow.getNode(webhookData.node);
|
||||
|
||||
if (workflowStartNode === null) {
|
||||
throw new NotFoundError('Could not find node to process webhook.');
|
||||
if (workflowStartNode === null) {
|
||||
throw new NotFoundError('Could not find node to process webhook.');
|
||||
}
|
||||
|
||||
if (!authAllowlistedNodes.has(workflowStartNode.type)) {
|
||||
sanitizeWebhookRequest(request);
|
||||
}
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
const executionMode = 'webhook';
|
||||
void WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhookData,
|
||||
activeWorkflowData, // Use activeWorkflowData instead of workflowData
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
request,
|
||||
response,
|
||||
async (error: Error | null, data: object) => {
|
||||
if (error !== null) {
|
||||
return reject(error);
|
||||
}
|
||||
// Save static data if it changed
|
||||
await this.workflowStaticDataService.saveStaticData(workflow);
|
||||
resolve(data);
|
||||
},
|
||||
);
|
||||
});
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
|
||||
if (!authAllowlistedNodes.has(workflowStartNode.type)) {
|
||||
sanitizeWebhookRequest(request);
|
||||
}
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
const executionMode = 'webhook';
|
||||
void WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhookData,
|
||||
activeWorkflowData, // Use activeWorkflowData instead of workflowData
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
request,
|
||||
response,
|
||||
async (error: Error | null, data: object) => {
|
||||
if (error !== null) {
|
||||
return reject(error);
|
||||
}
|
||||
// Save static data if it changed
|
||||
await this.workflowStaticDataService.saveStaticData(workflow);
|
||||
resolve(data);
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
private async findWebhook(path: string, httpMethod: IHttpRequestMethods) {
|
||||
|
|
|
|||
|
|
@ -128,59 +128,70 @@ export class TestWebhooks implements IWebhookManager {
|
|||
sanitizeWebhookRequest(request);
|
||||
}
|
||||
|
||||
return await new Promise(async (resolve, reject) => {
|
||||
try {
|
||||
const executionMode = 'manual';
|
||||
const executionId = await WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhook,
|
||||
workflowEntity,
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
pushRef,
|
||||
undefined, // IRunExecutionData
|
||||
undefined, // executionId
|
||||
request,
|
||||
response,
|
||||
(error: Error | null, data: IWebhookResponseCallbackData) => {
|
||||
if (error !== null) reject(error);
|
||||
else resolve(data);
|
||||
},
|
||||
destinationNode,
|
||||
);
|
||||
|
||||
// The workflow did not run as the request was probably setup related
|
||||
// or a ping so do not resolve the promise and wait for the real webhook
|
||||
// request instead.
|
||||
if (executionId === undefined) return;
|
||||
|
||||
// Inform editor-ui that webhook got received
|
||||
if (pushRef !== undefined) {
|
||||
this.push.send(
|
||||
{ type: 'testWebhookReceived', data: { workflowId: webhook?.workflowId, executionId } },
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
return await new Promise(async (resolve, reject) => {
|
||||
try {
|
||||
const executionMode = 'manual';
|
||||
const executionId = await WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhook,
|
||||
workflowEntity,
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
pushRef,
|
||||
undefined, // IRunExecutionData
|
||||
undefined, // executionId
|
||||
request,
|
||||
response,
|
||||
(error: Error | null, data: IWebhookResponseCallbackData) => {
|
||||
if (error !== null) reject(error);
|
||||
else resolve(data);
|
||||
},
|
||||
destinationNode,
|
||||
);
|
||||
|
||||
// The workflow did not run as the request was probably setup related
|
||||
// or a ping so do not resolve the promise and wait for the real webhook
|
||||
// request instead.
|
||||
if (executionId === undefined) {
|
||||
await workflow.expression.releaseIsolate();
|
||||
return;
|
||||
}
|
||||
|
||||
// Inform editor-ui that webhook got received
|
||||
if (pushRef !== undefined) {
|
||||
this.push.send(
|
||||
{
|
||||
type: 'testWebhookReceived',
|
||||
data: { workflowId: webhook?.workflowId, executionId },
|
||||
},
|
||||
pushRef,
|
||||
);
|
||||
}
|
||||
} catch {}
|
||||
|
||||
/**
|
||||
* Multi-main setup: In a manual webhook execution, the main process that
|
||||
* handles a webhook might not be the same as the main process that created
|
||||
* the webhook. If so, after the test webhook has been successfully executed,
|
||||
* the handler process commands the creator process to clear its test webhooks.
|
||||
*/
|
||||
if (this.instanceSettings.isMultiMain && pushRef && !this.push.hasPushRef(pushRef)) {
|
||||
void this.publisher.publishCommand({
|
||||
command: 'clear-test-webhooks',
|
||||
payload: { webhookKey: key, workflowEntity, pushRef },
|
||||
});
|
||||
return;
|
||||
}
|
||||
} catch {}
|
||||
|
||||
/**
|
||||
* Multi-main setup: In a manual webhook execution, the main process that
|
||||
* handles a webhook might not be the same as the main process that created
|
||||
* the webhook. If so, after the test webhook has been successfully executed,
|
||||
* the handler process commands the creator process to clear its test webhooks.
|
||||
*/
|
||||
if (this.instanceSettings.isMultiMain && pushRef && !this.push.hasPushRef(pushRef)) {
|
||||
void this.publisher.publishCommand({
|
||||
command: 'clear-test-webhooks',
|
||||
payload: { webhookKey: key, workflowEntity, pushRef },
|
||||
});
|
||||
return;
|
||||
}
|
||||
this.clearTimeout(key);
|
||||
|
||||
this.clearTimeout(key);
|
||||
|
||||
await this.deactivateWebhooks(workflow);
|
||||
});
|
||||
await this.deactivateWebhooks(workflow);
|
||||
});
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
}
|
||||
|
||||
@OnPubSubEvent('clear-test-webhooks', { instanceType: 'main' })
|
||||
|
|
|
|||
|
|
@ -296,51 +296,56 @@ export class WaitingWebhooks implements IWebhookManager {
|
|||
const additionalData = await WorkflowExecuteAdditionalData.getBase({
|
||||
workflowId: workflow.id,
|
||||
});
|
||||
const webhookData = this.webhookService
|
||||
.getNodeWebhooks(workflow, workflowStartNode, additionalData)
|
||||
.find(
|
||||
(webhook) =>
|
||||
webhook.httpMethod === req.method &&
|
||||
webhook.path === (suffix ?? '') &&
|
||||
webhook.webhookDescription.restartWebhook === true &&
|
||||
(webhook.webhookDescription.nodeType === 'form' || false) === this.includeForms,
|
||||
);
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
const webhookData = this.webhookService
|
||||
.getNodeWebhooks(workflow, workflowStartNode, additionalData)
|
||||
.find(
|
||||
(webhook) =>
|
||||
webhook.httpMethod === req.method &&
|
||||
webhook.path === (suffix ?? '') &&
|
||||
webhook.webhookDescription.restartWebhook === true &&
|
||||
(webhook.webhookDescription.nodeType === 'form' || false) === this.includeForms,
|
||||
);
|
||||
|
||||
if (webhookData === undefined) {
|
||||
// If no data got found it means that the execution can not be started via a webhook.
|
||||
// Return 404 because we do not want to give any data if the execution exists or not.
|
||||
const errorMessage = `The workflow for execution "${executionId}" does not contain a waiting webhook with a matching path/method.`;
|
||||
if (webhookData === undefined) {
|
||||
// If no data got found it means that the execution can not be started via a webhook.
|
||||
// Return 404 because we do not want to give any data if the execution exists or not.
|
||||
const errorMessage = `The workflow for execution "${executionId}" does not contain a waiting webhook with a matching path/method.`;
|
||||
|
||||
if (this.isSendAndWaitRequest(workflow.nodes, suffix)) {
|
||||
res.render('send-and-wait-no-action-required', { isTestWebhook: false });
|
||||
return { noWebhookResponse: true };
|
||||
if (this.isSendAndWaitRequest(workflow.nodes, suffix)) {
|
||||
res.render('send-and-wait-no-action-required', { isTestWebhook: false });
|
||||
return { noWebhookResponse: true };
|
||||
}
|
||||
|
||||
throw new NotFoundError(errorMessage);
|
||||
}
|
||||
|
||||
throw new NotFoundError(errorMessage);
|
||||
const runExecutionData = execution.data;
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
void WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhookData,
|
||||
workflowData,
|
||||
workflowStartNode,
|
||||
execution.mode,
|
||||
runExecutionData.pushRef,
|
||||
runExecutionData,
|
||||
execution.id,
|
||||
req,
|
||||
res,
|
||||
|
||||
(error: Error | null, data: object) => {
|
||||
if (error !== null) {
|
||||
return reject(error);
|
||||
}
|
||||
resolve(data);
|
||||
},
|
||||
);
|
||||
});
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
|
||||
const runExecutionData = execution.data;
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
void WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhookData,
|
||||
workflowData,
|
||||
workflowStartNode,
|
||||
execution.mode,
|
||||
runExecutionData.pushRef,
|
||||
runExecutionData,
|
||||
execution.id,
|
||||
req,
|
||||
res,
|
||||
|
||||
(error: Error | null, data: object) => {
|
||||
if (error !== null) {
|
||||
return reject(error);
|
||||
}
|
||||
resolve(data);
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -460,7 +460,15 @@ export async function executeWebhook(
|
|||
additionalData.executionId = executionId;
|
||||
}
|
||||
|
||||
const { responseMode, responseCode, responseData, checkAllMainOutputs } = evaluateResponseOptions(
|
||||
const {
|
||||
responseMode,
|
||||
responseCode,
|
||||
responseData,
|
||||
checkAllMainOutputs,
|
||||
responsePropertyName,
|
||||
responseContentType,
|
||||
responseBinaryPropertyName,
|
||||
} = evaluateResponseOptions(
|
||||
workflowStartNode,
|
||||
workflow,
|
||||
req,
|
||||
|
|
@ -741,14 +749,11 @@ export async function executeWebhook(
|
|||
resumeUrl: `${additionalData.webhookWaitingBaseUrl}/${executionId}`,
|
||||
resumeFormUrl: `${additionalData.formWaitingBaseUrl}/${executionId}`,
|
||||
};
|
||||
const evaluatedResponseData = workflow.expression.getComplexParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseData,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
const evaluatedResponseData = context.evaluateComplexWebhookDescriptionExpression<string>(
|
||||
'responseData',
|
||||
undefined,
|
||||
'firstEntryJson',
|
||||
) as string | undefined;
|
||||
);
|
||||
|
||||
const responseBody = extractWebhookOnReceivedResponse(
|
||||
evaluatedResponseData,
|
||||
|
|
@ -870,10 +875,10 @@ export async function executeWebhook(
|
|||
}
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
responseData as WebhookResponseData,
|
||||
lastNodeTaskData,
|
||||
checkAllMainOutputs,
|
||||
{ responsePropertyName, responseContentType, responseBinaryPropertyName },
|
||||
);
|
||||
|
||||
if (!result.ok) {
|
||||
|
|
@ -975,7 +980,38 @@ function evaluateResponseOptions(
|
|||
// We can unify the behavior in the next major release and get rid of this flag
|
||||
const checkAllMainOutputs = workflowStartNode.type === CHAT_TRIGGER_NODE_TYPE;
|
||||
|
||||
return { responseMode, responseCode, responseData, checkAllMainOutputs };
|
||||
const responsePropertyName = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responsePropertyName,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
) as string | undefined;
|
||||
|
||||
const responseContentType = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseContentType,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
) as string | undefined;
|
||||
|
||||
const responseBinaryPropertyName = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseBinaryPropertyName,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
'data',
|
||||
);
|
||||
|
||||
return {
|
||||
responseMode,
|
||||
responseCode,
|
||||
responseData,
|
||||
checkAllMainOutputs,
|
||||
responsePropertyName,
|
||||
responseContentType,
|
||||
responseBinaryPropertyName,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -5,8 +5,6 @@ import type { INodeExecutionData, ITaskData, Result, WebhookResponseData } from
|
|||
import { BINARY_ENCODING, createResultError, createResultOk, OperationalError } from 'n8n-workflow';
|
||||
import type { Readable } from 'node:stream';
|
||||
|
||||
import type { WebhookExecutionContext } from '@/webhooks/webhook-execution-context';
|
||||
|
||||
/** Response that is not a stream */
|
||||
type StaticResponse = {
|
||||
type: 'static';
|
||||
|
|
@ -21,7 +19,7 @@ type StreamResponse = {
|
|||
};
|
||||
|
||||
/**
|
||||
+ * Extracts the response for a webhook when the response mode is set to
|
||||
* Extracts the response for a webhook when the response mode is set to
|
||||
* `lastNode`.
|
||||
* Note: We can check either all main outputs or just the first one.
|
||||
* For the backward compatibility, by default we only check the first main output.
|
||||
|
|
@ -29,20 +27,29 @@ type StreamResponse = {
|
|||
* until we find one that has data.
|
||||
*/
|
||||
export async function extractWebhookLastNodeResponse(
|
||||
context: WebhookExecutionContext,
|
||||
responseDataType: WebhookResponseData | undefined,
|
||||
lastNodeTaskData: ITaskData,
|
||||
checkAllMainOutputs: boolean = false,
|
||||
options: {
|
||||
responsePropertyName: string | undefined;
|
||||
responseContentType: string | undefined;
|
||||
responseBinaryPropertyName: string | number | boolean | unknown[] | undefined;
|
||||
},
|
||||
): Promise<Result<StaticResponse | StreamResponse, OperationalError>> {
|
||||
if (responseDataType === 'firstEntryJson') {
|
||||
return extractFirstEntryJsonFromTaskData(context, lastNodeTaskData, checkAllMainOutputs);
|
||||
return extractFirstEntryJsonFromTaskData(
|
||||
lastNodeTaskData,
|
||||
checkAllMainOutputs,
|
||||
options.responsePropertyName,
|
||||
options.responseContentType,
|
||||
);
|
||||
}
|
||||
|
||||
if (responseDataType === 'firstEntryBinary') {
|
||||
return await extractFirstEntryBinaryFromTaskData(
|
||||
context,
|
||||
lastNodeTaskData,
|
||||
checkAllMainOutputs,
|
||||
options.responseBinaryPropertyName,
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -62,9 +69,10 @@ export async function extractWebhookLastNodeResponse(
|
|||
* Extracts the JSON data of the first item of the last node
|
||||
*/
|
||||
function extractFirstEntryJsonFromTaskData(
|
||||
context: WebhookExecutionContext,
|
||||
lastNodeTaskData: ITaskData,
|
||||
checkAllMainOutputs: boolean = false,
|
||||
responsePropertyName: string | undefined,
|
||||
responseContentType: string | undefined,
|
||||
): Result<StaticResponse, OperationalError> {
|
||||
const mainOutputs = lastNodeTaskData.data?.main;
|
||||
let firstItem: INodeExecutionData | undefined;
|
||||
|
|
@ -89,18 +97,12 @@ function extractFirstEntryJsonFromTaskData(
|
|||
|
||||
let lastNodeFirstJsonItem: unknown = firstItem.json;
|
||||
|
||||
const responsePropertyName =
|
||||
context.evaluateSimpleWebhookDescriptionExpression<string>('responsePropertyName');
|
||||
|
||||
if (responsePropertyName !== undefined) {
|
||||
lastNodeFirstJsonItem = get(lastNodeFirstJsonItem, responsePropertyName);
|
||||
}
|
||||
|
||||
// User can set the content type of the response and also the headers.
|
||||
// The `responseContentType` only applies to `firstEntryJson` mode.
|
||||
const responseContentType =
|
||||
context.evaluateSimpleWebhookDescriptionExpression<string>('responseContentType');
|
||||
|
||||
return createResultOk({
|
||||
type: 'static',
|
||||
body: lastNodeFirstJsonItem,
|
||||
|
|
@ -112,9 +114,9 @@ function extractFirstEntryJsonFromTaskData(
|
|||
* Extracts the binary data of the first item of the last node
|
||||
*/
|
||||
async function extractFirstEntryBinaryFromTaskData(
|
||||
context: WebhookExecutionContext,
|
||||
lastNodeTaskData: ITaskData,
|
||||
checkAllMainOutputs: boolean = false,
|
||||
responseBinaryPropertyName: string | number | boolean | unknown[] | undefined,
|
||||
): Promise<Result<StaticResponse | StreamResponse, OperationalError>> {
|
||||
const mainOutputs = lastNodeTaskData.data?.main;
|
||||
let lastNodeFirstJsonItem: INodeExecutionData | undefined;
|
||||
|
|
@ -142,12 +144,6 @@ async function extractFirstEntryBinaryFromTaskData(
|
|||
return createResultError(new OperationalError('No binary data was found to return'));
|
||||
}
|
||||
|
||||
const responseBinaryPropertyName = context.evaluateSimpleWebhookDescriptionExpression<string>(
|
||||
'responseBinaryPropertyName',
|
||||
undefined,
|
||||
'data',
|
||||
);
|
||||
|
||||
if (responseBinaryPropertyName === undefined) {
|
||||
return createResultError(new OperationalError("No 'responseBinaryPropertyName' is set"));
|
||||
} else if (typeof responseBinaryPropertyName !== 'string') {
|
||||
|
|
|
|||
|
|
@ -1440,6 +1440,8 @@ export class WorkflowExecute {
|
|||
// eslint-disable-next-line complexity
|
||||
const returnPromise = (async () => {
|
||||
try {
|
||||
await workflow.expression.acquireIsolate();
|
||||
|
||||
// Establish the execution context
|
||||
await establishExecutionContext(
|
||||
workflow,
|
||||
|
|
@ -2310,6 +2312,13 @@ export class WorkflowExecute {
|
|||
}
|
||||
|
||||
return fullRunData;
|
||||
})
|
||||
.finally(async () => {
|
||||
try {
|
||||
await workflow.expression.releaseIsolate();
|
||||
} catch (error) {
|
||||
Container.get(ErrorReporter).error(error);
|
||||
}
|
||||
});
|
||||
|
||||
return await returnPromise.then(resolve);
|
||||
|
|
|
|||
|
|
@ -137,15 +137,19 @@ export function makeLargeData(): INodeExecutionData[] {
|
|||
|
||||
// ── Engine switching helpers ──
|
||||
|
||||
export async function useCurrentEngine(): Promise<void> {
|
||||
await Expression.disposeVmEvaluator();
|
||||
Expression.setExpressionEngine('current');
|
||||
export async function useLegacyEngine(): Promise<void> {
|
||||
await Expression.disposeExpressionEngine();
|
||||
Expression.setExpressionEngine('legacy');
|
||||
}
|
||||
|
||||
export async function useVmEngine(): Promise<void> {
|
||||
Expression.setExpressionEngine('vm');
|
||||
// Use a higher timeout for benchmarks — CodSpeed's instruction-counting
|
||||
// instrumentation adds significant wall-clock overhead that can cause the
|
||||
// default 5s timeout to fire on larger data set benchmarks (e.g. 10k items).
|
||||
await Expression.initializeVmEvaluator({ timeout: 60_000 });
|
||||
await Expression.initExpressionEngine({
|
||||
engine: 'vm',
|
||||
poolSize: 1,
|
||||
maxCodeCacheSize: 1024,
|
||||
timeout: 60_000,
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
/**
|
||||
* Shared benchmark definitions for Tier 1 pattern benchmarks.
|
||||
* Used by both patterns-current.bench.ts and patterns-vm.bench.ts.
|
||||
* Used by both patterns-legacy.bench.ts and patterns-vm.bench.ts.
|
||||
*
|
||||
* NOTE: CodSpeed ignores describe block names, so every bench() name must be
|
||||
* globally unique. Each name is prefixed with `{engine}: {group} -` to ensure
|
||||
|
|
|
|||
|
|
@ -19,15 +19,17 @@ import {
|
|||
} from 'n8n-workflow/src/expression-sandboxing';
|
||||
|
||||
// Top-level await — vitest bench doesn't support beforeAll
|
||||
const bridge = new IsolatedVmBridge({ timeout: 5000 });
|
||||
const evaluator = new ExpressionEvaluator({
|
||||
bridge,
|
||||
createBridge: () => new IsolatedVmBridge({ timeout: 5000 }),
|
||||
maxCodeCacheSize: 1024,
|
||||
hooks: {
|
||||
before: [ThisSanitizer],
|
||||
after: [PrototypeSanitizer, DollarSignValidator],
|
||||
},
|
||||
});
|
||||
await evaluator.initialize();
|
||||
const caller = {};
|
||||
await evaluator.acquire(caller);
|
||||
|
||||
const testData: Record<string, unknown> = {
|
||||
$json: { id: 123, name: 'test', email: 'test@example.com' },
|
||||
|
|
@ -37,12 +39,12 @@ const testData: Record<string, unknown> = {
|
|||
|
||||
// Script Compilation
|
||||
bench('vm micro: Script Compilation - cache hit (repeated expression)', () => {
|
||||
evaluator.evaluate('$json.id', testData);
|
||||
evaluator.evaluate('$json.id', testData, caller);
|
||||
});
|
||||
|
||||
let counter = 0;
|
||||
bench('vm micro: Script Compilation - cache miss (unique expressions)', () => {
|
||||
evaluator.evaluate(`$json.id + ${counter++}`, testData);
|
||||
evaluator.evaluate(`$json.id + ${counter++}`, testData, caller);
|
||||
});
|
||||
|
||||
// Data Complexity
|
||||
|
|
@ -55,11 +57,11 @@ const deepData: Record<string, unknown> = {
|
|||
};
|
||||
|
||||
bench('vm micro: Data Complexity - shallow access (depth 1)', () => {
|
||||
evaluator.evaluate('$json.value', shallowData);
|
||||
evaluator.evaluate('$json.value', shallowData, caller);
|
||||
});
|
||||
|
||||
bench('vm micro: Data Complexity - deep access (depth 6)', () => {
|
||||
evaluator.evaluate('$json.a.b.c.d.e.value', deepData);
|
||||
evaluator.evaluate('$json.a.b.c.d.e.value', deepData, caller);
|
||||
});
|
||||
|
||||
// Array Element Access
|
||||
|
|
@ -70,9 +72,9 @@ const arrayData: Record<string, unknown> = {
|
|||
};
|
||||
|
||||
bench('vm micro: Array Element Access - single element', () => {
|
||||
evaluator.evaluate('$json.items[0].id', arrayData);
|
||||
evaluator.evaluate('$json.items[0].id', arrayData, caller);
|
||||
});
|
||||
|
||||
bench('vm micro: Array Element Access - map 100 elements', () => {
|
||||
evaluator.evaluate('$json.items.map(i => i.id)', arrayData);
|
||||
evaluator.evaluate('$json.items.map(i => i.id)', arrayData, caller);
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
/**
|
||||
* Tier 1: Current Engine (Tournament) Pattern Benchmarks
|
||||
* Tier 1: Legacy Engine Pattern Benchmarks
|
||||
*
|
||||
* Benchmarks expression evaluation through the full Workflow.expression path
|
||||
* using the current (Tournament) engine.
|
||||
* using the legacy engine.
|
||||
*
|
||||
* Run: pnpm --filter=@n8n/performance bench
|
||||
*/
|
||||
|
|
@ -14,19 +14,19 @@ import {
|
|||
makeSmallData,
|
||||
makeMediumData,
|
||||
makeLargeData,
|
||||
useCurrentEngine,
|
||||
useLegacyEngine,
|
||||
} from './fixtures/data';
|
||||
import { definePatternBenchmarks } from './fixtures/pattern-benchmarks';
|
||||
|
||||
await useCurrentEngine();
|
||||
if (Expression.getActiveImplementation() !== 'current') {
|
||||
throw new Error(`Engine not set to 'current' — got '${Expression.getActiveImplementation()}'`);
|
||||
await useLegacyEngine();
|
||||
if (Expression.getActiveImplementation() !== 'legacy') {
|
||||
throw new Error(`Engine not set to 'legacy' — got '${Expression.getActiveImplementation()}'`);
|
||||
}
|
||||
|
||||
const workflow = createWorkflow();
|
||||
|
||||
definePatternBenchmarks(
|
||||
'current',
|
||||
'legacy',
|
||||
workflow,
|
||||
evaluate,
|
||||
makeSmallData(),
|
||||
|
|
@ -6,6 +6,7 @@
|
|||
*
|
||||
* Run: pnpm --filter=@n8n/performance bench
|
||||
*/
|
||||
import { afterAll } from 'vitest';
|
||||
import { Expression } from 'n8n-workflow';
|
||||
|
||||
import {
|
||||
|
|
@ -24,6 +25,9 @@ if (Expression.getActiveImplementation() !== 'vm') {
|
|||
}
|
||||
|
||||
const workflow = createWorkflow();
|
||||
await workflow.expression.acquireIsolate();
|
||||
|
||||
afterAll(() => workflow.expression.releaseIsolate());
|
||||
|
||||
definePatternBenchmarks(
|
||||
'vm',
|
||||
|
|
|
|||
|
|
@ -78,11 +78,28 @@ export const test = base.extend<
|
|||
? CAPABILITIES[capability]
|
||||
: capability;
|
||||
|
||||
const globalEnv: Record<string, string> = (() => {
|
||||
const raw = process.env.N8N_TEST_ENV;
|
||||
if (!raw) return {};
|
||||
try {
|
||||
return JSON.parse(raw) as Record<string, string>;
|
||||
} catch {
|
||||
console.warn('[base.ts] Failed to parse N8N_TEST_ENV');
|
||||
return {};
|
||||
}
|
||||
})();
|
||||
|
||||
const config: N8NConfig = {
|
||||
...base,
|
||||
...override,
|
||||
services: [...new Set([...(base.services ?? []), ...(override.services ?? [])])],
|
||||
env: { ...base.env, ...override.env, E2E_TESTS: 'true', N8N_RESTRICT_FILE_ACCESS_TO: '' },
|
||||
env: {
|
||||
...globalEnv,
|
||||
...base.env,
|
||||
...override.env,
|
||||
E2E_TESTS: 'true',
|
||||
N8N_RESTRICT_FILE_ACCESS_TO: '',
|
||||
},
|
||||
};
|
||||
|
||||
const container = await createN8NStack(config);
|
||||
|
|
|
|||
|
|
@ -224,18 +224,7 @@ const createSafeErrorSubclass = <T extends ErrorConstructor>(ErrorClass: T): T =
|
|||
};
|
||||
|
||||
export class Expression {
|
||||
// Feature gate for expression engine selection
|
||||
private static expressionEngine: 'current' | 'vm' = (() => {
|
||||
if (typeof process === 'undefined') return 'current';
|
||||
const env = process.env.N8N_EXPRESSION_ENGINE;
|
||||
if (env === 'vm' || env === 'current') return env;
|
||||
if (env) {
|
||||
console.warn(
|
||||
`Unknown N8N_EXPRESSION_ENGINE="${env}", falling back to "current". Valid values: current, vm`,
|
||||
);
|
||||
}
|
||||
return 'current';
|
||||
})();
|
||||
private static expressionEngine: 'legacy' | 'vm' = 'legacy';
|
||||
|
||||
private static vmEvaluator?: IExpressionEvaluator;
|
||||
|
||||
|
|
@ -254,19 +243,22 @@ export class Expression {
|
|||
* Should be called once during application startup.
|
||||
* Only available in Node.js environments (not in browser).
|
||||
*/
|
||||
static async initializeVmEvaluator(options?: { timeout?: number }): Promise<void> {
|
||||
if (this.expressionEngine !== 'vm' || IS_FRONTEND) return;
|
||||
static async initExpressionEngine(options: {
|
||||
engine: 'legacy' | 'vm';
|
||||
timeout?: number;
|
||||
poolSize: number;
|
||||
maxCodeCacheSize: number;
|
||||
}): Promise<void> {
|
||||
if (options.engine !== 'vm' || IS_FRONTEND) return;
|
||||
this.expressionEngine = options.engine;
|
||||
|
||||
if (!this.vmEvaluator) {
|
||||
// Dynamic import to avoid loading expression-runtime in browser environments
|
||||
const { ExpressionEvaluator, IsolatedVmBridge } = await import('@n8n/expression-runtime');
|
||||
const bridge = new IsolatedVmBridge({ timeout: options?.timeout ?? 5000 });
|
||||
const DEFAULT_MAX_CODE_CACHE_SIZE = 1024;
|
||||
const parsed = parseInt(process.env.N8N_EXPRESSION_ENGINE_MAX_CODE_CACHE_SIZE ?? '', 10);
|
||||
const maxCodeCacheSize = parsed || DEFAULT_MAX_CODE_CACHE_SIZE;
|
||||
this.vmEvaluator = new ExpressionEvaluator({
|
||||
bridge,
|
||||
maxCodeCacheSize,
|
||||
createBridge: () => new IsolatedVmBridge({ timeout: options.timeout ?? 5000 }),
|
||||
maxCodeCacheSize: options.maxCodeCacheSize,
|
||||
poolSize: options.poolSize,
|
||||
hooks: {
|
||||
before: [ThisSanitizer],
|
||||
after: [PrototypeSanitizer, DollarSignValidator],
|
||||
|
|
@ -276,11 +268,19 @@ export class Expression {
|
|||
}
|
||||
}
|
||||
|
||||
async acquireIsolate(): Promise<void> {
|
||||
if (Expression.vmEvaluator) await Expression.vmEvaluator.acquire(this);
|
||||
}
|
||||
|
||||
async releaseIsolate(): Promise<void> {
|
||||
if (Expression.vmEvaluator) await Expression.vmEvaluator.release(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispose the VM evaluator and release resources.
|
||||
* Should be called during application shutdown or test teardown.
|
||||
*/
|
||||
static async disposeVmEvaluator(): Promise<void> {
|
||||
static async disposeExpressionEngine(): Promise<void> {
|
||||
if (this.vmEvaluator) {
|
||||
await this.vmEvaluator.dispose();
|
||||
this.vmEvaluator = undefined;
|
||||
|
|
@ -291,9 +291,9 @@ export class Expression {
|
|||
* Get the active expression evaluation implementation.
|
||||
* Used for testing and verification.
|
||||
*/
|
||||
static getActiveImplementation(): 'current' | 'vm' {
|
||||
static getActiveImplementation(): 'legacy' | 'vm' {
|
||||
if (this.shouldUseVm()) return 'vm';
|
||||
return 'current';
|
||||
return 'legacy';
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -304,7 +304,7 @@ export class Expression {
|
|||
* another. Only use this in benchmarks and tests, never in production code.
|
||||
* In production, set `N8N_EXPRESSION_ENGINE` before process startup instead.
|
||||
*/
|
||||
static setExpressionEngine(engine: 'current' | 'vm'): void {
|
||||
static setExpressionEngine(engine: 'legacy' | 'vm'): void {
|
||||
this.expressionEngine = engine;
|
||||
}
|
||||
|
||||
|
|
@ -580,12 +580,12 @@ export class Expression {
|
|||
if (Expression.expressionEngine === 'vm' && !IS_FRONTEND) {
|
||||
if (!Expression.vmEvaluator) {
|
||||
throw new UnexpectedError(
|
||||
'N8N_EXPRESSION_ENGINE=vm is enabled but VM evaluator is not initialized. Call Expression.initializeVmEvaluator() during application startup.',
|
||||
'N8N_EXPRESSION_ENGINE=vm is enabled but VM evaluator is not initialized. Call Expression.initExpressionEngine() during application startup.',
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
const result = Expression.vmEvaluator.evaluate(expression, data, {
|
||||
const result = Expression.vmEvaluator.evaluate(expression, data, this, {
|
||||
timezone: this.timezone,
|
||||
});
|
||||
return result as string | null | (() => unknown);
|
||||
|
|
|
|||
|
|
@ -269,4 +269,12 @@ export class WorkflowExpression {
|
|||
convertObjectValueToString(value: object): string {
|
||||
return this.expression.convertObjectValueToString(value);
|
||||
}
|
||||
|
||||
async acquireIsolate(): Promise<void> {
|
||||
await this.expression.acquireIsolate();
|
||||
}
|
||||
|
||||
async releaseIsolate(): Promise<void> {
|
||||
await this.expression.releaseIsolate();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import { afterAll, beforeAll } from 'vitest';
|
||||
import { DateTime, Duration, Interval } from 'luxon';
|
||||
|
||||
import type { IDataObject } from '../../src/interfaces';
|
||||
|
|
@ -22,6 +23,15 @@ export const workflow = new Workflow({
|
|||
});
|
||||
export const expression = workflow.expression;
|
||||
|
||||
// acquireIsolate/releaseIsolate are no-ops for the legacy engine, so these
|
||||
// hooks are safe to register unconditionally.
|
||||
beforeAll(async () => {
|
||||
await expression.acquireIsolate();
|
||||
});
|
||||
afterAll(async () => {
|
||||
await expression.releaseIsolate();
|
||||
});
|
||||
|
||||
export const evaluate = (value: string, values?: IDataObject[]) =>
|
||||
expression.getParameterValue(
|
||||
value,
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ describe('Expression VM error handling', () => {
|
|||
nodeTypes,
|
||||
});
|
||||
|
||||
let originalEngine: 'current' | 'vm';
|
||||
let originalEngine: 'legacy' | 'vm';
|
||||
let originalEvaluator: IExpressionEvaluator | undefined;
|
||||
|
||||
beforeEach(() => {
|
||||
|
|
|
|||
|
|
@ -35,6 +35,13 @@ describe('Expression', () => {
|
|||
});
|
||||
const expression = workflow.expression;
|
||||
|
||||
beforeAll(async () => {
|
||||
await expression.acquireIsolate();
|
||||
});
|
||||
afterAll(async () => {
|
||||
await expression.releaseIsolate();
|
||||
});
|
||||
|
||||
const evaluate = (value: string) =>
|
||||
expression.getParameterValue(value, null, 0, 0, 'node', [], 'manual', {});
|
||||
|
||||
|
|
@ -828,7 +835,7 @@ describe('Expression', () => {
|
|||
});
|
||||
|
||||
describe('resolveSimpleParameterValue with IWorkflowDataProxyData', () => {
|
||||
it('should evaluate expression with provided IWorkflowDataProxyData', () => {
|
||||
it('should evaluate expression with provided IWorkflowDataProxyData', async () => {
|
||||
const nodeTypes = Helpers.NodeTypes();
|
||||
const workflow = new Workflow({
|
||||
id: 'test',
|
||||
|
|
@ -865,7 +872,9 @@ describe('Expression', () => {
|
|||
// Test Expression with new API
|
||||
const timezone = workflow.settings?.timezone ?? 'UTC';
|
||||
const expression = new Expression(timezone);
|
||||
await expression.acquireIsolate();
|
||||
const result = expression.resolveSimpleParameterValue('={{ $json.value * 2 }}', data, false);
|
||||
await expression.releaseIsolate();
|
||||
|
||||
expect(result).toBe(84);
|
||||
});
|
||||
|
|
@ -916,7 +925,7 @@ describe('Expression', () => {
|
|||
});
|
||||
|
||||
describe('getParameterValue with IWorkflowDataProxyData', () => {
|
||||
it('should evaluate simple expression with provided IWorkflowDataProxyData', () => {
|
||||
it('should evaluate simple expression with provided IWorkflowDataProxyData', async () => {
|
||||
const nodeTypes = Helpers.NodeTypes();
|
||||
const workflow = new Workflow({
|
||||
id: 'test',
|
||||
|
|
@ -951,11 +960,13 @@ describe('Expression', () => {
|
|||
|
||||
const timezone = workflow.settings?.timezone ?? 'UTC';
|
||||
const expression = new Expression(timezone);
|
||||
await expression.acquireIsolate();
|
||||
const result = expression.resolveSimpleParameterValue(
|
||||
'={{ $json.text.toUpperCase() }}',
|
||||
data,
|
||||
false,
|
||||
);
|
||||
await expression.releaseIsolate();
|
||||
|
||||
expect(result).toBe('HELLO');
|
||||
});
|
||||
|
|
|
|||
|
|
@ -5,10 +5,10 @@ import { Expression } from '../src/expression';
|
|||
// and disposes it after.
|
||||
if (process.env.N8N_EXPRESSION_ENGINE === 'vm') {
|
||||
beforeAll(async () => {
|
||||
await Expression.initializeVmEvaluator();
|
||||
await Expression.initExpressionEngine({ engine: 'vm', poolSize: 1, maxCodeCacheSize: 1024 });
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await Expression.disposeVmEvaluator();
|
||||
await Expression.disposeExpressionEngine();
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,6 +23,13 @@ describe('WorkflowExpression', () => {
|
|||
});
|
||||
const expression = workflow.expression;
|
||||
|
||||
beforeAll(async () => {
|
||||
await expression.acquireIsolate();
|
||||
});
|
||||
afterAll(async () => {
|
||||
await expression.releaseIsolate();
|
||||
});
|
||||
|
||||
const evaluate = (value: NodeParameterValueType) =>
|
||||
expression.getParameterValue(value, null, 0, 0, 'node', [], 'manual', {});
|
||||
|
||||
|
|
|
|||
|
|
@ -1742,7 +1742,7 @@ describe('Workflow', () => {
|
|||
const nodeTypes = Helpers.NodeTypes();
|
||||
|
||||
for (const testData of tests) {
|
||||
test(testData.description, () => {
|
||||
test(testData.description, async () => {
|
||||
process.env.N8N_BLOCK_ENV_ACCESS_IN_NODE = 'false';
|
||||
|
||||
const nodes: INode[] = [
|
||||
|
|
@ -1820,64 +1820,70 @@ describe('Workflow', () => {
|
|||
};
|
||||
|
||||
const workflow = new Workflow({ nodes, connections, active: false, nodeTypes });
|
||||
const activeNodeName = testData.input.hasOwnProperty('Node3') ? 'Node3' : 'Node2';
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
const activeNodeName = testData.input.hasOwnProperty('Node3') ? 'Node3' : 'Node2';
|
||||
|
||||
const runExecutionData = createRunExecutionData({
|
||||
resultData: {
|
||||
runData: {
|
||||
Node1: [
|
||||
{
|
||||
source: [
|
||||
{
|
||||
previousNode: 'test',
|
||||
},
|
||||
],
|
||||
startTime: 1,
|
||||
executionTime: 1,
|
||||
executionIndex: 0,
|
||||
data: {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json: testData.input.Node1.outputJson || testData.input.Node1.parameters,
|
||||
binary: testData.input.Node1.outputBinary,
|
||||
},
|
||||
],
|
||||
const runExecutionData = createRunExecutionData({
|
||||
resultData: {
|
||||
runData: {
|
||||
Node1: [
|
||||
{
|
||||
source: [
|
||||
{
|
||||
previousNode: 'test',
|
||||
},
|
||||
],
|
||||
startTime: 1,
|
||||
executionTime: 1,
|
||||
executionIndex: 0,
|
||||
data: {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json:
|
||||
testData.input.Node1.outputJson || testData.input.Node1.parameters,
|
||||
binary: testData.input.Node1.outputBinary,
|
||||
},
|
||||
],
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
Node2: [],
|
||||
'Node 4 with spaces': [],
|
||||
],
|
||||
Node2: [],
|
||||
'Node 4 with spaces': [],
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
const itemIndex = 0;
|
||||
const runIndex = 0;
|
||||
const connectionInputData: INodeExecutionData[] =
|
||||
runExecutionData.resultData.runData.Node1[0].data!.main[0]!;
|
||||
const itemIndex = 0;
|
||||
const runIndex = 0;
|
||||
const connectionInputData: INodeExecutionData[] =
|
||||
runExecutionData.resultData.runData.Node1[0].data!.main[0]!;
|
||||
|
||||
for (const parameterName of Object.keys(testData.output)) {
|
||||
const parameterValue = nodes.find((node) => node.name === activeNodeName)!.parameters[
|
||||
parameterName
|
||||
];
|
||||
const result = workflow.expression.getParameterValue(
|
||||
parameterValue,
|
||||
runExecutionData,
|
||||
runIndex,
|
||||
itemIndex,
|
||||
activeNodeName,
|
||||
connectionInputData,
|
||||
'manual',
|
||||
{},
|
||||
);
|
||||
expect(result).toEqual(testData.output[parameterName]);
|
||||
for (const parameterName of Object.keys(testData.output)) {
|
||||
const parameterValue = nodes.find((node) => node.name === activeNodeName)!.parameters[
|
||||
parameterName
|
||||
];
|
||||
const result = workflow.expression.getParameterValue(
|
||||
parameterValue,
|
||||
runExecutionData,
|
||||
runIndex,
|
||||
itemIndex,
|
||||
activeNodeName,
|
||||
connectionInputData,
|
||||
'manual',
|
||||
{},
|
||||
);
|
||||
expect(result).toEqual(testData.output[parameterName]);
|
||||
}
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
test('should also resolve all child parameters when the parent get requested', () => {
|
||||
test('should also resolve all child parameters when the parent get requested', async () => {
|
||||
const nodes: INode[] = [
|
||||
{
|
||||
name: 'Node1',
|
||||
|
|
@ -1904,6 +1910,7 @@ describe('Workflow', () => {
|
|||
const connections: IConnections = {};
|
||||
|
||||
const workflow = new Workflow({ nodes, connections, active: false, nodeTypes });
|
||||
await workflow.expression.acquireIsolate();
|
||||
const activeNodeName = 'Node1';
|
||||
|
||||
const runExecutionData = createRunExecutionData({
|
||||
|
|
@ -1962,6 +1969,7 @@ describe('Workflow', () => {
|
|||
},
|
||||
],
|
||||
});
|
||||
await workflow.expression.releaseIsolate();
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ export default defineConfig({
|
|||
{
|
||||
test: {
|
||||
...sharedTestConfig,
|
||||
name: 'current-engine',
|
||||
name: 'legacy-engine',
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
|
|||
Loading…
Reference in a new issue