diff --git a/.changeset/mean-pumpkins-greet.md b/.changeset/mean-pumpkins-greet.md new file mode 100644 index 00000000..64717143 --- /dev/null +++ b/.changeset/mean-pumpkins-greet.md @@ -0,0 +1,5 @@ +--- +"@hyperdx/api": patch +--- + +Add ingestion key authentication in OTel collector via OpAMP diff --git a/.vscode/settings.json b/.vscode/settings.json index 79cc070c..07f85571 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -32,5 +32,5 @@ "**/yarn.lock": true, "**/yarn-*.cjs": true }, - "cSpell.words": ["micropip", "pyimport", "pyodide"] + "cSpell.words": ["micropip", "opamp", "pyimport", "pyodide"] } diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 5af5b6f0..df5ff167 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -19,14 +19,19 @@ services: depends_on: - otel-collector otel-collector: - image: otel/opentelemetry-collector-contrib:0.120.0 + # image: otel/opentelemetry-collector-contrib:0.120.0 + build: + context: ./docker/otel-collector + target: dev environment: CLICKHOUSE_PROMETHEUS_METRICS_ENDPOINT: 'ch-server:9363' CLICKHOUSE_ENDPOINT: 'tcp://ch-server:9000?dial_timeout=10s' HYPERDX_API_KEY: ${HYPERDX_API_KEY} HYPERDX_LOG_LEVEL: ${HYPERDX_LOG_LEVEL} + OPAMP_SERVER_URL: 'http://host.docker.internal:4320' volumes: - ./docker/otel-collector/config.yaml:/etc/otelcol-contrib/config.yaml + - ./docker/otel-collector/supervisor_docker.yaml:/etc/otel/supervisor.yaml ports: - '13133:13133' # health_check extension - '24225:24225' # fluentd receiver @@ -57,7 +62,9 @@ services: - internal healthcheck: # "clickhouse", "client", "-u ${CLICKHOUSE_USER}", "--password ${CLICKHOUSE_PASSWORD}", "-q 'SELECT 1'" - test: wget --no-verbose --tries=1 http://127.0.0.1:8123/ping || exit 1 + test: + wget -O /dev/null --no-verbose --tries=1 http://127.0.0.1:8123/ping || + exit 1 interval: 1s timeout: 1s retries: 60 diff --git a/docker/otel-collector/Dockerfile b/docker/otel-collector/Dockerfile index 0b018b83..ec74071a 100644 --- 
a/docker/otel-collector/Dockerfile +++ b/docker/otel-collector/Dockerfile @@ -1,18 +1,40 @@ ## base ############################################################################################# -FROM otel/opentelemetry-collector-contrib:0.120.0 AS base +FROM otel/opentelemetry-collector-contrib:0.126.0 AS col +FROM ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-opampsupervisor:0.126.0 AS supervisor +# From: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/aa5c3aa4c7ec174361fcaf908de8eaca72263078/cmd/opampsupervisor/Dockerfile#L18 +FROM alpine:latest@sha256:a8560b36e8b8210634f77d9f7f9efd7ffa463e380b75e2e74aff4511df3ef88c AS prep +RUN apk --update add ca-certificates +RUN mkdir -p /etc/otel/supervisor-data/ + +FROM scratch AS base + +ARG USER_UID=10001 +ARG USER_GID=10001 +USER ${USER_UID}:${USER_GID} + +COPY --from=prep /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt +COPY --from=prep --chmod=777 --chown=${USER_UID}:${USER_GID} /etc/otel/supervisor-data /etc/otel/supervisor-data +COPY --from=supervisor --chmod=755 /usr/local/bin/opampsupervisor / +COPY ./supervisor_docker.yaml /etc/otel/supervisor.yaml +COPY --from=col --chmod=755 /otelcol-contrib /otelcontribcol ## dev ############################################################################################## -FROM base as dev +FROM base AS dev COPY ./config.yaml /etc/otelcol-contrib/config.yaml EXPOSE 4317 4318 13133 +ENTRYPOINT ["/opampsupervisor"] +CMD ["--config", "/etc/otel/supervisor.yaml"] ## prod ############################################################################################# -FROM base as prod +FROM base AS prod COPY ./config.yaml /etc/otelcol-contrib/config.yaml EXPOSE 4317 4318 13133 + +ENTRYPOINT ["/opampsupervisor"] +CMD ["--config", "/etc/otel/supervisor.yaml"] diff --git a/docker/otel-collector/config.yaml b/docker/otel-collector/config.yaml index 7ff90bf4..1672e72b 100644 --- 
a/docker/otel-collector/config.yaml +++ b/docker/otel-collector/config.yaml @@ -12,18 +12,19 @@ receivers: # Data sources: logs fluentforward: endpoint: '0.0.0.0:24225' + # Configured via OpAMP w/ authentication # Data sources: traces, metrics, logs - otlp: - protocols: - grpc: - include_metadata: true - endpoint: '0.0.0.0:4317' - http: - cors: - allowed_origins: ['*'] - allowed_headers: ['*'] - include_metadata: true - endpoint: '0.0.0.0:4318' + # otlp/hyperdx: + # protocols: + # grpc: + # include_metadata: true + # endpoint: '0.0.0.0:4317' + # http: + # cors: + # allowed_origins: ['*'] + # allowed_headers: ['*'] + # include_metadata: true + # endpoint: '0.0.0.0:4318' processors: transform: log_statements: @@ -32,8 +33,10 @@ processors: statements: # JSON parsing: Extends log attributes with the fields from structured log body content, either as an OTEL map or # as a string containing JSON content. - - set(log.cache, ExtractPatterns(log.body, "(?P<0>(\\{.*\\}))")) where IsString(log.body) - - merge_maps(log.attributes, ParseJSON(log.cache["0"]), "upsert") where IsMap(log.cache) + - set(log.cache, ExtractPatterns(log.body, "(?P<0>(\\{.*\\}))")) where + IsString(log.body) + - merge_maps(log.attributes, ParseJSON(log.cache["0"]), "upsert") + where IsMap(log.cache) - flatten(log.attributes) where IsMap(log.cache) - merge_maps(log.attributes, log.body, "upsert") where IsMap(log.body) - context: log @@ -42,24 +45,37 @@ processors: - severity_number == 0 and severity_text == "" statements: # Infer: extract the first log level keyword from the first 256 characters of the body - - set(log.cache["substr"], log.body.string) where Len(log.body.string) < 256 - - set(log.cache["substr"], Substring(log.body.string, 0, 256)) where Len(log.body.string) >= 256 - - set(log.cache, ExtractPatterns(log.cache["substr"], "(?i)(?P<0>(alert|crit|emerg|fatal|error|err|warn|notice|debug|dbug|trace))")) + - set(log.cache["substr"], log.body.string) where Len(log.body.string) + < 256 + - 
set(log.cache["substr"], Substring(log.body.string, 0, 256)) where + Len(log.body.string) >= 256 + - set(log.cache, ExtractPatterns(log.cache["substr"], + "(?i)(?P<0>(alert|crit|emerg|fatal|error|err|warn|notice|debug|dbug|trace))")) # Infer: detect FATAL - - set(log.severity_number, SEVERITY_NUMBER_FATAL) where IsMatch(log.cache["0"], "(?i)(alert|crit|emerg|fatal)") - - set(log.severity_text, "fatal") where log.severity_number == SEVERITY_NUMBER_FATAL + - set(log.severity_number, SEVERITY_NUMBER_FATAL) where + IsMatch(log.cache["0"], "(?i)(alert|crit|emerg|fatal)") + - set(log.severity_text, "fatal") where log.severity_number == + SEVERITY_NUMBER_FATAL # Infer: detect ERROR - - set(log.severity_number, SEVERITY_NUMBER_ERROR) where IsMatch(log.cache["0"], "(?i)(error|err)") - - set(log.severity_text, "error") where log.severity_number == SEVERITY_NUMBER_ERROR + - set(log.severity_number, SEVERITY_NUMBER_ERROR) where + IsMatch(log.cache["0"], "(?i)(error|err)") + - set(log.severity_text, "error") where log.severity_number == + SEVERITY_NUMBER_ERROR # Infer: detect WARN - - set(log.severity_number, SEVERITY_NUMBER_WARN) where IsMatch(log.cache["0"], "(?i)(warn|notice)") - - set(log.severity_text, "warn") where log.severity_number == SEVERITY_NUMBER_WARN + - set(log.severity_number, SEVERITY_NUMBER_WARN) where + IsMatch(log.cache["0"], "(?i)(warn|notice)") + - set(log.severity_text, "warn") where log.severity_number == + SEVERITY_NUMBER_WARN # Infer: detect DEBUG - - set(log.severity_number, SEVERITY_NUMBER_DEBUG) where IsMatch(log.cache["0"], "(?i)(debug|dbug)") - - set(log.severity_text, "debug") where log.severity_number == SEVERITY_NUMBER_DEBUG + - set(log.severity_number, SEVERITY_NUMBER_DEBUG) where + IsMatch(log.cache["0"], "(?i)(debug|dbug)") + - set(log.severity_text, "debug") where log.severity_number == + SEVERITY_NUMBER_DEBUG # Infer: detect TRACE - - set(log.severity_number, SEVERITY_NUMBER_TRACE) where IsMatch(log.cache["0"], "(?i)(trace)") - - 
set(log.severity_text, "trace") where log.severity_number == SEVERITY_NUMBER_TRACE + - set(log.severity_number, SEVERITY_NUMBER_TRACE) where + IsMatch(log.cache["0"], "(?i)(trace)") + - set(log.severity_text, "trace") where log.severity_number == + SEVERITY_NUMBER_TRACE # Infer: else - set(log.severity_text, "info") where log.severity_number == 0 - set(log.severity_number, SEVERITY_NUMBER_INFO) where log.severity_number == 0 @@ -132,15 +148,15 @@ service: extensions: [health_check] pipelines: traces: - receivers: [otlp] + # receivers: [otlp/hyperdx] processors: [memory_limiter, batch] exporters: [clickhouse] metrics: - receivers: [otlp, prometheus] + # receivers: [otlp/hyperdx, prometheus] processors: [memory_limiter, batch] exporters: [clickhouse] logs/in: - receivers: [otlp, fluentforward] + # receivers: [otlp/hyperdx, fluentforward] exporters: [routing/logs] logs/out-default: receivers: [routing/logs] diff --git a/docker/otel-collector/supervisor_docker.yaml b/docker/otel-collector/supervisor_docker.yaml new file mode 100644 index 00000000..8d39a32a --- /dev/null +++ b/docker/otel-collector/supervisor_docker.yaml @@ -0,0 +1,26 @@ +# https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/cmd/opampsupervisor/specification/README.md#supervisor-configuration +server: + endpoint: ${OPAMP_SERVER_URL}/v1/opamp + tls: + # Disable verification to test locally. + # Don't do this in production. 
+ insecure_skip_verify: true + # For more TLS settings see config/configtls.ClientConfig + +capabilities: + reports_effective_config: true + reports_own_metrics: true + reports_own_logs: true + reports_own_traces: true + reports_health: true + accepts_remote_config: true + reports_remote_config: true + +agent: + executable: /otelcontribcol + config_files: + - /etc/otelcol-contrib/config.yaml + # passthrough_logs: true # enable to debug collector logs, can crash collector due to perf issues with this flag enabled + +storage: + directory: /etc/otel/supervisor-data/ diff --git a/package.json b/package.json index 458bb181..cc34aef3 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ "app:lint": "nx run @hyperdx/app:ci:lint", "dev": "docker compose -f docker-compose.dev.yml up -d && yarn app:dev && docker compose -f docker-compose.dev.yml down", "dev:down": "docker compose -f docker-compose.dev.yml down", + "dev:compose": "docker compose -f docker-compose.dev.yml", "lint": "npx nx run-many -t ci:lint", "version": "make version", "release": "npx changeset tag && npx changeset publish" diff --git a/packages/api/.env.development b/packages/api/.env.development index 24178d05..fa516dbb 100644 --- a/packages/api/.env.development +++ b/packages/api/.env.development @@ -1,5 +1,6 @@ HYPERDX_API_KEY="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" HYPERDX_API_PORT=8000 +HYPERDX_OPAMP_PORT=4320 HYPERDX_APP_PORT=8080 HYPERDX_LOG_LEVEL=debug EXPRESS_SESSION_SECRET="hyperdx is cool 👋" @@ -15,6 +16,7 @@ NODE_ENV=development OTEL_SERVICE_NAME="hdx-oss-dev-api" OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4318" PORT=${HYPERDX_API_PORT} +OPAMP_PORT=${HYPERDX_OPAMP_PORT} REDIS_URL=redis://localhost:6379 USAGE_STATS_ENABLED=false NODE_OPTIONS="--max-http-header-size=131072" diff --git a/packages/api/.env.test b/packages/api/.env.test index dfad1ddd..84478335 100644 --- a/packages/api/.env.test +++ b/packages/api/.env.test @@ -7,3 +7,4 @@ FRONTEND_URL=http://app:8080 
MONGO_URI=mongodb://localhost:29999/hyperdx-test NODE_ENV=test PORT=9000 +OPAMP_PORT=4320 diff --git a/packages/api/package.json b/packages/api/package.json index 68e8f9ed..f05a121f 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -15,6 +15,7 @@ "@opentelemetry/host-metrics": "^0.35.5", "@opentelemetry/sdk-metrics": "^1.30.1", "@slack/webhook": "^6.1.0", + "@types/node": "^22.15.18", "axios": "^1.6.2", "compression": "^1.7.4", "connect-mongo": "^4.6.0", @@ -43,6 +44,7 @@ "passport-local-mongoose": "^6.1.0", "pluralize": "^8.0.0", "promised-handlebars": "^2.0.1", + "protobufjs": "^7.5.2", "semver": "^7.5.2", "serialize-error": "^8.1.0", "sqlstring": "^2.3.3", diff --git a/packages/api/src/config.ts b/packages/api/src/config.ts index 657be9f4..657383f2 100644 --- a/packages/api/src/config.ts +++ b/packages/api/src/config.ts @@ -24,6 +24,7 @@ export const MINER_API_URL = env.MINER_API_URL as string; export const MONGO_URI = env.MONGO_URI; export const OTEL_SERVICE_NAME = env.OTEL_SERVICE_NAME as string; export const PORT = Number.parseInt(env.PORT as string); +export const OPAMP_PORT = Number.parseInt(env.OPAMP_PORT as string); export const USAGE_STATS_ENABLED = env.USAGE_STATS_ENABLED !== 'false'; export const RUN_SCHEDULED_TASKS_EXTERNALLY = env.RUN_SCHEDULED_TASKS_EXTERNALLY === 'true'; diff --git a/packages/api/src/controllers/team.ts b/packages/api/src/controllers/team.ts index 1a6e21ca..798c724f 100644 --- a/packages/api/src/controllers/team.ts +++ b/packages/api/src/controllers/team.ts @@ -14,6 +14,7 @@ export const LOCAL_APP_TEAM = { // Placeholder keys hookId: uuidv4(), apiKey: uuidv4(), + collectorAuthenticationEnforced: false, toJSON() { return this; }, @@ -28,24 +29,30 @@ export async function isTeamExisting() { return teamCount > 0; } -export async function createTeam({ name }: { name: string }) { +export async function createTeam({ + name, + collectorAuthenticationEnforced = true, +}: { + name: string; + 
collectorAuthenticationEnforced?: boolean; +}) { if (await isTeamExisting()) { throw new Error('Team already exists'); } - const team = new Team({ name }); + const team = new Team({ name, collectorAuthenticationEnforced }); await team.save(); return team; } -export function getTeam(id: string | ObjectId, fields?: string[]) { +export function getTeam(id?: string | ObjectId, fields?: string[]) { if (config.IS_LOCAL_APP_MODE) { return LOCAL_APP_TEAM; } - return Team.findById(id, fields); + return Team.findOne({}, fields); } export function getTeamByApiKey(apiKey: string) { diff --git a/packages/api/src/fixtures.ts b/packages/api/src/fixtures.ts index f123b608..aecdc48c 100644 --- a/packages/api/src/fixtures.ts +++ b/packages/api/src/fixtures.ts @@ -259,7 +259,7 @@ class MockServer extends Server { protected shouldHandleGracefulShutdown = false; getHttpServer() { - return this.httpServer; + return this.appServer; } async start(): Promise { @@ -276,15 +276,21 @@ class MockServer extends Server { stop() { return new Promise((resolve, reject) => { - this.httpServer.close(err => { + this.appServer.close(err => { if (err) { reject(err); return; } - super - .shutdown() - .then(() => resolve()) - .catch(err => reject(err)); + this.opampServer.close(err => { + if (err) { + reject(err); + return; + } + super + .shutdown() + .then(() => resolve()) + .catch(err => reject(err)); + }); }); }); } diff --git a/packages/api/src/models/team.ts b/packages/api/src/models/team.ts index b40b64b7..fc3c5a48 100644 --- a/packages/api/src/models/team.ts +++ b/packages/api/src/models/team.ts @@ -9,6 +9,7 @@ export interface ITeam { allowedAuthMethods?: 'password'[]; apiKey: string; hookId: string; + collectorAuthenticationEnforced: boolean; } export default mongoose.model( @@ -29,6 +30,10 @@ export default mongoose.model( return uuidv4(); }, }, + collectorAuthenticationEnforced: { + type: Boolean, + default: false, + }, }, { timestamps: true, diff --git a/packages/api/src/opamp/README.md 
b/packages/api/src/opamp/README.md new file mode 100644 index 00000000..5f65b1da --- /dev/null +++ b/packages/api/src/opamp/README.md @@ -0,0 +1,11 @@ +Implements an HTTP OpAMP server that serves configurations to supervised +collectors. + +Spec: https://github.com/open-telemetry/opamp-spec/tree/main + +Workflow: + +- Sup pings /v1/opamp with status +- Server checks if configs should be updated +- Return new config if current config is outdated + - Config derived from team doc with ingestion api key diff --git a/packages/api/src/opamp/app.ts b/packages/api/src/opamp/app.ts new file mode 100644 index 00000000..aab13a8f --- /dev/null +++ b/packages/api/src/opamp/app.ts @@ -0,0 +1,31 @@ +import express from 'express'; + +import { appErrorHandler } from '@/middleware/error'; +import { opampController } from '@/opamp/controllers/opampController'; + +// Create Express application +const app = express(); + +app.disable('x-powered-by'); + +// Special body parser setup for OpAMP +app.use( + '/v1/opamp', + express.raw({ + type: 'application/x-protobuf', + limit: '10mb', + }), +); + +// OpAMP endpoint +app.post('/v1/opamp', opampController.handleOpampMessage.bind(opampController)); + +// Health check endpoint +app.get('/health', (req, res) => { + res.status(200).json({ status: 'OK' }); +}); + +// Error handling +app.use(appErrorHandler); + +export default app; diff --git a/packages/api/src/opamp/controllers/opampController.ts b/packages/api/src/opamp/controllers/opampController.ts new file mode 100644 index 00000000..9fa775a0 --- /dev/null +++ b/packages/api/src/opamp/controllers/opampController.ts @@ -0,0 +1,203 @@ +import { Request, Response } from 'express'; + +import { getTeam } from '@/controllers/team'; +import logger from '@/utils/logger'; + +import { agentService } from '../services/agentService'; +import { + createRemoteConfig, + decodeAgentToServer, + encodeServerToAgent, + serverCapabilities, +} from '../utils/protobuf'; + +type CollectorConfig = { + extensions: 
Record; + receivers: { + 'otlp/hyperdx'?: { + protocols: { + grpc: { + endpoint: string; + include_metadata: boolean; + auth?: { + authenticator: string; + }; + }; + http: { + endpoint: string; + cors: { + allowed_origins: string[]; + allowed_headers: string[]; + }; + include_metadata: boolean; + auth?: { + authenticator: string; + }; + }; + }; + }; + nop?: null; + }; + service: { + extensions: string[]; + pipelines: { + [key: string]: { + receivers: string[]; + }; + }; + }; +}; +export class OpampController { + /** + * Handle an OpAMP message from an agent + */ + public async handleOpampMessage(req: Request, res: Response): Promise { + try { + // Check content type + const contentType = req.get('Content-Type'); + if (contentType !== 'application/x-protobuf') { + res + .status(415) + .send( + 'Unsupported Media Type: Content-Type must be application/x-protobuf', + ); + return; + } + + // Decode the AgentToServer message + const agentToServer = decodeAgentToServer(req.body); + logger.debug('agentToServer', agentToServer); + logger.debug( + // @ts-ignore + `Received message from agent: ${agentToServer.instanceUid?.toString( + 'hex', + )}`, + ); + + // Process the agent status + const agent = agentService.processAgentStatus(agentToServer); + + // Prepare the response + const serverToAgent: any = { + instanceUid: agent.instanceUid, + capabilities: serverCapabilities, + }; + + // Check if we should send a remote configuration + if (agentService.agentAcceptsRemoteConfig(agent)) { + const team = await getTeam(); + // This is later merged with otel-collector/config.yaml + // we need to instantiate a valid config so the collector + // can at least start up + const NOP_CONFIG: CollectorConfig = { + extensions: {}, + receivers: { + nop: null, + }, + service: { + extensions: [], + pipelines: { + traces: { + receivers: ['nop'], + }, + metrics: { + receivers: ['nop'], + }, + 'logs/in': { + receivers: ['nop'], + }, + }, + }, + }; + let config = NOP_CONFIG; + + // If team is not 
found, don't send a remoteConfig, we aren't ready + // to collect telemetry yet + if (team) { + config = { + extensions: {}, + receivers: { + 'otlp/hyperdx': { + protocols: { + grpc: { + endpoint: '0.0.0.0:4317', + include_metadata: true, + }, + http: { + endpoint: '0.0.0.0:4318', + cors: { + allowed_origins: ['*'], + allowed_headers: ['*'], + }, + include_metadata: true, + }, + }, + }, + }, + service: { + extensions: [], + pipelines: { + traces: { + receivers: ['otlp/hyperdx'], + }, + metrics: { + receivers: ['otlp/hyperdx', 'prometheus'], + }, + 'logs/in': { + receivers: ['otlp/hyperdx', 'fluentforward'], + }, + }, + }, + }; + + if (team.collectorAuthenticationEnforced) { + const ingestionKey = team.apiKey; + + if (config.receivers['otlp/hyperdx'] == null) { + // should never happen + throw new Error('otlp/hyperdx receiver not found'); + } + + config.extensions['bearertokenauth/hyperdx'] = { + scheme: '', + tokens: [ingestionKey], + }; + config.receivers['otlp/hyperdx'].protocols.grpc.auth = { + authenticator: 'bearertokenauth/hyperdx', + }; + config.receivers['otlp/hyperdx'].protocols.http.auth = { + authenticator: 'bearertokenauth/hyperdx', + }; + config.service.extensions = ['bearertokenauth/hyperdx']; + } + } + + console.log(JSON.stringify(config, null, 2)); + + const remoteConfig = createRemoteConfig( + new Map([['config.json', Buffer.from(JSON.stringify(config))]]), + 'application/json', + ); + + serverToAgent.remoteConfig = remoteConfig; + logger.debug( + `Sending remote config to agent: ${agent.instanceUid.toString( + 'hex', + )}`, + ); + } + + // Encode and send the response + const encodedResponse = encodeServerToAgent(serverToAgent); + + res.setHeader('Content-Type', 'application/x-protobuf'); + res.send(encodedResponse); + } catch (error) { + logger.error('Error handling OpAMP message:', error); + res.status(500).send('Internal Server Error'); + } + } +} + +// Create a singleton instance +export const opampController = new OpampController(); diff 
--git a/packages/api/src/opamp/models/agent.ts b/packages/api/src/opamp/models/agent.ts new file mode 100644 index 00000000..cbab90f9 --- /dev/null +++ b/packages/api/src/opamp/models/agent.ts @@ -0,0 +1,92 @@ +export interface AgentAttribute { + key: string; + value: { + stringValue?: string; + intValue?: number; + doubleValue?: number; + boolValue?: boolean; + arrayValue?: AgentAttribute[]; + kvlistValue?: AgentAttribute[]; + bytesValue?: Buffer; + }; +} + +export interface AgentDescription { + identifyingAttributes: AgentAttribute[]; + nonIdentifyingAttributes: AgentAttribute[]; +} + +export interface RemoteConfigStatus { + lastRemoteConfigHash?: Buffer; + status: 'UNSET' | 'APPLIED' | 'APPLYING' | 'FAILED'; + errorMessage?: string; +} + +export interface EffectiveConfig { + configMap: { + [key: string]: { + body: Buffer; + contentType: string; + }; + }; +} + +export interface Agent { + instanceUid: Buffer; + sequenceNum: number; + agentDescription?: AgentDescription; + capabilities: number; + effectiveConfig?: EffectiveConfig; + remoteConfigStatus?: RemoteConfigStatus; + lastSeen: Date; + currentConfigHash?: Buffer; +} + +// TODO: Evaluate if we need to store agent state here at all +export class AgentStore { + private agents: Map = new Map(); + + /** + * Add or update an agent in the store + */ + public upsertAgent(agent: Agent): void { + const instanceUidStr = this.bufferToHex(agent.instanceUid); + this.agents.set(instanceUidStr, { + ...agent, + lastSeen: new Date(), + }); + } + + /** + * Get an agent by its instance UID + */ + public getAgent(instanceUid: Buffer): Agent | undefined { + const instanceUidStr = this.bufferToHex(instanceUid); + return this.agents.get(instanceUidStr); + } + + /** + * Get all agents in the store + */ + public getAllAgents(): Agent[] { + return Array.from(this.agents.values()); + } + + /** + * Remove an agent from the store + */ + public removeAgent(instanceUid: Buffer): boolean { + const instanceUidStr = 
this.bufferToHex(instanceUid); + return this.agents.delete(instanceUidStr); + } + + /** + * Convert a Buffer to a hex string for use as a map key + */ + private bufferToHex(buffer: Buffer): string { + return buffer.toString('hex'); + } +} + +// Create a singleton instance of the agent store +export const agentStore = new AgentStore(); diff --git a/packages/api/src/opamp/proto/anyvalue.proto b/packages/api/src/opamp/proto/anyvalue.proto new file mode 100644 index 00000000..8ed97315 --- /dev/null +++ b/packages/api/src/opamp/proto/anyvalue.proto @@ -0,0 +1,67 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file is copied and modified from https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/common/v1/common.proto +// Modifications: +// - Removal of unneeded InstrumentationLibrary and StringKeyValue messages. +// - Change of go_package to reference a package in this repo. +// - Removal of gogoproto usage. + +syntax = "proto3"; + +package opamp.proto; + +option go_package = "github.com/open-telemetry/opamp-go/protobufs"; + +// AnyValue is used to represent any type of attribute value. AnyValue may contain a +// primitive value such as a string or integer or it may contain an arbitrary nested +// object containing arrays, key-value lists and primitives. +message AnyValue { + // The value is one of the listed fields. 
It is valid for all values to be unspecified + // in which case this AnyValue is considered to be "null". + oneof value { + string string_value = 1; + bool bool_value = 2; + int64 int_value = 3; + double double_value = 4; + ArrayValue array_value = 5; + KeyValueList kvlist_value = 6; + bytes bytes_value = 7; + } +} + +// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message +// since oneof in AnyValue does not allow repeated fields. +message ArrayValue { + // Array of values. The array may be empty (contain 0 elements). + repeated AnyValue values = 1; +} + +// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message +// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need +// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to +// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches +// are semantically equivalent. +message KeyValueList { + // A collection of key/value pairs of key-value pairs. The list may be empty (may + // contain 0 elements). + repeated KeyValue values = 1; +} + +// KeyValue is a key-value pair that is used to store Span attributes, Link +// attributes, etc. +message KeyValue { + string key = 1; + AnyValue value = 2; +} diff --git a/packages/api/src/opamp/proto/opamp.proto b/packages/api/src/opamp/proto/opamp.proto new file mode 100644 index 00000000..43a0d1a9 --- /dev/null +++ b/packages/api/src/opamp/proto/opamp.proto @@ -0,0 +1,1018 @@ +// v0.12.0 +// From: https://github.com/open-telemetry/opamp-spec/blob/5ba31a0ee6f9cc3dc768ef43c913d6c6c479c522/proto/opamp.proto + +// Copyright 2021, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// OpAMP: Open Agent Management Protocol (https://github.com/open-telemetry/opamp-spec) + +syntax = "proto3"; + +package opamp.proto; + +import "anyvalue.proto"; + +option go_package = "github.com/open-telemetry/opamp-go/protobufs"; + +message AgentToServer { + // Globally unique identifier of the running instance of the Agent. SHOULD remain + // unchanged for the lifetime of the Agent process. + // MUST be 16 bytes long and SHOULD be generated using the UUID v7 spec. + bytes instance_uid = 1; + + // The sequence number is incremented by 1 for every AgentToServer sent + // by the Agent. This allows the Server to detect that it missed a message when + // it notices that the sequence_num is not exactly by 1 greater than the previously + // received one. + uint64 sequence_num = 2; + + // Data that describes the Agent, its type, where it runs, etc. + // May be omitted if nothing changed since last AgentToServer message. + AgentDescription agent_description = 3; + + // Bitmask of flags defined by AgentCapabilities enum. + // All bits that are not defined in AgentCapabilities enum MUST be set to 0 by + // the Agent. This allows extending the protocol and the AgentCapabilities enum + // in the future such that old Agents automatically report that they don't + // support the new capability. + // This field MUST be always set. + uint64 capabilities = 4; + + // The current health of the Agent and sub-components. The top-level ComponentHealth represents + // the health of the Agent overall. 
May be omitted if nothing changed since last AgentToServer + // message. + // Status: [Beta] + ComponentHealth health = 5; + + // The current effective configuration of the Agent. The effective configuration is + // the one that is currently used by the Agent. The effective configuration may be + // different from the remote configuration received from the Server earlier, e.g. + // because the Agent uses a local configuration instead (or in addition). + // + // This field SHOULD be unset if the effective config is unchanged since the last + // AgentToServer message. + EffectiveConfig effective_config = 6; + + // The status of the remote config that was previously received from the Server. + // This field SHOULD be unset if the remote config status is unchanged since the + // last AgentToServer message. + RemoteConfigStatus remote_config_status = 7; + + // The list of the Agent packages, including package statuses. This field SHOULD be + // unset if this information is unchanged since the last AgentToServer message for + // this Agent was sent in the stream. + // Status: [Beta] + PackageStatuses package_statuses = 8; + + // AgentDisconnect MUST be set in the last AgentToServer message sent from the + // Agent to the Server. + AgentDisconnect agent_disconnect = 9; + + // Bit flags as defined by AgentToServerFlags bit masks. + uint64 flags = 10; + + // A request to create connection settings. This field is set for flows where + // the Agent initiates the creation of connection settings. + // Status: [Development] + ConnectionSettingsRequest connection_settings_request = 11; + + // A message indicating custom capabilities supported by the Agent. + // Status: [Development] + CustomCapabilities custom_capabilities = 12; + + // A custom message sent from an Agent to the Server. + // Status: [Development] + CustomMessage custom_message = 13; + + // A message indicating the components that are available for configuration on the agent. 
+ // Status: [Development] + AvailableComponents available_components = 14; +} + +enum AgentToServerFlags { + AgentToServerFlags_Unspecified = 0; + + // AgentToServerFlags is a bit mask. Values below define individual bits. + + // The Agent requests Server go generate a new instance_uid, which will + // be sent back in ServerToAgent message + AgentToServerFlags_RequestInstanceUid = 0x00000001; +} + +// AgentDisconnect is the last message sent from the Agent to the Server. The Server +// SHOULD forget the association of the Agent instance with the message stream. +// +// If the message stream is closed in the transport layer then the Server SHOULD +// forget association of all Agent instances that were previously established for +// this message stream using AgentConnect message, even if the corresponding +// AgentDisconnect message were not explicitly received from the Agent. +message AgentDisconnect { +} + +// ConnectionSettingsRequest is a request from the Agent to the Server to create +// and respond with an offer of connection settings for the Agent. +// Status: [Development] +message ConnectionSettingsRequest { + // Request for OpAMP connection settings. If this field is unset + // then the ConnectionSettingsRequest message is empty and is not actionable + // for the Server. + OpAMPConnectionSettingsRequest opamp = 1; + + // In the future we can add request fields for non-OpAMP connection types + // (own telemetry, other connections). +} + +// OpAMPConnectionSettingsRequest is a request for the Server to produce +// a OpAMPConnectionSettings in its response. +// Status: [Development] +message OpAMPConnectionSettingsRequest { + // A request to create a client certificate. This is used to initiate a + // Client Signing Request (CSR) flow. + // Required. + CertificateRequest certificate_request = 1; +} + +// Status: [Development] +message CertificateRequest { + // PEM-encoded Client Certificate Signing Request (CSR), signed by client's private key. 
+ // The Server SHOULD validate the request and SHOULD respond with an + // OpAMPConnectionSettings where the certificate.cert contains the issued + // certificate. + bytes csr = 1; +} + +// AvailableComponents contains metadata relating to the components included +// within the agent. +// Status: [Development] +message AvailableComponents { + // A map of a unique component ID to details about the component. + // This may be omitted from the message if the server has not + // explicitly requested it be sent by setting the ReportAvailableComponents + // flag in the previous ServerToAgent message. + map<string, ComponentDetails> components = 1; + + // Agent-calculated hash of the components. + // This hash should be included in every AvailableComponents message. + bytes hash = 2; + } + +message ComponentDetails { + // Extra key/value pairs that may be used to describe the component. + // The key/value pairs are according to semantic conventions, see: + // https://opentelemetry.io/docs/specs/semconv/ + // + // For example, you may use the "code" semantic conventions to + // report the location of the code for a specific component: + // https://opentelemetry.io/docs/specs/semconv/attributes-registry/code/ + // + // Or you may use the "vcs" semantic conventions to report the + // repository the component may be a part of: + // https://opentelemetry.io/docs/specs/semconv/attributes-registry/vcs/ + repeated KeyValue metadata = 1; + + // A map of component ID to sub-component details. It can nest as deeply as needed to + // describe the underlying system. + map<string, ComponentDetails> sub_component_map = 2; +} + + +message ServerToAgent { + // Agent instance uid. MUST match the instance_uid field in AgentToServer message. + // Used for multiplexing messages from/to multiple agents using one message stream. + bytes instance_uid = 1; + + // error_response is set if the Server wants to indicate that something went wrong + // during processing of an AgentToServer message.
If error_response is set then + // all other fields below must be unset and vice versa, if any of the fields below is + // set then error_response must be unset. + ServerErrorResponse error_response = 2; + + // remote_config field is set when the Server has a remote config offer for the Agent. + AgentRemoteConfig remote_config = 3; + + // This field is set when the Server wants the Agent to change one or more + // of its client connection settings (destination, headers, certificate, etc). + // Status: [Beta] + ConnectionSettingsOffers connection_settings = 4; + + // This field is set when the Server has packages to offer to the Agent. + // Status: [Beta] + PackagesAvailable packages_available = 5; + + // Bit flags as defined by ServerToAgentFlags bit masks. + uint64 flags = 6; + + // Bitmask of flags defined by ServerCapabilities enum. + // All bits that are not defined in ServerCapabilities enum MUST be set to 0 + // by the Server. This allows extending the protocol and the ServerCapabilities + // enum in the future such that old Servers automatically report that they + // don't support the new capability. + // This field MUST be set in the first ServerToAgent sent by the Server and MAY + // be omitted in subsequent ServerToAgent messages by setting it to + // UnspecifiedServerCapability value. + uint64 capabilities = 7; + + // Properties related to identification of the Agent, which can be overridden + // by the Server if needed. + AgentIdentification agent_identification = 8; + + // Allows the Server to instruct the Agent to perform a command, e.g. RESTART. This field should not be specified + // with fields other than instance_uid and capabilities. If specified, other fields will be ignored and the command + // will be performed. + // Status: [Beta] + ServerToAgentCommand command = 9; + + // A message indicating custom capabilities supported by the Server. 
+ // Status: [Development] + CustomCapabilities custom_capabilities = 10; + + // A custom message sent from the Server to an Agent. + // Status: [Development] + CustomMessage custom_message = 11; +} + +enum ServerToAgentFlags { + ServerToAgentFlags_Unspecified = 0; + + // Flags is a bit mask. Values below define individual bits. + + // ReportFullState flag can be used by the Server if the Agent did not include the + // particular bit of information in the last status report (which is an allowed + // optimization) but the Server detects that it does not have it (e.g. was + // restarted and lost state). The detection happens using + // AgentToServer.sequence_num values. + // The Server asks the Agent to report full status. + ServerToAgentFlags_ReportFullState = 0x00000001; + + // ReportAvailableComponents flag can be used by the server if the Agent did + // not include the full AvailableComponents message, but only the hash. + // If this flag is specified, the agent will populate available_components.components + // with a full description of the agent's components. + // Status: [Development] + ServerToAgentFlags_ReportAvailableComponents = 0x00000002; +} + +enum ServerCapabilities { + // The capabilities field is unspecified. + ServerCapabilities_Unspecified = 0; + + // The Server can accept status reports. This bit MUST be set, since all Server + // MUST be able to accept status reports. + ServerCapabilities_AcceptsStatus = 0x00000001; + // The Server can offer remote configuration to the Agent. + ServerCapabilities_OffersRemoteConfig = 0x00000002; + // The Server can accept EffectiveConfig in AgentToServer. + ServerCapabilities_AcceptsEffectiveConfig = 0x00000004; + // The Server can offer Packages. + // Status: [Beta] + ServerCapabilities_OffersPackages = 0x00000008; + // The Server can accept Packages status. + // Status: [Beta] + ServerCapabilities_AcceptsPackagesStatus = 0x00000010; + // The Server can offer connection settings. 
+ // Status: [Beta] + ServerCapabilities_OffersConnectionSettings = 0x00000020; + // The Server can accept ConnectionSettingsRequest and respond with an offer. + // Status: [Development] + ServerCapabilities_AcceptsConnectionSettingsRequest = 0x00000040; + + // Add new capabilities here, continuing with the least significant unused bit. +} + +// The OpAMPConnectionSettings message is a collection of fields which comprise an +// offer from the Server to the Agent to use the specified settings for OpAMP +// connection. +// Status: [Beta] +message OpAMPConnectionSettings { + // OpAMP Server URL This MUST be a WebSocket or HTTP URL and MUST be non-empty, for + // example: "wss://example.com:4318/v1/opamp" + string destination_endpoint = 1; + + // Optional headers to use when connecting. Typically used to set access tokens or + // other authorization headers. For HTTP-based protocols the Agent should + // set these in the request headers. + // For example: + // key="Authorization", Value="Basic YWxhZGRpbjpvcGVuc2VzYW1l". + Headers headers = 2; + + // The Agent should use the offered certificate to connect to the destination + // from now on. If the Agent is able to validate and connect using the offered + // certificate the Agent SHOULD forget any previous client certificates + // for this connection. + // This field is optional: if omitted the client SHOULD NOT use a client-side certificate. + // This field can be used to perform a client certificate revocation/rotation. + TLSCertificate certificate = 3; + + // The Agent MUST periodically send an AgentToServer message if the + // AgentCapabilities_ReportsHeartbeat capability is true. At a minimum the instance_uid + // field MUST be set. + // + // An HTTP Client MUST use the value as polling interval, if heartbeat_interval_seconds is non-zero. + // + // A heartbeat is used to keep the connection active and inform the server that the Agent + // is still alive and active. 
+ // + // If this field has no value or is set to 0, the Agent should not send any heartbeats. + // Status: [Development] + uint64 heartbeat_interval_seconds = 4; + + // Optional connection specific TLS settings. + // Status: [Development] + TLSConnectionSettings tls = 5; +} + +// The TelemetryConnectionSettings message is a collection of fields which comprise an +// offer from the Server to the Agent to use the specified settings for a network +// connection to report own telemetry. +// Status: [Beta] +message TelemetryConnectionSettings { + // The value MUST be a full URL of an OTLP/HTTP/Protobuf receiver with path. Scheme + // SHOULD begin with "https://", for example "https://example.com:4318/v1/metrics" + // The Agent MAY refuse to send the telemetry if the URL begins with "http://". + string destination_endpoint = 1; + + // Optional headers to use when connecting. Typically used to set access tokens or + // other authorization headers. For HTTP-based protocols the Agent should + // set these in the request headers. + // For example: + // key="Authorization", Value="Basic YWxhZGRpbjpvcGVuc2VzYW1l". + Headers headers = 2; + + // The Agent should use the offered certificate to connect to the destination + // from now on. If the Agent is able to validate and connect using the offered + // certificate the Agent SHOULD forget any previous client certificates + // for this connection. + // This field is optional: if omitted the client SHOULD NOT use a client-side certificate. + // This field can be used to perform a client certificate revocation/rotation. + TLSCertificate certificate = 3; + + // Optional connection specific TLS settings. + // Status: [Development] + TLSConnectionSettings tls = 4; +} + +// The OtherConnectionSettings message is a collection of fields which comprise an +// offer from the Server to the Agent to use the specified settings for a network +// connection. It is not required that all fields in this message are specified.
+// The Server may specify only some of the fields, in which case it means that +// the Server offers the Agent to change only those fields, while keeping the +// rest of the fields unchanged. +// +// For example the Server may send a ConnectionSettings message with only the +// certificate field set, while all other fields are unset. This means that +// the Server wants the Agent to use a new certificate and continue sending to +// the destination it is currently sending using the current header and other +// settings. +// +// For fields which reference other messages the field is considered unset +// when the reference is unset. +// +// For primitive fields (string) we rely on the "flags" to describe that the +// field is not set (this is done to overcome the limitation that old protoc +// compilers don't generate methods that allow checking for the presence of +// the field). +// Status: [Beta] +message OtherConnectionSettings { + // A URL, host:port or some other destination specifier. + string destination_endpoint = 1; + + // Optional headers to use when connecting. Typically used to set access tokens or + // other authorization headers. For HTTP-based protocols the Agent should + // set these in the request headers. + // For example: + // key="Authorization", Value="Basic YWxhZGRpbjpvcGVuc2VzYW1l". + Headers headers = 2; + + // The Agent should use the offered certificate to connect to the destination + // from now on. If the Agent is able to validate and connect using the offered + // certificate the Agent SHOULD forget any previous client certificates + // for this connection. + // This field is optional: if omitted the client SHOULD NOT use a client-side certificate. + // This field can be used to perform a client certificate revocation/rotation. + TLSCertificate certificate = 3; + + // Other connection settings. These are Agent-specific and are up to the Agent + // to interpret. + map<string, string> other_settings = 4; + + // Optional connection specific TLS settings.
+ // Status: [Development] + TLSConnectionSettings tls = 5; +} + + +// TLSConnectionSettings are optional connection settings that can be passed to +// the client in order to specify TLS configuration. +// Status: [Development] +message TLSConnectionSettings { + // Provides CA cert contents as a string. + string ca_pem_contents = 1; + + // Load system CA pool alongside any passed CAs. + bool include_system_ca_certs_pool = 2; + + // Skip certificate verification. + bool insecure_skip_verify = 3; + + // Minimum accepted TLS version; default "1.2". + string min_version = 4; + + // Maximum accepted TLS version; default "". + string max_version = 5; + + // Explicit list of cipher suites. + repeated string cipher_suites = 6; +} + +// Status: [Beta] +message Headers { + repeated Header headers = 1; +} + +// Status: [Beta] +message Header { + string key = 1; + string value = 2; +} + +// Status: [Beta] +message TLSCertificate { + // The (cert,private_key) pair should be issued and signed by a Certificate + // Authority (CA) that the destination Server recognizes. + // + // It is highly recommended that the private key of the CA certificate is NOT + // stored on the destination Server otherwise compromising the Server will allow + // a malicious actor to issue valid Server certificates which will be automatically + // trusted by all agents and will allow the actor to trivially MITM Agent-to-Server + // traffic of all servers that use this CA certificate for their Server-side + // certificates. + // + // Alternatively the certificate may be self-signed, assuming the Server can + // verify the certificate. + + // PEM-encoded certificate. Required. + bytes cert = 1; + + // PEM-encoded private key of the certificate. Required. + bytes private_key = 2; + + // PEM-encoded certificate of the signing CA. + // Optional. MUST be specified if the certificate is CA-signed.
+ // Can be stored by TLS-terminating intermediary proxies in order to verify + // the connecting client's certificate in the future. + // It is not recommended that the Agent accepts this CA as an authority for + // any purposes. + bytes ca_cert = 3; +} + +// Status: [Beta] +message ConnectionSettingsOffers { + // Hash of all settings, including settings that may be omitted from this message + // because they are unchanged. + bytes hash = 1; + + // Settings to connect to the OpAMP Server. + // If this field is not set then the Agent should assume that the settings are + // unchanged and should continue using existing settings. + // The Agent MUST verify the offered connection settings by actually connecting + // before accepting the setting to ensure it does not lose access to the OpAMP + // Server due to invalid settings. + OpAMPConnectionSettings opamp = 2; + + // Settings to connect to an OTLP metrics backend to send Agent's own metrics to. + // If this field is not set then the Agent should assume that the settings + // are unchanged. + // + // Once accepted the Agent should periodically send to the specified destination + // its own metrics, i.e. metrics of the Agent process and any custom metrics that + // describe the Agent state. + // + // All attributes specified in the identifying_attributes field in AgentDescription + // message SHOULD be also specified in the Resource of the reported OTLP metrics. + // + // Attributes specified in the non_identifying_attributes field in + // AgentDescription message may be also specified in the Resource of the reported + // OTLP metrics, in which case they SHOULD have exactly the same values. + // + // Process metrics MUST follow the conventions for processes: + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/process-metrics.md + TelemetryConnectionSettings own_metrics = 3; + + // Similar to own_metrics, but for traces.
+ TelemetryConnectionSettings own_traces = 4; + + // Similar to own_metrics, but for logs. + TelemetryConnectionSettings own_logs = 5; + + // Another set of connection settings, with a string name associated with each. + // How the Agent uses these is Agent-specific. Typically the name represents + // the name of the destination to connect to (as it is known to the Agent). + // If this field is not set then the Agent should assume that the other_connections + // settings are unchanged. + map<string, OtherConnectionSettings> other_connections = 6; +} + +// List of packages that the Server offers to the Agent. +// Status: [Beta] +message PackagesAvailable { + // Map of packages. Keys are package names, values are the packages available for download. + map<string, PackageAvailable> packages = 1; + + // Aggregate hash of all remotely installed packages. The Agent SHOULD include this + // value in subsequent PackageStatuses messages. This in turn allows the management + // Server to identify that a different set of packages is available for the Agent + // and specify the available packages in the next ServerToAgent message. + // + // This field MUST be always set if the management Server supports packages + // of agents. + // + // The hash is calculated as an aggregate of all packages names and content. + bytes all_packages_hash = 2; +} + +// Each Agent is composed of one or more packages. A package has a name and +// content stored in a file. The content of the files, functionality +// provided by the packages, how they are stored and used by the Agent side is Agent +// type-specific and is outside the concerns of the OpAMP protocol. +// +// If the Agent does not have an installed package with the specified name then +// it SHOULD download it from the specified URL and install it. +// +// If the Agent already has an installed package with the specified name +// but with a different hash then the Agent SHOULD download and +// install the package again, since it is a different version of the same package.
+// +// If the Agent has an installed package with the specified name and the same +// hash then the Agent does not need to do anything, it already +// has the right version of the package. +// Status: [Beta] +message PackageAvailable { + PackageType type = 1; + + // The package version that is available on the Server side. The Agent may for + // example use this information to avoid downloading a package that was previously + // already downloaded and failed to install. + string version = 2; + + // The downloadable file of the package. + DownloadableFile file = 3; + + // The hash of the package. SHOULD be calculated based on all other fields of the + // PackageAvailable message and content of the file of the package. The hash is + // used by the Agent to determine if the package it has is different from the + // package the Server is offering. + bytes hash = 4; +} + +// The type of the package, either an addon or a top-level package. +// Status: [Beta] +enum PackageType { + PackageType_TopLevel = 0; + PackageType_Addon = 1; +} + +// Status: [Beta] +message DownloadableFile { + // The URL from which the file can be downloaded using HTTP GET request. + // The Server at the specified URL SHOULD support range requests + // to allow for resuming downloads. + string download_url = 1; + + // The hash of the file content. Can be used by the Agent to verify that the file + // was downloaded correctly. + bytes content_hash = 2; + + // Optional signature of the file content. Can be used by the Agent to verify the + // authenticity of the downloaded file, for example can be the + // [detached GPG signature](https://www.gnupg.org/gph/en/manual/x135.html#AEN160). + // The exact signing and verification method is Agent specific. See + // https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#code-signing + // for recommendations. + bytes signature = 3; + + // Optional headers to use when downloading a file. 
Typically used to set + // access tokens or other authorization headers. For HTTP-based protocols + // the Agent should set these in the request headers. + // For example: + // key="Authorization", Value="Basic YWxhZGRpbjpvcGVuc2VzYW1l". + // Status: [Development] + Headers headers = 4; +} + +message ServerErrorResponse { + ServerErrorResponseType type = 1; + + // Error message in the string form, typically human readable. + string error_message = 2; + + oneof Details { + // Additional information about retrying if type==UNAVAILABLE. + RetryInfo retry_info = 3; + } +} + +enum ServerErrorResponseType { + // Unknown error. Something went wrong, but it is not known what exactly. + // The Agent SHOULD NOT retry the message. + // The error_message field may contain a description of the problem. + ServerErrorResponseType_Unknown = 0; + + // The AgentToServer message was malformed. The Agent SHOULD NOT retry + // the message. + ServerErrorResponseType_BadRequest = 1; + + // The Server is overloaded and unable to process the request. The Agent + // should retry the message later. retry_info field may be optionally + // set with additional information about retrying. + ServerErrorResponseType_Unavailable = 2; +} + +message RetryInfo { + uint64 retry_after_nanoseconds = 1; +} + +// ServerToAgentCommand is sent from the Server to the Agent to request that the Agent +// perform a command. +// Status: [Beta] +message ServerToAgentCommand { + CommandType type = 1; +} + +// Status: [Beta] +enum CommandType { + // The Agent should restart. This request will be ignored if the Agent does not + // support restart. + CommandType_Restart = 0; +} + +//////////////////////////////////////////////////////////////////////////////////// +// Status reporting + +message AgentDescription { + // Attributes that identify the Agent. 
+ // Keys/values are according to OpenTelemetry semantic conventions, see: + // https://github.com/open-telemetry/opentelemetry-specification/tree/main/specification/resource/semantic_conventions + // + // For standalone running Agents (such as OpenTelemetry Collector) the following + // attributes SHOULD be specified: + // - service.name should be set to a reverse FQDN that uniquely identifies the + // Agent type, e.g. "io.opentelemetry.collector" + // - service.namespace if it is used in the environment where the Agent runs. + // - service.version should be set to version number of the Agent build. + // - service.instance.id should be set. It may be set equal to the Agent's + // instance uid (equal to ServerToAgent.instance_uid field) or any other value + // that uniquely identifies the Agent in combination with other attributes. + // - any other attributes that are necessary for uniquely identifying the Agent's + // own telemetry. + // + // The Agent SHOULD also include these attributes in the Resource of its own + // telemetry. The combination of identifying attributes SHOULD be sufficient to + // uniquely identify the Agent's own telemetry in the destination system to which + // the Agent sends its own telemetry. + repeated KeyValue identifying_attributes = 1; + + // Attributes that do not necessarily identify the Agent but help describe + // where it runs. + // The following attributes SHOULD be included: + // - os.type, os.version - to describe where the Agent runs. + // - host.* to describe the host the Agent runs on. + // - cloud.* to describe the cloud where the host is located. + // - any other relevant Resource attributes that describe this Agent and the + // environment it runs in. + // - any user-defined attributes that the end user would like to associate + // with this Agent. + repeated KeyValue non_identifying_attributes = 2; + + // TODO: add ability to specify related entities (such as the Service the Agent is + // is responsible/associated with). 
+} + +enum AgentCapabilities { + // The capabilities field is unspecified. + AgentCapabilities_Unspecified = 0; + // The Agent can report status. This bit MUST be set, since all Agents MUST + // report status. + AgentCapabilities_ReportsStatus = 0x00000001; + // The Agent can accept remote configuration from the Server. + AgentCapabilities_AcceptsRemoteConfig = 0x00000002; + // The Agent will report EffectiveConfig in AgentToServer. + AgentCapabilities_ReportsEffectiveConfig = 0x00000004; + // The Agent can accept package offers. + // Status: [Beta] + AgentCapabilities_AcceptsPackages = 0x00000008; + // The Agent can report package status. + // Status: [Beta] + AgentCapabilities_ReportsPackageStatuses = 0x00000010; + // The Agent can report own traces to the destination specified by + // the Server via ConnectionSettingsOffers.own_traces field. + // Status: [Beta] + AgentCapabilities_ReportsOwnTraces = 0x00000020; + // The Agent can report own metrics to the destination specified by + // the Server via ConnectionSettingsOffers.own_metrics field. + // Status: [Beta] + AgentCapabilities_ReportsOwnMetrics = 0x00000040; + // The Agent can report own logs to the destination specified by + // the Server via ConnectionSettingsOffers.own_logs field. + // Status: [Beta] + AgentCapabilities_ReportsOwnLogs = 0x00000080; + // The Agent can accept connection settings for OpAMP via + // ConnectionSettingsOffers.opamp field. + // Status: [Beta] + AgentCapabilities_AcceptsOpAMPConnectionSettings = 0x00000100; + // The Agent can accept connection settings for other destinations via + // ConnectionSettingsOffers.other_connections field. + // Status: [Beta] + AgentCapabilities_AcceptsOtherConnectionSettings = 0x00000200; + // The Agent can accept restart requests. + // Status: [Beta] + AgentCapabilities_AcceptsRestartCommand = 0x00000400; + // The Agent will report Health via AgentToServer.health field.
+ AgentCapabilities_ReportsHealth = 0x00000800; + // The Agent will report RemoteConfig status via AgentToServer.remote_config_status field. + AgentCapabilities_ReportsRemoteConfig = 0x00001000; + // The Agent can report heartbeats. + // This is specified by the ServerToAgent.OpAMPConnectionSettings.heartbeat_interval_seconds field. + // If this capability is true, but the Server does not set a heartbeat_interval_seconds field, the + // Agent should use its own configured interval, which by default will be 30s. The Server may not + // know the configured interval and should not make assumptions about it. + // Status: [Development] + AgentCapabilities_ReportsHeartbeat = 0x00002000; + // The agent will report AvailableComponents via the AgentToServer.available_components field. + // Status: [Development] + AgentCapabilities_ReportsAvailableComponents = 0x00004000; + // Add new capabilities here, continuing with the least significant unused bit. +} + +// The health of the Agent and sub-components +// Status: [Beta] +message ComponentHealth { + // Set to true if the component is up and healthy. + bool healthy = 1; + + // Timestamp since the component is up, i.e. when the component was started. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // If the component is not running MUST be set to 0. + fixed64 start_time_unix_nano = 2; + + // Human-readable error message if the component is in erroneous state. SHOULD be set + // when healthy==false. + string last_error = 3; + + // Component status represented as a string. The status values are defined by agent-specific + // semantics and not at the protocol level. + string status = 4; + + // The time when the component status was observed. Value is UNIX Epoch time in + // nanoseconds since 00:00:00 UTC on 1 January 1970. + fixed64 status_time_unix_nano = 5; + + // A map to store more granular, sub-component health. It can nest as deeply as needed to + // describe the underlying system. 
+ map<string, ComponentHealth> component_health_map = 6; +} + +message EffectiveConfig { + // The effective config of the Agent. + AgentConfigMap config_map = 1; +} + +message RemoteConfigStatus { + // The hash of the remote config that was last received by this Agent in the + // AgentRemoteConfig.config_hash field. + // The Server SHOULD compare this hash with the config hash + // it has for the Agent and if the hashes are different the Server MUST include + // the remote_config field in the response in the ServerToAgent message. + bytes last_remote_config_hash = 1; + + RemoteConfigStatuses status = 2; + + // Optional error message if status==FAILED. + string error_message = 3; +} + +enum RemoteConfigStatuses { + // The value of status field is not set. + RemoteConfigStatuses_UNSET = 0; + + // Remote config was successfully applied by the Agent. + RemoteConfigStatuses_APPLIED = 1; + + // Agent is currently applying the remote config that it received earlier. + RemoteConfigStatuses_APPLYING = 2; + + // Agent tried to apply the config received earlier, but it failed. + // See error_message for more details. + RemoteConfigStatuses_FAILED = 3; +} + +// The PackageStatuses message describes the status of all packages that the Agent +// has or was offered. +// Status: [Beta] +message PackageStatuses { + // A map of PackageStatus messages, where the keys are package names. + // The key MUST match the name field of PackageStatus message. + map<string, PackageStatus> packages = 1; + + // The aggregate hash of all packages that this Agent previously received from the + // Server via PackagesAvailable message. + // + // The Server SHOULD compare this hash to the aggregate hash of all packages that + // it has for this Agent and if the hashes are different the Server SHOULD send + // a PackagesAvailable message to the Agent.
+ bytes server_provided_all_packages_hash = 2; + + // This field is set if the Agent encountered an error when processing the + // PackagesAvailable message and that error is not related to any particular single + // package. + // The field must be unset if there were no processing errors. + string error_message = 3; +} + +// The status of a single package. +// Status: [Beta] +message PackageStatus { + // Package name. MUST be always set and MUST match the key in the packages field + // of PackageStatuses message. + string name = 1; + + // The version of the package that the Agent has. + // MUST be set if the Agent has this package. + // MUST be empty if the Agent does not have this package. This may be the case + // for example if the package was offered by the Server but failed to install + // and the Agent did not have this package previously. + string agent_has_version = 2; + + // The hash of the package that the Agent has. + // MUST be set if the Agent has this package. + // MUST be empty if the Agent does not have this package. This may be the case for + // example if the package was offered by the Server but failed to install and the + // Agent did not have this package previously. + bytes agent_has_hash = 3; + + // The version of the package that the Server offered to the Agent. + // MUST be set if the installation of the package is initiated by an earlier offer + // from the Server to install this package. + // + // MUST be empty if the Agent has this package but it was installed locally and + // was not offered by the Server. + // + // Note that it is possible for both agent_has_version and server_offered_version + // fields to be set and to have different values. This is for example possible if + // the Agent already has a version of the package successfully installed, the Server + // offers a different version, but the Agent fails to install that version.
+ string server_offered_version = 4; + + // The hash of the package that the Server offered to the Agent. + // MUST be set if the installation of the package is initiated by an earlier + // offer from the Server to install this package. + // + // MUST be empty if the Agent has this package but it was installed locally and + // was not offered by the Server. + // + // Note that it is possible for both agent_has_hash and server_offered_hash + // fields to be set and to have different values. This is for example possible if + // the Agent already has a version of the package successfully installed, the + // Server offers a different version, but the Agent fails to install that version. + bytes server_offered_hash = 5; + + PackageStatusEnum status = 6; + + // Error message if the status is erroneous. + string error_message = 7; + + // Optional details that may be of interest to a user. + // Should only be set if status is Downloading. + // Status: [Development] + PackageDownloadDetails download_details = 8; +} + + +// Additional details that an agent can use to describe an in-progress package download. +// Status: [Development] +message PackageDownloadDetails { + // The package download progress as a percentage. + double download_percent = 1; + + // The current package download rate in bytes per second. + double download_bytes_per_second = 2; +} + +// The status of this package. +// Status: [Beta] +enum PackageStatusEnum { + // Package is successfully installed by the Agent. + // The error_message field MUST NOT be set. + PackageStatusEnum_Installed = 0; + + // Installation of this package has not yet started. + PackageStatusEnum_InstallPending = 1; + + // Agent is currently installing the package. + // server_offered_hash field MUST be set to indicate the version that the + // Agent is installing. The error_message field MUST NOT be set. + PackageStatusEnum_Installing = 2; + + // Agent tried to install the package but installation failed. 
+ // server_offered_hash field MUST be set to indicate the version that the Agent + // tried to install. The error_message may also contain more details about + // the failure. + PackageStatusEnum_InstallFailed = 3; + + // Agent is currently downloading the package. + // server_offered_hash field MUST be set to indicate the version that the + // Agent is installing. The error_message field MUST NOT be set. + // Status: [Development] + PackageStatusEnum_Downloading = 4; +} + +// Properties related to identification of the Agent, which can be overridden +// by the Server if needed +message AgentIdentification { + // When new_instance_uid is set, Agent MUST update instance_uid + // to the value provided and use it for all further communication. + // MUST be 16 bytes long and SHOULD be generated using the UUID v7 spec. + bytes new_instance_uid = 1; +} + +///////////////////////////////////////////////////////////////////////////////////// +// Config messages +///////////////////////////////////////////////////////////////////////////////////// + +message AgentRemoteConfig { + // Agent config offered by the management Server to the Agent instance. SHOULD NOT be + // set if the config for this Agent has not changed since it was last requested (i.e. + // AgentConfigRequest.last_remote_config_hash field is equal to + // AgentConfigResponse.config_hash field). + AgentConfigMap config = 1; + + // Hash of "config". The Agent SHOULD include this value in subsequent + // RemoteConfigStatus messages in the last_remote_config_hash field. This in turn + // allows the management Server to identify that a new config is available for the Agent. + // + // This field MUST be always set if the management Server supports remote configuration + // of agents. + // + // Management Server must choose a hashing function that guarantees lack of hash + // collisions in practice. + bytes config_hash = 2; +} + +message AgentConfigMap { + // Map of configs. 
Keys are config file names or config section names. + // The configuration is assumed to be a collection of one or more named config files + // or sections. + // For agents that use a single config file or section the map SHOULD contain a single + // entry and the key may be an empty string. + map<string, AgentConfigFile> config_map = 1; +} + +message AgentConfigFile { + // Config file or section body. The content, format and encoding depends on the Agent + // type. The content_type field may optionally describe the MIME type of the body. + bytes body = 1; + + // Optional MIME Content-Type that describes what's in the body field, for + // example "text/yaml". + string content_type = 2; +} + +///////////////////////////////////////////////////////////////////////////////////// +// Custom messages +///////////////////////////////////////////////////////////////////////////////////// + +message CustomCapabilities { + // A list of custom capabilities that are supported. Each capability is a reverse FQDN + // with optional version information that uniquely identifies the custom capability + // and should match a capability specified in a supported CustomMessage. + // Status: [Development] + repeated string capabilities = 1; +} + +message CustomMessage { + // A reverse FQDN that uniquely identifies the capability and matches one of the + // capabilities in the CustomCapabilities message. + // Status: [Development] + string capability = 1; + + // Type of message within the capability. The capability defines the types of custom + // messages that are used to implement the capability. The type must only be unique + // within the capability. + // Status: [Development] + string type = 2; + + // Binary data of the message. The capability must specify the format of the contents + // of the data for each custom message type it defines. 
+ // Status: [Development] + bytes data = 3; +} diff --git a/packages/api/src/opamp/services/agentService.ts b/packages/api/src/opamp/services/agentService.ts new file mode 100644 index 00000000..2efb7b81 --- /dev/null +++ b/packages/api/src/opamp/services/agentService.ts @@ -0,0 +1,96 @@ +import logger from '@/utils/logger'; + +import { Agent, agentStore } from '../models/agent'; + +export class AgentService { + /** + * Process an agent status report + */ + public processAgentStatus(agentToServer: any): Agent { + try { + // Extract necessary fields from the message + const { + instanceUid, + sequenceNum, + agentDescription, + capabilities, + effectiveConfig, + remoteConfigStatus, + } = agentToServer; + + // Get the existing agent or create a new one + let agent = agentStore.getAgent(instanceUid); + + if (!agent) { + // New agent, create a new record + agent = { + instanceUid, + sequenceNum, + agentDescription, + capabilities, + effectiveConfig, + remoteConfigStatus, + lastSeen: new Date(), + }; + } else { + // Existing agent, update its record + agent = { + ...agent, + sequenceNum, + lastSeen: new Date(), + }; + + // Update optional fields if they exist in the message + if (agentDescription) { + agent.agentDescription = agentDescription; + } + + if (capabilities) { + agent.capabilities = capabilities; + } + + if (effectiveConfig) { + agent.effectiveConfig = effectiveConfig; + } + + if (remoteConfigStatus) { + agent.remoteConfigStatus = remoteConfigStatus; + } + } + + // Update the agent in the store + agentStore.upsertAgent(agent); + + return agent; + } catch (error) { + logger.error('Error processing agent status:', error); + throw error; + } + } + + /** + * Check if an agent accepts remote configuration + */ + public agentAcceptsRemoteConfig(agent: Agent): boolean { + // Check if the agent has the AcceptsRemoteConfig capability bit set + // AcceptsRemoteConfig = 0x00000002 + return (agent.capabilities & 0x00000002) !== 0; + } + + /** + * Get an agent by its 
instance UID + */ + public getAgent(instanceUid: Buffer): Agent | undefined { + return agentStore.getAgent(instanceUid); + } + + /** + * Get all registered agents + */ + public getAllAgents(): Agent[] { + return agentStore.getAllAgents(); + } +} + +// Create a singleton instance of the agent service +export const agentService = new AgentService(); diff --git a/packages/api/src/opamp/utils/protobuf.ts b/packages/api/src/opamp/utils/protobuf.ts new file mode 100644 index 00000000..c2cf7c79 --- /dev/null +++ b/packages/api/src/opamp/utils/protobuf.ts @@ -0,0 +1,142 @@ +import { createHash } from 'crypto'; +import * as fs from 'fs'; +import * as path from 'path'; +import * as protobuf from 'protobufjs'; + +import logger from '@/utils/logger'; + +// Define the root path of the proto file +const PROTO_PATH = path.resolve(__dirname, '../proto/opamp.proto'); + +// Load the OpAMP proto definition +let root: protobuf.Root; +try { + if (!fs.existsSync(PROTO_PATH)) { + throw new Error(`Proto file not found at ${PROTO_PATH}`); + } + root = protobuf.loadSync(PROTO_PATH); + logger.debug('OpAMP proto definition loaded successfully'); +} catch (error) { + logger.error('Failed to load OpAMP proto definition:', error); + throw error; +} + +// Get message types +const AgentToServer = root.lookupType('opamp.AgentToServer'); +const ServerToAgent = root.lookupType('opamp.ServerToAgent'); +const AgentRemoteConfig = root.lookupType('opamp.AgentRemoteConfig'); +const AgentConfigMap = root.lookupType('opamp.AgentConfigMap'); +const AgentConfigFile = root.lookupType('opamp.AgentConfigFile'); +const ServerCapabilities = root.lookupEnum('opamp.ServerCapabilities'); + +// Define the server capabilities +const serverCapabilities = + ServerCapabilities.values.AcceptsStatus | + ServerCapabilities.values.OffersRemoteConfig | + ServerCapabilities.values.AcceptsEffectiveConfig; + +/** + * Decode an AgentToServer message from binary data + */ +export function decodeAgentToServer(data: Buffer): 
protobuf.Message { + try { + return AgentToServer.decode(data); + } catch (error) { + logger.error('Failed to decode AgentToServer message:', error); + throw error; + } +} + +/** + * Encode a ServerToAgent message to binary data + */ +export function encodeServerToAgent(message: any): Buffer { + try { + // Verify the message + const error = ServerToAgent.verify(message); + if (error) { + throw new Error(`Invalid ServerToAgent message: ${error}`); + } + + // Create a message instance + const serverToAgent = ServerToAgent.create(message); + + // Encode the message + return Buffer.from(ServerToAgent.encode(serverToAgent).finish()); + } catch (error) { + logger.error('Failed to encode ServerToAgent message:', error); + throw error; + } +} + +/** + * Create a remote configuration message from a map of config file name to file body + */ +export function createRemoteConfig( + configFiles: Map<string, Buffer>, + configType: string = 'text/yaml', +): any { + try { + // Convert the configFiles map to the format expected by AgentConfigMap + const configMap: { [key: string]: any } = {}; + + configFiles.forEach((content, filename) => { + configMap[filename] = { + body: content, + contentType: configType, + }; + }); + + // Create the AgentConfigMap message + const agentConfigMap = { + configMap: configMap, + }; + + // Calculate the config hash + const configHash = calculateConfigHash(configFiles); + + // Create the AgentRemoteConfig message + return { + config: agentConfigMap, + configHash: configHash, + }; + } catch (error) { + logger.error('Failed to create remote config message:', error); + throw error; + } +} + +/** + * Calculate a SHA-256 hash over the configuration files (file names and bodies, in sorted key order) + */ +function calculateConfigHash(configFiles: Map<string, Buffer>): Buffer { + try { + const hash = createHash('sha256'); + + // Sort keys to ensure consistent hashing + const sortedKeys = Array.from(configFiles.keys()).sort(); + + for (const key of sortedKeys) { + const content = configFiles.get(key); + if (content) { + hash.update(key); + hash.update(content); + } + } + + return 
hash.digest(); + } catch (error) { + logger.error('Failed to calculate config hash:', error); + throw error; + } +} + +export { + AgentConfigFile, + AgentConfigMap, + AgentRemoteConfig, + AgentToServer, + root, + serverCapabilities, + ServerToAgent, +}; diff --git a/packages/api/src/routers/api/root.ts b/packages/api/src/routers/api/root.ts index 9fb3bb5b..bfbf6526 100644 --- a/packages/api/src/routers/api/root.ts +++ b/packages/api/src/routers/api/root.ts @@ -96,6 +96,7 @@ router.post( const team = await createTeam({ name: `${email}'s Team`, + collectorAuthenticationEnforced: true, }); user.team = team._id; user.name = email; diff --git a/packages/api/src/server.ts b/packages/api/src/server.ts index 2810b7e5..e60779ad 100644 --- a/packages/api/src/server.ts +++ b/packages/api/src/server.ts @@ -5,17 +5,23 @@ import { serializeError } from 'serialize-error'; import app from '@/api-app'; import * as config from '@/config'; import { connectDB, mongooseConnection } from '@/models'; +import opampApp from '@/opamp/app'; import logger from '@/utils/logger'; export default class Server { protected shouldHandleGracefulShutdown = true; - protected httpServer!: http.Server; + protected appServer!: http.Server; + protected opampServer!: http.Server; - private createServer() { + private createAppServer() { return http.createServer(app); } + private createOpampServer() { + return http.createServer(opampApp); + } + protected async shutdown(signal?: string) { let hasError = false; logger.info('Closing all db clients...'); @@ -36,29 +42,41 @@ export default class Server { } async start() { - this.httpServer = this.createServer(); - this.httpServer.keepAliveTimeout = 61000; // Ensure all inactive connections are terminated by the ALB, by setting this a few seconds higher than the ALB idle timeout - this.httpServer.headersTimeout = 62000; // Ensure the headersTimeout is set higher than the keepAliveTimeout due to this nodejs regression bug: https://github.com/nodejs/node/issues/27363 
+ this.appServer = this.createAppServer(); + this.appServer.keepAliveTimeout = 61000; // Ensure all inactive connections are terminated by the ALB, by setting this a few seconds higher than the ALB idle timeout + this.appServer.headersTimeout = 62000; // Ensure the headersTimeout is set higher than the keepAliveTimeout due to this nodejs regression bug: https://github.com/nodejs/node/issues/27363 - this.httpServer.listen(config.PORT, () => { + this.opampServer = this.createOpampServer(); + this.opampServer.keepAliveTimeout = 61000; + this.opampServer.headersTimeout = 62000; + + this.appServer.listen(config.PORT, () => { logger.info( - `Server listening on port ${config.PORT}, NODE_ENV=${process.env.NODE_ENV}`, + `API Server listening on port ${config.PORT}, NODE_ENV=${process.env.NODE_ENV}`, + ); + }); + + this.opampServer.listen(config.OPAMP_PORT, () => { + logger.info( + `OpAMP Server listening on port ${config.OPAMP_PORT}, NODE_ENV=${process.env.NODE_ENV}`, ); }); if (this.shouldHandleGracefulShutdown) { - gracefulShutdown(this.httpServer, { - signals: 'SIGINT SIGTERM', - timeout: 10000, // 10 secs - development: config.IS_DEV, - forceExit: true, // triggers process.exit() at the end of shutdown process - preShutdown: async () => { - // needed operation before httpConnections are shutted down - }, - onShutdown: this.shutdown, - finally: () => { - logger.info('Server gracefully shut down...'); - }, // finally function (sync) - e.g. for logging + [this.appServer, this.opampServer].forEach(server => { + gracefulShutdown(server, { + signals: 'SIGINT SIGTERM', + timeout: 10000, // 10 secs + development: config.IS_DEV, + forceExit: true, // triggers process.exit() at the end of shutdown process + preShutdown: async () => { + // needed operation before httpConnections are shutted down + }, + onShutdown: this.shutdown, + finally: () => { + logger.info('Server gracefully shut down...'); + }, // finally function (sync) - e.g. 
for logging + }); }); } diff --git a/smoke-tests/otel-collector/README.md b/smoke-tests/otel-collector/README.md index c34c5a8b..4e3cab85 100644 --- a/smoke-tests/otel-collector/README.md +++ b/smoke-tests/otel-collector/README.md @@ -1,6 +1,7 @@ # OpenTelemetry Collector Smoke Tests -This directory contains smoke tests for validating the OpenTelemetry Collector functionality in HyperDX. +This directory contains smoke tests for validating the OpenTelemetry Collector +functionality in HyperDX. ## Prerequisites @@ -9,7 +10,10 @@ Before running the tests, ensure you have the following tools installed: - [Bats](https://github.com/bats-core/bats-core) - Bash Automated Testing System - [Docker](https://www.docker.com/) and Docker Compose - [curl](https://curl.se/) - Command line tool for transferring data -- [ClickHouse client](https://clickhouse.com/docs/en/integrations/sql-clients/clickhouse-client) - Command-line client for ClickHouse +- [ClickHouse client](https://clickhouse.com/docs/en/integrations/sql-clients/clickhouse-client) - + Command-line client for ClickHouse + - Make sure to run `clickhouse install` to install the ClickHouse client after + downloading ClickHouse ## Running the Tests @@ -29,12 +33,15 @@ bats hdx-1453-auto-parse-json.bats ## Test Structure - `*.bats` - Test files written in Bats -- `setup_suite.bash` - Contains global setup_suite and teardown_suite functions that run once for the entire test suite +- `setup_suite.bash` - Contains global setup_suite and teardown_suite functions + that run once for the entire test suite - `data/` - Test data used by the tests - `test_helpers/` - Utility functions for the tests - `docker-compose.yaml` - Docker Compose configuration for the test environment -The test suite uses Bats' `setup_suite` and `teardown_suite` hooks to initialize the environment only once, regardless of how many test files are run. 
This optimizes test execution by: +The test suite uses Bats' `setup_suite` and `teardown_suite` hooks to initialize +the environment only once, regardless of how many test files are run. This +optimizes test execution by: 1. Validating the environment once 2. Starting Docker containers once at the beginning of the test suite @@ -42,7 +49,9 @@ The test suite uses Bats' `setup_suite` and `teardown_suite` hooks to initialize ## Debugging -If you need to debug the tests, you can set the `SKIP_CLEANUP` environment variable to prevent the Docker containers from being torn down after the tests complete: +If you need to debug the tests, you can set the `SKIP_CLEANUP` environment +variable to prevent the Docker containers from being torn down after the tests +complete: ```bash SKIP_CLEANUP=1 bats *.bats @@ -54,7 +63,9 @@ or SKIP_CLEANUP=true bats *.bats ``` -With `SKIP_CLEANUP` enabled, the test containers will remain running after the tests complete, allowing you to inspect logs, connect to the containers, and debug issues. +With `SKIP_CLEANUP` enabled, the test containers will remain running after the +tests complete, allowing you to inspect logs, connect to the containers, and +debug issues. 
To manually clean up the containers after debugging: diff --git a/smoke-tests/otel-collector/docker-compose.yaml b/smoke-tests/otel-collector/docker-compose.yaml index c2b2b590..ae17ff70 100644 --- a/smoke-tests/otel-collector/docker-compose.yaml +++ b/smoke-tests/otel-collector/docker-compose.yaml @@ -11,15 +11,23 @@ services: networks: - internal healthcheck: - test: wget --no-verbose --tries=1 http://127.0.0.1:8123/ping || exit 1 + test: + wget -O /dev/null --no-verbose --tries=1 http://127.0.0.1:8123/ping || + exit 1 interval: 5s timeout: 3s retries: 5 start_period: 10s otel-collector: - extends: - file: ../../docker-compose.ci.yml - service: otel-collector + image: otel/opentelemetry-collector-contrib:0.126.0 + volumes: + - ../../docker/otel-collector/config.yaml:/etc/otelcol-contrib/config.yaml + - ./receiver-config.yaml:/etc/otelcol-contrib/receiver-config.yaml + command: + [ + '--config=/etc/otelcol-contrib/receiver-config.yaml', + '--config=/etc/otelcol-contrib/config.yaml', + ] environment: - CLICKHOUSE_ENDPOINT=tcp://ch-server:9000?dial_timeout=10s - CLICKHOUSE_PROMETHEUS_METRICS_ENDPOINT=ch-server:9363 diff --git a/smoke-tests/otel-collector/receiver-config.yaml b/smoke-tests/otel-collector/receiver-config.yaml new file mode 100644 index 00000000..b369508c --- /dev/null +++ b/smoke-tests/otel-collector/receiver-config.yaml @@ -0,0 +1,42 @@ +# Usually added via OpAMP +receivers: + # Troubleshooting + prometheus: + config: + scrape_configs: + - job_name: 'otelcol' + scrape_interval: 30s + static_configs: + - targets: + - '0.0.0.0:8888' + - ${env:CLICKHOUSE_PROMETHEUS_METRICS_ENDPOINT} + # Data sources: logs + fluentforward: + endpoint: '0.0.0.0:24225' + # Configured via OpAMP w/ authentication + # Data sources: traces, metrics, logs + otlp/hyperdx: + protocols: + grpc: + include_metadata: true + endpoint: '0.0.0.0:4317' + http: + cors: + allowed_origins: ['*'] + allowed_headers: ['*'] + include_metadata: true + endpoint: '0.0.0.0:4318' + +service: + 
pipelines: + traces: + receivers: + - otlp/hyperdx + metrics: + receivers: + - otlp/hyperdx + - prometheus + logs/in: + receivers: + - otlp/hyperdx + - fluentforward diff --git a/smoke-tests/otel-collector/test_helpers/utilities.bash b/smoke-tests/otel-collector/test_helpers/utilities.bash index b6a7d1a6..e917b3c3 100644 --- a/smoke-tests/otel-collector/test_helpers/utilities.bash +++ b/smoke-tests/otel-collector/test_helpers/utilities.bash @@ -13,7 +13,7 @@ validate_env() { # Check if clickhouse-client is installed if ! command -v clickhouse-client &> /dev/null; then - echo "❌ Error: clickhouse-client is not installed. Please install clickhouse-client to continue." >&3 + echo "❌ Error: clickhouse-client is not installed. Please install clickhouse-client to continue. (Did you run `clickhouse install` yet?)" >&3 return 1 fi diff --git a/yarn.lock b/yarn.lock index 01a826a8..60a33a20 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4277,6 +4277,7 @@ __metadata: "@types/lodash": "npm:^4.14.198" "@types/minimist": "npm:^1.2.2" "@types/ms": "npm:^0.7.31" + "@types/node": "npm:^22.15.18" "@types/object-hash": "npm:^2.2.1" "@types/passport-http-bearer": "npm:^1.0.37" "@types/passport-local": "npm:^1.0.34" @@ -4317,6 +4318,7 @@ __metadata: passport-local-mongoose: "npm:^6.1.0" pluralize: "npm:^8.0.0" promised-handlebars: "npm:^2.0.1" + protobufjs: "npm:^7.5.2" rimraf: "npm:^4.4.1" semver: "npm:^7.5.2" serialize-error: "npm:^8.1.0" @@ -10061,6 +10063,15 @@ __metadata: languageName: node linkType: hard +"@types/node@npm:^22.15.18": + version: 22.15.18 + resolution: "@types/node@npm:22.15.18" + dependencies: + undici-types: "npm:~6.21.0" + checksum: 10c0/e23178c568e2dc6b93b6aa3b8dfb45f9556e527918c947fe7406a4c92d2184c7396558912400c3b1b8d0fa952ec63819aca2b8e4d3545455fc6f1e9623e09ca6 + languageName: node + linkType: hard + "@types/normalize-package-data@npm:^2.4.0": version: 2.4.1 resolution: "@types/normalize-package-data@npm:2.4.1" @@ -23424,6 +23435,26 @@ __metadata: languageName: 
node linkType: hard +"protobufjs@npm:^7.5.2": + version: 7.5.2 + resolution: "protobufjs@npm:7.5.2" + dependencies: + "@protobufjs/aspromise": "npm:^1.1.2" + "@protobufjs/base64": "npm:^1.1.2" + "@protobufjs/codegen": "npm:^2.0.4" + "@protobufjs/eventemitter": "npm:^1.1.0" + "@protobufjs/fetch": "npm:^1.1.0" + "@protobufjs/float": "npm:^1.0.2" + "@protobufjs/inquire": "npm:^1.1.0" + "@protobufjs/path": "npm:^1.1.2" + "@protobufjs/pool": "npm:^1.1.0" + "@protobufjs/utf8": "npm:^1.1.0" + "@types/node": "npm:>=13.7.0" + long: "npm:^5.0.0" + checksum: 10c0/c4ac6298280dfe6116e7a1b722310de807037b1abed816db2af2a595ddc9dc6160447eebd4ad8e2968d41f0f85375f62e893adfe760af1a943d195931c7bb875 + languageName: node + linkType: hard + "protobufjs@npm:~6.11.2": version: 6.11.4 resolution: "protobufjs@npm:6.11.4" @@ -27718,6 +27749,13 @@ __metadata: languageName: node linkType: hard +"undici-types@npm:~6.21.0": + version: 6.21.0 + resolution: "undici-types@npm:6.21.0" + checksum: 10c0/c01ed51829b10aa72fc3ce64b747f8e74ae9b60eafa19a7b46ef624403508a54c526ffab06a14a26b3120d055e1104d7abe7c9017e83ced038ea5cf52f8d5e04 + languageName: node + linkType: hard + "unicode-canonical-property-names-ecmascript@npm:^2.0.0": version: 2.0.0 resolution: "unicode-canonical-property-names-ecmascript@npm:2.0.0"