hyperdx/packages/api/src/opamp/controllers/opampController.ts
Warren Lee cb841457f2
[HDX-3994] Deprecate clickhouse.json feature gate in favor of per-exporter json config (#2119)
## Summary

Deprecate the upstream-deprecated `--feature-gates=clickhouse.json` CLI flag in favor of the per-exporter `json: true` config option, as recommended by the OpenTelemetry ClickHouse exporter v0.149.0.

This introduces a new env var `HYPERDX_OTEL_EXPORTER_CLICKHOUSE_JSON_ENABLE` that controls JSON mode at the exporter config level. The old `OTEL_AGENT_FEATURE_GATE_ARG` env var remains backward-compatible — when it contains `clickhouse.json`, the entrypoint strips that gate, maps it to the new env var, and prints a deprecation warning. Other feature gates are preserved and passed through to the collector.

**Key changes:**
- **`docker/otel-collector/entrypoint.sh`** — Detects `clickhouse.json` in `OTEL_AGENT_FEATURE_GATE_ARG`, strips it, sets `HYPERDX_OTEL_EXPORTER_CLICKHOUSE_JSON_ENABLE=true`, and prints a deprecation warning. Remaining feature gates are still passed through to the collector in both standalone and supervisor modes.
- **`docker/otel-collector/config.standalone.yaml`** — Added `json: ${env:HYPERDX_OTEL_EXPORTER_CLICKHOUSE_JSON_ENABLE:-false}` to both ClickHouse exporter configs
- **`packages/api/src/opamp/controllers/opampController.ts`** — Added `json` field to the `CollectorConfig` type and both ClickHouse exporter configs for OpAMP-managed collectors
- **`docker/otel-collector/supervisor_docker.yaml.tmpl`** — Feature gate pass-through preserved for non-`clickhouse.json` gates (entrypoint strips the deprecated gate before supervisor template renders)
- **`smoke-tests/otel-collector/`** — Added a JSON-enabled otel-collector service and smoke tests verifying:
  - `ResourceAttributes` and `LogAttributes` columns in `otel_logs` are `JSON` type (not `Map`)
  - Log data with various attribute types (string, int, boolean) is inserted and queryable via JSON path access

### How to test locally or on Vercel

1. Run `yarn dev` to start the dev stack
2. Verify the `otel-collector-json` container starts without errors (the `clickhouse.json` feature gate is stripped, not passed to the collector)
3. Check container logs for the deprecation warning when `OTEL_AGENT_FEATURE_GATE_ARG` contains `clickhouse.json`
4. Verify the non-JSON `otel-collector` service continues to work normally (json defaults to false)
5. Run smoke tests: `cd smoke-tests/otel-collector && bats json-exporter.bats`

### References

- Linear Issue: https://linear.app/hyperdx/issue/HDX-3994
- Upstream deprecation: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/clickhouseexporter#experimental-json-support
2026-04-15 15:49:45 +00:00

377 lines
10 KiB
TypeScript

import { Request, Response } from 'express';
import * as config from '@/config';
import { getAllTeams } from '@/controllers/team';
import type { ITeam } from '@/models/team';
import logger from '@/utils/logger';
import { agentService } from '../services/agentService';
import {
createRemoteConfig,
decodeAgentToServer,
encodeServerToAgent,
serverCapabilities,
} from '../utils/protobuf';
/** Reference to an authenticator extension attached to a receiver protocol. */
type ReceiverAuthConfig = {
  authenticator: string;
};

/** Retry policy shared by the ClickHouse exporters. */
type ExporterRetryConfig = {
  enabled: boolean;
  initial_interval: string;
  max_interval: string;
  max_elapsed_time: string;
};

/**
 * Settings common to every ClickHouse exporter entry.
 *
 * `create_schema` and `json` are typed as strings because the config builder
 * in this file fills them with `${env:...}` placeholder strings rather than
 * literal booleans.
 */
type ClickhouseExporterConfig = {
  endpoint: string;
  database: string;
  username: string;
  password: string;
  ttl: string;
  timeout: string;
  create_schema: string;
  json: string;
  retry_on_failure: ExporterRetryConfig;
};

/** A single service pipeline: where data comes from, how it is processed, where it goes. */
type PipelineConfig = {
  receivers: string[];
  processors?: string[];
  exporters: string[];
};

/**
 * Shape of the collector configuration document produced by
 * `buildOtelCollectorConfig` and shipped to agents as remote config.
 */
type CollectorConfig = {
  // Extension settings keyed by extension id.
  extensions: Record<string, any>;
  receivers: {
    'otlp/hyperdx'?: {
      protocols: {
        grpc: {
          endpoint: string;
          include_metadata: boolean;
          auth?: ReceiverAuthConfig;
        };
        http: {
          endpoint: string;
          cors: {
            allowed_origins: string[];
            allowed_headers: string[];
          };
          include_metadata: boolean;
          auth?: ReceiverAuthConfig;
        };
      };
    };
    prometheus?: {
      config: {
        scrape_configs: {
          job_name: string;
          scrape_interval: string;
          static_configs: { targets: string[] }[];
        }[];
      };
    };
    fluentforward?: {
      endpoint: string;
    };
    nop?: null;
    'routing/logs'?: string[];
  };
  connectors?: {
    'routing/logs'?: {
      default_pipelines: string[];
      error_mode: string;
      table: {
        context: string;
        statement: string;
        pipelines: string[];
      }[];
    };
  };
  exporters?: {
    nop?: null;
    debug?: {
      verbosity: string;
      sampling_initial: number;
      sampling_thereafter: number;
    };
    // Same settings as the default exporter plus an explicit logs table name.
    'clickhouse/rrweb'?: ClickhouseExporterConfig & {
      logs_table_name: string;
    };
    clickhouse?: ClickhouseExporterConfig;
  };
  service: {
    // Ids of the extensions that the service enables.
    extensions: string[];
    // Pipelines keyed by pipeline id (e.g. `traces`, `logs/in`).
    pipelines: Record<string, PipelineConfig>;
  };
};
/**
 * Build the collector configuration that is sent to OpAMP-managed agents.
 *
 * The base document always contains the `nop`, `otlp/hyperdx`, `prometheus`
 * and `fluentforward` receivers, the `routing/logs` connector, the ClickHouse
 * exporters and the five service pipelines. The `otlp/hyperdx` receiver is
 * only wired into the pipelines when at least one ingestion token exists.
 *
 * @param teams Teams whose API keys are accepted as ingestion tokens. Only the
 *   first team's `collectorAuthenticationEnforced` flag is consulted.
 * @returns A freshly built config object; nothing is shared between calls.
 */
export const buildOtelCollectorConfig = (
  teams: Pick<ITeam, 'apiKey' | 'collectorAuthenticationEnforced'>[],
): CollectorConfig => {
  const otlpReceiverId = 'otlp/hyperdx';
  const bearerAuthId = 'bearertokenauth/hyperdx';

  // Collect every non-empty team API key.
  const tokens = teams.map(({ apiKey }) => apiKey).filter(key => key);

  // Only allow INGESTION_API_KEY for dev or all-in-one images for security reasons
  const staticKeyAllowed =
    config.IS_ALL_IN_ONE_IMAGE || config.IS_LOCAL_APP_MODE || config.IS_DEV;
  if (staticKeyAllowed && config.INGESTION_API_KEY) {
    tokens.push(config.INGESTION_API_KEY);
  }

  const authEnforced = teams[0]?.collectorAuthenticationEnforced;

  // Kept as a local reference so the auth settings can be attached further
  // down without re-reading (and null-checking) the optional receiver entry.
  const otlpReceiver: NonNullable<
    CollectorConfig['receivers']['otlp/hyperdx']
  > = {
    protocols: {
      grpc: {
        endpoint: '0.0.0.0:4317',
        include_metadata: true,
      },
      http: {
        endpoint: '0.0.0.0:4318',
        cors: {
          allowed_origins: ['*'],
          allowed_headers: ['*'],
        },
        include_metadata: true,
      },
    },
  };

  // Each exporter gets its own retry object instead of a shared reference.
  const retryPolicy = () => ({
    enabled: true,
    initial_interval: '5s',
    max_interval: '30s',
    max_elapsed_time: '300s',
  });

  const collectorConfig: CollectorConfig = {
    extensions: {},
    receivers: {
      nop: null,
      'otlp/hyperdx': otlpReceiver,
      prometheus: {
        config: {
          scrape_configs: [
            {
              job_name: 'otelcol',
              scrape_interval: '30s',
              static_configs: [
                {
                  targets: [
                    '0.0.0.0:8888',
                    '${env:CLICKHOUSE_PROMETHEUS_METRICS_ENDPOINT}',
                  ],
                },
              ],
            },
          ],
        },
      },
      fluentforward: {
        endpoint: '0.0.0.0:24225',
      },
    },
    connectors: {
      'routing/logs': {
        default_pipelines: ['logs/out-default'],
        error_mode: 'ignore',
        table: [
          {
            context: 'log',
            statement:
              'route() where IsMatch(attributes["rr-web.event"], ".*")',
            pipelines: ['logs/out-rrweb'],
          },
        ],
      },
    },
    exporters: {
      nop: null,
      debug: {
        verbosity: 'detailed',
        sampling_initial: 5,
        sampling_thereafter: 200,
      },
      'clickhouse/rrweb': {
        endpoint: '${env:CLICKHOUSE_ENDPOINT}',
        database: '${env:HYPERDX_OTEL_EXPORTER_CLICKHOUSE_DATABASE}',
        username: '${env:CLICKHOUSE_USER}',
        password: '${env:CLICKHOUSE_PASSWORD}',
        ttl: '${env:HYPERDX_OTEL_EXPORTER_TABLES_TTL:-720h}',
        logs_table_name: 'hyperdx_sessions',
        timeout: '5s',
        create_schema:
          '${env:HYPERDX_OTEL_EXPORTER_CREATE_LEGACY_SCHEMA:-false}',
        json: '${env:HYPERDX_OTEL_EXPORTER_CLICKHOUSE_JSON_ENABLE:-false}',
        retry_on_failure: retryPolicy(),
      },
      clickhouse: {
        endpoint: '${env:CLICKHOUSE_ENDPOINT}',
        database: '${env:HYPERDX_OTEL_EXPORTER_CLICKHOUSE_DATABASE}',
        username: '${env:CLICKHOUSE_USER}',
        password: '${env:CLICKHOUSE_PASSWORD}',
        ttl: '${env:HYPERDX_OTEL_EXPORTER_TABLES_TTL:-720h}',
        timeout: '5s',
        create_schema:
          '${env:HYPERDX_OTEL_EXPORTER_CREATE_LEGACY_SCHEMA:-false}',
        json: '${env:HYPERDX_OTEL_EXPORTER_CLICKHOUSE_JSON_ENABLE:-false}',
        retry_on_failure: retryPolicy(),
      },
    },
    service: {
      extensions: [],
      pipelines: {
        traces: {
          receivers: ['nop'],
          processors: ['memory_limiter', 'batch'],
          exporters: ['clickhouse'],
        },
        metrics: {
          // TODO: prometheus needs to be authenticated
          receivers: ['prometheus'],
          processors: ['memory_limiter', 'batch'],
          exporters: ['clickhouse'],
        },
        'logs/in': {
          // TODO: fluentforward needs to be authenticated
          receivers: ['fluentforward'],
          exporters: ['routing/logs'],
        },
        'logs/out-default': {
          receivers: ['routing/logs'],
          processors: ['memory_limiter', 'transform', 'batch'],
          exporters: ['clickhouse'],
        },
        'logs/out-rrweb': {
          receivers: ['routing/logs'],
          processors: ['memory_limiter', 'batch'],
          exporters: ['clickhouse/rrweb'],
        },
      },
    },
  };

  // Without any ingestion token the OTLP receiver stays detached from the
  // pipelines and the base config is returned as-is.
  if (tokens.length === 0) {
    return collectorConfig;
  }

  // attach otlp/hyperdx receiver
  for (const pipelineId of ['traces', 'metrics', 'logs/in']) {
    collectorConfig.service.pipelines[pipelineId].receivers.push(
      otlpReceiverId,
    );
  }

  if (authEnforced) {
    // Register the bearer-token extension and require it on both OTLP protocols.
    collectorConfig.extensions[bearerAuthId] = {
      scheme: '',
      tokens,
    };
    otlpReceiver.protocols.grpc.auth = { authenticator: bearerAuthId };
    otlpReceiver.protocols.http.auth = { authenticator: bearerAuthId };
    collectorConfig.service.extensions = [bearerAuthId];
  }

  return collectorConfig;
};
export class OpampController {
  /**
   * Handle an OpAMP message from an agent.
   *
   * Rejects requests whose Content-Type is not `application/x-protobuf` with
   * a 415. Otherwise the body is decoded, the agent status is recorded through
   * `agentService`, and a ServerToAgent reply is encoded and sent back. When
   * the agent accepts remote configuration, the reply carries the collector
   * config built from all teams as a `config.json` entry. Any thrown error is
   * logged and answered with a 500.
   *
   * The method does not read `this`.
   */
  public async handleOpampMessage(req: Request, res: Response): Promise<void> {
    try {
      // Check content type
      if (req.get('Content-Type') !== 'application/x-protobuf') {
        res
          .status(415)
          .send(
            'Unsupported Media Type: Content-Type must be application/x-protobuf',
          );
        return;
      }

      // Decode the AgentToServer message
      const incoming = decodeAgentToServer(req.body);
      logger.debug({ agentToServer: incoming }, 'agentToServer');
      // @ts-ignore
      const senderUid = incoming.instanceUid?.toString('hex');
      logger.debug(`Received message from agent: ${senderUid}`);

      // Process the agent status
      const agent = agentService.processAgentStatus(incoming);

      // Prepare the response
      const reply: any = {
        instanceUid: agent.instanceUid,
        capabilities: serverCapabilities,
      };

      // Check if we should send a remote configuration
      if (agentService.agentAcceptsRemoteConfig(agent)) {
        const teams = await getAllTeams([
          'apiKey',
          'collectorAuthenticationEnforced',
        ]);
        const collectorConfig = buildOtelCollectorConfig(teams);

        if (config.IS_DEV) {
          logger.debug(JSON.stringify(collectorConfig, null, 2));
        }

        const serializedConfig = Buffer.from(JSON.stringify(collectorConfig));
        reply.remoteConfig = createRemoteConfig(
          new Map([['config.json', serializedConfig]]),
          'application/json',
        );

        const targetUid = agent.instanceUid.toString('hex');
        logger.debug(`Sending remote config to agent: ${targetUid}`);
      }

      // Encode and send the response
      res.setHeader('Content-Type', 'application/x-protobuf');
      res.send(encodeServerToAgent(reply));
    } catch (err) {
      logger.error({ err }, 'Error handling OpAMP message');
      res.status(500).send('Internal Server Error');
    }
  }
}
// Shared module-level OpampController instance, created once when this module
// is loaded and exported for use by importers.
export const opampController = new OpampController();