refactor(agents): extract chat platforms into Integration classes

Move platform-specific logic (adapter construction, credential extraction,
webhook registration, component normalization, capability metadata,
callback-size flag) out of ChatIntegrationService, ComponentMapper,
createRichInteractionTool, and AgentChatBridge into a per-platform
Integration subclass registered via IntegrationRegistry.

- integration.ts — Integration abstract base + IntegrationRegistry
- platforms/slack-integration.ts — adapter, credential extraction
- platforms/telegram-integration.ts — adapter, webhook registration via
  onAfterConnect hook, select/image component normalization, short-
  callback-data flag
- agents.module.ts registers both platforms at module init
- ChatIntegrationService.connect() loses its switch statement; adapter
  creation and post-connect hooks route through the registry
- ComponentMapper.toCard pulls normalization from the registry
- createRichInteractionTool pulls supportedComponents and description
  from the registry
- AgentChatBridge decides whether to allocate a CallbackStore based on
  the registry's needsShortCallbackData flag

Adding a new platform (Discord, Teams, …) is now: write one Integration
subclass and register it — no more hunts through four files.

Addresses review comments from @yehorkardash on #28330.
This commit is contained in:
Eugene Molodkin 2026-04-21 08:04:11 +02:00
parent e6b35b50cc
commit c812460a3e
No known key found for this signature in database
8 changed files with 341 additions and 206 deletions

View file

@ -21,6 +21,15 @@ export class AgentsModule implements ModuleInterface {
const { AgentSecureRuntime } = await import('./runtime/agent-secure-runtime');
Container.get(AgentSecureRuntime);
// Populate the integration registry with supported chat platforms.
// Adding a new platform is adding one subclass + one register() call.
const { IntegrationRegistry } = await import('./integrations/integration');
const { SlackIntegration } = await import('./integrations/platforms/slack-integration');
const { TelegramIntegration } = await import('./integrations/platforms/telegram-integration');
const registry = Container.get(IntegrationRegistry);
registry.register(Container.get(SlackIntegration));
registry.register(Container.get(TelegramIntegration));
// Register Chat integration service and reconnect active integrations
const { ChatIntegrationService } = await import('./integrations/chat-integration.service');
const chatService = Container.get(ChatIntegrationService);

View file

@ -1,10 +1,12 @@
import type { AgentMessage, CredentialProvider, StreamChunk } from '@n8n/agents';
import { Container } from '@n8n/di';
import type { Logger } from 'n8n-workflow';
import type { AgentsService } from '../agents.service';
import type { RichSuspendPayload } from '../types';
import { CallbackStore } from './callback-store';
import type { ComponentMapper } from './component-mapper';
import { IntegrationRegistry } from './integration';
/**
* Subset of `AgentsService` consumed by the bridge.
@ -100,9 +102,6 @@ export class AgentChatBridge {
/** Store for shortening callback data on platforms with size limits (Telegram) */
private readonly callbackStore?: CallbackStore;
/** Platforms where callback data must be shortened */
private static readonly SHORT_CALLBACK_PLATFORMS = new Set(['telegram']);
constructor(
private readonly bot: ChatBot,
private readonly agentId: string,
@ -114,7 +113,8 @@ export class AgentChatBridge {
private readonly n8nProjectId: string,
private readonly integrationType: string,
) {
if (AgentChatBridge.SHORT_CALLBACK_PLATFORMS.has(integrationType)) {
const integration = Container.get(IntegrationRegistry).get(integrationType);
if (integration?.needsShortCallbackData) {
this.callbackStore = new CallbackStore();
}
this.registerHandlers();

View file

@ -9,7 +9,8 @@ import { UrlService } from '@/services/url.service';
import { AgentChatBridge } from './agent-chat-bridge';
import { ComponentMapper } from './component-mapper';
import { loadChatSdk, loadMemoryState, loadSlackAdapter, loadTelegramAdapter } from './esm-loader';
import { loadChatSdk, loadMemoryState } from './esm-loader';
import { IntegrationRegistry, type IntegrationContext } from './integration';
import { AgentsCredentialProvider } from '../adapters/agents-credential-provider';
import { AgentRepository } from '../repositories/agent.repository';
@ -56,6 +57,7 @@ export class ChatIntegrationService {
private readonly credentialsService: CredentialsService,
private readonly credentialsFinderService: CredentialsFinderService,
private readonly urlService: UrlService,
private readonly integrationRegistry: IntegrationRegistry,
) {}
private connectionKey(agentId: string, type: string, credentialId: string): string {
@ -82,6 +84,8 @@ export class ChatIntegrationService {
await this.disconnectOne(key);
}
const integration = this.integrationRegistry.require(integrationType);
const user = await this.resolveUser(userId);
// Create credential provider scoped to this agent's project
@ -90,8 +94,15 @@ export class ChatIntegrationService {
// Decrypt the integration credential to get platform tokens
const decryptedData = await this.decryptCredential(credentialId, user);
// Create platform-specific adapter
const adapter = await this.createAdapter(integrationType, decryptedData);
const ctx: IntegrationContext = {
agentId,
projectId,
credential: decryptedData,
webhookUrlFor: (platform) => this.buildWebhookUrl(agentId, projectId, platform),
};
// Delegate adapter construction to the platform implementation.
const adapter = await integration.createAdapter(ctx);
// Dynamic imports — chat packages are ESM-only, use loader to bypass CJS transform
const { Chat } = await loadChatSdk();
@ -130,19 +141,18 @@ export class ChatIntegrationService {
// Post-initialize hooks (e.g. Telegram setWebhook) run AFTER chat is live.
// If one throws we must shut the chat down, otherwise adapters/timers leak.
try {
if (integrationType === 'telegram' && this.getTelegramMode() === 'webhook') {
const botToken = this.extractTelegramBotToken(decryptedData);
await this.registerTelegramWebhook(botToken, agentId, projectId);
if (integration.onAfterConnect) {
try {
await integration.onAfterConnect(ctx);
} catch (error) {
await chat.shutdown().catch((shutdownError: unknown) => {
this.logger.warn(
`[ChatIntegrationService] Shutdown after failed onAfterConnect threw: ${shutdownError instanceof Error ? shutdownError.message : String(shutdownError)}`,
);
});
bridge.dispose();
throw error;
}
} catch (error) {
await chat.shutdown().catch((shutdownError: unknown) => {
this.logger.warn(
`[ChatIntegrationService] Shutdown after failed post-connect hook threw: ${shutdownError instanceof Error ? shutdownError.message : String(shutdownError)}`,
);
});
bridge.dispose();
throw error;
}
// The `chat` variable is returned by `new Chat(...)` from the ESM-only
@ -324,116 +334,10 @@ export class ChatIntegrationService {
return decrypted as Record<string, unknown>;
}
/**
* Extract the bot token from a decrypted Slack credential.
*
* - `slackApi` stores the token as `accessToken`.
* - `slackOAuth2Api` stores the token inside `oauthTokenData.access_token`.
*/
private extractSlackBotToken(credential: Record<string, unknown>): string {
// slackApi credential
let token: string | undefined;
if (typeof credential.accessToken === 'string' && credential.accessToken) {
token = credential.accessToken;
}
// slackOAuth2Api credential — token lives in the nested oauthTokenData object
if (!token) {
const tokenData = credential.oauthTokenData as Record<string, unknown> | undefined;
const oauthToken = tokenData?.access_token ?? tokenData?.accessToken;
if (typeof oauthToken === 'string' && oauthToken) {
token = oauthToken;
}
}
if (!token) {
throw new Error(
'Could not extract a bot token from the Slack credential. ' +
'Please ensure the credential has a valid access token.',
);
}
if (!token.startsWith('xoxb-')) {
const prefix = token.split('-')[0] ?? 'unknown';
throw new Error(
`The Slack credential contains a "${prefix}-" token, but agent integrations require a Bot User OAuth Token ("xoxb-"). ` +
'You can find this in your Slack app under OAuth & Permissions → Bot User OAuth Token.',
);
}
return token;
}
private extractSlackSigningSecret(credential: Record<string, unknown>): string {
const secret = credential.signatureSecret;
if (typeof secret === 'string' && secret) {
return secret;
}
throw new Error(
'The Slack credential is missing a signing secret, which is required for agent integrations. ' +
'Edit the credential and add your Slack app\'s "Signing Secret" (found under Basic Information in the Slack API dashboard).',
);
}
private extractTelegramBotToken(credential: Record<string, unknown>): string {
const token = credential.accessToken;
if (typeof token === 'string' && token) {
return token;
}
throw new Error(
'Could not extract a bot token from the Telegram credential. ' +
'Please ensure the credential has a valid access token from BotFather.',
);
}
private getTelegramMode(): 'webhook' | 'polling' {
const baseUrl = this.urlService.getWebhookBaseUrl();
const isPublic = baseUrl.startsWith('https://') && !baseUrl.includes('localhost');
return isPublic ? 'webhook' : 'polling';
}
private buildWebhookUrl(agentId: string, projectId: string, platform: string): string {
// getWebhookBaseUrl returns the URL with a trailing slash, honours the
// WEBHOOK_URL env var used by n8n's other webhook triggers.
const base = this.urlService.getWebhookBaseUrl();
return `${base}rest/projects/${projectId}/agents/v2/${agentId}/webhooks/${platform}`;
}
private async registerTelegramWebhook(
botToken: string,
agentId: string,
projectId: string,
): Promise<void> {
const webhookUrl = this.buildWebhookUrl(agentId, projectId, 'telegram');
const resp = await fetch(`https://api.telegram.org/bot${botToken}/setWebhook`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ url: webhookUrl }),
});
if (!resp.ok) {
throw new Error(`Failed to register Telegram webhook: ${await resp.text()}`);
}
this.logger.info(`[ChatIntegrationService] Telegram webhook registered: ${webhookUrl}`);
}
private async createAdapter(type: string, credential: Record<string, unknown>): Promise<unknown> {
switch (type) {
case 'slack': {
const botToken = this.extractSlackBotToken(credential);
const signingSecret = this.extractSlackSigningSecret(credential);
const { createSlackAdapter } = await loadSlackAdapter();
return createSlackAdapter({ botToken, signingSecret });
}
case 'telegram': {
const botToken = this.extractTelegramBotToken(credential);
const mode = this.getTelegramMode();
const { createTelegramAdapter } = await loadTelegramAdapter();
return createTelegramAdapter({ botToken, mode });
}
default:
throw new Error(`Unsupported integration type: ${type}`);
}
}
}

View file

@ -1,9 +1,12 @@
import { Container } from '@n8n/di';
import { loadChatSdk } from './esm-loader';
import { IntegrationRegistry } from './integration';
/**
* Component type from agent SDK suspend/toMessage payloads.
*/
interface SuspendComponent {
export interface SuspendComponent {
type: string;
text?: string;
label?: string;
@ -51,40 +54,6 @@ interface ComponentRenderContext {
* via the ESM loader to bypass TypeScript's CJS transform.
*/
export class ComponentMapper {
/**
* Normalize components for platforms that don't support certain types.
* Converts select/radio_select individual buttons, image text link.
*/
private normalizeForPlatform(
components: SuspendComponent[],
platform?: string,
): SuspendComponent[] {
// Slack supports everything — no normalization needed
if (!platform || platform === 'slack') return components;
const normalized: SuspendComponent[] = [];
for (const c of components) {
switch (c.type) {
case 'select':
case 'radio_select':
// Convert select options to individual buttons
for (const opt of c.options ?? []) {
normalized.push({ type: 'button', label: opt.label, value: opt.value });
}
break;
case 'image':
// Convert image to a section with a markdown link
if (c.url) {
normalized.push({ type: 'section', text: `[${c.altText ?? 'Image'}](${c.url})` });
}
break;
default:
normalized.push(c);
}
}
return normalized;
}
/**
* Convert a suspend payload to a Chat SDK Card.
*
@ -106,8 +75,9 @@ export class ComponentMapper {
): Promise<unknown> {
const sdk = await loadChatSdk();
// Normalize unsupported components for the target platform
const components = this.normalizeForPlatform(payload.components, platform);
// Delegate per-platform normalization to the Integration implementation.
const integration = platform ? Container.get(IntegrationRegistry).get(platform) : undefined;
const components = integration?.normalizeComponents?.(payload.components) ?? payload.components;
const children: unknown[] = [];
const buttons: unknown[] = [];

View file

@ -0,0 +1,88 @@
import { Service } from '@n8n/di';
import type { SuspendComponent } from './component-mapper';
/** Per-connection context handed to Integration hooks. */
export interface IntegrationContext {
agentId: string;
projectId: string;
credential: Record<string, unknown>;
/** Returns the inbound webhook URL this n8n instance exposes for the given platform. */
webhookUrlFor: (platform: string) => string;
}
/**
* A chat platform (Slack, Telegram, ) that an agent can be connected to.
*
* Encapsulates everything platform-specific in one place: adapter construction,
* credential extraction, capability metadata used by the rich_interaction tool,
* component normalization before rendering, and optional lifecycle hooks.
*
* The concrete subclasses live under `./platforms/`.
*/
export abstract class Integration {
/** Platform identifier (`'slack'`, `'telegram'`, …). */
abstract readonly type: string;
/** Credential types accepted by the frontend selector. */
abstract readonly credentialTypes: string[];
/** Component types this platform supports in rich_interaction cards. */
abstract readonly supportedComponents: string[];
/** User-facing description used by `createRichInteractionTool`. */
abstract readonly description: string;
/**
* True if this platform has a small callback_data limit (Telegram: 64 bytes).
* When true, buttons encode a short key that the bridge resolves via the
* CallbackStore instead of carrying the full payload.
*/
readonly needsShortCallbackData: boolean = false;
/** Build the Chat SDK adapter for this platform. */
abstract createAdapter(ctx: IntegrationContext): Promise<unknown>;
/** Optional hook run AFTER `chat.initialize()`. Throwing triggers cleanup. */
onAfterConnect?(ctx: IntegrationContext): Promise<void>;
/** Optional hook run BEFORE `chat.shutdown()`. Errors here are logged, not thrown. */
onBeforeDisconnect?(ctx: IntegrationContext): Promise<void>;
/**
* Optional per-platform component normalization (applied before toCard).
* Convert unsupported types into close-enough equivalents e.g. Telegram
* turns select options into individual buttons.
*/
normalizeComponents?(components: SuspendComponent[]): SuspendComponent[];
}
/**
* Singleton registry of Integration implementations.
*
* Platforms register themselves during module init (`agents.module.ts`).
* Consumers (ChatIntegrationService, ComponentMapper, createRichInteractionTool,
* AgentChatBridge) look up integrations by type.
*/
@Service()
export class IntegrationRegistry {
private readonly integrations = new Map<string, Integration>();
register(integration: Integration): void {
this.integrations.set(integration.type, integration);
}
get(type: string): Integration | undefined {
return this.integrations.get(type);
}
require(type: string): Integration {
const integration = this.integrations.get(type);
if (!integration) throw new Error(`Unknown integration type: ${type}`);
return integration;
}
list(): Integration[] {
return [...this.integrations.values()];
}
}

View file

@ -0,0 +1,91 @@
import { Service } from '@n8n/di';
import { loadSlackAdapter } from '../esm-loader';
import { Integration, type IntegrationContext } from '../integration';
/**
* Slack platform integration.
*
* Slack callback_data has no small limit and supports every component type
* the rich_interaction tool emits, so no normalization or callback shortening
* is required.
*/
@Service()
export class SlackIntegration extends Integration {
readonly type = 'slack';
readonly credentialTypes = ['slackApi', 'slackOAuth2Api'];
readonly supportedComponents = [
'section',
'button',
'select',
'radio_select',
'divider',
'image',
'fields',
];
readonly description =
'Present rich interactive UI to the user in Slack. Use buttons, ' +
'dropdown selects, radio buttons, images, or formatted content cards. ' +
"The user's response (button click or selection) is returned to you.";
async createAdapter(ctx: IntegrationContext): Promise<unknown> {
const botToken = this.extractBotToken(ctx.credential);
const signingSecret = this.extractSigningSecret(ctx.credential);
const { createSlackAdapter } = await loadSlackAdapter();
return createSlackAdapter({ botToken, signingSecret });
}
/**
* Extract the bot token from a decrypted Slack credential.
*
* - `slackApi` stores the token as `accessToken`.
* - `slackOAuth2Api` stores the token inside `oauthTokenData.access_token`.
*/
private extractBotToken(credential: Record<string, unknown>): string {
let token: string | undefined;
if (typeof credential.accessToken === 'string' && credential.accessToken) {
token = credential.accessToken;
}
if (!token) {
const tokenData = credential.oauthTokenData as Record<string, unknown> | undefined;
const oauthToken = tokenData?.access_token ?? tokenData?.accessToken;
if (typeof oauthToken === 'string' && oauthToken) {
token = oauthToken;
}
}
if (!token) {
throw new Error(
'Could not extract a bot token from the Slack credential. ' +
'Please ensure the credential has a valid access token.',
);
}
if (!token.startsWith('xoxb-')) {
const prefix = token.split('-')[0] ?? 'unknown';
throw new Error(
`The Slack credential contains a "${prefix}-" token, but agent integrations require a Bot User OAuth Token ("xoxb-"). ` +
'You can find this in your Slack app under OAuth & Permissions → Bot User OAuth Token.',
);
}
return token;
}
private extractSigningSecret(credential: Record<string, unknown>): string {
const secret = credential.signatureSecret;
if (typeof secret === 'string' && secret) {
return secret;
}
throw new Error(
'The Slack credential is missing a signing secret, which is required for agent integrations. ' +
'Edit the credential and add your Slack app\'s "Signing Secret" (found under Basic Information in the Slack API dashboard).',
);
}
}

View file

@ -0,0 +1,103 @@
import { Logger } from '@n8n/backend-common';
import { Service } from '@n8n/di';
import { UrlService } from '@/services/url.service';
import type { SuspendComponent } from '../component-mapper';
import { loadTelegramAdapter } from '../esm-loader';
import { Integration, type IntegrationContext } from '../integration';
/**
* Telegram platform integration.
*
* Telegram's Bot API caps callback_data at 64 bytes, so {@link needsShortCallbackData}
* is true the bridge stores full payloads in a CallbackStore and emits short
* 8-char keys as button IDs. The adapter runs in webhook mode when a public
* `WEBHOOK_URL` is configured, otherwise it falls back to polling for local dev.
*/
@Service()
export class TelegramIntegration extends Integration {
readonly type = 'telegram';
readonly credentialTypes = ['telegramApi'];
readonly supportedComponents = ['section', 'button', 'divider', 'fields'];
readonly description =
'Present rich interactive UI in Telegram. Available: buttons, text sections, ' +
'dividers, key-value fields. For multiple choices, use one button per option. ' +
"The user's response (button click) is returned to you.";
readonly needsShortCallbackData = true;
constructor(
private readonly logger: Logger,
private readonly urlService: UrlService,
) {
super();
}
async createAdapter(ctx: IntegrationContext): Promise<unknown> {
const botToken = this.extractBotToken(ctx.credential);
const mode = this.getMode();
const { createTelegramAdapter } = await loadTelegramAdapter();
return createTelegramAdapter({ botToken, mode });
}
async onAfterConnect(ctx: IntegrationContext): Promise<void> {
if (this.getMode() !== 'webhook') return;
const botToken = this.extractBotToken(ctx.credential);
const webhookUrl = ctx.webhookUrlFor('telegram');
const resp = await fetch(`https://api.telegram.org/bot${botToken}/setWebhook`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ url: webhookUrl }),
});
if (!resp.ok) {
throw new Error(`Failed to register Telegram webhook: ${await resp.text()}`);
}
this.logger.info(`[TelegramIntegration] Webhook registered: ${webhookUrl}`);
}
normalizeComponents(components: SuspendComponent[]): SuspendComponent[] {
const normalized: SuspendComponent[] = [];
for (const c of components) {
switch (c.type) {
case 'select':
case 'radio_select':
// Convert select options to individual buttons
for (const opt of c.options ?? []) {
normalized.push({ type: 'button', label: opt.label, value: opt.value });
}
break;
case 'image':
// Convert image to a section with a markdown link
if (c.url) {
normalized.push({ type: 'section', text: `[${c.altText ?? 'Image'}](${c.url})` });
}
break;
default:
normalized.push(c);
}
}
return normalized;
}
/** Webhook when we have a public URL, polling otherwise (local dev). */
private getMode(): 'webhook' | 'polling' {
const baseUrl = this.urlService.getWebhookBaseUrl();
const isPublic = baseUrl.startsWith('https://') && !baseUrl.includes('localhost');
return isPublic ? 'webhook' : 'polling';
}
private extractBotToken(credential: Record<string, unknown>): string {
const token = credential.accessToken;
if (typeof token === 'string' && token) {
return token;
}
throw new Error(
'Could not extract a bot token from the Telegram credential. ' +
'Please ensure the credential has a valid access token from BotFather.',
);
}
}

View file

@ -1,48 +1,16 @@
import { Tool } from '@n8n/agents';
import { Container } from '@n8n/di';
import { z } from 'zod';
// ---------------------------------------------------------------------------
// Platform capabilities
// ---------------------------------------------------------------------------
import { IntegrationRegistry } from './integration';
interface PlatformCapabilities {
supportedComponents: string[];
description: string;
}
const PLATFORM_CAPABILITIES: Record<string, PlatformCapabilities> = {
slack: {
supportedComponents: [
'section',
'button',
'select',
'radio_select',
'divider',
'image',
'fields',
],
description:
'Present rich interactive UI to the user in Slack. Use buttons, ' +
'dropdown selects, radio buttons, images, or formatted content cards. ' +
"The user's response (button click or selection) is returned to you.",
},
telegram: {
supportedComponents: ['section', 'button', 'divider', 'fields'],
description:
'Present rich interactive UI in Telegram. Available: buttons, text sections, ' +
'dividers, key-value fields. For multiple choices, use one button per option. ' +
"The user's response (button click) is returned to you.",
},
};
// Conservative default — works on all platforms that support buttons
const DEFAULT_CAPABILITIES: PlatformCapabilities = {
supportedComponents: ['section', 'button', 'divider', 'fields'],
description:
'Present rich interactive UI to the user in chat. Available: buttons, ' +
'text sections, dividers, key-value fields. For choices, use one button per option. ' +
"The user's response (button click) is returned to you.",
};
// Conservative default — works on every platform that supports buttons.
// Used when the tool is constructed without a platform hint.
const DEFAULT_SUPPORTED_COMPONENTS = ['section', 'button', 'divider', 'fields'];
const DEFAULT_DESCRIPTION =
'Present rich interactive UI to the user in chat. Available: buttons, ' +
'text sections, dividers, key-value fields. For choices, use one button per option. ' +
"The user's response (button click) is returned to you.";
// ---------------------------------------------------------------------------
// Shared schemas
@ -108,8 +76,10 @@ function buildComponentSchema(supportedComponents: string[]) {
}
export function createRichInteractionTool(platform?: string) {
const caps = PLATFORM_CAPABILITIES[platform ?? ''] ?? DEFAULT_CAPABILITIES;
const componentSchema = buildComponentSchema(caps.supportedComponents);
const integration = platform ? Container.get(IntegrationRegistry).get(platform) : undefined;
const supportedComponents = integration?.supportedComponents ?? DEFAULT_SUPPORTED_COMPONENTS;
const description = integration?.description ?? DEFAULT_DESCRIPTION;
const componentSchema = buildComponentSchema(supportedComponents);
const inputSchema = z.object({
title: z.string().optional().describe('Card title / header text'),
@ -121,7 +91,7 @@ export function createRichInteractionTool(platform?: string) {
const suspendSchema = inputSchema;
return new Tool('rich_interaction')
.description(caps.description)
.description(description)
.input(inputSchema)
.suspend(suspendSchema)
.resume(