mirror of
https://github.com/graphql-hive/console
synced 2026-05-24 01:28:32 +00:00
feat: schema sdl store purging with schema checks (#3258)
Co-authored-by: Kamil Kisiela <kamil.kisiela@gmail.com>
This commit is contained in:
parent
79c0b4139e
commit
5f5294c419
6 changed files with 141 additions and 47 deletions
|
|
@ -240,13 +240,28 @@ await describe('migration: schema-checks-dedup', async () => {
|
|||
sql`SELECT count(*) as total FROM schema_checks`,
|
||||
);
|
||||
assert.strictEqual(countSchemaChecks, 4);
|
||||
await db.query(sql`DELETE FROM schema_checks WHERE id = ${newSchemaCheck.id}`);
|
||||
const expiresAt = new Date();
|
||||
await db.query(sql`
|
||||
UPDATE
|
||||
schema_checks
|
||||
SET
|
||||
expires_at = ${expiresAt.toISOString()}
|
||||
WHERE
|
||||
id = ${newSchemaCheck.id}
|
||||
`);
|
||||
|
||||
// Purge unused SDLs from sdl_store
|
||||
const result = await storage.purgeExpiredSchemaChecks({
|
||||
expiresAt,
|
||||
});
|
||||
|
||||
assert.strictEqual(result.deletedSchemaCheckCount, 1);
|
||||
assert.strictEqual(result.deletedSdlStoreCount, 3);
|
||||
|
||||
countSchemaChecks = await db.oneFirst<number>(
|
||||
sql`SELECT count(*) as total FROM schema_checks`,
|
||||
);
|
||||
assert.strictEqual(countSchemaChecks, 3);
|
||||
// Purge unused SDLs from sdl_store
|
||||
await storage.purgeUnusedSchemasInStore();
|
||||
|
||||
countSdlStore = await db.oneFirst<number>(sql`
|
||||
SELECT count(*) as total FROM sdl_store WHERE
|
||||
|
|
|
|||
|
|
@ -675,11 +675,10 @@ export interface Storage {
|
|||
/**
|
||||
* Delete the expired schema checks from the database.
|
||||
*/
|
||||
purgeExpiredSchemaChecks(_: { expiresAt: Date }): Promise<number>;
|
||||
/**
|
||||
* Delete rows from sdl_store that are not referenced by any schema check.
|
||||
*/
|
||||
purgeUnusedSchemasInStore(): Promise<void>;
|
||||
purgeExpiredSchemaChecks(_: { expiresAt: Date }): Promise<{
|
||||
deletedSchemaCheckCount: number;
|
||||
deletedSdlStoreCount: number;
|
||||
}>;
|
||||
/**
|
||||
* Find schema check for a given ID and target.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import {
|
|||
registerTRPC,
|
||||
reportReadiness,
|
||||
startMetrics,
|
||||
startSentryTransaction,
|
||||
} from '@hive/service-common';
|
||||
import { createConnectionString, createStorage as createPostgreSQLStorage } from '@hive/storage';
|
||||
import { Dedupe, ExtraErrorData } from '@sentry/integrations';
|
||||
|
|
@ -90,7 +91,7 @@ export async function main() {
|
|||
|
||||
const storage = await createPostgreSQLStorage(createConnectionString(env.postgres), 10);
|
||||
|
||||
let dbPureTaskRunner: null | ReturnType<typeof createTaskRunner> = null;
|
||||
let dbPurgeTaskRunner: null | ReturnType<typeof createTaskRunner> = null;
|
||||
|
||||
if (!env.hiveServices.usageEstimator) {
|
||||
server.log.debug('Usage estimation is disabled. Skip scheduling purge tasks.');
|
||||
|
|
@ -98,17 +99,35 @@ export async function main() {
|
|||
server.log.debug(
|
||||
`Usage estimation is enabled. Start scheduling purge tasks every ${env.hiveServices.usageEstimator.dateRetentionPurgeIntervalMinutes} minutes.`,
|
||||
);
|
||||
dbPureTaskRunner = createTaskRunner({
|
||||
dbPurgeTaskRunner = createTaskRunner({
|
||||
async run() {
|
||||
await storage.purgeExpiredSchemaChecks({
|
||||
expiresAt: new Date(),
|
||||
const transaction = startSentryTransaction({
|
||||
op: 'db.purgeTaskRunner',
|
||||
name: 'Purge Task',
|
||||
});
|
||||
// TODO: activate await storage.purgeUnusedSchemasInStore();
|
||||
try {
|
||||
const result = await storage.purgeExpiredSchemaChecks({
|
||||
expiresAt: new Date(),
|
||||
});
|
||||
server.log.debug(
|
||||
'Finished running schema check purge task. (deletedSchemaCheckCount=%s deletedSdlStoreCount=%s)',
|
||||
result.deletedSchemaCheckCount,
|
||||
result.deletedSdlStoreCount,
|
||||
);
|
||||
transaction.setMeasurement('deletedSchemaCheckCount', result.deletedSchemaCheckCount, '');
|
||||
transaction.setMeasurement('deletedSdlStoreCount', result.deletedSdlStoreCount, '');
|
||||
transaction.finish();
|
||||
} catch (error) {
|
||||
captureException(error);
|
||||
transaction.setStatus('internal_error');
|
||||
transaction.finish();
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
interval: env.hiveServices.usageEstimator.dateRetentionPurgeIntervalMinutes * 60 * 1000,
|
||||
logger: server.log,
|
||||
});
|
||||
dbPureTaskRunner.start();
|
||||
dbPurgeTaskRunner.start();
|
||||
}
|
||||
|
||||
registerShutdown({
|
||||
|
|
@ -118,9 +137,9 @@ export async function main() {
|
|||
await server.close();
|
||||
server.log.info('Stopping Storage handler...');
|
||||
await storage.destroy();
|
||||
if (dbPureTaskRunner) {
|
||||
if (dbPurgeTaskRunner) {
|
||||
server.log.info('Stopping expired schema check purge task...');
|
||||
await dbPureTaskRunner.stop();
|
||||
await dbPurgeTaskRunner.stop();
|
||||
}
|
||||
server.log.info('Shutdown complete.');
|
||||
},
|
||||
|
|
|
|||
|
|
@ -6,3 +6,4 @@ export * from './heartbeats';
|
|||
export * from './trpc';
|
||||
export { registerShutdown } from './graceful-shutdown';
|
||||
export { cleanRequestId } from './helpers';
|
||||
export { startSentryTransaction } from './sentry';
|
||||
|
|
|
|||
|
|
@ -1,7 +1,13 @@
|
|||
import type { FastifyInstance, FastifyPluginAsync, FastifyRequest } from 'fastify';
|
||||
import fp from 'fastify-plugin';
|
||||
import * as Sentry from '@sentry/node';
|
||||
import type { ExtractedNodeRequestData, TraceparentData, Transaction } from '@sentry/types';
|
||||
import type {
|
||||
CustomSamplingContext,
|
||||
ExtractedNodeRequestData,
|
||||
TraceparentData,
|
||||
Transaction,
|
||||
TransactionContext,
|
||||
} from '@sentry/types';
|
||||
import { extractTraceparentData, normalize } from '@sentry/utils';
|
||||
import { cleanRequestId } from './helpers';
|
||||
|
||||
|
|
@ -43,7 +49,7 @@ const plugin: FastifyPluginAsync = async server => {
|
|||
|
||||
const extractedRequestData = extractRequestData(request);
|
||||
|
||||
const transaction = Sentry.startTransaction(
|
||||
const transaction = startSentryTransaction(
|
||||
{
|
||||
op: 'http.server',
|
||||
name: `${request.method} ${request.url}`,
|
||||
|
|
@ -52,9 +58,6 @@ const plugin: FastifyPluginAsync = async server => {
|
|||
{ request: extractedRequestData },
|
||||
);
|
||||
(reply as any).sentryTransaction = transaction;
|
||||
transaction.sampled = true;
|
||||
|
||||
Sentry.configureScope(scope => scope.setSpan(transaction));
|
||||
|
||||
return;
|
||||
});
|
||||
|
|
@ -210,3 +213,15 @@ function replaceAuthorization(value?: string | string[]): string | string[] {
|
|||
|
||||
return '<missing>';
|
||||
}
|
||||
|
||||
export function startSentryTransaction(
|
||||
context: TransactionContext,
|
||||
customSamplingContext?: CustomSamplingContext,
|
||||
) {
|
||||
const transaction = Sentry.startTransaction(context, customSamplingContext);
|
||||
transaction.sampled = true;
|
||||
|
||||
Sentry.configureScope(scope => scope.setSpan(transaction));
|
||||
|
||||
return transaction;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3687,34 +3687,61 @@ export async function createStorage(connection: string, maximumPoolSize: number)
|
|||
},
|
||||
|
||||
async purgeExpiredSchemaChecks(args) {
|
||||
await pool.query(sql`
|
||||
DELETE
|
||||
FROM
|
||||
"public"."schema_checks"
|
||||
WHERE
|
||||
"expires_at" <= ${args.expiresAt.toISOString()}
|
||||
`);
|
||||
return await pool.transaction(async pool => {
|
||||
const result = await pool.any<unknown>(sql`
|
||||
DELETE
|
||||
FROM "public"."schema_checks"
|
||||
WHERE
|
||||
"id" = ANY(
|
||||
SELECT
|
||||
"id"
|
||||
FROM
|
||||
"public"."schema_checks"
|
||||
WHERE
|
||||
"expires_at" <= ${args.expiresAt.toISOString()}
|
||||
LIMIT
|
||||
1000
|
||||
)
|
||||
RETURNING
|
||||
"schema_sdl_store_id" as "id1",
|
||||
"supergraph_sdl_store_id" as "id2",
|
||||
"composite_schema_sdl_store_id" as "id3"
|
||||
`);
|
||||
const ids = PurgeExpiredSchemaChecksIDModel.parse(result);
|
||||
|
||||
return 0;
|
||||
},
|
||||
if (ids.size === 0) {
|
||||
return {
|
||||
deletedSchemaCheckCount: result.length,
|
||||
deletedSdlStoreCount: 0,
|
||||
};
|
||||
}
|
||||
const deletedRecords = await pool.any<unknown>(sql`
|
||||
DELETE
|
||||
FROM
|
||||
"sdl_store"
|
||||
WHERE
|
||||
"id" = ANY(
|
||||
${sql.array(Array.from(ids), 'text')}
|
||||
)
|
||||
AND NOT EXISTS (
|
||||
SELECT
|
||||
1
|
||||
FROM
|
||||
"schema_checks"
|
||||
WHERE
|
||||
"schema_checks"."schema_sdl_store_id" = "sdl_store"."id"
|
||||
OR "schema_checks"."composite_schema_sdl_store_id" = "sdl_store"."id"
|
||||
OR "schema_checks"."supergraph_sdl_store_id" = "sdl_store"."id"
|
||||
)
|
||||
RETURNING
|
||||
true as "d"
|
||||
`);
|
||||
|
||||
async purgeUnusedSchemasInStore() {
|
||||
await pool.query(sql`
|
||||
DELETE FROM "public"."sdl_store"
|
||||
WHERE id NOT IN (
|
||||
SELECT DISTINCT schema_sdl_store_id
|
||||
FROM "public"."schema_checks"
|
||||
WHERE schema_sdl_store_id IS NOT NULL
|
||||
UNION
|
||||
SELECT DISTINCT supergraph_sdl_store_id
|
||||
FROM "public"."schema_checks"
|
||||
WHERE supergraph_sdl_store_id IS NOT NULL
|
||||
UNION
|
||||
SELECT DISTINCT composite_schema_sdl_store_id
|
||||
FROM "public"."schema_checks"
|
||||
WHERE composite_schema_sdl_store_id IS NOT NULL
|
||||
)
|
||||
`);
|
||||
return {
|
||||
deletedSchemaCheckCount: result.length,
|
||||
deletedSdlStoreCount: deletedRecords.length,
|
||||
};
|
||||
});
|
||||
},
|
||||
|
||||
async getSchemaVersionByActionId(args) {
|
||||
|
|
@ -4145,4 +4172,22 @@ const TargetModel = zod.object({
|
|||
graphqlEndpointUrl: zod.string().nullable(),
|
||||
});
|
||||
|
||||
const PurgeExpiredSchemaChecksIDModel = zod
|
||||
.array(
|
||||
zod.object({
|
||||
id1: zod.string().nullable(),
|
||||
id2: zod.string().nullable(),
|
||||
id3: zod.string().nullable(),
|
||||
}),
|
||||
)
|
||||
.transform(items => {
|
||||
const ids = new Set<string>();
|
||||
for (const row of items) {
|
||||
row.id1 && ids.add(row.id1);
|
||||
row.id2 && ids.add(row.id2);
|
||||
row.id3 && ids.add(row.id3);
|
||||
}
|
||||
return ids;
|
||||
});
|
||||
|
||||
export * from './schema-change-model';
|
||||
|
|
|
|||
Loading…
Reference in a new issue