Skip to content

Commit

Permalink
consolidate SubscriptionExecutor into Executor
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Jun 17, 2021
1 parent e82191a commit 495535f
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 208 deletions.
11 changes: 9 additions & 2 deletions src/execution/execute.ts
Expand Up @@ -60,6 +60,7 @@ export interface ExecutionContext {
variableValues: { [variable: string]: unknown };
fieldResolver: GraphQLFieldResolver<any, any>;
typeResolver: GraphQLTypeResolver<any, any>;
subscribeFieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
errors: Array<GraphQLError>;
}

Expand Down Expand Up @@ -128,7 +129,7 @@ export function execute(args: ExecutionArgs): PromiseOrValue<ExecutionResult> {
// field and its descendants will be omitted, and sibling fields will still
// be executed. An execution which encounters errors will still result in a
// resolved Promise.
return executor.execute();
return executor.executeQueryOrMutation();
}

/**
Expand Down Expand Up @@ -170,6 +171,10 @@ export function assertValidExecutionArguments(
);
}

export interface BuildExecutionContextArgs extends ExecutionArgs {
subscribeFieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
}

/**
* Constructs a ExecutionContext object from the arguments passed to
* execute, which we will pass throughout the other execution methods.
Expand All @@ -179,7 +184,7 @@ export function assertValidExecutionArguments(
* @internal
*/
export function buildExecutionContext(
args: ExecutionArgs,
args: BuildExecutionContextArgs,
): ReadonlyArray<GraphQLError> | ExecutionContext {
const {
schema,
Expand All @@ -190,6 +195,7 @@ export function buildExecutionContext(
operationName,
fieldResolver,
typeResolver,
subscribeFieldResolver,
} = args;

// If arguments are missing or incorrect, throw an error.
Expand Down Expand Up @@ -249,6 +255,7 @@ export function buildExecutionContext(
variableValues: coercedVariableValues.coerced,
fieldResolver: fieldResolver ?? defaultFieldResolver,
typeResolver: typeResolver ?? defaultTypeResolver,
subscribeFieldResolver,
errors: [],
};
}
Expand Down
183 changes: 170 additions & 13 deletions src/execution/executor.ts
Expand Up @@ -47,9 +47,12 @@ import {

import { getOperationRootType } from '../utilities/getOperationRootType';

import { isAsyncIterable } from '../jsutils/isAsyncIterable';

import type { ExecutionContext, ExecutionResult } from './execute';
import { getArgumentValues } from './values';
import { collectFields } from './collectFields';
import { mapAsyncIterator } from './mapAsyncIterator';

/**
* This class is exported only to assist people in implementing their own executors
Expand Down Expand Up @@ -81,6 +84,7 @@ export class Executor {
protected _variableValues: { [variable: string]: unknown };
protected _fieldResolver: GraphQLFieldResolver<any, any>;
protected _typeResolver: GraphQLTypeResolver<any, any>;
protected _subscribeFieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
protected _errors: Array<GraphQLError>;

constructor({
Expand All @@ -92,6 +96,7 @@ export class Executor {
variableValues,
fieldResolver,
typeResolver,
subscribeFieldResolver,
errors,
}: ExecutionContext) {
this._schema = schema;
Expand All @@ -102,18 +107,15 @@ export class Executor {
this._variableValues = variableValues;
this._fieldResolver = fieldResolver;
this._typeResolver = typeResolver;
this._subscribeFieldResolver = subscribeFieldResolver;
this._errors = errors;
}

execute(): PromiseOrValue<ExecutionResult> {
const data = this.executeOperation();
return this.buildResponse(data);
}

/**
* Implements the "Executing operations" section of the spec.
* Implements the "Executing operations" section of the spec for
* queries and mutations.
*/
executeOperation(): PromiseOrValue<ObjMap<unknown> | null> {
executeQueryOrMutation(): PromiseOrValue<ExecutionResult> {
const { _schema, _fragments, _rootValue, _operation, _variableValues } =
this;
const type = getOperationRootType(_schema, _operation);
Expand All @@ -138,15 +140,17 @@ export class Executor {
? this.executeFieldsSerially(type, _rootValue, path, fields)
: this.executeFields(type, _rootValue, path, fields);
if (isPromise(result)) {
return result.then(undefined, (error) => {
this._errors.push(error);
return Promise.resolve(null);
});
return this.buildResponse(
result.then(undefined, (error) => {
this._errors.push(error);
return Promise.resolve(null);
}),
);
}
return result;
return this.buildResponse(result);
} catch (error) {
this._errors.push(error);
return null;
return this.buildResponse(null);
}
}

Expand Down Expand Up @@ -665,6 +669,159 @@ export class Executor {
return this.executeFields(returnType, result, path, subFieldNodes);
}

async executeSubscription(): Promise<
AsyncGenerator<ExecutionResult, void, void> | ExecutionResult
> {
const resultOrStream = await this.createSourceEventStream();

if (!isAsyncIterable(resultOrStream)) {
return resultOrStream;
}

// For each payload yielded from a subscription, map it over the normal
// GraphQL `execute` function, with `payload` as the rootValue.
// This implements the "MapSourceToResponseEvent" algorithm described in
// the GraphQL specification. The `execute` function provides the
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
const mapSourceToResponse = (payload: unknown) => {
const executor = new Executor({
schema: this._schema,
fragments: this._fragments,
rootValue: payload,
contextValue: this._contextValue,
operation: this._operation,
variableValues: this._variableValues,
fieldResolver: this._fieldResolver,
typeResolver: this._typeResolver,
errors: [],
});

return executor.executeQueryOrMutation();
};

// Map every source value to a ExecutionResult value as described above.
return mapAsyncIterator(resultOrStream, mapSourceToResponse);
}

/**
* Implements the "CreateSourceEventStream" algorithm described in the
* GraphQL specification, resolving the subscription source event stream.
*
* Returns a Promise which resolves to either an AsyncIterable (if successful)
* or an ExecutionResult (error). The promise will be rejected if the schema or
* other arguments to this function are invalid, or if the resolved event stream
* is not an async iterable.
*
* If the client-provided arguments to this function do not result in a
* compliant subscription, a GraphQL Response (ExecutionResult) with
* descriptive errors and no data will be returned.
*
* If the the source stream could not be created due to faulty subscription
* resolver logic or underlying systems, the promise will resolve to a single
* ExecutionResult containing `errors` and no `data`.
*
* If the operation succeeded, the promise resolves to the AsyncIterable for the
* event stream returned by the resolver.
*
* A Source Event Stream represents a sequence of events, each of which triggers
* a GraphQL execution for that event.
*
* This may be useful when hosting the stateful subscription service in a
* different process or machine than the stateless GraphQL execution engine,
* or otherwise separating these two steps. For more on this, see the
* "Supporting Subscriptions at Scale" information in the GraphQL specification.
*/
async createSourceEventStream(): Promise<
AsyncIterable<unknown> | ExecutionResult
> {
try {
const eventStream = await this._createSourceEventStream();

// Assert field returned an event stream, otherwise yield an error.
if (!isAsyncIterable(eventStream)) {
throw new Error(
'Subscription field must return Async Iterable. ' +
`Received: ${inspect(eventStream)}.`,
);
}

return eventStream;
} catch (error) {
// If it GraphQLError, report it as an ExecutionResult, containing only errors and no data.
// Otherwise treat the error as a system-class error and re-throw it.
if (error instanceof GraphQLError) {
return { errors: [error] };
}
throw error;
}
}

public async _createSourceEventStream(): Promise<unknown> {
const {
_schema,
_fragments,
_rootValue,
_contextValue,
_operation,
_variableValues,
_fieldResolver,
} = this;
const type = getOperationRootType(_schema, _operation);
const fields = collectFields(
_schema,
_fragments,
_variableValues,
type,
_operation.selectionSet,
new Map(),
new Set(),
);
const [responseName, fieldNodes] = [...fields.entries()][0];
const fieldDef = getFieldDef(_schema, type, fieldNodes[0]);

if (!fieldDef) {
const fieldName = fieldNodes[0].name.value;
throw new GraphQLError(
`The subscription field "${fieldName}" is not defined.`,
fieldNodes,
);
}

const path = addPath(undefined, responseName, type.name);
const info = this.buildResolveInfo(fieldDef, fieldNodes, type, path);

try {
// Implements the "ResolveFieldEventStream" algorithm from GraphQL specification.
// It differs from "ResolveFieldValue" due to providing a different `resolveFn`.

// Build a JS object of arguments from the field.arguments AST, using the
// variables scope to fulfill any variable references.
const args = getArgumentValues(fieldDef, fieldNodes[0], _variableValues);

// Call the `subscribe()` resolver or the default resolver to produce an
// AsyncIterable yielding raw payloads.
const resolveFn = fieldDef.subscribe ?? _fieldResolver;

// The resolve function's optional third argument is a context value that
// is provided to every resolve function within an execution. It is commonly
// used to represent an authenticated user, or request-specific caches.
const eventStream = await resolveFn(
_rootValue,
args,
_contextValue,
info,
);

if (eventStream instanceof Error) {
throw eventStream;
}
return eventStream;
} catch (error) {
throw locatedError(error, fieldNodes, pathToArray(path));
}
}

/**
* A collection of relevant subfields with regard to the return type.
* See 'collectSubfields' above for the memoized version.
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion src/subscription/__tests__/subscribe-test.ts
Expand Up @@ -12,7 +12,7 @@ import { GraphQLSchema } from '../../type/schema';
import { GraphQLList, GraphQLObjectType } from '../../type/definition';
import { GraphQLInt, GraphQLString, GraphQLBoolean } from '../../type/scalars';

import { SubscriptionExecutor, subscribe } from '../subscribe';
import { subscribe } from '../subscribe';

import { SimplePubSub } from './simplePubSub';

Expand Down
11 changes: 4 additions & 7 deletions src/subscription/subscribe.ts
Expand Up @@ -2,14 +2,14 @@ import type { Maybe } from '../jsutils/Maybe';

import type { DocumentNode } from '../language/ast';

import { Executor } from '../execution/executor';

import type { ExecutionResult } from '../execution/execute';
import { buildExecutionContext } from '../execution/execute';

import type { GraphQLSchema } from '../type/schema';
import type { GraphQLFieldResolver } from '../type/definition';

import { SubscriptionExecutor } from './subscriptionExecutor';

export interface SubscriptionArgs {
schema: GraphQLSchema;
document: DocumentNode;
Expand Down Expand Up @@ -47,16 +47,13 @@ export async function subscribe(
): Promise<AsyncGenerator<ExecutionResult, void, void> | ExecutionResult> {
// If a valid execution context cannot be created due to incorrect arguments,
// a "Response" with only errors is returned.
const exeContext = buildExecutionContext({
...args,
fieldResolver: args.subscribeFieldResolver,
});
const exeContext = buildExecutionContext(args);

// Return early errors if execution context failed.
if (!('schema' in exeContext)) {
return Promise.resolve({ errors: exeContext });
}

const executor = new SubscriptionExecutor(exeContext, args.document);
const executor = new Executor(exeContext);
return executor.executeSubscription();
}

0 comments on commit 495535f

Please sign in to comment.