Multi-Tier caching in the tokens service (#6129)

This commit is contained in:
Kamil Kisiela 2024-12-19 13:35:23 +01:00 committed by GitHub
parent d1e5f03937
commit 53e0a20ac6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 522 additions and 448 deletions

View file

@ -318,7 +318,7 @@ export interface Token {
project: string;
organization: string;
date: string;
lastUsedAt: string;
lastUsedAt: string | null;
scopes: readonly string[];
}

View file

@ -2,6 +2,20 @@ import { Interceptor, sql } from 'slonik';
import { getPool, toDate, tokens } from './db';
import type { Slonik } from './shared';
function transformToken(row: tokens) {
return {
token: row.token,
tokenAlias: row.token_alias,
name: row.name,
date: row.created_at as unknown as string,
lastUsedAt: row.last_used_at as unknown as string | null,
organization: row.organization_id,
project: row.project_id,
target: row.target_id,
scopes: row.scopes || [],
};
}
export async function createTokenStorage(
connection: string,
maximumPoolSize: number,
@ -33,10 +47,10 @@ export async function createTokenStorage(
`,
);
return result.rows;
return result.rows.map(transformToken);
},
async getToken({ token }: { token: string }) {
return pool.maybeOne<Slonik<tokens>>(
const row = await pool.maybeOne<Slonik<tokens>>(
sql`
SELECT *
FROM tokens
@ -44,8 +58,10 @@ export async function createTokenStorage(
LIMIT 1
`,
);
return row ? transformToken(row) : null;
},
createToken({
async createToken({
token,
tokenAlias,
target,
@ -62,7 +78,7 @@ export async function createTokenStorage(
organization: string;
scopes: readonly string[];
}) {
return pool.one<Slonik<tokens>>(
const row = await pool.one<Slonik<tokens>>(
sql`
INSERT INTO tokens
(name, token, token_alias, target_id, project_id, organization_id, scopes)
@ -73,14 +89,20 @@ export async function createTokenStorage(
)})
RETURNING *
`,
);
)
return transformToken(row);
},
async deleteToken({ token }: { token: string }) {
await pool.query(
sql`
UPDATE tokens SET deleted_at = NOW() WHERE token = ${token}
`,
);
async deleteToken(params: { token: string; postDeletionTransaction: () => Promise<void> }) {
await pool.transaction(async t => {
await t.query(sql`
UPDATE tokens
SET deleted_at = NOW()
WHERE token = ${params.token}
`);
await params.postDeletionTransaction();
});
},
async touchTokens({ tokens }: { tokens: Array<{ token: string; date: Date }> }) {
await pool.query(sql`

View file

@ -8,3 +8,4 @@ REDIS_PORT="6379"
REDIS_PASSWORD=""
PORT=6001
OPENTELEMETRY_COLLECTOR_ENDPOINT="<sync>"
LOG_LEVEL="debug"

View file

@ -17,6 +17,7 @@
"dotenv": "16.4.5",
"fastify": "4.28.1",
"ioredis": "5.4.1",
"lru-cache": "11.0.2",
"ms": "2.1.3",
"p-timeout": "6.1.3",
"pino-pretty": "11.3.0",

View file

@ -5,8 +5,8 @@ import { z } from 'zod';
import { createErrorHandler, handleTRPCError, maskToken, metrics } from '@hive/service-common';
import type { inferRouterInputs, inferRouterOutputs } from '@trpc/server';
import { initTRPC } from '@trpc/server';
import { useCache } from './cache';
import { cacheHits, cacheMisses } from './metrics';
import { recordTokenRead } from './metrics';
import { Storage } from './multi-tier-storage';
const httpRequests = new metrics.Counter({
name: 'tokens_http_requests',
@ -43,7 +43,7 @@ function generateToken() {
export type Context = {
req: FastifyRequest;
errorHandler: ReturnType<typeof createErrorHandler>;
getStorage: ReturnType<typeof useCache>['getStorage'];
storage: Storage;
tokenReadFailuresCache: LruType<string>;
};
@ -72,9 +72,7 @@ export const tokensApiRouter = t.router({
)
.query(async ({ ctx, input }) => {
try {
const storage = await ctx.getStorage();
return await storage.readTarget(input.targetId);
return await ctx.storage.readTarget(input.targetId);
} catch (error) {
ctx.errorHandler('Failed to get tokens of a target', error as Error);
@ -91,8 +89,7 @@ export const tokensApiRouter = t.router({
)
.mutation(async ({ ctx, input }) => {
try {
const storage = await ctx.getStorage();
await storage.invalidateTokens(input.tokens);
await ctx.storage.invalidateTokens(input.tokens);
return true;
} catch (error) {
@ -116,9 +113,8 @@ export const tokensApiRouter = t.router({
.mutation(async ({ ctx, input }) => {
try {
const { target, project, organization, name, scopes } = input;
const storage = await ctx.getStorage();
const token = generateToken();
const result = await storage.writeToken({
const result = await ctx.storage.writeToken({
name,
target,
project,
@ -149,8 +145,7 @@ export const tokensApiRouter = t.router({
.mutation(async ({ ctx, input }) => {
try {
const hashed_token = input.token;
const storage = await ctx.getStorage();
await storage.deleteToken(hashed_token);
await ctx.storage.deleteToken(hashed_token);
return true;
} catch (error) {
@ -175,14 +170,12 @@ export const tokensApiRouter = t.router({
const cachedFailure = ctx.tokenReadFailuresCache.get(hash);
if (cachedFailure) {
cacheHits.inc(1);
throw new Error(cachedFailure);
}
try {
const storage = await ctx.getStorage();
const result = await storage.readToken(hash);
const result = await ctx.storage.readToken(hash, alias);
recordTokenRead(result ? 200 : 404);
// removes the token from the failures cache (in case the value expired)
ctx.tokenReadFailuresCache.delete(hash);
@ -193,8 +186,8 @@ export const tokensApiRouter = t.router({
// set token read as failure
// so we don't try to read it again for next X minutes
ctx.tokenReadFailuresCache.set(hash, (error as Error).message);
cacheMisses.inc(1);
recordTokenRead(500);
throw error;
}
}),

View file

@ -1,290 +0,0 @@
import type { FastifyBaseLogger } from 'fastify';
import type { Redis } from 'ioredis';
import ms from 'ms';
import LRU from 'tiny-lru';
import { traceInlineSync } from '@hive/service-common';
import { ConnectionError } from '@hive/storage';
import { atomic, until, useActionTracker } from './helpers';
import { cacheHits, cacheInvalidations, cacheMisses } from './metrics';
import type { Storage, StorageItem } from './storage';
function generateKey(hashedToken: string) {
return `tokens:cache:${hashedToken}`;
}
interface CacheStorage extends Omit<Storage, 'touchTokens'> {
invalidateTokens(hashedTokens: string[]): Promise<void>;
shouldCacheError(error: unknown): boolean;
}
const TTLSeconds = {
/**
* TTL for tokens that don't exist in the DB.
*/
notFound: 60, // seconds
/**
* TTL for tokens that exist in the DB.
*/
found: 60 * 5, // 5 minutes
/**
* TTL for tokens in the in-memory cache.
* Helps to reduce the traffic that goes to Redis (as we read the token from in-memory cache first) and in case Redis is down.
*/
inMemory: 60, // seconds
};
function useSafeRedis(redis: Redis, logger: FastifyBaseLogger) {
const cache = LRU<string>(1000, TTLSeconds.inMemory * 1000 /* s -> ms */);
// Purge the cache when redis is ready (when it reconnects or when it starts)
redis.on('ready', () => {
logger.info('Redis is ready, purging the in-memory cache');
cache.clear();
});
return {
async get(key: string) {
const cached = cache.get(key);
if (cached) {
return cached;
}
if (redis.status === 'ready') {
return redis.get(key);
}
logger.warn('Redis is not ready, skipping GET');
return cache.get(key);
},
async del(keys: string[]) {
for (const key of keys) {
cache.delete(key);
}
if (redis.status === 'ready') {
if (keys.length > 0) {
await redis.del(...keys);
}
} else {
logger.warn('Redis is not ready, skipping DEL');
}
},
async setex(key: string, ttl: number, value: string) {
cache.set(key, value);
if (redis.status === 'ready') {
await redis.setex(key, ttl, value);
} else {
logger.warn('Redis is not ready, skipping SETEX');
}
},
};
}
// Cache is a wrapper around the storage that adds a cache layer.
// It also handles invalidation of the cache.
// It also handles the "touch" logic to mark tokens as used and update the "lastUsedAt" column in PG.
// Without the cache we would hit the DB for every request, with the cache we hit it only once (until a token is invalidated).
export function useCache(
storagePromise: Promise<Storage>,
redisInstance: Redis,
logger: FastifyBaseLogger,
): {
start(): Promise<void>;
stop(): Promise<void>;
readiness(): Promise<boolean>;
getStorage(): Promise<CacheStorage>;
} {
let started = false;
let cachedStoragePromise: Promise<CacheStorage> | null = null;
function getStorage() {
if (!cachedStoragePromise) {
cachedStoragePromise = create();
}
return cachedStoragePromise;
}
const tracker = useActionTracker();
const redis = useSafeRedis(redisInstance, logger);
async function create() {
const storage = await storagePromise;
const touch = useTokenTouchScheduler(storage, logger);
// When there's a new token or a token was removed we need to invalidate the cache
async function invalidateTokens(hashedTokens: string[]) {
cacheInvalidations.inc(1);
if (hashedTokens.length > 0) {
await redis.del(hashedTokens.map(generateKey));
}
}
// Thanks to the `atomic` function, every call to this function will only be executed once and Promise will be shared.
// This is important because we don't want to make multiple requests to the DB for the same token, at the same time.
const readTokenFromStorage = atomic(async function _readToken(hashedToken: string) {
const item = await storage.readToken(hashedToken);
if (!item) {
// If the token doesn't exist in the DB we still want to cache it for a short period of time to avoid hitting the DB again and again.
await redis.setex(generateKey(hashedToken), TTLSeconds.notFound, JSON.stringify(null));
} else {
await redis.setex(generateKey(hashedToken), TTLSeconds.found, JSON.stringify(item));
}
return item;
});
// Thanks to the `atomic` function, every call to this function will only be executed once and Promise will be shared.
// This is important because we don't want to make multiple requests to Redis for the same token, at the same time.
const readTokenFromRedis = atomic(async function _readToken(
hashedToken: string,
): Promise<StorageItem | null | undefined> {
const item = await redis.get(generateKey(hashedToken));
if (typeof item === 'string') {
return JSON.parse(item);
}
return;
});
const cachedStorage: CacheStorage = {
destroy() {
return storage.destroy();
},
isReady() {
return storage.isReady();
},
shouldCacheError(error) {
return !(error instanceof ConnectionError);
},
invalidateTokens(hashedTokens) {
return invalidateTokens(hashedTokens);
},
readTarget(target) {
return storage.readTarget(target);
},
async readToken(hashedToken) {
const cached = await readTokenFromRedis(hashedToken);
if (typeof cached !== 'undefined') {
cacheHits.inc(1);
// mark as used
touch.schedule(hashedToken);
return cached;
}
cacheMisses.inc(1);
const item = await readTokenFromStorage(hashedToken);
if (!item) {
return null;
}
touch.schedule(hashedToken); // mark as used
return item;
},
writeToken: tracker.wrap(async item => {
logger.debug('Writing token (target=%s)', item.target);
const result = await storage.writeToken(item);
return result;
}),
deleteToken: tracker.wrap(async hashedToken => {
await redis.del([generateKey(hashedToken)]);
return storage.deleteToken(hashedToken);
}),
};
started = true;
return cachedStorage;
}
async function start() {
await getStorage();
}
async function stop() {
logger.info('Started Tokens shutdown...');
started = false;
// Wait for all the pending operations to finish
await until(tracker.idle, 10_000).catch(error => {
logger.error('Failed to wait for tokens being idle', error);
});
if (cachedStoragePromise) {
await (await cachedStoragePromise).destroy();
}
// Wait for Redis to finish all the pending operations
await redisInstance.quit();
process.exit(0);
}
async function readiness() {
return (
started &&
(redisInstance.status === 'ready' || redisInstance.status === 'reconnecting') &&
(await (await getStorage()).isReady())
);
}
return {
start,
stop,
readiness,
getStorage,
};
}
function useTokenTouchScheduler(storage: Storage, logger: FastifyBaseLogger) {
const scheduledTokens = new Map<string, Date>();
/**
* Mark token as used
*/
function schedule(hashedToken: string): void {
const now = new Date();
scheduledTokens.set(hashedToken, now);
}
// updated every 10m
const interval = setInterval(
traceInlineSync('Touch Tokens', {}, () => {
if (!scheduledTokens.size) {
return;
}
const tokens = Array.from(scheduledTokens.entries()).map(([token, date]) => ({
token,
date,
}));
scheduledTokens.clear();
logger.debug(`Touch ${tokens.length} tokens`);
storage.touchTokens(tokens).catch(error => {
logger.error(error);
});
}),
ms('60s'),
);
function dispose() {
clearInterval(interval);
}
return {
schedule,
dispose,
};
}

View file

@ -1,24 +1,41 @@
import pTimeout from 'p-timeout';
const requestsInFlight = new Map<string, Promise<any>>();
const atomicPromisesInFlight = new Map<string, Promise<any>>();
export function atomic<A extends string, R>(fn: (arg: A) => Promise<R>): (arg: A) => Promise<R> {
return function atomicWrapper(arg) {
if (requestsInFlight.has(arg)) {
return requestsInFlight.get(arg)!;
/**
* This function is used to share execution across multiple calls of the same function.
* It's useful when you have a function that can be called multiple times in a short period of time,
* but you want to execute it only once.
*
* Once the execution is finished, the function will be available for the next call.
*
* @param fn - Function that should be executed only once per its execution period.
* @returns Function that will execute the original function only once.
*/
export function atomic<R>(fn: () => Promise<R>): () => Promise<R> {
// Generate a unique string for each call of `atomic` function to prevent collisions.
const uniqueId = Math.random().toString(36).slice(2) + Date.now().toString(36);
return function atomicWrapper() {
const existing = atomicPromisesInFlight.get(uniqueId);
if (existing) {
return existing;
}
const promise = fn(arg);
requestsInFlight.set(arg, promise);
const promise = fn();
atomicPromisesInFlight.set(uniqueId, promise);
return promise.finally(() => {
requestsInFlight.delete(arg);
atomicPromisesInFlight.delete(uniqueId);
});
};
}
// It's used to track the number of requests that are in flight.
// This is important because we don't want to kill the pod when `DELETE` or `POST` action is in progress.
/**
* It's used to track the number of requests that are in flight.
* This is important because we don't want to kill the pod when
* state mutating requests are in progress.
*/
export function useActionTracker() {
let actionsInProgress = 0;
@ -43,11 +60,17 @@ export function useActionTracker() {
};
}
export function until(fn: () => boolean, timeout: number): Promise<void> {
/**
* This function is used to wait until the condition is met or the timeout is reached.
*
* @param conditionFn - function to check the condition
* @param timeout - timeout in milliseconds
*/
export function until(conditionFn: () => boolean, timeout: number): Promise<void> {
return pTimeout(
new Promise(resolve => {
const interval = setInterval(() => {
if (fn()) {
if (conditionFn()) {
clearInterval(interval);
resolve();
}

View file

@ -18,9 +18,8 @@ import {
} from '@hive/service-common';
import * as Sentry from '@sentry/node';
import { Context, tokensApiRouter } from './api';
import { useCache } from './cache';
import { env } from './environment';
import { createStorage } from './storage';
import { createStorage } from './multi-tier-storage';
export async function main() {
let tracing: TracingInstance | undefined;
@ -83,10 +82,11 @@ export async function main() {
tls: env.redis.tlsEnabled ? {} : undefined,
});
const { start, stop, readiness, getStorage } = useCache(
createStorage(env.postgres, tracing ? [tracing.instrumentSlonik()] : []),
const storage = await createStorage(
env.postgres,
redis,
server.log,
tracing ? [tracing.instrumentSlonik()] : [],
);
const stopHeartbeats = env.heartbeat
@ -95,14 +95,14 @@ export async function main() {
endpoint: env.heartbeat.endpoint,
intervalInMS: 20_000,
onError: e => server.log.error(e, `Heartbeat failed with error`),
isReady: readiness,
isReady: storage.isReady,
})
: startHeartbeats({ enabled: false });
async function shutdown() {
stopHeartbeats();
await server.close();
await stop();
await storage.close();
}
try {
@ -146,7 +146,7 @@ export async function main() {
return {
req,
errorHandler,
getStorage,
storage,
tokenReadFailuresCache,
};
},
@ -164,7 +164,7 @@ export async function main() {
method: ['GET', 'HEAD'],
url: '/_readiness',
async handler(_, res) {
const isReady = await readiness();
const isReady = await storage.isReady();
reportReadiness(isReady);
void res.status(isReady ? 200 : 400).send();
},
@ -178,8 +178,6 @@ export async function main() {
port: env.http.port,
host: '::',
});
await start();
} catch (error) {
server.log.fatal(error);
Sentry.captureException(error, {

View file

@ -1,16 +1,32 @@
import type { LRUCache } from 'lru-cache';
import { metrics } from '@hive/service-common';
export const cacheHits = new metrics.Counter({
name: 'tokens_cache_hits',
help: 'Number of cache hits',
const tokenReads = new metrics.Counter({
name: 'tokens_reads',
help: 'Number of token reads',
labelNames: ['status'],
});
export const cacheMisses = new metrics.Counter({
name: 'tokens_cache_misses',
help: 'Number of cache misses',
const cacheReads = new metrics.Counter({
name: 'tokens_cache_reads',
help: 'Number of cache reads',
labelNames: ['status'],
});
export const cacheInvalidations = new metrics.Counter({
name: 'tokens_cache_invalidations',
help: 'Number of cache invalidations',
const cacheFills = new metrics.Counter({
name: 'tokens_cache_fills',
help: 'Number of cache fills',
labelNames: ['source'],
});
export function recordCacheRead(status: NonNullable<LRUCache.Status<unknown>['fetch']>) {
cacheReads.inc({ status });
}
export function recordCacheFill(source: 'db' | 'redis-fresh' | 'redis-stale') {
cacheFills.inc({ source });
}
export function recordTokenRead(status: 200 | 500 | 404) {
tokenReads.inc({ status });
}

View file

@ -0,0 +1,380 @@
import type { FastifyBaseLogger } from 'fastify';
import type { Redis } from 'ioredis';
import { LRUCache } from 'lru-cache';
import ms from 'ms';
import { createConnectionString, createTokenStorage, Interceptor } from '@hive/storage';
import { captureException, captureMessage } from '@sentry/node';
import { atomic, until, useActionTracker } from './helpers';
import { recordCacheFill, recordCacheRead } from './metrics';
type CacheEntry = StorageItem | 'not-found';
interface StorageItem {
token: string;
name: string;
tokenAlias: string;
date: string;
lastUsedAt: string | null;
organization: string;
project: string;
target: string;
scopes: readonly string[];
}
export interface Storage {
close(): Promise<void>;
isReady(): Promise<boolean>;
readTarget(targetId: string): Promise<StorageItem[]>;
readToken(hashedToken: string, maskedToken: string): Promise<StorageItem | null>;
writeToken(item: Omit<StorageItem, 'date' | 'lastUsedAt'>): Promise<StorageItem>;
deleteToken(hashedToken: string): Promise<void>;
invalidateTokens(hashedTokens: string[]): Promise<void>;
}
const cacheConfig = {
inMemory: {
maxEntries: 1000,
ttlInMs: ms('5m'),
},
redis: {
ttlInMs: ms('1h'),
staleTtlInMs: ms('24h'),
},
tokenTouchIntervalInMs: ms('60s'),
} as const;
export async function createStorage(
config: Parameters<typeof createConnectionString>[0],
redis: Redis,
serverLogger: FastifyBaseLogger,
additionalInterceptors: Interceptor[],
): Promise<Storage> {
const tracker = useActionTracker();
const connectionString = createConnectionString(config);
const db = await createTokenStorage(connectionString, 5, additionalInterceptors);
const touch = tokenTouchScheduler(serverLogger, async tokens => {
try {
await db.touchTokens({ tokens });
} catch (error) {
serverLogger.error('Failed to touch tokens', error);
}
});
const cache = new LRUCache<
string,
CacheEntry,
{
maskedToken: string;
}
>({
max: cacheConfig.inMemory.maxEntries,
ttl: cacheConfig.inMemory.ttlInMs,
// Allow to return stale data if the fetchMethod is slow
allowStale: false,
// Don't delete the cache entry if the fetchMethod fails
noDeleteOnFetchRejection: true,
// Allow to return stale data if the fetchMethod fails.
// The rejection reason won't be logged though.
allowStaleOnFetchRejection: true,
// If a cache entry is stale or missing, this method is called
// to fill the cache with fresh data.
// This method is called only once per cache key,
// even if multiple requests are waiting for it.
async fetchMethod(hashedToken, _staleEntry, { context }) {
// Nothing fresh in the in-memory cache, let's check Redis
const logger = serverLogger.child({ maskedToken: context.maskedToken });
let redisData: string | null = null;
if (redis.status === 'ready') {
redisData = await redis.get(generateRedisKey(hashedToken)).catch(error => {
handleStorageError({
logger,
error,
logMsg: 'Failed to read token from Redis',
tier: 'redis',
action: 'fetch',
});
return null;
});
} else {
// If redis is not ready, we fallback to the Db.
// This will put more load on the Db, but it won't break the usage reporting.
// It's a temporary state, as fetched value will be written to in-memory cache,
// and to Redis - when it's back online.
logger.warn('Redis is not ready, falling back to Db');
captureMessage('Redis was not available as secondary cache', 'warning');
}
if (redisData) {
recordCacheFill('redis-fresh');
logger.debug('Returning fresh data from Redis');
return JSON.parse(redisData) as CacheEntry;
}
try {
// Nothing in Redis, let's check the DB
const dbResult = await db.getToken({ token: hashedToken });
const cacheEntry = dbResult ?? 'not-found';
recordCacheFill('db');
// Write to Redis, so the next time we can read it from there
await setInRedis(redis, hashedToken, cacheEntry).catch(error => {
handleStorageError({
logger,
error,
logMsg: 'Failed to write token to Redis, but it was written to the in-memory cache',
tier: 'redis',
action: 'set',
});
});
} catch (error) {
// If the DB is down, we log the error, and we throw exception.
// This will cause the cache to return stale data.
// This may have a performance impact (more calls to Db), but it won't break the system.
handleStorageError({
logger,
error,
logMsg: 'Failed to read token from the Db',
tier: 'db',
action: 'fetch',
});
if (redis.status !== 'ready') {
logger.warn('Redis is not ready, cannot read stale data from it');
throw error;
}
const staleRedisData = await redis.get(generateStaleRedisKey(hashedToken)).catch(error => {
handleStorageError({
logger,
error,
logMsg: 'Failed to read token from Redis (stale)',
tier: 'redis-stale',
action: 'fetch',
});
return null;
});
if (!staleRedisData) {
logger.debug('No stale data in Redis');
throw error;
}
recordCacheFill('redis-stale');
logger.debug('Returning stale data from Redis');
// Stale data will be cached in the in-memory cache only, as it's not fresh.
return JSON.parse(staleRedisData) as CacheEntry;
}
throw new Error('Unexpected code path');
},
});
return {
async close() {
// Wait for all the pending operations to finish
await until(tracker.idle, 10_000).catch(error => {
serverLogger.error('Failed to wait for tokens being idle', error);
});
await db.destroy();
// Wait for Redis to finish all the pending operations
await redis.quit();
touch.dispose();
},
isReady: atomic(async () => {
if (redis.status === 'ready' || redis.status === 'reconnecting') {
return db.isReady();
}
return false;
}),
async readTarget(target) {
return db.getTokens({ target });
},
async readToken(hashedToken, maskedToken) {
const status: LRUCache.Status<CacheEntry> = {};
const context = { maskedToken, source: 'in-memory' };
const data = await cache.fetch(hashedToken, {
context,
status,
});
if (status.fetch) {
recordCacheRead(status.fetch);
} else {
serverLogger.warn('Status of the fetch is missing');
}
if (!data) {
// Looked up in all layers, and the token is not found
return null;
}
if (data === 'not-found') {
return null;
}
touch.schedule(hashedToken);
return data;
},
writeToken: tracker.wrap(async item => {
const result = await db.createToken(item);
// We don't want to write to in-memory cache,
// as the token may not be used immediately, or at all.
// We write to Redis, so in case the token is used,
// we reuse it for the in-memory cache, without hitting the DB.
const hashedToken = result.token;
const cacheEntry = result;
// Write to Redis gracefully. If it fails, we log the error, but we don't throw.
// The token won't be in Redis cache, but it will be possible to retrieve it from the DB.
// It will affect performance slightly, but it won't break the system.
try {
await setInRedis(redis, hashedToken, cacheEntry);
} catch (error) {
handleStorageError({
logger: serverLogger,
error,
logMsg: 'Failed to write token to Redis, but it was created in the Db',
tier: 'redis',
action: 'set',
});
}
return cacheEntry;
}),
deleteToken: tracker.wrap(async hashedToken => {
await db.deleteToken({
token: hashedToken,
async postDeletionTransaction() {
// delete from Redis when the token is deleted from the DB
await redis.del([generateRedisKey(hashedToken), generateStaleRedisKey(hashedToken)]);
// only then delete from the in-memory cache.
// The other replicas will purge the token from
// their own in-memory caches on their own pace (ttl)
cache.delete(hashedToken);
},
});
}),
invalidateTokens: tracker.wrap(async hashedTokens => {
if (hashedTokens.length === 0) {
return;
}
await redis.del(
hashedTokens.map(generateRedisKey).concat(hashedTokens.map(generateStaleRedisKey)),
);
for (const hashedToken of hashedTokens) {
cache.delete(hashedToken);
}
}),
};
}
function generateRedisKey(hashedToken: string) {
// bump the version when the cache format changes
return `tokens:cache:v2:${hashedToken}`;
}
function generateStaleRedisKey(hashedToken: string) {
// bump the version when the cache format changes
return `tokens:stale-cache:v2:${hashedToken}`;
}
async function setInRedis(redis: Redis, hashedToken: string, cacheEntry: CacheEntry) {
if (redis.status !== 'ready') {
return;
}
const stringifiedCacheEntry = JSON.stringify(cacheEntry);
const results = await redis
.pipeline()
.setex(
generateRedisKey(hashedToken),
Math.ceil(cacheConfig.redis.ttlInMs / 1000),
stringifiedCacheEntry,
)
.setex(
generateStaleRedisKey(hashedToken),
Math.ceil(cacheConfig.redis.staleTtlInMs / 1000),
stringifiedCacheEntry,
)
.exec();
if (!results?.length) {
return;
}
const errors: Error[] = [];
for (const [error] of results) {
if (error instanceof Error) {
errors.push(error);
}
}
if (errors.length) {
throw new Error(errors.map(e => e.message).join('\n'), {
cause: {
message: 'SETEX Pipeline Failure',
errors,
},
});
}
}
function tokenTouchScheduler(
logger: FastifyBaseLogger,
onTouch: (tokens: Array<{ token: string; date: Date }>) => Promise<void>,
) {
const scheduledTokens = new Map<string, Date>();
/**
* Mark token as used
*/
function schedule(hashedToken: string): void {
const now = new Date();
scheduledTokens.set(hashedToken, now);
}
const interval = setInterval(() => {
if (!scheduledTokens.size) {
return;
}
const tokens = Array.from(scheduledTokens.entries()).map(([token, date]) => ({
token,
date,
}));
scheduledTokens.clear();
logger.debug(`Touch ${tokens.length} tokens`);
void onTouch(tokens);
}, cacheConfig.tokenTouchIntervalInMs);
function dispose() {
clearInterval(interval);
}
return {
schedule,
dispose,
};
}
function handleStorageError(params: {
logger: FastifyBaseLogger;
error: unknown;
logMsg: string;
tier: 'redis' | 'redis-stale' | 'db';
action: 'fetch' | 'set';
}) {
params.logger.error(params.logMsg, params.error);
captureException(params.error, {
tags: {
storageTier: params.tier,
storageAction: params.action,
},
});
}

View file

@ -1,79 +0,0 @@
import { createConnectionString, createTokenStorage, Interceptor, tokens } from '@hive/storage';
export interface StorageItem {
token: string;
name: string;
tokenAlias: string;
date: string;
lastUsedAt: string;
organization: string;
project: string;
target: string;
scopes: readonly string[];
}
export interface Storage {
destroy(): Promise<void>;
isReady(): Promise<boolean>;
readTarget(targetId: string): Promise<StorageItem[]>;
readToken(hashedToken: string): Promise<StorageItem | null>;
writeToken(item: Omit<StorageItem, 'date' | 'lastUsedAt'>): Promise<StorageItem>;
deleteToken(hashedToken: string): Promise<void>;
touchTokens(tokens: Array<{ token: string; date: Date }>): Promise<void>;
}
export async function createStorage(
config: Parameters<typeof createConnectionString>[0],
additionalInterceptors: Interceptor[],
): Promise<Storage> {
const connectionString = createConnectionString(config);
const db = await createTokenStorage(connectionString, 5, additionalInterceptors);
function transformToken(item: tokens): StorageItem {
return {
token: item.token,
tokenAlias: item.token_alias,
name: item.name,
date: item.created_at as any,
lastUsedAt: item.last_used_at as any,
organization: item.organization_id,
project: item.project_id,
target: item.target_id,
scopes: item.scopes || [],
};
}
return {
destroy() {
return db.destroy();
},
isReady() {
return db.isReady();
},
async readTarget(target) {
const tokens = await db.getTokens({ target });
return tokens.map(transformToken);
},
async readToken(hashed_token) {
const result = await db.getToken({ token: hashed_token });
if (!result) {
return null;
}
return transformToken(result);
},
async writeToken(item) {
const result = await db.createToken(item);
return transformToken(result);
},
async deleteToken(hashed_token) {
return db.deleteToken({ token: hashed_token });
},
touchTokens(tokens) {
return db.touchTokens({ tokens });
},
};
}

View file

@ -1450,6 +1450,9 @@ importers:
ioredis:
specifier: 5.4.1
version: 5.4.1
lru-cache:
specifier: 11.0.2
version: 11.0.2
ms:
specifier: 2.1.3
version: 2.1.3
@ -11926,6 +11929,10 @@ packages:
resolution: {integrity: sha512-2bIM8x+VAf6JT4bKAljS1qUWgMsqZRPGJS6FSahIMPVvctcNhyVp7AJu7quxOW9jwkryBReKZY5tY5JYv2n/7Q==}
engines: {node: 14 || >=16.14}
lru-cache@11.0.2:
resolution: {integrity: sha512-123qHRfJBmo2jXDbo/a5YOQrJoHF/GNQTLzQ5+IdK5pWpceK17yRc6ozlWd25FxvGKQbIUs91fDFkXmDHTKcyA==}
engines: {node: 20 || >=22}
lru-cache@4.1.5:
resolution: {integrity: sha512-sWZlbEP2OsHNkXrMl5GYk/jKk70MBng6UU4YI/qGDYbgf6YbP4EvmqISbXCoJiRKs+1bSpFHVgQxvJ17F2li5g==}
@ -16228,8 +16235,8 @@ snapshots:
dependencies:
'@aws-crypto/sha256-browser': 3.0.0
'@aws-crypto/sha256-js': 3.0.0
'@aws-sdk/client-sso-oidc': 3.596.0(@aws-sdk/client-sts@3.596.0)
'@aws-sdk/client-sts': 3.596.0
'@aws-sdk/client-sso-oidc': 3.596.0
'@aws-sdk/client-sts': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0)
'@aws-sdk/core': 3.592.0
'@aws-sdk/credential-provider-node': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0)(@aws-sdk/client-sts@3.596.0)
'@aws-sdk/middleware-host-header': 3.577.0
@ -16336,11 +16343,11 @@ snapshots:
transitivePeerDependencies:
- aws-crt
'@aws-sdk/client-sso-oidc@3.596.0(@aws-sdk/client-sts@3.596.0)':
'@aws-sdk/client-sso-oidc@3.596.0':
dependencies:
'@aws-crypto/sha256-browser': 3.0.0
'@aws-crypto/sha256-js': 3.0.0
'@aws-sdk/client-sts': 3.596.0
'@aws-sdk/client-sts': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0)
'@aws-sdk/core': 3.592.0
'@aws-sdk/credential-provider-node': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0)(@aws-sdk/client-sts@3.596.0)
'@aws-sdk/middleware-host-header': 3.577.0
@ -16379,7 +16386,6 @@ snapshots:
'@smithy/util-utf8': 3.0.0
tslib: 2.8.1
transitivePeerDependencies:
- '@aws-sdk/client-sts'
- aws-crt
'@aws-sdk/client-sso-oidc@3.693.0(@aws-sdk/client-sts@3.693.0)':
@ -16513,11 +16519,11 @@ snapshots:
transitivePeerDependencies:
- aws-crt
'@aws-sdk/client-sts@3.596.0':
'@aws-sdk/client-sts@3.596.0(@aws-sdk/client-sso-oidc@3.596.0)':
dependencies:
'@aws-crypto/sha256-browser': 3.0.0
'@aws-crypto/sha256-js': 3.0.0
'@aws-sdk/client-sso-oidc': 3.596.0(@aws-sdk/client-sts@3.596.0)
'@aws-sdk/client-sso-oidc': 3.596.0
'@aws-sdk/core': 3.592.0
'@aws-sdk/credential-provider-node': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0)(@aws-sdk/client-sts@3.596.0)
'@aws-sdk/middleware-host-header': 3.577.0
@ -16556,6 +16562,7 @@ snapshots:
'@smithy/util-utf8': 3.0.0
tslib: 2.8.1
transitivePeerDependencies:
- '@aws-sdk/client-sso-oidc'
- aws-crt
'@aws-sdk/client-sts@3.693.0':
@ -16669,7 +16676,7 @@ snapshots:
'@aws-sdk/credential-provider-ini@3.596.0(@aws-sdk/client-sso-oidc@3.596.0)(@aws-sdk/client-sts@3.596.0)':
dependencies:
'@aws-sdk/client-sts': 3.596.0
'@aws-sdk/client-sts': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0)
'@aws-sdk/credential-provider-env': 3.587.0
'@aws-sdk/credential-provider-http': 3.596.0
'@aws-sdk/credential-provider-process': 3.587.0
@ -16788,7 +16795,7 @@ snapshots:
'@aws-sdk/credential-provider-web-identity@3.587.0(@aws-sdk/client-sts@3.596.0)':
dependencies:
'@aws-sdk/client-sts': 3.596.0
'@aws-sdk/client-sts': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0)
'@aws-sdk/types': 3.577.0
'@smithy/property-provider': 3.1.8
'@smithy/types': 3.6.0
@ -16963,7 +16970,7 @@ snapshots:
'@aws-sdk/token-providers@3.587.0(@aws-sdk/client-sso-oidc@3.596.0)':
dependencies:
'@aws-sdk/client-sso-oidc': 3.596.0(@aws-sdk/client-sts@3.596.0)
'@aws-sdk/client-sso-oidc': 3.596.0
'@aws-sdk/types': 3.577.0
'@smithy/property-provider': 3.1.8
'@smithy/shared-ini-file-loader': 3.1.9
@ -23391,8 +23398,8 @@ snapshots:
'@typescript-eslint/parser': 7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3)
eslint: 8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)
eslint-config-prettier: 9.1.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
eslint-import-resolver-typescript: 3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
eslint-plugin-import: 2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
eslint-import-resolver-typescript: 3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
eslint-plugin-import: 2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
eslint-plugin-jsonc: 2.11.1(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
eslint-plugin-jsx-a11y: 6.8.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
eslint-plugin-mdx: 3.0.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
@ -26100,13 +26107,13 @@ snapshots:
transitivePeerDependencies:
- supports-color
eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)):
eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)):
dependencies:
debug: 4.3.7(supports-color@8.1.1)
enhanced-resolve: 5.17.1
eslint: 8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)
eslint-module-utils: 2.8.0(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
eslint-plugin-import: 2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
eslint-module-utils: 2.8.0(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
eslint-plugin-import: 2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
fast-glob: 3.3.2
get-tsconfig: 4.7.5
is-core-module: 2.13.1
@ -26137,14 +26144,14 @@ snapshots:
transitivePeerDependencies:
- supports-color
eslint-module-utils@2.8.0(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)):
eslint-module-utils@2.8.0(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)):
dependencies:
debug: 3.2.7(supports-color@8.1.1)
optionalDependencies:
'@typescript-eslint/parser': 7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3)
eslint: 8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)
eslint-import-resolver-node: 0.3.9
eslint-import-resolver-typescript: 3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
eslint-import-resolver-typescript: 3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
transitivePeerDependencies:
- supports-color
@ -26160,7 +26167,7 @@ snapshots:
eslint: 8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)
eslint-compat-utils: 0.1.2(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)):
eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)):
dependencies:
array-includes: 3.1.7
array.prototype.findlastindex: 1.2.3
@ -26170,7 +26177,7 @@ snapshots:
doctrine: 2.1.0
eslint: 8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)
eslint-import-resolver-node: 0.3.9
eslint-module-utils: 2.8.0(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
eslint-module-utils: 2.8.0(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))
hasown: 2.0.0
is-core-module: 2.13.1
is-glob: 4.0.3
@ -28642,6 +28649,8 @@ snapshots:
lru-cache@10.2.0: {}
lru-cache@11.0.2: {}
lru-cache@4.1.5:
dependencies:
pseudomap: 1.0.2