feat: support custom otel collector config (BETA) (#1074)

plus the fix to reduce bloat in opamp agent logs

Users should be able to mount a custom OTel collector config file and add/override receivers, processors, and exporters.
For example:
```
receivers:
  hostmetrics:
    collection_interval: 5s
    scrapers:
      cpu:
      load:
      memory:
      disk:
      filesystem:
      network:
# override the default processors
processors:
  batch:
    send_batch_size: 10000
    timeout: 10s
  memory_limiter:
    limit_mib: 2000
service:
  pipelines:
    metrics/hostmetrics:
      receivers: [hostmetrics]
      # attach existing processors
      processors: [memory_limiter, batch]
      # attach existing exporters
      exporters: [clickhouse]
```
This will add a new `hostmetrics` receiver + `metrics/hostmetrics` pipeline and update the existing `batch` + `memory_limiter` processors.

WARNING: This feature is still in beta, and future updates may change how it works, potentially affecting compatibility

Ref: HDX-1865
This commit is contained in:
Warren 2025-08-18 14:22:43 -07:00 committed by GitHub
parent 41083a450e
commit ab50b12a6b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 564 additions and 220 deletions

View file

@ -0,0 +1,5 @@
---
"@hyperdx/api": patch
---
feat: support custom otel collector config (BETA)

View file

@ -0,0 +1,5 @@
---
"@hyperdx/api": patch
---
fix: reduce bloat in opamp agent logs

View file

@ -30,14 +30,17 @@ services:
HYPERDX_API_KEY: ${HYPERDX_API_KEY}
HYPERDX_LOG_LEVEL: ${HYPERDX_LOG_LEVEL}
OPAMP_SERVER_URL: 'http://host.docker.internal:${HYPERDX_OPAMP_PORT}'
CUSTOM_OTELCOL_CONFIG_FILE: '/etc/otelcol-contrib/custom.config.yaml'
# Uncomment to enable stdout logging for the OTel collector
# OTEL_SUPERVISOR_PASSTHROUGH_LOGS: 'true'
OTEL_SUPERVISOR_PASSTHROUGH_LOGS: 'false'
# Uncomment to enable JSON schema in ClickHouse
# Be sure to also set BETA_CH_OTEL_JSON_SCHEMA_ENABLED to 'true' in ch-server
# OTEL_AGENT_FEATURE_GATE_ARG: '--feature-gates=clickhouse.json'
volumes:
- ./docker/otel-collector/config.yaml:/etc/otelcol-contrib/config.yaml
- ./docker/otel-collector/supervisor_docker.yaml:/etc/otel/supervisor.yaml
# Add a custom config file
- ./docker/otel-collector/custom.config.yaml:/etc/otelcol-contrib/custom.config.yaml
ports:
- '13133:13133' # health_check extension
- '24225:24225' # fluentd receiver

View file

@ -3,38 +3,44 @@ FROM otel/opentelemetry-collector-contrib:0.129.1 AS col
FROM otel/opentelemetry-collector-opampsupervisor:0.128.0 AS supervisor
# From: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/aa5c3aa4c7ec174361fcaf908de8eaca72263078/cmd/opampsupervisor/Dockerfile#L18
FROM alpine:latest@sha256:a8560b36e8b8210634f77d9f7f9efd7ffa463e380b75e2e74aff4511df3ef88c AS prep
RUN apk --update add ca-certificates
RUN mkdir -p /etc/otel/supervisor-data/
FROM scratch AS base
FROM alpine:latest@sha256:a8560b36e8b8210634f77d9f7f9efd7ffa463e380b75e2e74aff4511df3ef88c AS base
ARG USER_UID=10001
ARG USER_GID=10001
# Install certs, create user/group, and make the writable data dir
RUN apk add --no-cache ca-certificates && \
addgroup -S -g ${USER_GID} otel && \
adduser -S -u ${USER_UID} -G otel otel && \
install -d -m 0777 -o ${USER_UID} -g ${USER_GID} /etc/otel/supervisor-data
USER ${USER_UID}:${USER_GID}
COPY --from=prep /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=prep --chmod=777 --chown=${USER_UID}:${USER_GID} /etc/otel/supervisor-data /etc/otel/supervisor-data
COPY --from=supervisor --chmod=755 /usr/local/bin/opampsupervisor /
COPY ./supervisor_docker.yaml /etc/otel/supervisor.yaml
COPY --from=supervisor --chmod=755 /usr/local/bin/opampsupervisor /opampsupervisor
COPY --from=col --chmod=755 /otelcol-contrib /otelcontribcol
# Copy entrypoint and log rotation scripts
COPY --chmod=755 ./entrypoint.sh /entrypoint.sh
COPY --chmod=755 ./log-rotator.sh /log-rotator.sh
## dev ##############################################################################################
FROM base AS dev
COPY ./config.yaml /etc/otelcol-contrib/config.yaml
COPY ./supervisor_docker.yaml /etc/otel/supervisor.yaml
EXPOSE 4317 4318 13133
ENTRYPOINT ["/opampsupervisor"]
ENTRYPOINT ["/entrypoint.sh", "/opampsupervisor"]
CMD ["--config", "/etc/otel/supervisor.yaml"]
## prod #############################################################################################
FROM base AS prod
COPY ./config.yaml /etc/otelcol-contrib/config.yaml
COPY ./supervisor_docker.yaml /etc/otel/supervisor.yaml
EXPOSE 4317 4318 13133
ENTRYPOINT ["/opampsupervisor"]
ENTRYPOINT ["/entrypoint.sh", "/opampsupervisor"]
CMD ["--config", "/etc/otel/supervisor.yaml"]

View file

@ -0,0 +1,174 @@
receivers:
# Troubleshooting
prometheus:
config:
scrape_configs:
- job_name: 'otelcol'
scrape_interval: 30s
static_configs:
- targets:
- '0.0.0.0:8888'
- ${env:CLICKHOUSE_PROMETHEUS_METRICS_ENDPOINT}
# Data sources: logs
fluentforward:
endpoint: '0.0.0.0:24225'
# Configured via OpAMP w/ authentication
# Data sources: traces, metrics, logs
# otlp/hyperdx:
# protocols:
# grpc:
# include_metadata: true
# endpoint: '0.0.0.0:4317'
# http:
# cors:
# allowed_origins: ['*']
# allowed_headers: ['*']
# include_metadata: true
# endpoint: '0.0.0.0:4318'
processors:
transform:
log_statements:
- context: log
error_mode: ignore
statements:
# JSON parsing: Extends log attributes with the fields from structured log body content, either as an OTEL map or
# as a string containing JSON content.
- set(log.cache, ExtractPatterns(log.body, "(?P<0>(\\{.*\\}))")) where
IsString(log.body)
- merge_maps(log.attributes, ParseJSON(log.cache["0"]), "upsert")
where IsMap(log.cache)
- flatten(log.attributes) where IsMap(log.cache)
- merge_maps(log.attributes, log.body, "upsert") where IsMap(log.body)
- context: log
error_mode: ignore
conditions:
- severity_number == 0 and severity_text == ""
statements:
# Infer: extract the first log level keyword from the first 256 characters of the body
- set(log.cache["substr"], log.body.string) where Len(log.body.string)
< 256
- set(log.cache["substr"], Substring(log.body.string, 0, 256)) where
Len(log.body.string) >= 256
- set(log.cache, ExtractPatterns(log.cache["substr"],
"(?i)(?P<0>(alert|crit|emerg|fatal|error|err|warn|notice|debug|dbug|trace))"))
# Infer: detect FATAL
- set(log.severity_number, SEVERITY_NUMBER_FATAL) where
IsMatch(log.cache["0"], "(?i)(alert|crit|emerg|fatal)")
- set(log.severity_text, "fatal") where log.severity_number ==
SEVERITY_NUMBER_FATAL
# Infer: detect ERROR
- set(log.severity_number, SEVERITY_NUMBER_ERROR) where
IsMatch(log.cache["0"], "(?i)(error|err)")
- set(log.severity_text, "error") where log.severity_number ==
SEVERITY_NUMBER_ERROR
# Infer: detect WARN
- set(log.severity_number, SEVERITY_NUMBER_WARN) where
IsMatch(log.cache["0"], "(?i)(warn|notice)")
- set(log.severity_text, "warn") where log.severity_number ==
SEVERITY_NUMBER_WARN
# Infer: detect DEBUG
- set(log.severity_number, SEVERITY_NUMBER_DEBUG) where
IsMatch(log.cache["0"], "(?i)(debug|dbug)")
- set(log.severity_text, "debug") where log.severity_number ==
SEVERITY_NUMBER_DEBUG
# Infer: detect TRACE
- set(log.severity_number, SEVERITY_NUMBER_TRACE) where
IsMatch(log.cache["0"], "(?i)(trace)")
- set(log.severity_text, "trace") where log.severity_number ==
SEVERITY_NUMBER_TRACE
# Infer: else
- set(log.severity_text, "info") where log.severity_number == 0
- set(log.severity_number, SEVERITY_NUMBER_INFO) where
log.severity_number == 0
- context: log
error_mode: ignore
statements:
# Normalize the severity_text case
- set(log.severity_text, ConvertCase(log.severity_text, "lower"))
resourcedetection:
detectors:
- env
- system
- docker
timeout: 5s
override: false
batch:
memory_limiter:
# 80% of maximum memory up to 2G
limit_mib: 1500
# 25% of limit up to 2G
spike_limit_mib: 512
check_interval: 5s
connectors:
routing/logs:
default_pipelines: [logs/out-default]
error_mode: ignore
table:
- context: log
statement: route() where IsMatch(attributes["rr-web.event"], ".*")
pipelines: [logs/out-rrweb]
exporters:
debug:
verbosity: detailed
sampling_initial: 5
sampling_thereafter: 200
clickhouse/rrweb:
database: ${env:HYPERDX_OTEL_EXPORTER_CLICKHOUSE_DATABASE}
endpoint: ${env:CLICKHOUSE_ENDPOINT}
password: ${env:CLICKHOUSE_PASSWORD}
username: ${env:CLICKHOUSE_USER}
ttl: 720h
logs_table_name: hyperdx_sessions
timeout: 5s
retry_on_failure:
enabled: true
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 300s
clickhouse:
database: ${env:HYPERDX_OTEL_EXPORTER_CLICKHOUSE_DATABASE}
endpoint: ${env:CLICKHOUSE_ENDPOINT}
password: ${env:CLICKHOUSE_PASSWORD}
username: ${env:CLICKHOUSE_USER}
ttl: 720h
timeout: 5s
retry_on_failure:
enabled: true
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 300s
extensions:
health_check:
endpoint: :13133
service:
telemetry:
metrics:
readers:
- pull:
exporter:
prometheus:
host: '0.0.0.0'
port: 8888
logs:
level: ${HYPERDX_LOG_LEVEL}
extensions: [health_check]
pipelines:
traces:
# receivers: [otlp/hyperdx]
processors: [memory_limiter, batch]
exporters: [clickhouse]
metrics:
# receivers: [otlp/hyperdx, prometheus]
processors: [memory_limiter, batch]
exporters: [clickhouse]
logs/in:
# receivers: [otlp/hyperdx, fluentforward]
exporters: [routing/logs]
logs/out-default:
receivers: [routing/logs]
processors: [memory_limiter, transform, batch]
exporters: [clickhouse]
logs/out-rrweb:
receivers: [routing/logs]
processors: [memory_limiter, batch]
exporters: [clickhouse/rrweb]

View file

@ -1,30 +1,4 @@
receivers:
# Troubleshooting
prometheus:
config:
scrape_configs:
- job_name: 'otelcol'
scrape_interval: 30s
static_configs:
- targets:
- '0.0.0.0:8888'
- ${env:CLICKHOUSE_PROMETHEUS_METRICS_ENDPOINT}
# Data sources: logs
fluentforward:
endpoint: '0.0.0.0:24225'
# Configured via OpAMP w/ authentication
# Data sources: traces, metrics, logs
# otlp/hyperdx:
# protocols:
# grpc:
# include_metadata: true
# endpoint: '0.0.0.0:4317'
# http:
# cors:
# allowed_origins: ['*']
# allowed_headers: ['*']
# include_metadata: true
# endpoint: '0.0.0.0:4318'
# Receivers are now configured dynamically in opampController.ts
processors:
transform:
log_statements:
@ -33,10 +7,8 @@ processors:
statements:
# JSON parsing: Extends log attributes with the fields from structured log body content, either as an OTEL map or
# as a string containing JSON content.
- set(log.cache, ExtractPatterns(log.body, "(?P<0>(\\{.*\\}))")) where
IsString(log.body)
- merge_maps(log.attributes, ParseJSON(log.cache["0"]), "upsert")
where IsMap(log.cache)
- set(log.cache, ExtractPatterns(log.body, "(?P<0>(\\{.*\\}))")) where IsString(log.body)
- merge_maps(log.attributes, ParseJSON(log.cache["0"]), "upsert") where IsMap(log.cache)
- flatten(log.attributes) where IsMap(log.cache)
- merge_maps(log.attributes, log.body, "upsert") where IsMap(log.body)
- context: log
@ -45,37 +17,24 @@ processors:
- severity_number == 0 and severity_text == ""
statements:
# Infer: extract the first log level keyword from the first 256 characters of the body
- set(log.cache["substr"], log.body.string) where Len(log.body.string)
< 256
- set(log.cache["substr"], Substring(log.body.string, 0, 256)) where
Len(log.body.string) >= 256
- set(log.cache, ExtractPatterns(log.cache["substr"],
"(?i)(?P<0>(alert|crit|emerg|fatal|error|err|warn|notice|debug|dbug|trace))"))
- set(log.cache["substr"], log.body.string) where Len(log.body.string) < 256
- set(log.cache["substr"], Substring(log.body.string, 0, 256)) where Len(log.body.string) >= 256
- set(log.cache, ExtractPatterns(log.cache["substr"], "(?i)(?P<0>(alert|crit|emerg|fatal|error|err|warn|notice|debug|dbug|trace))"))
# Infer: detect FATAL
- set(log.severity_number, SEVERITY_NUMBER_FATAL) where
IsMatch(log.cache["0"], "(?i)(alert|crit|emerg|fatal)")
- set(log.severity_text, "fatal") where log.severity_number ==
SEVERITY_NUMBER_FATAL
- set(log.severity_number, SEVERITY_NUMBER_FATAL) where IsMatch(log.cache["0"], "(?i)(alert|crit|emerg|fatal)")
- set(log.severity_text, "fatal") where log.severity_number == SEVERITY_NUMBER_FATAL
# Infer: detect ERROR
- set(log.severity_number, SEVERITY_NUMBER_ERROR) where
IsMatch(log.cache["0"], "(?i)(error|err)")
- set(log.severity_text, "error") where log.severity_number ==
SEVERITY_NUMBER_ERROR
- set(log.severity_number, SEVERITY_NUMBER_ERROR) where IsMatch(log.cache["0"], "(?i)(error|err)")
- set(log.severity_text, "error") where log.severity_number == SEVERITY_NUMBER_ERROR
# Infer: detect WARN
- set(log.severity_number, SEVERITY_NUMBER_WARN) where
IsMatch(log.cache["0"], "(?i)(warn|notice)")
- set(log.severity_text, "warn") where log.severity_number ==
SEVERITY_NUMBER_WARN
- set(log.severity_number, SEVERITY_NUMBER_WARN) where IsMatch(log.cache["0"], "(?i)(warn|notice)")
- set(log.severity_text, "warn") where log.severity_number == SEVERITY_NUMBER_WARN
# Infer: detect DEBUG
- set(log.severity_number, SEVERITY_NUMBER_DEBUG) where
IsMatch(log.cache["0"], "(?i)(debug|dbug)")
- set(log.severity_text, "debug") where log.severity_number ==
SEVERITY_NUMBER_DEBUG
- set(log.severity_number, SEVERITY_NUMBER_DEBUG) where IsMatch(log.cache["0"], "(?i)(debug|dbug)")
- set(log.severity_text, "debug") where log.severity_number == SEVERITY_NUMBER_DEBUG
# Infer: detect TRACE
- set(log.severity_number, SEVERITY_NUMBER_TRACE) where
IsMatch(log.cache["0"], "(?i)(trace)")
- set(log.severity_text, "trace") where log.severity_number ==
SEVERITY_NUMBER_TRACE
- set(log.severity_number, SEVERITY_NUMBER_TRACE) where IsMatch(log.cache["0"], "(?i)(trace)")
- set(log.severity_text, "trace") where log.severity_number == SEVERITY_NUMBER_TRACE
# Infer: else
- set(log.severity_text, "info") where log.severity_number == 0
- set(log.severity_number, SEVERITY_NUMBER_INFO) where
@ -99,44 +58,7 @@ processors:
# 25% of limit up to 2G
spike_limit_mib: 512
check_interval: 5s
connectors:
routing/logs:
default_pipelines: [logs/out-default]
error_mode: ignore
table:
- context: log
statement: route() where IsMatch(attributes["rr-web.event"], ".*")
pipelines: [logs/out-rrweb]
exporters:
debug:
verbosity: detailed
sampling_initial: 5
sampling_thereafter: 200
clickhouse/rrweb:
database: ${env:HYPERDX_OTEL_EXPORTER_CLICKHOUSE_DATABASE}
endpoint: ${env:CLICKHOUSE_ENDPOINT}
password: ${env:CLICKHOUSE_PASSWORD}
username: ${env:CLICKHOUSE_USER}
ttl: 720h
logs_table_name: hyperdx_sessions
timeout: 5s
retry_on_failure:
enabled: true
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 300s
clickhouse:
database: ${env:HYPERDX_OTEL_EXPORTER_CLICKHOUSE_DATABASE}
endpoint: ${env:CLICKHOUSE_ENDPOINT}
password: ${env:CLICKHOUSE_PASSWORD}
username: ${env:CLICKHOUSE_USER}
ttl: 720h
timeout: 5s
retry_on_failure:
enabled: true
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 300s
# Connectors and exporters are now configured dynamically in opampController.ts
extensions:
health_check:
endpoint: :13133
@ -152,23 +74,4 @@ service:
logs:
level: ${HYPERDX_LOG_LEVEL}
extensions: [health_check]
pipelines:
traces:
# receivers: [otlp/hyperdx]
processors: [memory_limiter, batch]
exporters: [clickhouse]
metrics:
# receivers: [otlp/hyperdx, prometheus]
processors: [memory_limiter, batch]
exporters: [clickhouse]
logs/in:
# receivers: [otlp/hyperdx, fluentforward]
exporters: [routing/logs]
logs/out-default:
receivers: [routing/logs]
processors: [memory_limiter, transform, batch]
exporters: [clickhouse]
logs/out-rrweb:
receivers: [routing/logs]
processors: [memory_limiter, batch]
exporters: [clickhouse/rrweb]
# Pipelines are now configured dynamically in opampController.ts

View file

@ -0,0 +1,25 @@
receivers:
hostmetrics:
collection_interval: 5s
scrapers:
cpu:
load:
memory:
disk:
filesystem:
network:
# override the default processors
# processors:
# batch:
# send_batch_size: 10000
# timeout: 10s
# memory_limiter:
# limit_mib: 2000
service:
pipelines:
metrics/hostmetrics:
receivers: [hostmetrics]
# attach existing processors
processors: [memory_limiter, batch]
# attach existing exporters
exporters: [clickhouse]

View file

@ -0,0 +1,9 @@
#!/bin/sh
# Container entrypoint: starts the agent-log rotator as a background helper,
# then replaces this shell with the real command (the OpAMP supervisor).
set -e
# Start log rotation script in background for agent.log
# Arguments: log_file_path [max_size_mb] [max_archives] [check_interval_seconds]
# Here: rotate agent.log once it exceeds 16MB, keep 1 archive, check every 60s.
# NOTE(review): the rotator's exit status is ignored (backgrounded) — if it
# dies, rotation silently stops; confirm that is acceptable.
/log-rotator.sh /etc/otel/supervisor-data/agent.log 16 1 60 &
# Execute the supervisor with all passed arguments
# exec replaces PID 1 so signals (SIGTERM etc.) reach the supervisor directly.
exec "$@"

View file

@ -0,0 +1,49 @@
#!/bin/sh
# Generic log rotation script that rotates any log file when it exceeds a certain size.
# Keeps up to MAX_ARCHIVES rotated copies: <file>.1 (newest) .. <file>.N (oldest).
# Usage: log-rotator.sh <log_file_path> [max_size_mb] [max_archives] [check_interval_seconds]
# Parse arguments
LOG_FILE="${1}"
MAX_SIZE_MB="${2:-100}"
MAX_ARCHIVES="${3:-5}"
CHECK_INTERVAL="${4:-300}"
# Validate required argument
if [ -z "$LOG_FILE" ]; then
  echo "Error: Log file path is required"
  echo "Usage: $0 <log_file_path> [max_size_mb] [max_archives] [check_interval_seconds]"
  exit 1
fi
MAX_SIZE_BYTES=$((MAX_SIZE_MB * 1024 * 1024))
echo "Starting log rotation for: $LOG_FILE"
echo "Settings: Max size=${MAX_SIZE_MB}MB, Max archives=${MAX_ARCHIVES}, Check interval=${CHECK_INTERVAL}s"
while true; do
  sleep "$CHECK_INTERVAL"
  if [ -f "$LOG_FILE" ]; then
    # stat -c is GNU/busybox, stat -f is BSD/macOS; fall back to 0 (no rotation)
    SIZE=$(stat -c%s "$LOG_FILE" 2>/dev/null || stat -f%z "$LOG_FILE" 2>/dev/null || echo 0)
    if [ "$SIZE" -gt "$MAX_SIZE_BYTES" ]; then
      echo "Rotating $LOG_FILE (size: $SIZE bytes)"
      # Drop the oldest slot FIRST, then shift the remaining archives up by one.
      # (Removing it after the shift — as previously written — deleted the
      # archive the shift had just moved into the .MAX_ARCHIVES slot, so only
      # MAX_ARCHIVES-1 copies survived; with MAX_ARCHIVES=1, none did.)
      rm -f "${LOG_FILE}.${MAX_ARCHIVES}"
      # Rotate existing archives: .i -> .(i+1), highest index first.
      # Arithmetic while loop instead of seq(1) for POSIX sh portability.
      i=$((MAX_ARCHIVES - 1))
      while [ "$i" -ge 1 ]; do
        if [ -f "${LOG_FILE}.${i}" ]; then
          mv "${LOG_FILE}.${i}" "${LOG_FILE}.$((i + 1))"
        fi
        i=$((i - 1))
      done
      # Copy-then-truncate so the writing process's open file handle stays
      # valid (same idea as logrotate's copytruncate).
      cp "$LOG_FILE" "${LOG_FILE}.1"
      : > "$LOG_FILE" # Truncate the original file
    fi
  fi
done

View file

@ -20,6 +20,7 @@ agent:
executable: /otelcontribcol
config_files:
- /etc/otelcol-contrib/config.yaml
- ${env:CUSTOM_OTELCOL_CONFIG_FILE}
args:
- ${env:OTEL_AGENT_FEATURE_GATE_ARG}
passthrough_logs: ${env:OTEL_SUPERVISOR_PASSTHROUGH_LOGS}

View file

@ -47,6 +47,10 @@ export async function createTeam({
return team;
}
export function getAllTeams(fields?: string[]) {
return Team.find({}, fields);
}
export function getTeam(id?: string | ObjectId, fields?: string[]) {
if (config.IS_LOCAL_APP_MODE) {
return LOCAL_APP_TEAM;

View file

@ -1,7 +1,8 @@
import { Request, Response } from 'express';
import * as config from '@/config';
import { getTeam } from '@/controllers/team';
import { getAllTeams } from '@/controllers/team';
import type { ITeam } from '@/models/team';
import logger from '@/utils/logger';
import { agentService } from '../services/agentService';
@ -37,17 +38,261 @@ type CollectorConfig = {
};
};
};
prometheus?: {
config: {
scrape_configs: Array<{
job_name: string;
scrape_interval: string;
static_configs: Array<{
targets: string[];
}>;
}>;
};
};
fluentforward?: {
endpoint: string;
};
nop?: null;
'routing/logs'?: string[];
};
connectors?: {
'routing/logs'?: {
default_pipelines: string[];
error_mode: string;
table: Array<{
context: string;
statement: string;
pipelines: string[];
}>;
};
};
exporters?: {
debug?: {
verbosity: string;
sampling_initial: number;
sampling_thereafter: number;
};
'clickhouse/rrweb'?: {
endpoint: string;
database: string;
username: string;
password: string;
ttl: string;
logs_table_name: string;
timeout: string;
retry_on_failure: {
enabled: boolean;
initial_interval: string;
max_interval: string;
max_elapsed_time: string;
};
};
clickhouse?: {
endpoint: string;
database: string;
username: string;
password: string;
ttl: string;
timeout: string;
retry_on_failure: {
enabled: boolean;
initial_interval: string;
max_interval: string;
max_elapsed_time: string;
};
};
};
service: {
extensions: string[];
pipelines: {
[key: string]: {
receivers: string[];
receivers?: string[];
processors?: string[];
exporters?: string[];
};
};
};
};
export const buildOtelCollectorConfig = (teams: ITeam[]): CollectorConfig => {
const apiKeys = teams.filter(team => team.apiKey).map(team => team.apiKey);
const collectorAuthenticationEnforced =
teams[0]?.collectorAuthenticationEnforced;
if (apiKeys && apiKeys.length > 0) {
// Build full configuration with all team API keys
const otelCollectorConfig: CollectorConfig = {
extensions: {},
receivers: {
'otlp/hyperdx': {
protocols: {
grpc: {
endpoint: '0.0.0.0:4317',
include_metadata: true,
},
http: {
endpoint: '0.0.0.0:4318',
cors: {
allowed_origins: ['*'],
allowed_headers: ['*'],
},
include_metadata: true,
},
},
},
prometheus: {
config: {
scrape_configs: [
{
job_name: 'otelcol',
scrape_interval: '30s',
static_configs: [
{
targets: [
'0.0.0.0:8888',
'${env:CLICKHOUSE_PROMETHEUS_METRICS_ENDPOINT}',
],
},
],
},
],
},
},
fluentforward: {
endpoint: '0.0.0.0:24225',
},
},
connectors: {
'routing/logs': {
default_pipelines: ['logs/out-default'],
error_mode: 'ignore',
table: [
{
context: 'log',
statement:
'route() where IsMatch(attributes["rr-web.event"], ".*")',
pipelines: ['logs/out-rrweb'],
},
],
},
},
exporters: {
debug: {
verbosity: 'detailed',
sampling_initial: 5,
sampling_thereafter: 200,
},
'clickhouse/rrweb': {
endpoint: '${env:CLICKHOUSE_ENDPOINT}',
database: '${env:HYPERDX_OTEL_EXPORTER_CLICKHOUSE_DATABASE}',
username: '${env:CLICKHOUSE_USER}',
password: '${env:CLICKHOUSE_PASSWORD}',
ttl: '720h',
logs_table_name: 'hyperdx_sessions',
timeout: '5s',
retry_on_failure: {
enabled: true,
initial_interval: '5s',
max_interval: '30s',
max_elapsed_time: '300s',
},
},
clickhouse: {
endpoint: '${env:CLICKHOUSE_ENDPOINT}',
database: '${env:HYPERDX_OTEL_EXPORTER_CLICKHOUSE_DATABASE}',
username: '${env:CLICKHOUSE_USER}',
password: '${env:CLICKHOUSE_PASSWORD}',
ttl: '720h',
timeout: '5s',
retry_on_failure: {
enabled: true,
initial_interval: '5s',
max_interval: '30s',
max_elapsed_time: '300s',
},
},
},
service: {
extensions: [],
pipelines: {
traces: {
receivers: ['otlp/hyperdx'],
processors: ['memory_limiter', 'batch'],
exporters: ['clickhouse'],
},
metrics: {
receivers: ['otlp/hyperdx', 'prometheus'],
processors: ['memory_limiter', 'batch'],
exporters: ['clickhouse'],
},
'logs/in': {
receivers: ['otlp/hyperdx', 'fluentforward'],
exporters: ['routing/logs'],
},
'logs/out-default': {
receivers: ['routing/logs'],
processors: ['memory_limiter', 'transform', 'batch'],
exporters: ['clickhouse'],
},
'logs/out-rrweb': {
receivers: ['routing/logs'],
processors: ['memory_limiter', 'batch'],
exporters: ['clickhouse/rrweb'],
},
},
},
};
if (collectorAuthenticationEnforced) {
if (otelCollectorConfig.receivers['otlp/hyperdx'] == null) {
// should never happen
throw new Error('otlp/hyperdx receiver not found');
}
otelCollectorConfig.extensions['bearertokenauth/hyperdx'] = {
scheme: '',
tokens: apiKeys,
};
otelCollectorConfig.receivers['otlp/hyperdx'].protocols.grpc.auth = {
authenticator: 'bearertokenauth/hyperdx',
};
otelCollectorConfig.receivers['otlp/hyperdx'].protocols.http.auth = {
authenticator: 'bearertokenauth/hyperdx',
};
otelCollectorConfig.service.extensions = ['bearertokenauth/hyperdx'];
}
return otelCollectorConfig;
}
// If no apiKeys are found, return NOP config
// This is later merged with otel-collector/config.yaml
// we need to instantiate a valid config so the collector
// can at least start up
return {
extensions: {},
receivers: {
nop: null,
},
connectors: {},
exporters: {},
service: {
extensions: [],
pipelines: {
traces: {
receivers: ['nop'],
},
metrics: {
receivers: ['nop'],
},
logs: {
receivers: ['nop'],
},
},
},
};
};
export class OpampController {
/**
* Handle an OpAMP message from an agent
@ -86,96 +331,11 @@ export class OpampController {
// Check if we should send a remote configuration
if (agentService.agentAcceptsRemoteConfig(agent)) {
const team = await getTeam();
// This is later merged with otel-collector/config.yaml
// we need to instantiate a valid config so the collector
// can at least start up
const NOP_CONFIG: CollectorConfig = {
extensions: {},
receivers: {
nop: null,
},
service: {
extensions: [],
pipelines: {
traces: {
receivers: ['nop'],
},
metrics: {
receivers: ['nop'],
},
'logs/in': {
receivers: ['nop'],
},
},
},
};
let otelCollectorConfig = NOP_CONFIG;
// If team is not found, don't send a remoteConfig, we aren't ready
// to collect telemetry yet
if (team) {
otelCollectorConfig = {
extensions: {},
receivers: {
'otlp/hyperdx': {
protocols: {
grpc: {
endpoint: '0.0.0.0:4317',
include_metadata: true,
},
http: {
endpoint: '0.0.0.0:4318',
cors: {
allowed_origins: ['*'],
allowed_headers: ['*'],
},
include_metadata: true,
},
},
},
},
service: {
extensions: [],
pipelines: {
traces: {
receivers: ['otlp/hyperdx'],
},
metrics: {
receivers: ['otlp/hyperdx', 'prometheus'],
},
'logs/in': {
receivers: ['otlp/hyperdx', 'fluentforward'],
},
},
},
};
if (team.collectorAuthenticationEnforced) {
const ingestionKey = team.apiKey;
if (otelCollectorConfig.receivers['otlp/hyperdx'] == null) {
// should never happen
throw new Error('otlp/hyperdx receiver not found');
}
otelCollectorConfig.extensions['bearertokenauth/hyperdx'] = {
scheme: '',
tokens: [ingestionKey],
};
otelCollectorConfig.receivers['otlp/hyperdx'].protocols.grpc.auth =
{
authenticator: 'bearertokenauth/hyperdx',
};
otelCollectorConfig.receivers['otlp/hyperdx'].protocols.http.auth =
{
authenticator: 'bearertokenauth/hyperdx',
};
otelCollectorConfig.service.extensions = [
'bearertokenauth/hyperdx',
];
}
}
const teams = await getAllTeams([
'apiKey',
'collectorAuthenticationEnforced',
]);
const otelCollectorConfig = buildOtelCollectorConfig(teams);
if (config.IS_DEV) {
console.log(JSON.stringify(otelCollectorConfig, null, 2));

View file

@ -21,7 +21,7 @@ services:
otel-collector:
image: otel/opentelemetry-collector-contrib:0.129.1
volumes:
- ../../docker/otel-collector/config.yaml:/etc/otelcol-contrib/config.yaml
- ../../docker/otel-collector/config.deprecated.yaml:/etc/otelcol-contrib/config.yaml
- ./receiver-config.yaml:/etc/otelcol-contrib/receiver-config.yaml
command:
[