diff --git a/packages/services/usage-ingestor/src/writer.ts b/packages/services/usage-ingestor/src/writer.ts index b74c987ec..9ffe44b4e 100644 --- a/packages/services/usage-ingestor/src/writer.ts +++ b/packages/services/usage-ingestor/src/writer.ts @@ -62,9 +62,9 @@ export function createWriter({ const sql = `INSERT INTO operations (${operationsFields}) FORMAT CSV`; await Promise.all([ - writeCsv(clickhouse, agents, sql, compressed), + writeCsv(clickhouse, agents, sql, compressed, logger, 3), clickhouseCloud - ? writeCsv(clickhouseCloud, agents, sql, compressed).catch(error => { + ? writeCsv(clickhouseCloud, agents, sql, compressed, logger, 1).catch(error => { logger.error('Failed to write operations to ClickHouse Cloud %s', error); // Ignore errors from clickhouse cloud return Promise.resolve(); @@ -78,9 +78,9 @@ export function createWriter({ const sql = `INSERT INTO operation_collection (${registryFields}) FORMAT CSV`; await Promise.all([ - writeCsv(clickhouse, agents, sql, compressed), + writeCsv(clickhouse, agents, sql, compressed, logger, 3), clickhouseCloud - ? writeCsv(clickhouseCloud, agents, sql, compressed).catch(error => { + ? writeCsv(clickhouseCloud, agents, sql, compressed, logger, 1).catch(error => { logger.error('Failed to write operation_collection to ClickHouse Cloud %s', error); // Ignore errors from clickhouse cloud return Promise.resolve(); @@ -96,7 +96,9 @@ export function createWriter({ clickhouse, agents, `INSERT INTO operations_new (${legacyOperationsFields}) FORMAT CSV`, - await compress(csv) + await compress(csv), + logger, + 3 ); }, async writeRegistry(records: string[]) { @@ -105,7 +107,9 @@ export function createWriter({ clickhouse, agents, `INSERT INTO operations_registry (${legacyRegistryFields}) FORMAT CSV`, - await compress(csv) + await compress(csv), + logger, + 3 ); }, }, @@ -123,7 +127,9 @@ async function writeCsv( https: Agent.HttpsAgent; }, query: string, - body: Buffer + body: Buffer, + logger: FastifyLoggerInstance, + maxRetry: number ) { return got .post(`${config.protocol ?? 'https'}://${config.host}:${config.port}`, { @@ -140,11 +146,14 @@ async function writeCsv( }, retry: { calculateDelay(info) { - if (info.attemptCount >= 5) { - // After 5 retries, stop. + if (info.attemptCount >= maxRetry) { + logger.warn('Exceeded the retry limit (%s/%s) for %s', info.attemptCount, maxRetry, query); + // After N retries, stop. return 0; } + logger.debug('Retry %s/%s for %s', info.attemptCount, maxRetry, query); + return info.attemptCount * 250; }, },