import {
  createPool,
  type DatabasePool,
  type Interceptor,
  type PrimitiveValueExpression,
  type QueryResultRow,
  type QueryResultRowColumn,
  type CommonQueryMethods as SlonikCommonQueryMethods,
  type TaggedTemplateLiteralInvocation,
} from 'slonik';
import { createQueryLoggingInterceptor } from 'slonik-interceptor-query-logging';
import { context, SpanKind, SpanStatusCode, trace } from '@hive/service-common';
import { createConnectionString, type PostgresConnectionParamaters } from './connection-string';

const tracer = trace.getTracer('storage');

/**
 * The subset of query methods exposed by {@link PostgresDatabasePool} and by the
 * object handed to transaction handlers.
 *
 * NOTE(review): the result types below are declared as `unknown` so callers have to
 * narrow/validate rows themselves — confirm this matches what existing callers expect.
 */
export interface CommonQueryMethods {
  exists(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<boolean>;
  any(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<ReadonlyArray<unknown>>;
  maybeOne(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<unknown>;
  query(sql: TaggedTemplateLiteralInvocation, values?: PrimitiveValueExpression[]): Promise<void>;
  oneFirst(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<unknown>;
  one(sql: TaggedTemplateLiteralInvocation, values?: PrimitiveValueExpression[]): Promise<unknown>;
  anyFirst(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<ReadonlyArray<unknown>>;
  maybeOneFirst(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<unknown>;
}

/**
 * Wraps a Slonik query-capable object (pool or transaction connection) so that only the
 * methods of {@link CommonQueryMethods} are reachable. Every call is forwarded as
 * `methods.<name>(sql, values)`, so the receiver (`this`) of the underlying object is kept.
 */
function toCommonQueryMethods(methods: SlonikCommonQueryMethods): CommonQueryMethods {
  return {
    async exists(sql, values) {
      return methods.exists(sql, values);
    },
    async any(sql, values) {
      return methods.any(sql, values);
    },
    async maybeOne(sql, values) {
      return methods.maybeOne(sql, values);
    },
    async query(sql, values) {
      // The query result is intentionally dropped; the contract is Promise<void>.
      await methods.query(sql, values);
    },
    async oneFirst(sql, values) {
      return await methods.oneFirst(sql, values);
    },
    async maybeOneFirst(sql, values) {
      return await methods.maybeOneFirst(sql, values);
    },
    async anyFirst(sql, values) {
      return await methods.anyFirst(sql, values);
    },
    async one(sql, values) {
      return await methods.one(sql, values);
    },
  };
}

/**
 * Thin wrapper around a Slonik {@link DatabasePool} that exposes the
 * {@link CommonQueryMethods} surface plus traced transactions.
 */
export class PostgresDatabasePool implements CommonQueryMethods {
  constructor(private readonly pool: DatabasePool) {}

  /** Retrieve the raw PgPool instance. Refrain from using this API. It only exists for postgraphile workers */
  getRawPgPool() {
    return this.pool.pool;
  }

  /** Retrieve the raw Slonik instance. Refrain from using this API. */
  getSlonikPool() {
    return this.pool;
  }

  async exists(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<boolean> {
    return this.pool.exists(sql, values);
  }

  async any(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<ReadonlyArray<unknown>> {
    return this.pool.any(sql, values);
  }

  async maybeOne(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<unknown> {
    return this.pool.maybeOne(sql, values);
  }

  /** Runs the statement and discards its result. */
  async query(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<void> {
    await this.pool.query(sql, values);
  }

  async oneFirst(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<unknown> {
    return await this.pool.oneFirst(sql, values);
  }

  async maybeOneFirst(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<unknown> {
    return await this.pool.maybeOneFirst(sql, values);
  }

  async one(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<unknown> {
    return await this.pool.one(sql, values);
  }

  async anyFirst(
    sql: TaggedTemplateLiteralInvocation,
    values?: PrimitiveValueExpression[],
  ): Promise<ReadonlyArray<unknown>> {
    return await this.pool.anyFirst(sql, values);
  }

  /**
   * Runs `handler` inside a database transaction and records it as a tracing span
   * named `PG Transaction: <name>`.
   *
   * @param name - Label used in the span name.
   * @param handler - Receives the transaction-bound query methods; its resolved value
   *   becomes the resolved value of this method.
   * @returns Whatever `handler` resolves with.
   * @throws Re-throws any error raised by the handler or by the underlying
   *   `pool.transaction` call, after recording it on the span.
   */
  async transaction<T>(
    name: string,
    handler: (methods: CommonQueryMethods) => Promise<T>,
  ): Promise<T> {
    const span = tracer.startSpan(`PG Transaction: ${name}`, {
      kind: SpanKind.INTERNAL,
    });

    return context.with(trace.setSpan(context.active(), span), async () => {
      // The error handling wraps the whole `pool.transaction` call rather than only the
      // handler: this way the span is ended exactly once and also when the rejection
      // originates outside the handler, which previously left the span open forever.
      try {
        return await this.pool.transaction(async methods => {
          return await handler(toCommonQueryMethods(methods));
        });
      } catch (err) {
        span.setAttribute('error', 'true');

        if (err instanceof Error) {
          span.setAttribute('error.type', err.name);
          span.setAttribute('error.message', err.message);
          span.setStatus({
            code: SpanStatusCode.ERROR,
            message: err.message,
          });
        }

        throw err;
      } finally {
        span.end();
      }
    });
  }

  /** Ends the underlying Slonik pool. */
  end(): Promise<void> {
    return this.pool.end();
  }
}

/** Interceptors installed on every pool created by this module. */
const dbInterceptors: Interceptor[] = [createQueryLoggingInterceptor()];

/**
 * Creates a Slonik pool and returns it wrapped in a {@link PostgresDatabasePool}.
 *
 * @param args.connectionParameters - Either a ready-made connection string or a
 *   parameter object that is turned into one via `createConnectionString`.
 * @param args.maximumPoolSize - Forwarded to Slonik as `maximumPoolSize`.
 * @param args.additionalInterceptors - Appended after the default query-logging interceptor.
 * @param args.statementTimeout - Forwarded to Slonik as `statementTimeout`.
 */
export async function createPostgresDatabasePool(args: {
  connectionParameters: PostgresConnectionParamaters | string;
  maximumPoolSize?: number;
  additionalInterceptors?: Interceptor[];
  statementTimeout?: number;
}) {
  const connectionString =
    typeof args.connectionParameters === 'string'
      ? args.connectionParameters
      : createConnectionString(args.connectionParameters);

  const pool = await createPool(connectionString, {
    // `concat` returns a new array, so the shared module-level list is never mutated.
    interceptors: dbInterceptors.concat(args.additionalInterceptors ?? []),
    captureStackTrace: false,
    maximumPoolSize: args.maximumPoolSize,
    idleTimeout: 30000,
    statementTimeout: args.statementTimeout,
  });

  /**
   * Replaces `pool[methodName]` with a wrapper that, when the original call rejects,
   * attaches the SQL text and the bound values to the error before re-rejecting with it.
   *
   * NOTE(review): the constraint on `K` (every Slonik query method except `transaction`)
   * is an assumption — confirm against the intended set of patchable methods.
   */
  function interceptError<K extends Exclude<keyof SlonikCommonQueryMethods, 'transaction'>>(
    methodName: K,
  ) {
    const original: SlonikCommonQueryMethods[K] = pool[methodName];

    function interceptor(
      this: any,
      sql: TaggedTemplateLiteralInvocation<QueryResultRow>,
      values?: QueryResultRowColumn[],
    ): any {
      return (original as any).call(this, sql, values).catch((error: any) => {
        error.sql = sql.sql;
        // Prefer the values carried by the SQL token; fall back to the explicit argument.
        error.values = sql.values || values;

        return Promise.reject(error);
      });
    }

    pool[methodName] = interceptor;
  }

  // Only these two methods are patched on the raw pool.
  interceptError('one');
  interceptError('many');

  return new PostgresDatabasePool(pool);
}