perf + feat: introduce SimpleCache and specify getMetricsTags time range (#178)

1. introduce a caching abstraction SimpleCache
2. add time range filtering to getMetricsTags query
This commit is contained in:
Warren 2024-01-03 17:59:24 -08:00 committed by GitHub
parent 619bd1a952
commit 423fc22346
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 231 additions and 81 deletions

View file

@ -0,0 +1,6 @@
---
'@hyperdx/api': patch
'@hyperdx/app': patch
---
perf + feat: introduce SimpleCache and specify getMetricsTags time range

View file

@ -592,30 +592,20 @@ export const getCHServerMetrics = async () => {
}, {});
};
// Entry point for metric-tag lookups: routes to the Redis-backed cached
// fetch or the direct ClickHouse query based on CACHE_METRICS_TAGS config.
export const getMetricsTags = async (teamId: string) => {
  const useCache = config.CACHE_METRICS_TAGS;
  logger.info({
    message: useCache
      ? 'getMetricsTags: attempting cached fetch'
      : 'getMetricsTags: skipping cache, direct query',
    teamId: teamId,
  });
  return useCache
    ? getMetricsTagsCached(teamId)
    : getMetricsTagsUncached(teamId);
};
// NB preserving this exactly as in original for this ticket
// but looks like a good candidate for a query refactor based on the comment
const getMetricsTagsUncached = async (teamId: string) => {
export const getMetricsTags = async ({
teamId,
startTime,
endTime,
}: {
teamId: string;
startTime: number; // unix in ms
endTime: number; // unix in ms
}) => {
const tableName = `default.${TableName.Metric}`;
// TODO: remove 'data_type' in the name field
const query = SqlString.format(
`
SELECT
SELECT
any(is_delta) as is_delta,
any(is_monotonic) as is_monotonic,
any(unit) as unit,
@ -623,10 +613,14 @@ const getMetricsTagsUncached = async (teamId: string) => {
format('{} - {}', name, data_type) as name,
groupUniqArray(_string_attributes) AS tags
FROM ??
WHERE (?)
GROUP BY name, data_type
ORDER BY name
`,
[tableName],
[
tableName,
SqlString.raw(SearchQueryBuilder.timestampInBetween(startTime, endTime)),
],
);
const ts = Date.now();
const rows = await client.query({
@ -639,37 +633,24 @@ const getMetricsTagsUncached = async (teamId: string) => {
),
},
});
const result = await rows.json<ResponseJSON<{ names: string[] }>>();
const result = await rows.json<
ResponseJSON<{
data_type: string;
is_delta: boolean;
is_monotonic: boolean;
name: string;
tags: Record<string, string>[];
unit: string;
}>
>();
logger.info({
message: 'getMetricsProps',
message: 'getMetricsTags',
query,
took: Date.now() - ts,
});
return result;
};
// Read-through cache wrapper around getMetricsTagsUncached: serves the
// JSON-serialized result from Redis when present, otherwise queries and
// stores it with a TTL derived from CACHE_METRICS_EXPIRATION_IN_SEC.
const getMetricsTagsCached = async (teamId: string) => {
  const redisKey = `metrics-tags-${teamId}`;
  const cached = await redisClient.get(redisKey);

  // Miss path first (guard clause): fetch, populate the cache, return.
  if (!cached) {
    logger.info({
      message: 'getMetricsTags: cache miss',
      teamId: teamId,
    });
    const result = await getMetricsTagsUncached(teamId);
    await redisClient.set(redisKey, JSON.stringify(result), {
      // ms() parses e.g. '600s' into milliseconds for the PX option.
      PX: ms(config.CACHE_METRICS_EXPIRATION_IN_SEC.toString() + 's'),
    });
    return result;
  }

  logger.info({
    message: 'getMetricsTags: cache hit',
    teamId: teamId,
  });
  // NOTE(review): parsed JSON is returned without schema validation —
  // callers receive whatever shape was stored.
  return JSON.parse(cached);
};
export const isRateAggFn = (aggFn: AggFn) => {
return (
aggFn === AggFn.SumRate ||

View file

@ -26,8 +26,3 @@ export const PORT = Number.parseInt(env.PORT as string);
export const REDIS_URL = env.REDIS_URL as string;
export const SERVER_URL = env.SERVER_URL as string;
export const USAGE_STATS_ENABLED = env.USAGE_STATS_ENABLED !== 'false';
export const CACHE_METRICS_TAGS = env.CACHE_METRICS_TAGS !== 'false';
// deliberately using '||' instead of '??' to avoid empty/falsy values
// returning as a string since all env values are stringish (to be parsed using ms())
export const CACHE_METRICS_EXPIRATION_IN_SEC =
(env.CACHE_METRICS_EXPIRATION_IN_SEC as string) || '600';

View file

@ -12,9 +12,10 @@ import {
LogType,
MetricModel,
} from '@/utils/logParser';
import { redisClient } from '@/utils/redis';
import * as config from './config';
import { createTeam, getTeam } from './controllers/team';
import { getTeam } from './controllers/team';
import { findUserByEmail } from './controllers/user';
import { mongooseConnection } from './models';
import Server from './server';
@ -65,6 +66,9 @@ class MockServer extends Server {
}
async start(): Promise<void> {
if (!config.IS_CI) {
throw new Error('ONLY execute this in CI env 😈 !!!');
}
await super.start();
await initCiEnvs();
}
@ -117,6 +121,38 @@ export const getLoggedInAgent = async (server: MockServer) => {
user,
};
};
// ------------------------------------------------
// ------------------ Redis -----------------------
// ------------------------------------------------
// Wipes the entire Redis instance. Destructive, so it is hard-gated to CI.
export const clearRedis = async () => {
  if (config.IS_CI) {
    await redisClient.flushAll();
    return;
  }
  throw new Error('ONLY execute this in CI env 😈 !!!');
};
// ------------------------------------------------
// ------------------ Clickhouse ------------------
// ------------------------------------------------
// Truncates every known ClickHouse table in parallel. Destructive, so it
// is hard-gated to CI.
export const clearClickhouseTables = async () => {
  if (!config.IS_CI) {
    throw new Error('ONLY execute this in CI env 😈 !!!');
  }
  await Promise.all(
    Object.values(clickhouse.TableName).map(table =>
      clickhouse.client.command({
        query: `TRUNCATE TABLE default.${table}`,
        clickhouse_settings: {
          // Block until the truncate has fully completed on the server.
          wait_end_of_query: 1,
        },
      }),
    ),
  );
};
export function buildEvent({
level,
source = 'test',

View file

@ -50,6 +50,19 @@ const MOCK_DASHBOARD = {
describe('alerts router', () => {
const server = getServer();
beforeAll(async () => {
await server.start();
});
afterEach(async () => {
await clearDBCollections();
});
afterAll(async () => {
await server.closeHttpServer();
await closeDB();
});
it('index has alerts attached to dashboards', async () => {
const { agent } = await getLoggedInAgent(server);
@ -79,17 +92,4 @@ describe('alerts router', () => {
expect(alert.dashboard).toBeDefined();
}
});
beforeAll(async () => {
await server.start();
});
afterEach(async () => {
await clearDBCollections();
});
afterAll(async () => {
await server.closeHttpServer();
await closeDB();
});
});

View file

@ -50,6 +50,19 @@ const MOCK_DASHBOARD = {
describe('dashboard router', () => {
const server = getServer();
beforeAll(async () => {
await server.start();
});
afterEach(async () => {
await clearDBCollections();
});
afterAll(async () => {
await server.closeHttpServer();
await closeDB();
});
it('deletes attached alerts when deleting charts', async () => {
const { agent } = await getLoggedInAgent(server);
@ -102,17 +115,4 @@ describe('dashboard router', () => {
.sort();
expect(allChartsPostDelete).toEqual(chartsWithAlertsPostDelete);
});
beforeAll(async () => {
await server.start();
});
afterEach(async () => {
await clearDBCollections();
});
afterAll(async () => {
await server.closeHttpServer();
await closeDB();
});
});

View file

@ -0,0 +1,77 @@
import * as clickhouse from '@/clickhouse';
import {
buildMetricSeries,
clearClickhouseTables,
clearDBCollections,
clearRedis,
closeDB,
getLoggedInAgent,
getServer,
} from '@/fixtures';
// E2E test for the metrics router: seeds ClickHouse with two metric series
// that share a name/data_type and verifies GET /metrics/tags merges their
// tag sets into a single response entry.
describe('metrics router', () => {
const server = getServer();
beforeAll(async () => {
await server.start();
});
// Reset Mongo, ClickHouse, AND Redis between tests — /metrics/tags is
// cached in Redis, so skipping clearRedis would leak results across cases.
afterEach(async () => {
await clearDBCollections();
await clearClickhouseTables();
await clearRedis();
});
afterAll(async () => {
await server.closeHttpServer();
await closeDB();
});
it('GET /metrics/tags', async () => {
const now = Date.now();
// Two series with the same name ('test.cpu') and data_type (Gauge) but
// different tag sets; the endpoint should group them into one entry.
await clickhouse.bulkInsertTeamMetricStream(
buildMetricSeries({
name: 'test.cpu',
tags: { host: 'host1', foo: 'bar' },
data_type: clickhouse.MetricsDataType.Gauge,
is_monotonic: false,
is_delta: false,
unit: 'Percent',
points: [{ value: 1, timestamp: now }],
}),
);
await clickhouse.bulkInsertTeamMetricStream(
buildMetricSeries({
name: 'test.cpu',
tags: { host: 'host2', foo2: 'bar2' },
data_type: clickhouse.MetricsDataType.Gauge,
is_monotonic: false,
is_delta: false,
unit: 'Percent',
points: [{ value: 1, timestamp: now }],
}),
);
const { agent } = await getLoggedInAgent(server);
const results = await agent.get('/metrics/tags').expect(200);
// NOTE(review): this asserts a specific order of the grouped tag arrays
// (host2 before host1) — confirm the grouping's ordering is deterministic.
expect(results.body.data).toEqual([
{
is_delta: false,
is_monotonic: false,
unit: 'Percent',
data_type: 'Gauge',
name: 'test.cpu - Gauge',
tags: [
{
foo2: 'bar2',
host: 'host2',
},
{
foo: 'bar',
host: 'host1',
},
],
},
]);
});
});

View file

@ -1,9 +1,11 @@
import opentelemetry, { SpanStatusCode } from '@opentelemetry/api';
import express from 'express';
import ms from 'ms';
import { z } from 'zod';
import { validateRequest } from 'zod-express-middleware';
import * as clickhouse from '@/clickhouse';
import { SimpleCache } from '@/utils/redis';
const router = express.Router();
@ -13,7 +15,19 @@ router.get('/tags', async (req, res, next) => {
if (teamId == null) {
return res.sendStatus(403);
}
res.json(await clickhouse.getMetricsTags(teamId.toString()));
const nowInMs = Date.now();
const simpleCache = new SimpleCache<
Awaited<ReturnType<typeof clickhouse.getMetricsTags>>
>(`metrics-tags-${teamId}`, ms('10m'), () =>
clickhouse.getMetricsTags({
// FIXME: fix it 5 days ago for now
startTime: nowInMs - ms('5d'),
endTime: nowInMs,
teamId: teamId.toString(),
}),
);
res.json(await simpleCache.get());
} catch (e) {
const span = opentelemetry.trace.getActiveSpan();
span?.recordException(e as Error);

View file

@ -66,7 +66,7 @@ describe('external api v1', () => {
},
],
rows: 1,
});
} as any);
const { agent, user } = await getLoggedInAgent(server);
const resp = await agent
.get(`/api/v1/metrics/tags`)

View file

@ -10,6 +10,7 @@ import { getTeam } from '@/controllers/team';
import { validateUserAccessKey } from '@/middleware/auth';
import { Api400Error, Api403Error } from '@/utils/errors';
import rateLimiter from '@/utils/rateLimiter';
import { SimpleCache } from '@/utils/redis';
const router = express.Router();
@ -154,7 +155,19 @@ router.get(
if (teamId == null) {
throw new Api403Error('Forbidden');
}
const tags = await clickhouse.getMetricsTags(teamId.toString());
const nowInMs = Date.now();
const simpleCache = new SimpleCache<
Awaited<ReturnType<typeof clickhouse.getMetricsTags>>
>(`metrics-tags-${teamId}`, ms('10m'), () =>
clickhouse.getMetricsTags({
// FIXME: fix it 5 days ago for now
startTime: nowInMs - ms('5d'),
endTime: nowInMs,
teamId: teamId.toString(),
}),
);
const tags = await simpleCache.get();
res.json({
data: tags.data.map(tag => ({
// FIXME: unify the return type of both internal and external APIs

View file

@ -12,6 +12,34 @@ client.on('error', (err: any) => {
logger.error('Redis error: ', serializeError(err));
});
/**
 * Minimal read-through Redis cache.
 *
 * On `get()`, returns the JSON-parsed value stored under `key` when present;
 * otherwise invokes `fetcher`, stores its JSON-serialized result with a
 * millisecond TTL (`PX`), and returns it.
 */
class SimpleCache<T> {
  constructor(
    private readonly key: string,
    private readonly ttlInMs: number,
    private readonly fetcher: () => Promise<T>,
  ) {}

  async get(): Promise<T> {
    const cached = await client.get(this.key);

    // Miss path first: fetch fresh, populate the cache, return.
    if (cached == null) {
      logger.info({
        message: 'SimpleCache: cache miss',
        key: this.key,
      });
      const fresh = await this.fetcher();
      await client.set(this.key, JSON.stringify(fresh), {
        PX: this.ttlInMs,
      });
      return fresh;
    }

    logger.info({
      message: 'SimpleCache: cache hit',
      key: this.key,
    });
    // NOTE(review): JSON.parse output is returned as T without validation —
    // a stale/foreign value under this key would be passed through as-is.
    return JSON.parse(cached);
  }
}
export default client;
export { client as redisClient };
export { client as redisClient, SimpleCache };