Skip to content

Commit

Permalink
Remove Subscriber and use only Executor (#3117)
Browse files Browse the repository at this point in the history
* Remove Subscriber and use only Executor

* Fix introspectSchema

* Fix tests

* Cleanup
  • Loading branch information
ardatan committed Jun 29, 2021
1 parent eb233da commit c0ca319
Show file tree
Hide file tree
Showing 17 changed files with 178 additions and 273 deletions.
6 changes: 6 additions & 0 deletions .changeset/nine-ducks-burn.md
@@ -0,0 +1,6 @@
---
'@graphql-tools/wrap': major
---

BREAKING CHANGE
- Remove unnecessary `introspectSchemaSync`, `introspectSchema` already handles sync execution
15 changes: 15 additions & 0 deletions .changeset/sharp-wombats-perform.md
@@ -0,0 +1,15 @@
---
'@graphql-tools/batch-execute': major
'@graphql-tools/delegate': major
'@graphql-tools/links': major
'@graphql-tools/url-loader': major
'@graphql-tools/stitch': major
'@graphql-tools/utils': major
'@graphql-tools/wrap': major
---

BREAKING CHANGE
- Remove Subscriber and use only Executor
- - Now `Executor` can receive `AsyncIterable` and subscriptions will also be handled by `Executor`. This is a future-proof change for defer, stream and live queries


7 changes: 5 additions & 2 deletions packages/batch-execute/src/createBatchingExecutor.ts
Expand Up @@ -18,7 +18,10 @@ export function createBatchingExecutor(
) => Record<string, any> = defaultExtensionsReducer
): Executor {
const loader = new DataLoader(createLoadFn(executor, extensionsReducer), dataLoaderOptions);
return (executionParams: ExecutionParams) => loader.load(executionParams);
return (executionParams: ExecutionParams) =>
executionParams.info?.operation.operation === 'subscription'
? executor(executionParams)
: loader.load(executionParams);
}

function createLoadFn(
Expand Down Expand Up @@ -53,7 +56,7 @@ function createLoadFn(

const executionResults: Array<ValueOrPromise<ExecutionResult>> = execBatches.map(execBatch => {
const mergedExecutionParams = mergeExecutionParams(execBatch, extensionsReducer);
return new ValueOrPromise(() => executor(mergedExecutionParams));
return new ValueOrPromise(() => executor(mergedExecutionParams) as ExecutionResult);
});

return ValueOrPromise.all(executionResults)
Expand Down
4 changes: 1 addition & 3 deletions packages/delegate/src/Subschema.ts
Expand Up @@ -3,7 +3,7 @@ import { GraphQLSchema } from 'graphql';
import { SubschemaConfig, Transform, MergedTypeConfig, CreateProxyingResolverFn, BatchingOptions } from './types';

import { applySchemaTransforms } from './applySchemaTransforms';
import { Executor, Subscriber } from '@graphql-tools/utils';
import { Executor } from '@graphql-tools/utils';

export function isSubschema(value: any): value is Subschema {
return Boolean(value.transformedSchema);
Expand All @@ -20,7 +20,6 @@ export class Subschema<K = any, V = any, C = K, TContext = Record<string, any>>
public schema: GraphQLSchema;

public executor?: Executor<TContext>;
public subscriber?: Subscriber<TContext>;
public batch?: boolean;
public batchingOptions?: BatchingOptions<K, V, C>;

Expand All @@ -34,7 +33,6 @@ export class Subschema<K = any, V = any, C = K, TContext = Record<string, any>>
this.schema = config.schema;

this.executor = config.executor;
this.subscriber = config.subscriber;
this.batch = config.batch;
this.batchingOptions = config.batchingOptions;

Expand Down
89 changes: 31 additions & 58 deletions packages/delegate/src/delegateToSchema.ts
Expand Up @@ -18,13 +18,12 @@ import { getBatchingExecutor } from '@graphql-tools/batch-execute';

import {
mapAsyncIterator,
ExecutionResult,
Executor,
ExecutionParams,
Subscriber,
Maybe,
assertSome,
AggregateError,
isAsyncIterable,
} from '@graphql-tools/utils';

import {
Expand Down Expand Up @@ -102,41 +101,27 @@ export function delegateRequest<TContext = Record<string, any>, TArgs = any>(
validateRequest(delegationContext, processedRequest.document);
}

const { operation, context, info } = delegationContext;
const { context, info } = delegationContext;

if (operation === 'query' || operation === 'mutation') {
const executor = getExecutor(delegationContext);
const executor = getExecutor(delegationContext);

return new ValueOrPromise(() =>
executor({
...processedRequest,
context,
info,
})
)
.then(originalResult => transformer.transformResult(originalResult))
.resolve();
}

const subscriber = getSubscriber(delegationContext);

return subscriber({
...processedRequest,
context,
info,
}).then(subscriptionResult => {
if (Symbol.asyncIterator in subscriptionResult) {
// "subscribe" to the subscription result and map the result through the transforms
return mapAsyncIterator<ExecutionResult, any>(
subscriptionResult as AsyncIterableIterator<ExecutionResult>,
originalResult => ({
return new ValueOrPromise(() =>
executor({
...processedRequest,
context,
info,
})
)
.then(originalResult => {
if (isAsyncIterable(originalResult)) {
// "subscribe" to the subscription result and map the result through the transforms
return mapAsyncIterator(originalResult, originalResult => ({
[delegationContext.fieldName]: transformer.transformResult(originalResult),
})
);
}

return transformer.transformResult(subscriptionResult as ExecutionResult);
});
}));
}
return transformer.transformResult(originalResult);
})
.resolve();
}

const emptyObject = {};
Expand Down Expand Up @@ -250,14 +235,14 @@ function validateRequest(delegationContext: DelegationContext<any>, document: Do
}

function getExecutor<TContext>(delegationContext: DelegationContext<TContext>): Executor<TContext> {
const { subschemaConfig, targetSchema, context } = delegationContext;
const { subschemaConfig, targetSchema, context, operation } = delegationContext;

let executor: Executor = subschemaConfig?.executor || createDefaultExecutor(targetSchema);
let executor: Executor = subschemaConfig?.executor || createDefaultExecutor(targetSchema, operation);

if (subschemaConfig?.batch) {
const batchingOptions = subschemaConfig?.batchingOptions;
executor = getBatchingExecutor(
context ?? self ?? window ?? global,
context ?? globalThis ?? window ?? global,
executor,
batchingOptions?.dataLoaderOptions,
batchingOptions?.extensionsReducer
Expand All @@ -267,29 +252,17 @@ function getExecutor<TContext>(delegationContext: DelegationContext<TContext>):
return executor;
}

function getSubscriber<TContext>(delegationContext: DelegationContext<TContext>): Subscriber<TContext> {
const { subschemaConfig, targetSchema } = delegationContext;
return subschemaConfig?.subscriber || createDefaultSubscriber(targetSchema);
}

function createDefaultExecutor(schema: GraphQLSchema): Executor {
return (({ document, context, variables, rootValue }: ExecutionParams) =>
execute({
const createDefaultExecutor = (schema: GraphQLSchema, operation: OperationTypeNode) =>
(({ document, context, variables, rootValue }: ExecutionParams) => {
const executionParams = {
schema,
document,
contextValue: context,
variableValues: variables,
rootValue,
})) as Executor;
}

function createDefaultSubscriber(schema: GraphQLSchema) {
return ({ document, context, variables, rootValue }: ExecutionParams) =>
subscribe({
schema,
document,
contextValue: context,
variableValues: variables,
rootValue,
}) as any;
}
};
if (operation === 'subscription') {
return subscribe(executionParams);
}
return execute(executionParams);
}) as Executor;
2 changes: 1 addition & 1 deletion packages/delegate/src/index.ts
Expand Up @@ -9,4 +9,4 @@ export * from './resolveExternalValue';
export * from './subschemaConfig';
export * from './transforms';
export * from './types';
export { Executor, Subscriber, AsyncExecutor, SyncExecutor, ExecutionParams } from '@graphql-tools/utils';
export { Executor, AsyncExecutor, SyncExecutor, ExecutionParams } from '@graphql-tools/utils';
3 changes: 1 addition & 2 deletions packages/delegate/src/types.ts
Expand Up @@ -14,7 +14,7 @@ import {

import DataLoader from 'dataloader';

import { ExecutionParams, ExecutionResult, Executor, Request, Subscriber, TypeMap } from '@graphql-tools/utils';
import { ExecutionParams, ExecutionResult, Executor, Request, TypeMap } from '@graphql-tools/utils';

import { Subschema } from './Subschema';
import { OBJECT_SUBSCHEMA_SYMBOL, FIELD_SUBSCHEMA_MAP_SYMBOL, UNPATHED_ERRORS_SYMBOL } from './symbols';
Expand Down Expand Up @@ -145,7 +145,6 @@ export interface SubschemaConfig<K = any, V = any, C = K, TContext = Record<stri
transforms?: Array<Transform<any, TContext>>;
merge?: Record<string, MergedTypeConfig<any, any, TContext>>;
executor?: Executor<TContext>;
subscriber?: Subscriber<TContext>;
batch?: boolean;
batchingOptions?: BatchingOptions<K, V, C>;
}
Expand Down
1 change: 0 additions & 1 deletion packages/links/src/index.ts
@@ -1,5 +1,4 @@
export { createServerHttpLink } from './createServerHttpLink';
export { AwaitVariablesLink } from './AwaitVariablesLink';
export { linkToExecutor } from './linkToExecutor';
export { linkToSubscriber } from './linkToSubscriber';
export { GraphQLUpload } from './GraphQLUpload';
36 changes: 19 additions & 17 deletions packages/links/src/linkToExecutor.ts
@@ -1,24 +1,26 @@
import { toPromise } from '@apollo/client/core';
import { ApolloLink, execute } from '@apollo/client/link/core';
import { Observable } from '@apollo/client/utilities';
import { toPromise } from '@apollo/client/link/utils';

import { AsyncExecutor, ExecutionParams, ExecutionResult } from '@graphql-tools/utils';
import { Executor, ExecutionParams, ExecutionResult, observableToAsyncIterable } from '@graphql-tools/utils';

export const linkToExecutor =
(link: ApolloLink): AsyncExecutor =>
<TReturn, TArgs, TContext>(params: ExecutionParams<TArgs, TContext>): Promise<ExecutionResult<TReturn>> => {
(link: ApolloLink): Executor =>
async <TReturn, TArgs, TContext>(params: ExecutionParams<TArgs, TContext>) => {
const { document, variables, extensions, context, info, operationName } = params;
return toPromise(
execute(link, {
query: document,
variables: variables,
context: {
graphqlContext: context,
graphqlResolveInfo: info,
clientAwareness: {},
},
extensions,
operationName,
}) as Observable<ExecutionResult<TReturn>>
);
const observable = execute(link, {
query: document,
variables,
context: {
graphqlContext: context,
graphqlResolveInfo: info,
clientAwareness: {},
},
extensions,
operationName,
}) as Observable<ExecutionResult<TReturn>>;
if (info?.operation.operation === 'subscription') {
return observableToAsyncIterable<ExecutionResult<TReturn>>(observable)[Symbol.asyncIterator]();
}
return toPromise(observable);
};
25 changes: 0 additions & 25 deletions packages/links/src/linkToSubscriber.ts

This file was deleted.

0 comments on commit c0ca319

Please sign in to comment.