Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor batching logic #220

Merged
merged 1 commit on Nov 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -317,7 +317,7 @@ Create a new `DataLoader` given a batch loading function and options.

| Option Key | Type | Default | Description |
| ---------- | ---- | ------- | ----------- |
| *batch* | Boolean | `true` | Set to `false` to disable batching, invoking `batchLoadFn` with a single load key.
| *batch* | Boolean | `true` | Set to `false` to disable batching, invoking `batchLoadFn` with a single load key. This is equivalent to setting `maxBatchSize` to `1`.
| *maxBatchSize* | Number | `Infinity` | Limits the number of items that get passed in to the `batchLoadFn`.
| *cache* | Boolean | `true` | Set to `false` to disable memoization caching, creating a new Promise and new key in the `batchLoadFn` for every load of the same key.
| *cacheKeyFn* | Function | `key => key` | Produces cache key for a given load key. Useful when objects are keys and two objects should be considered equivalent.
Expand Down
18 changes: 18 additions & 0 deletions src/__tests__/dataloader.test.js
Expand Up @@ -106,6 +106,24 @@ describe('Primary API', () => {
expect(loadCalls).toEqual([ [ 1 ] ]);
});

it('coalesces identical requests across sized batches', async () => {
  const [ identityLoader, loadCalls ] = idLoader<number>({ maxBatchSize: 2 });

  // Issue four loads before any dispatch occurs; key 1 is requested twice.
  const pending = [
    identityLoader.load(1),
    identityLoader.load(2),
    identityLoader.load(1),
    identityLoader.load(3),
  ];

  // Every promise resolves to its own key.
  const results = await Promise.all(pending);
  expect(results[0]).toBe(1);
  expect(results[1]).toBe(2);
  expect(results[2]).toBe(1);
  expect(results[3]).toBe(3);

  // The duplicate load of key 1 was coalesced, so with maxBatchSize of 2 the
  // batch function saw two calls: [ 1, 2 ] then [ 3 ].
  expect(loadCalls).toEqual([ [ 1, 2 ], [ 3 ] ]);
});

it('caches repeated requests', async () => {
const [ identityLoader, loadCalls ] = idLoader<string>();

Expand Down
136 changes: 67 additions & 69 deletions src/index.js
Expand Up @@ -54,14 +54,14 @@ class DataLoader<K, V, C = K> {
this._batchLoadFn = batchLoadFn;
this._options = options;
this._promiseCache = getValidCacheMap(options);
this._queue = [];
this._batch = null;
}

// Private
_batchLoadFn: BatchLoadFn<K, V>;
_options: ?Options<K, V, C>;
_promiseCache: ?CacheMap<C, Promise<V>>;
_queue: LoaderQueue<K, V>;
_batch: Batch<K, V> | null;

/**
* Loads a key, returning a `Promise` for the value represented by that key.
Expand All @@ -76,7 +76,7 @@ class DataLoader<K, V, C = K> {

// Determine options
var options = this._options;
var shouldBatch = !options || options.batch !== false;
var batch = getCurrentBatch(this);
var cache = this._promiseCache;
var cacheKey = getCacheKey(options, key);

Expand All @@ -88,23 +88,11 @@ class DataLoader<K, V, C = K> {
}
}

// Otherwise, produce a new Promise for this value.
// Otherwise, produce a new Promise for this key, and enqueue it to be
// dispatched along with the current batch.
batch.keys.push(key);
var promise = new Promise((resolve, reject) => {
// Enqueue this Promise to be dispatched.
this._queue.push({ key, resolve, reject });

// Determine if a dispatch of this queue should be scheduled.
// A single dispatch should be scheduled per queue at the time when the
// queue changes from "empty" to "full".
if (this._queue.length === 1) {
if (shouldBatch) {
// If batching, schedule a task to dispatch the queue.
enqueuePostPromiseJob(() => dispatchQueue(this));
} else {
// Otherwise dispatch the (queue of one) immediately.
dispatchQueue(this);
}
}
batch.callbacks.push({ resolve, reject });
});

// If caching, cache this promise.
Expand Down Expand Up @@ -239,43 +227,61 @@ var enqueuePostPromiseJob =
// Private: cached resolved Promise instance
var resolvedPromise;

// Private: given the current state of a Loader instance, perform a batch load
// from its current queue.
function dispatchQueue<K, V>(loader: DataLoader<K, V, any>) {
// Take the current loader queue, replacing it with an empty queue.
var queue = loader._queue;
loader._queue = [];

// If a maxBatchSize was provided and the queue is longer, then segment the
// queue into multiple batches, otherwise treat the queue as a single batch.
var maxBatchSize = loader._options && loader._options.maxBatchSize;
if (maxBatchSize && maxBatchSize > 0 && maxBatchSize < queue.length) {
for (var i = 0; i < queue.length / maxBatchSize; i++) {
dispatchQueueBatch(
loader,
queue.slice(i * maxBatchSize, (i + 1) * maxBatchSize)
);
}
} else {
dispatchQueueBatch(loader, queue);
// Private: Describes a batch of requests. `keys` and `callbacks` are kept as
// parallel arrays: the callback at index i settles the promise created for
// keys[i], so entries are always pushed to both arrays together.
type Batch<K, V> = {
  // Set once the batch has been handed to batchLoadFn; a dispatched batch
  // may no longer accept additional keys.
  hasDispatched: boolean,
  keys: Array<K>,
  callbacks: Array<{
    resolve: (value: V) => void;
    reject: (error: Error) => void;
  }>
}

// Private: Either returns the current batch, or creates and schedules a
// dispatch of a new batch for the given loader.
function getCurrentBatch<K, V>(loader: DataLoader<K, V, any>): Batch<K, V> {
var options = loader._options;
var maxBatchSize =
(options && options.maxBatchSize) ||
(options && options.batch === false ? 1 : 0);

// If there is an existing batch which has not yet dispatched and is within
// the limit of the batch size, then return it.
var existingBatch = loader._batch;
if (
existingBatch !== null &&
!existingBatch.hasDispatched &&
(maxBatchSize === 0 || existingBatch.keys.length < maxBatchSize)
) {
return existingBatch;
}

// Otherwise, create a new batch for this loader.
var newBatch = { hasDispatched: false, keys: [], callbacks: [] };

// Store it on the loader so it may be reused.
loader._batch = newBatch;

// Then schedule a task to dispatch this batch of requests.
enqueuePostPromiseJob(() => dispatchBatch(loader, newBatch));

return newBatch;
}

function dispatchQueueBatch<K, V>(
function dispatchBatch<K, V>(
loader: DataLoader<K, V, any>,
queue: LoaderQueue<K, V>
batch: Batch<K, V>
) {
// Collect all keys to be loaded in this dispatch
var keys = queue.map(({ key }) => key);
// Mark this batch as having been dispatched.
batch.hasDispatched = true;

// Call the provided batchLoadFn for this loader with the loader queue's keys.
var batchLoadFn = loader._batchLoadFn;
// Call with the loader as the `this` context.
var batchPromise = batchLoadFn.call(loader, keys);
// Call the provided batchLoadFn for this loader with the batch's keys and
// with the loader as the `this` context.
var batchPromise = loader._batchLoadFn(batch.keys);

// Assert the expected response from batchLoadFn
if (!batchPromise || typeof batchPromise.then !== 'function') {
return failedDispatch(loader, queue, new TypeError(
return failedDispatch(loader, batch, new TypeError(
'DataLoader must be constructed with a function which accepts ' +
'Array<key> and returns Promise<Array<value>>, but the function did ' +
`not return a Promise: ${String(batchPromise)}.`
Expand All @@ -293,41 +299,40 @@ function dispatchQueueBatch<K, V>(
`not return a Promise of an Array: ${String(values)}.`
);
}
if (values.length !== keys.length) {
if (values.length !== batch.keys.length) {
throw new TypeError(
'DataLoader must be constructed with a function which accepts ' +
'Array<key> and returns Promise<Array<value>>, but the function did ' +
'not return a Promise of an Array of the same length as the Array ' +
'of keys.' +
`\n\nKeys:\n${String(keys)}` +
`\n\nKeys:\n${String(batch.keys)}` +
`\n\nValues:\n${String(values)}`
);
}

// Step through the values, resolving or rejecting each Promise in the
// loaded queue.
queue.forEach(({ resolve, reject }, index) => {
var value = values[index];
// Step through values, resolving or rejecting each Promise in the batch.
for (var i = 0; i < batch.callbacks.length; i++) {
var value = values[i];
if (value instanceof Error) {
reject(value);
batch.callbacks[i].reject(value);
} else {
resolve(value);
batch.callbacks[i].resolve(value);
}
});
}).catch(error => failedDispatch(loader, queue, error));
}
}).catch(error => failedDispatch(loader, batch, error));
}

// Private: do not cache individual loads if the entire batch dispatch fails,
// but still reject each request so they do not hang.
function failedDispatch<K, V>(
loader: DataLoader<K, V, any>,
queue: LoaderQueue<K, V>,
batch: Batch<K, V>,
error: Error
) {
queue.forEach(({ key, reject }) => {
loader.clear(key);
reject(error);
});
for (var i = 0; i < batch.keys.length; i++) {
loader.clear(batch.keys[i]);
batch.callbacks[i].reject(error);
}
}

// Private: produce a cache key for a given key (and options)
Expand Down Expand Up @@ -362,13 +367,6 @@ function getValidCacheMap<K, V, C>(
return cacheMap;
}

// Private
type LoaderQueue<K, V> = Array<{
key: K;
resolve: (value: V) => void;
reject: (error: Error) => void;
}>;

// Private
function isArrayLike(x: mixed): boolean {
return (
Expand Down