style: remove aggregator related codes (#521)

This commit is contained in:
Warren 2024-12-09 09:59:36 -08:00 committed by GitHub
parent 5bdaa9f181
commit 7a766f7977
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 9 additions and 368 deletions

View file

@@ -57,7 +57,6 @@ services:
# ports:
# - 9000:9000
environment:
AGGREGATOR_PAYLOAD_SIZE_LIMIT: '64mb'
APP_TYPE: 'api'
CLICKHOUSE_HOST: http://ch-server:8123
CLICKHOUSE_PASSWORD: api

View file

@@ -113,7 +113,6 @@ services:
ports:
- ${HYPERDX_API_PORT}:${HYPERDX_API_PORT}
environment:
AGGREGATOR_API_URL: 'http://aggregator:8001'
APP_TYPE: 'api'
CLICKHOUSE_HOST: http://ch-server:8123
CLICKHOUSE_LOG_LEVEL: ${HYPERDX_LOG_LEVEL}

View file

@@ -26,14 +26,6 @@
</networks>
<quota>default</quota>
</api>
<aggregator>
<password>aggregator</password>
<profile>default</profile>
<networks>
<ip>::/0</ip>
</networks>
<quota>default</quota>
</aggregator>
<worker>
<password>worker</password>
<profile>default</profile>

View file

@@ -1,90 +0,0 @@
import { recordException } from '@hyperdx/node-opentelemetry';
import compression from 'compression';
import type { NextFunction, Request, Response } from 'express';
import express from 'express';
import { serializeError } from 'serialize-error';
import * as clickhouse from './clickhouse';
import * as config from './config';
import { mongooseConnection } from './models';
import routers from './routers/aggregator';
import { BaseError, StatusCode } from './utils/errors';
import logger, { expressLogger } from './utils/logger';
// Fail fast at module load: the body-parser limits below all read this
// config value, so a missing limit would otherwise surface as a confusing
// runtime error on the first request instead of at startup.
if (!config.AGGREGATOR_PAYLOAD_SIZE_LIMIT) {
  throw new Error('AGGREGATOR_PAYLOAD_SIZE_LIMIT is not defined');
}

const app: express.Application = express();

// Gate every request on downstream dependencies: answer 500 when MongoDB or
// ClickHouse is unreachable so the upstream sender can back off and retry.
const healthCheckMiddleware = async (
  req: Request,
  res: Response,
  next: NextFunction,
) => {
  // readyState 1 is mongoose's "connected" state — anything else is down.
  if (mongooseConnection.readyState !== 1) {
    logger.error('MongoDB is down!');
    return res.status(StatusCode.INTERNAL_SERVER).send('MongoDB is down!');
  }
  try {
    await clickhouse.healthCheck();
  } catch (e) {
    logger.error('Clickhouse is down!');
    return res.status(StatusCode.INTERNAL_SERVER).send('Clickhouse is down!');
  }
  next();
};

app.disable('x-powered-by');
app.use(compression());
// All three parsers share the configured payload ceiling.
app.use(express.json({ limit: config.AGGREGATOR_PAYLOAD_SIZE_LIMIT })); // WARNING: should be greater than the upstream batch size limit
app.use(express.text({ limit: config.AGGREGATOR_PAYLOAD_SIZE_LIMIT }));
app.use(
  express.urlencoded({
    extended: false,
    limit: config.AGGREGATOR_PAYLOAD_SIZE_LIMIT,
  }),
);

app.use(expressLogger);

// ---------------------------------------------------------
// -------------------- Routers ----------------------------
// ---------------------------------------------------------
app.use('/', healthCheckMiddleware, routers.rootRouter);
// ---------------------------------------------------------

// error handling
// Four-argument signature marks this as express's error handler; it must be
// registered after the routers to catch their errors.
app.use(
  (
    err: BaseError & { status?: number },
    _: Request,
    res: Response,
    next: NextFunction,
  ) => {
    void recordException(err, {
      mechanism: {
        type: 'generic',
        handled: false,
      },
    });
    // TODO: REPLACED WITH recordException once we enable tracing SDK
    logger.error({
      location: 'appErrorHandler',
      error: serializeError(err),
    });
    // TODO: for all non 5xx errors
    if (err.status === StatusCode.CONTENT_TOO_LARGE) {
      // WARNING: return origin status code so the ingestor will tune up the adaptive concurrency properly
      return res.sendStatus(err.status);
    }
    // WARNING: should always return 500 so the ingestor will queue logs
    res.status(StatusCode.INTERNAL_SERVER).send('Something broke!');
  },
);

export default app;

View file

@@ -1,10 +1,7 @@
const env = process.env;
export const NODE_ENV = env.NODE_ENV as string;
export const AGGREGATOR_API_URL = env.AGGREGATOR_API_URL as string;
export const AGGREGATOR_PAYLOAD_SIZE_LIMIT =
env.AGGREGATOR_PAYLOAD_SIZE_LIMIT as string;
export const APP_TYPE = env.APP_TYPE as 'api' | 'aggregator' | 'scheduled-task';
export const APP_TYPE = env.APP_TYPE as 'api' | 'scheduled-task';
export const CLICKHOUSE_HOST = env.CLICKHOUSE_HOST as string;
export const CLICKHOUSE_PASSWORD = env.CLICKHOUSE_PASSWORD as string;
export const CLICKHOUSE_USER = env.CLICKHOUSE_USER as string;

View file

@@ -105,16 +105,10 @@ class MockAPIServer extends MockServer {
protected readonly appType = 'api';
}
class MockAggregatorServer extends MockServer {
protected readonly appType = 'aggregator';
}
export const getServer = (appType: 'api' | 'aggregator' = 'api') => {
export const getServer = (appType: 'api' = 'api') => {
switch (appType) {
case 'api':
return new MockAPIServer();
case 'aggregator':
return new MockAggregatorServer();
default:
throw new Error(`Invalid app type: ${appType}`);
}

View file

@@ -1,107 +0,0 @@
import _ from 'lodash';
import * as clickhouse from '@/clickhouse';
import { createTeam } from '@/controllers/team';
import { getAgent, getServer } from '@/fixtures';
import { sleep } from '@/utils/common';
describe('aggregator root router', () => {
  // Spins up an in-process aggregator server shared across the suite;
  // DBs are wiped between tests so ingestion assertions stay isolated.
  const server = getServer('aggregator');

  beforeAll(async () => {
    await server.start();
  });

  afterEach(async () => {
    await server.clearDBs();
  });

  afterAll(async () => {
    await server.stop();
  });

  it('GET /health', async () => {
    const agent = await getAgent(server);
    await agent.get('/health').expect(200);
  });

  it('POST / -> should return 400 if no logs', async () => {
    const agent = await getAgent(server);
    // Body must be an array; a bare object is rejected.
    await agent.post('/').send({}).expect(400);
  });

  it('POST / -> should ingest logs', async () => {
    const team = await createTeam({ name: 'test-team' });
    const agent = await getAgent(server);
    // Payload mirrors the vector log shape the aggregator ingests
    // (abbreviated keys: b = body, h = host, sn/st = severity number/text,
    // sv = service, ts/tso = timestamp / observed timestamp in ns).
    await agent.post('/').send([
      {
        b: {
          _hdx_body: 'Initializing ClickHouse...',
          level: 'info',
          message: 'Initializing ClickHouse...',
        },
        h: '509a8b2dea19',
        hdx_platform: 'nodejs',
        hdx_token: team.apiKey,
        hdx_token_hash: '2f4da895de6a20c100c28daaa5c07c51',
        path: '/',
        r: { level: 'info', message: 'Initializing ClickHouse...' },
        s_id: null,
        sn: 0,
        st: 'info',
        sv: 'hdx-oss-dev-api',
        t_id: null,
        ts: 1704517334214000000,
        tso: 1704517336156579600,
      },
    ]);
    // wait for data to be committed to clickhouse
    await sleep(500);
    const resp = await clickhouse.client.query({
      query: `SELECT * FROM default.${clickhouse.TableName.LogStream}`,
      format: 'JSON',
    });
    const result: any = await resp.json();
    expect(result.data.length).toBe(1);
    // id and _created_at are nondeterministic, so drop them before snapshotting.
    expect(result.data.map((row: any) => _.omit(row, ['id', '_created_at'])))
      .toMatchInlineSnapshot(`
      Array [
        Object {
          "_host": "509a8b2dea19",
          "_namespace": "",
          "_platform": "nodejs",
          "_service": "hdx-oss-dev-api",
          "_source": "{\\"level\\":\\"info\\",\\"message\\":\\"Initializing ClickHouse...\\"}",
          "bool.names": Array [],
          "bool.values": Array [],
          "end_timestamp": "1970-01-01T00:00:00.000000000Z",
          "number.names": Array [],
          "number.values": Array [],
          "observed_timestamp": "2024-01-06T05:02:16.156579600Z",
          "parent_span_id": "",
          "severity_number": 0,
          "severity_text": "info",
          "span_id": "",
          "span_name": "",
          "string.names": Array [
            "_hdx_body",
            "level",
            "message",
          ],
          "string.values": Array [
            "Initializing ClickHouse...",
            "info",
            "Initializing ClickHouse...",
          ],
          "timestamp": "2024-01-06T05:02:14.214000000Z",
          "trace_id": "",
          "type": "log",
        },
      ]
    `);
  });
  // TODO: test metrics
});

View file

@@ -1,5 +0,0 @@
import rootRouter from './root';

// Aggregator router bundle consumed by the express app entry point.
const routers = {
  rootRouter,
};

export default routers;

View file

@@ -1,118 +0,0 @@
import express from 'express';
import groupBy from 'lodash/groupBy';
import {
bulkInsertLogStream,
bulkInsertRrwebEvents,
bulkInsertTeamMetricStream,
} from '@/clickhouse';
import * as config from '@/config';
import { getTeamByApiKey } from '@/controllers/team';
import logger from '@/utils/logger';
import type { VectorLog, VectorMetric } from '@/utils/logParser';
import {
extractApiKey,
vectorLogParser,
vectorMetricParser,
vectorRrwebParser,
} from '@/utils/logParser';
const router = express.Router();

/**
 * Persist one API key's worth of telemetry into ClickHouse.
 * Entries with an unknown API key are ignored (no team -> no insert).
 */
const bulkInsert = async (
  hdxTelemetry: string | undefined,
  apiKey: string,
  data: (VectorLog | VectorMetric)[],
) => {
  const team = await getTeamByApiKey(apiKey);
  if (!team) {
    return;
  }
  if (hdxTelemetry === 'metric') {
    await bulkInsertTeamMetricStream(
      vectorMetricParser.parse(data as VectorMetric[]),
    );
    return;
  }
  // Everything that is not a metric is treated as a log stream; rrweb
  // session events are split out into their own table.
  const allLogs = data as VectorLog[];
  const rrwebEvents = allLogs.filter(entry => entry.hdx_platform === 'rrweb');
  const plainLogs = allLogs.filter(entry => entry.hdx_platform !== 'rrweb');
  const inserts = [bulkInsertLogStream(vectorLogParser.parse(plainLogs))];
  if (rrwebEvents.length > 0) {
    inserts.push(bulkInsertRrwebEvents(vectorRrwebParser.parse(rrwebEvents)));
  }
  await Promise.all(inserts);
};
// Liveness probe: reports OK plus the running code version.
router.get('/health', (_req, res) => {
  const payload = { data: 'OK', version: config.CODE_VERSION };
  res.send(payload);
});
// bulk insert logs
// NOTE: acknowledges with 200 *before* the inserts run (see TODO below), so
// insert failures are only visible via the error handler / logs.
router.post('/', async (req, res, next) => {
  const { telemetry } = req.query;
  // ?telemetry=metric selects the metric path; anything else is a log.
  const hdxTelemetry = (telemetry ?? 'log') as string;
  try {
    const logs: (VectorLog | VectorMetric)[] = req.body;
    if (!Array.isArray(logs)) {
      return res.sendStatus(400);
    }
    // TODO: move this to the end of the request so vector will buffer the logs
    // Need to check request.timeout_secs config
    res.sendStatus(200);
    logger.info({
      message: `Received ${hdxTelemetry}`,
      size: JSON.stringify(logs).length,
      n: logs.length,
    });
    // Entries without a platform or a resolvable API key cannot be
    // attributed to a team and are dropped.
    const filteredLogs = logs
      .map(log => ({
        ...log,
        hdx_apiKey: extractApiKey(log),
      }))
      // check hdx_platform values ?
      .filter(log => log.hdx_platform && log.hdx_apiKey);
    if (logs.length - filteredLogs.length > 0) {
      // TEMP: DEBUGGING (remove later)
      const droppedLogs = logs
        .map(log => ({
          ...log,
          hdx_apiKey: extractApiKey(log),
        }))
        .filter(log => !log.hdx_platform || !log.hdx_apiKey);
      logger.info({
        message: `Dropped ${hdxTelemetry}`,
        n: filteredLogs.length,
        diff: logs.length - filteredLogs.length,
        droppedLogs,
      });
    }
    if (filteredLogs.length > 0) {
      // One insert batch per API key so each team's data is written together.
      const groupedLogs = groupBy(filteredLogs, 'hdx_apiKey');
      await Promise.all(
        Object.entries(groupedLogs).map(([apiKey, logs]) =>
          bulkInsert(hdxTelemetry, apiKey, logs),
        ),
      );
    }
  } catch (e) {
    next(e);
  }
});

export default router;

View file

@@ -21,11 +21,6 @@ export default class Server {
// eslint-disable-next-line n/no-unsupported-features/es-syntax
(await import('./api-app').then(m => m.default)) as any,
);
case 'aggregator':
return http.createServer(
// eslint-disable-next-line n/no-unsupported-features/es-syntax
(await import('./aggregator-app').then(m => m.default)) as any,
);
default:
throw new Error(`Invalid APP_TYPE: ${config.APP_TYPE}`);
}

View file

@@ -14,7 +14,7 @@ const logger = winston.createLogger({
transports: [
HyperDX.getWinstonTransport('info', {
apiKey: '3f26ffad-14cf-4fb7-9dc9-e64fa0b84ee0', // hyperdx usage stats service api key
baseUrl: 'https://in.hyperdx.io',
baseUrl: 'https://in-otel.hyperdx.io/v1/logs',
maxLevel: 'info',
service: 'hyperdx-oss-usage-stats',
} as any),
@@ -61,32 +61,17 @@ const healthChecks = async () => {
}
};
const ingestorUrl = new URL(config.INGESTOR_API_URL ?? '');
const otelCollectorUrl = new URL(config.OTEL_EXPORTER_OTLP_ENDPOINT ?? '');
const aggregatorUrl = new URL(config.AGGREGATOR_API_URL ?? '');
const [pingIngestor, pingOtelCollector, pingAggregator, pingMiner, pingCH] =
await Promise.all([
ingestorUrl.hostname && ingestorUrl.protocol
? ping(`${ingestorUrl.protocol}//${ingestorUrl.hostname}:8686/health`)
: Promise.resolve(null),
otelCollectorUrl.hostname && otelCollectorUrl.protocol
? ping(
`${otelCollectorUrl.protocol}//${otelCollectorUrl.hostname}:13133`,
)
: Promise.resolve(null),
aggregatorUrl.href
? ping(`${aggregatorUrl.href}health`)
: Promise.resolve(null),
ping(`${config.MINER_API_URL}/health`),
ping(`${config.CLICKHOUSE_HOST}/ping`),
]);
const [pingOtelCollector, pingCH] = await Promise.all([
otelCollectorUrl.hostname && otelCollectorUrl.protocol
? ping(`${otelCollectorUrl.protocol}//${otelCollectorUrl.hostname}:13133`)
: Promise.resolve(null),
ping(`${config.CLICKHOUSE_HOST}/ping`),
]);
return {
pingIngestor,
pingOtelCollector,
pingAggregator,
pingMiner,
pingCH,
};
};