From df6153964824963924bc945a2ec2638cf3773404 Mon Sep 17 00:00:00 2001 From: Teresa Date: Mon, 26 Sep 2022 08:21:40 -0400 Subject: [PATCH] convert kafka topic & consumer group as ENV variables (#390) --- integration-tests/docker-compose.yml | 3 +++ packages/services/usage-ingestor/.env.template | 2 ++ packages/services/usage-ingestor/src/index.ts | 3 ++- packages/services/usage-ingestor/src/ingestor.ts | 3 ++- packages/services/usage/.env.template | 3 ++- packages/services/usage/src/index.ts | 2 +- 6 files changed, 12 insertions(+), 4 deletions(-) diff --git a/integration-tests/docker-compose.yml b/integration-tests/docker-compose.yml index 058453ceb..8ddc502dc 100644 --- a/integration-tests/docker-compose.yml +++ b/integration-tests/docker-compose.yml @@ -343,6 +343,7 @@ services: TOKENS_ENDPOINT: http://tokens:3003 RATE_LIMIT_ENDPOINT: http://rate-limit:3009 KAFKA_CONNECTION_MODE: 'docker' + KAFKA_TOPIC: 'usage_reports_v2' KAFKA_BROKER: broker:29092 KAFKA_BUFFER_SIZE: 350 KAFKA_BUFFER_INTERVAL: 1000 @@ -365,6 +366,8 @@ services: KAFKA_CONNECTION_MODE: 'docker' KAFKA_BROKER: broker:29092 KAFKA_CONCURRENCY: 1 + KAFKA_CONSUMER_GROUP: 'usage-ingestor-v2' + KAFKA_TOPIC: 'usage_reports_v2' CLICKHOUSE_PROTOCOL: 'http' CLICKHOUSE_HOST: clickhouse CLICKHOUSE_PORT: 8123 diff --git a/packages/services/usage-ingestor/.env.template b/packages/services/usage-ingestor/.env.template index 584ec8cce..8c61ab429 100644 --- a/packages/services/usage-ingestor/.env.template +++ b/packages/services/usage-ingestor/.env.template @@ -1,6 +1,8 @@ KAFKA_CONNECTION_MODE="docker" KAFKA_BROKER="localhost:9092" KAFKA_CONCURRENCY="1" +KAFKA_CONSUMER_GROUP="usage-ingestor-v2" +KAFKA_TOPIC="usage_reports_v2" CLICKHOUSE_PROTOCOL="http" CLICKHOUSE_HOST="localhost" CLICKHOUSE_PORT="8123" diff --git a/packages/services/usage-ingestor/src/index.ts b/packages/services/usage-ingestor/src/index.ts index 5bf939bd2..0d783d9f2 100644 --- a/packages/services/usage-ingestor/src/index.ts +++ b/packages/services/usage-ingestor/src/index.ts @@ -44,7 +44,8 @@ async function main() { } : null, kafka: { - topic: 'usage_reports_v2', + topic: ensureEnv('KAFKA_TOPIC'), + consumerGroup: ensureEnv('KAFKA_CONSUMER_GROUP'), concurrency: ensureEnv('KAFKA_CONCURRENCY', 'number'), connection: ensureEnv('KAFKA_CONNECTION_MODE') == 'hosted' diff --git a/packages/services/usage-ingestor/src/ingestor.ts b/packages/services/usage-ingestor/src/ingestor.ts index e95d0e95b..55b901252 100644 --- a/packages/services/usage-ingestor/src/ingestor.ts +++ b/packages/services/usage-ingestor/src/ingestor.ts @@ -42,6 +42,7 @@ export function createIngestor(config: { clickhouseCloud: ClickHouseConfig | null; kafka: { topic: string; + consumerGroup: string; concurrency: number; connection: | { @@ -87,7 +88,7 @@ export function createIngestor(config: { }, }); const consumer = kafka.consumer({ - groupId: `usage-ingestor-v2`, + groupId: config.kafka.consumerGroup, retry: { retries: 2, }, diff --git a/packages/services/usage/.env.template b/packages/services/usage/.env.template index ac38ecd63..71c5a7743 100644 --- a/packages/services/usage/.env.template +++ b/packages/services/usage/.env.template @@ -4,5 +4,6 @@ KAFKA_BROKER="localhost:9092" KAFKA_BUFFER_SIZE="10" KAFKA_BUFFER_INTERVAL="5000" KAFKA_BUFFER_DYNAMIC="true" +KAFKA_TOPIC="usage_reports_v2" PORT=4001 -RATE_LIMIT_ENDPOINT="http://localhost:4012" \ No newline at end of file +RATE_LIMIT_ENDPOINT="http://localhost:4012" diff --git a/packages/services/usage/src/index.ts b/packages/services/usage/src/index.ts index 97c966f91..aa9a4f90f 100644 --- a/packages/services/usage/src/index.ts +++ b/packages/services/usage/src/index.ts @@ -33,7 +33,7 @@ async function main() { const { collect, readiness, start, stop } = createUsage({ logger: server.log, kafka: { - topic: 'usage_reports_v2', + topic: ensureEnv('KAFKA_TOPIC'), buffer: { size: ensureEnv('KAFKA_BUFFER_SIZE', 'number'), interval: ensureEnv('KAFKA_BUFFER_INTERVAL', 'number'),