diff --git a/docker-compose.ci.yml b/docker-compose.ci.yml index c18e4ca8..3c60f962 100644 --- a/docker-compose.ci.yml +++ b/docker-compose.ci.yml @@ -57,7 +57,6 @@ services: # ports: # - 9000:9000 environment: - AGGREGATOR_PAYLOAD_SIZE_LIMIT: '64mb' APP_TYPE: 'api' CLICKHOUSE_HOST: http://ch-server:8123 CLICKHOUSE_PASSWORD: api diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index cf816840..b3695d77 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -113,7 +113,6 @@ services: ports: - ${HYPERDX_API_PORT}:${HYPERDX_API_PORT} environment: - AGGREGATOR_API_URL: 'http://aggregator:8001' APP_TYPE: 'api' CLICKHOUSE_HOST: http://ch-server:8123 CLICKHOUSE_LOG_LEVEL: ${HYPERDX_LOG_LEVEL} diff --git a/docker/clickhouse/local/users.xml b/docker/clickhouse/local/users.xml index 42f36ae4..80c8d4fa 100644 --- a/docker/clickhouse/local/users.xml +++ b/docker/clickhouse/local/users.xml @@ -26,14 +26,6 @@ default - - aggregator - default - - ::/0 - - default - worker default diff --git a/packages/api/src/aggregator-app.ts b/packages/api/src/aggregator-app.ts deleted file mode 100644 index 4503b9c4..00000000 --- a/packages/api/src/aggregator-app.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { recordException } from '@hyperdx/node-opentelemetry'; -import compression from 'compression'; -import type { NextFunction, Request, Response } from 'express'; -import express from 'express'; -import { serializeError } from 'serialize-error'; - -import * as clickhouse from './clickhouse'; -import * as config from './config'; -import { mongooseConnection } from './models'; -import routers from './routers/aggregator'; -import { BaseError, StatusCode } from './utils/errors'; -import logger, { expressLogger } from './utils/logger'; - -if (!config.AGGREGATOR_PAYLOAD_SIZE_LIMIT) { - throw new Error('AGGREGATOR_PAYLOAD_SIZE_LIMIT is not defined'); -} - -const app: express.Application = express(); - -const healthCheckMiddleware = async ( - req: Request, - res: Response, - next: NextFunction, -) => { - if (mongooseConnection.readyState !== 1) { - logger.error('MongoDB is down!'); - return res.status(StatusCode.INTERNAL_SERVER).send('MongoDB is down!'); - } - - try { - await clickhouse.healthCheck(); - } catch (e) { - logger.error('Clickhouse is down!'); - return res.status(StatusCode.INTERNAL_SERVER).send('Clickhouse is down!'); - } - next(); -}; - -app.disable('x-powered-by'); -app.use(compression()); -app.use(express.json({ limit: config.AGGREGATOR_PAYLOAD_SIZE_LIMIT })); // WARNING: should be greater than the upstream batch size limit -app.use(express.text({ limit: config.AGGREGATOR_PAYLOAD_SIZE_LIMIT })); -app.use( - express.urlencoded({ - extended: false, - limit: config.AGGREGATOR_PAYLOAD_SIZE_LIMIT, - }), -); - -app.use(expressLogger); - -// --------------------------------------------------------- -// -------------------- Routers ---------------------------- -// --------------------------------------------------------- -app.use('/', healthCheckMiddleware, routers.rootRouter); -// --------------------------------------------------------- - -// error handling -app.use( - ( - err: BaseError & { status?: number }, - _: Request, - res: Response, - next: NextFunction, - ) => { - void recordException(err, { - mechanism: { - type: 'generic', - handled: false, - }, - }); - - // TODO: REPLACED WITH recordException once we enable tracing SDK - logger.error({ - location: 'appErrorHandler', - error: serializeError(err), - }); - - // TODO: for all non 5xx errors - if (err.status === StatusCode.CONTENT_TOO_LARGE) { - // WARNING: return origin status code so the ingestor will tune up the adaptive concurrency properly - return res.sendStatus(err.status); - } - - // WARNING: should always return 500 so the ingestor will queue logs - res.status(StatusCode.INTERNAL_SERVER).send('Something broke!'); - }, -); - -export default app; diff --git a/packages/api/src/config.ts b/packages/api/src/config.ts index ebe712ce..1a4f16ec 100644 --- a/packages/api/src/config.ts +++ b/packages/api/src/config.ts @@ -1,10 +1,7 @@ const env = process.env; export const NODE_ENV = env.NODE_ENV as string; -export const AGGREGATOR_API_URL = env.AGGREGATOR_API_URL as string; -export const AGGREGATOR_PAYLOAD_SIZE_LIMIT = - env.AGGREGATOR_PAYLOAD_SIZE_LIMIT as string; -export const APP_TYPE = env.APP_TYPE as 'api' | 'aggregator' | 'scheduled-task'; +export const APP_TYPE = env.APP_TYPE as 'api' | 'scheduled-task'; export const CLICKHOUSE_HOST = env.CLICKHOUSE_HOST as string; export const CLICKHOUSE_PASSWORD = env.CLICKHOUSE_PASSWORD as string; export const CLICKHOUSE_USER = env.CLICKHOUSE_USER as string; diff --git a/packages/api/src/fixtures.ts b/packages/api/src/fixtures.ts index 62f7658a..dbde6a1d 100644 --- a/packages/api/src/fixtures.ts +++ b/packages/api/src/fixtures.ts @@ -105,16 +105,10 @@ class MockAPIServer extends MockServer { protected readonly appType = 'api'; } -class MockAggregatorServer extends MockServer { - protected readonly appType = 'aggregator'; -} - -export const getServer = (appType: 'api' | 'aggregator' = 'api') => { +export const getServer = (appType: 'api' = 'api') => { switch (appType) { case 'api': return new MockAPIServer(); - case 'aggregator': - return new MockAggregatorServer(); default: throw new Error(`Invalid app type: ${appType}`); } diff --git a/packages/api/src/routers/aggregator/__tests__/root.test.ts b/packages/api/src/routers/aggregator/__tests__/root.test.ts deleted file mode 100644 index adcc9556..00000000 --- a/packages/api/src/routers/aggregator/__tests__/root.test.ts +++ /dev/null @@ -1,107 +0,0 @@ -import _ from 'lodash'; - -import * as clickhouse from '@/clickhouse'; -import { createTeam } from '@/controllers/team'; -import { getAgent, getServer } from '@/fixtures'; -import { sleep } from '@/utils/common'; - -describe('aggregator root router', () => { - const server = getServer('aggregator'); - - beforeAll(async () => { - await server.start(); - }); - - afterEach(async () => { - await server.clearDBs(); - }); - - afterAll(async () => { - await server.stop(); - }); - - it('GET /health', async () => { - const agent = await getAgent(server); - await agent.get('/health').expect(200); - }); - - it('POST / -> should return 400 if no logs', async () => { - const agent = await getAgent(server); - await agent.post('/').send({}).expect(400); - }); - - it('POST / -> should ingest logs', async () => { - const team = await createTeam({ name: 'test-team' }); - const agent = await getAgent(server); - await agent.post('/').send([ - { - b: { - _hdx_body: 'Initializing ClickHouse...', - level: 'info', - message: 'Initializing ClickHouse...', - }, - h: '509a8b2dea19', - hdx_platform: 'nodejs', - hdx_token: team.apiKey, - hdx_token_hash: '2f4da895de6a20c100c28daaa5c07c51', - path: '/', - r: { level: 'info', message: 'Initializing ClickHouse...' }, - s_id: null, - sn: 0, - st: 'info', - sv: 'hdx-oss-dev-api', - t_id: null, - ts: 1704517334214000000, - tso: 1704517336156579600, - }, - ]); - - // wait for data to be committed to clickhouse - await sleep(500); - - const resp = await clickhouse.client.query({ - query: `SELECT * FROM default.${clickhouse.TableName.LogStream}`, - format: 'JSON', - }); - const result: any = await resp.json(); - expect(result.data.length).toBe(1); - expect(result.data.map((row: any) => _.omit(row, ['id', '_created_at']))) - .toMatchInlineSnapshot(` -Array [ - Object { - "_host": "509a8b2dea19", - "_namespace": "", - "_platform": "nodejs", - "_service": "hdx-oss-dev-api", - "_source": "{\\"level\\":\\"info\\",\\"message\\":\\"Initializing ClickHouse...\\"}", - "bool.names": Array [], - "bool.values": Array [], - "end_timestamp": "1970-01-01T00:00:00.000000000Z", - "number.names": Array [], - "number.values": Array [], - "observed_timestamp": "2024-01-06T05:02:16.156579600Z", - "parent_span_id": "", - "severity_number": 0, - "severity_text": "info", - "span_id": "", - "span_name": "", - "string.names": Array [ - "_hdx_body", - "level", - "message", - ], - "string.values": Array [ - "Initializing ClickHouse...", - "info", - "Initializing ClickHouse...", - ], - "timestamp": "2024-01-06T05:02:14.214000000Z", - "trace_id": "", - "type": "log", - }, -] -`); - }); - - // TODO: test metrics -}); diff --git a/packages/api/src/routers/aggregator/index.ts b/packages/api/src/routers/aggregator/index.ts deleted file mode 100644 index 67235cb0..00000000 --- a/packages/api/src/routers/aggregator/index.ts +++ /dev/null @@ -1,5 +0,0 @@ -import rootRouter from './root'; - -export default { - rootRouter, -}; diff --git a/packages/api/src/routers/aggregator/root.ts b/packages/api/src/routers/aggregator/root.ts deleted file mode 100644 index 299f359d..00000000 --- a/packages/api/src/routers/aggregator/root.ts +++ /dev/null @@ -1,118 +0,0 @@ -import express from 'express'; -import groupBy from 'lodash/groupBy'; - -import { - bulkInsertLogStream, - bulkInsertRrwebEvents, - bulkInsertTeamMetricStream, -} from '@/clickhouse'; -import * as config from '@/config'; -import { getTeamByApiKey } from '@/controllers/team'; -import logger from '@/utils/logger'; -import type { VectorLog, VectorMetric } from '@/utils/logParser'; -import { - extractApiKey, - vectorLogParser, - vectorMetricParser, - vectorRrwebParser, -} from '@/utils/logParser'; - -const router = express.Router(); - -const bulkInsert = async ( - hdxTelemetry: string | undefined, - apiKey: string, - data: (VectorLog | VectorMetric)[], -) => { - const team = await getTeamByApiKey(apiKey); - if (team) { - switch (hdxTelemetry) { - case 'metric': - await bulkInsertTeamMetricStream( - vectorMetricParser.parse(data as VectorMetric[]), - ); - break; - default: { - const rrwebEvents: VectorLog[] = []; - const logs: VectorLog[] = []; - for (const log of data as VectorLog[]) { - if (log.hdx_platform === 'rrweb') { - rrwebEvents.push(log); - } else { - logs.push(log); - } - } - const promises = [bulkInsertLogStream(vectorLogParser.parse(logs))]; - if (rrwebEvents.length > 0) { - promises.push( - bulkInsertRrwebEvents(vectorRrwebParser.parse(rrwebEvents)), - ); - } - await Promise.all(promises); - break; - } - } - } -}; - -router.get('/health', (_, res) => { - res.send({ data: 'OK', version: config.CODE_VERSION }); -}); - -// bulk insert logs -router.post('/', async (req, res, next) => { - const { telemetry } = req.query; - const hdxTelemetry = (telemetry ?? 'log') as string; - try { - const logs: (VectorLog | VectorMetric)[] = req.body; - if (!Array.isArray(logs)) { - return res.sendStatus(400); - } - // TODO: move this to the end of the request so vector will buffer the logs - // Need to check request.timeout_secs config - res.sendStatus(200); - - logger.info({ - message: `Received ${hdxTelemetry}`, - size: JSON.stringify(logs).length, - n: logs.length, - }); - - const filteredLogs = logs - .map(log => ({ - ...log, - hdx_apiKey: extractApiKey(log), - })) - // check hdx_platform values ? - .filter(log => log.hdx_platform && log.hdx_apiKey); - - if (logs.length - filteredLogs.length > 0) { - // TEMP: DEBUGGING (remove later) - const droppedLogs = logs - .map(log => ({ - ...log, - hdx_apiKey: extractApiKey(log), - })) - .filter(log => !log.hdx_platform || !log.hdx_apiKey); - logger.info({ - message: `Dropped ${hdxTelemetry}`, - n: filteredLogs.length, - diff: logs.length - filteredLogs.length, - droppedLogs, - }); - } - - if (filteredLogs.length > 0) { - const groupedLogs = groupBy(filteredLogs, 'hdx_apiKey'); - await Promise.all( - Object.entries(groupedLogs).map(([apiKey, logs]) => - bulkInsert(hdxTelemetry, apiKey, logs), - ), - ); - } - } catch (e) { - next(e); - } -}); - -export default router; diff --git a/packages/api/src/server.ts b/packages/api/src/server.ts index f8ec1a89..45b7a221 100644 --- a/packages/api/src/server.ts +++ b/packages/api/src/server.ts @@ -21,11 +21,6 @@ export default class Server { // eslint-disable-next-line n/no-unsupported-features/es-syntax (await import('./api-app').then(m => m.default)) as any, ); - case 'aggregator': - return http.createServer( - // eslint-disable-next-line n/no-unsupported-features/es-syntax - (await import('./aggregator-app').then(m => m.default)) as any, - ); default: throw new Error(`Invalid APP_TYPE: ${config.APP_TYPE}`); } diff --git a/packages/api/src/tasks/usageStats.ts b/packages/api/src/tasks/usageStats.ts index eba0e526..db2f8661 100644 --- a/packages/api/src/tasks/usageStats.ts +++ b/packages/api/src/tasks/usageStats.ts @@ -14,7 +14,7 @@ const logger = winston.createLogger({ transports: [ HyperDX.getWinstonTransport('info', { apiKey: '3f26ffad-14cf-4fb7-9dc9-e64fa0b84ee0', // hyperdx usage stats service api key - baseUrl: 'https://in.hyperdx.io', + baseUrl: 'https://in-otel.hyperdx.io/v1/logs', maxLevel: 'info', service: 'hyperdx-oss-usage-stats', } as any), @@ -61,32 +61,17 @@ const healthChecks = async () => { } }; - const ingestorUrl = new URL(config.INGESTOR_API_URL ?? ''); const otelCollectorUrl = new URL(config.OTEL_EXPORTER_OTLP_ENDPOINT ?? ''); - const aggregatorUrl = new URL(config.AGGREGATOR_API_URL ?? ''); - const [pingIngestor, pingOtelCollector, pingAggregator, pingMiner, pingCH] = - await Promise.all([ - ingestorUrl.hostname && ingestorUrl.protocol - ? ping(`${ingestorUrl.protocol}//${ingestorUrl.hostname}:8686/health`) - : Promise.resolve(null), - otelCollectorUrl.hostname && otelCollectorUrl.protocol - ? ping( - `${otelCollectorUrl.protocol}//${otelCollectorUrl.hostname}:13133`, - ) - : Promise.resolve(null), - aggregatorUrl.href - ? ping(`${aggregatorUrl.href}health`) - : Promise.resolve(null), - ping(`${config.MINER_API_URL}/health`), - ping(`${config.CLICKHOUSE_HOST}/ping`), - ]); + const [pingOtelCollector, pingCH] = await Promise.all([ + otelCollectorUrl.hostname && otelCollectorUrl.protocol + ? ping(`${otelCollectorUrl.protocol}//${otelCollectorUrl.hostname}:13133`) + : Promise.resolve(null), + ping(`${config.CLICKHOUSE_HOST}/ping`), + ]); return { - pingIngestor, pingOtelCollector, - pingAggregator, - pingMiner, pingCH, }; };