mirror of
https://github.com/n8n-io/n8n
synced 2026-04-21 15:47:20 +00:00
feat(core): Workflow level otel (#27528)
This commit is contained in:
parent
e79d8af1bb
commit
42843d6c72
19 changed files with 1565 additions and 281 deletions
|
|
@ -38,6 +38,7 @@ describe('eligibleModules', () => {
|
|||
'workflow-builder',
|
||||
'redaction',
|
||||
'instance-registry',
|
||||
'otel',
|
||||
]);
|
||||
});
|
||||
|
||||
|
|
@ -62,6 +63,7 @@ describe('eligibleModules', () => {
|
|||
'workflow-builder',
|
||||
'redaction',
|
||||
'instance-registry',
|
||||
'otel',
|
||||
]);
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -48,6 +48,7 @@ export class ModuleRegistry {
|
|||
'workflow-builder',
|
||||
'redaction',
|
||||
'instance-registry',
|
||||
'otel',
|
||||
];
|
||||
|
||||
private readonly activeModules: string[] = [];
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ export const MODULE_NAMES = [
|
|||
'workflow-builder',
|
||||
'redaction',
|
||||
'instance-registry',
|
||||
'otel',
|
||||
] as const;
|
||||
|
||||
export type ModuleName = (typeof MODULE_NAMES)[number];
|
||||
|
|
|
|||
|
|
@ -59,7 +59,6 @@
|
|||
],
|
||||
"devDependencies": {
|
||||
"@n8n/backend-test-utils": "workspace:*",
|
||||
"n8n-containers": "workspace:*",
|
||||
"@n8n/typescript-config": "workspace:*",
|
||||
"@redocly/cli": "^1.28.5",
|
||||
"@types/aws4": "^1.5.1",
|
||||
|
|
@ -90,16 +89,18 @@
|
|||
"concurrently": "^8.2.0",
|
||||
"ioredis-mock": "^8.8.1",
|
||||
"mjml": "^4.15.3",
|
||||
"n8n-containers": "workspace:*",
|
||||
"openapi-types": "^12.1.3",
|
||||
"ts-essentials": "^7.0.3"
|
||||
},
|
||||
"dependencies": {
|
||||
"@1password/connect": "1.4.2",
|
||||
"@apidevtools/json-schema-ref-parser": "12.0.2",
|
||||
"@aws-sdk/client-secrets-manager": "3.808.0",
|
||||
"@azure/identity": "catalog:",
|
||||
"@azure/keyvault-secrets": "4.8.0",
|
||||
"@1password/connect": "1.4.2",
|
||||
"@apidevtools/json-schema-ref-parser": "12.0.2",
|
||||
"@google-cloud/secret-manager": "5.6.0",
|
||||
"@modelcontextprotocol/sdk": "1.26.0",
|
||||
"@n8n/ai-utilities": "workspace:*",
|
||||
"@n8n/ai-workflow-builder": "workspace:*",
|
||||
"@n8n/api-types": "workspace:*",
|
||||
|
|
@ -112,7 +113,6 @@
|
|||
"@n8n/decorators": "workspace:*",
|
||||
"@n8n/di": "workspace:*",
|
||||
"@n8n/errors": "workspace:*",
|
||||
"@modelcontextprotocol/sdk": "1.26.0",
|
||||
"@n8n/n8n-nodes-langchain": "workspace:*",
|
||||
"@n8n/permissions": "workspace:*",
|
||||
"@n8n/syslog-client": "workspace:*",
|
||||
|
|
@ -122,6 +122,13 @@
|
|||
"@n8n/workflow-sdk": "workspace:*",
|
||||
"@n8n_io/ai-assistant-sdk": "catalog:",
|
||||
"@n8n_io/license-sdk": "2.25.0",
|
||||
"@opentelemetry/api": "^1.9.0",
|
||||
"@opentelemetry/exporter-trace-otlp-proto": "^0.213.0",
|
||||
"@opentelemetry/instrumentation": "^0.213.0",
|
||||
"@opentelemetry/resources": "^2.6.0",
|
||||
"@opentelemetry/sdk-node": "^0.213.0",
|
||||
"@opentelemetry/sdk-trace-node": "^2.6.0",
|
||||
"@opentelemetry/semantic-conventions": "^1.40.0",
|
||||
"@parcel/watcher": "^2.5.1",
|
||||
"@rudderstack/rudder-sdk-node": "3.0.0",
|
||||
"@sentry/node": "catalog:sentry",
|
||||
|
|
|
|||
137
packages/cli/src/modules/otel/README.md
Normal file
137
packages/cli/src/modules/otel/README.md
Normal file
|
|
@ -0,0 +1,137 @@
|
|||
## Workflow level OTEL
|
||||
This module enables workflow-level telemetry: each workflow execution is traced as an OpenTelemetry span.
|
||||
|
||||
The module should work in complete isolation, plugging into n8n to add tracing. When it is switched off, no OTEL code should be loaded.
|
||||
|
||||
It is based on, and extends, the work done in the community by:
|
||||
@gabrielhmsantos - https://github.com/gabrielhmsantos/n8n-tracekit
|
||||
|
||||
### Testing
|
||||
Because OTEL often involves events triggered from elsewhere within the n8n system, integration testing is preferred over isolated unit tests.
|
||||
|
||||
### Attributes
|
||||
All attributes are listed in `otel.constants.ts`
|
||||
|
||||
### Module architecture
|
||||
```mermaid
|
||||
graph TD
|
||||
subgraph Module Layer
|
||||
MOD["OtelModule
|
||||
@BackendModule"]
|
||||
end
|
||||
|
||||
subgraph Configuration
|
||||
CFG["OtelConfig
|
||||
env vars to typed config"]
|
||||
end
|
||||
|
||||
subgraph SDK Layer
|
||||
SVC["OtelService
|
||||
owns NodeSDK lifecycle"]
|
||||
SDK["OpenTelemetry NodeSDK
|
||||
exporter, sampler, resource"]
|
||||
end
|
||||
|
||||
subgraph Instrumentation Layer
|
||||
INST["N8nInstrumentation
|
||||
@OnLifecycleEvent listeners"]
|
||||
REG["SpanRegistry
|
||||
Map of executionId to Span"]
|
||||
end
|
||||
|
||||
subgraph Handler Layer
|
||||
IFC{{"SpanHandler interface"}}
|
||||
WS["WorkflowStartHandler"]
|
||||
WE["WorkflowEndHandler"]
|
||||
end
|
||||
|
||||
subgraph n8n Core
|
||||
LC(("Lifecycle Events
|
||||
workflowExecuteBefore
|
||||
workflowExecuteAfter"))
|
||||
end
|
||||
|
||||
MOD -- "1. check enabled" --> CFG
|
||||
MOD -- "2. init SDK" --> SVC
|
||||
SVC -- "creates" --> SDK
|
||||
MOD -- "3. register listeners" --> INST
|
||||
|
||||
LC -. "fires event" .-> INST
|
||||
INST -- "dispatches to" --> IFC
|
||||
IFC -. "implemented by" .-> WS
|
||||
IFC -. "implemented by" .-> WE
|
||||
|
||||
WS -- "startSpan, store" --> REG
|
||||
WE -- "retrieve, enrich, end" --> REG
|
||||
REG -. "spans exported via" .-> SDK
|
||||
|
||||
classDef module fill:#4a9eff,color:#fff
|
||||
classDef config fill:#f5a623,color:#fff
|
||||
classDef sdk fill:#7b68ee,color:#fff
|
||||
classDef inst fill:#50c878,color:#fff
|
||||
classDef handler fill:#ff6b6b,color:#fff
|
||||
classDef core fill:#888,color:#fff
|
||||
|
||||
class MOD module
|
||||
class CFG config
|
||||
class SVC,SDK sdk
|
||||
class INST,REG inst
|
||||
class IFC,WS,WE handler
|
||||
class LC core
|
||||
```
|
||||
|
||||
#### Manual validation
|
||||
1. Create the two files below (`docker-compose.yml` and `jaeger-config.yaml`) in the same directory, then start Jaeger with `docker-compose up -d`.
|
||||
`docker-compose.yml`
|
||||
```yaml
|
||||
services:
|
||||
jaeger:
|
||||
image: jaegertracing/jaeger:latest
|
||||
ports:
|
||||
- "16686:16686" # UI
|
||||
- "4317:4317" # OTLP gRPC
|
||||
- "4318:4318" # OTLP HTTP
|
||||
command: ["--config", "/etc/jaeger/config.yaml"]
|
||||
volumes:
|
||||
- ./jaeger-config.yaml:/etc/jaeger/config.yaml:ro
|
||||
```
|
||||
|
||||
`jaeger-config.yaml`
|
||||
```yaml
|
||||
service:
|
||||
extensions: [jaeger_storage, jaeger_query]
|
||||
pipelines:
|
||||
traces:
|
||||
receivers: [otlp]
|
||||
exporters: [jaeger_storage_exporter]
|
||||
|
||||
extensions:
|
||||
jaeger_storage:
|
||||
backends:
|
||||
memory:
|
||||
memory:
|
||||
max_traces: 1000
|
||||
jaeger_query:
|
||||
storage:
|
||||
traces: memory
|
||||
ui:
|
||||
config_file: ""
|
||||
|
||||
receivers:
|
||||
otlp:
|
||||
protocols:
|
||||
grpc:
|
||||
endpoint: 0.0.0.0:4317
|
||||
http:
|
||||
endpoint: 0.0.0.0:4318
|
||||
|
||||
exporters:
|
||||
jaeger_storage_exporter:
|
||||
trace_storage: memory
|
||||
```
|
||||
|
||||
2. Start n8n and point it at the Jaeger instance:
|
||||
```
|
||||
cd packages/cli
|
||||
N8N_OTEL_ENABLED=true N8N_OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4318 pnpm run dev
|
||||
```
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
import { trace } from '@opentelemetry/api';
|
||||
import {
|
||||
BasicTracerProvider,
|
||||
InMemorySpanExporter,
|
||||
SimpleSpanProcessor,
|
||||
} from '@opentelemetry/sdk-trace-node';
|
||||
|
||||
/**
 * Throwaway OpenTelemetry setup for tests.
 *
 * `create()` installs a tracer provider whose spans are pushed synchronously
 * into an in-memory exporter, so tests can assert on what was recorded.
 *
 * Usage:
 *   const otel = OtelTestProvider.create();
 *   // ... run code that creates spans ...
 *   expect(otel.getFinishedSpans()).toHaveLength(1);
 *   otel.reset();           // between tests
 *   await otel.shutdown();  // cleanup
 */
export class OtelTestProvider {
	private constructor(
		private readonly provider: BasicTracerProvider,
		private readonly exporter: InMemorySpanExporter,
	) {}

	/**
	 * Builds the in-memory exporter and provider and registers the provider
	 * through `trace.setGlobalTracerProvider`.
	 */
	static create(): OtelTestProvider {
		const spanSink = new InMemorySpanExporter();
		const processor = new SimpleSpanProcessor(spanSink);
		const tracerProvider = new BasicTracerProvider({ spanProcessors: [processor] });

		trace.setGlobalTracerProvider(tracerProvider);

		return new OtelTestProvider(tracerProvider, spanSink);
	}

	/** Returns the spans the in-memory exporter has collected so far. */
	getFinishedSpans() {
		const collected = this.exporter.getFinishedSpans();
		return collected;
	}

	/** Clears the collected spans; intended to be called between tests. */
	reset() {
		this.exporter.reset();
	}

	/**
	 * Tears the harness down: clears collected spans, shuts the provider down,
	 * then disables the global trace API.
	 */
	async shutdown() {
		this.exporter.reset();
		await this.provider.shutdown();
		trace.disable();
	}
}
|
||||
|
|
@ -0,0 +1,208 @@
|
|||
import { ModuleRegistry } from '@n8n/backend-common';
|
||||
import { createTeamProject, createWorkflow, testDb, testModules } from '@n8n/backend-test-utils';
|
||||
import type { Project, WorkflowEntity } from '@n8n/db';
|
||||
import { ExecutionRepository } from '@n8n/db';
|
||||
import { Container } from '@n8n/di';
|
||||
import { InstanceSettings } from 'n8n-core';
|
||||
import { DebugHelper } from 'n8n-nodes-base/nodes/DebugHelper/DebugHelper.node';
|
||||
import { ManualTrigger } from 'n8n-nodes-base/nodes/ManualTrigger/ManualTrigger.node';
|
||||
import { createRunExecutionData } from 'n8n-workflow';
|
||||
import { SpanStatusCode } from '@opentelemetry/api';
|
||||
|
||||
import { WorkflowRunner } from '@/workflow-runner';
|
||||
import { ATTR } from '@/modules/otel/otel.constants';
|
||||
|
||||
import * as utils from '@test-integration/utils';
|
||||
import {
|
||||
createSimpleWorkflowFixture,
|
||||
createFailingWorkflowFixture,
|
||||
} from '@test-integration/workflow-fixtures';
|
||||
import { OtelTestProvider } from './otel-test-provider';
|
||||
|
||||
// Shared fixtures: assigned in the hooks below, read by the helpers and tests.
let otel: OtelTestProvider;
let workflowRunner: WorkflowRunner;
let executionRepository: ExecutionRepository;
let project: Project;

beforeAll(async () => {
	// Install the in-memory tracer before the otel module is enabled and initialised.
	otel = OtelTestProvider.create();
	process.env.N8N_OTEL_ENABLED = 'true';

	await testModules.loadModules(['otel']);
	await testDb.init();

	const moduleRegistry = Container.get(ModuleRegistry);
	await moduleRegistry.initModules('main');

	// Only the two node types used by the workflow fixtures are registered.
	const nodeTypes = {
		'n8n-nodes-base.manualTrigger': { type: new ManualTrigger(), sourcePath: '' },
		'n8n-nodes-base.debugHelper': { type: new DebugHelper(), sourcePath: '' },
	};
	await utils.initNodeTypes(nodeTypes);
	await utils.initBinaryDataService();

	const instanceSettings = Container.get(InstanceSettings);
	instanceSettings.markAsLeader();

	workflowRunner = Container.get(WorkflowRunner);
	executionRepository = Container.get(ExecutionRepository);
});

beforeEach(async () => {
	// Start every test with no recorded spans, empty tables and a fresh project.
	otel.reset();

	const tablesToClear = ['ExecutionEntity', 'WorkflowEntity', 'Project'] as const;
	await testDb.truncate([...tablesToClear]);

	project = await createTeamProject('OTel Test Project');
});

afterAll(async () => {
	delete process.env.N8N_OTEL_ENABLED;

	await otel.shutdown();
	await testDb.terminate();
});
|
||||
|
||||
/**
 * Starts `workflow` through the shared `WorkflowRunner` with empty run
 * execution data and resolves with the execution id returned by the runner.
 *
 * @param workflow - persisted workflow to run
 * @param mode - execution mode handed to the runner (defaults to `'webhook'`)
 */
async function executeWorkflow(
	workflow: WorkflowEntity,
	mode: 'webhook' | 'trigger' | 'manual' = 'webhook',
): Promise<string> {
	const executionData = createRunExecutionData({});

	const executionId = await workflowRunner.run(
		{ workflowData: workflow, userId: project.id, executionMode: mode, executionData },
		true,
	);

	return executionId;
}
|
||||
|
||||
/**
 * Polls the execution repository every 100ms until the execution row has a
 * `stoppedAt` value.
 *
 * @param executionId - id of the execution to wait for
 * @param timeout - maximum time to keep polling, in milliseconds
 * @throws Error when `stoppedAt` is still unset once `timeout` has elapsed
 */
async function waitForExecution(executionId: string, timeout = 10_000): Promise<void> {
	const pollIntervalMs = 100;
	const startedAt = Date.now();

	for (;;) {
		if (Date.now() - startedAt >= timeout) break;

		const record = await executionRepository.findOneBy({ id: executionId });
		if (record?.stoppedAt) return;

		await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
	}

	throw new Error(`Execution ${executionId} did not complete within ${timeout}ms`);
}
|
||||
|
||||
describe('Workflow tracing', () => {
	/** Persists the simple workflow fixture under `name` in the current test project. */
	const createSimpleWorkflow = async (name: string) =>
		await createWorkflow({ name, ...createSimpleWorkflowFixture() }, project);

	it('should produce a workflow.execute span for a successful execution', async () => {
		const workflow = await createSimpleWorkflow('Success Workflow');

		const executionId = await executeWorkflow(workflow, 'webhook');
		await waitForExecution(executionId);

		const finished = otel.getFinishedSpans();
		expect(finished).toHaveLength(1);

		const [span] = finished;
		expect(span.name).toBe('workflow.execute');
		expect(span.status.code).not.toBe(SpanStatusCode.ERROR);
		expect(span.attributes).toMatchObject({
			[ATTR.WORKFLOW_ID]: workflow.id,
			[ATTR.WORKFLOW_NAME]: 'Success Workflow',
			[ATTR.WORKFLOW_NODE_COUNT]: workflow.nodes.length,
			[ATTR.EXECUTION_ID]: executionId,
			[ATTR.EXECUTION_MODE]: 'webhook',
			[ATTR.EXECUTION_STATUS]: 'success',
			[ATTR.EXECUTION_IS_RETRY]: false,
		});
	});

	// One case per non-default execution mode: [mode, workflow name].
	const modeCases: Array<['manual' | 'trigger', string]> = [
		['manual', 'Manual Workflow'],
		['trigger', 'Trigger Workflow'],
	];

	it.each(modeCases)('should set execution mode to %s', async (mode, workflowName) => {
		const workflow = await createSimpleWorkflow(workflowName);

		const executionId = await executeWorkflow(workflow, mode);
		await waitForExecution(executionId);

		const [span] = otel.getFinishedSpans();
		expect(span.attributes[ATTR.EXECUTION_MODE]).toBe(mode);
	});

	it('should set span status to ERROR when a node error occurs', async () => {
		const workflow = await createWorkflow(
			{ name: 'Failing Workflow', ...createFailingWorkflowFixture() },
			project,
		);

		// Seed both the execution stack and the start nodes with the manual trigger node.
		const triggerNode = workflow.nodes.find((n) => n.type === 'n8n-nodes-base.manualTrigger')!;
		const executionData = createRunExecutionData({
			executionData: {
				nodeExecutionStack: [
					{
						node: triggerNode,
						data: { main: [[{ json: {}, pairedItem: { item: 0 } }]] },
						source: null,
					},
				],
			},
			startData: {
				startNodes: [{ name: triggerNode.name, sourceData: null }],
			},
		});

		const executionId = await workflowRunner.run(
			{ workflowData: workflow, userId: project.id, executionMode: 'webhook', executionData },
			true,
		);
		await waitForExecution(executionId);

		const finished = otel.getFinishedSpans();
		expect(finished).toHaveLength(1);

		const [span] = finished;
		expect(span.name).toBe('workflow.execute');
		expect(span.status.code).toBe(SpanStatusCode.ERROR);
		expect(span.attributes).toMatchObject({
			[ATTR.WORKFLOW_ID]: workflow.id,
			[ATTR.WORKFLOW_NAME]: 'Failing Workflow',
			[ATTR.EXECUTION_ID]: executionId,
			[ATTR.EXECUTION_STATUS]: 'error',
			[ATTR.EXECUTION_IS_RETRY]: false,
		});
	});

	it('should isolate spans across concurrent executions', async () => {
		const workflowA = await createSimpleWorkflow('Workflow A');
		const workflowB = await createSimpleWorkflow('Workflow B');

		// Launch both executions together, then wait for both to stop.
		const executionIds = await Promise.all([
			executeWorkflow(workflowA),
			executeWorkflow(workflowB),
		]);
		await Promise.all(executionIds.map(async (id) => await waitForExecution(id)));

		const finished = otel.getFinishedSpans();
		expect(finished).toHaveLength(2);

		const spanFor = (workflowId: string) =>
			finished.find((s) => s.attributes[ATTR.WORKFLOW_ID] === workflowId);
		const spanA = spanFor(workflowA.id);
		const spanB = spanFor(workflowB.id);

		expect(spanA).toBeDefined();
		expect(spanB).toBeDefined();
		expect(spanA!.attributes[ATTR.WORKFLOW_NAME]).toBe('Workflow A');
		expect(spanB!.attributes[ATTR.WORKFLOW_NAME]).toBe('Workflow B');
	});
});
|
||||
109
packages/cli/src/modules/otel/__tests__/span-registry.test.ts
Normal file
109
packages/cli/src/modules/otel/__tests__/span-registry.test.ts
Normal file
|
|
@ -0,0 +1,109 @@
|
|||
import { mock } from 'jest-mock-extended';
|
||||
import type { Span } from '@opentelemetry/api';
|
||||
|
||||
import { SpanRegistry } from '../span-registry';
|
||||
|
||||
describe('SpanRegistry', () => {
	let registry: SpanRegistry;

	/** Fresh mocked span; tests only compare identity, never call into it. */
	const newSpan = () => mock<Span>();

	beforeEach(() => {
		registry = new SpanRegistry();
	});

	describe('workflow spans', () => {
		it('should add and retrieve a workflow span', () => {
			const stored = newSpan();

			registry.addWorkflow('exec-1', stored);

			expect(registry.getWorkflow('exec-1')).toBe(stored);
		});

		it('should return undefined for missing workflow span', () => {
			const lookup = registry.getWorkflow('nonexistent');

			expect(lookup).toBeUndefined();
		});

		it('should remove and return a workflow span', () => {
			const stored = newSpan();
			registry.addWorkflow('exec-1', stored);

			expect(registry.removeWorkflow('exec-1')).toBe(stored);
			expect(registry.getWorkflow('exec-1')).toBeUndefined();
		});

		it('should return undefined when removing a nonexistent workflow span', () => {
			const removed = registry.removeWorkflow('nonexistent');

			expect(removed).toBeUndefined();
		});
	});

	describe('node spans', () => {
		it('should add and retrieve a node span', () => {
			const stored = newSpan();

			registry.addNode('exec-1', 'node-a', stored);

			expect(registry.getNode('exec-1', 'node-a')).toBe(stored);
		});

		it('should return undefined for missing node span', () => {
			const lookup = registry.getNode('exec-1', 'nonexistent');

			expect(lookup).toBeUndefined();
		});

		it('should remove and return a node span', () => {
			const stored = newSpan();
			registry.addNode('exec-1', 'node-a', stored);

			expect(registry.removeNode('exec-1', 'node-a')).toBe(stored);
			expect(registry.getNode('exec-1', 'node-a')).toBeUndefined();
		});

		it('should isolate node spans across executions', () => {
			// Same node id under two different executions must not collide.
			const first = newSpan();
			const second = newSpan();
			registry.addNode('exec-1', 'node-a', first);
			registry.addNode('exec-2', 'node-a', second);

			expect(registry.getNode('exec-1', 'node-a')).toBe(first);
			expect(registry.getNode('exec-2', 'node-a')).toBe(second);
		});
	});

	describe('cleanup', () => {
		it('should remove all spans for an execution', () => {
			registry.addWorkflow('exec-1', newSpan());
			registry.addNode('exec-1', 'node-a', newSpan());
			registry.addNode('exec-1', 'node-b', newSpan());

			registry.cleanup('exec-1');

			expect(registry.getWorkflow('exec-1')).toBeUndefined();
			expect(registry.getNode('exec-1', 'node-a')).toBeUndefined();
			expect(registry.getNode('exec-1', 'node-b')).toBeUndefined();
		});

		it('should not affect other executions', () => {
			const survivor = newSpan();
			registry.addWorkflow('exec-1', newSpan());
			registry.addWorkflow('exec-2', survivor);

			registry.cleanup('exec-1');

			expect(registry.getWorkflow('exec-1')).toBeUndefined();
			expect(registry.getWorkflow('exec-2')).toBe(survivor);
		});
	});

	describe('static key methods', () => {
		it('should generate workflow key from executionId', () => {
			const key = SpanRegistry.workflowKey('exec-1');

			expect(key).toBe('exec-1');
		});

		it('should generate node key from executionId and nodeId', () => {
			const key = SpanRegistry.nodeKey('exec-1', 'node-a');

			expect(key).toBe('exec-1:node-a');
		});
	});
});
|
||||
7
packages/cli/src/modules/otel/handlers/interfaces.ts
Normal file
7
packages/cli/src/modules/otel/handlers/interfaces.ts
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
import type { Tracer } from '@opentelemetry/api';
|
||||
|
||||
import type { SpanRegistry } from '../span-registry';
|
||||
|
||||
/**
 * Contract for an object that reacts to one lifecycle event by creating or
 * finishing spans.
 *
 * @typeParam TContext - the lifecycle context type this handler accepts
 */
export interface SpanHandler<TContext> {
	/**
	 * Handles a single lifecycle event.
	 *
	 * @param ctx - context of the event being handled
	 * @param spans - registry holding the spans that are currently open
	 * @param tracer - tracer available for starting new spans
	 */
	// Kept as a method signature (not a function-typed property) on purpose.
	handle(ctx: TContext, spans: SpanRegistry, tracer: Tracer): void;
}
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
import { SpanStatusCode } from '@opentelemetry/api';
|
||||
import type { WorkflowExecuteAfterContext } from '@n8n/decorators';
|
||||
|
||||
import { ATTR } from '../otel.constants';
|
||||
import type { SpanHandler } from './interfaces';
|
||||
import type { SpanRegistry } from '../span-registry';
|
||||
|
||||
/**
 * Finishes the workflow span when a workflow execution ends.
 */
export class WorkflowEndHandler implements SpanHandler<WorkflowExecuteAfterContext> {
	/**
	 * Takes the workflow span for `ctx.executionId` out of the registry, records
	 * the execution mode, status and retry flag on it, sets the span status,
	 * ends the span and clears any remaining registry entries for the execution.
	 * Does nothing when no span is registered for the execution.
	 */
	handle(ctx: WorkflowExecuteAfterContext, spans: SpanRegistry) {
		const workflowSpan = spans.removeWorkflow(ctx.executionId);
		if (!workflowSpan) return;

		const { mode, status } = ctx.runData;

		workflowSpan.setAttributes({
			[ATTR.EXECUTION_MODE]: mode,
			[ATTR.EXECUTION_STATUS]: status,
			[ATTR.EXECUTION_IS_RETRY]: mode === 'retry',
		});

		const failed = status === 'error' || status === 'crashed';
		if (!failed) {
			workflowSpan.setStatus({ code: SpanStatusCode.OK });
		} else {
			workflowSpan.setStatus({ code: SpanStatusCode.ERROR });

			// Only the error's constructor name is recorded on the span.
			const executionError = ctx.runData.data.resultData.error;
			if (executionError) {
				workflowSpan.setAttribute(ATTR.EXECUTION_ERROR_TYPE, executionError.constructor.name);
			}
		}

		workflowSpan.end();
		spans.cleanup(ctx.executionId);
	}
}
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
import type { WorkflowExecuteBeforeContext } from '@n8n/decorators';
|
||||
|
||||
import { ATTR } from '../otel.constants';
|
||||
import type { SpanHandler } from './interfaces';
|
||||
import type { SpanRegistry } from '../span-registry';
|
||||
import type { Tracer } from '@opentelemetry/api';
|
||||
|
||||
/**
 * Opens the workflow span when a workflow execution is about to start.
 */
export class WorkflowStartHandler implements SpanHandler<WorkflowExecuteBeforeContext> {
	/**
	 * Starts a `workflow.execute` span carrying the workflow id, name, node
	 * count and execution id, and registers it under `ctx.executionId`.
	 */
	handle(ctx: WorkflowExecuteBeforeContext, spans: SpanRegistry, tracer: Tracer) {
		const { workflow, executionId } = ctx;

		const attributes = {
			[ATTR.WORKFLOW_ID]: workflow.id,
			[ATTR.WORKFLOW_NAME]: workflow.name,
			[ATTR.WORKFLOW_NODE_COUNT]: workflow.nodes.length,
			[ATTR.EXECUTION_ID]: executionId,
		};
		const workflowSpan = tracer.startSpan('workflow.execute', { attributes });

		spans.addWorkflow(executionId, workflowSpan);
	}
}
|
||||
43
packages/cli/src/modules/otel/n8n-instrumentation.ts
Normal file
43
packages/cli/src/modules/otel/n8n-instrumentation.ts
Normal file
|
|
@ -0,0 +1,43 @@
|
|||
import { OnLifecycleEvent } from '@n8n/decorators';
|
||||
import type {
|
||||
LifecycleContext,
|
||||
WorkflowExecuteBeforeContext,
|
||||
WorkflowExecuteAfterContext,
|
||||
} from '@n8n/decorators';
|
||||
import { Service } from '@n8n/di';
|
||||
import { trace } from '@opentelemetry/api';
|
||||
|
||||
import { SpanRegistry } from './span-registry';
|
||||
import type { SpanHandler } from './handlers/interfaces';
|
||||
import { WorkflowStartHandler } from './handlers/workflow-start.handler';
|
||||
import { WorkflowEndHandler } from './handlers/workflow-end.handler';
|
||||
|
||||
/** Name passed to `trace.getTracer` for every span this module creates. */
const TRACER_NAME = 'n8n-workflow';

/**
 * Listens for workflow lifecycle events and forwards each one to the span
 * handler registered for it, together with the shared span registry and a
 * tracer looked up at the time of the event.
 */
@Service()
export class N8nInstrumentation {
	private readonly spans = new SpanRegistry();

	private readonly lifecycleHandlers: Record<string, SpanHandler<LifecycleContext>> = {
		workflowExecuteBefore: new WorkflowStartHandler(),
		workflowExecuteAfter: new WorkflowEndHandler(),
	};

	/** Forwards the `workflowExecuteBefore` event to the start handler. */
	@OnLifecycleEvent('workflowExecuteBefore')
	onWorkflowStart(ctx: WorkflowExecuteBeforeContext) {
		this.dispatch('workflowExecuteBefore', ctx);
	}

	/** Forwards the `workflowExecuteAfter` event to the end handler. */
	@OnLifecycleEvent('workflowExecuteAfter')
	onWorkflowEnd(ctx: WorkflowExecuteAfterContext) {
		this.dispatch('workflowExecuteAfter', ctx);
	}

	/** Looks up the handler for `eventName` and invokes it with the shared registry and tracer. */
	private dispatch(
		eventName: 'workflowExecuteBefore' | 'workflowExecuteAfter',
		ctx: LifecycleContext,
	) {
		const handler = this.lifecycleHandlers[eventName];
		handler.handle(ctx, this.spans, trace.getTracer(TRACER_NAME));
	}
}
|
||||
22
packages/cli/src/modules/otel/otel.config.ts
Normal file
22
packages/cli/src/modules/otel/otel.config.ts
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
import { Config, Env } from '@n8n/config';
|
||||
|
||||
/**
 * Environment-driven settings for the OTEL module.
 */
@Config
export class OtelConfig {
	/** Turns the module on; both the module and the service bail out early when this is false. */
	@Env('N8N_OTEL_ENABLED')
	enabled: boolean = false;

	/** Base URL of the OTLP endpoint; combined with `exporterTracingPath` to form the exporter URL. */
	@Env('N8N_OTEL_EXPORTER_OTLP_ENDPOINT')
	exporterEndpoint: string = 'http://localhost:4318';

	/** Path appended to `exporterEndpoint` for trace export. */
	@Env('N8N_OTEL_EXPORTER_OTLP_TRACING_PATH')
	exporterTracingPath: string = '/v1/traces';

	/** Extra exporter headers as a comma-separated list of `key=value` pairs; empty means none. */
	@Env('N8N_OTEL_EXPORTER_OTLP_HEADERS')
	exporterHeaders: string = '';

	/** Value reported as the service name resource attribute. */
	@Env('N8N_OTEL_EXPORTER_SERVICE_NAME')
	exporterServiceName: string = 'n8n';

	/** Ratio handed to the trace-id ratio sampler; the default of 1.0 is passed through unchanged. */
	@Env('N8N_OTEL_TRACES_SAMPLE_RATE')
	tracesSampleRate: number = 1.0;
}
|
||||
19
packages/cli/src/modules/otel/otel.constants.ts
Normal file
19
packages/cli/src/modules/otel/otel.constants.ts
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
import { ATTR_SERVICE_NAME, ATTR_SERVICE_VERSION } from '@opentelemetry/semantic-conventions';
|
||||
|
||||
/**
 * Every attribute key the OTEL module writes, gathered in one place.
 */
export const ATTR = {
	// Standard keys re-exported from the OpenTelemetry semantic conventions package
	OTEL_SERVICE_NAME: ATTR_SERVICE_NAME,
	OTEL_SERVICE_VERSION: ATTR_SERVICE_VERSION,

	// Resource attributes describing the n8n instance
	INSTANCE_ID: 'n8n.instance.id',
	INSTANCE_ROLE: 'n8n.instance.role',

	// Workflow attributes
	WORKFLOW_ID: 'n8n.workflow.id',
	WORKFLOW_NAME: 'n8n.workflow.name',
	WORKFLOW_NODE_COUNT: 'n8n.workflow.node_count',

	// Execution attributes
	EXECUTION_ID: 'n8n.execution.id',
	EXECUTION_MODE: 'n8n.execution.mode',
	EXECUTION_STATUS: 'n8n.execution.status',
	EXECUTION_IS_RETRY: 'n8n.execution.is_retry',
	EXECUTION_ERROR_TYPE: 'n8n.execution.error_type',
} as const;
|
||||
31
packages/cli/src/modules/otel/otel.module.ts
Normal file
31
packages/cli/src/modules/otel/otel.module.ts
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
import type { ModuleInterface } from '@n8n/decorators';
|
||||
import { BackendModule, OnShutdown } from '@n8n/decorators';
|
||||
import { Container } from '@n8n/di';
|
||||
|
||||
/**
 * Backend module entry point for workflow-level OpenTelemetry tracing.
 *
 * Everything except the config is pulled in through dynamic `import()` and
 * only after the enabled flag has been checked.
 */
@BackendModule({
	name: 'otel',
	instanceTypes: ['main', 'worker', 'webhook'],
})
export class OtelModule implements ModuleInterface {
	/**
	 * When OTEL is enabled: initialises the OTEL service, then loads the
	 * instrumentation module. Returns early when OTEL is disabled.
	 */
	async init() {
		const configModule = await import('./otel.config');
		const { enabled } = Container.get(configModule.OtelConfig);
		if (!enabled) return;

		const serviceModule = await import('./otel.service');
		Container.get(serviceModule.OtelService).init();

		// Importing N8nInstrumentation triggers @OnLifecycleEvent registration
		await import('./n8n-instrumentation');
	}

	/** Shuts the OTEL service down on application shutdown; no-op when OTEL is disabled. */
	@OnShutdown()
	async shutdown() {
		const configModule = await import('./otel.config');
		const { enabled } = Container.get(configModule.OtelConfig);
		if (!enabled) return;

		const serviceModule = await import('./otel.service');
		await Container.get(serviceModule.OtelService).shutdown();
	}
}
|
||||
74
packages/cli/src/modules/otel/otel.service.ts
Normal file
74
packages/cli/src/modules/otel/otel.service.ts
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
import { Logger } from '@n8n/backend-common';
|
||||
import { Service } from '@n8n/di';
|
||||
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto';
|
||||
import { resourceFromAttributes } from '@opentelemetry/resources';
|
||||
import { NodeSDK } from '@opentelemetry/sdk-node';
|
||||
import { TraceIdRatioBasedSampler } from '@opentelemetry/sdk-trace-node';
|
||||
import { InstanceSettings } from 'n8n-core';
|
||||
|
||||
import { N8N_VERSION } from '@/constants';
|
||||
|
||||
import { OtelConfig } from './otel.config';
|
||||
import { ATTR } from './otel.constants';
|
||||
|
||||
/**
 * Owns the OpenTelemetry `NodeSDK` for this process: builds it from
 * `OtelConfig`, starts it, and shuts it down again.
 */
@Service()
export class OtelService {
	private sdk?: NodeSDK;

	constructor(
		private readonly config: OtelConfig,
		private readonly instanceSettings: InstanceSettings,
		private readonly logger: Logger,
	) {}

	/**
	 * Creates and starts the SDK with the configured resource attributes, OTLP
	 * trace exporter and trace-id ratio sampler.
	 *
	 * Does nothing when OTEL is disabled, or when an SDK is already running, so
	 * calling it more than once cannot start a second SDK.
	 */
	init() {
		if (!this.config.enabled || this.sdk) return;

		const { exporterServiceName, exporterHeaders, tracesSampleRate } = this.config;

		const sdk = new NodeSDK({
			resource: resourceFromAttributes({
				[ATTR.OTEL_SERVICE_NAME]: exporterServiceName,
				[ATTR.OTEL_SERVICE_VERSION]: N8N_VERSION,
				[ATTR.INSTANCE_ID]: this.instanceSettings.instanceId,
				[ATTR.INSTANCE_ROLE]: this.instanceSettings.instanceType,
			}),
			traceExporter: new OTLPTraceExporter({
				url: this.buildTracesUrl(),
				headers: exporterHeaders ? this.parseHeaders(exporterHeaders) : {},
			}),
			sampler: new TraceIdRatioBasedSampler(tracesSampleRate),
		});

		sdk.start();

		// Only remember the SDK once it has started, so a failed start can be retried.
		this.sdk = sdk;
	}

	/**
	 * Shuts the running SDK down, if there is one. The reference is dropped
	 * first so that a later `init()` can start a fresh SDK.
	 */
	async shutdown(): Promise<void> {
		const sdk = this.sdk;
		this.sdk = undefined;
		await sdk?.shutdown();
	}

	/**
	 * Parses a comma-separated list of `key=value` pairs into a header map.
	 *
	 * Keys and values are trimmed. Only the first `=` splits a pair, so values
	 * may themselves contain `=`. Blank entries are ignored; entries without
	 * `=` or with an empty key are skipped with a warning. When a key occurs
	 * more than once the last value wins.
	 *
	 * @param exporterHeaders - raw header string, e.g. `"a=1, b=2"`
	 * @returns the parsed headers
	 */
	parseHeaders(exporterHeaders: string): Record<string, string> {
		const headers: Record<string, string> = {};

		for (const rawPair of exporterHeaders.split(',')) {
			const pair = rawPair.trim();
			if (!pair) continue;

			// NOTE(review): the warnings below echo the raw entry, which may contain a credential.
			const separatorIndex = pair.indexOf('=');
			if (separatorIndex === -1) {
				this.logger.warn(
					`Skipping invalid OTEL exporter header "${pair}": missing "=" separator. Expected format: "key=value".`,
				);
				continue;
			}

			const key = pair.slice(0, separatorIndex).trim();
			if (!key) {
				this.logger.warn(
					`Skipping invalid OTEL exporter header "${pair}": empty key. Expected format: "key=value".`,
				);
				continue;
			}

			headers[key] = pair.slice(separatorIndex + 1).trim();
		}

		return headers;
	}

	/**
	 * Joins the configured endpoint and tracing path with exactly one `/`
	 * between them, whatever slashes either side carries. An empty tracing
	 * path returns the endpoint unchanged.
	 */
	private buildTracesUrl(): string {
		const { exporterEndpoint, exporterTracingPath } = this.config;
		if (!exporterTracingPath) return exporterEndpoint;

		const base = exporterEndpoint.replace(/\/+$/, '');
		const path = exporterTracingPath.replace(/^\/+/, '');
		return `${base}/${path}`;
	}
}
|
||||
51
packages/cli/src/modules/otel/span-registry.ts
Normal file
51
packages/cli/src/modules/otel/span-registry.ts
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
import type { Span } from '@opentelemetry/api';
|
||||
|
||||
/**
 * Keeps track of open spans in a single map.
 *
 * Workflow spans are stored under the execution id; node spans are stored
 * under `<executionId>:<nodeId>`.
 */
export class SpanRegistry {
	private spans = new Map<string, Span>();

	/** Map key for the workflow span of an execution. */
	static workflowKey(executionId: string) {
		return executionId;
	}

	/** Map key for the span of one node within an execution. */
	static nodeKey(executionId: string, nodeId: string) {
		return [executionId, nodeId].join(':');
	}

	/** Stores (or overwrites) the workflow span for an execution. */
	addWorkflow(executionId: string, span: Span) {
		this.spans.set(SpanRegistry.workflowKey(executionId), span);
	}

	/** Returns the workflow span for an execution, or `undefined` when none is stored. */
	getWorkflow(executionId: string): Span | undefined {
		return this.spans.get(SpanRegistry.workflowKey(executionId));
	}

	/** Deletes the workflow span for an execution and returns it, or `undefined` when none was stored. */
	removeWorkflow(executionId: string): Span | undefined {
		return this.take(SpanRegistry.workflowKey(executionId));
	}

	/** Stores (or overwrites) the span for one node of an execution. */
	addNode(executionId: string, nodeId: string, span: Span) {
		this.spans.set(SpanRegistry.nodeKey(executionId, nodeId), span);
	}

	/** Returns the span for one node of an execution, or `undefined` when none is stored. */
	getNode(executionId: string, nodeId: string): Span | undefined {
		return this.spans.get(SpanRegistry.nodeKey(executionId, nodeId));
	}

	/** Deletes the span for one node of an execution and returns it, or `undefined` when none was stored. */
	removeNode(executionId: string, nodeId: string): Span | undefined {
		return this.take(SpanRegistry.nodeKey(executionId, nodeId));
	}

	/**
	 * Drops every entry that belongs to an execution: the workflow entry and all
	 * node entries whose key starts with `<executionId>:`. Spans are only
	 * removed from the map; this method does not end them.
	 */
	cleanup(executionId: string) {
		const nodePrefix = `${executionId}:`;
		const staleKeys = [...this.spans.keys()].filter(
			(key) => key === executionId || key.startsWith(nodePrefix),
		);

		staleKeys.forEach((key) => this.spans.delete(key));
	}

	/** Reads the entry under `key`, deletes it, and returns what was read. */
	private take(key: string): Span | undefined {
		const found = this.spans.get(key);
		this.spans.delete(key);
		return found;
	}
}
|
||||
977
pnpm-lock.yaml
977
pnpm-lock.yaml
File diff suppressed because it is too large
Load diff
|
|
@ -6,6 +6,7 @@ packages:
|
|||
- packages/testing/**
|
||||
|
||||
catalog:
|
||||
'@azure/identity': 4.13.0
|
||||
'@codemirror/autocomplete': 6.20.0
|
||||
'@codemirror/commands': 6.10.1
|
||||
'@codemirror/lang-css': 6.3.1
|
||||
|
|
@ -17,18 +18,17 @@ catalog:
|
|||
'@codemirror/search': 6.5.11
|
||||
'@codemirror/state': 6.5.3
|
||||
'@codemirror/view': 6.39.8
|
||||
'@lezer/common': 1.5.0
|
||||
'@lezer/css': 1.3.0
|
||||
'@lezer/highlight': 1.2.3
|
||||
'@lezer/html': 1.3.13
|
||||
'@lezer/javascript': 1.5.4
|
||||
'@lezer/generator': 1.8.0
|
||||
'@lezer/lr': 1.4.5
|
||||
'@azure/identity': 4.13.0
|
||||
'@langchain/anthropic': 1.1.3
|
||||
'@langchain/community': 1.1.14
|
||||
'@langchain/core': 1.1.34
|
||||
'@langchain/openai': 1.1.3
|
||||
'@lezer/common': 1.5.0
|
||||
'@lezer/css': 1.3.0
|
||||
'@lezer/generator': 1.8.0
|
||||
'@lezer/highlight': 1.2.3
|
||||
'@lezer/html': 1.3.13
|
||||
'@lezer/javascript': 1.5.4
|
||||
'@lezer/lr': 1.4.5
|
||||
'@n8n/typeorm': 0.3.20-16
|
||||
'@n8n_io/ai-assistant-sdk': 1.20.0
|
||||
'@sentry/node': ^10.36.0
|
||||
|
|
@ -53,6 +53,7 @@ catalog:
|
|||
https-proxy-agent: 7.0.6
|
||||
iconv-lite: 0.6.3
|
||||
js-base64: 3.7.2
|
||||
js-tiktoken: 1.0.12
|
||||
jsonrepair: 3.13.2
|
||||
jsonwebtoken: 9.0.3
|
||||
kafkajs: 2.2.4
|
||||
|
|
@ -69,23 +70,29 @@ catalog:
|
|||
rimraf: 6.0.1
|
||||
simple-git: 3.32.3
|
||||
stream-json: 1.9.1
|
||||
tsdown: ^0.16.5
|
||||
ts-morph: ^27.0.2
|
||||
tsdown: ^0.16.5
|
||||
tsx: ^4.19.3
|
||||
typescript: 5.9.2
|
||||
uuid: 10.0.0
|
||||
vite: npm:rolldown-vite@latest
|
||||
vm2: ^3.10.5
|
||||
vite-plugin-dts: ^4.5.4
|
||||
vitest: ^3.1.3
|
||||
vitest-mock-extended: ^3.1.0
|
||||
vm2: ^3.10.5
|
||||
xml2js: 0.6.2
|
||||
xss: 1.0.15
|
||||
zod: 3.25.67
|
||||
zod-to-json-schema: 3.23.3
|
||||
js-tiktoken: 1.0.12
|
||||
|
||||
catalogs:
|
||||
e2e:
|
||||
'@currents/playwright': ^1.15.3
|
||||
'@playwright/cli': 0.1.0
|
||||
'@playwright/test': 1.58.0
|
||||
eslint-plugin-playwright: 2.2.2
|
||||
playwright: 1.58.0
|
||||
playwright-core: 1.58.0
|
||||
frontend:
|
||||
'@sentry/vue': ^10.36.0
|
||||
'@testing-library/jest-dom': ^6.6.3
|
||||
|
|
@ -105,6 +112,10 @@ catalogs:
|
|||
vue-markdown-render: ^2.2.1
|
||||
vue-router: ^4.5.0
|
||||
vue-tsc: ^2.2.8
|
||||
sentry:
|
||||
'@sentry/node': ^10.36.0
|
||||
'@sentry/node-native': ^10.36.0
|
||||
'@sentry/profiling-node': ^10.36.0
|
||||
storybook:
|
||||
'@chromatic-com/storybook': ^4.1.3
|
||||
'@storybook/addon-a11y': ^10.1.11
|
||||
|
|
@ -114,17 +125,6 @@ catalogs:
|
|||
'@storybook/vue3-vite': ^10.1.11
|
||||
eslint-plugin-storybook: ^10.1.11
|
||||
storybook: ^10.1.11
|
||||
e2e:
|
||||
'@playwright/cli': 0.1.0
|
||||
'@playwright/test': 1.58.0
|
||||
'@currents/playwright': ^1.15.3
|
||||
eslint-plugin-playwright: 2.2.2
|
||||
playwright: 1.58.0
|
||||
playwright-core: 1.58.0
|
||||
sentry:
|
||||
'@sentry/node': '^10.36.0'
|
||||
'@sentry/node-native': '^10.36.0'
|
||||
'@sentry/profiling-node': '^10.36.0'
|
||||
|
||||
minimumReleaseAge: 4320
|
||||
|
||||
|
|
@ -132,11 +132,11 @@ minimumReleaseAgeExclude:
|
|||
- '@isaacs/brace-expansion'
|
||||
- '@n8n/*'
|
||||
- '@n8n_io/*'
|
||||
- 'tsdown@0.16.5'
|
||||
- tsdown@0.16.5
|
||||
- eslint-plugin-storybook
|
||||
- eslint-plugin-n8n-nodes-base
|
||||
- '@langchain/*'
|
||||
- 'langchain'
|
||||
- langchain
|
||||
- '@anthropic-ai/sdk'
|
||||
- '@google/generative-ai'
|
||||
- '@google/genai'
|
||||
|
|
|
|||
Loading…
Reference in a new issue