diff --git a/README.md b/README.md index e52d393..62a03cb 100644 --- a/README.md +++ b/README.md @@ -29,8 +29,7 @@ Done in 141.94s. ### Support for GraphQL spec -The goal is to support the [June 2018 version of the GraphQL spec](https://facebook.github.io/graphql/June2018/). At this moment, -the only missing feature is support for Subscriptions. +The goal is to support the [June 2018 version of the GraphQL spec](https://facebook.github.io/graphql/June2018/). #### Differences to `graphql-js` @@ -91,10 +90,18 @@ if (!isCompiledQuery(compiledQuery)) { #### Execute the Query ```js -const executionResult = await compiledQuery.query(); +const executionResult = await compiledQuery.query(root, context, variables); console.log(executionResult); ``` +#### Subscribe to the Query + +```js +const result = await compiledQuery.subscribe(root, context, variables); +for await (const value of result) { + console.log(value); +} +``` ## API ### compiledQuery = compileQuery(schema, document, operationName, compilerOptions) @@ -112,14 +119,18 @@ Compiles the `document` AST, using an optional operationName and compiler option for overly expensive serializers - `customJSONSerializer` {boolean, default: false} - Whether to produce also a JSON serializer function using `fast-json-stringify`. The default stringifier function is `JSON.stringify` -#### compiledQuery.compiled(root: any, context: any, variables: Maybe<{ [key: string]: any }>) +#### compiledQuery.query(root: any, context: any, variables: Maybe<{ [key: string]: any }>) + +the compiled function that can be called with a root value, a context and the required variables to produce execution result. + +#### compiledQuery.subscribe(root: any, context: any, variables: Maybe<{ [key: string]: any }>) -the compiled function that can be called with a root value, a context and the required variables. +(available for GraphQL Subscription only) the compiled function that can be called with a root value, a context and the required variables to produce either an AsyncIterator (if successful) or an ExecutionResult (error). #### compiledQuery.stringify(value: any) the compiled function for producing a JSON string. It will be `JSON.stringify` unless `compilerOptions.customJSONSerializer` is true. -The value argument should the return of the compiled GraphQL function. +The value argument should be the return of the compiled GraphQL function. ## LICENSE diff --git a/src/__tests__/subscription.test.ts b/src/__tests__/subscription.test.ts new file mode 100644 index 0000000..65cbd24 --- /dev/null +++ b/src/__tests__/subscription.test.ts @@ -0,0 +1,1217 @@ +/** + * Based on https://github.com/graphql/graphql-js/blob/main/src/subscription/__tests__/subscribe-test.js + * This test suite makes an addition denoted by "*" comments: + * graphql-jit does not support the root resolver pattern that this test uses + * so the part must be rewritten to include that root resolver in `subscribe` of + * the GraphQLObject in the schema. + */ + +import { EventEmitter } from "events"; +import { + DocumentNode, + ExecutionResult, + GraphQLBoolean, + GraphQLError, + GraphQLInt, + GraphQLList, + GraphQLObjectType, + GraphQLSchema, + GraphQLString, + parse, + SubscriptionArgs +} from "graphql"; +import { CompiledQuery, compileQuery, isCompiledQuery } from "../execution"; + +function eventEmitterAsyncIterator( + eventEmitter: EventEmitter, + eventName: string +): AsyncIterableIterator { + const pullQueue = [] as any; + const pushQueue = [] as any; + let listening = true; + eventEmitter.addListener(eventName, pushValue); + + function pushValue(event: any) { + if (pullQueue.length !== 0) { + pullQueue.shift()({ value: event, done: false }); + } else { + pushQueue.push(event); + } + } + + function pullValue() { + return new Promise(resolve => { + if (pushQueue.length !== 0) { + resolve({ value: pushQueue.shift(), done: false }); + } else { + pullQueue.push(resolve); + } + }); + } + + function emptyQueue() { + if (listening) { + listening = false; + eventEmitter.removeListener(eventName, pushValue); + for (const resolve of pullQueue) { + resolve({ value: undefined, done: true }); + } + pullQueue.length = 0; + pushQueue.length = 0; + } + } + + return { + next() { + return listening ? pullValue() : this.return(); + }, + return() { + emptyQueue(); + return Promise.resolve({ value: undefined, done: true }); + }, + throw(error: any) { + emptyQueue(); + return Promise.reject(error); + }, + [Symbol.asyncIterator]() { + return this; + } + } as any; +} + +async function subscribe({ + schema, + document, + operationName, + rootValue, + contextValue, + variableValues +}: SubscriptionArgs): Promise< + AsyncIterableIterator | ExecutionResult +> { + const prepared = compileQuery(schema, document, operationName || ""); + if (!isCompiledQuery(prepared)) return prepared; + return prepared.subscribe!(rootValue, contextValue, variableValues); +} + +const EmailType = new GraphQLObjectType({ + name: "Email", + fields: { + from: { type: GraphQLString }, + subject: { type: GraphQLString }, + message: { type: GraphQLString }, + unread: { type: GraphQLBoolean } + } +}); + +const InboxType = new GraphQLObjectType({ + name: "Inbox", + fields: { + total: { + type: GraphQLInt, + resolve: inbox => inbox.emails.length + }, + unread: { + type: GraphQLInt, + resolve: inbox => inbox.emails.filter((email: any) => email.unread).length + }, + emails: { type: new GraphQLList(EmailType) } + } +}); + +const QueryType = new GraphQLObjectType({ + name: "Query", + fields: { + inbox: { type: InboxType } + } +}); + +const EmailEventType = new GraphQLObjectType({ + name: "EmailEvent", + fields: { + email: { type: EmailType }, + inbox: { type: InboxType } + } +}); + +const emailSchema = emailSchemaWithResolvers(); + +function emailSchemaWithResolvers( + subscribeFn?: (arg: T) => any, + resolveFn?: (arg: T) => any +) { + return new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + importantEmail: { + type: EmailEventType, + resolve: resolveFn, + subscribe: subscribeFn, + args: { + priority: { type: GraphQLInt } + } + } + } + }) + }); +} + +const defaultSubscriptionAST = parse(` + subscription ($priority: Int = 0) { + importantEmail(priority: $priority) { + email { + from + subject + } + inbox { + unread + total + } + } + } +`); + +async function createSubscription( + pubsub: EventEmitter, + schema?: GraphQLSchema, + document: DocumentNode = defaultSubscriptionAST +) { + const data = { + inbox: { + emails: [ + { + from: "joe@graphql.org", + subject: "Hello", + message: "Hello World", + unread: false + } + ] + }, + importantEmail() { + return eventEmitterAsyncIterator(pubsub, "importantEmail"); + } + }; + + if (!schema) { + // * + schema = emailSchemaWithResolvers(() => + eventEmitterAsyncIterator(pubsub, "importantEmail") + ); + } + + function sendImportantEmail(newEmail: any) { + data.inbox.emails.push(newEmail); + // Returns true if the event was consumed by a subscriber. + return pubsub.emit("importantEmail", { + importantEmail: { + email: newEmail, + inbox: data.inbox + } + }); + } + + // `subscribe` returns Promise + return { + sendImportantEmail, + subscription: await subscribe({ schema, document, rootValue: data }) + }; +} + +async function expectPromiseToThrow( + promise: () => Promise, + message: string +) { + try { + await promise(); + throw new Error("promise should have thrown but did not"); + } catch (error) { + expect(error).toBeInstanceOf(Error); + expect(error.message).toEqual(message); + } +} + +// Check all error cases when initializing the subscription. +describe("Subscription Initialization Phase", () => { + it("accepts positional arguments", async () => { + const document = parse(` + subscription { + importantEmail + } + `); + + async function* emptyAsyncIterator() { + // Empty + } + + const ai = await subscribe({ + // * + schema: emailSchemaWithResolvers(emptyAsyncIterator), + document, + rootValue: { + importantEmail: emptyAsyncIterator + } + }); + + // @ts-ignore + ai.next(); + // @ts-ignore + ai.return(); + }); + + it("accepts multiple subscription fields defined in schema", async () => { + const pubsub = new EventEmitter(); + const SubscriptionTypeMultiple = new GraphQLObjectType({ + name: "Subscription", + fields: { + // * + importantEmail: { + type: EmailEventType, + subscribe: () => eventEmitterAsyncIterator(pubsub, "importantEmail") + }, + nonImportantEmail: { + type: EmailEventType, + subscribe: () => + eventEmitterAsyncIterator(pubsub, "nonImportantEmail") + } + } + }); + + const testSchema = new GraphQLSchema({ + query: QueryType, + subscription: SubscriptionTypeMultiple + }); + + const { subscription, sendImportantEmail } = await createSubscription( + pubsub, + testSchema + ); + + sendImportantEmail({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true + }); + + // @ts-ignore + await subscription.next(); + }); + + it("accepts type definition with sync subscribe function", async () => { + const pubsub = new EventEmitter(); + const schema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + importantEmail: { + type: GraphQLString, + subscribe: () => eventEmitterAsyncIterator(pubsub, "importantEmail") + } + } + }) + }); + + const subscription = await subscribe({ + schema, + document: parse(` + subscription { + importantEmail + } + `) + }); + + pubsub.emit("importantEmail", { + importantEmail: {} + }); + + // @ts-ignore + await subscription.next(); + }); + + it("accepts type definition with async subscribe function", async () => { + const pubsub = new EventEmitter(); + const schema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + importantEmail: { + type: GraphQLString, + subscribe: async () => { + await new Promise(setImmediate); + return eventEmitterAsyncIterator(pubsub, "importantEmail"); + } + } + } + }) + }); + + const subscription = await subscribe({ + schema, + document: parse(` + subscription { + importantEmail + } + `) + }); + + pubsub.emit("importantEmail", { + importantEmail: {} + }); + + // @ts-ignore + await subscription.next(); + }); + + it("should only resolve the first field of invalid multi-field", async () => { + let didResolveImportantEmail = false; + let didResolveNonImportantEmail = false; + + const SubscriptionTypeMultiple = new GraphQLObjectType({ + name: "Subscription", + fields: { + importantEmail: { + type: EmailEventType, + subscribe() { + didResolveImportantEmail = true; + return eventEmitterAsyncIterator(new EventEmitter(), "event"); + } + }, + nonImportantEmail: { + type: EmailEventType, + // istanbul ignore next (Shouldn't be called) + subscribe() { + didResolveNonImportantEmail = true; + return eventEmitterAsyncIterator(new EventEmitter(), "event"); + } + } + } + }); + + const schema = new GraphQLSchema({ + query: QueryType, + subscription: SubscriptionTypeMultiple + }); + + const subscription = await subscribe({ + schema, + document: parse(` + subscription { + importantEmail + nonImportantEmail + } + `) + }); + + // @ts-ignore + subscription.next(); // Ask for a result, but ignore it. + + expect(didResolveImportantEmail).toBe(true); + expect(didResolveNonImportantEmail).toBe(false); + + // Close subscription + // @ts-ignore + subscription.return(); + }); + + it("resolves to an error for unknown subscription field", async () => { + const ast = parse(` + subscription { + unknownField + } + `); + + const pubsub = new EventEmitter(); + + const { subscription } = await createSubscription(pubsub, emailSchema, ast); + + expect(subscription).toEqual({ + errors: [ + { + message: 'The subscription field "unknownField" is not defined.', + locations: [{ line: 3, column: 9 }] + } + ] + }); + }); + + it("throws an error if subscribe does not return an iterator", async () => { + const invalidEmailSchema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + importantEmail: { + type: GraphQLString, + subscribe: () => "test" + } + } + }) + }); + + const pubsub = new EventEmitter(); + + await expectPromiseToThrow( + () => createSubscription(pubsub, invalidEmailSchema), + 'Subscription field must return Async Iterable. Received: "test".' + ); + }); + + it("resolves to an error for subscription resolver errors", async () => { + // Returning an error + const subscriptionReturningErrorSchema = emailSchemaWithResolvers( + () => new Error("test error") + ); + await testReportsError(subscriptionReturningErrorSchema); + + // Throwing an error + const subscriptionThrowingErrorSchema = emailSchemaWithResolvers(() => { + throw new Error("test error"); + }); + await testReportsError(subscriptionThrowingErrorSchema); + + // Resolving to an error + const subscriptionResolvingErrorSchema = emailSchemaWithResolvers(() => + Promise.resolve(new Error("test error")) + ); + await testReportsError(subscriptionResolvingErrorSchema); + + // Rejecting with an error + const subscriptionRejectingErrorSchema = emailSchemaWithResolvers(() => + Promise.reject(new Error("test error")) + ); + await testReportsError(subscriptionRejectingErrorSchema); + + async function testReportsError(schema: GraphQLSchema) { + const result = await subscribe({ + schema, + document: parse(` + subscription { + importantEmail + } + `) + }); + + expect(result).toEqual({ + errors: [ + { + message: "test error", + locations: [{ line: 3, column: 13 }], + path: ["importantEmail"] + } + ] + }); + } + }); + + it("resolves to an error for source event stream resolver errors", async () => { + // Returning an error + const subscriptionReturningErrorSchema = emailSchemaWithResolvers( + () => new Error("test error") + ); + await testReportsError(subscriptionReturningErrorSchema); + + // Throwing an error + const subscriptionThrowingErrorSchema = emailSchemaWithResolvers(() => { + throw new Error("test error"); + }); + await testReportsError(subscriptionThrowingErrorSchema); + + // Resolving to an error + const subscriptionResolvingErrorSchema = emailSchemaWithResolvers(() => + Promise.resolve(new Error("test error")) + ); + await testReportsError(subscriptionResolvingErrorSchema); + + // Rejecting with an error + const subscriptionRejectingErrorSchema = emailSchemaWithResolvers(() => + Promise.reject(new Error("test error")) + ); + await testReportsError(subscriptionRejectingErrorSchema); + + async function testReportsError(schema: GraphQLSchema) { + // Promise | ExecutionResult> + const result = await subscribe({ + schema, + document: parse(` + subscription { + importantEmail + } + `) + }); + + expect(result).toEqual({ + errors: [ + { + message: "test error", + locations: [{ column: 13, line: 3 }], + path: ["importantEmail"] + } + ] + }); + } + }); + + it("resolves to an error if variables were wrong type", async () => { + // If we receive variables that cannot be coerced correctly, subscribe() + // will resolve to an ExecutionResult that contains an informative error + // description. + const ast = parse(` + subscription ($priority: Int) { + importantEmail(priority: $priority) { + email { + from + subject + } + inbox { + unread + total + } + } + } + `); + + const result = await subscribe({ + schema: emailSchema, + document: ast, + variableValues: { priority: "meow" } + }); + + expect(result).toEqual({ + errors: [ + { + // Different + message: + 'Variable "$priority" got invalid value "meow"; Expected type Int; Int cannot represent non-integer value: "meow"', + locations: [{ line: 2, column: 21 }] + } + ] + }); + }); +}); + +// Once a subscription returns a valid AsyncIterator, it can still yield +// errors. +describe("Subscription Publish Phase", () => { + it("produces a payload for multiple subscribe in same subscription", async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = await createSubscription( + pubsub + ); + const second = await createSubscription(pubsub); + + // @ts-ignore + const payload1 = subscription.next(); + // @ts-ignore + const payload2 = second.subscription.next(); + + expect( + sendImportantEmail({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true + }) + ).toBe(true); + + const expectedPayload = { + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright" + }, + inbox: { + unread: 1, + total: 2 + } + } + } + } + }; + + expect(await payload1).toEqual(expectedPayload); + expect(await payload2).toEqual(expectedPayload); + }); + + it("produces a payload per subscription event", async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = await createSubscription( + pubsub + ); + + // Wait for the next subscription payload. + // @ts-ignore + const payload = subscription.next(); + + // A new email arrives! + expect( + sendImportantEmail({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true + }) + ).toBe(true); + + // The previously waited on payload now has a value. + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright" + }, + inbox: { + unread: 1, + total: 2 + } + } + } + } + }); + + // Another new email arrives, before subscription.next() is called. + expect( + sendImportantEmail({ + from: "hyo@graphql.org", + subject: "Tools", + message: "I <3 making things", + unread: true + }) + ).toBe(true); + + // The next waited on payload will have a value. + // @ts-ignore + expect(await subscription.next()).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "hyo@graphql.org", + subject: "Tools" + }, + inbox: { + unread: 2, + total: 3 + } + } + } + } + }); + + // The client decides to disconnect. + // @ts-ignore + expect(await subscription.return()).toEqual({ + done: true, + value: undefined + }); + + // Which may result in disconnecting upstream services as well. + expect( + sendImportantEmail({ + from: "adam@graphql.org", + subject: "Important", + message: "Read me please", + unread: true + }) + ).toBe(false); // No more listeners. + + // Awaiting a subscription after closing it results in completed results. + // @ts-ignore + expect(await subscription.next()).toEqual({ + done: true, + value: undefined + }); + }); + + it("produces a payload when there are multiple events", async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = await createSubscription( + pubsub + ); + // @ts-ignore + let payload = subscription.next(); + + // A new email arrives! + expect( + sendImportantEmail({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright" + }, + inbox: { + unread: 1, + total: 2 + } + } + } + } + }); + + // @ts-ignore + payload = subscription.next(); + + // A new email arrives! + expect( + sendImportantEmail({ + from: "yuzhi@graphql.org", + subject: "Alright 2", + message: "Tests are good 2", + unread: true + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright 2" + }, + inbox: { + unread: 2, + total: 3 + } + } + } + } + }); + }); + + it("should not trigger when subscription is already done", async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = await createSubscription( + pubsub + ); + // @ts-ignore + let payload = subscription.next(); + + // A new email arrives! + expect( + sendImportantEmail({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright" + }, + inbox: { + unread: 1, + total: 2 + } + } + } + } + }); + + // @ts-ignore + payload = subscription.next(); + // @ts-ignore + subscription.return(); + + // A new email arrives! + expect( + sendImportantEmail({ + from: "yuzhi@graphql.org", + subject: "Alright 2", + message: "Tests are good 2", + unread: true + }) + ).toBe(false); + + expect(await payload).toEqual({ + done: true, + value: undefined + }); + }); + + it("should not trigger when subscription is thrown", async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = await createSubscription( + pubsub + ); + // @ts-ignore + let payload = subscription.next(); + + // A new email arrives! + expect( + sendImportantEmail({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright" + }, + inbox: { + unread: 1, + total: 2 + } + } + } + } + }); + + // @ts-ignore + payload = subscription.next(); + + // Throw error + let caughtError; + try { + // @ts-ignore + await subscription.throw("ouch"); + } catch (e) { + caughtError = e; + } + expect(caughtError).toBe("ouch"); + + // A new email arrives! + expect( + sendImportantEmail({ + from: "yuzhi@graphql.org", + subject: "Alright 2", + message: "Tests are good 2", + unread: true + }) + ).toBe(false); + + expect(await payload).toEqual({ + done: true, + value: undefined + }); + }); + + it("event order is correct for multiple publishes", async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = await createSubscription( + pubsub + ); + // @ts-ignore + let payload = subscription.next(); + + // A new email arrives! + expect( + sendImportantEmail({ + from: "yuzhi@graphql.org", + subject: "Message", + message: "Tests are good", + unread: true + }) + ).toBe(true); + + // A new email arrives! + expect( + sendImportantEmail({ + from: "yuzhi@graphql.org", + subject: "Message 2", + message: "Tests are good 2", + unread: true + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Message" + }, + inbox: { + unread: 2, + total: 3 + } + } + } + } + }); + + // @ts-ignore + payload = subscription.next(); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Message 2" + }, + inbox: { + unread: 2, + total: 3 + } + } + } + } + }); + }); + + it("should handle error during execution of source event", async () => { + const erroringEmailSchema = emailSchemaWithResolvers( + async function*() { + yield { email: { subject: "Hello" } }; + yield { email: { subject: "Goodbye" } }; + yield { email: { subject: "Bonjour" } }; + }, + event => { + if ((event as any).email.subject === "Goodbye") { + throw new Error("Never leave."); + } + return event; + } + ); + + const subscription = await subscribe({ + schema: erroringEmailSchema, + document: parse(` + subscription { + importantEmail { + email { + subject + } + } + } + `) + }); + + // @ts-ignore + const payload1 = await subscription.next(); + expect(payload1).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + subject: "Hello" + } + } + } + } + }); + + // An error in execution is presented as such. + // @ts-ignore + const payload2 = await subscription.next(); + expect(payload2).toEqual({ + done: false, + value: { + errors: [ + { + message: "Never leave.", + locations: [{ line: 3, column: 11 }], + path: ["importantEmail"] + } + ], + data: { + importantEmail: null + } + } + }); + + // However that does not close the response event stream. Subsequent + // events are still executed. + // @ts-ignore + const payload3 = await subscription.next(); + expect(payload3).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + subject: "Bonjour" + } + } + } + } + }); + }); + + it("should pass through error thrown in source event stream", async () => { + const erroringEmailSchema = emailSchemaWithResolvers( + async function*() { + yield { email: { subject: "Hello" } }; + throw new Error("test error"); + }, + email => email + ); + + const subscription = await subscribe({ + schema: erroringEmailSchema, + document: parse(` + subscription { + importantEmail { + email { + subject + } + } + } + `) + }); + + // @ts-ignore + const payload1 = await subscription.next(); + expect(payload1).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + subject: "Hello" + } + } + } + } + }); + + let expectedError; + try { + // @ts-ignore + await subscription.next(); + } catch (error) { + expectedError = error; + } + + expect(expectedError instanceof Error).toBe(true); + expect("message" in expectedError).toBe(true); + + // @ts-ignore + const payload2 = await subscription.next(); + expect(payload2).toEqual({ + done: true, + value: undefined + }); + }); + + it("should resolve GraphQL error from source event stream", async () => { + const erroringEmailSchema = emailSchemaWithResolvers( + async function*() { + yield { email: { subject: "Hello" } }; + throw new GraphQLError("test error"); + }, + email => email + ); + + const subscription = await subscribe({ + schema: erroringEmailSchema, + document: parse(` + subscription { + importantEmail { + email { + subject + } + } + } + `) + }); + + // @ts-ignore + const payload1 = await subscription.next(); + expect(payload1).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + subject: "Hello" + } + } + } + } + }); + + // @ts-ignore + const payload2 = await subscription.next(); + expect(payload2).toEqual({ + done: false, + value: { + errors: [ + { + message: "test error" + } + ] + } + }); + + // @ts-ignore + const payload3 = await subscription.next(); + expect(payload3).toEqual({ + done: true, + value: undefined + }); + }); +}); + +describe("dx", () => { + test("function name of the bound query", async () => { + const schema = new GraphQLSchema({ + subscription: new GraphQLObjectType({ + name: "Type", + fields: { + a: { type: GraphQLString } + } + }) + }); + const document = parse(`subscription mockOperationName { a }`); + const compiledQuery = compileQuery(schema, document) as CompiledQuery; + expect(isCompiledQuery(compiledQuery)).toBe(true); + expect(compiledQuery.subscribe!.name).toBe("mockOperationName"); + }); + test("sets subscribe property only if operation is a subscription", () => { + const schema = new GraphQLSchema({ + query: new GraphQLObjectType({ + name: "TypeZ", + fields: { + a: { type: GraphQLString } + } + }) + }); + const document = parse(`query mockOperationName { a }`); + const compiledQuery = compileQuery(schema, document) as CompiledQuery; + expect(compiledQuery).not.toHaveProperty("subscribe"); + }); +}); diff --git a/src/execution.ts b/src/execution.ts index 29c5ce5..f09cd6a 100644 --- a/src/execution.ts +++ b/src/execution.ts @@ -27,10 +27,13 @@ import { isObjectType, isSpecifiedScalarType, Kind, + locatedError, TypeNameMetaFieldDef } from "graphql"; import { ExecutionContext as GraphQLContext } from "graphql/execution/execute"; +import { pathToArray } from "graphql/jsutils/Path"; import { FieldNode, OperationDefinitionNode } from "graphql/language/ast"; +import mapAsyncIterator from "graphql/subscription/mapAsyncIterator"; import { GraphQLTypeResolver } from "graphql/type/definition"; import { addPath, @@ -167,6 +170,11 @@ export interface CompiledQuery { context: any, variables: Maybe<{ [key: string]: any }> ) => Promise | ExecutionResult; + subscribe?: ( + root: any, + context: any, + variables: Maybe<{ [key: string]: any }> + ) => Promise | ExecutionResult>; stringify: (v: any) => string; } @@ -232,7 +240,17 @@ export function compileQuery( context.operation.variableDefinitions || [] ); - const functionBody = compileOperation(context); + const type = getOperationRootType(context.schema, context.operation); + const fieldMap = collectFields( + context, + type, + context.operation.selectionSet, + Object.create(null), + Object.create(null) + ); + + const functionBody = compileOperation(context, type, fieldMap); + const compiledQuery: InternalCompiledQuery = { query: createBoundQuery( context, @@ -245,6 +263,24 @@ export function compileQuery( ), stringify }; + + if (context.operation.operation === "subscription") { + compiledQuery.subscribe = createBoundSubscribe( + context, + document, + compileSubscriptionOperation( + context, + type, + fieldMap, + compiledQuery.query + ), + getVariables, + context.operation.name != null + ? context.operation.name.value + : undefined + ); + } + if ((options as any).debug) { // result of the compilation useful for debugging issues // and visualization tools like try-jit. @@ -369,16 +405,12 @@ function postProcessResult({ * @param {CompilationContext} context compilation context with the execution context * @returns {string} a function body to be instantiated together with the header, footer */ -function compileOperation(context: CompilationContext) { - const type = getOperationRootType(context.schema, context.operation); +function compileOperation( + context: CompilationContext, + type: GraphQLObjectType, + fieldMap: FieldsAndNodes +) { const serialExecution = context.operation.operation === "mutation"; - const fieldMap = collectFields( - context, - type, - context.operation.selectionSet, - Object.create(null), - Object.create(null) - ); const topLevel = compileObjectType( context, type, @@ -1637,3 +1669,194 @@ function getParentArgIndexes(context: CompilationContext) { function getJsFieldName(fieldName: string) { return `${LOCAL_JS_FIELD_NAME_PREFIX}${fieldName}`; } + +/** + * Subscription + * Implements the "CreateSourceEventStream" algorithm described in the + * GraphQL specification, resolving the subscription source event stream. + * + * Returns a Promise which resolves to either an AsyncIterable (if successful) + * or an ExecutionResult (error). The promise will be rejected if the schema or + * other arguments to this function are invalid, or if the resolved event stream + * is not an async iterable. + * + * If the client-provided arguments to this function do not result in a + * compliant subscription, a GraphQL Response (ExecutionResult) with + * descriptive errors and no data will be returned. + * + * If the the source stream could not be created due to faulty subscription + * resolver logic or underlying systems, the promise will resolve to a single + * ExecutionResult containing `errors` and no `data`. + * + * If the operation succeeded, the promise resolves to the AsyncIterable for the + * event stream returned by the resolver. + * + * A Source Event Stream represents a sequence of events, each of which triggers + * a GraphQL execution for that event. + * + * This may be useful when hosting the stateful subscription service in a + * different process or machine than the stateless GraphQL execution engine, + * or otherwise separating these two steps. For more on this, see the + * "Supporting Subscriptions at Scale" information in the GraphQL specification. + * + * Since createSourceEventStream only builds execution context and reports errors + * in doing so, which we did, we simply call directly the later called + * executeSubscription. + */ + +function isAsyncIterable( + val: unknown +): val is AsyncIterableIterator { + return typeof Object(val)[Symbol.asyncIterator] === "function"; +} + +function compileSubscriptionOperation( + context: CompilationContext, + type: GraphQLObjectType, + fieldMap: FieldsAndNodes, + queryFn: CompiledQuery["query"] +) { + function reportGraphQLError(error: any): ExecutionResult { + if (error instanceof GraphQLError) { + return { + errors: [error] + }; + } + throw error; + } + + const fieldNodes = Object.values(fieldMap)[0]; + const fieldNode = fieldNodes[0]; + const fieldName = fieldNode.name.value; + + const field = resolveFieldDef(context, type, fieldNodes); + + if (!field) { + throw new GraphQLError( + `The subscription field "${fieldName}" is not defined.`, + fieldNodes + ); + } + + const responsePath = addPath(undefined, fieldName); + const resolveInfoName = createResolveInfoName(responsePath); + + // Call the `subscribe()` resolver or the default resolver to produce an + // AsyncIterable yielding raw payloads. + const subscriber = field.subscribe; + + return async function subscribe(executionContext: ExecutionContext) { + const resolveInfo = executionContext.resolveInfos[resolveInfoName]( + executionContext.rootValue, + executionContext.variables, + responsePath + ); + + let resultOrStream: AsyncIterableIterator; + + try { + try { + resultOrStream = + subscriber && + (await subscriber( + executionContext.rootValue, + executionContext.variables, + executionContext.context, + resolveInfo + )); + if (resultOrStream instanceof Error) { + throw resultOrStream; + } + } catch (error) { + throw locatedError( + error, + resolveInfo.fieldNodes, + pathToArray(resolveInfo.path) + ); + } + if (!isAsyncIterable(resultOrStream)) { + throw new Error( + "Subscription field must return Async Iterable. " + + `Received: ${inspect(resultOrStream)}.` + ); + } + } catch (e) { + return reportGraphQLError(e); + } + + // For each payload yielded from a subscription, map it over the normal + // GraphQL `execute` function, with `payload` as the rootValue. + // This implements the "MapSourceToResponseEvent" algorithm described in + // the GraphQL specification. The `execute` function provides the + // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the + // "ExecuteQuery" algorithm, for which `execute` is also used. + // We use our `query` function in place of `execute` + const mapSourceToResponse = (payload: any) => + queryFn(payload, executionContext.context, executionContext.variables); + + return mapAsyncIterator( + resultOrStream, + mapSourceToResponse, + reportGraphQLError + ); + }; +} + +function createBoundSubscribe( + compilationContext: CompilationContext, + document: DocumentNode, + func: ( + context: ExecutionContext + ) => Promise | ExecutionResult>, + getVariableValues: (inputs: { [key: string]: any }) => CoercedVariableValues, + operationName: string | undefined +): CompiledQuery["subscribe"] { + const { + resolvers, + typeResolvers, + isTypeOfs, + serializers, + resolveInfos + } = compilationContext; + const trimmer = createNullTrimmer(compilationContext); + const fnName = operationName ? operationName : "subscribe"; + + const ret = { + async [fnName]( + rootValue: any, + context: any, + variables: Maybe<{ [key: string]: any }> + ): Promise | ExecutionResult> { + // this can be shared across in a batch request + const parsedVariables = getVariableValues(variables || {}); + + // Return early errors if variable coercing failed. + if (failToParseVariables(parsedVariables)) { + return { errors: parsedVariables.errors }; + } + + const executionContext: ExecutionContext = { + rootValue, + context, + variables: parsedVariables.coerced, + safeMap, + inspect, + GraphQLError: GraphqlJitError, + resolvers, + typeResolvers, + isTypeOfs, + serializers, + resolveInfos, + trimmer, + promiseCounter: 0, + nullErrors: [], + errors: [], + data: {} + }; + + return func.call(null, executionContext); + } + }; + + return ret[fnName]; +}