Add schema proposal composition background job (#7706)

This commit is contained in:
jdolle 2026-02-27 13:53:14 -08:00 committed by GitHub
parent 84c44aa1a6
commit 9357d39646
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
51 changed files with 1647 additions and 538 deletions

View file

@ -0,0 +1,18 @@
---
'hive': minor
---
We continue to build and expand the features of schema proposals. In this change, a background
composition job was added to allow asynchronous updates to the composition state of a proposal. This
composition job uses the schema service's composer but is unique from checks in that it takes the latest state of all subgraphs that are a part of a schema proposal.
### Additional environment variables for `workflows` service:
The `workflow` service calls the `schema` service's composeAndValidate TRPC endpoint and requires the `schema` service endpoint. And the shared instance of Redis, used as a pubsub in the `server` and `api` services, is also now used by `workflows` to update `Subscription.schemaProposalComposition`.
For self hosters, make sure to provide the following environment variables to the `workflows` service:
- SCHEMA_ENDPOINT
- REDIS_HOST
- REDIS_PORT
- REDIS_PASSWORD

View file

@ -136,17 +136,6 @@ const tokens = deployTokens({
observability,
});
deployWorkflows({
image: docker.factory.getImageId('workflows', imagesTag),
docker,
environment,
postgres,
postmarkSecret,
observability,
sentry,
heartbeat: heartbeatsConfig.get('webhooks'),
});
const commerce = deployCommerce({
image: docker.factory.getImageId('commerce', imagesTag),
docker,
@ -201,6 +190,18 @@ const schemaPolicy = deploySchemaPolicy({
observability,
});
deployWorkflows({
image: docker.factory.getImageId('workflows', imagesTag),
docker,
environment,
postgres,
postmarkSecret,
observability,
sentry,
heartbeat: heartbeatsConfig.get('webhooks'),
schema,
});
const supertokens = deploySuperTokens(postgres, { dependencies: [dbMigrations] }, environment);
const zendesk = configureZendesk({ environment });
const githubApp = configureGithubApp();

View file

@ -1,10 +1,12 @@
import * as pulumi from '@pulumi/pulumi';
import { serviceLocalEndpoint } from '../utils/local-endpoint';
import { ServiceSecret } from '../utils/secrets';
import { ServiceDeployment } from '../utils/service-deployment';
import { Docker } from './docker';
import { Environment } from './environment';
import { Observability } from './observability';
import { Postgres } from './postgres';
import { Schema } from './schema';
import { Sentry } from './sentry';
export class PostmarkSecret extends ServiceSecret<{
@ -22,6 +24,7 @@ export function deployWorkflows({
postgres,
observability,
postmarkSecret,
schema,
}: {
postgres: Postgres;
observability: Observability;
@ -31,6 +34,7 @@ export function deployWorkflows({
heartbeat?: string;
sentry: Sentry;
postmarkSecret: PostmarkSecret;
schema: Schema;
}) {
return (
new ServiceDeployment(
@ -47,6 +51,7 @@ export function deployWorkflows({
? observability.tracingEndpoint
: '',
LOG_JSON: '1',
SCHEMA_ENDPOINT: serviceLocalEndpoint(schema.service),
},
readinessProbe: '/_readiness',
livenessProbe: '/_health',

View file

@ -322,6 +322,10 @@ services:
SENTRY_DSN: '${SENTRY_DSN:-}'
PROMETHEUS_METRICS: '${PROMETHEUS_METRICS:-}'
LOG_JSON: '1'
SCHEMA_ENDPOINT: http://schema:3002
REDIS_HOST: redis
REDIS_PORT: 6379
REDIS_PASSWORD: '${REDIS_PASSWORD}'
usage:
image: '${DOCKER_REGISTRY}usage${DOCKER_TAG}'

View file

@ -239,6 +239,7 @@ services:
workflows:
environment:
EMAIL_PROVIDER: '${EMAIL_PROVIDER}'
SCHEMA_ENDPOINT: http://schema:3002
LOG_LEVEL: debug
ports:
- '3014:3014'

View file

@ -34,6 +34,7 @@
"dockerode": "4.0.8",
"dotenv": "16.4.7",
"graphql": "16.9.0",
"graphql-sse": "2.6.0",
"human-id": "4.1.1",
"ioredis": "5.8.2",
"slonik": "30.4.4",

View file

@ -1,4 +1,5 @@
import { ExecutionResult, parse, print } from 'graphql';
import { createClient } from 'graphql-sse';
import { TypedDocumentNode } from '@graphql-typed-document-node/core';
import { sortSDL } from '@theguild/federation-composition';
import { getServiceHost } from './utils';
@ -87,3 +88,42 @@ export async function execute<TResult, TVariables>(
},
};
}
export async function subscribe<TResult, TVariables>(
params: {
document: TypedDocumentNode<TResult, TVariables>;
operationName?: string;
authToken?: string;
token?: string;
legacyAuthorizationMode?: boolean;
} & (TVariables extends Record<string, never>
? { variables?: never }
: { variables: TVariables }),
) {
const registryAddress = await getServiceHost('server', 8082);
const client = createClient({
url: `http://${registryAddress}/graphql`,
headers: {
...(params.authToken
? {
authorization: `Bearer ${params.authToken}`,
}
: {}),
...(params.token
? params.legacyAuthorizationMode
? {
'x-api-token': params.token,
}
: {
authorization: `Bearer ${params.token}`,
}
: {}),
},
});
return client.iterate({
operationName: params.operationName,
query: print(params.document),
variables: params.variables ?? {},
});
}

View file

@ -766,6 +766,7 @@ export function initSeed() {
commit: string;
},
contextId?: string,
schemaProposalId?: string,
) {
return await checkSchema(
{
@ -773,6 +774,7 @@ export function initSeed() {
service,
meta,
contextId,
schemaProposalId,
},
secret,
);

View file

@ -0,0 +1,83 @@
import { graphql } from 'testkit/gql';
import { ProjectType, ResourceAssignmentModeType } from 'testkit/gql/graphql';
import { execute } from 'testkit/graphql';
import { initSeed } from 'testkit/seed';
const CreateProposalMutation = graphql(`
mutation CreateProposalMutation($input: CreateSchemaProposalInput!) {
createSchemaProposal(input: $input) {
ok {
schemaProposal {
id
}
}
error {
message
}
}
}
`);
describe('Schema Proposals', () => {
test.concurrent(
'cannot be proposed without "schemaProposal:modify" permission',
async ({ expect }) => {
const { createOrg, ownerToken } = await initSeed().createOwner();
const { createProject, createOrganizationAccessToken, setFeatureFlag } = await createOrg();
await setFeatureFlag('schemaProposals', true);
const { target } = await createProject(ProjectType.Federation);
const { privateAccessKey: accessKey } = await createOrganizationAccessToken(
{
resources: {
mode: ResourceAssignmentModeType.All,
},
permissions: ['schemaProposal:describe'],
},
ownerToken,
);
const result = await execute({
document: CreateProposalMutation,
variables: {
input: {
target: { byId: target.id },
author: 'Jeff',
title: 'Proposed changes to the schema...',
},
},
authToken: accessKey,
}).then(r => r.expectGraphQLErrors());
},
);
test.concurrent(
'can be proposed successfully with "schemaProposal:modify" permission',
async ({ expect }) => {
const { createOrg, ownerToken } = await initSeed().createOwner();
const { createProject, createOrganizationAccessToken, setFeatureFlag } = await createOrg();
await setFeatureFlag('schemaProposals', true);
const { target } = await createProject(ProjectType.Federation);
const { privateAccessKey: accessKey } = await createOrganizationAccessToken({
resources: {
mode: ResourceAssignmentModeType.All,
},
permissions: ['schemaProposal:modify'],
});
const result = await execute({
document: CreateProposalMutation,
variables: {
input: {
target: { byId: target.id },
author: 'Jeff',
title: 'Proposed changes to the schema...',
},
},
authToken: accessKey,
}).then(r => r.expectNoGraphQLErrors());
expect(result.createSchemaProposal.ok?.schemaProposal).toHaveProperty('id');
},
);
});

View file

@ -0,0 +1,115 @@
import { graphql } from 'testkit/gql';
import { ProjectType, ResourceAssignmentModeType } from 'testkit/gql/graphql';
import { execute } from 'testkit/graphql';
import { initSeed } from 'testkit/seed';
const CreateProposalMutation = graphql(`
mutation CreateProposalMutation($input: CreateSchemaProposalInput!) {
createSchemaProposal(input: $input) {
ok {
schemaProposal {
id
}
}
error {
message
}
}
}
`);
const ReadProposalQuery = graphql(`
query ReadProposalQuery($input: SchemaProposalInput!) {
schemaProposal(input: $input) {
title
description
checks(input: { latestPerService: true }) {
edges {
node {
id
}
}
}
}
}
`);
/**
* Creates a proposal and returns a token with specified permissions
**/
async function setup(input: {
tokenPermissions: string[];
}): Promise<{ accessKey: string; proposalId: string }> {
const { createOrg, ownerToken } = await initSeed().createOwner();
const { createProject, createOrganizationAccessToken, setFeatureFlag } = await createOrg();
await setFeatureFlag('schemaProposals', true);
const { target } = await createProject(ProjectType.Federation);
// create as owner
const result = await execute({
document: CreateProposalMutation,
variables: {
input: {
target: { byId: target.id },
author: 'Jeff',
title: 'Proposed changes to the schema...',
},
},
token: ownerToken,
}).then(r => r.expectNoGraphQLErrors());
const { privateAccessKey: accessKey } = await createOrganizationAccessToken(
{
resources: {
mode: ResourceAssignmentModeType.All,
},
permissions: input.tokenPermissions,
},
ownerToken,
);
const proposalId = result.createSchemaProposal.ok?.schemaProposal.id!;
return { accessKey, proposalId };
}
describe('Schema Proposals', () => {
test.concurrent(
'can read proposal with "schemaProposal:describe" permission',
async ({ expect }) => {
const { accessKey, proposalId } = await setup({
tokenPermissions: ['schemaProposal:describe'],
});
{
const proposal = await execute({
document: ReadProposalQuery,
variables: {
input: {
id: proposalId,
},
},
token: accessKey,
}).then(r => r.expectNoGraphQLErrors());
expect(proposal.schemaProposal?.title).toMatchInlineSnapshot(
`Proposed changes to the schema...`,
);
}
},
);
test.concurrent('cannot read proposal without "schemaProposal:describe" permission', async () => {
const { accessKey, proposalId } = await setup({ tokenPermissions: [] });
{
await execute({
document: ReadProposalQuery,
variables: {
input: {
id: proposalId,
},
},
token: accessKey,
}).then(r => r.expectGraphQLErrors());
}
});
});

View file

@ -0,0 +1,116 @@
import { graphql } from 'testkit/gql';
import { ProjectType, ResourceAssignmentModeType } from 'testkit/gql/graphql';
import { execute, subscribe } from 'testkit/graphql';
import { initSeed } from 'testkit/seed';
const CreateProposalMutation = graphql(`
mutation CreateProposalMutation($input: CreateSchemaProposalInput!) {
createSchemaProposal(input: $input) {
ok {
schemaProposal {
id
}
}
error {
message
}
}
}
`);
const ProposalCompositionSubscription = graphql(`
subscription ProposalCompositionSubscription(
$input: SchemaProposalCompositionSubscriptionInput!
) {
schemaProposalComposition(input: $input) {
status
timestamp
}
}
`);
/**
* Creates a proposal and returns a token with specified permissions
**/
async function setup(input: { tokenPermissions: string[] }) {
const { createOrg, ownerToken } = await initSeed().createOwner();
const { createProject, createOrganizationAccessToken, setFeatureFlag } = await createOrg();
await setFeatureFlag('schemaProposals', true);
const project = await createProject(ProjectType.Federation);
// create as owner
const result = await execute({
document: CreateProposalMutation,
variables: {
input: {
target: { byId: project.target.id },
author: 'Jeff',
title: 'Proposed changes to the schema...',
},
},
token: ownerToken,
}).then(r => r.expectNoGraphQLErrors());
const { privateAccessKey: accessKey } = await createOrganizationAccessToken(
{
resources: {
mode: ResourceAssignmentModeType.All,
},
permissions: input.tokenPermissions,
},
ownerToken,
);
const proposalId = result.createSchemaProposal.ok?.schemaProposal.id!;
return { accessKey, proposalId, project };
}
describe('Schema Proposals', () => {
test.concurrent(
'can subscribe for proposal events with "schemaProposal:describe" permission',
async ({ expect }) => {
const { accessKey, proposalId, project } = await setup({
tokenPermissions: ['schemaProposal:describe'],
});
const query = await subscribe({
document: ProposalCompositionSubscription,
variables: {
input: {
proposalId,
},
},
token: accessKey,
});
// create the schema check to trigger the composition and subscription event
const token = await project.createTargetAccessToken({ mode: 'readWrite' });
await token.publishSchema({
sdl: /* GraphQL */ `
type Query {
ping: String
}
`,
service: 'example',
url: 'http://localhost:4001',
});
const checkResultErrors = await token
.checkSchema(
/* GraphQL */ `
type Query {
ping: String
pong: String
}
`,
'example',
undefined,
undefined,
proposalId,
)
.then(r => r.expectNoGraphQLErrors());
expect(checkResultErrors.schemaCheck.__typename).toBe(`SchemaCheckSuccess`);
const { value } = await query.next();
expect(value.data.schemaProposalComposition.status).toBe(`SUCCESS`);
await expect(query.return?.()).resolves.toMatchObject({ done: true });
},
);
});

View file

@ -76,7 +76,7 @@
"@graphql-eslint/eslint-plugin": "3.20.1",
"@graphql-inspector/cli": "6.0.6",
"@graphql-inspector/core": "7.1.2",
"@graphql-inspector/patch": "0.1.2",
"@graphql-inspector/patch": "0.1.3-alpha-20260225183305-b1e68e3f363401fe16845d7f731ff8ed3467f5a7",
"@graphql-tools/load": "8.1.2",
"@manypkg/get-packages": "2.2.2",
"@next/eslint-plugin-next": "14.2.23",

View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2022 The Guild
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -0,0 +1,62 @@
{
"name": "@hive/pubsub",
"version": "0.0.1",
"type": "module",
"repository": {
"type": "git",
"url": "graphql-hive/console",
"directory": "packages/libraries/pubsub"
},
"homepage": "https://the-guild.dev/graphql/hive",
"author": {
"email": "contact@the-guild.dev",
"name": "The Guild",
"url": "https://the-guild.dev"
},
"license": "MIT",
"private": true,
"engines": {
"node": ">=16.0.0"
},
"main": "dist/cjs/index.js",
"module": "dist/esm/index.js",
"exports": {
".": {
"require": {
"types": "./dist/typings/index.d.cts",
"default": "./dist/cjs/index.js"
},
"import": {
"types": "./dist/typings/index.d.ts",
"default": "./dist/esm/index.js"
},
"default": {
"types": "./dist/typings/index.d.ts",
"default": "./dist/esm/index.js"
}
},
"./package.json": "./package.json"
},
"typings": "dist/typings/index.d.ts",
"scripts": {
"build": "node ../../../scripts/generate-version.mjs && bob build",
"check:build": "bob check",
"typecheck": "tsc --noEmit"
},
"peerDependencies": {
"ioredis": "^5.0.0"
},
"dependencies": {
"@graphql-yoga/redis-event-target": "3.0.3",
"graphile-worker": "^0.16.0",
"graphql-yoga": "5.13.3"
},
"devDependencies": {
"tslib": "2.8.1",
"vitest": "4.0.9"
},
"sideEffects": false,
"typescript": {
"definition": "dist/typings/index.d.ts"
}
}

View file

@ -0,0 +1,16 @@
import { createPubSub } from 'graphql-yoga';
import { Redis } from 'ioredis';
import { createRedisEventTarget } from '@graphql-yoga/redis-event-target';
import { bridgeGraphileLogger, type Logger } from './logger.js';
import type { HivePubSub } from './pub-sub.js';
export function createHivePubSub(args: { publisher: Redis; subscriber: Redis }) {
return createPubSub({
eventTarget: createRedisEventTarget({
publishClient: args.publisher,
subscribeClient: args.subscriber,
}),
}) as HivePubSub;
}
export { type HivePubSub, type Logger, bridgeGraphileLogger };

View file

@ -0,0 +1,32 @@
import { Logger as GraphileLogger, type LogLevel as GraphileLogLevel } from 'graphile-worker';
export type Logger = {
error(msg: string, ...interpol: unknown[]): void;
warn(msg: string, ...interpol: unknown[]): void;
info(msg: string, ...interpol: unknown[]): void;
debug(msg: string, ...interpol: unknown[]): void;
};
function logLevel(level: GraphileLogLevel) {
switch (level) {
case 'warning':
return 'warn' as const;
case 'info': {
return 'info' as const;
}
case 'debug': {
return 'debug' as const;
}
case 'error': {
return 'error' as const;
}
}
return 'info';
}
export function bridgeGraphileLogger(logger: Logger) {
return new GraphileLogger(_scope => (level, message, _meta) => {
logger[logLevel(level)](message);
});
}

View file

@ -0,0 +1,9 @@
import type { PubSub } from 'graphql-yoga';
export type HivePubSub = PubSub<{
oidcIntegrationLogs: [oidcIntegrationId: string, payload: { timestamp: string; message: string }];
schemaProposalComposition: [
proposalId: string,
payload: { timestamp: string; status: 'SUCCESS' | 'ERROR'; reason?: string | null },
];
}>;

View file

@ -0,0 +1 @@
export const version = '0.0.1';

View file

@ -0,0 +1,15 @@
{
"extends": "../../../tsconfig.json",
"include": ["src"],
"compilerOptions": {
"types": ["node"],
"baseUrl": ".",
"outDir": "dist",
"rootDir": "src",
"target": "es2017",
"module": "esnext",
"skipLibCheck": true,
"declaration": true,
"declarationMap": true
}
}

View file

@ -0,0 +1,17 @@
import { type MigrationExecutor } from '../pg-migrator';
export default {
name: '2026.02.24T00-00-00.proposal-composition.ts',
run: ({ sql }) => [
{
name: 'add schema proposal composition state',
query: sql`
ALTER TABLE IF EXISTS "schema_proposals"
ADD COLUMN IF NOT EXISTS "composition_status" TEXT
, ADD COLUMN IF NOT EXISTS "composition_timestamp" TIMESTAMPTZ
, ADD COLUMN IF NOT EXISTS "composition_status_reason" TEXT
;
`,
},
],
} satisfies MigrationExecutor;

View file

@ -189,5 +189,6 @@ export const runPGMigrations = async (args: { slonik: DatabasePool; runTo?: stri
...(env.useSupertokensAtHome
? [await import('./actions/2026.02.18T00-00-00.ensure-supertokens-tables')]
: []),
await import('./actions/2026.02.24T00-00-00.proposal-composition'),
],
});

View file

@ -25,6 +25,7 @@
"@graphql-inspector/core": "7.1.2",
"@graphql-tools/merge": "9.1.1",
"@hive/cdn-script": "workspace:*",
"@hive/pubsub": "workspace:*",
"@hive/schema": "workspace:*",
"@hive/service-common": "workspace:*",
"@hive/storage": "workspace:*",
@ -55,6 +56,7 @@
"date-fns": "4.1.0",
"fast-json-stable-stringify": "2.1.0",
"got": "14.4.7",
"graphile-worker": "0.16.6",
"graphql": "16.9.0",
"graphql-modules": "3.1.1",
"graphql-parse-resolve-info": "4.13.0",

View file

@ -94,6 +94,16 @@ export const permissionGroups: Array<PermissionGroup> = [
title: 'Manage CDN access tokens',
description: 'Allow managing access tokens for the CDN.',
},
{
id: 'schemaProposal:modify',
title: 'Create and edit schema proposals',
description: 'Allow managing schema proposals.',
},
{
id: 'schemaProposal:describe',
title: 'View schema proposals',
description: 'Allow viewing schema proposals and adding comments.',
},
],
},
{

View file

@ -122,6 +122,21 @@ export default gql`
body: String!
}
extend type Subscription {
schemaProposalComposition(
input: SchemaProposalCompositionSubscriptionInput!
): SchemaProposalCompositionEvent!
}
type SchemaProposalCompositionEvent {
status: ProposalCompositionStatus!
timestamp: String!
}
input SchemaProposalCompositionSubscriptionInput {
proposalId: ID!
}
extend type Query {
schemaProposals(
after: String
@ -240,6 +255,15 @@ export default gql`
"""
fromCursor: String
): String
compositionStatus: ProposalCompositionStatus
compositionTimestamp: DateTime
compositionStatusReason: String
}
enum ProposalCompositionStatus {
ERROR
SUCCESS
}
input SchemaProposalChecksInput {

View file

@ -1,13 +1,14 @@
/**
* This wraps the higher level logic with schema proposals.
*/
import { Injectable, Scope } from 'graphql-modules';
import { Inject, Injectable, Scope } from 'graphql-modules';
import { TargetReferenceInput } from 'packages/libraries/core/src/client/__generated__/types';
import type { SchemaProposalStage } from '../../../__generated__/types';
import { HiveError } from '../../../shared/errors';
import { Session } from '../../auth/lib/authz';
import { IdTranslator } from '../../shared/providers/id-translator';
import { Logger } from '../../shared/providers/logger';
import { PUB_SUB_CONFIG, type HivePubSub } from '../../shared/providers/pub-sub';
import { Storage } from '../../shared/providers/storage';
import { SchemaProposalStorage } from './schema-proposal-storage';
@ -23,10 +24,41 @@ export class SchemaProposalManager {
private storage: Storage,
private session: Session,
private idTranslator: IdTranslator,
@Inject(PUB_SUB_CONFIG) private pubSub: HivePubSub,
) {
this.logger = logger.child({ source: 'SchemaProposalsManager' });
}
async subscribeToSchemaProposalCompositions(args: { proposalId: string }) {
const proposal = await this.proposalStorage.getProposalTargetId({
id: args.proposalId,
});
if (!proposal) {
this.session.raise('schemaProposal:describe');
}
const selector = await this.idTranslator.resolveTargetReference({
reference: {
byId: proposal.targetId,
},
});
if (!selector) {
this.session.raise('schemaProposal:describe');
}
await this.session.assertPerformAction({
organizationId: selector.organizationId,
action: 'schemaProposal:describe',
params: selector,
});
this.logger.info(`Subscribed to "schemaProposalComposition" (id=${args.proposalId})`);
return this.pubSub.subscribe('schemaProposalComposition', args.proposalId);
}
async proposeSchema(args: {
target: TargetReferenceInput;
title: string;
@ -39,6 +71,12 @@ export class SchemaProposalManager {
this.session.raise('schemaProposal:modify');
}
await this.session.assertPerformAction({
action: 'schemaProposal:modify',
organizationId: selector.organizationId,
params: selector,
});
const createProposalResult = await this.proposalStorage.createProposal({
organizationId: selector.organizationId,
author: args.author ?? null,
@ -79,7 +117,24 @@ export class SchemaProposalManager {
}
async getProposal(args: { id: string }) {
return this.proposalStorage.getProposal(args);
const proposal = await this.proposalStorage.getProposal(args);
if (proposal) {
const selector = await this.idTranslator.resolveTargetReference({
reference: {
byId: proposal.targetId,
},
});
if (selector === null) {
this.session.raise('schemaProposal:describe');
}
await this.session.assertPerformAction({
action: 'schemaProposal:describe',
organizationId: selector.organizationId,
params: selector,
});
}
return proposal;
}
async getPaginatedReviews(args: {
@ -90,6 +145,23 @@ export class SchemaProposalManager {
authors: string[];
}) {
this.logger.debug('Get paginated reviews (target=%s, after=%s)', args.proposalId, args.after);
const proposal = await this.proposalStorage.getProposalTargetId({ id: args.proposalId });
if (proposal) {
const selector = await this.idTranslator.resolveTargetReference({
reference: {
byId: proposal.targetId,
},
});
if (selector === null) {
this.session.raise('schemaProposal:describe');
}
await this.session.assertPerformAction({
action: 'schemaProposal:describe',
organizationId: selector.organizationId,
params: selector,
});
}
return this.proposalStorage.getPaginatedReviews(args);
}
@ -110,9 +182,15 @@ export class SchemaProposalManager {
reference: args.target,
});
if (selector === null) {
this.session.raise('schemaProposal:modify');
this.session.raise('schemaProposal:describe');
}
await this.session.assertPerformAction({
action: 'schemaProposal:describe',
organizationId: selector.organizationId,
params: selector,
});
return this.proposalStorage.getPaginatedProposals({
targetId: selector.targetId,
after: args.after,
@ -129,8 +207,12 @@ export class SchemaProposalManager {
}) {
this.logger.debug(`Reviewing proposal (proposal=%s, stage=%s)`, args.proposalId, args.stage);
// @todo check permissions for user
const proposal = await this.proposalStorage.getProposal({ id: args.proposalId });
const proposal = await this.proposalStorage.getProposalTargetId({ id: args.proposalId });
if (!proposal) {
throw new HiveError('Proposal target lookup failed.');
}
const user = await this.session.getViewer();
const target = await this.storage.getTargetById(proposal.targetId);
@ -138,6 +220,16 @@ export class SchemaProposalManager {
throw new HiveError('Proposal target lookup failed.');
}
await this.session.assertPerformAction({
action: 'schemaProposal:describe',
organizationId: target.orgId,
params: {
organizationId: target.orgId,
projectId: target.projectId,
targetId: proposal.targetId,
},
});
if (args.stage) {
const review = await this.proposalStorage.manuallyTransitionProposal({
organizationId: target.orgId,

View file

@ -8,6 +8,8 @@ import {
decodeCreatedAtAndUUIDIdBasedCursor,
encodeCreatedAtAndUUIDIdBasedCursor,
} from '@hive/storage';
import { TaskScheduler } from '@hive/workflows/kit';
import { SchemaProposalCompositionTask } from '@hive/workflows/tasks/schema-proposal-composition';
import { SchemaProposalStage } from '../../../__generated__/types';
import { Logger } from '../../shared/providers/logger';
import { PG_POOL_CONFIG } from '../../shared/providers/pg-pool';
@ -38,11 +40,25 @@ export class SchemaProposalStorage {
logger: Logger,
@Inject(PG_POOL_CONFIG) private pool: DatabasePool,
private storage: Storage,
@Inject(SCHEMA_PROPOSALS_ENABLED) private schemaProposalsEnabled: Boolean, // @todo
@Inject(SCHEMA_PROPOSALS_ENABLED) private schemaProposalsEnabled: Boolean,
private taskScheduler: TaskScheduler,
) {
this.logger = logger.child({ source: 'SchemaProposalStorage' });
}
async runBackgroundComposition(input: {
proposalId: string;
targetId: string;
externalComposition: {
enabled: boolean;
endpoint?: string | null;
encryptedSecret?: string | null;
};
native: boolean;
}) {
await this.taskScheduler.scheduleTask(SchemaProposalCompositionTask, input);
}
private async assertSchemaProposalsEnabled(args: {
organizationId: string;
targetId: string;
@ -189,6 +205,29 @@ export class SchemaProposalStorage {
};
}
/**
* A stripped down version of getProposal that only returns the ID. This is intended
* to be used
*/
async getProposalTargetId(args: { id: string }) {
this.logger.debug('Get proposal target ID (proposal=%s)', args.id);
const result = await this.pool
.maybeOne<unknown>(
sql`
SELECT
id
, target_id as "targetId"
FROM
"schema_proposals"
WHERE
id=${args.id}
`,
)
.then(row => SchemaProposalTargetIdModel.safeParse(row));
return result.data ?? null;
}
async getProposal(args: { id: string }) {
this.logger.debug('Get proposal (proposal=%s)', args.id);
const result = await this.pool
@ -202,9 +241,9 @@ export class SchemaProposalStorage {
"sp"."id" = ${args.id}
`,
)
.then(row => SchemaProposalModel.parse(row));
.then(row => SchemaProposalModel.safeParse(row));
return result;
return result.data ?? null;
}
async getPaginatedProposals(args: {
@ -344,7 +383,7 @@ export class SchemaProposalStorage {
}
const schemaProposalFields = sql`
sp."id"
sp."id"
, to_json(sp."created_at") as "createdAt"
, to_json(sp."updated_at") as "updatedAt"
, sp."title"
@ -352,6 +391,9 @@ const schemaProposalFields = sql`
, sp."stage"
, sp."target_id" as "targetId"
, sp."author"
, sp."composition_status" as "compositionStatus"
, to_json(sp."composition_timestamp") as "compositionTimestamp"
, sp."composition_status_reason" as "compositionStatusReason"
`;
const schemaProposalReviewFields = sql`
@ -388,6 +430,17 @@ const SchemaProposalModel = z.object({
stage: StageModel,
targetId: z.string(),
author: z.string(),
compositionStatus: z.enum(['ERROR', 'SUCCESS']).nullable(),
compositionStatusReason: z.string().nullable(),
compositionTimestamp: z.string().nullable(),
});
/**
* Minimal model for extracting just the target Id for permission checks.
*/
const SchemaProposalTargetIdModel = z.object({
id: z.string(),
targetId: z.string(),
});
export type SchemaProposalRecord = z.infer<typeof SchemaProposalModel>;

View file

@ -0,0 +1,13 @@
import type { SubscriptionResolvers } from '../../../../__generated__/types';
import { SchemaProposalManager } from '../../providers/schema-proposal-manager';
export const schemaProposalComposition: NonNullable<
SubscriptionResolvers['schemaProposalComposition']
> = {
subscribe: (_, args, { injector }) =>
injector
.get(SchemaProposalManager)
.subscribeToSchemaProposalCompositions({ proposalId: args.input.proposalId }),
resolve: (payload: { status: 'ERROR' | 'SUCCESS'; timestamp: string; reason?: string | null }) =>
payload,
};

View file

@ -728,6 +728,15 @@ export class SchemaPublisher {
const retention = await this.rateLimit.getRetention({ targetId: target.id });
const expiresAt = retention ? new Date(Date.now() + retention * millisecondsPerDay) : null;
if (input.schemaProposalId) {
await this.schemaProposals.runBackgroundComposition({
externalComposition: project.externalComposition,
native: project.nativeFederation,
proposalId: input.schemaProposalId,
targetId: target.id,
});
}
if (checkResult.conclusion === SchemaCheckConclusion.Failure) {
schemaCheck = await this.storage.createSchemaCheck({
schemaSDL: sdl,

View file

@ -1,8 +1,5 @@
import { InjectionToken } from 'graphql-modules';
import type { PubSub } from 'graphql-yoga';
export type HivePubSub = PubSub<{
oidcIntegrationLogs: [oidcIntegrationId: string, payload: { timestamp: string; message: string }];
}>;
import type { HivePubSub } from '@hive/pubsub';
export const PUB_SUB_CONFIG = new InjectionToken<HivePubSub>('PUB_SUB');
export type { HivePubSub };

View file

@ -413,7 +413,6 @@ export interface Storage {
first: number | null;
cursor: null | string;
}): Promise<PaginatedSchemaVersionConnection>;
// @todo consider moving to proposals provider
getPaginatedSchemaChecksForSchemaProposal<
TransformedSchemaCheck extends SchemaCheck = SchemaCheck,
>(_: {

View file

@ -31,6 +31,7 @@
"@graphql-yoga/redis-event-target": "3.0.3",
"@hive/api": "workspace:*",
"@hive/cdn-script": "workspace:*",
"@hive/pubsub": "workspace:*",
"@hive/schema": "workspace:*",
"@hive/service-common": "workspace:*",
"@hive/storage": "workspace:*",

View file

@ -8,9 +8,7 @@ import {
} from 'supertokens-node/framework/fastify/index.js';
import cors from '@fastify/cors';
import type { FastifyCorsOptionsDelegateCallback } from '@fastify/cors';
import { createRedisEventTarget } from '@graphql-yoga/redis-event-target';
import 'reflect-metadata';
import { createPubSub } from 'graphql-yoga';
import { z } from 'zod';
import formDataPlugin from '@fastify/formbody';
import {
@ -24,7 +22,6 @@ import {
import { AccessTokenKeyContainer } from '@hive/api/modules/auth/lib/supertokens-at-home/crypto';
import { EmailVerification } from '@hive/api/modules/auth/providers/email-verification';
import { OAuthCache } from '@hive/api/modules/auth/providers/oauth-cache';
import { HivePubSub } from '@hive/api/modules/shared/providers/pub-sub';
import { createRedisClient } from '@hive/api/modules/shared/providers/redis';
import { RedisRateLimiter } from '@hive/api/modules/shared/providers/redis-rate-limiter';
import { TargetsByIdCache } from '@hive/api/modules/target/providers/targets-by-id-cache';
@ -34,6 +31,7 @@ import { ArtifactStorageReader } from '@hive/cdn-script/artifact-storage-reader'
import { AwsClient } from '@hive/cdn-script/aws';
import { createIsAppDeploymentActive } from '@hive/cdn-script/is-app-deployment-active';
import { createIsKeyValid } from '@hive/cdn-script/key-validation';
import { createHivePubSub } from '@hive/pubsub';
import {
configureTracing,
createServer,
@ -184,16 +182,14 @@ export async function main() {
const redis = createRedisClient('Redis', env.redis, server.log.child({ source: 'Redis' }));
const pubSub = createPubSub({
eventTarget: createRedisEventTarget({
publishClient: redis,
subscribeClient: createRedisClient(
'subscriber',
env.redis,
server.log.child({ source: 'RedisSubscribe' }),
),
}),
}) as HivePubSub;
const pubSub = createHivePubSub({
publisher: redis,
subscriber: createRedisClient(
'subscriber',
env.redis,
server.log.child({ source: 'RedisSubscribe' }),
),
});
registerShutdown({
logger: server.log,

View file

@ -383,6 +383,9 @@ export interface schema_proposal_reviews {
export interface schema_proposals {
author: string;
comments_count: number;
composition_status: string | null;
composition_status_reason: string | null;
composition_timestamp: Date | null;
created_at: Date;
description: string;
id: string;

View file

@ -5,3 +5,7 @@ POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
EMAIL_FROM=mock@graphql-hive.com
EMAIL_PROVIDER=mock
SCHEMA_ENDPOINT=http://localhost:6500
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=

View file

@ -10,15 +10,23 @@
},
"devDependencies": {
"@graphql-hive/logger": "1.0.9",
"@graphql-inspector/core": "7.1.2",
"@graphql-inspector/patch": "0.1.3",
"@graphql-yoga/redis-event-target": "3.0.3",
"@hive/pubsub": "workspace:*",
"@hive/service-common": "workspace:*",
"@hive/storage": "workspace:*",
"@sentry/node": "7.120.2",
"@trpc/client": "10.45.3",
"@types/mjml": "4.7.1",
"@types/nodemailer": "7.0.4",
"@types/sendmail": "1.4.7",
"bentocache": "1.1.0",
"dotenv": "16.4.7",
"graphile-worker": "0.16.6",
"graphql": "16.9.0",
"graphql-yoga": "5.13.3",
"ioredis": "5.8.2",
"mjml": "4.14.0",
"nodemailer": "7.0.11",
"sendmail": "1.6.1",

View file

@ -1,11 +1,15 @@
import type { DatabasePool } from 'slonik';
import type { Logger } from '@graphql-hive/logger';
import type { HivePubSub } from '@hive/pubsub';
import type { EmailProvider } from './lib/emails/providers.js';
import { RequestBroker } from './lib/webhooks/send-webhook.js';
import type { SchemaProvider } from './lib/schema/provider.js';
import type { RequestBroker } from './lib/webhooks/send-webhook.js';
export type Context = {
logger: Logger;
email: EmailProvider;
schema: SchemaProvider;
pg: DatabasePool;
requestBroker: RequestBroker | null;
pubSub: HivePubSub;
};

View file

@ -26,6 +26,7 @@ const EnvironmentModel = zod.object({
RELEASE: emptyString(zod.string().optional()),
HEARTBEAT_ENDPOINT: emptyString(zod.string().url().optional()),
EMAIL_FROM: zod.string().email(),
SCHEMA_ENDPOINT: zod.string().url(),
});
const SentryModel = zod.union([
@ -82,6 +83,13 @@ const EmailProviderModel = zod.union([
SendmailEmailModel,
]);
const RedisModel = zod.object({
REDIS_HOST: zod.string(),
REDIS_PORT: NumberFromString,
REDIS_PASSWORD: emptyString(zod.string().optional()),
REDIS_TLS_ENABLED: emptyString(zod.union([zod.literal('1'), zod.literal('0')]).optional()),
});
const RequestBrokerModel = zod.union([
zod.object({
REQUEST_BROKER: emptyString(zod.literal('0').optional()),
@ -127,6 +135,7 @@ const configs = {
log: LogModel.safeParse(process.env),
tracing: OpenTelemetryConfigurationModel.safeParse(process.env),
requestBroker: RequestBrokerModel.safeParse(process.env),
redis: RedisModel.safeParse(process.env),
};
const environmentErrors: Array<string> = [];
@ -158,6 +167,7 @@ const prometheus = extractConfig(configs.prometheus);
const log = extractConfig(configs.log);
const tracing = extractConfig(configs.tracing);
const requestBroker = extractConfig(configs.requestBroker);
const redis = extractConfig(configs.redis);
const emailProviderConfig =
email.EMAIL_PROVIDER === 'postmark'
@ -204,6 +214,9 @@ export const env = {
provider: emailProviderConfig,
emailFrom: base.EMAIL_FROM,
},
schema: {
serviceUrl: base.SCHEMA_ENDPOINT,
},
sentry: sentry.SENTRY === '1' ? { dsn: sentry.SENTRY_DSN } : null,
log: {
level: log.LOG_LEVEL ?? 'info',
@ -235,4 +248,10 @@ export const env = {
} satisfies RequestBroker)
: null,
httpHeartbeat: base.HEARTBEAT_ENDPOINT ? { endpoint: base.HEARTBEAT_ENDPOINT } : null,
redis: {
host: redis.REDIS_HOST,
port: redis.REDIS_PORT,
password: redis.REDIS_PASSWORD ?? '',
tlsEnabled: redis.REDIS_TLS_ENABLED === '1',
},
} as const;

View file

@ -1,6 +1,7 @@
import { run } from 'graphile-worker';
import { createPool } from 'slonik';
import { Logger } from '@graphql-hive/logger';
import { bridgeGraphileLogger, createHivePubSub } from '@hive/pubsub';
import {
createServer,
registerShutdown,
@ -12,7 +13,9 @@ import {
import { Context } from './context.js';
import { env } from './environment.js';
import { createEmailProvider } from './lib/emails/providers.js';
import { bridgeFastifyLogger, bridgeGraphileLogger } from './logger.js';
import { schemaProvider } from './lib/schema/provider.js';
import { bridgeFastifyLogger } from './logger.js';
import { createRedisClient } from './redis';
import { createTaskEventEmitter } from './task-events.js';
if (env.sentry) {
@ -39,6 +42,7 @@ const modules = await Promise.all([
import('./tasks/schema-change-notification.js'),
import('./tasks/usage-rate-limit-exceeded.js'),
import('./tasks/usage-rate-limit-warning.js'),
import('./tasks/schema-proposal-composition.js'),
]);
const crontab = `
@ -63,19 +67,35 @@ const stopHttpHeartbeat = env.httpHeartbeat
})
: null;
const context: Context = {
logger,
email: createEmailProvider(env.email.provider, env.email.emailFrom),
pg,
requestBroker: env.requestBroker,
};
const server = await createServer({
sentryErrorHandler: !!env.sentry,
name: 'workflows',
log: logger,
});
const redis = createRedisClient('Redis', env.redis, server.log.child({ source: 'Redis' }));
const pubSub = createHivePubSub({
publisher: redis,
subscriber: createRedisClient(
'subscriber',
env.redis,
server.log.child({ source: 'RedisSubscribe' }),
),
});
const context: Context = {
logger,
email: createEmailProvider(env.email.provider, env.email.emailFrom),
pg,
requestBroker: env.requestBroker,
schema: schemaProvider({
logger,
schemaServiceUrl: env.schema.serviceUrl,
}),
pubSub,
};
server.route({
method: ['GET', 'HEAD'],
url: '/_health',
@ -132,6 +152,8 @@ registerShutdown({
logger.info('Shutdown postgres connection.');
await pg.end();
logger.info('Shutdown postgres connection successful.');
logger.info('Shutdown redis connection.');
redis.disconnect(false);
if (shutdownMetrics) {
logger.info('Stopping prometheus endpoint');
await shutdownMetrics();

View file

@ -4,8 +4,8 @@ import { makeWorkerUtils, WorkerUtils, type JobHelpers, type Task } from 'graphi
import type { Pool } from 'pg';
import { z } from 'zod';
import { Logger } from '@graphql-hive/logger';
import { bridgeGraphileLogger } from '@hive/pubsub';
import type { Context } from './context';
import { bridgeGraphileLogger } from './logger';
export type TaskDefinition<TName extends string, TModel> = {
name: TName;

View file

@ -0,0 +1,394 @@
import { DocumentNode, GraphQLError, parse, print, SourceLocation } from 'graphql';
import { DatabasePool, sql } from 'slonik';
import { z } from 'zod';
import type { Logger } from '@graphql-hive/logger';
import type { Change } from '@graphql-inspector/core';
import { errors, patch } from '@graphql-inspector/patch';
import type { Project, SchemaObject } from '@hive/api';
import type { ComposeAndValidateResult } from '@hive/api/shared/entities';
import type { ContractsInputType, SchemaBuilderApi } from '@hive/schema';
import { decodeCreatedAtAndUUIDIdBasedCursor, HiveSchemaChangeModel } from '@hive/storage';
import { createTRPCProxyClient, httpLink } from '@trpc/client';
type SchemaProviderConfig = {
/** URL of the Schema Service */
schemaServiceUrl: string;
logger: Logger;
};
const SchemaModel = z.object({
id: z.string().uuid(),
sdl: z.string(),
serviceName: z.string().nullable(),
serviceUrl: z.string().nullable(),
type: z.string(),
});
const SchemaProposalChangesModel = z.object({
id: z.string().uuid(),
serviceName: z.string().nullable(),
serviceUrl: z.string().nullable(),
schemaProposalChanges: z.array(HiveSchemaChangeModel).default([]),
createdAt: z.string(),
});
function createExternalConfig(config: Project['externalComposition']) {
// : ExternalCompositionConfig {
if (config && config.enabled) {
if (!config.endpoint) {
throw new Error('External composition error: endpoint is missing');
}
if (!config.encryptedSecret) {
throw new Error('External composition error: encryptedSecret is missing');
}
return {
endpoint: config.endpoint,
encryptedSecret: config.encryptedSecret,
};
}
return null;
}
export class GraphQLDocumentStringInvalidError extends Error {
constructor(message: string, location?: SourceLocation) {
const locationString = location ? ` at line ${location.line}, column ${location.column}` : '';
super(`The provided SDL is not valid${locationString}\n: ${message}`);
}
}
export type CreateSchemaObjectInput = {
sdl: string;
serviceName?: string | null;
serviceUrl?: string | null;
};
export const emptySource = '*';
export function createSchemaObject(schema: CreateSchemaObjectInput): SchemaObject {
let document: DocumentNode;
try {
document = parse(schema.sdl, {
noLocation: true,
});
} catch (err) {
if (err instanceof GraphQLError) {
throw new GraphQLDocumentStringInvalidError(err.message, err.locations?.[0]);
}
throw err;
}
return {
document,
raw: schema.sdl,
source: schema.serviceName ?? emptySource,
url: schema.serviceUrl ?? null,
};
}
export type SchemaProvider = ReturnType<typeof schemaProvider>;
export function convertProjectType(t: string) {
const result = (
{
['FEDERATION']: 'federation',
['SINGLE']: 'single',
['STITCHING']: 'stitching',
} as const
)[t];
if (!result) {
throw new Error('Invalid project type.');
}
return result;
}
export function schemaProvider(providerConfig: SchemaProviderConfig) {
const schemaService = createTRPCProxyClient<SchemaBuilderApi>({
links: [
httpLink({
url: `${providerConfig.schemaServiceUrl}/trpc`,
fetch,
}),
],
});
return {
id: 'schema' as const,
/**
* Compose and validate schemas via the schema service.
* - Requests time out after 30 seconds and result in a human readable error response
* - In case the incoming request is canceled, the call to the schema service is aborted
*/
async composeAndValidate(
projectType: string,
schemas: SchemaObject[],
config: {
/** Whether external composition should be used (only Federation) */
external: Project['externalComposition'];
/** Whether native composition should be used (only Federation) */
native: boolean;
/** Specified contracts (only Federation) */
contracts: ContractsInputType | null;
},
) {
const compositionType = convertProjectType(projectType);
providerConfig.logger.debug(
'Composing and validating schemas (type=%s, method=%s)',
compositionType,
compositionType === 'federation'
? config.native
? 'native'
: config.external.enabled
? 'external'
: 'v1'
: 'none',
);
const timeoutAbortSignal = AbortSignal.timeout(30_000);
const onTimeout = () => {
providerConfig.logger.debug(
'Composition HTTP request aborted due to timeout of 30 seconds.',
);
};
timeoutAbortSignal.addEventListener('abort', onTimeout);
try {
const result = await schemaService.composeAndValidate.mutate(
{
type: compositionType,
schemas: schemas.map(s => ({
raw: s.raw,
source: s.source,
url: s.url ?? null,
})),
external: createExternalConfig(config.external),
native: config.native,
contracts: config.contracts,
},
{
// Limit the maximum time allowed for composition requests to 30 seconds to avoid a dead-lock
signal: timeoutAbortSignal,
},
);
return result;
} catch (err) {
// In case of a timeout error we return something the user can process
if (timeoutAbortSignal.reason) {
return {
contracts: null,
metadataAttributes: null,
schemaMetadata: null,
sdl: null,
supergraph: null,
tags: null,
includesNetworkError: true,
includesException: false,
errors: [
{
message: 'The schema composition timed out. Please try again.',
source: 'composition',
},
],
} satisfies ComposeAndValidateResult;
}
throw err;
} finally {
timeoutAbortSignal.removeEventListener('abort', onTimeout);
}
},
async updateSchemaProposalComposition(args: {
proposalId: string;
timestamp: string;
reason: string | null;
status: 'ERROR' | 'SUCCESS';
pool: DatabasePool;
}) {
const { pool, ...state } = args;
await pool.query<unknown>(
sql`/* updateSchemaProposalComposition */
UPDATE schema_proposals
SET
composition_status = ${state.status}
, composition_timestamp = ${state.timestamp}
, composition_status_reason = ${state.reason}
WHERE id=${state.proposalId}`,
);
},
async latestComposableSchemas(args: { targetId: string; pool: DatabasePool }) {
const latestVersion = await args.pool.maybeOne<{ id: string }>(
sql`/* findLatestComposableSchemaVersion */
SELECT sv.id
FROM schema_versions as sv
WHERE sv.target_id = ${args.targetId} AND sv.is_composable IS TRUE
ORDER BY sv.created_at DESC
LIMIT 1
`,
);
if (!latestVersion) {
return [];
}
const version = latestVersion.id;
const result = await args.pool.query<unknown>(
sql`/* getSchemasOfVersion */
SELECT
sl.id
, sl.sdl
, lower(sl.service_name) as "serviceName"
, sl.service_url as "serviceUrl"
, p.type
FROM schema_version_to_log AS svl
LEFT JOIN schema_log AS sl ON (sl.id = svl.action_id)
LEFT JOIN projects as p ON (p.id = sl.project_id)
WHERE
svl.version_id = ${version}
AND sl.action = 'PUSH'
AND p.type != 'CUSTOM'
ORDER BY
sl.created_at DESC
`,
);
return result.rows.map(row => SchemaModel.parse(row));
},
async getBaseSchema(args: { targetId: string; pool: DatabasePool }) {
const data = await args.pool.maybeOne<Record<string, string>>(
sql`/* getBaseSchema */ SELECT base_schema FROM targets WHERE id=${args.targetId}`,
);
return data?.base_schema ?? null;
},
async proposedSchemas(args: {
targetId: string;
proposalId: string;
cursor?: string | null;
pool: DatabasePool;
}) {
const now = new Date().toISOString();
let cursor: {
createdAt: string;
id: string;
} | null = null;
if (args.cursor) {
cursor = decodeCreatedAtAndUUIDIdBasedCursor(args.cursor);
}
// fetch all latest schemas. Support up to 2_000 subgraphs.
const maxLoops = 100;
const services = await this.latestComposableSchemas({
targetId: args.targetId,
pool: args.pool,
});
let nextCursor = cursor;
// collect changes in paginated requests to avoid stalling the db
let i = 0;
do {
const result = await args.pool.query<unknown>(sql`
SELECT
c."id"
, c."service_name" as "serviceName"
, c."service_url" as "serviceUrl"
, c."schema_proposal_changes" as "schemaProposalChanges"
, to_json(c."created_at") as "createdAt"
FROM
"schema_checks" as c
INNER JOIN (
SELECT COALESCE("service_name", '') as "service", "schema_proposal_id", max("created_at") as "maxdate"
FROM schema_checks
${
cursor
? sql`
WHERE "schema_proposal_id" = ${args.proposalId}
AND (
(
"created_at" = ${cursor.createdAt}
AND "id" < ${cursor.id}
)
OR "created_at" < ${cursor.createdAt}
)
`
: sql``
}
GROUP BY "service", "schema_proposal_id"
) as cc
ON c."schema_proposal_id" = cc."schema_proposal_id"
AND COALESCE(c."service_name", '') = cc."service"
AND c."created_at" = cc."maxdate"
WHERE
c."target_id" = ${args.targetId}
AND c."schema_proposal_id" = ${args.proposalId}
${
cursor
? sql`
AND (
(
c."created_at" = ${cursor.createdAt}
AND c."id" < ${cursor.id}
)
OR c."created_at" < ${cursor.createdAt}
)
`
: sql``
}
ORDER BY
c."created_at" DESC
, c."id" DESC
LIMIT 20
`);
const changes = result.rows.map(row => {
const value = SchemaProposalChangesModel.parse(row);
return {
...value,
schemaProposalChanges: value.schemaProposalChanges.map(c => {
const change: Change<any> = {
...c,
path: c.path ?? undefined,
criticality: {
level: c.criticality,
},
};
return change;
}),
};
});
if (changes.length === 20) {
nextCursor = {
// Keep the created at because we want the same set of checks when joining on the "latest".
createdAt: nextCursor?.createdAt ?? changes[0]?.createdAt ?? now,
id: changes[changes.length - 1].id,
};
} else {
nextCursor = null;
}
for (const change of changes) {
const service = services.find(s => change.serviceName === s.serviceName);
if (service) {
const ast = parse(service.sdl, { noLocation: true });
service.sdl = print(
patch(ast, change.schemaProposalChanges, { onError: errors.looseErrorHandler }),
);
if (change.serviceUrl) {
service.serviceUrl = change.serviceUrl;
}
}
}
} while (nextCursor && ++i < maxLoops);
return services;
},
};
}

View file

@ -1,33 +1,5 @@
import { Logger as GraphileLogger, type LogLevel as GraphileLogLevel } from 'graphile-worker';
import type { Logger } from '@graphql-hive/logger';
import { ServiceLogger } from '@hive/service-common';
function logLevel(level: GraphileLogLevel) {
switch (level) {
case 'warning':
return 'warn' as const;
case 'info': {
return 'info' as const;
}
case 'debug': {
return 'debug' as const;
}
case 'error': {
return 'error' as const;
}
}
return 'info';
}
/**
* Bridges Hive Logger to Graphile Logger
*/
export function bridgeGraphileLogger(logger: Logger) {
return new GraphileLogger(_scope => (level, message, _meta) => {
logger[logLevel(level)](message);
});
}
import type { Logger } from '@hive/pubsub';
import type { ServiceLogger } from '@hive/service-common';
export function bridgeFastifyLogger(logger: Logger): ServiceLogger {
return logger as unknown as ServiceLogger;

View file

@ -0,0 +1,54 @@
/**
* Nearly identical to @hive/api's redis definition.
* This is duplicated in case there are additional requirements for workflows.
**/
import type { Redis as RedisInstance, RedisOptions } from 'ioredis';
import Redis from 'ioredis';
import type { Logger } from '@hive/pubsub';
export type { RedisInstance as Redis };
export type RedisConfig = Required<Pick<RedisOptions, 'host' | 'port' | 'password'>> & {
tlsEnabled: boolean;
};
export function createRedisClient(label: string, config: RedisConfig, logger: Logger) {
const redis = new Redis({
host: config.host,
port: config.port,
password: config.password,
retryStrategy(times) {
return Math.min(times * 500, 2000);
},
reconnectOnError(error) {
logger.warn('Redis reconnectOnError (error=%s)', error);
return 1;
},
db: 0,
maxRetriesPerRequest: null,
enableReadyCheck: false,
tls: config.tlsEnabled ? {} : undefined,
});
redis.on('error', err => {
logger.error('Redis connection error (error=%s,label=%s)', err, label);
});
redis.on('connect', () => {
logger.debug('Redis connection established (label=%s)', label);
});
redis.on('ready', () => {
logger.info('Redis connection ready (label=%s)', label);
});
redis.on('close', () => {
logger.info('Redis connection closed (label=%s)', label);
});
redis.on('reconnecting', (timeToReconnect?: number) => {
logger.info('Redis reconnecting in %s (label=%s)', timeToReconnect, label);
});
return redis;
}

View file

@ -0,0 +1,85 @@
/**
* Runs schema composition as a job and then notifies the server when completed.
*
* There's a delay between when this is added and when it's ran on the off chance
* that more requests come in requesting composition for the proposal
*/
import { z } from 'zod';
import { defineTask, implementTask } from '../kit.js';
import { createSchemaObject } from '../lib/schema/provider';
function extendWithBase(schemas: Array<{ sdl: string }>, baseSchema: string | null) {
if (!baseSchema) {
return schemas;
}
return schemas.map((schema, index) => {
if (index === 0) {
return {
...schema,
sdl: baseSchema + ' ' + schema.sdl,
};
}
return schema;
});
}
export const SchemaProposalCompositionTask = defineTask({
name: 'schemaProposalComposition',
schema: z.object({
proposalId: z.string(),
targetId: z.string(),
externalComposition: z.object({
enabled: z.boolean(),
endpoint: z.string().nullable().optional(),
encryptedSecret: z.string().nullable().optional(),
}),
native: z.boolean(),
}),
});
export const task = implementTask(SchemaProposalCompositionTask, async args => {
const schemas = await args.context.schema.proposedSchemas({
pool: args.context.pg,
proposalId: args.input.proposalId,
targetId: args.input.targetId,
});
const baseSchema = await args.context.schema.getBaseSchema({
pool: args.context.pg,
targetId: args.input.targetId,
});
try {
const result = await args.context.schema.composeAndValidate(
schemas[0].type,
extendWithBase(schemas, baseSchema).map(s => createSchemaObject(s)),
{
/** Whether external composition should be used (only Federation) */
external: args.input.externalComposition,
/** Whether native composition should be used (only Federation) */
native: args.input.native,
/** Proposals currently ignore contracts. */
contracts: null,
},
);
const payload: { timestamp: string; status: 'ERROR' | 'SUCCESS'; reason: string | null } = {
timestamp: new Date().toISOString(),
status: result.errors.length ? 'ERROR' : 'SUCCESS',
reason: result.errors.length ? result.errors.map(e => e.message).join('\n') : null,
};
await args.context.schema.updateSchemaProposalComposition({
...payload,
proposalId: args.input.proposalId,
pool: args.context.pg,
});
args.context.pubSub.publish('schemaProposalComposition', args.input.proposalId, payload);
} catch (e) {
// if the internal error persists, then this will be ran multiple times.
args.logger.error('Something went wrong. %s', (e as Error).message);
throw e;
}
});

View file

@ -25,7 +25,7 @@
"@graphiql/toolkit": "0.9.1",
"@graphql-codegen/client-preset-swc-plugin": "0.2.0",
"@graphql-inspector/core": "7.1.2",
"@graphql-inspector/patch": "0.1.2",
"@graphql-inspector/patch": "0.1.3",
"@graphql-tools/mock": "9.0.25",
"@graphql-typed-document-node/core": "3.2.0",
"@headlessui/react": "2.2.0",

View file

@ -1,6 +1,6 @@
import type { MouseEventHandler } from 'react';
import { useState, type MouseEventHandler } from 'react';
import { cn } from '@/lib/utils';
import { CubeIcon } from '@radix-ui/react-icons';
import { ChevronDownIcon, CubeIcon } from '@radix-ui/react-icons';
export enum ServiceHeadingType {
NEW,
@ -11,26 +11,40 @@ export function ServiceHeading(props: {
serviceName: string;
type?: ServiceHeadingType;
onClick?: MouseEventHandler<HTMLDivElement>;
showToggleIcon?: boolean;
}) {
const [isOpen, setIsOpen] = useState(true);
if (props.serviceName.length === 0) {
return null;
}
const showToggleIcon = props.showToggleIcon ?? props.onClick !== undefined;
return (
<div
className={cn(
'flex flex-row items-center border-b-2 px-4 pb-2 pt-4 text-base font-semibold',
props.onClick !== undefined && 'cursor-pointer',
'text-accent bg-accent_10 mt-2 flex flex-row items-center rounded-sm px-4 py-2 text-base font-semibold',
props.onClick !== undefined && 'cursor-pointer hover:underline',
props.onClick !== undefined && isOpen && 'rounded-b-none',
)}
onClick={props.onClick}
onClick={e => {
props.onClick?.(e);
setIsOpen(!isOpen);
}}
>
<CubeIcon className="mr-2" />
<span>{props.serviceName}</span>
{props.type === ServiceHeadingType.NEW ? (
<span className="ml-2 text-xs text-green-500">*NEW*</span>
) : null}
{props.type === ServiceHeadingType.DELETED ? (
<span className="ml-2 text-xs text-red-500">*DELETED*</span>
) : null}
<div className="flex grow flex-row items-center">
<CubeIcon className="mr-2" />
<span>{props.serviceName}</span>
{props.type === ServiceHeadingType.NEW ? (
<span className="ml-2 text-xs text-green-500">*NEW*</span>
) : null}
{props.type === ServiceHeadingType.DELETED ? (
<span className="ml-2 text-xs text-red-500">*DELETED*</span>
) : null}
</div>
{showToggleIcon && (
<div className="flex">
<ChevronDownIcon className={cn('transition', isOpen && '-rotate-180')} />
</div>
)}
</div>
);
}

View file

@ -57,18 +57,19 @@ const STAGE_TITLES = {
export function StageTransitionSelect(props: {
stage: SchemaProposalStage;
onSelect: (stage: SchemaProposalStage) => void | Promise<void>;
className?: string;
}) {
const [open, setOpen] = useState(false);
return (
<Popover open={open} onOpenChange={setOpen}>
<PopoverTrigger asChild>
<Button
variant="link"
variant="outline"
role="combobox"
className="flex min-w-[200px] max-w-[250px] justify-between truncate"
className={cn('flex min-w-[200px] justify-between truncate', props.className)}
aria-expanded={open}
>
<span className="truncate">{STAGE_TITLES[props.stage]}</span>
<div className="truncate">{STAGE_TITLES[props.stage]}</div>
<ChevronsUpDown className="ml-2 size-4 shrink-0 opacity-50" />
</Button>
</PopoverTrigger>

View file

@ -38,7 +38,7 @@ export function TargetProposalChecksPage(props: {
}) {
const checks = useFragment(ProposalOverview_ChecksFragment, props.checks);
return (
<div className="grid w-full grid-cols-3 content-evenly rounded-lg border-2 sm:grid-cols-5">
<div className="grid w-full grid-cols-3 content-evenly sm:grid-cols-5">
{checks?.edges?.map(({ node }, index) => {
return (
<CheckItem

View file

@ -10,18 +10,15 @@ import {
} from '@/components/target/proposals';
import { SaveProposalProvider } from '@/components/target/proposals/save-proposal-modal';
import { StageTransitionSelect } from '@/components/target/proposals/stage-transition-select';
import {
ProposalQuery_VersionsListFragment,
VersionSelect,
} from '@/components/target/proposals/version-select';
import { CardDescription } from '@/components/ui/card';
import { DiffIcon, EditIcon, GraphQLIcon } from '@/components/ui/icon';
import { CheckIcon, DiffIcon, EditIcon, GraphQLIcon, XIcon } from '@/components/ui/icon';
import { Meta } from '@/components/ui/meta';
import { Subtitle, Title } from '@/components/ui/page';
import { SubPageLayoutHeader } from '@/components/ui/page-content-layout';
import { Skeleton } from '@/components/ui/skeleton';
import { Spinner } from '@/components/ui/spinner';
import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs';
import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/components/ui/tooltip';
import { TimeAgo } from '@/components/v2';
import { FragmentType, graphql, useFragment } from '@/gql';
import { ProjectType } from '@/gql/graphql';
@ -80,9 +77,9 @@ const ProposalQuery = graphql(/* GraphQL */ `
stage
title
description
versions: checks(after: null, input: {}) {
...ProposalQuery_VersionsListFragment
}
compositionStatus
compositionTimestamp
compositionStatusReason
checks(after: $version, input: {}) {
...ProposalOverview_ChecksFragment
}
@ -374,7 +371,6 @@ const ProposalsContent = (props: Parameters<typeof TargetProposalsSinglePage>[0]
services={services ?? []}
reviews={proposal.reviews ?? {}}
checks={proposal.checks ?? null}
versions={proposal.versions ?? null}
isDistributedGraph={isDistributedGraph}
proposal={proposal}
target={query.data.target}
@ -432,10 +428,42 @@ const ProposalsContent = (props: Parameters<typeof TargetProposalsSinglePage>[0]
) : (
proposal && (
<>
<div className="grid grid-cols-2">
<VersionSelect proposalId={props.proposalId} versions={proposal.versions ?? {}} />
<div className="flex justify-end">
<div className="flex flex-col gap-2 sm:flex-row">
{/* <VersionSelect proposalId={props.proposalId} versions={proposal.versions ?? {}} /> */}
<Title className="flex grow flex-row items-center gap-2 truncate">
<div className="truncate">{proposal.title}</div>
<TooltipProvider delayDuration={0}>
<Tooltip>
<TooltipTrigger>
{proposal?.compositionStatus === 'ERROR' ? (
<XIcon className="text-red-600" />
) : null}
{proposal?.compositionStatus === 'SUCCESS' ? (
<CheckIcon className="text-emerald-500" />
) : null}
</TooltipTrigger>
<TooltipContent align="start">
{proposal?.compositionStatus === 'ERROR' ? (
<>
Composition Error{' '}
{proposal.compositionTimestamp ? (
<>
(<TimeAgo date={proposal.compositionTimestamp} />)
</>
) : null}
{proposal.compositionStatusReason
?.split('\n')
.map((e, i) => <div key={i}>- {e}</div>) ?? 'Unknown cause.'}{' '}
</>
) : null}
{proposal?.compositionStatus === 'SUCCESS' ? 'Composes Successfully' : null}
</TooltipContent>
</Tooltip>
</TooltipProvider>
</Title>
<div className="flex-col justify-end">
<StageTransitionSelect
className="w-full sm:w-auto"
stage={proposal.stage}
onSelect={async stage => {
const _review = await reviewSchemaProposal({
@ -452,12 +480,13 @@ const ProposalsContent = (props: Parameters<typeof TargetProposalsSinglePage>[0]
/>
</div>
</div>
<div className="p-4 py-8">
<Title>{proposal.title}</Title>
<div className="text-neutral-10 text-xs">
<div className="mb-6 mt-2">
{proposal.description ? (
<div className="w-full border-l-2 p-4">{proposal.description}</div>
) : null}
<div className="text-neutral-10 mt-4 pr-2 text-right text-xs">
proposed <TimeAgo date={proposal.createdAt} /> by {proposal.author}
</div>
<div className="w-full p-2 pt-4">{proposal.description}</div>
</div>
</>
)
@ -478,7 +507,6 @@ function TabbedContent(props: {
services: ServiceProposalDetails[];
reviews: FragmentType<typeof Proposal_ReviewsFragment>;
checks: FragmentType<typeof ProposalOverview_ChecksFragment> | null;
versions: FragmentType<typeof ProposalQuery_VersionsListFragment> | null;
proposal: FragmentType<typeof Proposals_EditProposalProposalFragment>;
target: FragmentType<typeof Proposals_EditProposalTargetFragment>;
me: FragmentType<typeof Proposals_EditProposalMeFragment> | null;
@ -486,7 +514,7 @@ function TabbedContent(props: {
}) {
return (
<Tabs value={props.page} defaultValue={Tab.DETAILS}>
<TabsList variant="menu" className="w-full">
<TabsList variant="menu" className="border-b-1 w-full">
<TabsTrigger variant="menu" value={Tab.DETAILS} asChild>
<Link
to="/$organizationSlug/$projectSlug/$targetSlug/proposals/$proposalId"

View file

@ -1,4 +1,3 @@
// eslint-disable-next-line import/no-extraneous-dependencies -- test file
import { expect, test } from '@playwright/test';
test.describe('Search User Journeys', () => {

File diff suppressed because it is too large Load diff

View file

@ -74,7 +74,8 @@
],
"@graphql-hive/plugin-opentelemetry/setup": [
"./node_modules/@graphql-hive/plugin-opentelemetry/dist/setup.d.ts"
]
],
"@hive/pubsub": ["./packages/libraries/pubsub/src/index.ts"]
}
},
"include": ["packages", "tsup.config.node.ts"],