mirror of
https://github.com/hyperdxio/hyperdx
synced 2026-04-21 13:37:15 +00:00
perf + feat: introduce SimpleCache and specify getMetricsTags time range (#178)
1. introduce a caching abstraction SimpleCache 2. add time range filtering to getMetricsTags query
This commit is contained in:
parent
619bd1a952
commit
423fc22346
11 changed files with 231 additions and 81 deletions
6
.changeset/eighty-cows-leave.md
Normal file
6
.changeset/eighty-cows-leave.md
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
---
|
||||
'@hyperdx/api': patch
|
||||
'@hyperdx/app': patch
|
||||
---
|
||||
|
||||
perf + feat: introduce SimpleCache and specify getMetricsTags time range
|
||||
|
|
@ -592,30 +592,20 @@ export const getCHServerMetrics = async () => {
|
|||
}, {});
|
||||
};
|
||||
|
||||
export const getMetricsTags = async (teamId: string) => {
|
||||
if (config.CACHE_METRICS_TAGS) {
|
||||
logger.info({
|
||||
message: 'getMetricsTags: attempting cached fetch',
|
||||
teamId: teamId,
|
||||
});
|
||||
return getMetricsTagsCached(teamId);
|
||||
} else {
|
||||
logger.info({
|
||||
message: 'getMetricsTags: skipping cache, direct query',
|
||||
teamId: teamId,
|
||||
});
|
||||
return getMetricsTagsUncached(teamId);
|
||||
}
|
||||
};
|
||||
|
||||
// NB preserving this exactly as in original for this ticket
|
||||
// but looks like a good candidate for a query refactor based on comment
|
||||
const getMetricsTagsUncached = async (teamId: string) => {
|
||||
export const getMetricsTags = async ({
|
||||
teamId,
|
||||
startTime,
|
||||
endTime,
|
||||
}: {
|
||||
teamId: string;
|
||||
startTime: number; // unix in ms
|
||||
endTime: number; // unix in ms
|
||||
}) => {
|
||||
const tableName = `default.${TableName.Metric}`;
|
||||
// TODO: remove 'data_type' in the name field
|
||||
const query = SqlString.format(
|
||||
`
|
||||
SELECT
|
||||
SELECT
|
||||
any(is_delta) as is_delta,
|
||||
any(is_monotonic) as is_monotonic,
|
||||
any(unit) as unit,
|
||||
|
|
@ -623,10 +613,14 @@ const getMetricsTagsUncached = async (teamId: string) => {
|
|||
format('{} - {}', name, data_type) as name,
|
||||
groupUniqArray(_string_attributes) AS tags
|
||||
FROM ??
|
||||
WHERE (?)
|
||||
GROUP BY name, data_type
|
||||
ORDER BY name
|
||||
`,
|
||||
[tableName],
|
||||
[
|
||||
tableName,
|
||||
SqlString.raw(SearchQueryBuilder.timestampInBetween(startTime, endTime)),
|
||||
],
|
||||
);
|
||||
const ts = Date.now();
|
||||
const rows = await client.query({
|
||||
|
|
@ -639,37 +633,24 @@ const getMetricsTagsUncached = async (teamId: string) => {
|
|||
),
|
||||
},
|
||||
});
|
||||
const result = await rows.json<ResponseJSON<{ names: string[] }>>();
|
||||
const result = await rows.json<
|
||||
ResponseJSON<{
|
||||
data_type: string;
|
||||
is_delta: boolean;
|
||||
is_monotonic: boolean;
|
||||
name: string;
|
||||
tags: Record<string, string>[];
|
||||
unit: string;
|
||||
}>
|
||||
>();
|
||||
logger.info({
|
||||
message: 'getMetricsProps',
|
||||
message: 'getMetricsTags',
|
||||
query,
|
||||
took: Date.now() - ts,
|
||||
});
|
||||
return result;
|
||||
};
|
||||
|
||||
const getMetricsTagsCached = async (teamId: string) => {
|
||||
const redisKey = `metrics-tags-${teamId}`;
|
||||
const cached = await redisClient.get(redisKey);
|
||||
if (cached) {
|
||||
logger.info({
|
||||
message: 'getMetricsTags: cache hit',
|
||||
teamId: teamId,
|
||||
});
|
||||
return JSON.parse(cached);
|
||||
} else {
|
||||
logger.info({
|
||||
message: 'getMetricsTags: cache miss',
|
||||
teamId: teamId,
|
||||
});
|
||||
const result = await getMetricsTagsUncached(teamId);
|
||||
await redisClient.set(redisKey, JSON.stringify(result), {
|
||||
PX: ms(config.CACHE_METRICS_EXPIRATION_IN_SEC.toString() + 's'),
|
||||
});
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
export const isRateAggFn = (aggFn: AggFn) => {
|
||||
return (
|
||||
aggFn === AggFn.SumRate ||
|
||||
|
|
|
|||
|
|
@ -26,8 +26,3 @@ export const PORT = Number.parseInt(env.PORT as string);
|
|||
export const REDIS_URL = env.REDIS_URL as string;
|
||||
export const SERVER_URL = env.SERVER_URL as string;
|
||||
export const USAGE_STATS_ENABLED = env.USAGE_STATS_ENABLED !== 'false';
|
||||
export const CACHE_METRICS_TAGS = env.CACHE_METRICS_TAGS !== 'false';
|
||||
// deliberately using '||' instead of '??' to avoid empty/falsey values
|
||||
// returning as a string since all env values are stringish (to be parsed using ms())
|
||||
export const CACHE_METRICS_EXPIRATION_IN_SEC =
|
||||
(env.CACHE_METRICS_EXPIRATION_IN_SEC as string) || '600';
|
||||
|
|
|
|||
|
|
@ -12,9 +12,10 @@ import {
|
|||
LogType,
|
||||
MetricModel,
|
||||
} from '@/utils/logParser';
|
||||
import { redisClient } from '@/utils/redis';
|
||||
|
||||
import * as config from './config';
|
||||
import { createTeam, getTeam } from './controllers/team';
|
||||
import { getTeam } from './controllers/team';
|
||||
import { findUserByEmail } from './controllers/user';
|
||||
import { mongooseConnection } from './models';
|
||||
import Server from './server';
|
||||
|
|
@ -65,6 +66,9 @@ class MockServer extends Server {
|
|||
}
|
||||
|
||||
/**
 * Starts the mock server.
 *
 * Throws if `config.IS_CI` is falsy; otherwise starts the base server and
 * then runs `initCiEnvs()`.
 */
async start(): Promise<void> {
  const runningInCi = Boolean(config.IS_CI);
  if (runningInCi === false) {
    throw new Error('ONLY execute this in CI env 😈 !!!');
  }

  // Base server must be up before the CI environment is initialised.
  await super.start();
  await initCiEnvs();
}
|
||||
|
|
@ -117,6 +121,38 @@ export const getLoggedInAgent = async (server: MockServer) => {
|
|||
user,
|
||||
};
|
||||
};
|
||||
|
||||
// ------------------------------------------------
|
||||
// ------------------ Redis -----------------------
|
||||
// ------------------------------------------------
|
||||
/**
 * Flushes every key from the Redis instance behind `redisClient`.
 *
 * Throws when `config.IS_CI` is falsy, before issuing any Redis command.
 */
export const clearRedis = async () => {
  const runningInCi = Boolean(config.IS_CI);
  if (!runningInCi) {
    throw new Error('ONLY execute this in CI env 😈 !!!');
  }

  await redisClient.flushAll();
};
|
||||
|
||||
// ------------------------------------------------
|
||||
// ------------------ Clickhouse ------------------
|
||||
// ------------------------------------------------
|
||||
/**
 * Truncates every table listed in `clickhouse.TableName` (in the `default`
 * database).
 *
 * Throws when `config.IS_CI` is falsy, before issuing any command. All
 * TRUNCATE commands are started together and awaited as a group.
 */
export const clearClickhouseTables = async () => {
  if (!config.IS_CI) {
    throw new Error('ONLY execute this in CI env 😈 !!!');
  }

  const truncations = Object.values(clickhouse.TableName).map(table =>
    clickhouse.client.command({
      query: `TRUNCATE TABLE default.${table}`,
      clickhouse_settings: {
        wait_end_of_query: 1,
      },
    }),
  );

  await Promise.all(truncations);
};
|
||||
|
||||
export function buildEvent({
|
||||
level,
|
||||
source = 'test',
|
||||
|
|
|
|||
|
|
@ -50,6 +50,19 @@ const MOCK_DASHBOARD = {
|
|||
describe('alerts router', () => {
|
||||
const server = getServer();
|
||||
|
||||
beforeAll(async () => {
|
||||
await server.start();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await clearDBCollections();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await server.closeHttpServer();
|
||||
await closeDB();
|
||||
});
|
||||
|
||||
it('index has alerts attached to dashboards', async () => {
|
||||
const { agent } = await getLoggedInAgent(server);
|
||||
|
||||
|
|
@ -79,17 +92,4 @@ describe('alerts router', () => {
|
|||
expect(alert.dashboard).toBeDefined();
|
||||
}
|
||||
});
|
||||
|
||||
beforeAll(async () => {
|
||||
await server.start();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await clearDBCollections();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await server.closeHttpServer();
|
||||
await closeDB();
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -50,6 +50,19 @@ const MOCK_DASHBOARD = {
|
|||
describe('dashboard router', () => {
|
||||
const server = getServer();
|
||||
|
||||
beforeAll(async () => {
|
||||
await server.start();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await clearDBCollections();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await server.closeHttpServer();
|
||||
await closeDB();
|
||||
});
|
||||
|
||||
it('deletes attached alerts when deleting charts', async () => {
|
||||
const { agent } = await getLoggedInAgent(server);
|
||||
|
||||
|
|
@ -102,17 +115,4 @@ describe('dashboard router', () => {
|
|||
.sort();
|
||||
expect(allChartsPostDelete).toEqual(chartsWithAlertsPostDelete);
|
||||
});
|
||||
|
||||
beforeAll(async () => {
|
||||
await server.start();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await clearDBCollections();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await server.closeHttpServer();
|
||||
await closeDB();
|
||||
});
|
||||
});
|
||||
|
|
|
|||
77
packages/api/src/routers/api/__tests__/metrics.test.ts
Normal file
77
packages/api/src/routers/api/__tests__/metrics.test.ts
Normal file
|
|
@ -0,0 +1,77 @@
|
|||
import * as clickhouse from '@/clickhouse';
|
||||
import {
|
||||
buildMetricSeries,
|
||||
clearClickhouseTables,
|
||||
clearDBCollections,
|
||||
clearRedis,
|
||||
closeDB,
|
||||
getLoggedInAgent,
|
||||
getServer,
|
||||
} from '@/fixtures';
|
||||
|
||||
// Integration test for the internal metrics router: inserts two gauge series
// with different tag sets and checks the aggregated /metrics/tags response.
describe('metrics router', () => {
  const server = getServer();

  beforeAll(async () => {
    await server.start();
  });

  // Reset every backing store between tests (Mongo, ClickHouse, Redis) so
  // neither data nor cached responses leak across cases.
  afterEach(async () => {
    await clearDBCollections();
    await clearClickhouseTables();
    await clearRedis();
  });

  afterAll(async () => {
    await server.closeHttpServer();
    await closeDB();
  });

  it('GET /metrics/tags', async () => {
    const insertedAt = Date.now();

    // Inserts one 'test.cpu' gauge point carrying the given tag set.
    const insertCpuGauge = (tags: Record<string, string>) =>
      clickhouse.bulkInsertTeamMetricStream(
        buildMetricSeries({
          name: 'test.cpu',
          tags,
          data_type: clickhouse.MetricsDataType.Gauge,
          is_monotonic: false,
          is_delta: false,
          unit: 'Percent',
          points: [{ value: 1, timestamp: insertedAt }],
        }),
      );

    await insertCpuGauge({ host: 'host1', foo: 'bar' });
    await insertCpuGauge({ host: 'host2', foo2: 'bar2' });

    const { agent } = await getLoggedInAgent(server);
    const response = await agent.get('/metrics/tags').expect(200);

    // NOTE(review): the expected order of `tags` mirrors what the query
    // returned when this test was written — confirm that order is stable.
    const expectedMetric = {
      is_delta: false,
      is_monotonic: false,
      unit: 'Percent',
      data_type: 'Gauge',
      name: 'test.cpu - Gauge',
      tags: [
        {
          foo2: 'bar2',
          host: 'host2',
        },
        {
          foo: 'bar',
          host: 'host1',
        },
      ],
    };
    expect(response.body.data).toEqual([expectedMetric]);
  });
});
|
||||
|
|
@ -1,9 +1,11 @@
|
|||
import opentelemetry, { SpanStatusCode } from '@opentelemetry/api';
|
||||
import express from 'express';
|
||||
import ms from 'ms';
|
||||
import { z } from 'zod';
|
||||
import { validateRequest } from 'zod-express-middleware';
|
||||
|
||||
import * as clickhouse from '@/clickhouse';
|
||||
import { SimpleCache } from '@/utils/redis';
|
||||
|
||||
const router = express.Router();
|
||||
|
||||
|
|
@ -13,7 +15,19 @@ router.get('/tags', async (req, res, next) => {
|
|||
if (teamId == null) {
|
||||
return res.sendStatus(403);
|
||||
}
|
||||
res.json(await clickhouse.getMetricsTags(teamId.toString()));
|
||||
|
||||
const nowInMs = Date.now();
|
||||
const simpleCache = new SimpleCache<
|
||||
Awaited<ReturnType<typeof clickhouse.getMetricsTags>>
|
||||
>(`metrics-tags-${teamId}`, ms('10m'), () =>
|
||||
clickhouse.getMetricsTags({
|
||||
// FIXME: time range is hard-coded to the last 5 days for now
|
||||
startTime: nowInMs - ms('5d'),
|
||||
endTime: nowInMs,
|
||||
teamId: teamId.toString(),
|
||||
}),
|
||||
);
|
||||
res.json(await simpleCache.get());
|
||||
} catch (e) {
|
||||
const span = opentelemetry.trace.getActiveSpan();
|
||||
span?.recordException(e as Error);
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ describe('external api v1', () => {
|
|||
},
|
||||
],
|
||||
rows: 1,
|
||||
});
|
||||
} as any);
|
||||
const { agent, user } = await getLoggedInAgent(server);
|
||||
const resp = await agent
|
||||
.get(`/api/v1/metrics/tags`)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import { getTeam } from '@/controllers/team';
|
|||
import { validateUserAccessKey } from '@/middleware/auth';
|
||||
import { Api400Error, Api403Error } from '@/utils/errors';
|
||||
import rateLimiter from '@/utils/rateLimiter';
|
||||
import { SimpleCache } from '@/utils/redis';
|
||||
|
||||
const router = express.Router();
|
||||
|
||||
|
|
@ -154,7 +155,19 @@ router.get(
|
|||
if (teamId == null) {
|
||||
throw new Api403Error('Forbidden');
|
||||
}
|
||||
const tags = await clickhouse.getMetricsTags(teamId.toString());
|
||||
|
||||
const nowInMs = Date.now();
|
||||
const simpleCache = new SimpleCache<
|
||||
Awaited<ReturnType<typeof clickhouse.getMetricsTags>>
|
||||
>(`metrics-tags-${teamId}`, ms('10m'), () =>
|
||||
clickhouse.getMetricsTags({
|
||||
// FIXME: time range is hard-coded to the last 5 days for now
|
||||
startTime: nowInMs - ms('5d'),
|
||||
endTime: nowInMs,
|
||||
teamId: teamId.toString(),
|
||||
}),
|
||||
);
|
||||
const tags = await simpleCache.get();
|
||||
res.json({
|
||||
data: tags.data.map(tag => ({
|
||||
// FIXME: unify the return type of both internal and external APIs
|
||||
|
|
|
|||
|
|
@ -12,6 +12,34 @@ client.on('error', (err: any) => {
|
|||
logger.error('Redis error: ', serializeError(err));
|
||||
});
|
||||
|
||||
/**
 * Minimal read-through cache backed by this module's Redis `client`.
 *
 * Values are stored as JSON strings under `key` with a millisecond TTL.
 * Because of the JSON round trip, a cache hit returns whatever
 * `JSON.parse` yields for the stored payload; it is not validated against `T`.
 *
 * @typeParam T - type resolved by `fetcher` (expected to be JSON-serialisable)
 */
class SimpleCache<T> {
  /**
   * @param key - Redis key the value is stored under
   * @param ttlInMs - expiry passed to Redis `SET` as `PX` (milliseconds)
   * @param fetcher - produces a fresh value when the cache has no usable entry
   */
  constructor(
    private readonly key: string,
    private readonly ttlInMs: number,
    private readonly fetcher: () => Promise<T>,
  ) {}

  /**
   * Returns the cached value when one exists and parses as JSON; otherwise
   * calls `fetcher`, stores its result, and returns it.
   *
   * Errors from Redis or from `fetcher` propagate to the caller.
   */
  async get(): Promise<T> {
    const cached = await client.get(this.key);
    if (cached != null) {
      try {
        const parsed = JSON.parse(cached) as T;
        logger.info({
          message: 'SimpleCache: cache hit',
          key: this.key,
        });
        return parsed;
      } catch (e: unknown) {
        // A corrupt entry would otherwise fail every request until the TTL
        // expires; treat it as a miss so it gets overwritten below.
        logger.error({
          message: 'SimpleCache: unparsable cached value, refetching',
          key: this.key,
        });
      }
    }

    logger.info({
      message: 'SimpleCache: cache miss',
      key: this.key,
    });
    const result = await this.fetcher();

    // JSON.stringify returns undefined (not a string) for values such as
    // `undefined`; there is nothing meaningful to store in that case.
    const serialized: string | undefined = JSON.stringify(result);
    if (serialized !== undefined) {
      await client.set(this.key, serialized, {
        PX: this.ttlInMs,
      });
    }
    return result;
  }
}
|
||||
|
||||
export default client;
|
||||
|
||||
export { client as redisClient };
|
||||
export { client as redisClient, SimpleCache };
|
||||
|
|
|
|||
Loading…
Reference in a new issue