Skip to content

Commit

Permalink
subscriptions support in the query planner (#2389)
Browse files Browse the repository at this point in the history
* Subscriptions support in the query planner
  • Loading branch information
clenfest committed Mar 9, 2023
1 parent 1782766 commit a9385bd
Show file tree
Hide file tree
Showing 13 changed files with 491 additions and 50 deletions.
7 changes: 7 additions & 0 deletions .changeset/forty-hairs-flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@apollo/composition": minor
"@apollo/query-planner": minor
---

Addition of new query planner node types to enable federated subscriptions support

11 changes: 0 additions & 11 deletions .changeset/many-rats-allow.md

This file was deleted.

135 changes: 135 additions & 0 deletions composition-js/src/__tests__/compose.subscription.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import { FEDERATION2_LINK_WITH_FULL_IMPORTS, FieldDefinition } from '@apollo/federation-internals';
import gql from 'graphql-tag';
import { composeServices } from '../compose';

describe('subscription composition tests', () => {
it('type subscription appears in the supergraph', () => {
const subgraphA = {
name: 'subgraphA',
typeDefs: gql`
extend schema
${FEDERATION2_LINK_WITH_FULL_IMPORTS}
type Query {
me: User!
}
type Subscription {
onNewUser: User!
}
type User {
id: ID!
name: String!
}
`,
};

const subgraphB = {
name: 'subgraphB',
typeDefs:gql`
extend schema
${FEDERATION2_LINK_WITH_FULL_IMPORTS}
type Query {
foo: Int
}
type Subscription {
bar: Int
}
`,
};

const { errors, schema } = composeServices([subgraphA, subgraphB]);
expect(errors).toBeUndefined();
expect(schema).toBeDefined();
const onNewUser = schema?.elementByCoordinate('Subscription.onNewUser') as FieldDefinition<any>;
expect(onNewUser.appliedDirectives?.[0].toString()).toBe('@join__field(graph: SUBGRAPHA)');
});

it.each([
{ directive: '@shareable', errorMsg: 'Fields on root level subscription object cannot be marked as shareable'},
])('directives that are incompatible with subscriptions wont compose', ({ directive, errorMsg }) => {
const subgraphA = {
name: 'subgraphA',
typeDefs: gql`
extend schema
${FEDERATION2_LINK_WITH_FULL_IMPORTS}
type Query {
me: User!
}
type Subscription {
onNewUser: User! ${directive}
}
type User {
id: ID!
name: String!
}
`,
};

const subgraphB = {
name: 'subgraphB',
typeDefs:gql`
extend schema
${FEDERATION2_LINK_WITH_FULL_IMPORTS}
type Query {
foo: Int
}
type Subscription {
bar: Int
}
`,
};

const { errors, schema } = composeServices([subgraphA, subgraphB]);
expect(errors?.length).toBe(1);
expect(errors?.[0].message).toBe(errorMsg);
expect(schema).toBeUndefined();
});

it('subscription name collisions across subgraphs should not compose', () => {
const subgraphA = {
name: 'subgraphA',
typeDefs: gql`
extend schema
${FEDERATION2_LINK_WITH_FULL_IMPORTS}
type Query {
me: User!
}
type Subscription {
onNewUser: User
foo: Int!
}
type User {
id: ID!
name: String!
}
`,
};

const subgraphB = {
name: 'subgraphB',
typeDefs:gql`
extend schema
${FEDERATION2_LINK_WITH_FULL_IMPORTS}
type Query {
foo: Int
}
type Subscription {
foo: Int!
}
`,
};

const { errors, schema } = composeServices([subgraphA, subgraphB]);
expect(errors?.length).toBe(1);
expect(errors?.[0].message).toBe('Non-shareable field "Subscription.foo" is resolved from multiple subgraphs: it is resolved from subgraphs "subgraphA" and "subgraphB" and defined as non-shareable in all of them');
expect(schema).toBeUndefined();
});
});
41 changes: 37 additions & 4 deletions composition-js/src/merging/merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,7 @@ class Merger {
private mergeObject(sources: (ObjectType | undefined)[], dest: ObjectType) {
const isEntity = this.hintOnInconsistentEntity(sources, dest);
const isValueType = !isEntity && !dest.isRootType();
const isSubscription = dest.isSubscriptionRootType();

this.addFieldsShallow(sources, dest);
if (!dest.hasFields()) {
Expand All @@ -806,7 +807,15 @@ class Merger {
const subgraphFields = sources.map(t => t?.field(destField.name));
const mergeContext = this.validateOverride(subgraphFields, destField);

this.mergeField(subgraphFields, destField, mergeContext);
if (isSubscription) {
this.validateSubscriptionField(subgraphFields);
}

this.mergeField({
sources: subgraphFields,
dest: destField,
mergeContext,
});
this.validateFieldSharing(subgraphFields, destField, mergeContext);
}
}
Expand Down Expand Up @@ -1085,7 +1094,7 @@ class Merger {
const { subgraphsWithOverride, subgraphMap } = sources.map((source, idx) => {
if (!source) {
// While the subgraph may not have the field directly, it could have "stand-in" for that field
// through @interfaceObject, and it is those stand-ins that would be effectively overridden.
// through @interfaceObject, and it is those stand-ins that would be effectively overridden.
const interfaceObjectAbstractingFields = this.fieldsInSourceIfAbstractedByInterfaceObject(dest, idx);
if (interfaceObjectAbstractingFields.length > 0) {
return {
Expand Down Expand Up @@ -1251,7 +1260,15 @@ class Merger {
}).filter(isDefined);
}

private mergeField(sources: FieldOrUndefinedArray, dest: FieldDefinition<any>, mergeContext: FieldMergeContext = new FieldMergeContext(sources)) {
private mergeField({
sources,
dest,
mergeContext = new FieldMergeContext(sources),
}: {
sources: FieldOrUndefinedArray,
dest: FieldDefinition<any>,
mergeContext: FieldMergeContext,
}) {
if (sources.every((s, i) => s === undefined ? this.fieldsInSourceIfAbstractedByInterfaceObject(dest, i).every((f) => this.isExternal(i, f)) : this.isExternal(i, s))) {
const definingSubgraphs = sources.map((source, i) => {
if (source) {
Expand Down Expand Up @@ -1790,7 +1807,11 @@ class Merger {
}
const subgraphFields = sources.map(t => t?.field(destField.name));
const mergeContext = this.validateOverride(subgraphFields, destField);
this.mergeField(subgraphFields, destField, mergeContext);
this.mergeField({
sources: subgraphFields,
dest: destField,
mergeContext,
});
}
}

Expand Down Expand Up @@ -2805,4 +2826,16 @@ class Merger {
: err;
});
}

private validateSubscriptionField(sources: FieldOrUndefinedArray) {
// no subgraph marks field as @shareable
const fieldsWithShareable = sources.filter((src, idx) => src && src.appliedDirectivesOf(this.metadata(idx).shareableDirective()).length > 0);
if (fieldsWithShareable.length > 0) {
const nodes = sourceASTs(...fieldsWithShareable);
this.errors.push(ERRORS.INVALID_FIELD_SHARING.err(
`Fields on root level subscription object cannot be marked as shareable`,
{ nodes},
));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { QueryPlan, PlanNode } from '@apollo/query-planner';
import { QueryPlan, PlanNode, SubscriptionNode } from '@apollo/query-planner';
import { astSerializer, queryPlanSerializer } from '../snapshotSerializers';
import prettyFormat from 'pretty-format';

Expand Down Expand Up @@ -41,9 +41,9 @@ function toCallService(
let pass = false;
// let initialServiceCall = null;
// recurse the node, find first match of service name, return
function walkExecutionNode(node?: PlanNode) {
function walkExecutionNode(node?: PlanNode | SubscriptionNode) {
if (!node) return;
if (node.kind === 'Fetch' && node.serviceName === service) {
if ((node.kind === 'Fetch') && node.serviceName === service) {
pass = true;
// initialServiceCall = node;
return;
Expand All @@ -56,6 +56,10 @@ function toCallService(
case 'Sequence':
node.nodes.forEach(walkExecutionNode);
break;
case 'Subscription':
walkExecutionNode(node.primary);
walkExecutionNode(node.rest);
break;
default:
return;
}
Expand Down
4 changes: 4 additions & 0 deletions gateway-js/src/executeQueryPlan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ export async function executeQueryPlan(
requestContext.metrics && requestContext.metrics.captureTraces
);

if (queryPlan.node?.kind === 'Subscription') {
throw new Error('Execution of subscriptions not supported by gateway');
}

if (queryPlan.node) {
const traceNode = await executeNode(
context,
Expand Down
8 changes: 8 additions & 0 deletions internals-js/src/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2091,6 +2091,14 @@ export class ObjectType extends FieldBasedType<ObjectType, ObjectTypeReferencer>
return schema.schemaDefinition.root('query')?.type === this;
}

/**
* Whether this type is the "subscription" root type of the schema (will return false if the type is detached).
*/
isSubscriptionRootType(): boolean {
const schema = this.schema();
return schema.schemaDefinition.root('subscription')?.type === this;
}

protected removeReferenceRecursive(ref: ObjectTypeReferencer): void {
// Note that the ref can also be a`SchemaDefinition`, but don't have anything to do then.
switch (ref.kind) {
Expand Down
12 changes: 11 additions & 1 deletion query-planner-js/src/QueryPlan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export type ResponsePath = (string | number)[];

export interface QueryPlan {
kind: 'QueryPlan';
node?: PlanNode;
node?: PlanNode | SubscriptionNode;
}

export type PlanNode = SequenceNode | ParallelNode | FetchNode | FlattenNode | DeferNode | ConditionNode;
Expand All @@ -26,6 +26,12 @@ export interface ParallelNode {
nodes: PlanNode[];
}

export interface SubscriptionNode {
kind: 'Subscription';
primary: FetchNode;
rest?: PlanNode;
}

export interface FetchNode {
kind: 'Fetch';
serviceName: string;
Expand Down Expand Up @@ -229,3 +235,7 @@ export const trimSelectionNodes = (

return remapped;
};

export const isPlanNode = (node: PlanNode | SubscriptionNode | undefined): node is PlanNode => {
return !!node && node.kind !== 'Subscription';
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { operationFromDocument } from '@apollo/federation-internals';
import { assert, operationFromDocument } from '@apollo/federation-internals';
import gql from 'graphql-tag';
import { isPlanNode } from '../QueryPlan';
import { composeAndCreatePlanner, findFetchNodes } from "./testHelper";

describe('basic @key on interface/@interfaceObject handling', () => {
Expand Down Expand Up @@ -306,6 +307,7 @@ describe('basic @key on interface/@interfaceObject handling', () => {
}
`);

assert(isPlanNode(plan.node), 'buildQueryPlan should return QueryPlan');
const rewrites = findFetchNodes('S2', plan.node)[0].inputRewrites;
expect(rewrites).toBeDefined();
expect(rewrites?.length).toBe(1);
Expand Down

0 comments on commit a9385bd

Please sign in to comment.