Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable batch execution #1971

Merged
merged 2 commits into from
Sep 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 16 additions & 4 deletions packages/batch-delegate/tests/typeMerging.example.test.ts
Expand Up @@ -227,7 +227,8 @@ describe('merging using type merging', () => {
fieldName: '_userById',
args: ({ id }) => ({ id })
}
}
},
batch: true,
},
{
schema: inventorySchema,
Expand All @@ -242,7 +243,8 @@ describe('merging using type merging', () => {
fieldName: '_productByRepresentation',
args: ({ upc, weight, price }) => ({ product: { upc, weight, price } }),
}
}
},
batch: true,
},
{
schema: productsSchema,
Expand All @@ -252,7 +254,8 @@ describe('merging using type merging', () => {
fieldName: '_productByUpc',
args: ({ upc }) => ({ upc }),
}
}
},
batch: true,
},
{
schema: reviewsSchema,
Expand All @@ -268,7 +271,8 @@ describe('merging using type merging', () => {
fieldName: '_productByUpc',
args: ({ upc }) => ({ upc }),
},
}
},
batch: true,
}],
mergeTypes: true,
});
Expand All @@ -284,6 +288,8 @@ describe('merging using type merging', () => {
}
}
`,
undefined,
{},
);

const expectedResult = {
Expand Down Expand Up @@ -315,6 +321,8 @@ describe('merging using type merging', () => {
}
}
`,
undefined,
{},
);

const expectedResult: ExecutionResult = {
Expand Down Expand Up @@ -348,6 +356,8 @@ describe('merging using type merging', () => {
}
}
`,
undefined,
{},
);

const expectedResult: ExecutionResult = {
Expand Down Expand Up @@ -393,6 +403,8 @@ describe('merging using type merging', () => {
}
}
`,
undefined,
{},
);

const expectedResult: ExecutionResult = {
Expand Down
3 changes: 2 additions & 1 deletion packages/delegate/package.json
Expand Up @@ -21,11 +21,12 @@
"@graphql-tools/schema": "6.1.0",
"@graphql-tools/utils": "6.1.0",
"@ardatan/aggregate-error": "0.0.1",
"dataloader": "2.0.0",
"is-promise": "4.0.0",
"tslib": "~2.0.1"
},
"publishConfig": {
"access": "public",
"directory": "dist"
}
}
}
13 changes: 8 additions & 5 deletions packages/delegate/src/delegateToSchema.ts
Expand Up @@ -24,6 +24,7 @@ import { createRequestFromInfo, getDelegatingOperation } from './createRequest';
import { Transformer } from './Transformer';

import AggregateError from '@ardatan/aggregate-error';
import { getBatchingExecutor } from './getBatchingExecutor';

export function delegateToSchema(options: IDelegateToSchemaOptions | GraphQLSchema): any {
if (isSchema(options)) {
Expand Down Expand Up @@ -154,12 +155,15 @@ export function delegateRequest({
}

if (targetOperation === 'query' || targetOperation === 'mutation') {
const executor =
let executor =
subschemaConfig?.executor || createDefaultExecutor(targetSchema, subschemaConfig?.rootValue || targetRootValue);

if (subschemaConfig?.batch) {
executor = getBatchingExecutor(context, subschemaConfig, executor);
}

const executionResult = executor({
document: processedRequest.document,
variables: processedRequest.variables,
...processedRequest,
context,
info,
});
Expand All @@ -174,8 +178,7 @@ export function delegateRequest({
subschemaConfig?.subscriber || createDefaultSubscriber(targetSchema, subschemaConfig?.rootValue || targetRootValue);

return subscriber({
document: processedRequest.document,
variables: processedRequest.variables,
...processedRequest,
context,
info,
}).then((subscriptionResult: AsyncIterableIterator<ExecutionResult> | ExecutionResult) => {
Expand Down
89 changes: 89 additions & 0 deletions packages/delegate/src/getBatchingExecutor.ts
@@ -0,0 +1,89 @@
import { getOperationAST } from 'graphql';

import isPromise from 'is-promise';

import DataLoader from 'dataloader';

import { ExecutionResult } from '@graphql-tools/utils';

import { SubschemaConfig, ExecutionParams } from './types';
import { memoize2of3 } from './memoize';
import { mergeExecutionParams } from './mergeExecutionParams';
import { splitResult } from './splitResult';

export const getBatchingExecutor = memoize2of3(function (
_context: Record<string, any>,
subschemaConfig: SubschemaConfig,
executor: ({ document, context, variables, info }: ExecutionParams) => ExecutionResult | Promise<ExecutionResult>
) {
const loader = new DataLoader(
createLoadFn(
executor ?? subschemaConfig.executor,
subschemaConfig.batchingOptions?.extensionsReducer ?? defaultExtensionsReducer
),
subschemaConfig.batchingOptions?.dataLoaderOptions
);
return (executionParams: ExecutionParams) => loader.load(executionParams);
});

function createLoadFn(
executor: ({ document, context, variables, info }: ExecutionParams) => ExecutionResult | Promise<ExecutionResult>,
extensionsReducer: (mergedExtensions: Record<string, any>, executionParams: ExecutionParams) => Record<string, any>
) {
return async (execs: Array<ExecutionParams>): Promise<Array<ExecutionResult>> => {
const execBatches: Array<Array<ExecutionParams>> = [];
let index = 0;
const exec = execs[index];
let currentBatch: Array<ExecutionParams> = [exec];
execBatches.push(currentBatch);
const operationType = getOperationAST(exec.document, undefined).operation;
while (++index < execs.length) {
const currentOperationType = getOperationAST(execs[index].document, undefined).operation;
if (operationType === currentOperationType) {
currentBatch.push(execs[index]);
} else {
currentBatch = [execs[index]];
execBatches.push(currentBatch);
}
}

let containsPromises = false;
const executionResults: Array<ExecutionResult | Promise<ExecutionResult>> = [];
execBatches.forEach(execBatch => {
const mergedExecutionParams = mergeExecutionParams(execBatch, extensionsReducer);
const executionResult = executor(mergedExecutionParams);

if (isPromise(executionResult)) {
containsPromises = true;
}
executionResults.push(executionResult);
});

if (containsPromises) {
return Promise.all(executionResults).then(resultBatches => {
let results: Array<ExecutionResult> = [];
resultBatches.forEach((resultBatch, index) => {
results = results.concat(splitResult(resultBatch, execBatches[index].length));
});
return results;
});
}

let results: Array<ExecutionResult> = [];
(executionResults as Array<ExecutionResult>).forEach((resultBatch, index) => {
results = results.concat(splitResult(resultBatch, execBatches[index].length));
});
return results;
};
}

function defaultExtensionsReducer(
mergedExtensions: Record<string, any>,
executionParams: ExecutionParams
): Record<string, any> {
const newExtensions = executionParams.extensions;
if (newExtensions != null) {
Object.assign(mergedExtensions, newExtensions);
}
return mergedExtensions;
}
40 changes: 40 additions & 0 deletions packages/delegate/src/memoize.ts
Expand Up @@ -211,3 +211,43 @@ export function memoize2<T1 extends Record<string, any>, T2 extends Record<strin

return memoized;
}

export function memoize2of3<
T1 extends Record<string, any>,
T2 extends Record<string, any>,
T3 extends any,
R extends any
>(fn: (A1: T1, A2: T2, A3: T3) => R): (A1: T1, A2: T2, A3: T3) => R {
let cache1: WeakMap<T1, WeakMap<T2, R>>;

function memoized(a1: T1, a2: T2, a3: T3) {
if (!cache1) {
cache1 = new WeakMap();
const cache2: WeakMap<T2, R> = new WeakMap();
cache1.set(a1, cache2);
const newValue = fn(a1, a2, a3);
cache2.set(a2, newValue);
return newValue;
}

let cache2 = cache1.get(a1);
if (!cache2) {
cache2 = new WeakMap();
cache1.set(a1, cache2);
const newValue = fn(a1, a2, a3);
cache2.set(a2, newValue);
return newValue;
}

const cachedValue = cache2.get(a2);
if (cachedValue === undefined) {
const newValue = fn(a1, a2, a3);
cache2.set(a2, newValue);
return newValue;
}

return cachedValue;
}

return memoized;
}