convert kafka topic & consumer group as ENV variables (#390)

This commit is contained in:
Teresa 2022-09-26 08:21:40 -04:00 committed by GitHub
parent 4605756a3d
commit df61539648
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 12 additions and 4 deletions

View file

@ -343,6 +343,7 @@ services:
TOKENS_ENDPOINT: http://tokens:3003
RATE_LIMIT_ENDPOINT: http://rate-limit:3009
KAFKA_CONNECTION_MODE: 'docker'
KAFKA_TOPIC: 'usage_reports_v2'
KAFKA_BROKER: broker:29092
KAFKA_BUFFER_SIZE: 350
KAFKA_BUFFER_INTERVAL: 1000
@ -365,6 +366,8 @@ services:
KAFKA_CONNECTION_MODE: 'docker'
KAFKA_BROKER: broker:29092
KAFKA_CONCURRENCY: 1
KAFKA_CONSUMER_GROUP: 'usage-ingestor-v2'
KAFKA_TOPIC: 'usage_reports_v2'
CLICKHOUSE_PROTOCOL: 'http'
CLICKHOUSE_HOST: clickhouse
CLICKHOUSE_PORT: 8123

View file

@ -1,6 +1,8 @@
KAFKA_CONNECTION_MODE="docker"
KAFKA_BROKER="localhost:9092"
KAFKA_CONCURRENCY="1"
KAFKA_CONSUMER_GROUP="usage-ingestor-v2"
KAFKA_TOPIC="usage_reports_v2"
CLICKHOUSE_PROTOCOL="http"
CLICKHOUSE_HOST="localhost"
CLICKHOUSE_PORT="8123"

View file

@ -44,7 +44,8 @@ async function main() {
}
: null,
kafka: {
topic: 'usage_reports_v2',
topic: ensureEnv('KAFKA_TOPIC'),
consumerGroup: ensureEnv('KAFKA_CONSUMER_GROUP'),
concurrency: ensureEnv('KAFKA_CONCURRENCY', 'number'),
connection:
ensureEnv('KAFKA_CONNECTION_MODE') == 'hosted'

View file

@ -42,6 +42,7 @@ export function createIngestor(config: {
clickhouseCloud: ClickHouseConfig | null;
kafka: {
topic: string;
consumerGroup: string;
concurrency: number;
connection:
| {
@ -87,7 +88,7 @@ export function createIngestor(config: {
},
});
const consumer = kafka.consumer({
groupId: `usage-ingestor-v2`,
groupId: config.kafka.consumerGroup,
retry: {
retries: 2,
},

View file

@ -4,5 +4,6 @@ KAFKA_BROKER="localhost:9092"
KAFKA_BUFFER_SIZE="10"
KAFKA_BUFFER_INTERVAL="5000"
KAFKA_BUFFER_DYNAMIC="true"
KAFKA_TOPIC="usage_reports_v2"
PORT=4001
RATE_LIMIT_ENDPOINT="http://localhost:4012"
RATE_LIMIT_ENDPOINT="http://localhost:4012"

View file

@ -33,7 +33,7 @@ async function main() {
const { collect, readiness, start, stop } = createUsage({
logger: server.log,
kafka: {
topic: 'usage_reports_v2',
topic: ensureEnv('KAFKA_TOPIC'),
buffer: {
size: ensureEnv('KAFKA_BUFFER_SIZE', 'number'),
interval: ensureEnv('KAFKA_BUFFER_INTERVAL', 'number'),