diff --git a/packages/migrations/test/2023.11.02T14.41.41.schema-checks-dedup.test.ts b/packages/migrations/test/2023.11.02T14.41.41.schema-checks-dedup.test.ts index 843d66fcb..75d78c3b5 100644 --- a/packages/migrations/test/2023.11.02T14.41.41.schema-checks-dedup.test.ts +++ b/packages/migrations/test/2023.11.02T14.41.41.schema-checks-dedup.test.ts @@ -240,13 +240,28 @@ await describe('migration: schema-checks-dedup', async () => { sql`SELECT count(*) as total FROM schema_checks`, ); assert.strictEqual(countSchemaChecks, 4); - await db.query(sql`DELETE FROM schema_checks WHERE id = ${newSchemaCheck.id}`); + const expiresAt = new Date(); + await db.query(sql` + UPDATE + schema_checks + SET + expires_at = ${expiresAt.toISOString()} + WHERE + id = ${newSchemaCheck.id} + `); + + // Purge unused SDLs from sdl_store + const result = await storage.purgeExpiredSchemaChecks({ + expiresAt, + }); + + assert.strictEqual(result.deletedSchemaCheckCount, 1); + assert.strictEqual(result.deletedSdlStoreCount, 3); + countSchemaChecks = await db.oneFirst( sql`SELECT count(*) as total FROM schema_checks`, ); assert.strictEqual(countSchemaChecks, 3); - // Purge unused SDLs from sdl_store - await storage.purgeUnusedSchemasInStore(); countSdlStore = await db.oneFirst(sql` SELECT count(*) as total FROM sdl_store WHERE diff --git a/packages/services/api/src/modules/shared/providers/storage.ts b/packages/services/api/src/modules/shared/providers/storage.ts index 6b024dc7f..83e9dd557 100644 --- a/packages/services/api/src/modules/shared/providers/storage.ts +++ b/packages/services/api/src/modules/shared/providers/storage.ts @@ -675,11 +675,10 @@ export interface Storage { /** * Delete the expired schema checks from the database. */ - purgeExpiredSchemaChecks(_: { expiresAt: Date }): Promise; - /** - * Delete rows from sdl_store that are not referenced by any schema check. - */ - purgeUnusedSchemasInStore(): Promise; + purgeExpiredSchemaChecks(_: { expiresAt: Date }): Promise<{ + deletedSchemaCheckCount: number; + deletedSdlStoreCount: number; + }>; /** * Find schema check for a given ID and target. */ diff --git a/packages/services/server/src/index.ts b/packages/services/server/src/index.ts index f03c8b6ab..0d88d114e 100644 --- a/packages/services/server/src/index.ts +++ b/packages/services/server/src/index.ts @@ -15,6 +15,7 @@ import { registerTRPC, reportReadiness, startMetrics, + startSentryTransaction, } from '@hive/service-common'; import { createConnectionString, createStorage as createPostgreSQLStorage } from '@hive/storage'; import { Dedupe, ExtraErrorData } from '@sentry/integrations'; @@ -90,7 +91,7 @@ export async function main() { const storage = await createPostgreSQLStorage(createConnectionString(env.postgres), 10); - let dbPureTaskRunner: null | ReturnType = null; + let dbPurgeTaskRunner: null | ReturnType = null; if (!env.hiveServices.usageEstimator) { server.log.debug('Usage estimation is disabled. Skip scheduling purge tasks.'); @@ -98,17 +99,35 @@ export async function main() { server.log.debug( `Usage estimation is enabled. Start scheduling purge tasks every ${env.hiveServices.usageEstimator.dateRetentionPurgeIntervalMinutes} minutes.`, ); - dbPureTaskRunner = createTaskRunner({ + dbPurgeTaskRunner = createTaskRunner({ async run() { - await storage.purgeExpiredSchemaChecks({ - expiresAt: new Date(), + const transaction = startSentryTransaction({ + op: 'db.purgeTaskRunner', + name: 'Purge Task', }); - // TODO: activate await storage.purgeUnusedSchemasInStore(); + try { + const result = await storage.purgeExpiredSchemaChecks({ + expiresAt: new Date(), + }); + server.log.debug( + 'Finished running schema check purge task. (deletedSchemaCheckCount=%s deletedSdlStoreCount=%s)', + result.deletedSchemaCheckCount, + result.deletedSdlStoreCount, + ); + transaction.setMeasurement('deletedSchemaCheckCount', result.deletedSchemaCheckCount, ''); + transaction.setMeasurement('deletedSdlStoreCount', result.deletedSdlStoreCount, ''); + transaction.finish(); + } catch (error) { + captureException(error); + transaction.setStatus('internal_error'); + transaction.finish(); + throw error; + } }, interval: env.hiveServices.usageEstimator.dateRetentionPurgeIntervalMinutes * 60 * 1000, logger: server.log, }); - dbPureTaskRunner.start(); + dbPurgeTaskRunner.start(); } registerShutdown({ @@ -118,9 +137,9 @@ export async function main() { await server.close(); server.log.info('Stopping Storage handler...'); await storage.destroy(); - if (dbPureTaskRunner) { + if (dbPurgeTaskRunner) { server.log.info('Stopping expired schema check purge task...'); - await dbPureTaskRunner.stop(); + await dbPurgeTaskRunner.stop(); } server.log.info('Shutdown complete.'); }, diff --git a/packages/services/service-common/src/index.ts b/packages/services/service-common/src/index.ts index 68f94e2fa..d8e214b2b 100644 --- a/packages/services/service-common/src/index.ts +++ b/packages/services/service-common/src/index.ts @@ -6,3 +6,4 @@ export * from './heartbeats'; export * from './trpc'; export { registerShutdown } from './graceful-shutdown'; export { cleanRequestId } from './helpers'; +export { startSentryTransaction } from './sentry'; diff --git a/packages/services/service-common/src/sentry.ts b/packages/services/service-common/src/sentry.ts index bc92e9a93..3bc6fecf4 100644 --- a/packages/services/service-common/src/sentry.ts +++ b/packages/services/service-common/src/sentry.ts @@ -1,7 +1,13 @@ import type { FastifyInstance, FastifyPluginAsync, FastifyRequest } from 'fastify'; import fp from 'fastify-plugin'; import * as Sentry from '@sentry/node'; -import type { ExtractedNodeRequestData, TraceparentData, Transaction } from '@sentry/types'; +import type { + CustomSamplingContext, + ExtractedNodeRequestData, + TraceparentData, + Transaction, + TransactionContext, +} from '@sentry/types'; import { extractTraceparentData, normalize } from '@sentry/utils'; import { cleanRequestId } from './helpers'; @@ -43,7 +49,7 @@ const plugin: FastifyPluginAsync = async server => { const extractedRequestData = extractRequestData(request); - const transaction = Sentry.startTransaction( + const transaction = startSentryTransaction( { op: 'http.server', name: `${request.method} ${request.url}`, @@ -52,9 +58,6 @@ const plugin: FastifyPluginAsync = async server => { { request: extractedRequestData }, ); (reply as any).sentryTransaction = transaction; - transaction.sampled = true; - - Sentry.configureScope(scope => scope.setSpan(transaction)); return; }); @@ -210,3 +213,15 @@ function replaceAuthorization(value?: string | string[]): string | string[] { return ''; } + +export function startSentryTransaction( + context: TransactionContext, + customSamplingContext?: CustomSamplingContext, +) { + const transaction = Sentry.startTransaction(context, customSamplingContext); + transaction.sampled = true; + + Sentry.configureScope(scope => scope.setSpan(transaction)); + + return transaction; +} diff --git a/packages/services/storage/src/index.ts b/packages/services/storage/src/index.ts index a15c2f206..d871e9cbe 100644 --- a/packages/services/storage/src/index.ts +++ b/packages/services/storage/src/index.ts @@ -3687,34 +3687,61 @@ export async function createStorage(connection: string, maximumPoolSize: number) }, async purgeExpiredSchemaChecks(args) { - await pool.query(sql` - DELETE - FROM - "public"."schema_checks" - WHERE - "expires_at" <= ${args.expiresAt.toISOString()} - `); + return await pool.transaction(async pool => { + const result = await pool.any(sql` + DELETE + FROM "public"."schema_checks" + WHERE + "id" = ANY( + SELECT + "id" + FROM + "public"."schema_checks" + WHERE + "expires_at" <= ${args.expiresAt.toISOString()} + LIMIT + 1000 + ) + RETURNING + "schema_sdl_store_id" as "id1", + "supergraph_sdl_store_id" as "id2", + "composite_schema_sdl_store_id" as "id3" + `); + const ids = PurgeExpiredSchemaChecksIDModel.parse(result); - return 0; - }, + if (ids.size === 0) { + return { + deletedSchemaCheckCount: result.length, + deletedSdlStoreCount: 0, + }; + } + const deletedRecords = await pool.any(sql` + DELETE + FROM + "sdl_store" + WHERE + "id" = ANY( + ${sql.array(Array.from(ids), 'text')} + ) + AND NOT EXISTS ( + SELECT + 1 + FROM + "schema_checks" + WHERE + "schema_checks"."schema_sdl_store_id" = "sdl_store"."id" + OR "schema_checks"."composite_schema_sdl_store_id" = "sdl_store"."id" + OR "schema_checks"."supergraph_sdl_store_id" = "sdl_store"."id" + ) + RETURNING + true as "d" + `); - async purgeUnusedSchemasInStore() { - await pool.query(sql` - DELETE FROM "public"."sdl_store" - WHERE id NOT IN ( - SELECT DISTINCT schema_sdl_store_id - FROM "public"."schema_checks" - WHERE schema_sdl_store_id IS NOT NULL - UNION - SELECT DISTINCT supergraph_sdl_store_id - FROM "public"."schema_checks" - WHERE supergraph_sdl_store_id IS NOT NULL - UNION - SELECT DISTINCT composite_schema_sdl_store_id - FROM "public"."schema_checks" - WHERE composite_schema_sdl_store_id IS NOT NULL - ) - `); + return { + deletedSchemaCheckCount: result.length, + deletedSdlStoreCount: deletedRecords.length, + }; + }); }, async getSchemaVersionByActionId(args) { @@ -4145,4 +4172,22 @@ const TargetModel = zod.object({ graphqlEndpointUrl: zod.string().nullable(), }); +const PurgeExpiredSchemaChecksIDModel = zod + .array( + zod.object({ + id1: zod.string().nullable(), + id2: zod.string().nullable(), + id3: zod.string().nullable(), + }), + ) + .transform(items => { + const ids = new Set(); + for (const row of items) { + row.id1 && ids.add(row.id1); + row.id2 && ids.add(row.id2); + row.id3 && ids.add(row.id3); + } + return ids; + }); + export * from './schema-change-model';