enable batch execution (#1971)
* enable batch execution

When `batch` is set to `true` for a given subschemaConfig, all delegated root fields are batched into a single combined request passed to the executor. Moreover, all requests to a given subschema are batched into the minimum number of requests, collecting queries and mutations separately and preserving operation order. Properly pathed errors are distributed back to the originating requests.
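For illustration, a minimal sketch of opting a subschema into batching when stitching; this assumes the `makeExecutableSchema`/`stitchSchemas` APIs from this repo, and the subschema itself is hypothetical:

```ts
import { makeExecutableSchema } from '@graphql-tools/schema';
import { stitchSchemas } from '@graphql-tools/stitch';

// Hypothetical subschema used only for illustration.
const usersSchema = makeExecutableSchema({
  typeDefs: /* GraphQL */ `
    type User {
      id: ID!
      name: String
    }
    type Query {
      userById(id: ID!): User
    }
  `,
  resolvers: {
    Query: {
      userById: (_root: unknown, { id }: { id: string }) => ({ id, name: 'Ada' }),
    },
  },
});

const gatewaySchema = stitchSchemas({
  subschemas: [
    {
      schema: usersSchema,
      // With batch: true, delegated root fields targeting this subschema
      // are combined into as few executor calls as possible per request.
      batch: true,
    },
  ],
});
```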

Adapted from Gatsby query batcher by @vladar.

Caveats:
* Uses a DataLoader under the hood, which is created anew for each request; this relies on a unique context argument per request (see the sketch after these caveats).
* Passes the `info` argument from the first executor call to the batched executor call, making the `info` argument unreliable.
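
Because of the first caveat, each top-level execution should receive its own context object, as the updated tests below do by passing a fresh `{}`. A minimal sketch building on the hypothetical gateway schema above:

```ts
import { graphql } from 'graphql';

async function handleRequest() {
  return graphql(
    gatewaySchema, // stitched schema from the sketch above
    /* GraphQL */ `
      {
        userById(id: "1") {
          name
        }
      }
    `,
    undefined, // rootValue
    {}, // contextValue: a unique object per request, so each request gets its own batching DataLoader
  );
}
```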

Related:

gatsbyjs/gatsby#22347 (comment)
#1710 (comment)
#1959 (comment)
#1954

* fix(delegate): pass extensions to executor/subscriber
yaacovCR committed Sep 1, 2020
1 parent c4ba16c commit 0c0b89f
Showing 14 changed files with 691 additions and 30 deletions.
20 changes: 16 additions & 4 deletions packages/batch-delegate/tests/typeMerging.example.test.ts
@@ -227,7 +227,8 @@ describe('merging using type merging', () => {
fieldName: '_userById',
args: ({ id }) => ({ id })
}
}
},
batch: true,
},
{
schema: inventorySchema,
@@ -242,7 +243,8 @@ describe('merging using type merging', () => {
fieldName: '_productByRepresentation',
args: ({ upc, weight, price }) => ({ product: { upc, weight, price } }),
}
}
},
batch: true,
},
{
schema: productsSchema,
@@ -252,7 +254,8 @@ describe('merging using type merging', () => {
fieldName: '_productByUpc',
args: ({ upc }) => ({ upc }),
}
}
},
batch: true,
},
{
schema: reviewsSchema,
@@ -268,7 +271,8 @@ describe('merging using type merging', () => {
fieldName: '_productByUpc',
args: ({ upc }) => ({ upc }),
},
}
},
batch: true,
}],
mergeTypes: true,
});
@@ -284,6 +288,8 @@ describe('merging using type merging', () => {
}
}
`,
undefined,
{},
);

const expectedResult = {
@@ -315,6 +321,8 @@ describe('merging using type merging', () => {
}
}
`,
undefined,
{},
);

const expectedResult: ExecutionResult = {
@@ -348,6 +356,8 @@ describe('merging using type merging', () => {
}
}
`,
undefined,
{},
);

const expectedResult: ExecutionResult = {
@@ -393,6 +403,8 @@ describe('merging using type merging', () => {
}
}
`,
undefined,
{},
);

const expectedResult: ExecutionResult = {
3 changes: 2 additions & 1 deletion packages/delegate/package.json
@@ -21,11 +21,12 @@
"@graphql-tools/schema": "6.1.0",
"@graphql-tools/utils": "6.1.0",
"@ardatan/aggregate-error": "0.0.1",
"dataloader": "2.0.0",
"is-promise": "4.0.0",
"tslib": "~2.0.1"
},
"publishConfig": {
"access": "public",
"directory": "dist"
}
}
}
13 changes: 8 additions & 5 deletions packages/delegate/src/delegateToSchema.ts
@@ -24,6 +24,7 @@ import { createRequestFromInfo, getDelegatingOperation } from './createRequest';
import { Transformer } from './Transformer';

import AggregateError from '@ardatan/aggregate-error';
import { getBatchingExecutor } from './getBatchingExecutor';

export function delegateToSchema(options: IDelegateToSchemaOptions | GraphQLSchema): any {
if (isSchema(options)) {
@@ -154,12 +155,15 @@ export function delegateRequest({
}

if (targetOperation === 'query' || targetOperation === 'mutation') {
const executor =
let executor =
subschemaConfig?.executor || createDefaultExecutor(targetSchema, subschemaConfig?.rootValue || targetRootValue);

if (subschemaConfig?.batch) {
executor = getBatchingExecutor(context, subschemaConfig, executor);
}

const executionResult = executor({
document: processedRequest.document,
variables: processedRequest.variables,
...processedRequest,
context,
info,
});
@@ -174,8 +178,7 @@
subschemaConfig?.subscriber || createDefaultSubscriber(targetSchema, subschemaConfig?.rootValue || targetRootValue);

return subscriber({
document: processedRequest.document,
variables: processedRequest.variables,
...processedRequest,
context,
info,
}).then((subscriptionResult: AsyncIterableIterator<ExecutionResult> | ExecutionResult) => {
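Since the whole processed request is now spread into the executor and subscriber calls, a custom executor also receives any forwarded `extensions`. A hedged sketch of such an executor over HTTP (the endpoint and the `cross-fetch` dependency are assumptions, not part of this change):

```ts
import { DocumentNode, print } from 'graphql';
import fetch from 'cross-fetch';

interface HttpExecutorParams {
  document: DocumentNode;
  variables?: Record<string, any>;
  extensions?: Record<string, any>;
}

// Forwards the document, variables, and the newly passed-through extensions.
async function httpExecutor({ document, variables, extensions }: HttpExecutorParams) {
  const response = await fetch('https://example.com/graphql', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ query: print(document), variables, extensions }),
  });
  return response.json();
}
```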
89 changes: 89 additions & 0 deletions packages/delegate/src/getBatchingExecutor.ts
@@ -0,0 +1,89 @@
import { getOperationAST } from 'graphql';

import isPromise from 'is-promise';

import DataLoader from 'dataloader';

import { ExecutionResult } from '@graphql-tools/utils';

import { SubschemaConfig, ExecutionParams } from './types';
import { memoize2of3 } from './memoize';
import { mergeExecutionParams } from './mergeExecutionParams';
import { splitResult } from './splitResult';

export const getBatchingExecutor = memoize2of3(function (
_context: Record<string, any>,
subschemaConfig: SubschemaConfig,
executor: ({ document, context, variables, info }: ExecutionParams) => ExecutionResult | Promise<ExecutionResult>
) {
const loader = new DataLoader(
createLoadFn(
executor ?? subschemaConfig.executor,
subschemaConfig.batchingOptions?.extensionsReducer ?? defaultExtensionsReducer
),
subschemaConfig.batchingOptions?.dataLoaderOptions
);
return (executionParams: ExecutionParams) => loader.load(executionParams);
});

function createLoadFn(
executor: ({ document, context, variables, info }: ExecutionParams) => ExecutionResult | Promise<ExecutionResult>,
extensionsReducer: (mergedExtensions: Record<string, any>, executionParams: ExecutionParams) => Record<string, any>
) {
return async (execs: Array<ExecutionParams>): Promise<Array<ExecutionResult>> => {
const execBatches: Array<Array<ExecutionParams>> = [];
let index = 0;
const exec = execs[index];
let currentBatch: Array<ExecutionParams> = [exec];
execBatches.push(currentBatch);
// Group consecutive operations of the same type into batches so that
// queries and mutations are never merged into a single document and
// operation order is preserved.
let operationType = getOperationAST(exec.document, undefined).operation;
while (++index < execs.length) {
const currentOperationType = getOperationAST(execs[index].document, undefined).operation;
if (operationType === currentOperationType) {
currentBatch.push(execs[index]);
} else {
operationType = currentOperationType;
currentBatch = [execs[index]];
execBatches.push(currentBatch);
}
}

let containsPromises = false;
const executionResults: Array<ExecutionResult | Promise<ExecutionResult>> = [];
execBatches.forEach(execBatch => {
const mergedExecutionParams = mergeExecutionParams(execBatch, extensionsReducer);
const executionResult = executor(mergedExecutionParams);

if (isPromise(executionResult)) {
containsPromises = true;
}
executionResults.push(executionResult);
});

if (containsPromises) {
return Promise.all(executionResults).then(resultBatches => {
let results: Array<ExecutionResult> = [];
resultBatches.forEach((resultBatch, index) => {
results = results.concat(splitResult(resultBatch, execBatches[index].length));
});
return results;
});
}

let results: Array<ExecutionResult> = [];
(executionResults as Array<ExecutionResult>).forEach((resultBatch, index) => {
results = results.concat(splitResult(resultBatch, execBatches[index].length));
});
return results;
};
}

function defaultExtensionsReducer(
mergedExtensions: Record<string, any>,
executionParams: ExecutionParams
): Record<string, any> {
const newExtensions = executionParams.extensions;
if (newExtensions != null) {
Object.assign(mergedExtensions, newExtensions);
}
return mergedExtensions;
}
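
The new executor also reads two optional knobs from `subschemaConfig.batchingOptions`, as seen above. A hedged sketch of configuring them (the option names come from this file, but the exact values are assumptions):

```ts
const batchedSubschemaConfig = {
  schema: usersSchema, // hypothetical subschema from the earlier sketch
  batch: true,
  batchingOptions: {
    // Passed as the second argument to `new DataLoader(...)`.
    dataLoaderOptions: { maxBatchSize: 10 },
    // Replaces the default reducer, which shallow-merges each operation's
    // extensions into the combined request via Object.assign.
    extensionsReducer: (
      mergedExtensions: Record<string, any>,
      executionParams: { extensions?: Record<string, any> },
    ) => ({ ...mergedExtensions, ...executionParams.extensions }),
  },
};
```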
40 changes: 40 additions & 0 deletions packages/delegate/src/memoize.ts
@@ -211,3 +211,43 @@ export function memoize2<T1 extends Record<string, any>, T2 extends Record<strin

return memoized;
}

export function memoize2of3<
T1 extends Record<string, any>,
T2 extends Record<string, any>,
T3 extends any,
R extends any
>(fn: (A1: T1, A2: T2, A3: T3) => R): (A1: T1, A2: T2, A3: T3) => R {
let cache1: WeakMap<T1, WeakMap<T2, R>>;

function memoized(a1: T1, a2: T2, a3: T3) {
if (!cache1) {
cache1 = new WeakMap();
const cache2: WeakMap<T2, R> = new WeakMap();
cache1.set(a1, cache2);
const newValue = fn(a1, a2, a3);
cache2.set(a2, newValue);
return newValue;
}

let cache2 = cache1.get(a1);
if (!cache2) {
cache2 = new WeakMap();
cache1.set(a1, cache2);
const newValue = fn(a1, a2, a3);
cache2.set(a2, newValue);
return newValue;
}

const cachedValue = cache2.get(a2);
if (cachedValue === undefined) {
const newValue = fn(a1, a2, a3);
cache2.set(a2, newValue);
return newValue;
}

return cachedValue;
}

return memoized;
}
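
A small usage sketch of `memoize2of3` (the function and arguments below are hypothetical): results are cached by the first two arguments only, which is what lets `getBatchingExecutor` reuse one DataLoader per (context, subschemaConfig) pair while ignoring the executor argument for cache lookups.

```ts
let calls = 0;
const build = memoize2of3(
  (_context: Record<string, any>, _config: Record<string, any>, label: string) => {
    calls += 1;
    return `${label}:${calls}`;
  },
);

const context = {};
const config = {};
build(context, config, 'first');  // computed -> "first:1"
build(context, config, 'second'); // cache hit -> "first:1" (third argument ignored for lookup)
build({}, config, 'third');       // new context object -> recomputed -> "third:2"
```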
