Refactor rate-limit to support multi-targets orgs (#126)

This commit is contained in:
Dotan Simha 2022-05-29 14:43:07 +03:00 committed by GitHub
parent cc7ddfb200
commit 42e5273542
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 109 additions and 91 deletions

View file

@ -299,6 +299,7 @@ export interface Storage {
getGetOrganizationsAndTargetPairsWithLimitInfo(): Promise<
ReadonlyArray<{
organization: string;
org_name: string;
target: string;
limit_operations_monthly: number;
limit_schema_push_monthly: number;

View file

@ -32,11 +32,7 @@ export const rateLimitApiRouter = trpc
.query('checkRateLimit', {
input: VALIDATION,
async resolve({ ctx, input }) {
ctx.logger.info(`checkRateLimit called, input is: %o`, input);
const result = await ctx.checkLimit(input);
ctx.logger.info(`checkRateLimit done, result is: %o`, input);
return result;
return ctx.checkLimit(input);
},
});

View file

@ -11,14 +11,30 @@ import { rateLimitOperationsEventOrg, rateLimitSchemaEventOrg } from './metrics'
export type RateLimitCheckResponse = {
limited: boolean;
quota?: number;
current?: number;
quota: number;
current: number;
};
const UNKNOWN_RATE_LIMIT_OBJ: RateLimitCheckResponse = {
current: -1,
quota: -1,
limited: false,
};
export type CachedRateLimitInfo = {
orgName: string;
schemaPushes: RateLimitCheckResponse;
operations: RateLimitCheckResponse;
retentionInDays: number;
};
const DEFAULT_RETENTION = 30; // days
export type Limiter = ReturnType<typeof createRateLimiter>;
type OrganizationId = string;
type TargetId = string;
export function createRateLimiter(config: {
logger: FastifyLoggerInstance;
rateLimitConfig: {
@ -40,12 +56,9 @@ export function createRateLimiter(config: {
const postgres$ = createPostgreSQLStorage(config.storage.connectionString);
let initialized = false;
let intervalHandle: ReturnType<typeof setInterval> | null = null;
let targetIdToRateLimitStatus = {
orgToTargetIdMap: new Map<string, string>(),
retention: new Map<string, number>(),
operationsReporting: new Map<string, RateLimitCheckResponse>(),
schemaPushes: new Map<string, RateLimitCheckResponse>(),
};
let targetIdToOrgLookup = new Map<TargetId, OrganizationId>();
let cachedResult = new Map<OrganizationId, CachedRateLimitInfo>();
async function fetchAndCalculateUsageInformation() {
const now = new Date();
@ -55,12 +68,6 @@ export function createRateLimiter(config: {
};
config.logger.info(`Calculating rate-limit information based on window: ${window.startTime} -> ${window.endTime}`);
const storage = await postgres$;
const newMap: typeof targetIdToRateLimitStatus = {
orgToTargetIdMap: new Map<string, string>(),
retention: new Map<string, number>(),
operationsReporting: new Map<string, RateLimitCheckResponse>(),
schemaPushes: new Map<string, RateLimitCheckResponse>(),
};
const [records, operations, pushes] = await Promise.all([
storage.getGetOrganizationsAndTargetPairsWithLimitInfo(),
@ -72,53 +79,59 @@ export function createRateLimiter(config: {
logger.debug(`Fetched total of ${Object.keys(operations).length} targets with usage information`);
logger.debug(`Fetched total of ${Object.keys(pushes).length} targets with schema push information`);
for (const record of records) {
newMap.orgToTargetIdMap.set(record.organization, record.target);
const currentOperations = operations[record.target] || 0;
const operationsLimited =
record.limit_operations_monthly === 0 ? false : record.limit_operations_monthly < currentOperations;
const newTargetIdToOrgLookup = new Map<TargetId, OrganizationId>();
const newCachedResult = new Map<OrganizationId, CachedRateLimitInfo>();
newMap.retention.set(record.target, record.limit_retention_days);
for (const pairRecord of records) {
newTargetIdToOrgLookup.set(pairRecord.target, pairRecord.organization);
newMap.operationsReporting.set(record.target, {
current: currentOperations,
quota: record.limit_operations_monthly,
limited: operationsLimited,
});
const currentPushes = pushes[record.target] || 0;
const pushLimited =
record.limit_schema_push_monthly === 0 ? false : record.limit_schema_push_monthly < currentPushes;
newMap.schemaPushes.set(record.target, {
current: currentPushes,
quota: record.limit_schema_push_monthly,
limited: pushLimited,
});
if (operationsLimited) {
rateLimitOperationsEventOrg
.labels({
orgId: record.organization,
})
.inc();
logger.info(
`Target="${record.target}" (org="${record.organization}") is now being rate-limited for operations (${currentOperations}/${record.limit_operations_monthly})`
);
if (!newCachedResult.has(pairRecord.organization)) {
newCachedResult.set(pairRecord.organization, {
orgName: pairRecord.org_name,
operations: {
current: 0,
quota: pairRecord.limit_operations_monthly,
limited: false,
},
schemaPushes: {
current: 0,
quota: pairRecord.limit_schema_push_monthly,
limited: false,
},
retentionInDays: pairRecord.limit_retention_days,
});
}
if (pushLimited) {
rateLimitSchemaEventOrg
.labels({
orgId: record.organization,
})
.inc();
logger.info(
`Target="${record.target}" (org="${record.organization}") is now being rate-limited for schema pushes (${currentPushes}/${record.limit_schema_push_monthly})`
);
}
const orgRecord = newCachedResult.get(pairRecord.organization)!;
orgRecord.operations.current = (orgRecord.operations.current || 0) + (operations[pairRecord.target] || 0);
orgRecord.schemaPushes.current = (orgRecord.schemaPushes.current || 0) + (pushes[pairRecord.target] || 0);
}
targetIdToRateLimitStatus = newMap;
newCachedResult.forEach((orgRecord, orgId) => {
const orgName = orgRecord.orgName;
orgRecord.operations.limited =
orgRecord.operations.quota === 0 ? false : orgRecord.operations.current > orgRecord.operations.quota;
orgRecord.schemaPushes.limited =
orgRecord.schemaPushes.quota === 0 ? false : orgRecord.schemaPushes.current > orgRecord.schemaPushes.quota;
if (orgRecord.operations.limited) {
rateLimitOperationsEventOrg.labels({ orgId, orgName }).inc();
logger.info(
`Organization "${orgName}"/"${orgId}" is now being rate-limited for operations (${orgRecord.operations.current}/${orgRecord.operations.quota})`
);
}
if (orgRecord.schemaPushes.limited) {
rateLimitSchemaEventOrg.labels({ orgId, orgName }).inc();
logger.info(
`Organization "${orgName}"/"${orgId}" is now being rate-limited for schema pushes (${orgRecord.schemaPushes.current}/${orgRecord.schemaPushes.quota})`
);
}
});
cachedResult = newCachedResult;
targetIdToOrgLookup = newTargetIdToOrgLookup;
logger.info(`Built a new rate-limit map: %s`, JSON.stringify(Array.from(newCachedResult.entries())));
}
return {
@ -127,40 +140,44 @@ export function createRateLimiter(config: {
return initialized;
},
getRetention(targetId: string) {
const map = targetIdToRateLimitStatus.retention;
const orgId = targetIdToOrgLookup.get(targetId);
if (map.has(targetId)) {
return map.get(targetId)!;
if (!orgId) {
return DEFAULT_RETENTION;
}
// In case we don't have any knowledge on that target id, to use the default.
return DEFAULT_RETENTION;
const orgData = cachedResult.get(orgId);
if (!orgData) {
return DEFAULT_RETENTION;
}
return orgData.retentionInDays;
},
checkLimit(input: RateLimitInput): RateLimitCheckResponse {
const map =
input.type === 'operations-reporting'
? targetIdToRateLimitStatus.operationsReporting
: targetIdToRateLimitStatus.schemaPushes;
const orgId = input.entityType === 'organization' ? input.id : targetIdToOrgLookup.get(input.id);
const entityId =
input.entityType === 'target' ? input.id : targetIdToRateLimitStatus.orgToTargetIdMap.get(input.id);
if (!entityId) {
if (!orgId) {
logger.warn(
`Failed to resolve/find rate limit information for entityId=${entityId} (type=${input.entityType})`
`Failed to resolve/find rate limit information for entityId=${input.id} (type=${input.entityType})`
);
return {
limited: false,
};
return UNKNOWN_RATE_LIMIT_OBJ;
}
if (map.has(entityId)) {
return map.get(entityId)!;
const orgData = cachedResult.get(orgId);
if (!orgData) {
return UNKNOWN_RATE_LIMIT_OBJ;
}
if (input.type === 'operations-reporting') {
return orgData.operations;
} else if (input.type === 'schema-push') {
return orgData.schemaPushes;
} else {
return UNKNOWN_RATE_LIMIT_OBJ;
}
// In case we don't have any knowledge on that target id, we allow it to run
return {
limited: false,
};
},
async start() {
logger.info(

View file

@ -3,11 +3,11 @@ import { metrics } from '@hive/service-common';
export const rateLimitSchemaEventOrg = new metrics.Counter({
name: 'rate_limited_schema_events_count',
help: 'Rate limit events per org id, for schema pushses.',
labelNames: ['orgId'],
labelNames: ['orgId', 'orgName'],
});
export const rateLimitOperationsEventOrg = new metrics.Counter({
name: 'rate_limited_operations_events_count',
help: 'Rate limit events per org id, for operations.',
labelNames: ['orgId'],
labelNames: ['orgId', 'orgName'],
});

View file

@ -1590,6 +1590,7 @@ export async function createStorage(connection: string): Promise<Storage> {
const results = await pool.query<
Slonik<{
organization: string;
org_name: string;
target: string;
limit_operations_monthly: number;
limit_schema_push_monthly: number;
@ -1599,6 +1600,7 @@ export async function createStorage(connection: string): Promise<Storage> {
sql`
SELECT
o.id as organization,
o.name as org_name,
o.limit_operations_monthly,
o.limit_schema_push_monthly,
o.limit_retention_days,

View file

@ -9,6 +9,7 @@ import {
httpRequestsWithNonExistingToken,
httpRequestsWithNoAccess,
collectLatency,
droppedReports,
} from './metrics';
import type { IncomingLegacyReport, IncomingReport } from './types';
import { createUsageRateLimit } from './rate-limit';
@ -112,7 +113,7 @@ async function main() {
entityType: 'target',
})
) {
// TODO: We should trigger a call to update the KV in the WAF in case we want to make sure token is being blocked?
droppedReports.labels({ targetId: tokenInfo.target }).inc();
res.status(429).send();
return;

View file

@ -40,6 +40,12 @@ export const totalReports = new metrics.Counter({
help: 'Number of reports received by usage service',
});
export const droppedReports = new metrics.Counter({
name: 'usage_rate_limit_dropped',
help: 'Number of reports dropped by usage service due to rate-limit',
labelNames: ['targetId'],
});
export const totalLegacyReports = new metrics.Counter({
name: 'usage_reports_legacy_format_total',
help: 'Number of legacy-format reports received by usage service',

View file

@ -4,11 +4,6 @@ fragment OrganizationFields on Organization {
name
type
plan
rateLimit {
operations
schemaPushes
retentionInDays
}
me {
...MemberFields
}